Changefeed Messages

Warning:
Cockroach Labs will stop providing Assistance Support for v22.2 on June 5, 2024. Prior to that date, upgrade to a more recent version to continue receiving support. For more details, see the Release Support Policy.

Changefeeds emit messages as changes happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different options (e.g., format=avro).

This page describes the format and behavior of changefeed messages. You will find the following information on this page:

  • Responses: The general format of changefeed messages.
  • Ordering guarantees: CockroachDB's guarantees for a changefeed's message ordering.
  • Delete messages: The format of messages when a row is deleted.
  • Schema changes: The effect of schema changes on a changefeed.
  • Garbage collection: How protected timestamps and garbage collection interact with running changefeeds.
  • Avro: The limitations and type mapping when creating a changefeed using Avro format.

Responses

By default, changefeed messages emitted to a sink contain keys and values of the watched table entries that have changed, with messages composed of the following fields:

  • Key: An array always composed of the row's PRIMARY KEY field(s) (e.g., [1] for JSON or {"id":{"long":1}} for Avro).
  • Value:
    • One of three possible top-level fields:
      • after, which contains the state of the row after the update (or null for DELETEs).
      • updated, which contains the updated timestamp.
      • resolved, which is emitted for records representing resolved timestamps. These records do not include an after value since they only function as checkpoints.
    • For INSERT and UPDATE, the current state of the row inserted or updated.
    • For DELETE, null.

For example:

Statement: INSERT INTO office_dogs VALUES (1, 'Petee');
  JSON response: [1] {"after": {"id": 1, "name": "Petee"}}
  Avro response: {"id":{"long":1}} {"after":{"office_dogs":{"id":{"long":1},"name":{"string":"Petee"}}}}

Statement: DELETE FROM office_dogs WHERE name = 'Petee';
  JSON response: [1] {"after": null}
  Avro response: {"id":{"long":1}} {"after":null}

To limit messages to just the changed key value, use the envelope option set to key_only.
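For example, a minimal sketch assuming a local Kafka sink (the address is a placeholder):

CREATE CHANGEFEED FOR TABLE office_dogs
  INTO 'kafka://localhost:9092'
  WITH envelope = 'key_only';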

When a changefeed targets a table with multiple column families, the family name is appended to the table name as part of the topic. See Tables with column families in changefeeds for guidance.
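As a sketch, assuming a hypothetical table split into two column families and the split_column_families option, a changefeed could be created as follows (the table, family names, and sink address are placeholders):

CREATE TABLE dog_profiles (
    id INT PRIMARY KEY,
    name STRING,
    likes_treats BOOL,
    FAMILY base (id, name),
    FAMILY extras (likes_treats));

CREATE CHANGEFEED FOR TABLE dog_profiles
  INTO 'kafka://localhost:9092'
  WITH split_column_families;

With a Kafka sink, the messages for each family arrive on topics whose names combine the table name and the family name (e.g., dog_profiles.base and dog_profiles.extras).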

For webhook sinks, the response format arrives as a batch of changefeed messages with a payload and length. Batching is done with a per-key guarantee, which means that messages with the same key are included in the same batch. Note that batches are only collected for row updates and not resolved timestamps:

{"payload": [{"after" : {"a" : 1, "b" : "a"}, "key": [1], "topic": "foo"}, {"after": {"a": 1, "b": "b"}, "key": [1], "topic": "foo" }], "length":2}

See changefeed files for more detail on the file naming format for Enterprise changefeeds.

Ordering guarantees

  • In most cases, each version of a row will be emitted once. However, some infrequent conditions (e.g., node failures, network partitions) will cause some versions to be emitted more than once. This gives changefeeds an at-least-once delivery guarantee.

  • Once a row has been emitted with some timestamp, no previously unseen versions of that row will be emitted with a lower timestamp. That is, you will never see a new change for that row at an earlier timestamp.

    For example, if you ran the following:

    > CREATE TABLE foo (id INT PRIMARY KEY DEFAULT unique_rowid(), name STRING);
    > CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://localhost:9092' WITH UPDATED;
    > INSERT INTO foo VALUES (1, 'Carl');
    > UPDATE foo SET name = 'Petee' WHERE id = 1;
    

    You'd expect the changefeed to emit:

    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    

    It is also possible that the changefeed emits an out-of-order duplicate of an earlier value that you already saw:

    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    

    However, you will never see an output like the following (i.e., an out-of-order row that you've never seen before):

    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    
  • If a row is modified more than once in the same transaction, only the last change will be emitted.

  • Rows are sharded between Kafka partitions by the row’s primary key.

  • The UPDATED option adds an "updated" timestamp to each emitted row. You can also use the RESOLVED option to emit "resolved" timestamp messages to each Kafka partition. A "resolved" timestamp is a guarantee that no (previously unseen) rows with a lower update timestamp will be emitted on that partition. A statement example that creates a changefeed with both options appears after this list.

    For example:

    {"__crdb__": {"updated": "1532377312562986715.0000000000"}, "id": 1, "name": "Petee H"}
    {"__crdb__": {"updated": "1532377306108205142.0000000000"}, "id": 2, "name": "Carl"}
    {"__crdb__": {"updated": "1532377358501715562.0000000000"}, "id": 3, "name": "Ernie"}
    {"__crdb__":{"resolved":"1532379887442299001.0000000000"}}
    {"__crdb__":{"resolved":"1532379888444290910.0000000000"}}
    {"__crdb__":{"resolved":"1532379889448662988.0000000000"}}
    ...
    {"__crdb__":{"resolved":"1532379922512859361.0000000000"}}
    {"__crdb__": {"updated": "1532379923319195777.0000000000"}, "id": 4, "name": "Lucky"}
    
  • With duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table.

    To compare two different rows for happens-before, compare the "updated" timestamp. This works across anything in the same cluster (e.g., across tables and nodes).

    Resolved timestamp notifications on every Kafka partition can be used to provide strong ordering and global consistency guarantees by buffering records in between timestamp closures. Use the "resolved" timestamp to see every row that changed at a certain time.

    The complexity with timestamps is necessary because CockroachDB supports transactions that can affect any part of the cluster, and it is not possible to horizontally divide the transaction log into independent changefeeds. For more information about this, read our blog post on CDC.
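As a sketch, a changefeed that sets both options might look like the following (the Kafka address and the resolved interval are placeholders):

CREATE CHANGEFEED FOR TABLE office_dogs
  INTO 'kafka://localhost:9092'
  WITH updated, resolved = '10s';

Here resolved = '10s' asks the changefeed to emit resolved timestamp messages no more often than every 10 seconds.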

Delete messages

Deleting a row will result in a changefeed outputting the primary key of the deleted row and a null value. For example, with default options, deleting the row with primary key 5 will output:

[5] {"after": null}

In some unusual situations you may receive a delete message for a row without first seeing an insert message. For example, if an attempt is made to delete a row that does not exist, you may or may not get a delete message because the changefeed behavior is undefined to allow for optimizations at the storage layer. Similarly, if there are multiple writes to a row within a single transaction, only the last one will propagate to a changefeed. This means that creating and deleting a row within the same transaction will never result in an insert message, but may result in a delete message.
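As a sketch using the office_dogs table from the earlier examples, the following transaction inserts and then deletes the same row:

BEGIN;
INSERT INTO office_dogs VALUES (5, 'Rex');
DELETE FROM office_dogs WHERE id = 5;
COMMIT;

Because only the last write to the row in the transaction propagates, the changefeed will never emit an insert message for id = 5, but it may emit a delete message such as [5] {"after": null}.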

Schema changes

In v22.1, CockroachDB introduced the declarative schema changer. When a schema change uses the declarative schema changer by default, the changefeed will not emit duplicate records for the table that is being altered; it emits only a copy of the table using the new schema. Refer to Schema changes with column backfill for examples of this.

Avro schema changes

To ensure that the Avro schemas that CockroachDB publishes will work with the schema compatibility rules used by the Confluent schema registry, CockroachDB emits all fields in Avro as nullable unions. Because the Confluent Schema Registry applies a different set of compatibility rules than Avro itself, this ensures that both Avro and Confluent consider the schemas to be backward- and forward-compatible.

The original CockroachDB column definition is also included within a doc field __crdb__ in the schema. This allows CockroachDB to distinguish between a NOT NULL CockroachDB column and a NULL CockroachDB column.

Warning:

Schema validation tools should ignore the __crdb__ field. This is an internal CockroachDB schema type description that may change between CockroachDB versions.

Schema changes with column backfill

When schema changes with column backfill (e.g., adding a column with a default, adding a stored computed column, adding a NOT NULL column, dropping a column) are made to watched rows, CockroachDB emits a copy of the table using the new schema.

Note:

Schema changes that do not use the declarative schema changer by default will trigger a changefeed to emit a copy of the table being altered as well as a copy of the table using the new schema. For a list of supported schema changes, refer to the Declarative schema changer section.

The following example demonstrates the messages you will receive after creating a changefeed and then applying a schema change to the watched table:

CREATE TABLE office_dogs (
     id INT PRIMARY KEY,
     name STRING);

INSERT INTO office_dogs VALUES
   (1, 'Petee H'),
   (2, 'Carl'),
   (3, 'Ernie');

CREATE CHANGEFEED FOR TABLE office_dogs INTO 'external://cloud';

You receive each of the rows at the sink:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}

For example, add a column to the watched table:

ALTER TABLE office_dogs ADD COLUMN likes_treats BOOL DEFAULT TRUE;

After the schema change, the changefeed will emit a copy of the table with the new schema:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}
[1] {"id": 1, "likes_treats": true, "name": "Petee H"}
[2] {"id": 2, "likes_treats": true, "name": "Carl"}
[3] {"id": 3, "likes_treats": true, "name": "Ernie"}

If the schema change does not use the declarative schema changer by default, the changefeed will emit a copy of the altered table and a copy of the table using the new schema:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}
[1] {"id": 1, "name": "Petee H"}  # Duplicate
[2] {"id": 2, "name": "Carl"}     # Duplicate
[3] {"id": 3, "name": "Ernie"}    # Duplicate
[1] {"id": 1, "likes_treats": true, "name": "Petee H"}
[2] {"id": 2, "likes_treats": true, "name": "Carl"}
[3] {"id": 3, "likes_treats": true, "name": "Ernie"}

To prevent the changefeed from emitting a copy of the table with the new schema, use the schema_change_policy = nobackfill option. With this option, the new-schema messages that include the "likes_treats" column in the preceding two output blocks would not be emitted.

Refer to the CREATE CHANGEFEED option table for detail on the schema_change_policy option. You can also use the schema_change_events option to define the type of schema change event that triggers the behavior specified in schema_change_policy.
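As a sketch, a changefeed that suppresses the backfill could combine the two options as follows (the sink URI is a placeholder, and the column_changes and nobackfill values shown assume the settings described in the CREATE CHANGEFEED documentation):

CREATE CHANGEFEED FOR TABLE office_dogs
  INTO 'external://cloud'
  WITH schema_change_events = 'column_changes', schema_change_policy = 'nobackfill';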

Note:

As of v22.1, changefeeds filter out VIRTUAL computed columns from events by default. This is a backward-incompatible change. To maintain the changefeed behavior in previous versions where NULL values are emitted for virtual computed columns, see the virtual_columns option for more detail.
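As a sketch, a changefeed that keeps the previous behavior might set the option as follows (the 'null' value shown assumes the setting described in the virtual_columns documentation; the sink URI is a placeholder):

CREATE CHANGEFEED FOR TABLE office_dogs
  INTO 'external://cloud'
  WITH virtual_columns = 'null';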

Garbage collection and changefeeds

By default, protected timestamps will protect changefeed data from garbage collection up to the time of the checkpoint.

Protected timestamps will protect changefeed data from garbage collection in the following scenarios:

  • The downstream changefeed sink is unavailable. Protected timestamps will protect changes until you either cancel the changefeed or the sink becomes available once again.
  • You pause a changefeed that has the protect_data_from_gc_on_pause option enabled, or such a changefeed is paused by a retryable error. Protected timestamps will protect changes until you resume the changefeed.

However, if the changefeed lags too far behind, the protected changes could cause data storage issues. To release the protected timestamps and allow garbage collection to resume, you can cancel the changefeed, or resume it if it is paused.

We recommend monitoring storage and the number of running changefeeds. If a changefeed is not advancing and is retrying, it will accumulate garbage without limit while it retries.

When protect_data_from_gc_on_pause is unset, pausing the changefeed will release the existing protected timestamp record. As a result, you could lose the changes if the changefeed remains paused longer than the garbage collection window.
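As a sketch of the pause and resume flow with protection enabled (the sink URI and the job ID are placeholders):

CREATE CHANGEFEED FOR TABLE office_dogs
  INTO 'external://cloud'
  WITH protect_data_from_gc_on_pause;

PAUSE JOB 123456789012345678;   -- changes stay protected from garbage collection while paused
RESUME JOB 123456789012345678;  -- the changefeed catches up, after which older data can be collected again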

The only ways for a changefeed to stop protecting data are:

  • You pause the changefeed without protect_data_from_gc_on_pause set.
  • You cancel the changefeed.
  • The changefeed fails without on_error=pause set.

Avro

The following sections provide information on Avro usage with CockroachDB changefeeds. Creating a changefeed using Avro is available in Core and Enterprise changefeeds.

Avro limitations

Below are clarifications for particular SQL types and values for Avro changefeeds:

  • Decimals must have precision specified. See the example after this list.
  • BYTES (or its aliases BYTEA and BLOB) are often used to store machine-readable data. When you stream these types through a changefeed with format=avro, CockroachDB does not encode or change the data. However, Avro clients can often include escape sequences to present the data in a printable format, which can interfere with deserialization. A potential solution is to hex-encode BYTES values when initially inserting them into CockroachDB. This will ensure that Avro clients can consistently decode the hexadecimal. Note that hex-encoding values at insertion will increase record size.
  • BIT and VARBIT types are encoded as arrays of 64-bit integers.

    For efficiency, CockroachDB encodes BIT and VARBIT bitfield types as arrays of 64-bit integers. That is, base-2 (binary format) BIT and VARBIT data types are converted to base 10 and stored in arrays. Encoding in CockroachDB is big-endian; therefore, the last value may have many trailing zeroes. For this reason, the first value of each array is the number of bits that are used in the last value of the array.

    For instance, if the bitfield is 129 bits long, there will be 4 integers in the array. The first integer will be 1, representing the number of bits used in the last value; the second integer will be the first 64 bits; the third integer will be bits 65–128; and the last integer will be either 0 or 9223372036854775808 (i.e., the integer with only the first bit set, or 1000000000000000000000000000000000000000000000000000000000000000 in base 2).

    This example is base-10 encoded into an array as follows:

    {"array": [1, <first 64 bits>, <second 64 bits>, 0 or 9223372036854775808]}
    

    For downstream processing, it is necessary to base-2 encode every element in the array (except for the first element). The first number in the array gives you the number of bits to take from the last base-2 number — that is, the most significant bits. So, in the example above this would be 1. Finally, all the base-2 numbers can be appended together, which will result in the original number of bits, 129.

    In a different example of this process where the bitfield is 136 bits long, the array would be similar to the following when base-10 encoded:

    {"array": [8, 18293058736425533439, 18446744073709551615, 13690942867206307840]}
    

    To then work with this data, you would convert each of the elements in the array to base-2 numbers, besides the first element. For the above array, this would convert to:

    [8, 1111110111011011111111111111111111111111111111111111111111111111, 1111111111111111111111111111111111111111111111111111111111111111, 1011111000000000000000000000000000000000000000000000000000000000]
    

    Next, you use the first element in the array (8) to take that number of bits from the last base-2 element, which gives 10111110. Finally, you append the base-2 numbers together (in the above array, the second, third, and truncated last elements). This results in 136 bits, the original number of bits.
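The following sketch illustrates the DECIMAL precision requirement noted at the start of this list (the table, sink address, and schema registry address are placeholders):

CREATE TABLE prices (id INT PRIMARY KEY, amount DECIMAL(18,2));

CREATE CHANGEFEED FOR TABLE prices
  INTO 'kafka://localhost:9092'
  WITH format = 'avro', confluent_schema_registry = 'http://localhost:8081';

If amount were declared as a bare DECIMAL with no precision, the column could not be represented in Avro and creating the changefeed would fail.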

Avro types

Below is a mapping of CockroachDB types to Avro types:

CockroachDB Type    Avro Type        Avro Logical Type
ARRAY               ARRAY
BIT                 Array of LONG
BLOB                BYTES
BOOL                BOOLEAN
BYTEA               BYTES
BYTES               BYTES
COLLATE             STRING
DATE                INT              DATE
DECIMAL             STRING, BYTES    DECIMAL
ENUMS               STRING
FLOAT               DOUBLE
INET                STRING
INT                 LONG
INTERVAL            STRING
JSONB               STRING
STRING              STRING
TIME                LONG             TIME-MICROS
TIMESTAMP           LONG             TIMESTAMP-MICROS
TIMESTAMPTZ         LONG             TIMESTAMP-MICROS
UUID                STRING
VARBIT              Array of LONG
Note:

The DECIMAL type is a union between Avro STRING and Avro DECIMAL types.

CSV

You can use the format=csv option to emit CSV format messages from your changefeed. However, there are the following limitations with this option:

  • It only works in combination with the initial_scan = 'only' option.
  • It does not work when used with the diff or resolved options.

New in v22.2: Changefeeds emit the same CSV format as EXPORT. In v22.1, changefeeds emitted CSV data that wrapped some values in single quotes, which were not wrapped when exporting data with the EXPORT statement.

See Export Data with Changefeeds for detail on using changefeeds to export data from CockroachDB.
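As a sketch, assuming the users table shown in the example output below, a CSV changefeed might be created as follows (the initial_scan = 'only' setting reflects that CSV changefeeds are used to export data, as described in Export Data with Changefeeds; the sink URI is a placeholder):

CREATE CHANGEFEED FOR TABLE users
  INTO 'external://cloud'
  WITH format = 'csv', initial_scan = 'only';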

The following shows example CSV format output:

4ccccccc-cccc-4c00-8000-00000000000f,washington dc,Holly Williams,95153 Harvey Street Suite 5,2165526885
51eb851e-b851-4c00-8000-000000000010,washington dc,Ryan Hickman,21187 Dennis Village,1635328127
56242e0e-4935-4d21-a8cd-915f4002e53c,washington dc,Joshua Smith,80842 Edwards Bridge,1892482054
5707febd-0278-4e55-8715-adbb35f09759,washington dc,Preston Fisher,5603 David Mission Apt. 93,5802323725
576546de-d59c-429b-9251-be79472643d4,washington dc,Anna Underwood,81246 Lee Knoll,2838348371
596c1cf8-d59f-4ad6-9379-6aba82648ca9,washington dc,Gerald Good,59876 Wang Neck,6779715200
5d30f838-e24c-46cb-bb0c-4a5643ddc2b1,washington dc,Lawrence Lucas,67248 Robinson Way Apt. 46,6167017463
65c398b9-7cce-45c5-9a5b-9561569ae030,washington dc,Mr. Xavier Waters,85393 Diaz Camp,1783482816
7a78fb0b-d368-46f6-b530-f9c74c19ba25,washington dc,Christopher Owens,7460 Curtis Centers,1470959770
80696ab6-7ec9-4e55-afee-4f468478fe82,washington dc,Patricia Gibson,77436 Vaughn Ville,3271633253
93750763-f992-4018-8a11-bf15ebfecc06,washington dc,Alison Romero,15878 Grant Forks Suite 16,2742488244
9cc3f995-0a91-4612-a079-e81ca28257ab,washington dc,Corey Dunn,15958 Jenna Locks,2358457606
9efd7047-c5e5-4501-9fcd-cff2d27efc34,washington dc,Patricia Gray,16139 Nicholas Wells Suite 64,8935020269
a253a15c-8e0a-4d25-aa87-1a0839935005,washington dc,Samantha Lee,90429 Russell Coves,2990967825
a3081762-9841-4275-ad7a-75a7e8d5f69d,washington dc,Preston Fisher,5603 David Mission Apt. 93,5802323725
aebb80a6-eceb-4d10-9d9a-f26270188114,washington dc,Kenneth Miller,52393 Stephen Mill Apt. 7,3966083325
