Share via


Connect to Apache Kafka

This article describes how you can use Apache Kafka as either a source or a sink when running Structured Streaming workloads on Azure Databricks.

For more information about Kafka, see the Apache Kafka documentation.

Read data from Kafka

Azure Databricks provides the kafka keyword as a data format to configure connections to Kafka. The following is an example for a streaming read:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks also supports batch read semantics, as shown in the following example:

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

For incremental batch loading, Databricks recommends using Kafka with Trigger.AvailableNow. See AvailableNow: Incremental batch processing.

In Databricks Runtime 13.3 LTS and above, Azure Databricks also provides a SQL function for reading Kafka data. Streaming with SQL is supported only in Lakeflow Spark Declarative Pipelines or with streaming tables in Databricks SQL. See read_kafka table-valued function.

Configure Kafka Structured Streaming reader

The following option must be set for the Kafka source for both batch and streaming queries:

Option Value Description
kafka.bootstrap.servers A comma-separated list of host:port The Kafka cluster bootstrap servers

Additionally, one of the following options is required to specify which topics to subscribe to:

Option Value Description
subscribe A comma-separated list of topics. The topic list to subscribe to.
subscribePattern Java regex string. The pattern used to subscribe to topic(s).
assign JSON string {"topicA":[0,1],"topic":[2,4]}. Specific topicPartitions to consume.

See the Options page for the full list of available options.

Schema for Kafka records

The records returned by the Kafka Structured Streaming reader will have the following schema:

Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

The key and the value are always deserialized as byte arrays with the ByteArrayDeserializer. Use DataFrame operations (such as cast("string") or from_avro) to explicitly deserialize the keys and values.

Write data to Kafka

The following is an example for a streaming write to Kafka:

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks also supports batch write semantics to Kafka data sinks, as shown in the following example:

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

Configure the Kafka Structured Streaming writer

Important

Databricks Runtime 13.3 LTS and above includes a newer version of the kafka-clients library that enables idempotent writes by default. If a Kafka sink uses version 2.8.0 or below with ACLs configured, but without IDEMPOTENT_WRITE enabled, the write fails with the error message org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Resolve this error by upgrading to Kafka version 2.8.0 or above, or by setting .option(“kafka.enable.idempotence”, “false”) while configuring your Structured Streaming writer.

The following are common options set while writing to Kafka:

Option Value Default value Description
kafka.boostrap.servers A comma-separated list of <host:port> none [Required] The Kafka bootstrap.servers configuration.
topic STRING not set [Optional] Sets the topic for all rows to be written. This option overrides any topic column that exists in the data.
includeHeaders BOOLEAN false [Optional] Whether to include the Kafka headers in the row.

See the Options page for the full list of available options.

Schema for Kafka writer

When writing data to Kafka, the provided DataFrame may include the following fields:

Column name Required or optional Type
key optional STRING or BINARY
value required STRING or BINARY
headers optional ARRAY
topic optional (ignored if topic is set as writer option) STRING
partition optional INT

Authentication

Azure Databricks supports multiple authentication methods for Kafka, including Unity Catalog service credentials, SASL/SSL, and cloud-specific options for AWS MSK, Azure Event Hubs, and Google Cloud Managed Kafka. See Authentication.

Retrieve Kafka metrics

You can monitor how far a streaming query is lagging behind Kafka using the avgOffsetsBehindLatest, maxOffsetsBehindLatest, and minOffsetsBehindLatest metrics. These report the average, maximum, and minimum offset lag across all subscribed topic partitions, relative to the latest offsets in Kafka. See Reading Metrics Interactively.

To estimate how much data the query has not yet consumed, use the estimatedTotalBytesBehindLatest metric. This metric estimates the total number of bytes remaining across all subscribed partitions based on the batches processed in the last 300 seconds. You can modify the time window used for this estimate by setting the bytesEstimateWindowLength option. For example, to set it to 10 minutes:

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

If you are running the stream in a notebook, you can see these metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

See Monitoring Structured Streaming queries on Azure Databricks for more information.

Code example: Kafka to Delta

The following example demonstrates a complete workflow for continuously streaming data from Kafka to a Delta table. This pattern is ideal for near-real-time data ingestion workloads.

This example uses a fixed JSON schema. For other formats like Avro or Protobuf, use from_avro or from_protobuf. You can also integrate with a schema registry. See Example with Schema Registry.

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);