
Kafka Source

The Kafka source consumes messages from Kafka or Redpanda topics, supporting both UTF-8 JSON and Avro message formats with Schema Registry integration.

Install the Kafka extra:

```bash
pip install bizon[kafka]
```

A minimal pipeline configuration:

```yaml
name: kafka-pipeline

source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: localhost:9092
  group_id: my-consumer-group
  message_encoding: utf-8
  topics:
    - name: events
      destination_id: project.dataset.events
  authentication:
    type: basic
    params:
      username: user
      password: BIZON_ENV_KAFKA_PASSWORD

destination:
  name: logger
  config: {}
```

The source block accepts the following options:

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| bootstrap_servers | string | Yes | - | Kafka bootstrap servers (comma-separated) |
| group_id | string | No | bizon | Kafka consumer group ID |
| topics | list | Yes | - | List of topics to consume |
| message_encoding | enum | No | avro | Message format: utf-8 or avro |
| batch_size | int | No | 100 | Messages per batch |
| consumer_timeout | int | No | 10 | Seconds to wait for messages |
| skip_message_empty_value | bool | No | true | Skip tombstone messages |
| skip_message_invalid_keys | bool | No | false | Skip messages with unparsable keys |
| consumer_config | dict | No | See below | Advanced Kafka consumer config |
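
The batching and skip options are optional; the sketch below sets them explicitly alongside the required fields (server address, topic names, and values are only illustrative):

```yaml
source:
  name: kafka
  stream: topic
  bootstrap_servers: localhost:9092
  batch_size: 500                   # messages per batch
  consumer_timeout: 30              # seconds to wait for messages
  skip_message_empty_value: true    # skip tombstone messages
  skip_message_invalid_keys: false  # do not skip messages with unparsable keys
  topics:
    - name: events
      destination_id: project.dataset.events
```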

Each entry in the topics list has the following fields:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| name | string | Yes | Kafka topic name |
| destination_id | string | Yes | Target table identifier |
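
For example, a topics list routing two topics to their own tables (topic and table names are placeholders):

```yaml
topics:
  - name: user-events
    destination_id: project.dataset.user_events
  - name: order-events
    destination_id: project.dataset.order_events
```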

The Kafka source uses Basic auth for SASL authentication:

```yaml
authentication:
  type: basic
  params:
    username: kafka-user
    password: BIZON_ENV_KAFKA_PASSWORD
```

For Avro messages, configure Schema Registry:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| schema_registry_type | enum | No | Registry type: apicurio |
| schema_registry_url | string | Yes* | Schema Registry URL |
| schema_registry_username | string | No | Registry username |
| schema_registry_password | string | No | Registry password |

*Required when message_encoding: avro
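
A sketch of an Avro-encoded source with an Apicurio-compatible registry, using the same fields as the production example further below (URL and credentials are placeholders):

```yaml
source:
  name: kafka
  stream: topic
  message_encoding: avro
  schema_registry_type: apicurio
  schema_registry_url: https://registry.example.com/apis/ccompat/v7
  schema_registry_username: registry-user
  schema_registry_password: BIZON_ENV_REGISTRY_PASSWORD
```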

Default Kafka consumer settings:

```yaml
consumer_config:
  auto.offset.reset: earliest
  enable.auto.commit: false
  session.timeout.ms: 45000
  security.protocol: SASL_SSL
```

Override these defaults with any librdkafka configuration option:

```yaml
source:
  name: kafka
  stream: topic
  consumer_config:
    auto.offset.reset: latest
    session.timeout.ms: 60000
    max.poll.interval.ms: 300000
```

The Kafka source supports two sync modes:

Stream mode (sync_mode: stream) consumes continuously with offset tracking:

```yaml
source:
  name: kafka
  stream: topic
  sync_mode: stream
```

Run with the stream runner:

```bash
bizon run config.yml --runner stream
```

Full refresh mode (sync_mode: full_refresh) assigns partitions manually and syncs each topic from the beginning:

```yaml
source:
  name: kafka
  stream: topic
  sync_mode: full_refresh
```

Each Kafka message produces a record with:

| Field | Type | Description |
|-------|------|-------------|
| topic | string | Source topic name |
| offset | int | Message offset |
| partition | int | Partition number |
| timestamp | datetime | Message timestamp |
| keys | dict | Message key (parsed JSON) |
| headers | dict | Message headers |
| value | dict | Message value (parsed) |
| schema | string | Avro schema (if applicable) |
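
As an illustration, a record consumed from the events topic might look roughly like this (all values are placeholders):

```yaml
topic: events
offset: 1024
partition: 0
timestamp: 2024-05-01T12:34:56Z
keys:
  id: "42"
headers:
  source: web
value:
  type: page_view
  user_id: "42"
schema: null   # only populated for Avro messages
```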

Route different topics to different tables:

```yaml
source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: kafka:9092
  topics:
    - name: user-events
      destination_id: project.dataset.user_events
    - name: order-events
      destination_id: project.dataset.order_events
    - name: system-logs
      destination_id: project.dataset.system_logs

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    record_schemas:
      - destination_id: project.dataset.user_events
        record_schema:
          - name: user_id
            type: STRING
            mode: REQUIRED
          - name: action
            type: STRING
            mode: REQUIRED
      - destination_id: project.dataset.order_events
        record_schema:
          - name: order_id
            type: STRING
            mode: REQUIRED
          - name: amount
            type: FLOAT64
            mode: REQUIRED
      - destination_id: project.dataset.system_logs
        record_schema:
          - name: level
            type: STRING
            mode: REQUIRED
          - name: message
            type: STRING
            mode: NULLABLE
```

For cleaner multi-topic setups, use the top-level streams config:

```yaml
source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: kafka:9092
  topics: [] # Empty - defined in streams below

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics

streams:
  - name: user-events
    source:
      topic: user-events
    destination:
      table_id: project.dataset.user_events
      record_schema:
        - name: user_id
          type: STRING
          mode: REQUIRED
      clustering_keys:
        - user_id
  - name: order-events
    source:
      topic: order-events
    destination:
      table_id: project.dataset.order_events
      record_schema:
        - name: order_id
          type: STRING
          mode: REQUIRED
```

Parse and transform Kafka messages:

```yaml
source:
  name: kafka
  stream: topic
  message_encoding: utf-8

transforms:
  - label: parse-value
    python: |
      import json
      # data['value'] is the raw message value
      if isinstance(data.get('value'), str):
          data['value'] = json.loads(data['value'])
  - label: extract-fields
    python: |
      value = data.get('value', {})
      data = {
          'event_type': value.get('type'),
          'user_id': value.get('user_id'),
          'timestamp': data.get('timestamp'),
          'kafka_offset': data.get('offset')
      }
```
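
After the extract-fields transform runs, each record carries only the keys it assigns; an illustrative result (placeholder values) looks like this:

```yaml
# Illustrative output of the extract-fields transform (placeholder values)
event_type: page_view
user_id: "42"
timestamp: 2024-05-01T12:34:56Z
kafka_offset: 1024
```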

Complete production example:

```yaml
name: kafka-to-bigquery

source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: kafka.example.com:9092
  group_id: bizon-production
  message_encoding: avro
  batch_size: 500
  consumer_timeout: 30
  skip_message_empty_value: true
  topics:
    - name: events
      destination_id: my-project.analytics.events
  authentication:
    type: basic
    params:
      username: kafka-user
      password: BIZON_ENV_KAFKA_PASSWORD
  schema_registry_type: apicurio
  schema_registry_url: https://registry.example.com/apis/ccompat/v7
  schema_registry_username: registry-user
  schema_registry_password: BIZON_ENV_REGISTRY_PASSWORD
  consumer_config:
    auto.offset.reset: earliest
    session.timeout.ms: 60000

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    buffer_size: 100
    buffer_flush_timeout: 300
    unnest: true
    record_schemas:
      - destination_id: my-project.analytics.events
        record_schema:
          - name: event_id
            type: STRING
            mode: REQUIRED
          - name: event_type
            type: STRING
            mode: REQUIRED
          - name: payload
            type: JSON
            mode: NULLABLE
          - name: created_at
            type: TIMESTAMP
            mode: REQUIRED
        clustering_keys:
          - event_type

engine:
  backend:
    type: postgres
    config:
      host: db.example.com
      port: 5432
      database: bizon
      schema: public
      username: bizon
      password: BIZON_ENV_DB_PASSWORD

runner:
  type: stream
  log_level: INFO
```

Common tuning adjustments:

```yaml
# Increase timeout for slow networks
consumer_config:
  session.timeout.ms: 60000
  request.timeout.ms: 30000
```

```yaml
# Increase batch size for higher throughput
batch_size: 1000
consumer_timeout: 60
```

Ensure the registry URL includes the compatibility API path:

```yaml
# Apicurio Registry
schema_registry_url: https://registry.example.com/apis/ccompat/v7

# Confluent Schema Registry
schema_registry_url: https://registry.example.com
```