Note:

CREATE CHANGEFEED is an enterprise-only feature. For the core version, see EXPERIMENTAL CHANGEFEED FOR.

The CREATE CHANGEFEED statement creates a new enterprise changefeed, which targets an allowlist of tables, called "watched rows". Every change to a watched row is emitted as a record in a configurable format (JSON or Avro) to a configurable sink (Kafka or a cloud storage sink). You can create, pause, resume, or cancel an enterprise changefeed.

For more information, see Stream Data Out of CockroachDB Using Changefeeds.

Required privileges

Changefeeds can only be created by superusers, i.e., members of the admin role. The admin role exists by default with root as the member.

Synopsis

CREATE CHANGEFEED FOR TABLE table_name , INTO sink WITH option = value ,

Parameters

Parameter Description
table_name The name of the table (or tables in a comma separated list) to create a changefeed for.

Note: Changefeeds do not share internal buffers, so each running changefeed will increase total memory usage. To watch multiple tables, we recommend creating a changefeed with a comma-separated list of tables.
sink The location of the configurable sink. The scheme of the URI indicates the type. For more information, see Sink URI below.
option / value For a list of available options and their values, see Options below.

Sink URI

The sink URI follows the basic format of:

'[scheme]://[host]:[port]?[query_parameters]'
URI Component Description
scheme The type of sink: kafka or any cloud storage sink.
host The sink's hostname or IP address.
port The sink's port.
query_parameters The sink's query parameters.

Kafka

Example of a Kafka sink URI:

'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones'

Cloud storage sink

Use a cloud storage sink to deliver changefeed data to OLAP or big data systems without requiring transport via Kafka.

Note:

Currently, cloud storage sinks only work with JSON and emits newline-delimited JSON files.

For more information on the sink URL structure, see Use Cloud Storage for Bulk Operations.

Query parameters

Query parameters include:

Parameter
Sink Type
Description
topic_prefix Kafka, cloud Type: STRING

Adds a prefix to all topic names.

For example, CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_' would emit rows under the topic bar_foo instead of foo.
tls_enabled=true Kafka Type: BOOL

If true, enable Transport Layer Security (TLS) on the connection to Kafka. This can be used with a ca_cert (see below).
ca_cert Kafka Type: STRING

The base64-encoded ca_cert file.

Note: To encode your ca.cert, run base64 -w 0 ca.cert.
client_cert Kafka Type: STRING

The base64-encoded Privacy Enhanced Mail (PEM) certificate. This is used with client_key.
client_key Kafka Type: STRING

The base64-encoded private key for the PEM certificate. This is used with client_cert.
sasl_enabled Kafka Type: BOOL

If true, use SASL/PLAIN to authenticate. This requires a sasl_user and sasl_password (see below).
sasl_user Kafka Type: STRING

Your SASL username.
sasl_password Kafka Type: STRING

Your SASL password.
file_size cloud Type: STRING

The file will be flushed (i.e., written to the sink) when it exceeds the specified file size. This can be used with the WITH resolved option, which flushes on a specified cadence.

Default: 16MB

Options

Option Value Description
updated N/A Include updated timestamps with each row.

If a cursor is provided, the "updated" timestamps will match the MVCC timestamps of the emitted rows, and there is no initial scan. If a cursor is not provided, the changefeed will perform an initial scan (as of the time the changefeed was created), and the "updated" timestamp for each change record emitted in the initial scan will be the timestamp of the initial scan. Similarly, when a backfill is performed for a schema change, the "updated" timestamp is set to the first timestamp for when the new schema is valid.
resolved INTERVAL Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted.

Example: resolved='10s'
envelope key_only / wrapped Use key_only to emit only the key and no value, which is faster if you only want to know when the key changes.

Default: envelope=wrapped
cursor Timestamp Emit any changes after the given timestamp, but does not output the current state of the table first. If cursor is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.

When starting a changefeed at a specific cursor, the cursor cannot be before the configured garbage collection window (see gc.ttlseconds) for the table you're trying to follow; otherwise, the changefeed will error. With default garbage collection settings, this means you cannot create a changefeed that starts more than 25 hours in the past.

cursor can be used to start a new changefeed where a previous changefeed ended.

Example: CURSOR='1536242855577149065.0000000000'
format json / experimental_avro Format of the emitted record. Currently, support for Avro is limited and experimental. For mappings of CockroachDB types to Avro types, see the table below.

Default: format=json.
confluent_schema_registry Schema Registry address The Schema Registry address is required to use experimental_avro.
key_in_value N/A Make the primary key of a deleted row recoverable in sinks where each message has a value but not a key (most have a key and value in each message). key_in_value is automatically used for these sinks (currently only cloud storage sinks).
diff N/A Publish a before field with each message, which includes the value of the row before the update was applied.
compression gzip Compress changefeed data files written to a cloud storage sink. Currently, only Gzip is supported for compression.
protect_data_from_gc_on_pause N/A When a changefeed is paused, ensure that the data needed to resume the changefeed is not garbage collected.

Note: If you use this option, changefeeds left paused can prevent garbage collection for long periods of time.
schema_change_events default / column_changes The type of schema change event that triggers the behavior specified by the schema_change_policy option:
  • default: Include all ADD COLUMN events for columns that have a non-NULL DEFAULT value or are computed, and all DROP COLUMN events.
  • column_changes: Include all all schema change events that add or remove any column.

