Core and Enterprise changefeeds offer different levels of configurability. Enterprise changefeeds allow for active changefeed jobs to be paused, resumed, and canceled.
This page describes:
- Guidelines to consider before creating a changefeed.
- Reference examples for creating and managing a changefeed.
Before you create a changefeed
- Enable rangefeeds on CockroachDB Advanced and CockroachDB self-hosted. Refer to Enable rangefeeds for instructions.
- Decide whether you will run an Enterprise or basic changefeed. Refer to the Overview page for a comparative capability table.
- Plan the number of changefeeds versus the number of tables to include in a single changefeed for your cluster. We recommend limiting the number of changefeeds per cluster to 80. Refer to System resources and running changefeeds and Recommendations for the number of target tables.
- Consider whether your Enterprise changefeed use case would be better served by change data capture queries that can filter data on a single table. CDC queries can improve the efficiency of changefeeds because the job will not need to encode as much change data.
- Read the following:
  - The Changefeed Best Practices reference for details on planning changefeeds, monitoring basics, and schema changes.
  - The Considerations section, which provides information on changefeed interactions that could affect how you configure or run your changefeed.
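As a rough illustration of the CDC query guideline above, the following sketch filters a single table so that only matching rows and selected columns are encoded. The table name, column names, and filter value are hypothetical, and the Kafka URI is only a placeholder sink:

```sql
-- Hypothetical table and columns; only rows matching the WHERE clause
-- are emitted, and only the selected columns are encoded.
CREATE CHANGEFEED INTO 'kafka://localhost:9092'
  AS SELECT id, status FROM table_name WHERE status = 'shipped';
```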
Enable rangefeeds
Changefeeds connect to a long-lived request called a rangefeed, which pushes changes as they happen. This reduces the latency of row changes and, for some workloads, reduces transaction restarts on tables watched by a changefeed.
Rangefeeds must be enabled for a changefeed to work. To enable the cluster setting:
SET CLUSTER SETTING kv.rangefeed.enabled = true;
Any created changefeeds will error until this setting is enabled. If you are working on a CockroachDB Serverless cluster, the kv.rangefeed.enabled cluster setting is enabled by default.
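To confirm the current value before creating a changefeed, one option is to query the setting directly. This is a minimal check and assumes your user has permission to view cluster settings:

```sql
-- Returns the current value of the rangefeed setting.
SHOW CLUSTER SETTING kv.rangefeed.enabled;
```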
Enabling rangefeeds has a small performance cost (about a 5–10% increase in write latencies), whether or not the rangefeed is being used in a changefeed. When kv.rangefeed.enabled is set to true, a small portion of the latency cost is caused by additional write event information that is sent to the Raft log and for replication. The remainder of the latency cost is incurred once a changefeed is running; the write event information is reconstructed and sent to an active rangefeed, which will push the event to the changefeed.
For further detail on performance-related configuration, refer to the Advanced Changefeed Configuration page.
MuxRangefeed is a subsystem that improves the performance of rangefeeds at scale. It is enabled by default in v24.1 and later versions.
Considerations
- If you require resolved message frequency under 30s, then you must set the min_checkpoint_frequency option to at least the desired resolved frequency.
- Many DDL queries (including TRUNCATE, DROP TABLE, and queries that add a column family) will cause errors on a changefeed watching the affected tables. You will need to start a new changefeed. If a table is truncated that a changefeed with on_error='pause' is watching, you will also need to start a new changefeed. Refer to the change data capture Known Limitations for more detail.
- Partial or intermittent sink unavailability may impact changefeed stability. If a sink is unavailable, messages cannot be sent, which means that a changefeed's high-water mark timestamp is at risk of falling behind the cluster's garbage collection window. Throughput and latency can be affected once the sink is available again. However, ordering guarantees will still hold for as long as a changefeed remains active.
- When an IMPORT INTO statement is run, any current changefeed jobs targeting that table will fail.
- After you restore from a full-cluster backup, changefeed jobs will not resume on the new cluster. It is necessary to manually create the changefeeds following the full-cluster restore.
- As of v22.1, changefeeds filter out VIRTUAL computed columns from events by default. This is a backward-incompatible change. To maintain the changefeed behavior in previous versions where NULL values are emitted for virtual computed columns, see the virtual_columns option for more detail.
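Two of the considerations above map directly to changefeed options. The following sketch shows how they might be set. The 10s interval is an arbitrary example value, table_name is a placeholder, and the Kafka URI is only a stand-in for your sink:

```sql
-- Request resolved messages every 10s. Because this is under 30s,
-- min_checkpoint_frequency is set to the same interval.
CREATE CHANGEFEED FOR TABLE table_name
  INTO 'kafka://localhost:9092'
  WITH resolved = '10s', min_checkpoint_frequency = '10s';

-- Emit NULL values for virtual computed columns, as in versions before v22.1.
CREATE CHANGEFEED FOR TABLE table_name
  INTO 'kafka://localhost:9092'
  WITH virtual_columns = 'null';
```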
The following Enterprise and Core sections outline how to create and configure each type of changefeed:
Configure a changefeed
An Enterprise changefeed streams row-level changes in a configurable format to one of the following sinks:
- Amazon MSK
- Apache Pulsar (in Preview)
- Azure Event Hubs
- Cloud Storage / HTTP
- Confluent Cloud
- Google Cloud Pub/Sub
- Kafka
- Webhook
You can create, pause, resume, and cancel an Enterprise changefeed. For a step-by-step example connecting to a specific sink, see the Changefeed Examples page.
Create
To create an Enterprise changefeed:
CREATE CHANGEFEED FOR TABLE table_name, table_name2 INTO '{scheme}://{host}:{port}?{query_parameters}';
Parameters should always be URI-encoded before they are included in the changefeed's URI, as they often contain special characters. Use JavaScript's encodeURIComponent function or Go's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.
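For illustration, the following sketch shows what an encoded parameter might look like in a sink URI. The secret value is made up, the bracketed parts are placeholders, and the s3 scheme with its AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY parameters is assumed here as an example of a cloud storage sink; it is not taken from this page:

```sql
-- Hypothetical secret before encoding: abc/def+ghi=
-- After URI-encoding:                  abc%2Fdef%2Bghi%3D
CREATE CHANGEFEED FOR TABLE table_name
  INTO 's3://{bucket}/{path}?AWS_ACCESS_KEY_ID={key_id}&AWS_SECRET_ACCESS_KEY=abc%2Fdef%2Bghi%3D';
```

Here `/` becomes `%2F`, `+` becomes `%2B`, and `=` becomes `%3D`, so the characters are not misread as part of the URI structure.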
When you create a changefeed without specifying a sink, CockroachDB sends the changefeed events to the SQL client. Consider the following regarding the display format in your SQL client:
- If you do not define a display format, the CockroachDB SQL client will automatically use ndjson format.
- If you specify a display format, the client will use that format (e.g., --format=csv).
- If you set the client display format to ndjson and set the changefeed format to csv, you'll receive JSON format with CSV nested inside.
- If you set the client display format to csv and set the changefeed format to json, you'll receive a comma-separated list of JSON values.
For more information, see CREATE CHANGEFEED.
Show
To show a list of Enterprise changefeed jobs:
SHOW CHANGEFEED JOBS;
job_id | description | ...
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+ ...
685724608744325121 | CREATE CHANGEFEED FOR TABLE mytable INTO 'kafka://localhost:9092' WITH confluent_schema_registry = 'http://localhost:8081', format = 'avro', resolved, updated | ...
685723987509116929 | CREATE CHANGEFEED FOR TABLE mytable INTO 'kafka://localhost:9092' WITH confluent_schema_registry = 'http://localhost:8081', format = 'avro', resolved, updated | ...
(2 rows)
To show an individual Enterprise changefeed:
SHOW CHANGEFEED JOB {job_id};
job_id | description | user_name | status | running_status | created | started | finished | modified | high_water_timestamp | error | sink_uri | full_table_names | topics | format
---------------------+--------------------------------------------------------------------------------------+-----------+---------+------------------------------------------+----------------------------+----------------------------+----------+----------------------------+--------------------------------+-------+----------------+---------------------+--------+----------
866218332400680961 | CREATE CHANGEFEED FOR TABLE movr.users INTO 'external://aws' WITH format = 'parquet' | root | running | running: resolved=1684438482.937939878,0 | 2023-05-18 14:14:16.323465 | 2023-05-18 14:14:16.360245 | NULL | 2023-05-18 19:35:16.120407 | 1684438482937939878.0000000000 | | external://aws | {movr.public.users} | NULL | parquet
(1 row)
All changefeed jobs will display regardless of whether or when the job completed. You can define a retention time and delete completed jobs by using the jobs.retention_time cluster setting.
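A minimal sketch of inspecting and changing the retention time follows. The 24h value is an arbitrary example, not a recommendation. Note that this setting governs records for completed jobs in general, so it likely affects more than changefeed jobs:

```sql
-- Check the current retention time for completed job records.
SHOW CLUSTER SETTING jobs.retention_time;

-- Example value only: keep completed job records for 24 hours.
SET CLUSTER SETTING jobs.retention_time = '24h';
```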
You can filter the columns that SHOW CHANGEFEED JOBS displays using a SELECT statement:
SELECT job_id, sink_uri, status, format FROM [SHOW CHANGEFEED JOBS] WHERE job_id = 997306743028908033;
job_id | sink_uri | status | format
---------------------+------------------+----------+---------
997306743028908033 | external://kafka | running | json
For more information, refer to SHOW CHANGEFEED JOB.
Pause
To pause an Enterprise changefeed:
PAUSE JOB job_id;
For more information, refer to PAUSE JOB.
Resume
To resume a paused Enterprise changefeed:
RESUME JOB job_id;
For more information, refer to RESUME JOB.
Cancel
To cancel an Enterprise changefeed:
CANCEL JOB job_id;
For more information, refer to CANCEL JOB.
Modify a changefeed
To modify an Enterprise changefeed, pause the job and then use:
ALTER CHANGEFEED job_id {ADD table DROP table SET option UNSET option};
You can add new table targets, remove them, set new changefeed options, and unset them.
For more information, see ALTER CHANGEFEED.
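As a concrete sketch of the pause, alter, and resume sequence, the following reuses the job ID and the movr database from the SHOW CHANGEFEED JOB output earlier on this page. The movr.rides table and the option values are assumptions for illustration:

```sql
-- The changefeed must be paused before it can be altered.
PAUSE JOB 866218332400680961;

-- Add a table target (movr.rides is a hypothetical second table).
ALTER CHANGEFEED 866218332400680961 ADD movr.rides;

-- Set an option, then remove it again.
ALTER CHANGEFEED 866218332400680961 SET resolved = '10s';
ALTER CHANGEFEED 866218332400680961 UNSET resolved;

RESUME JOB 866218332400680961;
```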
Configuring all changefeeds
It is useful to be able to pause all running changefeeds during troubleshooting, testing, or when a decrease in CPU load is needed.
To pause all running changefeeds:
PAUSE JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('running'));
This will change the status for each of the running changefeeds to paused, which can be verified with SHOW CHANGEFEED JOBS.
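One way to check the result is to reuse the SELECT filter pattern shown earlier on this page and list only the job ID and status columns:

```sql
-- List changefeed jobs that are currently running or paused.
SELECT job_id, status FROM [SHOW CHANGEFEED JOBS] WHERE status IN ('running', 'paused');
```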
To resume all paused changefeeds:
RESUME JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('paused'));
This will resume the changefeeds and update the status for each of the changefeeds to running.
Create a changefeed
A basic changefeed streams row-level changes to the client indefinitely until the underlying connection is closed or the changefeed is canceled.
To create a basic changefeed:
EXPERIMENTAL CHANGEFEED FOR table_name;
For more information, see EXPERIMENTAL CHANGEFEED FOR.
Known limitations
- Changefeed target options are limited to tables and column families. #73435
- VPC Peering and AWS PrivateLink in CockroachDB Advanced clusters do not support connecting to a Kafka sink's internal IP addresses for changefeeds. To connect to a Kafka sink from CockroachDB Advanced, it is necessary to expose the Kafka cluster's external IP address and open ports with firewall rules to allow access from a CockroachDB Advanced cluster.
- Webhook sinks only support HTTPS. Use the insecure_tls_skip_verify parameter when testing to disable certificate verification; however, this still requires HTTPS and certificates. #73431
- Formats for changefeed messages are not supported by all changefeed sinks. Refer to the Changefeed Sinks page for details on compatible formats with each sink and the format option to specify a changefeed message format. #73432
- Using the split_column_families and resolved options on the same changefeed will cause an error when using the following sinks: Kafka and Google Cloud Pub/Sub. Instead, use the individual FAMILY keyword to specify column families when creating a changefeed. #79452
- Changefeed types are not fully integrated with user-defined composite types. Running changefeeds with user-defined composite types is in Preview. Certain changefeed types do not support user-defined composite types. Refer to the change data capture Known Limitations for more detail. The following limitations apply:
  - A changefeed in Avro format will not be able to serialize user-defined composite (tuple) types. #102903
  - A changefeed emitting CSV will include AS labels in the message format when the changefeed serializes a user-defined composite type. #102905
- ALTER CHANGEFEED is not fully supported with changefeeds that use CDC queries. You can alter the options that a changefeed uses, but you cannot alter the changefeed target tables. #83033
- Creating a changefeed with CDC queries on tables with more than one column family is not supported. #127761
- When you create a changefeed on a table with more than one column family, the changefeed will emit messages per column family in separate streams. As a result, changefeed messages for different column families will arrive at the sink under separate topics. #127736
- Changefeeds created in v24.3 of CockroachDB that emit to Kafka, or changefeeds created in earlier versions with the changefeed.new_kafka_sink.enabled cluster setting enabled, do not support negative compression level values for GZIP compression in the kafka_sink_config = {... "CompressionLevel" = ...} option field. #136492
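For the split_column_families limitation above, the following sketch shows the FAMILY keyword used in place of split_column_families alongside resolved. The table and family names are placeholders, and the Kafka URI is only an example sink:

```sql
-- Target each column family explicitly instead of using split_column_families.
CREATE CHANGEFEED FOR TABLE table_name FAMILY family_one, TABLE table_name FAMILY family_two
  INTO 'kafka://localhost:9092'
  WITH resolved;
```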