This page covers how to use a Kafka topic hosted on Confluent Cloud as a data source in Rockset. This involves:

  • Creating an <<glossary:Integration>> - Securely sends data from your Kafka cluster on **Confluent Cloud** to Rockset.

  • Creating a [<<glossary:Collection>>](🔗) - Continuously syncs your data from a Kafka topic to Rockset in real-time.

## Create a Confluent Cloud integration

Follow the steps below to create an integration between Rockset and your Confluent Cloud cluster:

  1. Identify your bootstrap server URL. You can find this on Confluent Cloud by navigating to **Cluster Overview** > **Cluster Settings**:

Cluster Settings


  2. Identify your Confluent Cloud cluster’s API key and secret. You can create a new API key or use an existing one from the **API Keys** tab under **Cluster Overview**:

API Keys


Create API Key Warning

There are two places to create an API key. **Do not use** the "Cloud API keys" option in the right-hand navigation, as those keys are for [RBAC](🔗) setup on Confluent Cloud.

Cloud API keys


  3. Input these fields into the Kafka integration creation form and save your integration:

Add Integration


Authentication Error Warning

There is a known issue with Confluent Cloud: due to a delay in API key propagation, an authentication error may occur if you use a key immediately after creating it.



Try waiting at least 3-5 minutes before retrying to allow time for the new API key to take effect on the Confluent side. Reach out to Rockset if your retry still fails due to authentication.

**Optional:** If you expect to have topics that use **Avro** data format ingested into a Rockset collection, the integration must also be configured with schema registry settings. Avro schemas are stored on a schema registry server, and access to the schema registry is required for Rockset to process Avro messages from a Kafka topic. Currently, using schema registry is only supported for Avro data format. A Kafka integration configured with schema registry can be used to ingest from topics serving either JSON or Avro format messages (mixing JSON and Avro messages within the same topic is not a supported configuration). A Kafka integration without schema registry configured can only be used to ingest from topics serving JSON format messages.

To configure schema registry with Confluent Cloud:

Schema Registry 1 Schema Registry 2

Use the schema registry URL, key, and secret from above when creating the Kafka integration in Rockset.

## Create a collection from a Confluent Cloud integration

Follow the steps below to create a collection:

  1. Navigate to the [Collections tab of the Rockset Console](🔗) to create a collection from a Kafka source.

  2. Add a source which maps to a single Kafka topic. The required inputs are:

  • **Topic name**

  • **Starting offset**

  3. Specify the **Starting Offset**. This determines where in the topic Rockset will begin ingesting data from. This can be either the earliest offset or the latest offset of the topic.

  • **Earliest:** The collection will contain all of the data starting from the beginning of the topic. Choose this option if you require all of the existing data in the Kafka topic.

  • **Latest:** The collection will contain data starting from the end of the topic, which means Rockset will only ingest records _arriving after_ the collection is created. Choose this option to avoid extra data ingestion costs if earlier data is not necessary for your use case.

  4. Specify the **Format** of messages in the Kafka topic. Currently, Rockset supports processing JSON and Avro message formats. For a Rockset collection to process Avro messages from a topic, the corresponding Kafka integration on Rockset must be configured with valid schema registry settings.

New Collection


After you create a collection backed by a Kafka topic, you should be able to see data flowing from your Kafka topic into the new collection. Rockset continuously ingests objects from your topic source and updates your collection automatically as new objects are added.
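
If you prefer to create the collection programmatically instead of through the Console, a rough sketch against the Rockset REST API is shown below. The workspace, collection, integration, and topic names are placeholders, and the Kafka source field names (`kafka_topic_name`, `use_v3`, `offset_reset_policy`) are assumptions used to illustrate how the Console inputs map onto an API call; check the Rockset API reference for the exact schema.

```shell
# Sketch: create a collection backed by a Confluent Cloud Kafka topic via the REST API.
# The API server, workspace, and all names below are placeholders.
curl -X POST "https://api.usw2a1.rockset.com/v1/orgs/self/ws/commons/collections" \
  -H "Authorization: ApiKey $ROCKSET_APIKEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "confluent_demo",
    "sources": [{
      "integration_name": "confluent-cloud-integration",
      "kafka": {
        "kafka_topic_name": "orders",
        "use_v3": true,
        "offset_reset_policy": "EARLIEST"
      }
    }]
  }'
```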

## Permissions

If ACLs are configured on the Kafka service account associated with the API key/secret pair used to create the integration, _make sure that the proper permissions are set_. Otherwise, you may see the following permission issues as early as the collection preview:

Topic Authentication Error Consumer Groups Error

For example, you can use the Confluent CLI to grant the required permissions to the integration's service account.
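
A minimal sketch is shown below; the service account ID (`sa-123456`) and topic name (`orders`) are placeholder values, and exact flag names may vary between Confluent CLI versions:

```shell
# Allow the integration's service account to read from the source topic.
confluent kafka acl create --allow --service-account sa-123456 \
  --operation READ --topic orders

# Allow reads for any consumer group ID that begins with "rockset".
confluent kafka acl create --allow --service-account sa-123456 \
  --operation READ --consumer-group rockset --prefix
```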



Rockset uses consumer group IDs prefixed with `rockset`, so the commands above ensure appropriate permissions are set for that consumer group ID pattern. **Confirm that the permissions are set.**
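
One way to check, assuming the same placeholder service account ID, is to list the ACLs attached to it:

```shell
# List the ACLs bound to the service account to verify the rules above.
confluent kafka acl list --service-account sa-123456
```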



## Metadata Access

Rockset automatically stores the following Kafka message metadata fields for you:

  • **key** with type `bytes`

  • **topic** with type `string`

  • **partition** with type `int`

  • **offset** with type `int`

  • **timestamp** with type `int`

  • **header**, which is a nested structure including key-value pairs

These are all stored in the `_meta.kafka` field, which can be accessed as shown below.
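
For instance (using `commons.confluent_demo` as a hypothetical workspace and collection name):

```sql
-- Inspect the Kafka metadata stored on each ingested document.
SELECT
  c._meta.kafka.topic,
  c._meta.kafka.timestamp,
  c._meta.kafka
FROM
  commons.confluent_demo c
LIMIT 10;
```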

Kafka Metadata Collection


If you don't want to retain all of the metadata fields, you can use a SQL transformation to drop them, for example keeping only the source timestamp within the `_meta` field.
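
A sketch of such a transformation is shown below; it assumes `* EXCEPT` and object construction are available in the ingest transformation:

```sql
-- Drop the Kafka metadata, keeping only the source timestamp under _meta.kafka.
SELECT
  * EXCEPT (_meta),
  {'kafka': {'timestamp': _meta.kafka.timestamp}} AS _meta
FROM
  _input
```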



Kafka Metadata With Only Timestamp


### Supporting Kafka Tombstone

A tombstone is a special type of Kafka record that has a valid key and a null value. Tombstones are widely used with log compaction, where older messages holding the same record key are dropped from the topic. Rockset supports this behavior with a little additional work. To enable tombstone handling, use a SQL transformation to replace the content of `_id`, the special Rockset field for document identification, with the **Kafka message key**.

For example, consider a sample collection created with a SQL transformation that sets `_id` from the message key.
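
A minimal sketch, assuming the message key is UTF-8 text that can be cast to a string:

```sql
-- Use the Kafka message key as the document _id so that a tombstone
-- (same key, null value) deletes the matching document.
SELECT
  CAST(_meta.kafka.key AS string) AS _id,
  *
FROM
  _input
```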



Kafka Tombstone Demo Before


When we produce an empty record with key `20` into the source topic, the corresponding document is automatically dropped from the Rockset collection:

Kafka Tombstone Demo After


## Limitations

Please reach out to us if the above limitation(s) block your use case. For now, you can still use the integration with the same [Kafka Connect setup](🔗) used for Confluent Platform and open source Apache Kafka.