Writing History: MVCC bulk ingestion and index backfills


This is part 2 of a 3-part blog series about how we’ve improved the way CockroachDB stores and modifies data in bulk (here is part 1 in case you missed it). We went way down into the deepest layers of our storage system, then up to our SQL schema changes and their transaction timestamps - all without anybody noticing (or at least we hope!)

•••

Bulk ingestions are used to write large amounts of data with high throughput, such as imports, backup restoration, schema changes, and index backfills. These operations use a separate write path that bypasses transaction processing, instead ingesting data directly into the storage engine with highly amortized write and replication costs. However, because these operations bypass regular transaction processing, they have also been able to sidestep our normal mechanisms for preserving MVCC history.

The Pebble storage engine uses an LSM (log-structured merge tree) to store key/value pairs. Briefly, data is stored in 7 levels (L0 to L6), with more recent data in the upper levels (closer to L0). Each level is roughly 10 times larger than the preceding level, and as data ages it is moved down the levels through a process known as compaction. Notably, each level is made up of a set of SST (sorted string table) files: immutable files that contain a set of key/value pairs sorted by key. Normally, as keys are written to Pebble, they are buffered in memory and then flushed out to disk as a new SST file in L0. A compaction will eventually read all of the SSTs in L0, merge and sort their key/value pairs, write them out to a new SST file in L1, and so on down the levels (this avoids having to search across too many SST files when reading, i.e. read amplification).
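
To make the merge step of a compaction concrete, here is a minimal Python sketch. It is only an illustration and not Pebble's implementation: the `compact` function is a hypothetical name, SSTs are modeled as sorted lists of (key, value) pairs, and MVCC versions, sequence numbers, and tombstones are ignored.

```python
import heapq

def compact(*sstables):
    """Merge sorted SSTs into one sorted SST, keeping the newest value per key.

    `sstables` are passed newest first; each is a list of (key, value) pairs
    sorted by key, with no duplicate keys inside a single SST.
    """
    # Tag every entry with the age rank of its source SST (0 = newest) so that
    # entries with equal keys come out of the merge newest first.
    tagged = [
        [(key, age, value) for key, value in sst]
        for age, sst in enumerate(sstables)
    ]
    merged = []
    for key, _age, value in heapq.merge(*tagged):
        # The first entry seen for a key is the newest; skip older duplicates.
        if not merged or merged[-1][0] != key:
            merged.append((key, value))
    return merged

newer = [("a", "a2"), ("c", "c1")]
older = [("a", "a1"), ("b", "b1")]
print(compact(newer, older))
```

Because every input is already sorted, the merge is a single streaming pass, which is why sorted, immutable files are a convenient unit for compaction.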

SST Files

Rather than writing individual key/value pairs into Pebble’s memtable, bulk ingestions build SST files that are written directly into L0 as-is. These still have to pass through network RPC calls and Raft replication, but avoid much of the intermediate serialization, transaction processing, and write amplification that regular transactional writes go through (such as intent resolution and write-ahead logging), using a much more compact representation. This is done via an RPC method called AddSSTable, which allows an RPC client (the ingestion processor) to submit arbitrary SSTs.

MVCC-compliant AddSSTable

Because AddSSTable ingests SST files as-is, the RPC client (ingestion processor) is free to write keys with whatever MVCC timestamps it wishes, unlike transactional writes which write at the current time. Recall from part 1 of this blog series how mutating MVCC history can violate invariants that the rest of the system relies on: writing below the timestamp cache watermark can violate serializability by changing the result of a read, and writing below the closed timestamp can lead to stale reads or missing updates in e.g. changefeeds, incremental backups, and follower reads. While we currently avoid such violations through careful coordination at the SQL schema layer, this approach will not work with serverless tenants and cluster-to-cluster replication, due to the isolation between the host cluster and its tenants.

We therefore had to make AddSSTable ingestions MVCC-compliant by having them write at the current timestamp. In principle, this is straightforward: the RPC client should just use the current timestamp for keys when constructing the SST. In practice, however, constructing and sending an SST takes some amount of time, which can make the timestamp outdated by the time it arrives. Certain ingestion processes also relied on using arbitrary SST timestamps — in particular, index backfills used the backfill start time for the initial secondary index entries, as a way to coordinate with concurrent table updates during the backfill, which required building a new index backfiller using an MVCC-compliant approach.

If the RPC client constructs and sends the SST using a current timestamp, the SST construction itself — adding keys, compressing blocks, generating the index, updating bloom filters, writing metadata, etc. — takes time, as does the network transmission. As a result, the request timestamp will already be in the past by the time the request arrives at the server. It may then be subject to further delays because of contention with concurrent transactions or backpressure from admission control.

If too much time elapses since the SST timestamp was chosen, it may fall below the oldest time at which a write is still allowed by the timestamp cache or closed timestamp. This will cause the request timestamp to be pushed to a higher timestamp where writes are still allowed. However, the actual SST timestamps that will be ingested are still at the original SST construction timestamp. We could reject the request in this case and ask the RPC client to reconstruct and resend it at a newer timestamp, but this can be relatively expensive since these SSTs are fairly large, and it could fail again. Instead, AddSSTable evaluation detects this situation and automatically rewrites the SST timestamps to the new request timestamp before ingestion.
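
The push-then-rewrite behavior described above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the actual AddSSTable evaluation code: `MVCCKey`, `evaluate_add_sstable`, and `lowest_writable_ts` are hypothetical names, timestamps are plain integers, and the SST is a list rather than a file whose blocks are rewritten in place.

```python
from typing import NamedTuple

class MVCCKey(NamedTuple):
    key: str
    ts: int  # MVCC timestamp, modeled as a plain integer

def evaluate_add_sstable(sst, sst_ts, lowest_writable_ts):
    """Return (write_ts, sst) as they would be ingested.

    sst: list of (MVCCKey, value) pairs, all written at sst_ts.
    lowest_writable_ts: hypothetical stand-in for the lowest timestamp at
    which the timestamp cache and closed timestamp still allow writes.
    """
    # The request is pushed forward if its timestamp is no longer writable.
    write_ts = max(sst_ts, lowest_writable_ts)
    if write_ts == sst_ts:
        return write_ts, sst  # common case: no push, ingest as-is
    # Pushed: rewrite every key's timestamp instead of rejecting the request.
    rewritten = [(MVCCKey(k.key, write_ts), v) for k, v in sst]
    return write_ts, rewritten

sst = [(MVCCKey("a", 10), "x"), (MVCCKey("b", 10), "y")]
pushed_ts, pushed_sst = evaluate_add_sstable(sst, sst_ts=10, lowest_writable_ts=15)
same_ts, same_sst = evaluate_add_sstable(sst, sst_ts=10, lowest_writable_ts=8)
```

In the first call the SST timestamp (10) has fallen below the writable floor (15), so all keys are rewritten to 15. In the second call nothing is pushed and the SST is returned untouched.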

This SST rewrite is highly optimized, and manipulates the SST’s internal blocks directly and in parallel, instead of reconstructing the SST from scratch which would involve a lot of wasted work such as recomputing Bloom filters and block properties. In benchmarks, this was able to rewrite an SST with a million keys in about 37 milliseconds, which is fairly minor considering the overall cost of fetching the source data, constructing the SST, transmitting and replicating it across the network, writing it to disk, and ingesting it into Pebble. Furthermore, these rewrites rarely happen in practice, as requests are rarely pushed — the source data is already buffered in memory before picking a timestamp and constructing the SST, and the requests rarely experience contention since the keyspan isn’t seeing live foreground traffic. End-to-end import benchmarks did not show any significant difference in throughput.

We did consider an alternative approach, where the SSTs were written with a placeholder timestamp that would be replaced with the actual write timestamp during Pebble reads, before finally replacing the placeholders with the actual timestamp in a later compaction. However, this came with significant complexity and read-time overhead — the SST rewrite approach ended up being much simpler, with no read-time overhead and negligible write-time overhead.

Incremental index backfiller

One of the main uses of AddSSTable inside CockroachDB is the index backfilling process. Index backfilling occurs when a user adds a new index to a table.

In CockroachDB, an index is a span of key/value pairs and a descriptor that describes the data contained in those key/value pairs. For example, suppose we have the following table with a couple of rows:

CREATE TABLE users (
    id INT8 NOT NULL,
    name STRING NULL,
    email STRING NULL,
    CONSTRAINT users_pkey PRIMARY KEY (id ASC)
);
INSERT INTO users VALUES (1, 'dade', 'zerocool@hotmail.com'), (2, 'kate', 'acidburn@yahoo.com');

The data for these rows are stored in the table’s primary index. The keys for these rows may look like this:

|- TableID 
|   |- IndexID
|   | |- Primary Key Column Values
|   | | |- ColumnFamilyID
v   V V V
/114/1/1/0
/114/1/2/0

If we were to add a unique index on email addresses:

 CREATE UNIQUE INDEX ON users(email);

This index would be stored as a series of new key/value pairs whose keys are based on the columns used in the index. Constructing keys from the indexed columns allows queries that have predicates using those columns (such as `SELECT * FROM users WHERE email = $1`) to find the relevant row quickly, without scanning all of the data in the primary index.  For our table above, we would expect to find two new key/value pairs for the new email index:

|- TableID 
|   |- IndexID
|   | |- Indexed Column Values
|   | |                    |- ColumnFamilyID
v   V V                    V
/114/2/"acidburn@yahoo.com"/0
/114/2/"zerocool@hotmail.com"/0

Note that this example is for UNIQUE indexes. The key encoding for a non-unique index is a bit different.
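
As a rough illustration of how these keys are derived from rows, the sketch below builds the pretty-printed form of the keys shown above. The real encoding is binary and more involved; `pretty_key` is a hypothetical helper, and the table ID 114 and index IDs 1 and 2 are simply taken from the example.

```python
def pretty_key(table_id, index_id, column_values, family_id=0):
    """Build the human-readable form /TableID/IndexID/<column values>/FamilyID."""
    def fmt(value):
        # Strings are shown quoted, everything else is printed as-is.
        return f'"{value}"' if isinstance(value, str) else str(value)
    parts = [table_id, index_id, *column_values, family_id]
    return "/" + "/".join(fmt(part) for part in parts)

rows = [
    {"id": 1, "name": "dade", "email": "zerocool@hotmail.com"},
    {"id": 2, "name": "kate", "email": "acidburn@yahoo.com"},
]

# Primary index (index ID 1): keyed by the primary key column.
primary_keys = [pretty_key(114, 1, [row["id"]]) for row in rows]

# Unique email index (index ID 2): keyed by email, value holds the primary key.
email_index = {pretty_key(114, 2, [row["email"]]): {"id": row["id"]} for row in rows}

for key in sorted(email_index):
    print(key, "->", email_index[key])
```

Because the index entries are sorted by email, a lookup by email address only needs to seek to one key and then follow the stored `id` back to the primary index.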

When creating a new index on an existing table, these secondary index entries for the table’s existing data need to be constructed and written to CockroachDB’s internal key/value store (“backfilled”) before the index can be used by SQL queries.

Further, CockroachDB supports online schema changes, allowing users to make changes to a table’s schema while concurrently allowing transactions to write to the table. To allow a CREATE INDEX schema change to happen while the table is online, we need a way to backfill these index entries (a potentially time consuming process for very large tables) while concurrent transactions may be modifying the very data we need to backfill. 

To achieve this, CockroachDB uses a process similar to that described in “Online, Asynchronous Schema Change in F1.” Schema changes are decomposed into a series of descriptor state transitions. A table descriptor is data stored in the database that contains the schema of the table, all of the associated indexes, and a version that is incremented any time the descriptor is updated. The descriptor’s representation of the index also includes a state that informs SQL operations what operations (READ, WRITE, DELETE) are permitted for the data associated with that index. This ensures that we don’t attempt to read from an index that is still in the process of being backfilled, for example.

As will be seen below, when adding a new index, the schema change process steps the descriptor through a series of states. Each state is designed to allow transactions to operate on the data correctly, regardless of whether they are using the current state of a descriptor or the previous state. Allowing two states to be active at once is important since we may have in-flight transactions that have not yet learned of the new descriptor state. CockroachDB’s descriptor leasing system allows us to wait until all in-flight transactions are using a single version of a descriptor, at which point we then know it is safe to transition the index to the next state.

In the case of adding a new index to an existing table, AddSSTable’s previous ability to write at old timestamps was central to this process. When a new index was created, it would first enter a state in which new writes would produce index entries at the current timestamp while a background process used AddSSTable to write index entries for existing rows at older timestamps.

The old world

To understand how this worked in practice, let’s look at how the previous schema change process would handle a concurrent update of one of the user table’s rows during the addition of the email index in our example schema above:

  1. The email index is added to the table descriptor and safely transitioned into a WRITE_ONLY state. In this state, index entries related to table updates are created, updated, and deleted but not read. At this point, the email index is completely empty. We have two entries in the primary index, both at timestamp ts1.
| Primary Index                                               | Email Index|
|-------------------------------------------------------------+------------|
| /114/1/1/0@ts1  -> {name: dade, email:zerocool@hotmail.com} |            |
| /114/1/2/0@ts1  -> {name: kate, email:acidburn@yahoo.com}   |            |
  2. While the new index is still in the WRITE_ONLY state, the backfill process scans the primary index and constructs SSTs containing the key/value pairs for the email index entries, using the timestamp at which it read the values from the primary index. The index entries’ keys are based on the columns being indexed and the values contain the primary key columns.
| Primary Index                                               | Email Index| In-flight SSTable                              |
|-------------------------------------------------------------+------------+------------------------------------------------|
| /114/1/1/0@ts1  -> {name: dade, email:zerocool@hotmail.com} |            | /114/2/"acidburn@yahoo.com"/0@ts2   -> {id: 2} |
| /114/1/2/0@ts1  -> {name: kate, email:acidburn@yahoo.com}   |            | /114/2/"zerocool@hotmail.com"/0@ts2 -> {id: 1} |
  3. Before this SST is written to disk using AddSSTable, suppose the users table was concurrently modified to change Dade’s user ID. In our example this table is very small, but for a much larger table, reading and constructing these batches could take a considerable amount of time, so it is all but guaranteed that this kind of concurrent write would occur.

This would result in two updates to the primary index: a delete of the old key /114/1/1/0 and the write of a new key /114/1/3/0. Since our email index is now WRITE_ONLY, it also receives a write to add an index entry. As a result our data looks like this:

AFTER: UPDATE users SET id=3 WHERE id = 1;

| Primary Index                                               | Email Index                                     | In-flight SSTable                              |
|-------------------------------------------------------------+-------------------------------------------------+------------------------------------------------|
| /114/1/2/0@ts1  -> {name: kate, email:acidburn@yahoo.com}   | /114/2/"zerocool@hotmail.com"/0@ts3 -> {id: 3}  | /114/2/"acidburn@yahoo.com"/0@ts2   -> {id: 2} |
| /114/1/3/0@ts3  -> {name: dade, email:zerocool@hotmail.com} |                                                 | /114/2/"zerocool@hotmail.com"/0@ts2 -> {id: 1} |
  4. Note that the in-flight SST still has the older value for the index entry /114/2/"zerocool@hotmail.com"/0. Further, since an AddSSTable request is applied non-transactionally, it will never learn of the newer value. But the previous non-MVCC-compatible semantics of AddSSTable allow us to write at the older timestamp ts2 while the newer version is at timestamp ts3. As a result, despite the SST having stale data, the end state of our email index is correct:

After AddSSTable Application

| Primary Index                                               | Email Index                                     |
|-------------------------------------------------------------+-------------------------------------------------|
| /114/1/2/0@ts1  -> {name: kate, email:acidburn@yahoo.com}   | /114/2/"acidburn@yahoo.com"/0@ts2   -> {id: 2}  |
| /114/1/3/0@ts3  -> {name: dade, email:zerocool@hotmail.com} | /114/2/"zerocool@hotmail.com"/0@ts3 -> {id: 3}  |
|                                                             | /114/2/"zerocool@hotmail.com"/0@ts2 -> {id: 1}  |
  5. While the old version of the row was written to the index, it is shadowed by the newer version. That newer version is what will eventually be read by any new transactions using the index.
  6. Once the backfill is complete, we scan the newly created index to ensure that uniqueness constraints are met and then transition the index to a PUBLIC state in which it can be used for SQL queries.

This process worked well, but was incompatible with an MVCC-compliant AddSSTable request because it depends on AddSSTable’s ability to write at older timestamps. If, at step 4 above, we had forced the timestamps of the keys added by AddSSTable to be the most recent timestamp, we would have clobbered the correct, updated value.

Without backdated keys in AddSSTable, we can’t safely apply an AddSSTable containing keys for an index receiving writes because we may be overwriting newer versions of those keys.
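
The difference between the two behaviors can be reproduced with a toy MVCC store. This is a sketch for illustration only: `MVCCStore` and `backfill` are hypothetical names, timestamps are integers standing in for ts2, ts3, and ts4, and a read simply returns the version with the highest timestamp.

```python
class MVCCStore:
    """Toy MVCC store: each key maps to {timestamp: value}."""

    def __init__(self):
        self.versions = {}

    def put(self, key, ts, value):
        self.versions.setdefault(key, {})[ts] = value

    def get(self, key):
        # A read at the current time sees the version with the highest timestamp.
        history = self.versions.get(key, {})
        return history[max(history)] if history else None

KEY = '/114/2/"zerocool@hotmail.com"/0'

def backfill(sst_ts):
    index = MVCCStore()
    index.put(KEY, 3, {"id": 3})       # concurrent transactional write at ts3
    index.put(KEY, sst_ts, {"id": 1})  # stale in-flight SST entry is applied afterwards
    return index.get(KEY)

backdated = backfill(sst_ts=2)  # old behavior: SST written below the newer version
current = backfill(sst_ts=4)    # SST forced to the current timestamp
print(backdated, current)
```

With the backdated SST the stale entry lands underneath the newer version and stays hidden. With the SST forced to the current timestamp, the stale entry becomes the newest version and the index now points at a row that no longer exists.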

The new world

For index backfills to work with an MVCC-compatible AddSSTable, we needed a way to populate the index without writing at old timestamps and without interfering with ongoing transactional writes to the index. That is, we need to avoid sending any AddSSTable request that contains a key that might be written by a concurrent transactional write.

To do this, rather than allowing a newly added index to receive transactional writes during the backfill process, we introduce a temporary index capable of capturing those inflight writes while the newly added index’s backfill process progresses. Because the newly added index won’t be receiving writes, we can happily send AddSSTable requests to our newly added index without fear that we will be erroneously clobbering data from inflight writes. Once the backfill process is complete, we then merge the index entries captured by the temporary index into the new index using ordinary, transactional writes.

Let’s consider the same example we used with the old backfilling strategy and see how it works using our temporary index:

  1. The email index is created in a BACKFILLING state. In this state, no index entries are written for the index. A temporary index with the same indexed columns as the email index is created and safely brought up to the WRITE_ONLY state. As before, when an index is in the WRITE_ONLY state we will write and delete the entries for that index whenever the table is updated.
| Primary Index                                               | Email Index | Temporary Index |
|-------------------------------------------------------------+-------------+-----------------|
| /114/1/1/0@ts1  -> {name: dade, email:zerocool@hotmail.com} |             |                 |
| /114/1/2/0@ts1  -> {name: kate, email:acidburn@yahoo.com}   |             |                 |
  2. While the new index is still in the BACKFILLING state, the backfill process scans the primary index and constructs SSTs containing the key/value pairs for the new index entries.
| Primary Index                                               | Email Index | In-flight SSTable                              | Temporary Index                                |
|-------------------------------------------------------------+-------------+------------------------------------------------+------------------------------------------------|
| /114/1/1/0@ts1  -> {name: dade, email:zerocool@hotmail.com} |             | /114/2/"acidburn@yahoo.com"/0@ts2   -> {id: 2} |                                                |
| /114/1/2/0@ts1  -> {name: kate, email:acidburn@yahoo.com}   |             | /114/2/"zerocool@hotmail.com"/0@ts2 -> {id: 1} |                                                |
  3. As before, suppose an UPDATE was issued by a concurrent transaction. Since our email index is in BACKFILLING, it doesn’t receive an update, but our temporary index does:

AFTER: UPDATE users SET id=3 WHERE id = 1;

| Primary Index                                               | Email Index | In-flight SSTable                              | Temporary Index                                |
|-------------------------------------------------------------+-------------+------------------------------------------------+------------------------------------------------|
| /114/1/2/0@ts1  -> {name: kate, email:acidburn@yahoo.com}   |             | /114/2/"acidburn@yahoo.com"/0@ts2   -> {id: 2} | /114/3/"zerocool@hotmail.com"/0@ts3 -> {id: 3} |
| /114/1/3/0@ts3  -> {name: dade, email:zerocool@hotmail.com} |             | /114/2/"zerocool@hotmail.com"/0@ts2 -> {id: 1} |                                                |
  4. When we apply the SSTable to our email index, we do so using the new MVCC-compatible AddSSTable, which writes at the current timestamp. Because we’ve kept the email index in the BACKFILLING state, there is no danger that we may be clobbering data when we apply this AddSSTable request.

After AddSSTable Application

| Primary Index                                               | Email Index                                     | Temporary Index                                |
|-------------------------------------------------------------+-------------------------------------------------+------------------------------------------------|
| /114/1/2/0@ts1  -> {name: kate, email:acidburn@yahoo.com}   | /114/2/"acidburn@yahoo.com"/0@ts4   -> {id: 2}  | /114/3/"zerocool@hotmail.com"/0@ts3 -> {id: 3} |
| /114/1/3/0@ts3  -> {name: dade, email:zerocool@hotmail.com} | /114/2/"zerocool@hotmail.com"/0@ts4 -> {id: 1}  |                                                |
  5. The email index is then brought up to a MERGING state. This state is similar to WRITE_ONLY, in that all new writes generate index entries for this index. At this point, our index is nearly correct. We have index entries for all pre-existing data and we have index entries for all new writes going forward. But we are missing the update that happened during the backfill process.
  6. The temporary index, however, captured that write for us. We can now read the k/v pair from this temporary index, change the key slightly to account for the different index ID, and write it to our email index using a normal transactional write that will write at the current timestamp. If we happen to get another update during this process, that update will be seen by both indexes, and since we are using transactional writes, CockroachDB’s normal transaction handling will ensure we are only ever writing the most up-to-date value.
| Primary Index                                               | Email Index                                     | Temporary Index                                |
|-------------------------------------------------------------+-------------------------------------------------+------------------------------------------------|
| /114/1/2/0@ts1  -> {name: kate, email:acidburn@yahoo.com}   | /114/2/"acidburn@yahoo.com"/0@ts4   -> {id: 2}  | /114/3/"zerocool@hotmail.com"/0@ts3 -> {id: 3} |
| /114/1/3/0@ts3  -> {name: dade, email:zerocool@hotmail.com} | /114/2/"zerocool@hotmail.com"/0@ts6 -> {id: 3}  |                                                |
|                                                             | /114/2/"zerocool@hotmail.com"/0@ts4 -> {id: 1}  |                                                |
  7. Now, our email index is ready to be brought online. Note that while the email index does contain a key/value for the stale version, it is completely shadowed by the value we merged into the index from the temporary index. Finally, we drop the temporary index and clean it up asynchronously in the background.
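
The steps above can be replayed with a small simulation. It is a sketch under simplifying assumptions rather than the real backfiller: indexes are plain dictionaries of `{key: {timestamp: value}}`, the helper names (`put`, `latest`, `merge_temp_index`) are hypothetical, and the merge is modeled as one write per key at a single merge timestamp instead of a series of real transactions.

```python
def put(index, key, ts, value):
    index.setdefault(key, {})[ts] = value

def latest(index, key):
    history = index.get(key, {})
    return history[max(history)] if history else None

def merge_temp_index(temp_index, new_index, temp_prefix, new_prefix, merge_ts):
    """Copy the newest value of every temporary index key into the new index."""
    for temp_key in temp_index:
        # Swap the index ID prefix; assumes every key starts with temp_prefix.
        new_key = new_prefix + temp_key[len(temp_prefix):]
        put(new_index, new_key, merge_ts, latest(temp_index, temp_key))

ZEROCOOL = '/114/2/"zerocool@hotmail.com"/0'
ACIDBURN = '/114/2/"acidburn@yahoo.com"/0'

email_index, temp_index = {}, {}

# Step 3: the concurrent UPDATE only reaches the temporary index (index ID 3).
put(temp_index, '/114/3/"zerocool@hotmail.com"/0', 3, {"id": 3})

# Step 4: the SST is ingested at the current timestamp, with a stale entry.
put(email_index, ACIDBURN, 4, {"id": 2})
put(email_index, ZEROCOOL, 4, {"id": 1})

# Step 6: merge the captured write into the email index at a later timestamp.
merge_temp_index(temp_index, email_index, "/114/3/", "/114/2/", merge_ts=6)

print(latest(email_index, ZEROCOOL), latest(email_index, ACIDBURN))
```

After the merge, the stale `{id: 1}` entry at ts4 is still present in the history but is shadowed by the merged `{id: 3}` entry at ts6, matching the final table.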

This process isn’t without a wrinkle or two. 

For instance, in the example above, we covered a case where an update to a table required a new key in our index. But what if the update required that we delete a key from the index?

Capturing this kind of concurrent change is the purpose of our temporary index, and a corresponding delete will indeed be issued for the relevant keys in the temporary index. But how do we discover it during the merging process? While the underlying storage engine records deletion tombstones when deletes are issued, those tombstones aren’t typically returned when retrieving data. Further, those tombstones can be completely removed by the garbage collection process.

To account for this, we developed a new delete-preserving index encoding. Whenever an update would result in the deletion of an index entry, the delete-preserving encoding transforms the deletion into a write of a special tombstone value before sending it to the storage layer. The storage layer sees this as an ordinary write and will return it to us when we scan the index. During the merge process, we look for such tombstones and issue the appropriate deletion.
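
A minimal sketch of the idea, assuming a hypothetical `TOMBSTONE` marker and ignoring MVCC versions entirely (each index is just a dictionary holding the latest value per key). The actual encoding of the tombstone value is not described here, so the marker below is purely illustrative.

```python
TOMBSTONE = object()  # hypothetical marker standing in for the special tombstone value

def temp_index_put(temp_index, key, value):
    temp_index[key] = value

def temp_index_delete(temp_index, key):
    # Delete-preserving: the delete is stored as an ordinary write of a marker,
    # so a later scan of the temporary index will still return it.
    temp_index[key] = TOMBSTONE

def merge(temp_index, new_index, rekey):
    for temp_key, value in temp_index.items():
        key = rekey(temp_key)
        if value is TOMBSTONE:
            new_index.pop(key, None)  # replay the delete against the new index
        else:
            new_index[key] = value

def to_email_index_key(temp_key):
    # Hypothetical rekeying: swap temporary index ID 3 for email index ID 2.
    return temp_key.replace("/114/3/", "/114/2/", 1)

# The backfill wrote both entries, but user 1 was deleted while it was running.
email_index = {
    '/114/2/"acidburn@yahoo.com"/0': {"id": 2},
    '/114/2/"zerocool@hotmail.com"/0': {"id": 1},
}
temp_index = {}
temp_index_delete(temp_index, '/114/3/"zerocool@hotmail.com"/0')

merge(temp_index, email_index, to_email_index_key)
print(sorted(email_index))
```

Without the marker, the scan of the temporary index would return nothing for the deleted key and the stale backfilled entry would survive the merge.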

The description above elides some implementation details, but hopefully gives you a sense of how a change in the semantics of our storage-level AddSSTable command had ripple effects into a relatively high-level SQL operation.

Check back in tomorrow for part three of this three-part blog series: MVCC Range Tombstones

Erik Grinaker

Versatile software engineer with expertise in Rust and Go systems programming, database internals, distributed systems, operating systems, and networking. Currently interested in building distributed database systems with ACID guarantees.
