# Kafka Source
The Kafka source consumes messages from Kafka or Redpanda topics, supporting both UTF-8 JSON and Avro message formats with Schema Registry integration.
## Installation

```bash
pip install bizon[kafka]
```
## Quick Start

A minimal pipeline consuming UTF-8 JSON messages and logging each record:

```yaml
name: kafka-pipeline

source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: localhost:9092
  group_id: my-consumer-group
  message_encoding: utf-8
  topics:
    - name: events
      destination_id: project.dataset.events
  authentication:
    type: basic
    params:
      username: user
      password: BIZON_ENV_KAFKA_PASSWORD

destination:
  name: logger
  config: {}
```

The same pipeline consuming Avro messages, with Schema Registry configured and BigQuery as the destination:

```yaml
name: kafka-avro-pipeline

source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: localhost:9092
  group_id: my-consumer-group
  message_encoding: avro
  topics:
    - name: events
      destination_id: project.dataset.events
  authentication:
    type: basic
    params:
      username: user
      password: BIZON_ENV_KAFKA_PASSWORD
      schema_registry_type: apicurio
      schema_registry_url: https://registry.example.com/apis/ccompat/v7
      schema_registry_username: registry-user
      schema_registry_password: BIZON_ENV_REGISTRY_PASSWORD

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
```
## Configuration Reference

### Source Fields

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| bootstrap_servers | string | Yes | - | Kafka bootstrap servers (comma-separated) |
| group_id | string | No | bizon | Kafka consumer group ID |
| topics | list | Yes | - | List of topics to consume |
| message_encoding | enum | No | avro | Message format: utf-8 or avro |
| batch_size | int | No | 100 | Messages per batch |
| consumer_timeout | int | No | 10 | Seconds to wait for messages |
| skip_message_empty_value | bool | No | true | Skip tombstone messages |
| skip_message_invalid_keys | bool | No | false | Skip messages with unparsable keys |
| consumer_config | dict | No | See below | Advanced Kafka consumer config |
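The optional tuning fields from the table combine as in the following sketch (values are illustrative, not recommendations):

```yaml
source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: localhost:9092
  batch_size: 500                  # messages per batch (default: 100)
  consumer_timeout: 30             # wait up to 30s for messages (default: 10)
  skip_message_empty_value: true   # drop tombstone messages
  skip_message_invalid_keys: true  # drop messages whose keys cannot be parsed
  topics:
    - name: events
      destination_id: project.dataset.events
```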
### Topic Configuration

Each topic in the topics list:
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Kafka topic name |
| destination_id | string | Yes | Target table identifier |
### Authentication

The Kafka source uses Basic auth for SASL authentication:
```yaml
authentication:
  type: basic
  params:
    username: kafka-user
    password: BIZON_ENV_KAFKA_PASSWORD
```
### Schema Registry (Avro)

For Avro messages, configure Schema Registry:
| Field | Type | Required | Description |
|---|---|---|---|
| schema_registry_type | enum | No | Registry type: apicurio |
| schema_registry_url | string | Yes* | Schema Registry URL |
| schema_registry_username | string | No | Registry username |
| schema_registry_password | string | No | Registry password |
*Required when message_encoding: avro
### Consumer Configuration

Default Kafka consumer settings:
```yaml
consumer_config:
  auto.offset.reset: earliest
  enable.auto.commit: false
  session.timeout.ms: 45000
  security.protocol: SASL_SSL
```

Override with any librdkafka configuration:
```yaml
source:
  name: kafka
  stream: topic
  consumer_config:
    auto.offset.reset: latest
    session.timeout.ms: 60000
    max.poll.interval.ms: 300000
```
## Sync Modes

The Kafka source supports two sync modes:
### Stream Mode (Recommended)

Continuous consumption with offset tracking:
```yaml
source:
  name: kafka
  stream: topic
  sync_mode: stream
```

Run with the stream runner:
```bash
bizon run config.yml --runner stream
```
### Full Refresh Mode

Assigns partitions manually and syncs each topic from the beginning:
```yaml
source:
  name: kafka
  stream: topic
  sync_mode: full_refresh
```
## Record Output

Each Kafka message produces a record with:
| Field | Type | Description |
|---|---|---|
| topic | string | Source topic name |
| offset | int | Message offset |
| partition | int | Partition number |
| timestamp | datetime | Message timestamp |
| keys | dict | Message key (parsed JSON) |
| headers | dict | Message headers |
| value | dict | Message value (parsed) |
| schema | string | Avro schema (if applicable) |
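For example, a consumed JSON message might yield a record like this (values are illustrative):

```yaml
topic: events
partition: 0
offset: 1204
timestamp: 2024-05-01T12:00:00Z
keys:
  id: "42"
headers:
  source: api
value:
  type: page_view
  user_id: "42"
schema: null  # holds the Avro schema when message_encoding is avro
```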
## Multi-Topic Routing

Route different topics to different tables:
```yaml
source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: kafka:9092
  topics:
    - name: user-events
      destination_id: project.dataset.user_events
    - name: order-events
      destination_id: project.dataset.order_events
    - name: system-logs
      destination_id: project.dataset.system_logs

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    record_schemas:
      - destination_id: project.dataset.user_events
        record_schema:
          - name: user_id
            type: STRING
            mode: REQUIRED
          - name: action
            type: STRING
            mode: REQUIRED
      - destination_id: project.dataset.order_events
        record_schema:
          - name: order_id
            type: STRING
            mode: REQUIRED
          - name: amount
            type: FLOAT64
            mode: REQUIRED
      - destination_id: project.dataset.system_logs
        record_schema:
          - name: level
            type: STRING
            mode: REQUIRED
          - name: message
            type: STRING
            mode: NULLABLE
```
## Streams Configuration

For cleaner multi-topic setups, use the top-level streams config:
```yaml
source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: kafka:9092
  topics: [] # Empty - defined in streams below

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics

streams:
  - name: user-events
    source:
      topic: user-events
    destination:
      table_id: project.dataset.user_events
      record_schema:
        - name: user_id
          type: STRING
          mode: REQUIRED
      clustering_keys:
        - user_id
  - name: order-events
    source:
      topic: order-events
    destination:
      table_id: project.dataset.order_events
      record_schema:
        - name: order_id
          type: STRING
          mode: REQUIRED
```
## Transforms

Parse and transform Kafka messages:
```yaml
source:
  name: kafka
  stream: topic
  message_encoding: utf-8

transforms:
  - label: parse-value
    python: |
      import json
      # data['value'] is the raw message value
      if isinstance(data.get('value'), str):
          data['value'] = json.loads(data['value'])

  - label: extract-fields
    python: |
      value = data.get('value', {})
      data = {
          'event_type': value.get('type'),
          'user_id': value.get('user_id'),
          'timestamp': data.get('timestamp'),
          'kafka_offset': data.get('offset')
      }
```
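Transforms receive each record as a data dict, so they can also drop or mask fields before loading. A minimal sketch assuming the same data dict contract as above (the email field is purely illustrative):

```yaml
transforms:
  - label: drop-pii
    python: |
      # Remove fields that should not reach the destination
      value = data.get('value', {})
      value.pop('email', None)
      data['value'] = value
```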
## Production Configuration

Complete production example:
```yaml
name: kafka-to-bigquery

source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: kafka.example.com:9092
  group_id: bizon-production
  message_encoding: avro
  batch_size: 500
  consumer_timeout: 30
  skip_message_empty_value: true
  topics:
    - name: events
      destination_id: my-project.analytics.events
  authentication:
    type: basic
    params:
      username: kafka-user
      password: BIZON_ENV_KAFKA_PASSWORD
      schema_registry_type: apicurio
      schema_registry_url: https://registry.example.com/apis/ccompat/v7
      schema_registry_username: registry-user
      schema_registry_password: BIZON_ENV_REGISTRY_PASSWORD
  consumer_config:
    auto.offset.reset: earliest
    session.timeout.ms: 60000

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    buffer_size: 100
    buffer_flush_timeout: 300
    unnest: true
    record_schemas:
      - destination_id: my-project.analytics.events
        record_schema:
          - name: event_id
            type: STRING
            mode: REQUIRED
          - name: event_type
            type: STRING
            mode: REQUIRED
          - name: payload
            type: JSON
            mode: NULLABLE
          - name: created_at
            type: TIMESTAMP
            mode: REQUIRED
        clustering_keys:
          - event_type

engine:
  backend:
    type: postgres
    config:
      host: db.example.com
      port: 5432
      database: bizon
      schema: public
      username: bizon
      password: BIZON_ENV_DB_PASSWORD

runner:
  type: stream
  log_level: INFO
```
## Troubleshooting
### Connection Issues

```yaml
# Increase timeout for slow networks
consumer_config:
  session.timeout.ms: 60000
  request.timeout.ms: 30000
```
### Consumer Lag

```yaml
# Increase batch size for higher throughput
batch_size: 1000
consumer_timeout: 60
```
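If lag persists, larger batches can be combined with librdkafka fetch tuning; fetch.min.bytes and fetch.wait.max.ms are standard librdkafka options rather than bizon defaults, and the values below are illustrative:

```yaml
source:
  name: kafka
  stream: topic
  batch_size: 1000
  consumer_timeout: 60
  consumer_config:
    fetch.min.bytes: 1048576  # wait for ~1 MiB of data per fetch request
    fetch.wait.max.ms: 1000   # ...or up to 1s, whichever comes first
```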
### Schema Registry Errors

Ensure the registry URL includes the compatibility API path for Apicurio; Confluent Schema Registry uses its base URL:
```yaml
# Apicurio Registry
schema_registry_url: https://registry.example.com/apis/ccompat/v7

# Confluent Schema Registry
schema_registry_url: https://registry.example.com
```
## Next Steps

- Kafka to BigQuery Tutorial - Complete streaming guide
- BigQuery Destinations - Configure BigQuery loading
- Engine Configuration - Production backend setup