This page covers how to use a Kafka topic as a data source in Rockset. This involves:

  • Creating an <<glossary:Integration>> - Securely sends data from your Kafka cluster on **Confluent Platform** or **Open Source Apache Kafka** into Rockset.

  • Creating a [<<glossary:Collection>>](🔗) - Continuously syncs your data from a Kafka topic into Rockset in real time.

To connect to Confluent Platform or Apache Kafka, you will need to have a Kafka Connect installation.

## How it works

You can use [Kafka Connect](🔗) to send data from one or more Kafka topics into Rockset collections. Kafka Connect is a separate component from your Kafka brokers and typically runs on its own set of nodes. Kafka Connect is the recommended mechanism for reading from and writing to Kafka topics in a reliable, fault-tolerant, and scalable manner.

A Kafka Connect installation can be configured with different types of connector plugins:

  • **Source connector plugin** for writing data from an external data source into a Kafka topic

  • **Sink connector plugin** for writing data from a Kafka topic into an external system

Rockset provides a sink connector plugin that you can install into a Kafka Connect cluster to send JSON and Avro data from Kafka topics to Rockset.



## Set up Kafka Connect

To connect Kafka topics to Rockset, you must have a Kafka Connect installation that is connected to your Kafka broker cluster.

  • If you are using Confluent Platform, it may already come with a Kafka Connect installation. In that case, you can skip to the section that describes installing the Rockset sink connector plugin below.

  • If you are using Apache Kafka, you must set up and operate your own Kafka Connect installation.

As you progress through this section, refer to the Kafka [Configuration Reference](🔗) to tune properties, as needed.

### Prototype (Standalone Mode)

To quickly connect an existing Kafka cluster to Rockset, you can run a Kafka Connect process in [standalone mode](🔗). This is convenient for getting started but is not recommended for production. You can run the following steps on your local machine, in Docker, or on a cloud VM instance.

  1. [Download](🔗) the latest Apache Kafka binary distribution and extract it to a local directory.

  2. [Download](🔗) the Rockset Kafka Connect plugin.

  3. Finally, edit the file `$KAFKA_HOME/config/connect-standalone.properties`, where `$KAFKA_HOME` is the directory you extracted Kafka into, and modify the following properties:
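If you prefer to script this edit, the following is a minimal Python sketch that sets keys in a `.properties` file: the first line defining a key (even a commented-out one) is replaced in place, and keys that never appear are appended. It handles only simple `key=value` lines. Which keys you set is up to your deployment; the values in the usage note below are hypothetical placeholders.

```python
def set_properties(text: str, overrides: dict[str, str]) -> str:
    """Return `text` (a .properties file) with each key in `overrides` set.

    The first line defining a key, even if commented out with '#', is
    replaced in place; keys that never appear are appended at the end.
    Only simple `key=value` lines are recognized (no `key: value` forms
    or backslash line continuations).
    """
    remaining = dict(overrides)
    lines = []
    for line in text.splitlines():
        # Look past leading whitespace and comment markers to find a key.
        candidate = line.lstrip().lstrip("#").strip()
        key = candidate.split("=", 1)[0].strip() if "=" in candidate else None
        if key in remaining:
            lines.append(f"{key}={remaining.pop(key)}")
        else:
            lines.append(line)
    # Append any keys that were not already present in the file.
    lines.extend(f"{key}={value}" for key, value in remaining.items())
    return "\n".join(lines) + "\n"
```

For example, with `path` pointing at `connect-standalone.properties`, `path.write_text(set_properties(path.read_text(), {"bootstrap.servers": "broker1:9092", "plugin.path": "/opt/connectors"}))` points the worker at a (hypothetical) broker and at a (hypothetical) directory holding the Rockset plugin JAR.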



You are now ready to run Kafka Connect in standalone mode. To generate the configuration for forwarding specific topics from Kafka into Rockset, [set up a Kafka integration](🔗).

Once you have this configuration, save it to `$KAFKA_HOME/config/connect-rockset-sink.properties` and start the standalone Kafka Connect worker as follows:
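The standalone launcher `connect-standalone.sh` takes the worker properties file followed by one or more connector properties files. The Python sketch below assembles that command; it assumes `KAFKA_HOME` is the extracted Kafka directory and that the connector configuration was saved under the file name used above.

```python
import subprocess
from pathlib import Path


def standalone_command(kafka_home: str) -> list[str]:
    """Build the argv for a standalone Kafka Connect worker running the Rockset sink."""
    home = Path(kafka_home)
    return [
        str(home / "bin" / "connect-standalone.sh"),
        # Worker configuration (brokers, plugin path, offset storage, ...).
        str(home / "config" / "connect-standalone.properties"),
        # Connector configuration generated by the Rockset console.
        str(home / "config" / "connect-rockset-sink.properties"),
    ]


def run_standalone(kafka_home: str) -> None:
    """Run the worker in the foreground; returns only when the process exits."""
    subprocess.run(standalone_command(kafka_home), check=True)
```

For example, call `run_standalone(os.environ["KAFKA_HOME"])` and stop the worker with Ctrl+C.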



### Production (Distributed Mode)

To run Kafka Connect in production, you must run it in [distributed mode](🔗). This may require setting up separate cloud VM instances or Docker containers that work together to send data from Kafka topics into Rockset in a fault-tolerant and scalable manner. You must separately [download](🔗) the Rockset Kafka Connect plugin onto each node of the distributed Kafka Connect cluster.

  • If you are using the official Docker images, you can install the above connector into them as described [here](🔗).

  • If you are using VMs and setting up Kafka Connect from the official binaries on them, you can follow the instructions [here](🔗).

You configure the Rockset Kafka Connect plugin through the Kafka Connect REST API. The cURL command for doing so is shown at the end of [setting up a Kafka integration](🔗).
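As a rough Python equivalent of that cURL command (assuming it creates a new connector), the sketch below builds a Kafka Connect REST `POST /connectors` request whose JSON body holds the connector `name` and its `config` map. The worker URL (the REST API listens on port 8083 by default) and all configuration values are placeholders; copy the real configuration from the Rockset console.

```python
import json
import urllib.request


def create_connector_request(
    connect_url: str, name: str, config: dict[str, str]
) -> urllib.request.Request:
    """Build a Kafka Connect REST `POST /connectors` request (not yet sent)."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit(request: urllib.request.Request) -> dict:
    """Send the request; Kafka Connect replies with the created connector as JSON."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

For example, `submit(create_connector_request("http://localhost:8083", "rockset-sink", config))`, where `config` is the map of properties from the Rockset console. Any worker in the distributed cluster can accept the request.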

### Running Kafka with SSL

If your Kafka cluster uses SSL, you must specify additional properties in the Kafka Connect worker properties file, as documented in the [Kafka SSL configuration](🔗). Because the Rockset connector is a sink connector, it reads from Kafka through a Kafka consumer, so add the following properties to either the `connect-standalone.properties` or the `connect-distributed.properties` file. The sample values shown below are placeholders; replace them with the appropriate values for your Kafka cluster.
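As a sketch of how these settings are usually laid out (based on the general Kafka Connect security documentation, not on Rockset-specific requirements), the helper below emits the SSL properties once unprefixed, for the worker's own connections to the brokers, and once with the `consumer.` prefix used by sink connectors. The keystore entries are only needed if your brokers require client authentication. All paths and passwords are placeholders.

```python
from typing import Optional


def ssl_properties(
    truststore: str,
    truststore_password: str,
    keystore: Optional[str] = None,
    keystore_password: Optional[str] = None,
    key_password: Optional[str] = None,
) -> dict[str, str]:
    """SSL settings for a Kafka Connect worker that runs a sink connector."""
    base = {
        "security.protocol": "SSL",
        "ssl.truststore.location": truststore,
        "ssl.truststore.password": truststore_password,
    }
    if keystore is not None:
        # Only needed when the brokers require client (mutual TLS) authentication.
        if keystore_password is None or key_password is None:
            raise ValueError("keystore requires keystore_password and key_password")
        base["ssl.keystore.location"] = keystore
        base["ssl.keystore.password"] = keystore_password
        base["ssl.key.password"] = key_password
    # Unprefixed: the worker's own connections to the brokers.
    props = dict(base)
    # `consumer.` prefix: the consumer that feeds sink connectors such as Rockset.
    props.update({f"consumer.{key}": value for key, value in base.items()})
    return props


def to_properties(props: dict[str, str]) -> str:
    """Render a dict as `key=value` lines for a .properties file."""
    return "".join(f"{key}={value}\n" for key, value in props.items())
```

Appending `to_properties(ssl_properties("/path/to/client.truststore.jks", "<password>"))` to the worker file adds six lines: three for the worker and three `consumer.`-prefixed copies.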



## Download the Rockset Kafka Connect plugin

Navigate to the [release page](🔗) of the plugin's [GitHub repository](🔗). Download `kafka-connect-rockset-VERSION-jar-with-dependencies.jar`, where `VERSION` is the latest release. Only version 1.1.0 or later is supported.
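Because only version 1.1.0 or later is supported, a deployment script might check the downloaded file name before installing it. This Python sketch assumes `VERSION` is a plain `MAJOR.MINOR.PATCH` string with no suffix.

```python
import re

# Assumes VERSION is MAJOR.MINOR.PATCH with no suffix such as -SNAPSHOT.
JAR_NAME = re.compile(
    r"^kafka-connect-rockset-(\d+)\.(\d+)\.(\d+)-jar-with-dependencies\.jar$"
)
MIN_SUPPORTED = (1, 1, 0)


def plugin_version(filename: str) -> tuple[int, int, int]:
    """Extract (major, minor, patch) from the plugin JAR file name."""
    match = JAR_NAME.match(filename)
    if match is None:
        raise ValueError(f"unexpected plugin file name: {filename!r}")
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch


def is_supported(filename: str) -> bool:
    """True if the JAR is version 1.1.0 or later (tuples compare element-wise)."""
    return plugin_version(filename) >= MIN_SUPPORTED
```

Comparing integer tuples rather than strings keeps `1.10.0` correctly ordered after `1.9.0`.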

## Create a Kafka Connect based integration

Create a new Kafka integration using the Rockset console by navigating to **Integrations** > **Add Integration** > **Kafka**. You can then name your integration and configure your data format and topics. The only currently supported data formats are JSON and Avro.



After saving the integration, you will see the configuration for the Rockset sink plugin that can be used to configure a Kafka Connect cluster in standalone or distributed mode.

## Create a collection from a Kafka Connect based integration

To create a collection, you must pick an integration that has already been successfully set up. During collection creation, you may choose one or more Kafka topics that you want to sink into the collection.

You can create a collection from a Kafka source in the [Collections tab of the Rockset Console](🔗).



These operations can also be performed using the [Rockset client libraries](🔗), the [Rockset API](🔗), or the [Rockset CLI](🔗).

## Configuration Reference

### Important Properties

| Field | Description |
| --- | --- |
| `name` | The name of the running connector instance. We use the name of the integration for this, but that is not required. |
| `connector.class` | Used by the Kafka Connect framework to instantiate our plugin correctly. |
| `tasks.max` | The maximum number of sink tasks that can run concurrently. The Kafka Connect framework uses this to parallelize, and thereby tune the performance of, our plugin. |
| `topics` | The topics we want the sink plugin to consume documents from. |
| `rockset.task.threads` | The number of threads the plugin runs per task. |
| `rockset.apiserver.url` | The URL of the API server the plugin writes documents to. |
| `rockset.integration.key` | Used by our internal services to make sure your data is tied to the correct integration and therefore lands in the correct collections. |
| `format` | The format our plugin expects your data to conform to. |
| `key.converter` | Converter class for key data from Kafka Connect. This controls the converter used to deserialize data from Kafka, and it differs depending on whether JSON or Avro is specified as the data format. |
| `value.converter` | Converter class for value data from Kafka Connect. This controls the converter used to deserialize data from Kafka, and it differs depending on whether JSON or Avro is specified as the data format. |
| `key.converter.schemas.enable` (JSON only) | Whether the JSON key data in Kafka has a schema associated with it. We set this to `false`, because Rockset does not require a fixed schema for your data. |
| `value.converter.schemas.enable` (JSON only) | Whether the JSON value data in Kafka has a schema associated with it. We set this to `false`, because Rockset does not require a fixed schema for your data. |
| `key.converter.schema.registry.url` (Avro only) | The schema registry instances used to look up schemas for key data from Kafka Connect. For more information, see [the Confluent documentation](🔗). |
| `value.converter.schema.registry.url` (Avro only) | The schema registry instances used to look up schemas for value data from Kafka Connect. For more information, see [the Confluent documentation](🔗). |
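To show how the properties in this table fit together, the Python sketch below assembles a connector configuration for either data format. The converter classes are the standard Kafka `JsonConverter` and Confluent `AvroConverter`; using the same converter for keys and values, and the exact `format` strings the plugin expects, are assumptions here, so prefer the values in the configuration generated by the Rockset console.

```python
from typing import Optional

JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"
AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter"


def rockset_sink_config(
    *,
    name: str,
    connector_class: str,
    topics: list[str],
    tasks_max: int,
    task_threads: int,
    apiserver_url: str,
    integration_key: str,
    data_format: str,
    schema_registry_url: Optional[str] = None,
) -> dict[str, str]:
    """Assemble a Rockset sink configuration from the properties in the table above."""
    config = {
        "name": name,
        "connector.class": connector_class,
        "tasks.max": str(tasks_max),
        "topics": ",".join(topics),  # Kafka Connect expects a comma-separated list
        "rockset.task.threads": str(task_threads),
        "rockset.apiserver.url": apiserver_url,
        "rockset.integration.key": integration_key,
        "format": data_format,  # passed through as given (exact value is an assumption)
    }
    fmt = data_format.lower()
    if fmt == "json":
        for side in ("key", "value"):
            config[f"{side}.converter"] = JSON_CONVERTER
            # Rockset does not need a fixed schema, so schemas are disabled.
            config[f"{side}.converter.schemas.enable"] = "false"
    elif fmt == "avro":
        if schema_registry_url is None:
            raise ValueError("Avro data requires a schema registry URL")
        for side in ("key", "value"):
            config[f"{side}.converter"] = AVRO_CONVERTER
            config[f"{side}.converter.schema.registry.url"] = schema_registry_url
    else:
        raise ValueError(f"unsupported format: {data_format!r}")
    return config
```

Keyword-only arguments make it harder to swap, say, the API server URL and the integration key by accident.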

### Starting Offset

The default starting offset is `latest`, meaning Kafka Connect starts reading from the end of the topic. To start from the earliest offset instead, you need to change the consumer's `auto.offset.reset` setting. There are two ways to do this:

  1. Override at the **Worker** level, which affects all connectors on that worker. To do this, add the following to your `connect-distributed.properties`: `consumer.auto.offset.reset=earliest`

  2. Override at the **Connector** level (Apache Kafka 2.3 or later only). To do this, first set `connector.client.config.override.policy=All` in the worker configuration. If you are using Docker, set `CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: 'All'` instead. Then add the following to the configuration of the connector you want to change: `"consumer.override.auto.offset.reset":"earliest"`

For both starting offset options, you will need to restart the Kafka Connect process to pick up the configuration change(s).
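The two options differ only in where the setting lives. The Python sketch below shows both: a worker-level properties line and a connector-level override added to a connector configuration map. Keep in mind that `auto.offset.reset` only takes effect when the connector's consumer group has no valid committed offset, so it does not rewind a connector that has already committed progress.

```python
VALID_RESETS = {"earliest", "latest", "none"}


def worker_offset_reset_line(reset: str = "earliest") -> str:
    """Option 1: a line for connect-distributed.properties (affects every connector)."""
    if reset not in VALID_RESETS:
        raise ValueError(f"invalid auto.offset.reset: {reset!r}")
    return f"consumer.auto.offset.reset={reset}\n"


def with_connector_offset_reset(
    config: dict[str, str], reset: str = "earliest"
) -> dict[str, str]:
    """Option 2: per-connector override (Kafka 2.3+; requires
    connector.client.config.override.policy=All on the worker)."""
    if reset not in VALID_RESETS:
        raise ValueError(f"invalid auto.offset.reset: {reset!r}")
    updated = dict(config)  # leave the caller's map untouched
    updated["consumer.override.auto.offset.reset"] = reset
    return updated
```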

### Tuning Kafka Connect Properties

Below are some tips for tuning Kafka Connect and the Rockset sink so that the batches sent to Rockset are as full as possible:

  1. `connector.client.config.override.policy=All` Add this to the Kafka Connect worker properties to allow client overrides for each connector. It is generally safe to set this to `All`. If the Rockset sink is the only connector you are using, you can instead adjust the worker properties directly by using the `consumer.` prefix. For more information on this parameter, see [here](🔗).

  2. `consumer.override.fetch.min.bytes=100000` This goes in the Rockset sink connector configuration. Try setting it to 500,000 (500 kB) and adjust up or down as needed. For more information on this parameter, see [here](🔗).

  3. `consumer.override.fetch.max.wait.ms=500` This goes in the Rockset sink connector configuration. Although 500 is the default, you may need to increase it so that enough data accumulates to reach the minimum fetch size needed for well-filled batches. Increasing this value can also increase data latency, but it helps throughput. For more information on this parameter, see [here](🔗).

  4. `consumer.override.max.poll.records=1000` By default, each poll returns at most 500 records, which caps batches at 500 documents, so we recommend increasing this to 1,000 to start and potentially going up to 5,000 if needed. For more information on this parameter, see [here](🔗).
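Putting the connector-level tips together, the Python sketch below adds the `consumer.override.*` settings to a connector configuration map, using the suggested starting values from the list above. It assumes the worker already sets `connector.client.config.override.policy=All`; without that policy the worker will not accept these overrides. Treat the numbers as starting points to tune, not values that suit every workload.

```python
# Suggested starting points from the tips above; tune for your workload.
STARTING_CONSUMER_SETTINGS = {
    "fetch.min.bytes": 500_000,  # about 500 kB per fetch
    "fetch.max.wait.ms": 500,    # Kafka's default; raise to trade latency for fuller batches
    "max.poll.records": 1_000,   # default is 500; go up to 5,000 if needed
}


def with_consumer_overrides(
    config: dict[str, str],
    settings: dict[str, int] = STARTING_CONSUMER_SETTINGS,
) -> dict[str, str]:
    """Return a copy of `config` with each setting added as consumer.override.<key>.

    Assumes the worker sets connector.client.config.override.policy=All.
    """
    updated = dict(config)  # the caller's map and the defaults are not modified
    for key, value in settings.items():
        updated[f"consumer.override.{key}"] = str(value)
    return updated
```

The resulting map can be submitted through the Kafka Connect REST API in place of the original connector configuration.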