8 ways to use CDC Queries for more powerful data streaming


CDC Queries are SQL-like statements that allow you to (1) filter, (2) transform, and (3) choose the schema of your data stream. You no longer need sidecar services or downstream tools to filter and transform streaming data. It is now as simple as writing a SQL query. Here is a quick video tutorial about how to use CDC Queries that covers all the new functionality as of the CockroachDB 23.1 release:

What are changefeeds?

Changefeeds allow you to stream continuous updates from a table, or a one-time snapshot of the table, to a SQL connection or an external system. They’re native to CockroachDB, so you don’t need to spend time and money setting up a third-party change data capture system. Changefeeds serve a wide range of goals, from keeping inventory management systems up to date in real time to replicating data into data warehouses. You can build data pipelines for business analytics, event-driven architectures, data archival, audit logging, migrations, and more.


CDC Queries make it even easier and less expensive to operate changefeeds. If you know SQL, you already know how to write CDC Queries. The basic format looks like this:

CREATE CHANGEFEED [INTO 'sink'] [WITH opt=val, ...]
AS SELECT [columns or expressions] FROM t WHERE [functions];

With these primitives in hand, we can easily and elegantly create interesting and complex streaming behaviors. Here are 8 things you can do with them.

Note: When this post was first written, CDC Queries were in preview, with GA planned for the 23.1 release. Their UX and capabilities are likely to evolve, so check our documentation for up-to-date limitations and details.

8 CDC Transformation Use Cases

1. Only send INSERTs and UPDATEs for your outbox table

Every row in an outbox table is written once, streamed, and then deleted. If the changefeed also streams those deletes, you double the number of messages you have to send. CDC Queries let you filter delete messages out of the stream.

CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH resolved, schema_change_policy=stop
AS SELECT * FROM outbox WHERE NOT cdc_is_delete();
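To make the "double the messages" arithmetic concrete, here is a small Python model of an outbox stream. It is a sketch, not a CockroachDB API: the ChangeEvent type and the outbox_lifecycle and without_deletes helpers are hypothetical, and the is_delete flag stands in for what cdc_is_delete() reports for each event.

```python
from dataclasses import dataclass


# Hypothetical in-memory stand-in for a changefeed message; not a CockroachDB API.
@dataclass(frozen=True)
class ChangeEvent:
    key: int
    is_delete: bool  # plays the role of cdc_is_delete() in the SQL filter


def outbox_lifecycle(n_messages: int) -> list:
    """Each outbox row is inserted, relayed, then deleted: two events per message."""
    events = []
    for key in range(n_messages):
        events.append(ChangeEvent(key, is_delete=False))  # INSERT of the outbox row
        events.append(ChangeEvent(key, is_delete=True))   # DELETE after it was streamed
    return events


def without_deletes(events: list) -> list:
    """Mirrors the predicate WHERE NOT cdc_is_delete()."""
    return [e for e in events if not e.is_delete]


events = outbox_lifecycle(1000)
print(len(events), len(without_deletes(events)))  # 2000 1000
```

With the filter in place, downstream consumers handle one message per outbox entry instead of two.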

2. Remove the need for an outbox table 

Outbox tables allow you to create an event stream with custom fields and structure. CDC Queries can replace the need to store this information in an outbox table. Instead, you can set the metadata and structure in the changefeed.

CREATE CHANGEFEED INTO 'kafka://[endpoint]?topic_name=events' AS SELECT
cdc_updated_timestamp()::int AS event_timestamp,
'users' AS table,
IF(cdc_is_delete(), 'delete', 'create') AS type,
jsonb_build_object('email', email, 'admin', admin) AS payload
FROM users;

In this example, each message includes metadata (the event timestamp, the table name, and an event type of delete for deletes and create for every other change) and a payload column that holds the event data we want, in JSON format.
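A downstream consumer can then route on the type column. This Python sketch assumes each Kafka message value is a JSON object whose top-level keys are the aliases from the query above (event_timestamp, table, type, payload). The exact message layout depends on your changefeed's envelope and format options, so verify it against real messages. The handle_event function and the sample messages are hypothetical.

```python
import json


def handle_event(raw: bytes) -> str:
    """Route one event from the users changefeed (assumed message layout)."""
    msg = json.loads(raw)
    # The keys are the column aliases chosen in the CDC query.
    table = msg["table"]
    kind = msg["type"]
    payload = msg.get("payload") or {}  # tolerate a null payload
    if kind == "delete":
        return f"{table}: delete at {msg['event_timestamp']}"
    return f"{table}: upsert {payload.get('email')} (admin={payload.get('admin')})"


# Hypothetical sample message shaped like the query's output columns.
example = (
    b'{"event_timestamp": 1680000000, "table": "users", "type": "create", '
    b'"payload": {"email": "ana@example.com", "admin": false}}'
)
print(handle_event(example))  # users: upsert ana@example.com (admin=False)
```

Because the changefeed already shapes the event, the consumer never needs to know the users table's full schema.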

3. Choose which table columns will be included in your stream

Often you only need a subset of columns from a table for your streaming use case. By filtering unneeded columns with CDC Queries, you only get messages when the columns you care about have been updated. Message sizes can also be greatly reduced. Filtering columns can be a great way to reduce Total Cost of Ownership (TCO) of your streaming pipeline by reducing the required allocation of compute and storage resources downstream.

CREATE CHANGEFEED INTO 'kafka://[endpoint]' 
WITH schema_change_policy=stop
AS SELECT column1, column2 FROM my_table;

4. Emit only the events that made changes you care about 

CDC Queries can filter out all updates that don’t modify the column we care about. 

For example, suppose you run a gaming platform where each game’s meta information is stored in a single row, with a status column that indicates when the game moves from the “in progress” state to the “finished” state. A microservice that calculates the leaderboard only cares about games whose state is “finished”, so we can filter out updates that do not change the game state.

CREATE CHANGEFEED INTO 'kafka://[endpoint]' 
WITH schema_change_policy=stop, diff
AS SELECT *, cdc_prev() AS previous FROM important_table
WHERE important_column::STRING != cdc_prev()->>'important_column';
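The filter's behavior over a game's lifecycle can be modeled in a few lines of Python. This is a sketch under stated assumptions. The status_changed helper and the sample rows are hypothetical. It also assumes that cdc_prev() is NULL for an INSERT, in which case the SQL comparison evaluates to NULL and the row is dropped, and the model returns False to match.

```python
from typing import Optional


def status_changed(prev: Optional[dict], curr: dict, column: str = "status") -> bool:
    """Model of a WHERE clause that compares a column to its value in cdc_prev().

    Assumption: an INSERT has no previous row, so the SQL comparison involves
    NULL and the row is filtered out. We return False to mirror that.
    """
    if prev is None:
        return False
    return curr[column] != prev[column]


# Hypothetical lifecycle of one game row: created, score updated, then finished.
updates = [
    (None, {"game_id": 1, "status": "in progress", "score": 0}),
    ({"game_id": 1, "status": "in progress", "score": 0},
     {"game_id": 1, "status": "in progress", "score": 10}),
    ({"game_id": 1, "status": "in progress", "score": 10},
     {"game_id": 1, "status": "finished", "score": 10}),
]

emitted = [curr for prev, curr in updates if status_changed(prev, curr)]
print(emitted)  # [{'game_id': 1, 'status': 'finished', 'score': 10}]
```

Only the transition to “finished” reaches the leaderboard service. The score-only update never leaves the database.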

5. Emit only events that meet certain criteria

CDC Queries let you filter out a row if its contents do not meet certain criteria. For example, you may have a user or application column that tracks which user or application modified the row. You can create a changefeed that only emits messages for rows modified by a particular user or application. Alternatively, you may want to filter on a condition such as age > 18.

CREATE CHANGEFEED INTO 'kafka://[endpoint]' 
WITH schema_change_policy=stop
AS SELECT * FROM my_table
WHERE application='prod_microservice';

6. Emit only events from a certain region

In CockroachDB, you can set data-placement locality for a table at the row level. CDC Queries allow you to include only the rows from a particular region or set of regions in your changefeed. This can avoid costly cross-region data transfer or help enforce data fencing.

CREATE CHANGEFEED INTO 'kafka://[endpoint]' 
WITH schema_change_policy=stop
AS SELECT * FROM my_table
WHERE region='US/east';

7. Ensure a stable message schema, even with schema changes

CockroachDB supports online schema changes, so you can alter a table without application downtime. However, this can disrupt streaming pipelines, because schema changes such as adding a new column or changing a column’s type can change the schema of your changefeed messages as well. CDC Queries let you set the columns and data types for your changefeed messages, which keeps the message schema stable when the table’s schema changes.

CREATE CHANGEFEED INTO 'kafka://[endpoint]' 
WITH schema_change_policy=stop
AS SELECT id::string, name::string, admin::string FROM users_table;
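Because the query pins both the column list and the types, a consumer can check every message against a fixed contract. This Python sketch assumes JSON-encoded message values with the projected columns as top-level keys. It skips any keys starting with "__", such as a metadata field the sink may add, which is an assumption to verify against your own messages. The conforms function and EXPECTED_FIELDS set are hypothetical.

```python
import json

# Columns projected (and cast to STRING) in the CDC query above.
EXPECTED_FIELDS = {"id", "name", "admin"}


def conforms(raw: bytes) -> bool:
    """True if a message has exactly the projected fields and every value is a string or null."""
    msg = json.loads(raw)
    fields = {k for k in msg if not k.startswith("__")}  # ignore metadata keys, if any
    return fields == EXPECTED_FIELDS and all(
        msg[k] is None or isinstance(msg[k], str) for k in EXPECTED_FIELDS
    )


print(conforms(b'{"id": "1", "name": "ana", "admin": "true"}'))  # True
print(conforms(b'{"id": 1, "name": "ana", "admin": true}'))      # False: uncast types
```

If someone later adds a column to users_table or changes admin's type, messages still pass this check, because the query, not the table, defines the message shape.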

8. Load balance changefeed messages across feeds

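CDC Queries can be used to shard changefeeds as a means of load balancing. For example, with an integer id column you could create eight changefeeds, each responsible for roughly ⅛ of the table’s rows. The feed below takes the rows where id % 8 = 0, and the other seven feeds use the remainders 1 through 7: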

CREATE CHANGEFEED INTO 'kafka://[endpoint]' 
WITH schema_change_policy=stop
AS SELECT * FROM my_table 
WHERE id % 8 = 0;
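Writing eight nearly identical statements by hand is error-prone, so a small script can generate them and check that the shards cover the key space. In this Python sketch, the shard_statements and shard_of helpers and the table name my_table are hypothetical, and the sink URI is the same placeholder used throughout this post. It assumes non-negative integer ids. With PostgreSQL-style truncated modulo, a negative id gives a negative remainder and would match none of the eight predicates.

```python
SHARDS = 8
SINK = "kafka://[endpoint]"  # placeholder sink URI, as in the examples above


def shard_statements(table: str, key: str = "id", shards: int = SHARDS) -> list:
    """Build one CREATE CHANGEFEED statement per shard of the integer key space."""
    return [
        f"CREATE CHANGEFEED INTO '{SINK}' WITH schema_change_policy=stop "
        f"AS SELECT * FROM {table} WHERE {key} % {shards} = {i};"
        for i in range(shards)
    ]


def shard_of(row_id: int, shards: int = SHARDS) -> int:
    """Which feed a row lands in. Assumes non-negative integer ids (see above)."""
    if row_id < 0:
        raise ValueError("negative ids are not covered by shards 0..shards-1")
    return row_id % shards


statements = shard_statements("my_table")
for stmt in statements:
    print(stmt)

# Every id in a sample range maps to exactly one shard: 100 ids per shard here.
counts = [0] * SHARDS
for row_id in range(800):
    counts[shard_of(row_id)] += 1
print(counts)  # [100, 100, 100, 100, 100, 100, 100, 100]
```

The split is only even if ids are spread evenly across remainders. Sequential ids are, but ids with a common stride (for example, all multiples of 8) would land in a single shard.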

Video demo of CDC Queries at work

In this video demonstration of CDC Queries, I show three different use cases. (Note: the feature was called “CDC Transformations” at the time of recording.) If you have questions about those use cases, you can comment on the video or reach out to us on our community Slack.
