This document covers how to bring change data capture (CDC) data from your source into Rockset. CDC data is any stream of records that represent insert/update/delete operations taking place on some source system.

If you're looking for the automated CDC templates, skip ahead to their [reference](🔗) below.

## Optimized for CDC

Rockset's ingestion platform includes all the building blocks required to ingest CDC data in real-time.

  • Our managed Data Source [<<glossary:Integrations>>](🔗) are purpose-built for real-time ingestion and include several source types that are popular conduits for CDC data. Our [<<glossary:Write API>>](🔗) also enables real-time, push-based sources.

  • [<<glossary:Ingest Transformations>>](🔗) give you the power to drop, rename or combine fields, filter out incoming rows, cast, or otherwise transform the incoming CDC stream in real-time without the need for additional data pipelines.

  • Rockset's indexes are fully mutable, meaning updates and deletes can be efficiently applied in real-time so you save on compute even while queries run on the freshest data.

## How To Ingest CDC Data

Ingesting CDC data is largely automated with [CDC templates](🔗) discussed below, but here we'll explain how it works in case your desired CDC format does not have a corresponding template, or you want to configure it manually.

Once you bring your CDC data into one of Rockset's supported data sources, or configure your application to send CDC records to our Write API, you need to create a <<glossary:Collection>> in the Rockset console and configure an ingest transformation to do the following (a combined sketch follows the list):

  1. Map the CDC fields to Rockset's special fields [`_op`](🔗), [`_id`](🔗), and [`_event_time`](🔗).

  2. Project the source fields you want indexed in Rockset.

  3. (Optionally) Filter out any CDC records you want to ignore.
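
Putting those three pieces together, an ingest transformation generally takes the shape below. This is only a sketch: `changeType`, `order_id`, `created_at`, `field1`, `field2`, and `recordType` are placeholder field names, and the exact expressions depend on your CDC format.

```sql
SELECT
  -- 1. map CDC metadata to Rockset's special fields
  IF(_input.changeType = 'DELETE', 'DELETE', 'UPSERT') AS _op,
  CAST(_input.order_id AS string) AS _id,
  PARSE_TIMESTAMP_ISO8601(_input.created_at) AS _event_time,
  -- 2. project the source fields you want indexed
  _input.field1 AS field1,
  _input.field2 AS field2
FROM _input
-- 3. (optionally) filter out CDC records you want to ignore
WHERE _input.recordType = 'data'
```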

### Mapping Special Fields

You can read more about the semantics of Rockset's special fields [here](🔗).

**\_op**

The `_op` field is how Rockset knows how to handle incoming records from your source. Based on the value of `_op` in a record, that record can lead to an insert, update, delete, replace, or some combination of those operations on your collection.

The supported operation types (case insensitive) are: `INSERT`, `UPDATE`, `UPSERT` (default), `DELETE`, `REPLACE`, and `REPSERT`. For more on their exact semantics, refer to the [documentation](🔗) for `_op`.

CDC formats typically have their own unique codes for whether the record is an insert, update, delete, etc. You can write your transformation to account for this using conditional expressions like [CASE](🔗) or [IF](🔗).

For example, if a format has a field `changeType` which is one of `[CREATE, UPDATE, DELETE]`, you would map it to `_op` with a conditional expression.
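
A minimal sketch of that mapping, assuming `changeType` is a top-level field on the incoming record:

```sql
SELECT
  *,
  -- CREATE and UPDATE both upsert; DELETE removes the document
  CASE
    WHEN _input.changeType = 'DELETE' THEN 'DELETE'
    ELSE 'UPSERT'
  END AS _op
FROM _input
```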



Or, if the format marks a delete by setting an `after` field to null (and leaves it non-null otherwise), you can branch on that field instead.
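
Again as a sketch, with `after` assumed to be a top-level field:

```sql
SELECT
  *,
  -- a null `after` signals a delete in this format
  IF(_input.after IS NULL, 'DELETE', 'UPSERT') AS _op
FROM _input
```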



**Tip:** A common gotcha here is that SQL uses single quotes for string literals. A predicate comparing against "DELETE" (double quotes) is interpreted as a reference to a field named `DELETE` rather than the string `'DELETE'`, and will not do what you intend.

**\_id**

The primary key of your source needs to map to the \_id of your documents in Rockset so that updates and deletes are properly applied. In the case of a single primary key, this is a simple projection.
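
For instance, assuming a hypothetical string-valued key column `order_id`:

```sql
SELECT
  *,
  -- the source's primary key column becomes the document's _id
  _input.order_id AS _id
FROM _input
```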



If your primary key is not a string type in your source system, cast it to one, since \_id must always be a string.
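
For example, with the same hypothetical `order_id` column holding an integer:

```sql
SELECT
  *,
  -- _id must be a string, so cast the integer key
  CAST(_input.order_id AS string) AS _id
FROM _input
```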



In the case of compound primary keys in your source, you need to map the fields of the primary key to a single unique string. We have a built-in SQL function, [ID_HASH](🔗), exactly for this purpose.
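
A sketch with two hypothetical key columns:

```sql
SELECT
  *,
  -- hash the compound key's columns into one unique string _id
  ID_HASH(_input.customer_id, _input.order_id) AS _id
FROM _input
```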



We strongly advise you use the `ID_HASH` function for these kinds of cases. But if you do choose to use another [hash function](🔗), note that most have a return type of bytes, whereas `_id` must be a string type. Therefore if you use some other hash to construct \_id, you should wrap it in another function like `TO_BASE64` or `TO_HEX` that takes a bytes argument and returns a string result.

Note on Rollup Collections

[<<glossary:Rollup>>](🔗) collections are insert only and _do not allow_ a custom mapping for \_id. For that reason you cannot do the above mapping for rollup collections and any update/delete deltas coming from your source will be rejected.

**\_event_time**

Unlike the special fields above, this mapping is not strictly necessary, but it is good practice to create a custom mapping for the document timestamp `_event_time` if the CDC format exposes the time of record creation. `_event_time` must be either a timestamp or an integer representing microseconds since the epoch.

You can use any of the relevant [timestamp functions](🔗) to do this conversion.
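
For example, a sketch assuming the source record carries either an ISO 8601 string `created_at` or an epoch-milliseconds integer `ts_ms` (both hypothetical names):

```sql
SELECT
  *,
  -- parse an ISO 8601 timestamp string
  PARSE_TIMESTAMP_ISO8601(_input.created_at) AS _event_time
  -- or, for an epoch-milliseconds integer, convert to microseconds:
  -- _input.ts_ms * 1000 AS _event_time
FROM _input
```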




Since `_event_time` is an immutable field on a document, you only need to set it for insert-type CDC records, not updates or deletes. This means you can combine it with the `_op` logic.
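
Continuing the earlier `changeType` sketch:

```sql
SELECT
  *,
  CASE
    -- only stamp _event_time on insert-type records
    WHEN _input.changeType = 'CREATE' THEN PARSE_TIMESTAMP_ISO8601(_input.created_at)
    ELSE undefined  -- leave the field unset for updates/deletes (never NULL)
  END AS _event_time
FROM _input
```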



Else Case Warning

Importantly, the else case above uses `undefined` which is equivalent to "does not exist". You _cannot_ set it to `NULL` because a null event time is invalid and will cause the ingestion of that document to error.

### Projecting Source Fields

Typically CDC records have a deeply nested structure and it's a good idea to pull out the fields you want to persist and index in your Rockset collection explicitly.

For example, consider a CDC record that nests the row's columns `field1` and `field2` inside a wrapper object (the exact shape depends on the format).



Presumably you want `field1` and `field2` to be ingested, so you should explicitly project them (omitting the special fields for brevity).
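
A sketch of that projection, assuming the columns sit under a nested path such as `payload.after`:

```sql
SELECT
  -- pull the desired columns out of the nested CDC envelope
  _input.payload.after.field1 AS field1,
  _input.payload.after.field2 AS field2
FROM _input
```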



You can also take this opportunity to do any field pruning, renaming, or casting.
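
For instance (the renamed fields and types here are purely illustrative):

```sql
SELECT
  -- rename fields on the way in and cast where useful
  _input.payload.after.field1 AS customer_name,
  CAST(_input.payload.after.field2 AS int) AS quantity
FROM _input
```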



### Filtering Records

The WHERE clause of the ingest transformation can be used to filter out CDC records that are unneeded. This can be useful if you have CDC control records in the stream (e.g. dropping a table) that you don't want ingested alongside data records. It can also be valuable if you have multiple CDC streams being piped through the same source and you want to only ingest some of them.

For example, you might keep only data records that belong to a single table.
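
A sketch, with `recordType` and `tableName` as purely illustrative field names:

```sql
SELECT
  *
FROM _input
-- drop control records and ignore other tables sharing the stream
WHERE _input.recordType = 'data' AND _input.tableName = 'orders'
```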



## Considerations

A few considerations to keep in mind when ingesting CDC data to ensure the smoothest experience:

  • Only event stream sources provide ordering guarantees during ingestion. This means that for CDC records that include updates and deletes, bringing that data from another source type such as an object store may lead to out of order processing and unexpected behavior.

  • Rollup collections are insert only, so updates/deletes will be rejected.

  • Rockset does not support updating the primary key (\_id) of a document. If you update primary keys in your source, you'll have to delete the original document in Rockset then create a new one. Some CDC formats offer options to represent primary key updates in this way.

## CDC Transformation Templates

To help automate the process of bringing CDC data into Rockset, we have built support for a number of popular CDC formats directly into the ingest transformation process of creating a collection. These transformation templates can automatically infer which CDC format you have, and construct some or all of the transformation for you out of the box.

Below is a reference of the CDC formats for which automated templates are supported.

CDC Tips

Depending on the specific configuration of your upstream CDC engine, your records may differ slightly from the expected format these templates reference. In that case, you can still use the template as a starting point to automate much of the setup, but you may need to manually adjust the sections where your data model differs from what is expected.

To get the most out of these templates, your source should have some records available for previewing during the collection creation process. This means it is preferable to first configure your upstream CDC system to send records to a source, and then go through the collection creation process in the Rockset console. Where that is not possible, for example when sending records to the Write API, consider first downloading a few sample CDC records to a file and creating a sample file-upload collection to iterate on the transformation before creating your final collection.

### Debezium

For Debezium data, both JSON encoded and Avro encoded CDC records are supported.

Refer to the Debezium [docs](🔗) for any source-specific limitations, including how data types are mapped.

#### JSON

This is for Debezium records encoded using a JSON serializer to a destination like Kafka.

**Expected Structure:**

  • `payload.op` is one of `[r, c, u, d]`. `d` is mapped to `_op=DELETE` and the rest to `_op=UPSERT`.

  • For delete operations, `payload.before` contains the full document, for other operations it is in `payload.after`.

  • `payload.source.ts_ms` is an integer representing milliseconds since epoch.

**Example Record:**



The transformation is generated automatically from the sample record, with `order_id` specified as the primary key.
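
As an illustration only, a hand-written equivalent based on the expected structure above might look roughly like the following; the non-key column names are placeholders, and the console-generated SQL may differ:

```sql
SELECT
  -- 'd' (delete) removes the document; r/c/u upsert it
  IF(_input.payload.op = 'd', 'DELETE', 'UPSERT') AS _op,
  -- the key lives in `before` for deletes and in `after` otherwise
  CAST(IF(_input.payload.op = 'd',
          _input.payload.before.order_id,
          _input.payload.after.order_id) AS string) AS _id,
  -- ts_ms is milliseconds since epoch; _event_time accepts microseconds
  IF(_input.payload.op IN ('r', 'c'), _input.payload.source.ts_ms * 1000, undefined) AS _event_time,
  -- project the row's columns from `after` (placeholder names)
  _input.payload.after.order_id AS order_id,
  _input.payload.after.quantity AS quantity
FROM _input
```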



**Limitations:**

  • Primary key updates are not supported

#### Avro

This is for Debezium records encoded using an Avro serializer to a destination like Kafka.

**Expected Structure:**

  • `op` is one of `[r, c, u, d]`. `d` is mapped to `_op=DELETE` and the rest to `_op=UPSERT`.

  • For delete operations, `before.Value` contains the full document, for other operations it is in `after.Value`.

  • `source.ts_ms` is an integer representing milliseconds since epoch.

**Example Record:**



The transformation is generated automatically from the sample record, with `order_id` specified as the primary key.
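
Again as an illustration only, a hand-written sketch based on the expected structure above (placeholder column names; the generated SQL may differ):

```sql
SELECT
  IF(_input.op = 'd', 'DELETE', 'UPSERT') AS _op,
  -- the key lives under `before.Value` for deletes and `after.Value` otherwise
  CAST(IF(_input.op = 'd',
          _input.before.Value.order_id,
          _input.after.Value.order_id) AS string) AS _id,
  -- ts_ms is milliseconds since epoch; _event_time accepts microseconds
  IF(_input.op IN ('r', 'c'), _input.source.ts_ms * 1000, undefined) AS _event_time,
  -- Avro union wrappers (e.g. {"string": ...}) may need extra unpacking
  _input.after.Value.order_id AS order_id,
  _input.after.Value.quantity AS quantity
FROM _input
```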



**Limitations:**

  • Primary key updates are not supported

  • Avro nests certain fields, e.g. above having `{"date_created": {"string": "2021-01-21"}}`. We try to unpack these in the transformation automatically, but you may need to do additional path extraction and/or casting on the extracted value.

### AWS Data Migration Service (DMS)

Refer to the DMS [docs](🔗) for any source-specific limitations, including how data types are mapped.

**Expected Structure:**

  • `metadata.record-type` is one of `[control, data]`. Control records are filtered out in the WHERE clause.

  • For data records, `metadata.operation` is one of `[load, insert, update, delete]`. `delete` is mapped to `_op=DELETE`, and the rest to `_op=UPSERT`.

  • `metadata.timestamp` is an ISO 8601 formatted timestamp string.

  • The full document is in `data`.

**Example Record:**



The transformation is generated automatically from the sample record, with `order_id` specified as the primary key.
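
A rough, hand-written sketch based on the expected structure above (placeholder column names; the generated SQL may differ):

```sql
SELECT
  -- `load`, `insert`, and `update` upsert; `delete` removes the document
  IF(_input.metadata.operation = 'delete', 'DELETE', 'UPSERT') AS _op,
  CAST(_input.data.order_id AS string) AS _id,
  -- metadata.timestamp is an ISO 8601 string
  PARSE_TIMESTAMP_ISO8601(_input.metadata.timestamp) AS _event_time,
  -- project the row's columns from `data` (placeholder names)
  _input.data.order_id AS order_id,
  _input.data.quantity AS quantity
FROM _input
-- control records should also be excluded here by filtering on the
-- metadata record-type field ('data' vs 'control'); that hyphenated field
-- name needs quoting, so check the generated template for the exact syntax
```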



**Limitations:**

  • Since control records are ignored, operations like adding or removing columns, or even dropping the source table, are not reflected in Rockset.

### Google Cloud Datastream

Refer to the Datastream [docs](🔗) for any source-specific limitations, including how data types are mapped.

**Expected Structure:**

  • `source_metadata.change_type` is one of `[INSERT, UPDATE, UPDATE-DELETE, UPDATE-INSERT, DELETE]`. `UPDATE-DELETE` and `DELETE` are mapped to `_op=DELETE`, and the rest to `_op=UPSERT`.

  • `source_metadata.source_timestamp` is an ISO 8601 formatted timestamp string.

  • The full document is in `payload`.

**Example Record:**



The transformation is generated automatically from the sample record, with `THIS_IS_MY_PK` specified as the primary key.
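
A rough, hand-written sketch based on the expected structure above (placeholder column names; the generated SQL may differ):

```sql
SELECT
  -- DELETE and UPDATE-DELETE remove the document; the rest upsert
  IF(_input.source_metadata.change_type IN ('DELETE', 'UPDATE-DELETE'),
     'DELETE', 'UPSERT') AS _op,
  CAST(_input.payload.THIS_IS_MY_PK AS string) AS _id,
  -- source_timestamp is an ISO 8601 string
  PARSE_TIMESTAMP_ISO8601(_input.source_metadata.source_timestamp) AS _event_time,
  -- project the row's columns from `payload` (placeholder names)
  _input.payload.THIS_IS_MY_PK AS THIS_IS_MY_PK,
  _input.payload.some_column AS some_column
FROM _input
```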



### Striim

Refer to the Striim [docs](🔗) for any source-specific limitations, including how data types are mapped. The Striim template is compatible with the JSON and Avro formatters.

**Expected Structure:**

  • `metadata.OperationName` is one of `[INSERT, UPDATE, DELETE]`. `DELETE` is mapped to `_op=DELETE`, and the rest to `_op=UPSERT`.

  • `metadata.CommitTimestamp` is a timestamp string corresponding to the format specification `%d-%b-%Y %H:%M:%S`.

  • `data` contains the full document for all records; `before` contains the prior version of the document in the case of updates.

**Example Record:**



The transformation is generated automatically from the sample record, with `ID` specified as the primary key.
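
A rough, hand-written sketch based on the expected structure above (placeholder column names; the generated SQL may differ, and the PARSE_TIMESTAMP argument order is assumed):

```sql
SELECT
  IF(_input.metadata.OperationName = 'DELETE', 'DELETE', 'UPSERT') AS _op,
  -- `data` holds the full document for all operation types
  CAST(_input.data.ID AS string) AS _id,
  -- CommitTimestamp follows the %d-%b-%Y %H:%M:%S format
  PARSE_TIMESTAMP('%d-%b-%Y %H:%M:%S', _input.metadata.CommitTimestamp) AS _event_time,
  -- project the row's columns from `data` (placeholder names)
  _input.data.ID AS ID,
  _input.data.NAME AS NAME
FROM _input
```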



**Limitations:**

  • Striim must be configured in either Async mode or Sync mode with batching disabled because you cannot extract multiple Rockset documents from a single incoming record.

### Arcion

Refer to the Arcion [docs](🔗) for any source-specific limitations, including how data types are mapped.

**Expected Structure:**

  • `opType` is one of `[I, U, D]`. `D` is mapped to `_op=DELETE`, and the rest to `_op=UPSERT`.

  • `cursor` is a serialized JSON object whose `timestamp` field is a number representing milliseconds since epoch.

  • For insert and update records, the full document is in `after`; for delete records, the full document is in `before`.

**Example Record:**



The transformation is generated automatically from the sample record, with `n_nationkey` specified as the primary key.
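
A rough, hand-written sketch based on the expected structure above (placeholder column names; the generated SQL may differ):

```sql
SELECT
  IF(_input.opType = 'D', 'DELETE', 'UPSERT') AS _op,
  -- the key lives in `before` for deletes and in `after` otherwise
  CAST(IF(_input.opType = 'D',
          _input.before.n_nationkey,
          _input.after.n_nationkey) AS string) AS _id,
  -- _event_time is omitted here: `cursor` is serialized JSON, so its
  -- millisecond `timestamp` would first need to be parsed out (e.g. with
  -- JSON_PARSE) and converted to microseconds
  -- project the row's columns from `after` (placeholder names)
  _input.after.n_nationkey AS n_nationkey,
  _input.after.n_name AS n_name
FROM _input
```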



## Example Walkthrough

This walkthrough shows the process of creating a collection on top of some sample CDC data. Here we'll work with Debezium CDC coming from a Postgres instance and being sent to a Kafka topic using a JSON serializer, which we assume has already been configured.

We navigate to the Rockset [console](🔗) to create a new collection and select Kafka. We fill out the Kafka topic name, set the offset policy to Earliest so any data we have in the topic prior to the collection creation will be ingested, and specify the format as JSON.



When we move to the second step of the collection creation process to set up the ingest transformation, we'll see that Rockset has already inferred the data we're bringing in is in Debezium format and has prepopulated the transformation template corresponding to Debezium.



We're almost done, but Rockset doesn't know which fields correspond to the primary key in our source. So we click on the multi-select input for the primary keys and select `order_id` from the list of available fields Rockset has auto-detected. You can also type your own custom field name here.





And that's it! It's a good idea to do a final inspection of the transformation to see if any additional type casting is required, if any of the projected fields may not be needed, etc. We can then complete the collection creation process and Rockset will begin ingesting the CDC records and applying the inserts, updates, and deletes accordingly.