Use Changefeeds

This page describes the main components of enabling and using changefeeds.

Read the following Considerations before working with changefeeds.

Considerations

  • It is necessary to enable rangefeeds for changefeeds to work.
  • Changefeeds do not share internal buffers, so each running changefeed will increase total memory usage. To watch multiple tables, we recommend creating a changefeed with a comma-separated list of tables.
  • Many DDL queries (including TRUNCATE, DROP TABLE, and queries that add a column family) will cause errors on a changefeed watching the affected tables. You will need to start a new changefeed.
  • Partial or intermittent sink unavailability may impact changefeed stability. If a sink is unavailable, messages can't be sent, which means that a changefeed's high-water mark timestamp is at risk of falling behind the cluster's garbage collection window. Throughput and latency can be affected once the sink is available again. However, ordering guarantees will still hold for as long as a changefeed remains active.
  • When an IMPORT INTO statement is run, any current changefeed jobs targeting that table will fail.

Enable rangefeeds

Changefeeds connect to a long-lived request called a rangefeed, which pushes changes as they happen. This reduces the latency of row changes and, for some workloads, reduces transaction restarts on tables watched by a changefeed.

Rangefeeds must be enabled for a changefeed to work. To enable the cluster setting:

> SET CLUSTER SETTING kv.rangefeed.enabled = true;

Any created changefeed will error until this setting is enabled. Note that enabling rangefeeds currently has a small performance cost (about a 5-10% increase in latencies), whether or not the rangefeed is being used in a changefeed.

The kv.closed_timestamp.target_duration cluster setting can be used with changefeeds. Resolved timestamps will always be behind by at least this setting's duration; however, decreasing the duration leads to more transaction restarts in your cluster, which can affect performance.

Ordering guarantees

  • In most cases, each version of a row will be emitted once. However, some infrequent conditions (e.g., node failures, network partitions) will cause them to be repeated. This gives our changefeeds an at-least-once delivery guarantee.

  • Once a row has been emitted with some timestamp, no previously unseen versions of that row will be emitted with a lower timestamp. That is, you will never see a new change for that row at an earlier timestamp.

    For example, if you ran the following:

    > CREATE TABLE foo (id INT PRIMARY KEY DEFAULT unique_rowid(), name STRING);
    > CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://localhost:9092' WITH UPDATED;
    > INSERT INTO foo VALUES (1, 'Carl');
    > UPDATE foo SET name = 'Petee' WHERE id = 1;
    

    You'd expect the changefeed to emit:

    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    

    It is also possible that the changefeed emits an out-of-order duplicate of an earlier value that you already saw:

    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    

    However, you will never see an output like the following (i.e., an out-of-order row that you've never seen before):

    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    
  • If a row is modified more than once in the same transaction, only the last change will be emitted.

  • Rows are sharded between Kafka partitions by the row’s primary key.

  • The UPDATED option adds an "updated" timestamp to each emitted row. You can also use the RESOLVED option to emit "resolved" timestamp messages to each Kafka partition. A "resolved" timestamp is a guarantee that no (previously unseen) rows with a lower update timestamp will be emitted on that partition.

    For example:

    {"__crdb__": {"updated": "1532377312562986715.0000000000"}, "id": 1, "name": "Petee H"}
    {"__crdb__": {"updated": "1532377306108205142.0000000000"}, "id": 2, "name": "Carl"}
    {"__crdb__": {"updated": "1532377358501715562.0000000000"}, "id": 3, "name": "Ernie"}
    {"__crdb__":{"resolved":"1532379887442299001.0000000000"}}
    {"__crdb__":{"resolved":"1532379888444290910.0000000000"}}
    {"__crdb__":{"resolved":"1532379889448662988.0000000000"}}
    ...
    {"__crdb__":{"resolved":"1532379922512859361.0000000000"}}
    {"__crdb__": {"updated": "1532379923319195777.0000000000"}, "id": 4, "name": "Lucky"}
    
  • With duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table.

    To compare two different rows for happens-before, compare the "updated" timestamp. This works across anything in the same cluster (e.g., tables, nodes, etc.).

    Resolved timestamp notifications on every Kafka partition can be used to provide strong ordering and global consistency guarantees by buffering records in between timestamp closures. Use the "resolved" timestamp to see every row that changed at a certain time.

    The complexity with timestamps is necessary because CockroachDB supports transactions that can affect any part of the cluster, and it is not possible to horizontally divide the transaction log into independent changefeeds. For more information about this, read our blog post on CDC.
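
The guarantees above suggest one way a consumer could restore strict timestamp order: hold rows until a "resolved" message arrives, then release everything below it in sorted order and drop later repeats. The following Python is a minimal sketch of that idea, not a reference implementation. It assumes the messages have already been decoded from a single partition, that keys and values look like the examples above, and that timestamps have the form <wall nanoseconds>.<logical counter>. The names parse_hlc and ResolvedBuffer are hypothetical.

```python
import json


def parse_hlc(ts):
    """Split a timestamp string into (wall nanoseconds, logical counter)."""
    wall, _, logical = ts.partition(".")
    return int(wall), int(logical or 0)


class ResolvedBuffer:
    """Hypothetical consumer-side buffer for the messages of one partition."""

    def __init__(self):
        self.pending = {}       # (key as JSON, updated timestamp) -> row value
        self.resolved = (0, 0)  # highest resolved timestamp seen so far

    def handle(self, key, value):
        """Feed one message; return the rows that are now safe to process, in order."""
        meta = value["__crdb__"]
        if "resolved" in meta:
            self.resolved = max(self.resolved, parse_hlc(meta["resolved"]))
            # Release every buffered row below the resolved timestamp, oldest first.
            ready = sorted(
                (k for k in self.pending if k[1] < self.resolved),
                key=lambda k: (k[1], k[0]),
            )
            return [self.pending.pop(k) for k in ready]
        ts = parse_hlc(meta["updated"])
        if ts < self.resolved:
            # No unseen row can appear below a resolved timestamp, so this is a repeat.
            return []
        # A repeat with the same key and timestamp overwrites the buffered copy.
        self.pending[(json.dumps(key), ts)] = value
        return []
```

Rows whose timestamp equals the resolved timestamp are held until the next resolved message, which is a deliberately conservative reading of the guarantee. With several partitions, a consumer would need one such buffer per partition and would release rows only up to the lowest resolved timestamp across them.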

Delete messages

Deleting a row will result in a changefeed outputting the primary key of the deleted row and a null value. For example, with default options, deleting the row with primary key 5 will output:

[5] {"after": null}

In some unusual situations you may receive a delete message for a row without first seeing an insert message. For example, if an attempt is made to delete a row that does not exist, you may or may not get a delete message because the changefeed behavior is undefined to allow for optimizations at the storage layer. Similarly, if there are multiple writes to a row within a single transaction, only the last one will propagate to a changefeed. This means that creating and deleting a row within the same transaction will never result in an insert message, but may result in a delete message.
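
Because a delete message can arrive for a row the consumer never saw inserted, it helps to make the consumer's delete handling idempotent. The sketch below assumes the default wrapped envelope shown above and an in-memory dict standing in for the downstream copy of the table. The function name apply_message is hypothetical.

```python
def apply_message(table, key, value):
    """Apply one wrapped-envelope message to an in-memory copy of the table."""
    pk = tuple(key)
    if value["after"] is None:
        # pop() with a default tolerates deletes for rows that were never inserted here.
        table.pop(pk, None)
    else:
        # Inserts and updates both carry the full current state of the row.
        table[pk] = value["after"]
    return table
```

For example, applying [5] {"after": null} to an empty dict leaves it empty instead of raising an error.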

Schema Changes

Avro schema changes

To ensure that the Avro schemas that CockroachDB publishes work with the schema compatibility rules used by the Confluent Schema Registry, CockroachDB emits all fields in Avro as nullable unions. Because the Confluent Schema Registry applies different compatibility rules than Avro itself, this ensures that both Avro and Confluent consider the schemas to be backward- and forward-compatible.

Note that the original CockroachDB column definition is also included in the schema as a doc field, so it's still possible to distinguish between a NOT NULL CockroachDB column and a NULL CockroachDB column.

Schema changes with column backfill

When schema changes with column backfill (e.g., adding a column with a default, adding a computed column, adding a NOT NULL column, dropping a column) are made to watched rows, the changefeed will emit some duplicates during the backfill. When it finishes, CockroachDB outputs all watched rows using the new schema. When using Avro, rows that have been backfilled by a schema change are always re-emitted.

For an example of a schema change with column backfill, start with the changefeed created in this Kafka example:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}

Add a column to the watched table:

> ALTER TABLE office_dogs ADD COLUMN likes_treats BOOL DEFAULT TRUE;

The changefeed emits duplicate records 1, 2, and 3 before outputting the records using the new schema:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}
[1] {"id": 1, "name": "Petee H"}  # Duplicate
[2] {"id": 2, "name": "Carl"}     # Duplicate
[3] {"id": 3, "name": "Ernie"}    # Duplicate
[1] {"id": 1, "likes_treats": true, "name": "Petee H"}
[2] {"id": 2, "likes_treats": true, "name": "Carl"}
[3] {"id": 3, "likes_treats": true, "name": "Ernie"}

When using the schema_change_policy = nobackfill option, the changefeed will still emit duplicate records for the table that is being altered. In the preceding output, the records marked as # Duplicate are still emitted with this option, but the records using the new schema are not.
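
A consumer that only needs the latest state of each row could filter out the backfill duplicates by skipping any message whose value matches the last value seen for that key. The following is a minimal sketch under that assumption. The name drop_repeats is hypothetical, and messages are assumed to arrive as already-decoded (key, value) pairs.

```python
def drop_repeats(messages):
    """Yield (key, value) pairs, skipping values identical to the last one seen for that key."""
    last_seen = {}
    for key, value in messages:
        pk = tuple(key)
        if pk in last_seen and last_seen[pk] == value:
            continue  # same content as the previous message for this key
        last_seen[pk] = value
        yield key, value
```

Applied to the nine messages in the preceding output, this keeps the three original records and the three records with the new schema. This filter also hides a genuine update that rewrites a row with identical values, which is harmless when only the row's current state matters.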

Note:

Changefeeds will emit NULL values for VIRTUAL computed columns and not the column's computed value.

Responses

Messages

The messages (i.e., keys and values) emitted to a sink are specific to the envelope. The default format is wrapped, and the output messages are composed of the following:

  • Key: An array always composed of the row's PRIMARY KEY field(s) (e.g., [1] for JSON or {"id":{"long":1}} for Avro).
  • Value:
    • One of three possible top-level fields:
      • after, which contains the state of the row after the update (or null for DELETEs).
      • updated, which contains the updated timestamp.
      • resolved, which is emitted for records representing resolved timestamps. These records do not include an "after" value since they only function as checkpoints.
    • For INSERT and UPDATE, the current state of the row inserted or updated.
    • For DELETE, null.

For example:

Statement                                     Response
INSERT INTO office_dogs VALUES (1, 'Petee');  JSON: [1] {"after": {"id": 1, "name": "Petee"}}
                                              Avro: {"id":{"long":1}} {"after":{"office_dogs":{"id":{"long":1},"name":{"string":"Petee"}}}}
DELETE FROM office_dogs WHERE name = 'Petee'  JSON: [1] {"after": null}
                                              Avro: {"id":{"long":1}} {"after":null}

For webhook sinks, the response format comes as a batch of changefeed messages with a payload and length. Batching is done with a per-key guarantee, which means that the messages with the same key are considered for the same batch. Note that batches are only collected for row updates and not resolved timestamps:

{"payload": [{"after" : {"a" : 1, "b" : "a"}, "key": [1], "topic": "foo"}, {"after": {"a": 1, "b": "b"}, "key": [1], "topic": "foo" }], "length":2}
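
A webhook receiver might unpack such a batch as follows. This is a sketch that assumes the request body has exactly the shape shown above, with "payload" and "length" fields and a "key", "topic", and "after" on each message. The function name parse_webhook_batch is hypothetical.

```python
import json
from collections import defaultdict


def parse_webhook_batch(body):
    """Group the messages of one webhook batch by (topic, key), preserving their order."""
    batch = json.loads(body)
    messages = batch["payload"]
    if batch["length"] != len(messages):
        raise ValueError("batch length does not match the number of payload messages")
    by_key = defaultdict(list)
    for msg in messages:
        # Keys are JSON arrays, so serialize them to get a hashable dict key.
        by_key[(msg["topic"], json.dumps(msg["key"]))].append(msg["after"])
    return dict(by_key)
```

For the batch above, this returns a single entry for topic "foo" and key [1], holding the two row states in the order they were sent.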

See Files for more detail on the file naming format for Enterprise changefeeds.

Avro

The following sections provide information on Avro usage with CockroachDB changefeeds. Creating a changefeed using Avro is available in Core and Enterprise changefeeds.

Avro limitations

Below are clarifications for particular SQL types and values for Avro changefeeds:

  • Decimals must have precision specified.
  • BIT and VARBIT types are encoded as arrays of 64-bit integers.

    For efficiency, CockroachDB encodes BIT and VARBIT bitfield types as arrays of 64-bit integers. That is, base-2 (binary format) BIT and VARBIT data types are converted to base 10 and stored in arrays. Encoding in CockroachDB is big-endian, therefore the last value may have many trailing zeroes. For this reason, the first value of each array is the number of bits that are used in the last value of the array.

    For instance, if the bitfield is 129 bits long, there will be 4 integers in the array. The first integer will be 1, representing the number of bits in the last value; the second integer will be the first 64 bits; the third integer will be bits 65–128; and the last integer will either be 0 or 9223372036854775808 (i.e., the integer with only the first bit set, or 1000000000000000000000000000000000000000000000000000000000000000 in base 2).

    This example is base-10 encoded into an array as follows:

    {"array": [1, <first 64 bits>, <second 64 bits>, 0 or 9223372036854775808]}
    

    For downstream processing, it is necessary to base-2 encode every element in the array (except for the first element). The first number in the array gives you the number of bits to take from the last base-2 number — that is, the most significant bits. So, in the example above this would be 1. Finally, all the base-2 numbers can be appended together, which will result in the original number of bits, 129.

    In a different example of this process where the bitfield is 136 bits long, the array would be similar to the following when base-10 encoded:

    {"array": [8, 18293058736425533439, 18446744073709551615, 13690942867206307840]}
    

    To then work with this data, you would convert each of the elements in the array to base-2 numbers, besides the first element. For the above array, this would convert to:

    [8, 1111110111011101111111111111111111111111111111111111111111111111, 1111111111111111111111111111111111111111111111111111111111111111, 1011111000000000000000000000000000000000000000000000000000000000]
    

    Next, you use the first element in the array to take the number of bits from the last base-2 element, 10111110. Finally, you append each of the base-2 numbers together — in the above array, the second, third, and truncated last element. This results in 136 bits, the original number of bits.
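
The decoding steps above can be sketched in Python as follows. The function name decode_bit_array is hypothetical. The sketch assumes the array arrives as plain integers, and it masks each value to 64 bits in case the Avro library in use returns the integers as signed LONG values.

```python
MASK_64 = (1 << 64) - 1


def decode_bit_array(values):
    """Rebuild the bit string from [bits_in_last_word, word_1, ..., word_n]."""
    bits_in_last, *words = values
    if not words:
        return ""
    # Render every word as 64 binary digits; the mask maps signed values to unsigned.
    binary = [format(word & MASK_64, "064b") for word in words]
    # Keep only the most significant bits of the last word.
    binary[-1] = binary[-1][:bits_in_last]
    return "".join(binary)
```

For the 136-bit example above, the result is a string of 136 binary digits that ends in 10111110.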

Avro types

Below is a mapping of CockroachDB types to Avro types:

CockroachDB Type   Avro Type       Avro Logical Type
INT                LONG
BOOL               BOOLEAN
FLOAT              DOUBLE
STRING             STRING
DATE               INT             DATE
TIME               LONG            TIME-MICROS
TIMESTAMP          LONG            TIME-MICROS
TIMESTAMPTZ        LONG            TIME-MICROS
DECIMAL            STRING, BYTES   DECIMAL
UUID               STRING
INET               STRING
JSONB              STRING
ENUMS              STRING
INTERVAL           STRING
ARRAY              ARRAY
BIT                Array of LONG
VARBIT             Array of LONG
COLLATE            STRING

Note:

The DECIMAL type is a union between Avro STRING and Avro DECIMAL types.
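
When a consumer's Avro library returns raw values without applying logical types, the DATE, TIME, and DECIMAL mappings above could be decoded by hand. The sketch below follows the Avro specification's definitions of these logical types: date is days since the Unix epoch, time-micros is microseconds after midnight, and decimal is a big-endian two's-complement unscaled integer. The function names are hypothetical, and the scale is assumed to be read from the column's Avro schema.

```python
from datetime import date, time, timedelta
from decimal import Decimal


def decode_date(days):
    """Avro date: number of days since 1970-01-01."""
    return date(1970, 1, 1) + timedelta(days=days)


def decode_time_micros(micros):
    """Avro time-micros: microseconds after midnight."""
    seconds, microsecond = divmod(micros, 1_000_000)
    minutes, second = divmod(seconds, 60)
    hour, minute = divmod(minutes, 60)
    return time(hour, minute, second, microsecond)


def decode_decimal_bytes(raw, scale):
    """Avro decimal on bytes: big-endian two's-complement unscaled value."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)
```

When the DECIMAL union carries its STRING branch instead, passing the string to Decimal() directly is enough.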
