QuestDB Kafka Connector
QuestDB ships a QuestDB Kafka connector for fast ingestion from Kafka into QuestDB. This is also useful for processing change data capture for the dataflow. The connector is based on the Kafka Connect framework and acts as a sink for Kafka topics.
This page has the following main sections:
Integration guide
This guide shows the steps to use the QuestDB Kafka connector to read JSON data from Kafka topics and write them as rows into a QuestDB table. For Confluent users, please check the instructions in the Confluent Docker images.
Prerequisites
You will need the following:
- Kafka
- QuestDB instance
- Local JDK installation
Configure Kafka
Before you begin manual configuration, note that that the connector is also available from the Confluent Hub.
Download the latest QuestDB Kafka connector package:
curl -s https://api.github.com/repos/questdb/kafka-questdb-connector/releases/latest |
jq -r '.assets[]|select(.content_type == "application/zip")|.browser_download_url'|
wget -qi -
Next, unzip the contents of the archive and copy the required .jar
files to
your Kafka libs
directory:
unzip kafka-questdb-connector-*-bin.zip
cd kafka-questdb-connector
cp ./*.jar /path/to/kafka_*.*-*.*.*/libs
Set Kafka configuration file
Create a Kafka Connect configuration file at
/path/to/kafka/config/questdb-connector.properties
. You can also define a
host, port, as well as a topic under the topics={mytopic}
key. For more
information on how to configure properties, see the
configuration manual.
This example file:
- Assumes a running InfluxDB Line Protocol default port of
9000
- Creates a reader from a Kafka topic:
example-topic
- Creates & writes into a QuestDB table:
example_table
name=questdb-sink
client.conf.string=http::addr=localhost:9000;
topics=example-topic
table=example_table
connector.class=io.questdb.kafka.QuestDBSinkConnector
# message format configuration
value.converter=org.apache.kafka.connect.json.JsonConverter
include.key=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
Start Kafka
The commands listed in this section must be run from the Kafka home directory and in the order shown below.
- Start the Kafka Zookeeper used to coordinate the server:
bin/zookeeper-server-start.sh config/zookeeper.properties
- Start a Kafka server:
bin/kafka-server-start.sh config/server.properties
- Start the QuestDB Kafka connector:
bin/connect-standalone.sh config/connect-standalone.properties config/questdb-connector.properties
Publish messages
Messages can be published via the console producer script:
bin/kafka-console-producer.sh --topic example-topic --bootstrap-server localhost:9092
A greater-than symbol, >
, indicates that a message can be published to the
example topic from the interactive session. Paste the following minified JSON as
a single line to publish the message and create the table example-topic
in the
QuestDB instance:
{"firstname": "Arthur", "lastname": "Dent", "age": 42}
Verify the integration
To verify that the data has been ingested into the example-topic
table, the
following request to QuestDB's /exp
REST API endpoint can be made to export
the table contents via the curl command.
curl -G \
--data-urlencode "query=select * from 'example_table'" \
http://localhost:9000/exp
The expected response based on the example JSON message published above will be similar to the following:
"firstname","age","lastname","timestamp"
"Arthur",42,"Dent","2022-11-01T13:11:55.558108Z"
If you can see the expected result, then congratulations!
You have successfully created and executed your first Kafka to QuestDB pipeline. 🎉
Additional sample projects
You can find additional sample projects on the QuestDB Kafka connector Github project page.
It includes a sample integration with Debezium for CDC from PostgreSQL.
Configuration manual
This section lists configuration options as well as further information about the Kafka Connect connector.
Configuration Options
The connector configuration consists of two parts: the client configuration string and the connector configuration options. The client configuration string specifies how the connector communicates with the QuestDB server, while the connector configuration options specify how the connector interprets and processes data from Kafka.
See the chapter Client configuration string for more information about the client configuration string. The table below lists the connector configuration options:
Name | Type | Example | Default | Meaning |
---|---|---|---|---|
client.conf.string | string | http::addr=localhost:9000; | N/A | Client configuration string |
topics | string | orders,audit | N/A | Kafka topics to read from |
key.converter | string | org.apache.kafka.connect.storage.StringConverter | N/A | Converter for keys stored in Kafka |
value.converter | string | org.apache.kafka.connect.json.JsonConverter | N/A | Converter for values stored in Kafka |
table | string | my_table | Same as Topic name | Target table in QuestDB |
key.prefix | string | from_key | key | Prefix for key fields |
value.prefix | string | from_value | N/A | Prefix for value fields |
allowed.lag | int | 250 | 1000 | Allowed lag when there are no new events |
skip.unsupported.types | boolean | false | false | Skip unsupported types |
timestamp.field.name | string | pickup_time | N/A | Designated timestamp field name |
timestamp.units | string | micros | auto | Designated timestamp field units |
timestamp.kafka.native | boolean | true | false | Use Kafka timestamps as designated timestamps |
timestamp.string.fields | string | creation_time,pickup_time | N/A | String fields with textual timestamps |
timestamp.string.format | string | yyyy-MM-dd HH:mm:ss.SSSUUU z | N/A | Timestamp format, used when parsing timestamp string fields |
include.key | boolean | false | true | Include message key in target table |
symbols | string | instrument,stock | N/A | Comma separated list of columns that should be symbol type |
doubles | string | volume,price | N/A | Comma separated list of columns that should be double type |
How does the connector work?
The connector reads data from Kafka topics and writes it to QuestDB tables via InfluxDB Line Protocol. The connector converts each field in the Kafka message to a column in the QuestDB table. Structures and maps are flatted into columns.
Example: Consider the following Kafka message:
{
"firstname": "John",
"lastname": "Doe",
"age": 30,
"address": {
"street": "Main Street",
"city": "New York"
}
}
The connector will create a table with the following columns:
firstname string | lastname string | age long | address_street string | address_city string |
---|---|---|---|---|
John | Doe | 30 | Main Street | New York |
Client configuration string
The connector internally uses the QuestDB Java client to communicate with the
QuestDB server. The client.conf.string
option allows you to configure the
client. Alternatively, you can set the client configuration string by exporting
an environment variable QDB_CLIENT_CONF
.
This string follows the format:
<protocol>::<key>=<value>;<key>=<value>;...;
Please note the trailing semicolon at the end of the string.
The supported transport protocols are:
- http
- https
A transport protocol and the key addr=host:port;
are required. The key addr
defines the hostname and port of the QuestDB server. If the port is not
specified, it defaults to 9000.
Minimal example with plaintext HTTP:
http::addr=localhost:9000;
Configuration string with HTTPS and custom retry timeout:
http::addr=localhost:9000;retry_timeout=60000;
See the Java Client configuration guide for all available options. Note that client configuration options are different from connector configuration options. The client configuration string specifies how the connector communicates with the QuestDB server, while the connector configuration options specify how the connector interprets and processes data from Kafka.
Besides HTTP transport, the QuestDB client also technically supports TCP transport; however, it is not recommended for use with the Kafka connector. TCP transport offers no delivery guarantees, so it is therefore unsuitable for use with Kafka.
Supported serialization formats
The connector does not deserialize data independently. It relies on Kafka
Connect converters. The connector has been tested predominantly with JSON, but
it should work with any converter, including Avro. Converters can be configured
using key.converter
and value.converter
options, both are included in the
Configuration options table above.
Designated timestamps
The connector supports designated timestamps.
There are three distinct strategies for designated timestamp handling:
- QuestDB server assigns a timestamp when it receives data from the connector. (Default)
- The connector extracts the timestamp from the Kafka message payload.
- The connector extracts timestamps from Kafka message metadata.
Kafka messages carry various metadata, one of which is a timestamp. To use the
Kafka message metadata timestamp as a QuestDB designated timestamp, set
timestamp.kafka.native
to true
.
If a message payload contains a timestamp field, the connector can utilize it as
a designated timestamp. The field's name should be configured using the
timestamp.field.name
option. This field should either be an integer or a
timestamp.
When the field is defined as an integer, the connector will automatically detect
its units. This is applicable for timestamps after 04/26/1970, 5:46:40 PM
.
The units can also be configured explicitly using the timestamp.units
option,
which supports the following values:
nanos
micros
millis
seconds
auto
(default)
Note: These 3 strategies are mutually exclusive. Cannot set both
timestamp.kafka.native=true
and timestamp.field.name
.
Textual timestamps parsing
Kafka messages often contain timestamps in a textual format. The connector can
parse these and use them as timestamps. Configure field names as a string with
the timestamp.string.fields
option. Set the timestamp format with the
timestamp.string.format
option, which adheres to the QuestDB timestamp format.
See the QuestDB timestamp documentation for more details.
Example
Consider the following Kafka message:
{
"firstname": "John",
"lastname": "Doe",
"born": "1982-01-07 05:42:11.123456 UTC",
"died": "2031-05-01 09:11:42.456123 UTC"
}
To use the born
field as a designated timestamp and died
as a regular
timestamp set the following properties in your QuestDB connector configuration:
timestamp.field.name=born
- the fieldborn
is a designated timestamp.timestamp.string.fields=died
- set the field namedied
as a textual timestamp. Notice this option does not contain the fieldborn
. This field is already set as a designated timestamp so the connector will attempt to parse it as a timestamp automatically.timestamp.string.format=yyyy-MM-dd HH:mm:ss.SSSUUU z
- set the timestamp format. Please note the correct format for microseconds isSSSUUU
(3 digits for milliseconds and 3 digits for microseconds).
Fault Tolerance
The connector automatically retries failed requests deemed recoverable. Recoverable errors include network errors, some server errors, and timeouts, while non-recoverable errors encompass invalid data, authentication errors, and other client-side errors.
Retrying is particularly beneficial during network issues or when the server is
temporarily unavailable. The retrying behavior can be configured through client
configuration parameter retry_timeout
. This parameter specifies the maximum
time in milliseconds the client will attempt to retry a request. The default
value is 10000 ms.
Example with a retry timeout of 60 seconds:
client.conf.string=http::addr=localhost:9000;retry_timeout=60000;
Exactly once delivery
Retrying might result in the same rows delivered multiple times. To ensure exactly once delivery, a target table must have deduplication enabled. Deduplication filters out duplicate rows, including those caused by retries.
Note that deduplication requires designated timestamps extracted either from message payload or Kafka message metadata. See the Designated timestamps section for more information.
Latency considerations
The connector waits for a batch of messages to accumulate before sending them to
the server. The batch size is determined by the auto_flush_rows
client string
configuration parameter, it defaults to 75,000 rows. When you have a low
throughput, you might want to reduce the batch size to reduce latency.
Example with a batch size 1,000:
client.conf.string=http::addr=localhost:9000;auto_flush_rows=1000;
The protocol also sends accumulated data to a QuestDB server when Kafka topics
has no new events for 1000 milliseconds. This value can be configured with the
allowed.lag
connector configuration parameter.
Example with allowed lag set to 250 milliseconds:
client.conf.string=http::addr=localhost:9000;auto_flush_rows=1000;
allowed.lag=250;
The connector also sends data to the server when Kafka Connect is committing
offsets. Frequency of commits can be configured in the Kafka Connect
configuration file by using the offset.flush.interval.ms
parameter. See the
Kafka Connect Reference
for more information.
Symbol type
QuestDB supports a special type called
symbol. Use the symbols
configuration option to specify which columns should be created as the symbol
type.
Numeric type inference for floating point type
When a configured Kafka Connect deserializer provides a schema, the connector
uses it to determine column types. If a schema is unavailable, the connector
infers the type from the value. This might produce unexpected results for
floating point numbers, which may be interpreted as long
initially and
generates an error.
Consider this example:
{
"instrument": "BTC-USD",
"volume": 42
}
Kafka Connect JSON converter deserializes the volume
field as a long
value.
The connector sends it to the QuestDB server as a long
value. If the target
table does not have a column volume
, the database creates a long
column. If
the next message contains a floating point value for the volume
field, the
connector sends it to QuestDB as a double
value. This causes an error because
the existing column volume
is of type long
.
To avoid this problem, the connector can be configured to send selected numeric
columns as double
regardless of the actual initial input value. Use the
doubles
configuration option to specify which columns should the connector
always send as the double
type.
Alternatively, you can use SQL to explicitly create the target table with the correct column types instead of relying on the connector to infer them. See the paragraph below.
Target table considerations
Table name
By default, the target table name in QuestDB is the same as the Kafka topic name from which a message originates. When a connector is configured to read from multiple topics, it uses a separate table for each topic.
Set the table configuration option to override this behavior. Once set, the connector will write all messages to the specified table, regardless of the topic from which they originate.
Example:
name=questdb-sink
connector.class=io.questdb.kafka.QuestDBSinkConnector
client.conf.string=http::addr=localhost:9000;
topics=example-topic
table=example_table
[...]
The table
options supports simple templating. You can use the following
variables in the table name: ${topic}
, ${key}
. The connector will replace
these variables with the actual topic name and key value from the Kafka message.
Example:
name=questdb-sink
connector.class=io.questdb.kafka.QuestDBSinkConnector
client.conf.string=http::addr=localhost:9000;
topics=example-topic
table=from_kafka_${topic}_${key}_suffix
[...]
The placeholder ${key}
will be replaced with the actual key value from the
Kafka message. If the key is not present in the message, the placeholder will be
replaced with the string null
.
Table schema
When a target table does not exist in QuestDB, it will be created automatically. This is the recommended approach for development and testing.
In production, it's recommended to use the SQL CREATE TABLE keyword, because it gives you more control over the table schema, allowing per-table partitioning, creating indexes, etc.
FAQ
Does this connector work with Schema Registry?
The Connector works independently of the serialization strategy used. It relies
on Kafka Connect converters to deserialize data. Converters can be configured
using key.converter
and value.converter
options, see the configuration
section above.
I'm getting this error: "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires 'schema' and 'payload' fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration."
This error means that the connector is trying to deserialize data using a
converter that expects a schema. The connector does not require schemas, so you
need to configure the converter to not expect a schema. For example, if you are
using a JSON converter, you need to set value.converter.schemas.enable=false
or key.converter.schemas.enable=false
in the connector configuration.
Does this connector work with Debezium?
Yes, it's been tested with Debezium as a source and a
sample project
is available. Bear in mind that QuestDB is meant to be used as an append-only
database; hence, updates should be translated as new inserts. The connector
supports Debezium's ExtractNewRecordState
transformation to extract the new
state of the record. The transformation by default drops DELETE events, so there
is no need to handle them explicitly.
QuestDB is a time-series database, how does it fit into Change Data Capture via Debezium?
QuestDB works with Debezium just great! This is the recommended pattern: Transactional applications use a relational database to store the current state of the data. QuestDB is used to store the history of changes. Example: Imagine you have a PostgreSQL table with the most recent stock prices. Whenever a stock price changes, an application updates the PostgreSQL table. Debezium captures each UPDATE/INSERT and pushes it as an event to Kafka. Kafka Connect QuestDB connector reads the events and inserts them into QuestDB. In this way, PostgreSQL will have the most recent stock prices and QuestDB will have the history of changes. You can use QuestDB to build a dashboard with the most recent stock prices and a chart with the history of changes.
How I can select which fields to include in the target table?
Use the ReplaceField transformation to remove unwanted fields. For example, if
you want to remove the address
field, you can use the following configuration:
{
"name": "questdb-sink",
"config": {
"connector.class": "io.questdb.kafka.QuestDBSinkConnector",
"host": "localhost:9000",
"topics": "Orders",
"table": "orders_table",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "removeAddress",
"transforms.removeAddress.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.removeAddress.blacklist": "address"
}
}
See ReplaceField documentation for more details.
I need to run Kafka Connect on Java 8, but the connector says it requires Java 17. What should I do?
The Kafka Connect-specific part of the connectors works with Java 8. The
requirement for Java 17 is coming from QuestDB client itself. The zip archive
contains 2 JARs: questdb-kafka-connector-VERSION.jar
and
questdb-VERSION.jar
. You can replace the latter with
questdb-VERSION-jdk8.jar
from the
Maven central.
Please note that this setup is not officially supported, and you may encounter
issues. If you do, please report them to us.