New in v2.1: The CREATE CHANGEFEED statement creates a new changefeed, which provides row-level change subscriptions.

Changefeeds target a whitelist of tables, called the "watched rows." Every change to a watched row is emitted as a record in a configurable format (JSON) to a configurable sink (Kafka).

For more information, see Change Data Capture.

Warning:

This feature is under active development and only works for a targeted a use case. Please file a Github issue if you have feedback on the interface.

Note:

CREATE CHANGEFEED is an enterprise-only. There will be a core version in a future version.

Required privileges

Changefeeds can only be created by superusers, i.e., members of the admin role. The admin role exists by default with root as the member.

Synopsis

CREATE CHANGEFEED FOR TABLE table_name , INTO sink WITH option = value ,

Parameters

Parameter Description
table_name The name of the table (or tables in a comma separated list) to create a changefeed for.
sink The location of the configurable sink. The scheme of the URI indicates the type; currently, only kafka. There are query parameters that vary per type. Currently, the kafka scheme only has topic_prefix, which adds a prefix to all of the topic names.

Sink URI scheme: '[scheme]://[host]:[port][?topic_prefix=[foo]]'

For example, CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_' would emit rows under the topic bar_foo instead of foo.
option / value For a list of available options and their values, see Options below.

Options

Option Value Description
updated N/A Include updated timestamps with each row.
resolved INTERVAL Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted.

Example: resolved='10s'
envelope key_only / row Use key_only to emit only the key and no value, which is faster if you only want to know when the key changes.

Default: envelope=row
cursor Timestamp Emits any changes after the given timestamp, but does not output the current state of the table first. If cursor is not specified, the changefeed starts by doing a consistent scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.

cursor can be used to start a new changefeed where a previous changefeed ended.

Example: CURSOR='1536242855577149065.0000000000'
format json / 'experimental-avro' Format of the emitted record. Currently, support for Avro is limited and experimental.

Default: format=json.
confluent_schema_registry Schema Registry address The Schema Registry address is required to use 'experimental-avro'.

Examples

Create a changefeed

copy
icon/buttons/copy
> CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://host:port'
  WITH updated, resolved;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)

For more information on how to create a changefeed connected to Kafka, see Change Data Capture.

Create a changefeed with Avro

copy
icon/buttons/copy
> CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://host:port'
  WITH format = 'experimental-avro', confluent_schema_registry = '<schema_registry_address>';
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)

For more information on how to create a changefeed that emits an Avro record, see Change Data Capture.

Manage a changefeed

Use the following SQL statements to pause, resume, and cancel a changefeed.

Note:

Changefeed-specific SQL statements (e.g., CANCEL CHANGEFEED) will be added in the future.

Pause a changefeed

copy
icon/buttons/copy
> PAUSE JOB job_id;

For more information, see PAUSE JOB.

Resume a paused changefeed

copy
icon/buttons/copy
> RESUME JOB job_id;

For more information, see RESUME JOB.

Cancel a changefeed

copy
icon/buttons/copy
> CANCEL JOB job_id;

For more information, see CANCEL JOB.

Start a new changefeed where another ended

Find the high-water timestamp for the ended changefeed:

copy
icon/buttons/copy
> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
        job_id       |  job_type  | ... |      high_water_timestamp      | error | coordinator_id
+--------------------+------------+ ... +--------------------------------+-------+----------------+
  383870400694353921 | CHANGEFEED | ... | 1537279405671006870.0000000000 |       |              1
(1 row)

Use the high_water_timestamp to start the new changefeed:

copy
icon/buttons/copy
> CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka//host:port'
  WITH cursor = '<high_water_timestamp>';

See also



Yes No