Kafka to BigQuery Streaming Pipeline
In this tutorial, you’ll build a complete streaming pipeline that consumes events from Kafka and loads them into BigQuery in real time.
What You’ll Build
A production-ready pipeline that:
- Consumes messages from Kafka topics
- Decodes Avro messages with Schema Registry
- Routes multiple topics to different BigQuery tables
- Runs continuously with checkpoint recovery
- Includes production configuration for backends and monitoring
Architecture
```
┌──────────────┐     ┌───────────────┐     ┌─────────────────┐
│    Kafka     │────▶│     Bizon     │────▶│    BigQuery     │
│    Topics    │     │   Pipeline    │     │     Tables      │
└──────────────┘     └───────────────┘     └─────────────────┘
       │                     │
       ▼                     ▼
┌──────────────┐     ┌───────────────┐
│    Schema    │     │   Postgres    │
│   Registry   │     │    Backend    │
└──────────────┘     └───────────────┘
```

Prerequisites
- Python 3.9+
- Access to a Kafka cluster (or Redpanda)
- Google Cloud project with BigQuery
- Service account with BigQuery permissions
- (Optional) Schema Registry for Avro
Tutorial
1. Install Bizon with dependencies

```bash
pip install "bizon[kafka,bigquery]"
```
2. Set up environment variables

Create a `.env` file:

```bash
export BIZON_ENV_KAFKA_USER="your-kafka-username"
export BIZON_ENV_KAFKA_PASSWORD="your-kafka-password"
export BIZON_ENV_GCP_SA_KEY='{"type":"service_account",...}'
export BIZON_ENV_DB_PASSWORD="your-postgres-password"
```

Source it:

```bash
source .env
```
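Before moving on, it can help to confirm the variables are actually exported and that the service account key parses as JSON. A minimal, optional check in Python (an illustrative helper, not part of Bizon):

```python
# check_env.py - optional sanity check for the variables above (not part of Bizon)
import json
import os
import sys

REQUIRED = [
    "BIZON_ENV_KAFKA_USER",
    "BIZON_ENV_KAFKA_PASSWORD",
    "BIZON_ENV_GCP_SA_KEY",
    "BIZON_ENV_DB_PASSWORD",
]

# Fail fast if any variable is missing or empty
missing = [name for name in REQUIRED if not os.environ.get(name)]
if missing:
    sys.exit(f"Missing environment variables: {', '.join(missing)}")

# The service account key must be valid JSON for BigQuery authentication
try:
    key = json.loads(os.environ["BIZON_ENV_GCP_SA_KEY"])
except json.JSONDecodeError:
    sys.exit("BIZON_ENV_GCP_SA_KEY is not valid JSON")
if key.get("type") != "service_account":
    sys.exit("BIZON_ENV_GCP_SA_KEY does not look like a service account key")

print("All required variables are set.")
```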
3. Create a basic configuration

Start with a simple setup. Create `config.yml`:

```yaml
name: kafka-to-bigquery

source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: localhost:9092
  group_id: bizon-consumer
  message_encoding: utf-8
  batch_size: 100
  consumer_timeout: 10
  topics:
    - name: events
      destination_id: my-project.analytics.events
  authentication:
    type: basic
    params:
      username: BIZON_ENV_KAFKA_USER
      password: BIZON_ENV_KAFKA_PASSWORD

destination:
  name: logger
  config: {}
```
4. Test the basic setup

Run with the logger destination to verify Kafka connectivity:

```bash
bizon run config.yml --runner stream --log-level DEBUG
```

You should see messages being consumed and logged.
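If the `events` topic is empty, nothing will show up in the logs. One way to generate traffic is to publish a few test messages yourself; the sketch below uses confluent-kafka against a local, unauthenticated broker (add SASL settings if your cluster requires them):

```python
# produce_test_events.py - publish a few JSON test messages to the `events` topic
# (illustrative only; assumes a local, unauthenticated broker)
import json

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

for i in range(5):
    event = {
        "type": "page_view",
        "user_id": f"user-{i}",
        "data": {"page": "/home"},
    }
    # Key and value are sent as UTF-8 strings, matching message_encoding: utf-8
    producer.produce("events", key=str(i), value=json.dumps(event))

producer.flush()
print("Produced 5 test events")
```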
5. Add BigQuery destination

Update the destination to BigQuery Streaming V2:

```yaml
destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    dataset_location: US
    buffer_size: 50
    buffer_flush_timeout: 300
    bq_max_rows_per_request: 5000
    authentication:
      service_account_key: BIZON_ENV_GCP_SA_KEY
```
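If loading fails with permission errors, it is worth confirming that the service account can actually see the target dataset. A minimal check with the google-cloud-bigquery client (illustrative, not part of Bizon; project and dataset names are the examples used above):

```python
# check_bigquery.py - verify the service account key can see the target dataset
# (illustrative; requires google-cloud-bigquery)
import json
import os

from google.cloud import bigquery
from google.oauth2 import service_account

info = json.loads(os.environ["BIZON_ENV_GCP_SA_KEY"])
credentials = service_account.Credentials.from_service_account_info(info)
client = bigquery.Client(project="my-project", credentials=credentials)

# Raises NotFound if the dataset does not exist or the key lacks permissions
dataset = client.get_dataset("analytics")
print(f"Dataset OK: {dataset.full_dataset_id} ({dataset.location})")
```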
6. Add schema configuration

Enable structured loading with `unnest: true`:

```yaml
destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    dataset_location: US
    buffer_size: 50
    buffer_flush_timeout: 300
    unnest: true
    record_schemas:
      - destination_id: my-project.analytics.events
        record_schema:
          - name: event_id
            type: STRING
            mode: REQUIRED
          - name: event_type
            type: STRING
            mode: REQUIRED
          - name: user_id
            type: STRING
            mode: NULLABLE
          - name: payload
            type: JSON
            mode: NULLABLE
          - name: timestamp
            type: TIMESTAMP
            mode: REQUIRED
        clustering_keys:
          - event_type
          - user_id
    authentication:
      service_account_key: BIZON_ENV_GCP_SA_KEY
```
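With `unnest: true`, each record is expected to arrive as a flat dict whose keys match the declared columns. A tiny illustrative check that a candidate row covers the REQUIRED columns above (the field names mirror the schema; this is not a Bizon API):

```python
# schema_check.py - assert a candidate row has every REQUIRED column non-null
# (illustrative only; mirrors the record_schema declared above)
REQUIRED = ["event_id", "event_type", "timestamp"]

row = {
    "event_id": "evt-001",
    "event_type": "page_view",
    "user_id": None,                     # NULLABLE, may be missing
    "payload": '{"page": "/home"}',      # JSON column as a serialized string
    "timestamp": "2024-01-01T00:00:00Z",
}

missing = [col for col in REQUIRED if row.get(col) is None]
assert not missing, f"Row is missing required columns: {missing}"
print("Row matches the declared schema")
```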
7. Add transforms

Transform Kafka messages to match your schema:

```yaml
transforms:
  - label: extract-fields
    python: |
      import json

      # Parse the Kafka message value
      value = data.get('value', {})
      if isinstance(value, str):
          value = json.loads(value)

      # Extract fields for BigQuery
      data = {
          'event_id': data.get('keys', {}).get('id', str(data.get('offset'))),
          'event_type': value.get('type', 'unknown'),
          'user_id': value.get('user_id'),
          'payload': json.dumps(value.get('data', {})),
          'timestamp': data.get('timestamp')
      }
```
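The transform body runs once per consumed record, which Bizon exposes as `data`. Before deploying, the same logic can be exercised locally against a fabricated record; the field names below mirror the ones referenced in the transform and may differ from what your topics actually carry:

```python
# test_transform.py - exercise the transform logic above against a fake record
# (illustrative; the sample record fields mirror those used in the transform)
import json


def transform(data: dict) -> dict:
    value = data.get("value", {})
    if isinstance(value, str):
        value = json.loads(value)
    return {
        "event_id": data.get("keys", {}).get("id", str(data.get("offset"))),
        "event_type": value.get("type", "unknown"),
        "user_id": value.get("user_id"),
        "payload": json.dumps(value.get("data", {})),
        "timestamp": data.get("timestamp"),
    }


sample = {
    "offset": 42,
    "keys": {"id": "evt-001"},
    "timestamp": "2024-01-01T00:00:00Z",
    "value": json.dumps(
        {"type": "page_view", "user_id": "user-1", "data": {"page": "/home"}}
    ),
}

print(transform(sample))
# -> {'event_id': 'evt-001', 'event_type': 'page_view', 'user_id': 'user-1', ...}
```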
8. Configure multi-topic routing

Route different topics to different tables:

```yaml
source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: kafka:9092
  group_id: bizon-production
  message_encoding: utf-8
  topics:
    - name: user-events
      destination_id: my-project.analytics.user_events
    - name: order-events
      destination_id: my-project.analytics.order_events
    - name: system-logs
      destination_id: my-project.analytics.system_logs
  authentication:
    type: basic
    params:
      username: BIZON_ENV_KAFKA_USER
      password: BIZON_ENV_KAFKA_PASSWORD
```

Add corresponding schemas:

```yaml
destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    unnest: true
    record_schemas:
      - destination_id: my-project.analytics.user_events
        record_schema:
          - name: user_id
            type: STRING
            mode: REQUIRED
          - name: action
            type: STRING
            mode: REQUIRED
          - name: timestamp
            type: TIMESTAMP
            mode: REQUIRED
        clustering_keys:
          - user_id
      - destination_id: my-project.analytics.order_events
        record_schema:
          - name: order_id
            type: STRING
            mode: REQUIRED
          - name: amount
            type: FLOAT64
            mode: REQUIRED
          - name: status
            type: STRING
            mode: REQUIRED
        clustering_keys:
          - status
      - destination_id: my-project.analytics.system_logs
        record_schema:
          - name: level
            type: STRING
            mode: REQUIRED
          - name: message
            type: STRING
            mode: NULLABLE
          - name: component
            type: STRING
            mode: NULLABLE
```
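Every `destination_id` listed under `topics` needs a matching entry in `record_schemas`, otherwise records for that topic have no table definition. A small consistency check over the parsed config (a hypothetical helper, not part of Bizon; assumes PyYAML and that you are editing `config.yml`):

```python
# routing_check.py - verify every topic's destination_id has a schema entry
# (hypothetical helper; reads the YAML config shown above)
import yaml

with open("config.yml") as f:
    config = yaml.safe_load(f)

topic_targets = {t["destination_id"] for t in config["source"]["topics"]}
schema_targets = {
    s["destination_id"] for s in config["destination"]["config"]["record_schemas"]
}

unrouted = topic_targets - schema_targets
if unrouted:
    raise SystemExit(f"Topics routed to tables without a record_schema: {sorted(unrouted)}")
print("All topics have a matching record schema")
```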
9. Add Avro support with Schema Registry

For Avro-encoded messages, configure Schema Registry:

```yaml
source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: kafka:9092
  group_id: bizon-production
  message_encoding: avro  # Changed from utf-8
  topics:
    - name: events
      destination_id: my-project.analytics.events
  authentication:
    type: basic
    params:
      username: BIZON_ENV_KAFKA_USER
      password: BIZON_ENV_KAFKA_PASSWORD
      schema_registry_type: apicurio
      schema_registry_url: https://registry.example.com/apis/ccompat/v7
      schema_registry_username: registry-user
      schema_registry_password: BIZON_ENV_REGISTRY_PASSWORD
```
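To rule out registry connectivity problems before running the pipeline, you can fetch the latest schema for a subject directly. The sketch below uses confluent-kafka's `SchemaRegistryClient` and assumes the Confluent-compatible API (which is what the `ccompat` path exposes) and the default `<topic>-value` subject naming:

```python
# check_schema_registry.py - confirm the registry is reachable and the subject exists
# (illustrative; assumes the Confluent-compatible API and <topic>-value subject naming)
import os

from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({
    "url": "https://registry.example.com/apis/ccompat/v7",
    "basic.auth.user.info": f"registry-user:{os.environ['BIZON_ENV_REGISTRY_PASSWORD']}",
})

latest = client.get_latest_version("events-value")
print(f"Subject events-value, schema id {latest.schema_id}")
print(latest.schema.schema_str[:200])  # first part of the Avro schema definition
```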
10. Add production engine configuration

Configure a persistent backend for checkpointing:

```yaml
engine:
  backend:
    type: postgres
    config:
      host: db.example.com
      port: 5432
      database: bizon
      schema: public
      username: bizon
      password: BIZON_ENV_DB_PASSWORD
      syncCursorInDBEvery: 5
  runner:
    type: stream
    log_level: INFO
```
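If checkpointing fails at startup, the usual culprits are connectivity or credentials. A quick connection check with psycopg2 (illustrative; install psycopg2-binary separately if needed, host and database names are the examples above):

```python
# check_backend.py - confirm the Postgres backend is reachable with these credentials
# (illustrative; uses psycopg2, not part of Bizon)
import os

import psycopg2

conn = psycopg2.connect(
    host="db.example.com",
    port=5432,
    dbname="bizon",
    user="bizon",
    password=os.environ["BIZON_ENV_DB_PASSWORD"],
)
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])
conn.close()
```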
11. Create the complete production configuration

Here’s the full `production.yml`:

```yaml
name: kafka-to-bigquery-production

source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: kafka.example.com:9092
  group_id: bizon-production
  message_encoding: avro
  batch_size: 500
  consumer_timeout: 30
  skip_message_empty_value: true
  topics:
    - name: user-events
      destination_id: my-project.analytics.user_events
    - name: order-events
      destination_id: my-project.analytics.order_events
  authentication:
    type: basic
    params:
      username: BIZON_ENV_KAFKA_USER
      password: BIZON_ENV_KAFKA_PASSWORD
      schema_registry_type: apicurio
      schema_registry_url: https://registry.example.com/apis/ccompat/v7
      schema_registry_username: registry-user
      schema_registry_password: BIZON_ENV_REGISTRY_PASSWORD
  consumer_config:
    auto.offset.reset: earliest
    session.timeout.ms: 60000

transforms:
  - label: extract-common-fields
    python: |
      import json
      from datetime import datetime

      value = data.get('value', {})

      # Common fields for all events
      data['kafka_topic'] = data.get('topic')
      data['kafka_offset'] = data.get('offset')
      data['kafka_partition'] = data.get('partition')
      data['event_timestamp'] = data.get('timestamp')

      # Flatten the value into data
      if isinstance(value, dict):
          for key, val in value.items():
              if isinstance(val, (dict, list)):
                  data[key] = json.dumps(val)
              else:
                  data[key] = val

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    dataset_location: US
    buffer_size: 100
    buffer_flush_timeout: 300
    bq_max_rows_per_request: 5000
    unnest: true
    time_partitioning:
      type: DAY
      field: _bizon_loaded_at
    record_schemas:
      - destination_id: my-project.analytics.user_events
        record_schema:
          - name: user_id
            type: STRING
            mode: REQUIRED
          - name: action
            type: STRING
            mode: REQUIRED
          - name: properties
            type: JSON
            mode: NULLABLE
          - name: event_timestamp
            type: TIMESTAMP
            mode: REQUIRED
          - name: kafka_offset
            type: INTEGER
            mode: NULLABLE
        clustering_keys:
          - user_id
          - action
      - destination_id: my-project.analytics.order_events
        record_schema:
          - name: order_id
            type: STRING
            mode: REQUIRED
          - name: customer_id
            type: STRING
            mode: REQUIRED
          - name: amount
            type: FLOAT64
            mode: REQUIRED
          - name: status
            type: STRING
            mode: REQUIRED
          - name: event_timestamp
            type: TIMESTAMP
            mode: REQUIRED
        clustering_keys:
          - status
          - customer_id
    authentication:
      service_account_key: BIZON_ENV_GCP_SA_KEY

engine:
  backend:
    type: postgres
    config:
      host: db.example.com
      port: 5432
      database: bizon
      schema: public
      username: bizon
      password: BIZON_ENV_DB_PASSWORD
      syncCursorInDBEvery: 5
  runner:
    type: stream
    log_level: INFO
```
12. Run the production pipeline

```bash
bizon run production.yml --runner stream
```

The pipeline will:
- Connect to Kafka and consume messages
- Decode Avro messages using Schema Registry
- Transform and route to appropriate BigQuery tables
- Checkpoint progress to PostgreSQL
- Resume from last checkpoint on restart
Deployment
Docker
Create a `Dockerfile`:

```dockerfile
FROM python:3.11-slim

WORKDIR /app

RUN pip install "bizon[kafka,bigquery]"

COPY production.yml .

CMD ["bizon", "run", "production.yml", "--runner", "stream"]
```

Kubernetes
Create a `deployment.yaml`:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: bizon-kafka-bigquery
spec:
  replicas: 1
  selector:
    matchLabels:
      app: bizon-kafka-bigquery
  template:
    metadata:
      labels:
        app: bizon-kafka-bigquery
    spec:
      containers:
        - name: bizon
          image: your-registry/bizon-kafka-bigquery:latest
          env:
            - name: BIZON_ENV_KAFKA_USER
              valueFrom:
                secretKeyRef:
                  name: kafka-secrets
                  key: username
            - name: BIZON_ENV_KAFKA_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-secrets
                  key: password
            - name: BIZON_ENV_GCP_SA_KEY
              valueFrom:
                secretKeyRef:
                  name: gcp-secrets
                  key: service-account
            - name: BIZON_ENV_DB_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: postgres-secrets
                  key: password
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
```

Monitoring
Check pipeline status in BigQuery:
```sql
-- Recent records loaded
SELECT _bizon_loaded_at, COUNT(*) as records
FROM `my-project.analytics.user_events`
WHERE _bizon_loaded_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
GROUP BY 1
ORDER BY 1 DESC;
```
```sql
-- Records by type
SELECT action, COUNT(*) as count
FROM `my-project.analytics.user_events`
WHERE _bizon_loaded_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY 1
ORDER BY 2 DESC;
```
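To turn the freshness query into a simple automated check, something like the following could run on a schedule (a sketch only; the table name and one-hour threshold are examples, and the client relies on application default credentials unless you pass the service account explicitly):

```python
# freshness_check.py - alert if no rows landed in the last hour
# (illustrative; uses application default credentials)
from google.cloud import bigquery

client = bigquery.Client(project="my-project")

query = """
    SELECT COUNT(*) AS recent_rows
    FROM `my-project.analytics.user_events`
    WHERE _bizon_loaded_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
"""
row = next(iter(client.query(query).result()))
if row.recent_rows == 0:
    raise RuntimeError("No rows loaded in the last hour - check the pipeline")
print(f"{row.recent_rows} rows loaded in the last hour")
```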
Troubleshooting

Consumer Lag
If you see increasing consumer lag:
```yaml
# Increase batch size and timeout
batch_size: 1000
consumer_timeout: 60
```
```yaml
# Reduce buffer flush timeout
destination:
  config:
    buffer_flush_timeout: 60
```
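To see how far behind the consumer group actually is, a rough per-partition lag check can be done with confluent-kafka. This is a sketch that reads committed offsets for the group used above; the broker address and topic are examples, and SASL settings are omitted (add them if your cluster requires authentication):

```python
# lag_check.py - rough consumer lag per partition for the bizon-production group
# (illustrative; add SASL settings if your cluster requires them)
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "kafka.example.com:9092",
    "group.id": "bizon-production",
    "enable.auto.commit": False,
})

topic = "user-events"
metadata = consumer.list_topics(topic, timeout=10)
partitions = [TopicPartition(topic, p) for p in metadata.topics[topic].partitions]

for tp in consumer.committed(partitions, timeout=10):
    low, high = consumer.get_watermark_offsets(tp, timeout=10)
    committed = tp.offset if tp.offset >= 0 else low  # no committed offset yet
    print(f"partition {tp.partition}: lag = {high - committed}")

consumer.close()
```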
Schema Registry Errors

Ensure the URL includes the compatibility API path:
```yaml
# Apicurio
schema_registry_url: https://registry.example.com/apis/ccompat/v7

# Confluent
schema_registry_url: https://registry.example.com
```

Connection Timeouts
Increase Kafka timeouts:
```yaml
consumer_config:
  session.timeout.ms: 90000
  request.timeout.ms: 60000
  max.poll.interval.ms: 600000
```

Next Steps
- Kafka Source Reference - Complete configuration options
- BigQuery Destinations - All BigQuery variants
- Engine Configuration - Production backends and queues