CREATE CHANGEFEED

Note:

CREATE CHANGEFEED is an Enterprise-only feature. For the core version, see EXPERIMENTAL CHANGEFEED FOR.

The CREATE CHANGEFEED statement creates a new Enterprise changefeed, which targets an allowlist of tables called "watched rows". Every change to a watched row is emitted as a record in a configurable format (JSON or Avro) to a configurable sink (Kafka, a cloud storage sink, or a webhook sink). You can create, pause, resume, or cancel an Enterprise changefeed.

For more information, see Stream Data Out of CockroachDB Using Changefeeds.

Required privileges

To create a changefeed, the user must be a member of the admin role or have the CREATECHANGEFEED parameter set.

Synopsis

CREATE CHANGEFEED FOR TABLE table_name [, ...] INTO sink [WITH option [= value] [, ...]]

Parameters

Parameter Description
table_name The name of the table (or tables in a comma-separated list) to create a changefeed for.

Note: Changefeeds do not share internal buffers, so each running changefeed will increase total memory usage. To watch multiple tables, we recommend creating a changefeed with a comma-separated list of tables.
sink The location of the configurable sink. The scheme of the URI indicates the type. For more information, see Sink URI below.
option / value For a list of available options and their values, see Options below.

Sink URI

The sink URI follows the basic format of:

'{scheme}://{host}:{port}?{query_parameters}'
URI Component Description
scheme The type of sink: kafka, any cloud storage sink, or webhook sink.
host The sink's hostname or IP address.
port The sink's port.
query_parameters The sink's query parameters.

Kafka

Example of a Kafka sink URI:

'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones&sasl_mechanism=SASL-SCRAM-SHA-256'

Cloud storage sink

Use a cloud storage sink to deliver changefeed data to OLAP or big data systems without requiring transport via Kafka.

Example of a cloud storage sink URI with Amazon S3:

's3://acme-co/employees?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456'

Some considerations when using cloud storage sinks:

  • Cloud storage sinks only work with JSON and emit newline-delimited JSON files.
  • The supported cloud schemes are: s3, gs, azure, http, and https.
  • Both http:// and https:// are cloud storage sinks, not webhook sinks. It is necessary to prefix the scheme with webhook- for webhook sinks.

Use Cloud Storage for Bulk Operations provides more detail on sink URI structure and authentication to cloud storage sinks.
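
As a minimal sketch (the bucket, credentials, and table name are placeholders), a changefeed emitting to Amazon S3 might combine the file_size query parameter with the resolved and compression options described later on this page:

> CREATE CHANGEFEED FOR TABLE employees
  INTO 's3://acme-co/employees?AWS_ACCESS_KEY_ID={ACCESS KEY}&AWS_SECRET_ACCESS_KEY={SECRET KEY}&file_size=32MB'
  WITH resolved = '30s', compression = 'gzip';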

Webhook sink

Note:

The webhook sink is currently in beta. For more information, read about its usage considerations, available parameters, and options below.

New in v21.2: Use a webhook sink to deliver changefeed messages to an arbitrary HTTP endpoint.

Example of a webhook sink URL:

'webhook-https://{your-webhook-endpoint}?insecure_tls_skip_verify=true'

The following are considerations when using the webhook sink:

  • Only supports HTTPS. Use the insecure_tls_skip_verify parameter when testing to disable certificate verification; however, this still requires HTTPS and certificates.
  • Only supports JSON output format.
  • There is no concurrency configurability.

Query parameters

Note:

Parameters should always be URI-encoded before they are included in the changefeed's URI, as they often contain special characters; see the example below. Use JavaScript's encodeURIComponent function or Go's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.
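
For example, a SASL password containing an ampersand, such as the hypothetical bones&co, must appear in the URI as bones%26co. The table name and broker address below are placeholders:

> CREATE CHANGEFEED FOR TABLE foo
  INTO 'kafka://broker.address.com:9092?tls_enabled=true&sasl_enabled=true&sasl_user=petee&sasl_password=bones%26co'
  WITH updated;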

Query parameters include:

Parameter Sink Type Type Description
New in v21.2: topic_name Kafka STRING Allows arbitrary topic naming for Kafka topics. See the Kafka topic naming limitations for detail on supported characters etc.

For example, CREATE CHANGEFEED FOR foo,bar INTO 'kafka://sink?topic_name=all' will emit all records to a topic named all. Note that schemas will still be registered separately. Can be combined with the topic_prefix option.

Default: table name.
topic_prefix Kafka, cloud STRING Adds a prefix to all topic names.

For example, CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_' would emit rows under the topic bar_foo instead of foo.
tls_enabled Kafka BOOL If true, enable Transport Layer Security (TLS) on the connection to Kafka. This can be used with a ca_cert (see below).

Default: false
ca_cert Kafka, webhook, (Confluent schema registry) STRING The base64-encoded ca_cert file. Specify ca_cert for a Kafka sink, webhook sink, and/or a Confluent schema registry.

For usage with a Kafka sink, see Kafka Sink URI.

It's necessary to state https in the schema registry's address when passing ca_cert:
confluent_schema_registry='https://schema_registry:8081?ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ'
See confluent_schema_registry for more detail on using this option.

Note: To encode your ca.cert, run base64 -w 0 ca.cert.
client_cert Kafka STRING The base64-encoded Privacy Enhanced Mail (PEM) certificate. This is used with client_key.
client_key Kafka STRING The base64-encoded private key for the PEM certificate. This is used with client_cert.
sasl_enabled Kafka BOOL If true, the authentication protocol can be set to SCRAM or PLAIN using the sasl_mechanism parameter. You must have tls_enabled set to true to use SASL.

Default: false
sasl_mechanism Kafka STRING Can be set to SASL-SCRAM-SHA-256, SASL-SCRAM-SHA-512, or SASL-PLAIN. A sasl_user and sasl_password are required.

Default: SASL-PLAIN
sasl_user Kafka STRING Your SASL username.
sasl_password Kafka STRING Your SASL password.
file_size cloud STRING The file will be flushed (i.e., written to the sink) when it exceeds the specified file size. This can be used with the WITH resolved option, which flushes on a specified cadence.

Default: 16MB
insecure_tls_skip_verify Kafka, webhook BOOL If true, disable client-side validation of responses. Note that a CA certificate is still required; this parameter means that the client will not verify the certificate. Warning: Use this query parameter with caution, as it creates MITM vulnerabilities unless combined with another method of authentication.

Default: false
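
For example, a Kafka sink URI using TLS client authentication might look like the following sketch, where the broker address is a placeholder and the certificate and key values are base64-encoded:

'kafka://broker.address.com:9092?tls_enabled=true&ca_cert={BASE64-ENCODED CERT}&client_cert={BASE64-ENCODED CERT}&client_key={BASE64-ENCODED KEY}'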

Options

Option Value Description
updated N/A Include updated timestamps with each row.

If a cursor is provided, the "updated" timestamps will match the MVCC timestamps of the emitted rows, and there is no initial scan. If a cursor is not provided, the changefeed will perform an initial scan (as of the time the changefeed was created), and the "updated" timestamp for each change record emitted in the initial scan will be the timestamp of the initial scan. Similarly, when a backfill is performed for a schema change, the "updated" timestamp is set to the first timestamp for when the new schema is valid.
resolved INTERVAL Emits resolved timestamp events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until all ranges in the changefeed have progressed to a specific point in time.

Set an optional minimal duration between emitting resolved timestamps. Example: resolved='10s'. This option will only emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If unspecified, all resolved timestamps are emitted as the high-water mark advances.
envelope key_only / wrapped Use key_only to emit only the key and no value, which is faster if you only want to know when the key changes.

Default: envelope=wrapped
cursor Timestamp Emit any changes after the given timestamp, but do not output the current state of the table first. If cursor is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.

When starting a changefeed at a specific cursor, the cursor cannot be before the configured garbage collection window (see gc.ttlseconds) for the table you're trying to follow; otherwise, the changefeed will error. With default garbage collection settings, this means you cannot create a changefeed that starts more than 25 hours in the past.

cursor can be used to start a new changefeed where a previous changefeed ended.

Example: CURSOR='1536242855577149065.0000000000'
format json / avro Format of the emitted record. For mappings of CockroachDB types to Avro types, see the table and detail on Avro limitations below.

Default: format=json.
mvcc_timestamp N/A New in v21.2: Include the MVCC timestamp for each emitted row in a changefeed. With the mvcc_timestamp option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill.
confluent_schema_registry Schema Registry address The Schema Registry address is required to use avro.
key_in_value N/A Make the primary key of a deleted row recoverable in sinks where each message has a value but not a key (most have a key and value in each message). key_in_value is automatically used for cloud storage sinks and webhook sinks.
diff N/A Publish a before field with each message, which includes the value of the row before the update was applied.
compression gzip Compress changefeed data files written to a cloud storage sink. Currently, only Gzip is supported for compression.
on_error pause / fail New in v21.2: Use on_error to pause the changefeed instead of sending it into a terminal failure state in situations such as when a changefeed cannot connect to a downstream sink for an extended period of time. Use with protect_data_from_gc_on_pause to protect changes from garbage collection (see the example after this table).

Default: on_error=fail
protect_data_from_gc_on_pause N/A When a changefeed is paused, ensure that the data needed to resume the changefeed is not garbage collected.

Note: If you use this option, changefeeds left paused can prevent garbage collection for long periods of time.
schema_change_events default / column_changes The type of schema change event that triggers the behavior specified by the schema_change_policy option:
  • default: Include all ADD COLUMN events for columns that have a non-NULL DEFAULT value or are computed, and all DROP COLUMN events.
  • column_changes: Include all schema change events that add or remove any column.

Default: schema_change_events=default
schema_change_policy backfill / nobackfill / stop The behavior to take when an event specified by the schema_change_events option occurs:
  • backfill: When schema changes with column backfill are finished, output all watched rows using the new schema.
  • nobackfill: For schema changes with column backfill, perform no logical backfills.
  • stop: For schema changes with column backfill, wait for all data preceding the schema change to be resolved before exiting with an error indicating the timestamp at which it was resolved.

Default: schema_change_policy=backfill
initial_scan / no_initial_scan N/A Control whether or not an initial scan will occur at the start time of a changefeed. initial_scan and no_initial_scan cannot be used simultaneously. If neither initial_scan nor no_initial_scan is specified, an initial scan will occur if there is no cursor, and will not occur if there is one. This preserves the behavior from previous releases.

Default: initial_scan
If used in conjunction with cursor, an initial scan will be performed at the cursor timestamp. If no cursor is specified, the initial scan is performed at now().
kafka_sink_config STRING New in v21.2: Set fields to configure the required level of message acknowledgement from the Kafka server, the version of the server, and batching parameters for Kafka sinks. See Advanced Configuration for more detail on configuring all the available fields for this option.

Example: CREATE CHANGEFEED FOR table INTO 'kafka://localhost:9092' WITH kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "RequiredAcks": "ONE"}'
full_table_name N/A Use fully-qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases.

Example: CREATE CHANGEFEED FOR foo... WITH full_table_name will create the topic name defaultdb.public.foo instead of foo.
avro_schema_prefix Schema prefix name Provide a namespace for the schema of a table in addition to the default, the table name. This allows multiple databases or clusters to share the same schema registry when the same table name is present in multiple databases.

Example: CREATE CHANGEFEED FOR foo WITH format=avro, confluent_schema_registry='registry_url', avro_schema_prefix='super' will register subjects as superfoo-key and superfoo-value with the namespace super.
webhook_client_timeout INTERVAL New in v21.2: If a response is not received from the sink within this timeframe, the request will error and be retried. Note that this must be a positive value.

Default: "3s"
webhook_auth_header STRING New in v21.2: Pass a value (password, token etc.) to the HTTP Authorization header with a webhook request for a "Basic" HTTP authentication scheme.

Example: With a username of "user" and password of "pwd", add a colon between "user:pwd" and then base64 encode, which results in "dXNlcjpwd2Q=". WITH webhook_auth_header='Basic dXNlcjpwd2Q='.
topic_in_value BOOL New in v21.2: Set to include the topic in each emitted row update. Note this is automatically set for webhook sinks.
webhook_sink_config STRING New in v21.2: Set fields to configure sink batching and retries. The schema is as follows:

{ "Flush": { "Messages": ..., "Bytes": ..., "Frequency": ..., }, "Retry": {"Max": ..., "Backoff": ..., } }.

Note that if either Messages or Bytes are nonzero, then a non-zero value for Frequency must be provided.

See the Webhook sink configuration section for more details on using this option.
Note:

Using the format=avro, envelope=key_only, and updated options together is rejected. envelope=key_only prevents any rows with updated fields from being emitted, which makes the updated option meaningless.
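
To illustrate combining options from the table above, the following sketch (the table name and broker address are placeholders) pairs on_error with protect_data_from_gc_on_pause so that a changefeed pauses on error and its data is protected from garbage collection while it is paused:

> CREATE CHANGEFEED FOR TABLE foo
  INTO 'kafka://host:port'
  WITH on_error = 'pause', protect_data_from_gc_on_pause;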

Avro limitations

Below are clarifications for particular SQL types and values for Avro changefeeds:

  • Decimals must have precision specified.
  • BIT and VARBIT types are encoded as arrays of 64-bit integers.

    For efficiency, CockroachDB encodes BIT and VARBIT bitfield types as arrays of 64-bit integers. That is, base-2 (binary format) BIT and VARBIT data types are converted to base 10 and stored in arrays. Encoding in CockroachDB is big-endian, therefore the last value may have many trailing zeroes. For this reason, the first value of each array is the number of bits that are used in the last value of the array.

    For instance, if the bitfield is 129 bits long, there will be 4 integers in the array. The first integer will be 1, representing the number of bits used in the last value; the second integer will be the first 64 bits; the third integer will be bits 65–128; and the last integer will be either 0 or 9223372036854775808 (i.e., the integer with only the first bit set, or 1000000000000000000000000000000000000000000000000000000000000000 in base 2).

    This example is base-10 encoded into an array as follows:

    {"array": [1, <first 64 bits>, <second 64 bits>, 0 or 9223372036854775808]}
    

    For downstream processing, it is necessary to base-2 encode every element in the array (except for the first element). The first number in the array gives you the number of bits to take from the last base-2 number — that is, the most significant bits. So, in the example above this would be 1. Finally, all the base-2 numbers can be appended together, which will result in the original number of bits, 129.

    In a different example of this process where the bitfield is 136 bits long, the array would be similar to the following when base-10 encoded:

    {"array": [8, 18293058736425533439, 18446744073709551615, 13690942867206307840]}
    

    To then work with this data, you would convert each of the elements in the array to base-2 numbers, besides the first element. For the above array, this would convert to:

    [8, 1111110111011011111111111111111111111111111111111111111111111111, 1111111111111111111111111111111111111111111111111111111111111111, 1011111000000000000000000000000000000000000000000000000000000000]
    

    Next, you use the first element in the array to take the number of bits from the last base-2 element, 10111110. Finally, you append each of the base-2 numbers together — in the above array, the second, third, and truncated last element. This results in 136 bits, the original number of bits.

Avro types

Below is a mapping of CockroachDB types to Avro types:

CockroachDB Type Avro Type Avro Logical Type
INT LONG
BOOL BOOLEAN
FLOAT DOUBLE
STRING STRING
DATE INT DATE
TIME LONG TIME-MICROS
TIMESTAMP LONG TIME-MICROS
TIMESTAMPTZ LONG TIME-MICROS
DECIMAL STRING, BYTES DECIMAL
UUID STRING
INET STRING
JSONB STRING
ENUMS STRING
INTERVAL STRING
ARRAY ARRAY
BIT Array of LONG
VARBIT Array of LONG
COLLATE STRING
Note:

The DECIMAL type is a union between Avro STRING and Avro DECIMAL types.

Topic naming

By default, a Kafka topic has the same name as the table that a changefeed was created on. If a changefeed was created on multiple tables, the changefeed will write to multiple topics corresponding to those table names.

To modify the default topic naming, you can specify a topic prefix, an arbitrary topic name, or use the full_table_name option. Using the topic_name parameter, you can specify an arbitrary topic name and feed all tables into that topic.

You can either manually create a topic in your Kafka cluster before starting the changefeed, or the topic will be automatically created when the changefeed connects to your Kafka cluster.

Note:

You must have the Kafka cluster setting auto.create.topics.enable set to true for automatic topic creation. This will create the topic when the changefeed sends its first message. If you create the consumer before that, you will also need the Kafka consumer configuration allow.auto.create.topics to be set to true.

Kafka has the following topic limitations:

  • Legal characters are numbers, letters, and [._-].
  • The maximum character length of a topic name is 249.
  • Topics with a period (.) and underscore (_) can collide on internal Kafka data structures, so you should use either but not both.
  • Characters not accepted by Kafka will be automatically encoded as unicode characters by CockroachDB.

Advanced changefeed configuration

Warning:

The configurations and settings explained in these sections will have significant impact on a changefeed's behavior.

The following sections describe settings, configurations, and details to tune changefeeds for particular use cases:

Tuning for high durability delivery

When designing a system that relies on high durability of message delivery — that is, not missing any message acknowledgement at the downstream sink — consider the following settings and configuration. Before tuning these settings we recommend reading details on our changefeed at-least-once-delivery guarantee.

  • Increase the number of seconds before garbage collection with the gc.ttlseconds setting to provide a higher recoverability window for data if a changefeed fails. For example, if a sink is unavailable, changes are queued until the sink is available again. While the sink is unavailable, changes will be retried until the garbage collection window is reached and then the data is removed.
    • You can also use the protect_data_from_gc_on_pause option in combination with on_error=pause to explicitly pause a changefeed on error (instead of going into a failure state) and to then protect the changes from garbage collection.
  • Determine what a successful write to Kafka is with the kafka_sink_config: {"RequiredAcks": "ALL"} option, which provides the highest consistency level.
  • Use Kafka or cloud storage sinks when tuning for high durability delivery in changefeeds. Both Kafka and cloud storage sinks offer built-in advanced protocols, whereas the webhook sink, while flexible, requires an understanding of how messages are acknowledged and committed by the particular system used for the webhook in order to ensure the durability of message delivery.
  • Ensure that data is ingested downstream in its new format after a schema change by using the schema_change_events and schema_change_policy options. For example, setting schema_change_events=column_changes and schema_change_policy=stop will trigger an error to the cockroach.log file on a schema change and cause the changefeed to fail (see the sketch after this list).
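
The following sketch combines several of these recommendations in one changefeed; the table name and broker address are placeholders, and the exact option values should be adjusted for your deployment:

> CREATE CHANGEFEED FOR TABLE foo
  INTO 'kafka://host:port'
  WITH kafka_sink_config = '{"RequiredAcks": "ALL"}',
       on_error = 'pause',
       protect_data_from_gc_on_pause,
       schema_change_events = 'column_changes',
       schema_change_policy = 'stop';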

Tuning for high throughput

When designing a system that needs to emit a lot of changefeed messages, whether it be steady traffic or a burst in traffic, consider the following settings and configuration:

  • Avoid using the resolved option or set this to a higher duration. This will help to reduce emitted messages.
  • Batch messages to your sink. See the Flush parameter for the kafka_sink_config option. When using cloud storage sinks, use the file_size parameter to flush a file when it exceeds the specified size.
  • Set the changefeed.memory.per_changefeed_limit cluster setting to a higher limit to give more memory for buffering changefeed data. This is useful in situations of heavy traffic (see the sketch after this list).
  • Use avro as the emitted message format option with Kafka sinks; JSON encoding can potentially create a slowdown.
  • Use the compression option in cloud storage sinks with JSON to compress the changefeed data files.
  • Increase the changefeed.backfill.concurrent_scan_requests setting, which controls the number of concurrent scan requests per node issued during a backfill event. The default behavior, when this setting is at 0, is that the number of scan requests will be 3 times the number of nodes in the cluster (to a maximum of 100). While increasing this number will allow for higher throughput, it will increase the cluster load overall, including CPU and IO usage.
  • New in v21.2: Enable the kv.rangefeed.catchup_scan_iterator_optimization.enabled setting to have rangefeeds use time-bound iterators for catch-up scans when possible. Catch-up scans are run for each rangefeed request. This setting improves the performance of changefeeds during some range-split operations. Changefeeds using the WITH diff option do not currently receive any benefit from this setting.
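
The following sketch applies some of the cluster settings and options mentioned above; the values shown are illustrative only and should be tuned to your workload, and the table name, broker address, and registry address are placeholders:

> SET CLUSTER SETTING changefeed.memory.per_changefeed_limit = '256MiB';
> SET CLUSTER SETTING changefeed.backfill.concurrent_scan_requests = 10;
> SET CLUSTER SETTING kv.rangefeed.catchup_scan_iterator_optimization.enabled = true;
> CREATE CHANGEFEED FOR TABLE foo
  INTO 'kafka://host:port'
  WITH format = avro, confluent_schema_registry = 'https://registry:8081', resolved = '30s',
       kafka_sink_config = '{"Flush": {"MaxMessages": 1000, "Frequency": "1s"}}';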

Kafka sink configuration

New in v21.2: The kafka_sink_config option allows configuration of a changefeed's message delivery, Kafka server version, and batching parameters.

Warning:

Each of the following settings has a significant impact on a changefeed's behavior, such as latency. For example, it is possible to configure batching parameters to be very high, which would negatively impact changefeed latency; as a result, it would take a long time to see messages arriving at the sink. Also, large batches may be rejected by the Kafka server unless it's separately configured to accept a high max.message.bytes.

kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "Version": "0.8.2.0", "RequiredAcks": "ONE" }'

The configurable fields include:

  • "Flush"."MaxMessages" and "Flush"."Frequency": These are configurable batching parameters depending on latency and throughput needs. For example, if "MaxMessages" is set to 1000 and "Frequency" to 1 second, it will flush to Kafka either after 1 second or after 1000 messages are batched, whichever comes first. It's important to consider that if there are not many messages, then a "1s" frequency will add 1 second latency. However, if there is a larger influx of messages these will be flushed quicker.

    • "MaxMessages": sets the maximum number of messages the producer will send in a single broker request. Increasing this value allows all messages to be sent in a batch. Default: 1. Type: INT.
    • "Frequency": sets the maximum time that messages will be batched. Default: "0s". Type: INTERVAL.
  • "Version": sets the appropriate Kafka cluster version, which can be used to connect to Kafka versions < v1.0 (kafka_sink_config='{"Version": "0.8.2.0"}'). Default: "1.0.0.0" Type: STRING.

  • "RequiredAcks": specifies what a successful write to Kafka is. CockroachDB guarantees at least once delivery of messages — this value defines the delivery. Type: STRING. The possible values are:
    • "ONE": a write to Kafka is successful once the leader node has committed and acknowledged the write. Note that this has the potential risk of dropped messages; if the leader node acknowledges before replicating to a quorum of other Kafka nodes, but then fails. This is the default value.
    • "NONE": no Kafka brokers are required to acknowledge that they have committed the message. This will decrease latency and increase throughput, but comes at the cost of lower consistency.
    • "ALL": a quorum must be reached (that is, most Kafka brokers have committed the message) before the leader can acknowledge. This is the highest consistency level.

Webhook sink configuration

New in v21.2: The webhook_sink_config option allows the changefeed flushing and retry behavior of your webhook sink to be configured.

The following details the configurable fields:

Field Type Description Default
Flush.Messages INT When the batch reaches this configured size, it should be flushed (batch sent). 0
Flush.Bytes INT When the total byte size of all the messages in the batch reaches this amount, it should be flushed. 0
Flush.Frequency INTERVAL When this amount of time has passed since the first received message in the batch without it flushing, it should be flushed. "0s"
Retry.Max INT or STRING The maximum amount of time the sink will retry a single HTTP request to send a batch. This value must be positive (> 0). If infinite retries are desired, use inf. "0s"
Retry.Backoff INTERVAL The initial backoff the sink will wait after the first failure. The backoff will double (exponential backoff strategy), until the max is hit. "500ms"
Warning:

Setting either Messages or Bytes to a non-zero value without also setting Frequency will cause the sink to assume Frequency is infinite. This configuration is invalid and will cause an error, since messages could sit in a batch indefinitely if the other conditions never trigger. Therefore, if either Messages or Bytes has a non-zero value, a non-zero value for Frequency must also be provided.

Some complexities to consider when setting Flush fields for batching:

  • When all batching parameters are zero ("Messages", "Bytes", and "Frequency") the sink will interpret this configuration as "send batch every time." This would be the same as not providing any configuration at all:
{
  "Flush": {
    "Messages": 0,
    "Bytes": 0,
    "Frequency": "0s"
  }
}
  • If one or more fields are set to non-zero values, the sink will interpret any remaining zero-value fields as infinity. For example, in the following configuration, the sink will send a batch whenever its size reaches 100 messages, or when 5 seconds have passed since the batch was populated with its first message. Bytes defaults to 0 in this case, so a batch will never trigger due to a configured byte size (a complete statement using this configuration is sketched after this list):
{
  "Flush": {
    "Messages": 100,
    "Frequency": "5s"
  }
}
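
To attach a configuration like this to a changefeed, pass it as a JSON string in the webhook_sink_config option. This is a minimal sketch; the table name and webhook endpoint are placeholders:

CREATE CHANGEFEED FOR TABLE foo
  INTO 'webhook-https://{your-webhook-endpoint}'
  WITH webhook_sink_config = '{"Flush": {"Messages": 100, "Frequency": "5s"}, "Retry": {"Max": "inf"}}';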

Responses

Messages

The messages (i.e., keys and values) emitted to a sink are specific to the envelope. The default format is wrapped, and the output messages are composed of the following:

  • Key: An array always composed of the row's PRIMARY KEY field(s) (e.g., [1] for JSON or {"id":{"long":1}} for Avro).
  • Value:
    • One of three possible top-level fields:
      • after, which contains the state of the row after the update (or null for DELETEs).
      • updated, which contains the updated timestamp.
      • resolved, which is emitted for records representing resolved timestamps. These records do not include an "after" value since they only function as checkpoints.
    • For INSERT and UPDATE, the current state of the row inserted or updated.
    • For DELETE, null.

For example:

Statement Response
INSERT INTO office_dogs VALUES (1, 'Petee'); JSON: [1] {"after": {"id": 1, "name": "Petee"}}
Avro: {"id":{"long":1}} {"after":{"office_dogs":{"id":{"long":1},"name":{"string":"Petee"}}}}
DELETE FROM office_dogs WHERE name = 'Petee' JSON: [1] {"after": null}
Avro: {"id":{"long":1}} {"after":null}

For webhook sinks, the response format comes as a batch of changefeed messages with a payload and length. Batching is done with a per-key guarantee, which means that the messages with the same key are considered for the same batch. Note that batches are only collected for row updates and not resolved timestamps:

{"payload": [{"after" : {"a" : 1, "b" : "a"}, "key": [1], "topic": "foo"}, {"after": {"a": 1, "b": "b"}, "key": [1], "topic": "foo" }], "length":2}

Files

The files emitted to a sink use the following naming conventions:

Note:

The timestamp format is YYYYMMDDHHMMSSNNNNNNNNNLLLLLLLLLL.

General file format

/[date]/[timestamp]-[uniquer]-[topic]-[schema-id]

For example:

/2020-04-02/202004022058072107140000000000000-56087568dba1e6b8-1-72-00000000-test_table-1.ndjson

Resolved file format

/[date]/[timestamp].RESOLVED

For example:

/2020-04-04/202004042351304139680000000000000.RESOLVED

Examples

Create a changefeed connected to Kafka

> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH updated, resolved;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)

For more information on how to create a changefeed connected to Kafka, see Stream Data Out of CockroachDB Using Changefeeds.

Create a changefeed connected to Kafka using Avro

> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH format = avro, confluent_schema_registry = <schema_registry_address>;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)

For more information on how to create a changefeed that emits an Avro record, see Stream Data Out of CockroachDB Using Changefeeds.

Create a changefeed connected to a cloud storage sink

> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'scheme://host?parameters'
  WITH updated, resolved;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)

For more information on how to create a changefeed connected to a cloud storage sink, see Stream Data Out of CockroachDB Using Changefeeds.

Create a changefeed connected to a webhook sink

Note:

The webhook sink is currently in beta — see usage considerations, available parameters, and options for more information.

CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'webhook-https://{your-webhook-endpoint}?insecure_tls_skip_verify=true'
  WITH updated;
+---------------------+
|      job_id         |
+---------------------+
| 687842491801632769  |
+---------------------+
(1 row)

Manage a changefeed

New in v21.2: For enterprise changefeeds, use SHOW CHANGEFEED JOBS to check the status of your changefeed jobs:

> SHOW CHANGEFEED JOBS;

Use the following SQL statements to pause, resume, or cancel a changefeed.

Pause a changefeed

> PAUSE JOB job_id;

For more information, see PAUSE JOB.

Resume a paused changefeed

> RESUME JOB job_id;

For more information, see RESUME JOB.

Cancel a changefeed

> CANCEL JOB job_id;

For more information, see CANCEL JOB.

Configuring all changefeeds

It is useful to be able to pause all running changefeeds during troubleshooting, testing, or when a decrease in CPU load is needed.

To pause all running changefeeds:

PAUSE JOBS (SELECT job_id FROM [SHOW CHANGEFEED JOBS] WHERE status = ('running'));

This will change the status for each of the running changefeeds to paused, which can be verified with SHOW CHANGEFEED JOBS.

To resume all paused changefeeds:

RESUME JOBS (SELECT job_id FROM [SHOW CHANGEFEED JOBS] WHERE status = ('paused'));

This will resume the changefeeds and update the status for each of the changefeeds to running.

Start a new changefeed where another ended

Find the high-water timestamp for the ended changefeed:

> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
        job_id       |  job_type  | ... |      high_water_timestamp      | error | coordinator_id
+--------------------+------------+ ... +--------------------------------+-------+----------------+
  383870400694353921 | CHANGEFEED | ... | 1537279405671006870.0000000000 |       |              1
(1 row)

Use the high_water_timestamp to start the new changefeed:

> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH cursor = '<high_water_timestamp>';

Note that because the cursor is provided, the initial scan is not performed.

See also
