Note:

CREATE CHANGEFEED is an enterprise-only feature. For the core version, see EXPERIMENTAL CHANGEFEED FOR.

The CREATE CHANGEFEED statement creates a new enterprise changefeed, which targets a whitelist of tables, called "watched rows". Every change to a watched row is emitted as a record in a configurable format (JSON or Avro) to a configurable sink (Kafka or a cloud storage sink). You can create, pause, resume, or cancel an enterprise changefeed.

For more information, see Change Data Capture.

Required privileges

Changefeeds can only be created by superusers, i.e., members of the admin role. The admin role exists by default with root as the member.
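
For example (the user name maxroach is a placeholder), an existing admin user such as root can grant the required role membership:

GRANT admin TO maxroach;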

Synopsis

CREATE CHANGEFEED FOR TABLE table_name [, ...] INTO sink [WITH option [= value] [, ...]]

Parameters

Parameter Description
table_name The name of the table (or tables in a comma-separated list) to create a changefeed for.

Note: Changefeeds do not share internal buffers, so each running changefeed will increase total memory usage. To watch multiple tables, we recommend creating a changefeed with a comma-separated list of tables.
sink The location of the configurable sink. The scheme of the URI indicates the type. For more information, see Sink URI below.
option / value For a list of available options and their values, see Options below.
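
For illustration only (the table names, Kafka address, and option are placeholders), a changefeed watching two tables might be created with:

CREATE CHANGEFEED FOR TABLE orders, customers INTO 'kafka://broker.address.com:9092' WITH updated;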

Sink URI

The sink URI follows the basic format of:

'[scheme]://[host]:[port]?[query_parameters]'

The scheme can be kafka or the scheme of any of the cloud storage sinks listed below.

Kafka

Example of a Kafka sink URI:

'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones'

Query parameters include:

Parameter Value Description
topic_prefix STRING Adds a prefix to all topic names.

For example, CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_' would emit rows under the topic bar_foo instead of foo.
tls_enabled=true BOOL If true, enable Transport Layer Security (TLS) on the connection to Kafka. This can be used with a ca_cert (see below).
ca_cert STRING The base64-encoded ca_cert file.

Note: To encode your ca.cert, run base64 -w 0 ca.cert.
client_cert STRING The base64-encoded Privacy Enhanced Mail (PEM) certificate. This is used with client_key.
client_key STRING The base64-encoded private key for the PEM certificate. This is used with client_cert.
sasl_enabled BOOL If true, use SASL/PLAIN to authenticate. This requires a sasl_user and sasl_password (see below).
sasl_user STRING Your SASL username.
sasl_password STRING Your SASL password.
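
Putting the statement together with the example URI above, a sketch of a changefeed emitting to a TLS- and SASL-enabled Kafka cluster (the table name, broker address, certificate, and credentials are placeholders) might look like:

CREATE CHANGEFEED FOR TABLE foo
  INTO 'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones'
  WITH updated, resolved='10s';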

Cloud storage sink

Use a cloud storage sink to deliver changefeed data to OLAP or big data systems without requiring transport via Kafka.

Note:

Currently, cloud storage sinks only work with JSON and emit newline-delimited JSON files.

Example of a cloud storage sink (e.g., AWS S3) URI:

'experimental-s3://test-s3encryption/test?AWS_ACCESS_KEY_ID=ABCDEFGHIJKLMNOPQ&AWS_SECRET_ACCESS_KEY=LS0tLS1CRUdJTiBDRVJUSUZ'
Note:

The scheme for a cloud storage sink must be prefixed with experimental-.

Any of the following cloud storage services can be used as a sink. The URI follows the form:

[scheme]://[host]/[path]?[parameters]

Amazon: scheme s3; host is the bucket name; parameters: AUTH 1 (optional; can be implicit or specified), AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
Azure: scheme azure; host is N/A (see Example file URLs); parameters: AZURE_ACCOUNT_KEY, AZURE_ACCOUNT_NAME
Google Cloud 2: scheme gs; host is the bucket name; parameters: AUTH (optional; can be default, implicit, or specified), CREDENTIALS
HTTP 3: scheme http; host is the remote host; no parameters
NFS/Local 4: scheme nodelocal; host is empty or a nodeID 5 (see Example file URLs); no parameters
S3-compatible services 6: scheme s3; host is the bucket name; parameters: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, AWS_REGION 7 (optional), AWS_ENDPOINT
Note:

The location parameters often contain special characters that need to be URI-encoded. Use JavaScript's encodeURIComponent function or Go's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.

Note:

If your environment requires an HTTP or HTTPS proxy server for outgoing connections, you can set the standard HTTP_PROXY and HTTPS_PROXY environment variables when starting CockroachDB.

  • 1 If the AUTH parameter is not provided, AWS connections default to specified and the access keys must be provided in the URI parameters. If the AUTH parameter is implicit, the access keys can be omitted and the credentials will be loaded from the environment.

  • 2 If the AUTH parameter is not specified, the cloudstorage.gs.default.key cluster setting will be used if it is non-empty, otherwise the implicit behavior is used. If the AUTH parameter is implicit, all GCS connections use Google's default authentication strategy. If the AUTH parameter is default, the cloudstorage.gs.default.key cluster setting must be set to the contents of a service account file which will be used during authentication. If the AUTH parameter is specified, GCS connections are authenticated on a per-statement basis, which allows the JSON key object to be sent in the CREDENTIALS parameter. The JSON key object should be base64-encoded (using the standard encoding in RFC 4648).

  • 3 You can create your own HTTP server with Caddy or nginx. A custom root CA can be appended to the system's default CAs by setting the cloudstorage.http.custom_ca cluster setting, which will be used when verifying certificates from HTTPS URLs.

  • 4 The file system backup location on the NFS drive is relative to the path specified by the --external-io-dir flag set while starting the node. If the flag is set to disabled, then imports from local directories and NFS drives are disabled.

  • 5 New in v20.1: If a nodeID is provided, the data files will be in the extern directory of the specified node. Currently, using a nodeID is optional but strongly encouraged. If you do not specify a nodeID when using nodelocal storage, the individual data files will be in the extern directories of arbitrary nodes and will likely not work as intended; to work correctly, each node must have the --external-io-dir flag point to the same NFS mount or other network-backed, shared storage.

  • 6 A custom root CA can be appended to the system's default CAs by setting the cloudstorage.http.custom_ca cluster setting, which will be used when verifying certificates from an S3-compatible service.

  • 7 The AWS_REGION parameter is optional since it is not a required parameter for most S3-compatible services. Specify the parameter only if your S3-compatible service requires it.

Example file URLs

Location Example
Amazon S3 s3://acme-co/employees.sql?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456
Azure azure://employees.sql?AZURE_ACCOUNT_KEY=123&AZURE_ACCOUNT_NAME=acme-co
Google Cloud gs://acme-co/employees.sql
HTTP http://localhost:8080/employees.sql
NFS/Local nodelocal:///path/employees, nodelocal://2/path/employees 5
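
For example, a sketch of a changefeed that emits newline-delimited JSON files to the AWS S3 bucket shown earlier (the table name, bucket, and access keys are placeholders):

CREATE CHANGEFEED FOR TABLE foo
  INTO 'experimental-s3://test-s3encryption/test?AWS_ACCESS_KEY_ID=ABCDEFGHIJKLMNOPQ&AWS_SECRET_ACCESS_KEY=LS0tLS1CRUdJTiBDRVJUSUZ'
  WITH updated;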

Options

Option Value Description
updated N/A Include updated timestamps with each row.

If a cursor is provided, the "updated" timestamps will match the MVCC timestamps of the emitted rows, and there is no initial scan. If a cursor is not provided, the changefeed will perform an initial scan (as of the time the changefeed was created), and the "updated" timestamp for each change record emitted in the initial scan will be the timestamp of the initial scan. Similarly, when a backfill is performed for a schema change, the "updated" timestamp is set to the first timestamp for when the new schema is valid.
resolved INTERVAL Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted.

Example: resolved='10s'
envelope key_only / wrapped Use key_only to emit only the key and no value, which is faster if you only want to know when the key changes.

Default: envelope=wrapped
cursor Timestamp Emits any changes after the given timestamp, but does not output the current state of the table first. If cursor is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.

When starting a changefeed at a specific cursor, the cursor cannot be before the configured garbage collection window (see gc.ttlseconds) for the table you're trying to follow; otherwise, the changefeed will error. With default garbage collection settings, this means you cannot create a changefeed that starts more than 25 hours in the past.

cursor can be used to start a new changefeed where a previous changefeed ended.

Example: CURSOR='1536242855577149065.0000000000'
format json / experimental_avro Format of the emitted record. Currently, support for Avro is limited and experimental. For mappings of CockroachDB types to Avro types, see the table below.

Default: format=json.
confluent_schema_registry Schema Registry address The Schema Registry address is required to use experimental_avro.
key_in_value N/A Makes the primary key of a deleted row recoverable in sinks where each message has a value but not a key (most have a key and value in each message). key_in_value is automatically used for these sinks (currently only cloud storage sinks).

New in v20.1: Using the format=experimental_avro, envelope=key_only, and updated options together is rejected. envelope=key_only prevents any rows with updated fields from being emitted, which makes the updated option meaningless.
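
As a sketch of how these options combine (the table name, sink, and timestamp are placeholders), a new changefeed can resume where a previous one ended by passing a previously emitted resolved timestamp as the cursor:

CREATE CHANGEFEED FOR TABLE foo
  INTO 'kafka://broker.address.com:9092'
  WITH updated, resolved='10s', cursor='1536242855577149065.0000000000';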