8 ways to use CDC Transformations for more powerful data streaming

Today we’re introducing CDC Transformations. CDC Transformations are SQL-like statements that let you (1) filter, (2) transform, and (3) choose the schema of your data stream. Instead of wrestling with sidecar services or downstream tools, you can now filter and transform streaming data with a simple SQL query.

What are changefeeds?

Changefeeds stream continuous updates from a table, or a one-time snapshot of the table, to a SQL connection or an external system. They’re native to CockroachDB, so you don’t need to spend time and money setting up a third-party change data capture system. Changefeeds serve a wide range of goals, from keeping inventory management systems up to date in real time to replicating data into data warehouses. You can build data pipelines for business analytics, event-driven architectures, data archival, audit logging, migrations, and more.
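For contrast with the transformation syntax, here is a minimal sketch of a plain changefeed, which emits whole rows for every change. It assumes a hypothetical users table and reuses this post's Kafka placeholder. The initial_scan = 'only' option (take a one-time snapshot, then complete) should be checked against the changefeed options documented for your CockroachDB version.

```sql
-- Stream every change to the (hypothetical) users table, all columns included.
CREATE CHANGEFEED FOR TABLE users INTO 'kafka://[endpoint]';

-- Take a one-time snapshot of the table's current rows, then stop.
CREATE CHANGEFEED FOR TABLE users INTO 'kafka://[endpoint]'
WITH initial_scan = 'only';
```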

CDC Transformations make changefeeds even easier and less expensive to operate. If you know SQL, you already know how to write CDC Transformations. The basic format looks like this:

CREATE CHANGEFEED [INTO 'sink'] [WITH opt=val, ...]
AS SELECT [columns or expressions] FROM t WHERE [filter expression];

With these primitives in hand, we can easily and elegantly create interesting and complex streaming behaviors. Here are 8 things you can do with them.

Note: CDC Transformations are currently in preview, with general availability (GA) planned for our 23.1 release. The CDC Transformations UX and capabilities are likely to evolve. Check out our documentation for up-to-date limitations and details.

8 CDC Transformation Use Cases

1. Only send INSERTS & UPDATES for your outbox table

Every message in an outbox table is meant to be streamed and then deleted, so sending the deletes doubles the number of messages you emit. CDC Transformations let you filter delete messages out of the stream.

CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH resolved, schema_change_policy=stop
AS SELECT * FROM outbox WHERE NOT cdc_is_delete();

2. Remove the need for an outbox table 

Outbox tables let you create an event stream with custom fields and structure. CDC Transformations remove the need to store this information in an outbox table: instead, you define the metadata and structure in the changefeed itself.

CREATE CHANGEFEED INTO 'kafka://[endpoint]?topic_name=events' AS SELECT
cdc_updated_timestamp()::int AS event_timestamp,
'users' AS table,
IF(cdc_is_delete(), 'delete', 'create') AS type,
jsonb_build_object('email', email, 'admin', admin) AS payload
FROM users;

In this example, each message includes metadata (the timestamp of the event, the name of the table, and a type field marking the event as a delete or a create) and a payload column containing the event data we want, in JSON format.

3. Choose which table columns to include in your stream

Often you need only a subset of a table’s columns for your streaming use case. By filtering out unneeded columns with CDC Transformations, you get messages only when the columns you care about have been updated, and message sizes can be greatly reduced. Filtering columns can be a great way to reduce the Total Cost of Ownership (TCO) of your streaming pipeline, because downstream systems need less compute and storage.

CREATE CHANGEFEED INTO 'kafka://[endpoint]'
WITH schema_change_policy=stop
AS SELECT column1, column2 FROM my_table;

4. Emit only the events that made changes you care about 

CDC Transformations can filter out all updates that don’t modify the column you care about.

For example, suppose you run a gaming platform where each game’s metadata is stored in a single row, with a status column that indicates when the game moves from the “in progress” state to the “finished” state. A microservice that calculates the leaderboard only cares about games whose state is “finished”, so you can filter out updates that do not change the game state.

CREATE CHANGEFEED INTO 'kafka://[endpoint]'
WITH schema_change_policy=stop, diff
AS SELECT *, cdc_prev() AS previous FROM important_table
WHERE important_column != cdc_prev()->>'important_column';
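Applied to the gaming example, the filter might look like the sketch below. It assumes a hypothetical games table with a STRING status column, and reads the previous status as text with ->>. IS DISTINCT FROM is used instead of != so that a row with no previous state (for example, a game inserted directly as “finished”) is still emitted rather than dropped by a NULL comparison.

```sql
-- Emit a game's row only when it transitions into the "finished" state.
-- games and status are hypothetical names used for this sketch.
CREATE CHANGEFEED INTO 'kafka://[endpoint]'
WITH schema_change_policy=stop, diff
AS SELECT * FROM games
WHERE status = 'finished'
  AND (cdc_prev()->>'status') IS DISTINCT FROM 'finished';
```

Rows that change other columns of an already finished game fail the second condition, so the leaderboard service sees each finished game once per transition.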

5. Emit only events that match certain criteria

CDC Transformations let you filter out a row if its contents do not match certain criteria. For example, you may have a user or application column that tracks which user or application modified that row. You can create a changefeed that only emits messages for rows modified by a particular user or application. Alternatively, you may want to match on a criterion such as age > 18.

CREATE CHANGEFEED INTO 'kafka://[endpoint]'
WITH schema_change_policy=stop
AS SELECT * FROM my_table
WHERE application='prod_microservice';
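The age > 18 criterion follows the same pattern. A minimal sketch, assuming a hypothetical users table with an integer age column:

```sql
-- Emit only rows for adult users; users and age are hypothetical names.
CREATE CHANGEFEED INTO 'kafka://[endpoint]'
WITH schema_change_policy=stop
AS SELECT * FROM users WHERE age > 18;
```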

6. Emit only events from a certain region

In CockroachDB, REGIONAL BY ROW tables let you set data-placement locality at the row level; each row’s home region is stored in its crdb_region column. CDC Transformations let you include only the rows homed in a region, or a set of regions, in your changefeed. This can help you avoid costly cross-region data transfer or enforce data fencing.

CREATE CHANGEFEED INTO 'kafka://[endpoint]'
WITH schema_change_policy=stop
AS SELECT * FROM my_table
WHERE crdb_region='us-east1';

7. Ensure a stable message schema, even with schema changes

CockroachDB supports online schema changes, so you can change a table’s schema without application downtime. However, this can disrupt streaming pipelines, because schema changes such as adding a column or changing a column’s type also change the schema of your changefeed messages. CDC Transformations let you fix the columns and data types of your changefeed messages, keeping the message schema stable when the table schema changes.

Note: While CDC Transformations are in preview, schema changes will cause the changefeed to fail. This limitation will be removed in future releases.

CREATE CHANGEFEED INTO 'kafka://[endpoint]' 
WITH schema_change_policy=stop
AS SELECT id::string, name::string, admin::string FROM users_table;

8. Load balance changefeed messages across feeds

You can use CDC Transformations to shard a changefeed across several feeds to balance the load. For example, you could create eight changefeeds, each handling ⅛ of the table’s rows:

CREATE CHANGEFEED INTO 'kafka://[endpoint]'
WITH schema_change_policy=stop
AS SELECT * FROM my_table
WHERE id % 8 = 0;
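To cover the whole table, you would create one feed per remainder, 0 through 7. This sketch assumes a hypothetical my_table with a non-negative integer id column. In CockroachDB, as in PostgreSQL, % keeps the sign of the dividend, so rows with negative ids would match none of these eight predicates.

```sql
-- One changefeed per shard: together the eight predicates cover every
-- row with a non-negative integer id exactly once.
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop
AS SELECT * FROM my_table WHERE id % 8 = 0;
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop
AS SELECT * FROM my_table WHERE id % 8 = 1;
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop
AS SELECT * FROM my_table WHERE id % 8 = 2;
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop
AS SELECT * FROM my_table WHERE id % 8 = 3;
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop
AS SELECT * FROM my_table WHERE id % 8 = 4;
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop
AS SELECT * FROM my_table WHERE id % 8 = 5;
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop
AS SELECT * FROM my_table WHERE id % 8 = 6;
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop
AS SELECT * FROM my_table WHERE id % 8 = 7;
```

The split is only as even as the id distribution: sequential or randomly generated integer ids spread roughly evenly across the eight remainders.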

Video demo of CDC Transformations at work

In this video demonstration of CDC Transformations, I show three different use cases. If you have questions about them, comment on the video or reach out to us on our community Slack.
