Data Migration Made Easy: Bulk Ingest from CSV

We think CockroachDB is a great database for many people, and we want them to try it out, not just for new applications but for existing, large applications as well. The first problem users with an existing database hit when trying CockroachDB is getting their data into it. For the 1.1 release, we built a new feature that performs high-speed, bulk data import. It works by transforming CSV files into our backup/restore format and then quickly ingesting the results.

CSV was chosen because it is so common, and most databases have ways to export to it (other formats can quickly be added in the future, given enough user need). Although this feature uses our enterprise backup and restore code, it does not (and will not) require a paid license. There is a doc describing all options and usage of CSV importing.

Because CSV import uses our enterprise restore feature, it is significantly faster than executing INSERTs or using the Postgres COPY protocol. In addition, the IMPORT statement supports (and encourages!) defining secondary indexes during CSV conversion. The indexes are created right alongside the primary index data, allowing all data to be ingested quickly. Predefining the indexes is much faster than importing and then adding indexes later.

Technical Details

Let’s go over the story again: a user wants to import a lot of data, and the fastest way to ingest data is with our RESTORE feature. Additionally, CSV is a common export format. Thus, we have made a new feature, IMPORT, that converts CSV to the format expected by RESTORE. This format, as detailed in the above link, relies on contiguous, non-overlapping SST files. So, the job of IMPORT is to convert CSVs into SSTs.

The basic idea for an implementation of this is:

  1. Read a table descriptor (a CREATE TABLE statement).
  2. For each line of a CSV file:
    • Parse and convert it to SQL datums.
    • Run the normal INSERT code that converts these datums to one or more KV pairs. There will be more than one pair if there are secondary indexes, for example.
  3. Sort the resulting KVs.
  4. Split them into 32MB SST files.
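
The steps above can be sketched in a few lines of Python. This is only an illustration of the shape of the algorithm, not CockroachDB's implementation: the key encoding, the two-column `users` table, the `name_idx` index, and the function names `row_to_kvs` and `csv_to_chunks` are all hypothetical, and plain lists stand in for SST files.

```python
import csv
import io

CHUNK_BYTES = 32 * 1024 * 1024  # target size of each output file


def row_to_kvs(row):
    """Turn one parsed row into KV pairs: one primary, one per secondary index."""
    row_id, name = int(row[0]), row[1]
    # Hypothetical encoding: zero-padded so byte order matches numeric order.
    pk = f"{row_id:010d}"
    return [
        (f"/users/primary/{pk}".encode(), name.encode()),
        (f"/users/name_idx/{name}/{pk}".encode(), b""),
    ]


def csv_to_chunks(text, chunk_bytes=CHUNK_BYTES):
    """Steps 2-4: convert rows to KVs, sort them, split into bounded chunks."""
    kvs = []
    for row in csv.reader(io.StringIO(text)):
        kvs.extend(row_to_kvs(row))
    kvs.sort()  # chunks must be contiguous and non-overlapping

    chunks, current, size = [], [], 0
    for key, value in kvs:
        pair_size = len(key) + len(value)
        # Start a new chunk once adding this pair would exceed the target.
        if current and size + pair_size > chunk_bytes:
            chunks.append(current)
            current, size = [], 0
        current.append((key, value))
        size += pair_size
    if current:
        chunks.append(current)
    return chunks
```

Because the pairs are sorted before they are split, every chunk covers a key span that does not overlap with any other chunk, which is the property the restore format relies on.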

First implementation: local, no cluster

The initial implementation of this feature was a CLI subcommand that didn’t require a cluster at all. It needed only the input CSV files and the table structure, and it wrote its output to a directory. It was implemented in two phases.

Phase one converted the CSVs into KVs and added them to a RocksDB instance for sorting. Although RocksDB does store its data as SST files, those files were not directly used here. We used RocksDB only because it provides efficient sorting and iterating of KV pairs. Since RocksDB was doing the sorting, we were free to insert into it in any order, which let us parse the input with a high degree of concurrency. We used separate goroutines to read the CSV, convert it into KVs, and write the KVs to RocksDB. The errgroup pipeline pattern coordinated the goroutines so that an error in any one of them stopped the others, and the pipeline ingested data much faster than a naive, single-threaded solution.

Phase two was to sort the KVs, then iterate through them in order, producing the desired 32MB files.
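
To illustrate why order-independent inserts make concurrent parsing easy, here is a minimal sketch. It deliberately swaps in Python's `ThreadPoolExecutor` for Go goroutines and errgroup, and a plain list for the temporary RocksDB instance; `convert_file`, `convert_all`, and the key encoding are hypothetical names chosen for the example.

```python
import csv
import io
from concurrent.futures import ThreadPoolExecutor, as_completed


def convert_file(text):
    """Parse one CSV document and return unsorted (key, value) pairs."""
    kvs = []
    for row in csv.reader(io.StringIO(text)):
        if len(row) != 2:
            raise ValueError(f"expected 2 columns, got {len(row)}: {row!r}")
        row_id, name = int(row[0]), row[1]
        # Hypothetical encoding: zero-padded so byte order matches numeric order.
        kvs.append((f"/t/primary/{row_id:010d}".encode(), name.encode()))
    return kvs


def convert_all(files, workers=4):
    """Convert files concurrently; the first failure cancels pending work."""
    store = []  # stand-in for the temporary sorting store
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(convert_file, text) for text in files]
        try:
            for fut in as_completed(futures):
                store.extend(fut.result())  # arrival order is arbitrary
        except Exception:
            for fut in futures:
                fut.cancel()  # has no effect on tasks that already started
            raise
    store.sort()  # phase two starts from sorted KVs
    return store
```

Workers finish in whatever order they like, and the result is the same because sorting happens afterwards. As with errgroup, the first error is surfaced to the caller rather than being lost in a worker.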

Overall, this implementation performs well and can determine the 32MB split points as needed, but it requires 3x the disk space (one copy for the CSVs, one for the intermediate RocksDB data, one for the final SSTs).

For CockroachDB 1.1, we connected this implementation to our SQL layer for easy use as the IMPORT statement. This allowed us to fetch and store data remotely, reducing the local disk requirements to just the intermediate RocksDB temp data. The entire conversion still happens on a single node, though. CockroachDB’s distributed features are used only when restoring the created SSTs.

Second implementation: distributed

Although the above single-node implementation works, we thought we could process larger amounts of data faster with distributed processing. This second implementation of CSV conversion used the DistSQL framework. DistSQL is meant to move SQL data processing (like WHERE clauses) closer to where the data sits on disk and to allow concurrent operations across nodes, hopefully producing faster queries. The IMPORT statement’s use of DistSQL is a bit strange since there’s no SQL data on disk, but we were able to adapt it to this kind of MapReduce-style workflow.

The basic idea is similar to the single node implementation:

  1. Assign each CSV file to a node in the cluster.
  2. Convert CSVs to KVs.
  3. Route each KV, by key range, to a DistSQL processor.
  4. Each processor collects its KVs, sorts them, and writes the final SST file.

All of these steps are similar to those above, except for the third step: routing. The hard part is figuring out which processor to route each KV to. This is difficult because we don’t know the key distribution before we’ve read the CSVs. To solve this, we read through the CSVs twice.

The first read samples the KVs at a certain rate. These samples are sent back to the coordinator, which uses them to estimate where the 32MB split points would be. The second read then uses the estimated split points to tell the readers where to send each KV. A side effect is that the final SST files are only approximately 32MB, with some standard deviation, since sampling doesn’t provide a perfect picture of the data.
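
The two-pass idea can be sketched as follows. This is an illustration under stated assumptions, not the DistSQL code: the function names `sample_keys`, `estimate_splits`, and `route` are hypothetical, the sampling is uniform, and the coordinator is assumed to know an average KV size (`avg_pair_bytes`) so it can translate a sample count into bytes.

```python
import bisect
import random


def sample_keys(kvs, rate, rng):
    """First pass: keep each key with probability `rate`."""
    return [key for key, _ in kvs if rng.random() < rate]


def estimate_splits(samples, rate, avg_pair_bytes, chunk_bytes):
    """Coordinator: pick split keys so each span holds roughly chunk_bytes."""
    samples = sorted(samples)
    # Each sample stands for about 1/rate pairs of avg_pair_bytes each.
    bytes_per_sample = avg_pair_bytes / rate
    step = max(1, round(chunk_bytes / bytes_per_sample))
    # Every step-th sample becomes a boundary between two spans.
    return samples[step::step]


def route(key, splits):
    """Second pass: index of the processor that owns this key's span."""
    return bisect.bisect_right(splits, key)
```

With `n` split keys there are `n + 1` spans, and `route` sends all keys in a span to the same processor. Since the boundaries come from a sample rather than the full data, the spans only approximate the target size.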

We are still performing testing and benchmarking of this implementation (and as a result, it is not enabled by default in CockroachDB 1.1). Our goal is that our next major release will have the ability to choose between these implementations based on the size and number of input files so that whichever will be faster for the incoming data will be used.

Illustration by Quentin Vijoux
