Options

This page describes configuration options for reading from and writing to Apache Kafka using Structured Streaming on Azure Databricks.

The Azure Databricks Kafka connector is built on top of the Apache Spark Kafka connector and supports all standard Kafka configuration options. Any option prefixed with kafka. is passed through directly to the underlying Kafka client. For example, .option("kafka.max.poll.records", "500") sets the Kafka consumer's max.poll.records property. See the Kafka configuration documentation for the full list of available Kafka properties.
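As a minimal PySpark sketch (assuming an existing `spark` session; broker addresses and the topic name are placeholders), a `kafka.`-prefixed option is forwarded unchanged to the underlying consumer:

```python
# Configuration sketch; requires a running Spark session with the Kafka connector.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
    .option("subscribe", "events")
    # Forwarded to the Kafka consumer as max.poll.records:
    .option("kafka.max.poll.records", "500")
    .load()
)
```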

For additional Structured Streaming source and sink options not listed on this page, see the Structured Streaming + Kafka Integration Guide.

Required options

The following option is required for both reading and writing:

Option Value Description
kafka.bootstrap.servers A comma-separated list of host:port The Kafka bootstrap.servers configuration. If you find there is no data from Kafka, check the broker address list first. If the broker address list is incorrect, there might not be any errors, because the Kafka client assumes the brokers will become available eventually and, in the event of network errors, retries forever.

When reading from Kafka, you must also specify one of the following options to identify which topics to consume:

Option Value Description
subscribe A comma-separated list of topics The topic list to subscribe to.
subscribePattern Java regex string The pattern used to subscribe to topic(s).
assign JSON string {"topicA":[0,1],"topicB":[2,4]} Specific topic partitions to consume.

When writing to Kafka, you can optionally set the topic option to specify a destination topic for all rows. If not set, the DataFrame must include a topic column.
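Putting the required options together, a configuration sketch (placeholder brokers, topic names, and checkpoint path; assumes an existing `spark` session) showing one subscription style for reading and the `topic` option for writing:

```python
# Reading: exactly one of subscribe, subscribePattern, or assign is required.
raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("subscribePattern", "events-.*")  # Java regex over topic names
    .load()
)

# Writing: either set the topic option, or include a topic column in the data.
query = (
    raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("topic", "events-out")  # destination for all rows
    .option("checkpointLocation", "/tmp/checkpoints/events-out")
    .start()
)
```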

Common reader options

The following options are commonly used when reading from Kafka:

Option Value Default Description
minPartitions INT none Minimum number of partitions to read from Kafka. Normally, Spark creates one partition per Kafka topic-partition. Setting this higher splits large Kafka partitions into smaller Spark partitions for increased parallelism. Useful for handling data skew or peak loads. Note: Enabling this reinitializes Kafka consumers at each trigger, which may impact performance when using SSL.
maxRecordsPerPartition LONG none Maximum number of records per Spark partition. When set, Spark splits Kafka partitions so each Spark partition has at most this many records. Can be used with minPartitions; when both are set, Spark uses whichever results in more partitions.
failOnDataLoss BOOLEAN true Whether to fail the query when it's possible that data was lost. Queries can permanently fail to read data from Kafka due to many scenarios such as deleted topics, topic truncation before processing, and so on. We try to estimate conservatively whether data was possibly lost or not. Sometimes this can cause false alarms. Set this option to false if the estimation does not work as expected, or if you want the query to continue processing despite data loss.
maxOffsetsPerTrigger LONG none [Streaming only] Rate limit on maximum number of offsets processed per trigger interval. The total number of offsets is proportionally split across topic partitions.
For more advanced flow control, you can also use minOffsetsPerTrigger (minimum offsets before triggering) and maxTriggerDelay (maximum wait time, default 15m). See the Spark Kafka integration guide for details.
startingOffsets earliest, latest, or JSON string latest Determines where to start reading when a query begins. Use earliest to read from the earliest available offsets, latest to read only new data after stream start, or a JSON string to specify a starting offset for each topic partition (for example, {"topicA":{"0":23,"1":-2},"topicB":{"0":-2}}). In the JSON, -2 refers to earliest and -1 to latest.
For streaming queries, this only applies when a new query is started; resuming always picks up from where the query left off. Newly discovered partitions start at earliest.
Note: For batch queries, latest (either implicitly or by using -1 in JSON) is not allowed. To start from a specific timestamp instead, use startingTimestamp or startingOffsetsByTimestamp.
endingOffsets latest or JSON string latest [Batch only] Where to stop reading when a batch query ends. Use latest to read up to the most recent offsets, or a JSON string to specify an ending offset for each topic partition (for example, {"topicA":{"0":50,"1":-1},"topicB":{"0":-1}}). In the JSON, -1 refers to latest; -2 (earliest) is not allowed. To end at a specific timestamp instead, use endingTimestamp or endingOffsetsByTimestamp.
groupIdPrefix STRING spark-kafka-source (streaming) or spark-kafka-relation (batch) Prefix for the auto-generated consumer group ID. The connector automatically generates a unique group.id for each query; this option customizes the prefix of that generated ID. Ignored if kafka.group.id is set.
kafka.group.id STRING none Group ID to use while reading from Kafka. Use this with caution. By default, each query generates a unique group ID for reading data. This ensures that each query has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use specific authorized group IDs to read data. You can optionally set the group ID. However, do this with extreme caution as it can cause unexpected behavior.
  • Concurrently running queries (both batch and streaming) with the same group ID are likely to interfere with each other, causing each query to read only part of the data.
  • Interference may also occur when queries are started or restarted in quick succession. To minimize such issues, set the Kafka consumer configuration session.timeout.ms to be very small.
includeHeaders BOOLEAN false Whether to include Kafka message headers in the output.
bytesEstimateWindowLength STRING 300s [Streaming only] Time window used to estimate remaining bytes via the estimatedTotalBytesBehindLatest metric. Accepts duration strings like 10m (10 minutes) or 600s (600 seconds). See Retrieve Kafka metrics.
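The JSON values accepted by assign, startingOffsets, and endingOffsets are ordinary JSON strings, so they can be built with json.dumps rather than hand-written. A short sketch using hypothetical topic names:

```python
import json

# Specific partitions to consume (the assign option).
assign = json.dumps({"topicA": [0, 1], "topicB": [2, 4]})

# Per-partition starting offsets: -2 means earliest, -1 means latest.
starting = json.dumps({"topicA": {"0": 23, "1": -2}, "topicB": {"0": -2}})
```

The resulting strings are passed directly, for example `.option("startingOffsets", starting)`.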

Common writer options

The following options are commonly used when writing to Kafka:

Option Value Default Description
topic STRING none Sets the topic for all rows. This overrides any topic column in the data.
includeHeaders BOOLEAN false Whether to include Kafka headers in the row.
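A batch write sketch combining these options (placeholder broker and topic; assumes an existing DataFrame `df` with an `id` column). Rows written to Kafka need key and value columns of string or binary type:

```python
# Configuration sketch; requires a Spark session with the Kafka connector.
(
    df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("topic", "events-out")  # overrides any topic column in df
    .save()
)
```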

Important

Databricks Runtime 13.3 LTS and above includes a newer version of the kafka-clients library that enables idempotent writes by default. If your Kafka sink uses version 2.8.0 or below with ACLs configured but without IDEMPOTENT_WRITE enabled, writes will fail. Resolve this by upgrading to Kafka 2.8.0 or above, or by setting .option("kafka.enable.idempotence", "false").

Authentication options

Azure Databricks supports multiple authentication methods for Kafka, including Unity Catalog service credentials, SASL/SSL, and cloud-specific options for AWS MSK, Azure Event Hubs, and Google Cloud Managed Kafka.

Azure Databricks recommends using Unity Catalog service credentials for authentication to cloud-managed Kafka services:

Option Value Description
databricks.serviceCredential STRING The name of a Unity Catalog service credential for authenticating to cloud-managed Kafka services (AWS MSK, Azure Event Hubs, or Google Cloud Managed Kafka). Available in Databricks Runtime 16.1 and above.
databricks.serviceCredential.scope STRING The OAuth scope for the service credential. Only set this if Azure Databricks cannot automatically infer the scope for your Kafka service.

When you use a Unity Catalog service credential, you do not need to specify SASL/SSL options like kafka.sasl.mechanism, kafka.sasl.jaas.config, or kafka.security.protocol.
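A sketch of a read using a Unity Catalog service credential (the credential name and broker address are placeholders; assumes Databricks Runtime 16.1 or above):

```python
# No kafka.sasl.* or kafka.security.protocol options are needed here;
# authentication is handled through the named service credential.
df = (
    spark.readStream
    .format("kafka")
    .option("databricks.serviceCredential", "my-kafka-credential")
    .option("kafka.bootstrap.servers", "broker.example.com:9092")
    .option("subscribe", "events")
    .load()
)
```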

Common SASL/SSL options include:

Option Value Description
kafka.security.protocol STRING Protocol used to communicate with brokers (for example, SASL_SSL, SSL, PLAINTEXT).
kafka.sasl.mechanism STRING SASL mechanism (for example, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER, AWS_MSK_IAM).
kafka.sasl.jaas.config STRING JAAS login configuration string.
kafka.sasl.login.callback.handler.class STRING Fully qualified class name of a login callback handler for SASL authentication.
kafka.sasl.client.callback.handler.class STRING Fully qualified class name of a client callback handler for SASL authentication.
kafka.ssl.truststore.location STRING Location of the SSL trust store file.
kafka.ssl.truststore.password STRING Password for the SSL trust store file.
kafka.ssl.keystore.location STRING Location of the SSL key store file.
kafka.ssl.keystore.password STRING Password for the SSL key store file.
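A SASL/SSL configuration sketch using the PLAIN mechanism (broker address, topic, username, and password are placeholders; the login module class name may differ on runtimes that shade the Kafka client classes):

```python
# Configuration sketch; credentials should come from a secret store, not literals.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker.example.com:9093")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        'username="USER" password="PASSWORD";',
    )
    .option("subscribe", "events")
    .load()
)
```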

For complete authentication setup instructions, see Authentication.

Additional resources