This page covers how to use a Kafka topic hosted on Confluent Cloud as a data source in Rockset. This involves:
- Creating an Integration - Securely send data from your Kafka cluster on Confluent Cloud to Rockset.
- Creating a Collection - Continuously syncs your data from a Kafka topic to Rockset in real-time.
Follow the steps below to create an integration between Rockset and your Confluent Cloud cluster:
- Identify your bootstrap server URL. You can find this on Confluent Cloud by navigating to Cluster Overview > Cluster Settings:
- Identify your Confluent Cloud cluster’s API key and secret. You can create a new API key or use an existing one from the API Keys tab under Cluster Overview:
Create API Key Warning
There are two places to create an API key, and one method is using the "Cloud API keys" under the right-navigation. Do not use it as it is for RBAC setup on the Confluent Cloud.
- Input these fields into the Kafka integration creation form and save your integration:
Authentication Error Warning
There is a known issue with Confluent Cloud. Due to the delay of Confluent API key propagation, an authentication error may occur if you use it immediately after the creation:
Try waiting at least 3-5 minutes before retrying to allow time for the new API key to be effective on the Confluent side. Reach out to Rockset if your retry is still failing due to authentication.
Optional: If you expect to have topics that use Avro data format ingested into a Rockset collection, the integration must also be configured with schema registry settings. Avro schemas are stored on a schema registry server, and access to the schema registry is required for Rockset to process Avro messages from a Kafka topic. Currently, using schema registry is only supported for Avro data format. A Kafka integration configured with schema registry can be used to ingest from topics serving either JSON or Avro format messages (mixing JSON and Avro messages within the same topic is not a supported configuration). A Kafka integration without schema registry configured can only be used to ingest from topics serving JSON format messages.
To configure schema registry with Confluent Cloud:
Use the schema registry URL, key and secret from above while creating Kafka integration on Rockset.
Follow the steps below to create a collection:
Navigate to the Collections tab of the Rockset Console to create a collection from a Kafka source.
Add a source which maps to a single Kafka topic. The required inputs are:
- Topic name
- Starting offset
- Specify the Starting Offset. This will determine where in the topic Rockset will begin ingesting data from. This can be either the earliest offset or the latest offset of the topic.
- Earliest: The collection will contain all of the data starting from the beginning of the topic. Choose this option if you require all of the existing data in the Kafka topic.
- Latest: The collection will contain data starting from the end of the topic, which means Rockset will only ingest records arriving after the collection is created. Choose this option to avoid extra data ingestion costs if earlier data is not necessary for your use case.
- Specify the Format of messages in the Kafka topic. Currently, Rockset supports processing JSON and Avro message formats. For a Rockset collection to process Avro messages from a topic, the corresponding Kafka integration on Rockset must be configured with valid schema registry settings.
After you create a collection backed by a Kafka topic, you should be able to see data flowing from your Kafka topic into the new collection. Rockset continuously ingests objects from your topic source and updates your collection automatically as new objects are added.
If ACLs are configured on the Kafka service account associated with the API key/secret pair that was used to create the integration related to the collection, make sure that proper permissions are set. Otherwise, you may see following permission issues at collection preview itself:
For example, you can use confluent CLI to configure permissions as follows:
> confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --topic <topic>
> confluent kafka acl create --allow --service-account <service-account-id> --operation READ --topic <topic>
> confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --prefix --consumer-group rockset
> confluent kafka acl create --allow --service-account <service-account-id> --operation READ --prefix --consumer-group rockset
Rockset uses consumer group ID prefixed with "rockset", so the above commands ensure appropriate permissions are set for such a consumer group ID pattern. Confirm permissions are set:
> confluent kafka acl list
Principal | Permission | Operation | ResourceType | ResourceName | PatternType
User:sa-abcdef | ALLOW | READ | TOPIC | <topic> | LITERAL User:sa-abcdef | ALLOW | DESCRIBE | TOPIC | <topic> | LITERAL User:sa-abcdef | ALLOW | READ | GROUP | rockset | PREFIXED User:sa-abcdef | ALLOW | DESCRIBE | GROUP | rockset | PREFIXED
Rockset automatically stores the following Kafka message metadata fields for you:
- key with type
- topic with type
- partition with type
- offset with type
- timestamp with type
- header, which is a nested structure including key-value pairs
They are all stored in the
_meta.kafka field which could be accessed like below:
If you don't want to retain all the metadata fields, consider using the following sample SQL transformation to drop them, for example this transformation query will only keep the source timestamp within the
} AS _meta, * EXCEPT(_meta)
Tombstone is a special type of record in Kafka which has a valid key and a null value. It is widely used for log compaction where the older message holding the same record key will be dropped from the topic. Rockset does support this behavior with a little additional work. To enable tombstone, you need to use SQL transformation to replace the content of
_id, a special Rockset field for document identification, with Kafka message key.
For example, we have the following sample collection enabled with the following SQL transformation:
SELECT CAST(_input._meta.kafka.key AS string) AS _id, * EXCEPT(_id) FROM _input
When we produce an empty record with key
20 into the source topic, we could see it gets automatically dropped in Rockset:
Please reach out to us if the above limitation(s) are blocking for your use case. For now, you can still use the integration with the same Kafka Connect setup used for Confluent Platform and Open Source Apache Kafka.
Updated 24 days ago