Default: schema_change_events=default
schema_change_policy backfill / nobackfill / stop The behavior to take when an event specified by the schema_change_events option occurs:
Default: schema_change_policy=backfill
initial_scan / no_initial_scan N/A Control whether or not an initial scan will occur at the start time of a changefeed. initial_scan and no_initial_scan cannot be used simultaneously. If neither initial_scan nor no_initial_scan is specified, an initial scan will occur if there is no cursor, and will not occur if there is one. This preserves the behavior from previous releases.

Default: initial_scan
If used in conjunction with cursor, an initial scan will be performed at the cursor timestamp. If no cursor is specified, the initial scan is performed at now().
Note:

Using the format=experimental_avro, envelope=key_only, and updated options together is rejected. envelope=key_only prevents any rows with updated fields from being emitted, which makes the updated option meaningless.

Avro limitations

Currently, support for Avro is limited and experimental. Below is a list of unsupported SQL types and values for Avro changefeeds:

Avro types

Below is a mapping of CockroachDB types to Avro types:

CockroachDB Type Avro Type Avro Logical Type
INT LONG
BOOL BOOLEAN
FLOAT DOUBLE
STRING STRING
DATE INT DATE
TIME LONG TIME-MICROS
TIMESTAMP LONG TIME-MICROS
TIMESTAMPTZ LONG TIME-MICROS
DECIMAL BYTES DECIMAL
UUID STRING
INET STRING
JSONB STRING

Responses

Messages

The messages (i.e., keys and values) emitted to a sink are specific to the envelope. The default format is wrapped, and the output messages are composed of the following:

  • Key: An array always composed of the row's PRIMARY KEY field(s) (e.g., [1] for JSON or {"id":{"long":1}} for Avro).
  • Value:
    • One of three possible top-level fields:
      • after, which contains the state of the row after the update (or null' for DELETEs).
      • updated, which contains the updated timestamp.
      • resolved, which is emitted for records representing resolved timestamps. These records do not include an "after" value since they only function as checkpoints.
    • For INSERT and UPDATE, the current state of the row inserted or updated.
    • For DELETE, null.

For example:

Statement Response
INSERT INTO office_dogs VALUES (1, 'Petee'); JSON: [1] {"after": {"id": 1, "name": "Petee"}}
Avro: {"id":{"long":1}} {"after":{"office_dogs":{"id":{"long":1},"name":{"string":"Petee"}}}}
DELETE FROM office_dogs WHERE name = 'Petee' JSON: [1] {"after": null}
Avro: {"id":{"long":1}} {"after":null}

Files

The files emitted to a sink use the following naming conventions:

Note:

The timestamp format is YYYYMMDDHHMMSSNNNNNNNNNLLLLLLLLLL.

General file format

/[date]/[timestamp]-[uniquer]-[topic]-[schema-id]

For example:

/2020-04-02/202004022058072107140000000000000-56087568dba1e6b8-1-72-00000000-test_table-1.ndjson

Resolved file format

/[date]/[timestamp].RESOLVED

For example:

/2020-04-04/202004042351304139680000000000000.RESOLVED

Examples

Create a changefeed connected to Kafka

copy
icon/buttons/copy
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH updated, resolved;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)

For more information on how to create a changefeed connected to Kafka, see Stream Data Out of CockroachDB Using Changefeeds.

Create a changefeed connected to Kafka using Avro

copy
icon/buttons/copy
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH format = experimental_avro, confluent_schema_registry = <schema_registry_address>;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)

For more information on how to create a changefeed that emits an Avro record, see Stream Data Out of CockroachDB Using Changefeeds.

Create a changefeed connected to a cloud storage sink

Warning:

This is an experimental feature. The interface and output are subject to change.

copy
icon/buttons/copy
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'experimental-scheme://host?parameters'
  WITH updated, resolved;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)

For more information on how to create a changefeed connected to a cloud storage sink, see Stream Data Out of CockroachDB Using Changefeeds.

Manage a changefeed

Use the following SQL statements to pause, resume, and cancel a changefeed.

Note:

Changefeed-specific SQL statements (e.g., CANCEL CHANGEFEED) will be added in the future.

Pause a changefeed

copy
icon/buttons/copy
> PAUSE JOB job_id;

For more information, see PAUSE JOB.

Resume a paused changefeed

copy
icon/buttons/copy
> RESUME JOB job_id;

For more information, see RESUME JOB.

Cancel a changefeed

copy
icon/buttons/copy
> CANCEL JOB job_id;

For more information, see CANCEL JOB.

Start a new changefeed where another ended

Find the high-water timestamp for the ended changefeed:

copy
icon/buttons/copy
> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
        job_id       |  job_type  | ... |      high_water_timestamp      | error | coordinator_id
+--------------------+------------+ ... +--------------------------------+-------+----------------+
  383870400694353921 | CHANGEFEED | ... | 1537279405671006870.0000000000 |       |              1
(1 row)

Use the high_water_timestamp to start the new changefeed:

copy
icon/buttons/copy
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka//host:port'
  WITH cursor = '<high_water_timestamp>';

Note that because the cursor is provided, the initial scan is not performed.

See also



Yes No