Kafka to BigQuery Streaming Pipeline

In this tutorial, you’ll build a complete streaming pipeline that consumes events from Kafka and loads them into BigQuery in real time.

By the end, you’ll have a production-ready pipeline that:

  • Consumes messages from Kafka topics
  • Decodes Avro messages with Schema Registry
  • Routes multiple topics to different BigQuery tables
  • Runs continuously with checkpoint recovery
  • Includes production configuration for backends and monitoring

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│    Kafka     │────▶│    Bizon     │────▶│   BigQuery   │
│    Topics    │     │   Pipeline   │     │    Tables    │
└──────────────┘     └──────────────┘     └──────────────┘
        │                    │
        ▼                    ▼
┌──────────────┐     ┌──────────────┐
│    Schema    │     │   Postgres   │
│   Registry   │     │   Backend    │
└──────────────┘     └──────────────┘

You’ll need:

  • Python 3.9+
  • Access to a Kafka cluster (or Redpanda)
  • Google Cloud project with BigQuery
  • Service account with BigQuery permissions
  • (Optional) Schema Registry for Avro
  1. Install Bizon with dependencies

    pip install "bizon[kafka,bigquery]"
  2. Set up environment variables

    Create a .env file:

    export BIZON_ENV_KAFKA_USER="your-kafka-username"
    export BIZON_ENV_KAFKA_PASSWORD="your-kafka-password"
    export BIZON_ENV_GCP_SA_KEY='{"type":"service_account",...}'
    export BIZON_ENV_DB_PASSWORD="your-postgres-password"

    Source it:

    source .env
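
    Before running anything, you can sanity-check that the required variables are actually set. A minimal Python sketch (the variable names are the ones defined in the .env file above):

    import os

    # Environment variables referenced by the Bizon configuration
    required = [
        "BIZON_ENV_KAFKA_USER",
        "BIZON_ENV_KAFKA_PASSWORD",
        "BIZON_ENV_GCP_SA_KEY",
        "BIZON_ENV_DB_PASSWORD",
    ]

    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
    print("All Bizon environment variables are set.")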
  3. Create a basic configuration

    Start with a simple setup. Create config.yml:

    name: kafka-to-bigquery
    source:
      name: kafka
      stream: topic
      sync_mode: stream
      bootstrap_servers: localhost:9092
      group_id: bizon-consumer
      message_encoding: utf-8
      batch_size: 100
      consumer_timeout: 10
      topics:
        - name: events
          destination_id: my-project.analytics.events
      authentication:
        type: basic
        params:
          username: BIZON_ENV_KAFKA_USER
          password: BIZON_ENV_KAFKA_PASSWORD
    destination:
      name: logger
      config: {}
  4. Test the basic setup

    Run with the logger destination to verify Kafka connectivity:

    bizon run config.yml --runner stream --log-level DEBUG

    You should see messages being consumed and logged.
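
    If the topic is empty, publish a few test messages yourself. A minimal sketch using the confluent-kafka Python client (assumes an unauthenticated local broker; add SASL settings to match your cluster):

    import json
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:9092"})

    # Send a handful of JSON events to the topic the pipeline consumes
    for i in range(5):
        event = {"type": "page_view", "user_id": f"user-{i}", "data": {"page": "/home"}}
        producer.produce("events", key=str(i), value=json.dumps(event))

    producer.flush()
    print("Published 5 test messages to 'events'")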

  5. Add BigQuery destination

    Update the destination to BigQuery Streaming V2:

    destination:
      name: bigquery_streaming_v2
      config:
        project_id: my-project
        dataset_id: analytics
        dataset_location: US
        buffer_size: 50
        buffer_flush_timeout: 300
        bq_max_rows_per_request: 5000
        authentication:
          service_account_key: BIZON_ENV_GCP_SA_KEY
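
    If the analytics dataset does not exist yet, you can create it ahead of time with the google-cloud-bigquery client. A minimal sketch (assumes application default credentials or GOOGLE_APPLICATION_CREDENTIALS are configured):

    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")

    dataset = bigquery.Dataset("my-project.analytics")
    dataset.location = "US"

    # exists_ok avoids an error if the dataset is already there
    client.create_dataset(dataset, exists_ok=True)
    print("Dataset my-project.analytics is ready")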
  6. Add schema configuration

    Enable structured loading with unnest: true:

    destination:
      name: bigquery_streaming_v2
      config:
        project_id: my-project
        dataset_id: analytics
        dataset_location: US
        buffer_size: 50
        buffer_flush_timeout: 300
        unnest: true
        record_schemas:
          - destination_id: my-project.analytics.events
            record_schema:
              - name: event_id
                type: STRING
                mode: REQUIRED
              - name: event_type
                type: STRING
                mode: REQUIRED
              - name: user_id
                type: STRING
                mode: NULLABLE
              - name: payload
                type: JSON
                mode: NULLABLE
              - name: timestamp
                type: TIMESTAMP
                mode: REQUIRED
            clustering_keys:
              - event_type
              - user_id
        authentication:
          service_account_key: BIZON_ENV_GCP_SA_KEY
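
    Once records have landed, you can check that the live table columns match the record_schema declared above. A short sketch with google-cloud-bigquery (assumes the table exists and credentials are configured):

    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")
    table = client.get_table("my-project.analytics.events")

    # Print the column layout and clustering BigQuery actually has
    for field in table.schema:
        print(f"{field.name}: {field.field_type} ({field.mode})")
    print(f"Clustering: {table.clustering_fields}")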
  7. Add transforms

    Transform Kafka messages to match your schema:

    transforms:
      - label: extract-fields
        python: |
          import json

          # Parse the Kafka message value
          value = data.get('value', {})
          if isinstance(value, str):
              value = json.loads(value)

          # Extract fields for BigQuery
          data = {
              'event_id': data.get('keys', {}).get('id', str(data.get('offset'))),
              'event_type': value.get('type', 'unknown'),
              'user_id': value.get('user_id'),
              'payload': json.dumps(value.get('data', {})),
              'timestamp': data.get('timestamp')
          }
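
    Because the transform is plain Python over a data dict, you can test it outside the pipeline. A sketch with a hypothetical sample record shaped like the fields the transform reads (keys, value, offset, timestamp):

    import json

    def transform(data):
        # Same logic as the extract-fields transform above
        value = data.get('value', {})
        if isinstance(value, str):
            value = json.loads(value)
        return {
            'event_id': data.get('keys', {}).get('id', str(data.get('offset'))),
            'event_type': value.get('type', 'unknown'),
            'user_id': value.get('user_id'),
            'payload': json.dumps(value.get('data', {})),
            'timestamp': data.get('timestamp'),
        }

    # Hypothetical sample record for local testing
    sample = {
        'keys': {'id': 'evt-123'},
        'value': '{"type": "signup", "user_id": "u-42", "data": {"plan": "pro"}}',
        'offset': 100,
        'timestamp': '2024-01-01T00:00:00Z',
    }
    print(transform(sample))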
  8. Configure multi-topic routing

    Route different topics to different tables:

    source:
      name: kafka
      stream: topic
      sync_mode: stream
      bootstrap_servers: kafka:9092
      group_id: bizon-production
      message_encoding: utf-8
      topics:
        - name: user-events
          destination_id: my-project.analytics.user_events
        - name: order-events
          destination_id: my-project.analytics.order_events
        - name: system-logs
          destination_id: my-project.analytics.system_logs
      authentication:
        type: basic
        params:
          username: BIZON_ENV_KAFKA_USER
          password: BIZON_ENV_KAFKA_PASSWORD

    Add corresponding schemas:

    destination:
      name: bigquery_streaming_v2
      config:
        project_id: my-project
        dataset_id: analytics
        unnest: true
        record_schemas:
          - destination_id: my-project.analytics.user_events
            record_schema:
              - name: user_id
                type: STRING
                mode: REQUIRED
              - name: action
                type: STRING
                mode: REQUIRED
              - name: timestamp
                type: TIMESTAMP
                mode: REQUIRED
            clustering_keys:
              - user_id
          - destination_id: my-project.analytics.order_events
            record_schema:
              - name: order_id
                type: STRING
                mode: REQUIRED
              - name: amount
                type: FLOAT64
                mode: REQUIRED
              - name: status
                type: STRING
                mode: REQUIRED
            clustering_keys:
              - status
          - destination_id: my-project.analytics.system_logs
            record_schema:
              - name: level
                type: STRING
                mode: REQUIRED
              - name: message
                type: STRING
                mode: NULLABLE
              - name: component
                type: STRING
                mode: NULLABLE
  9. Add Avro support with Schema Registry

    For Avro-encoded messages, configure Schema Registry:

    source:
      name: kafka
      stream: topic
      sync_mode: stream
      bootstrap_servers: kafka:9092
      group_id: bizon-production
      message_encoding: avro  # Changed from utf-8
      topics:
        - name: events
          destination_id: my-project.analytics.events
      authentication:
        type: basic
        params:
          username: BIZON_ENV_KAFKA_USER
          password: BIZON_ENV_KAFKA_PASSWORD
          schema_registry_type: apicurio
          schema_registry_url: https://registry.example.com/apis/ccompat/v7
          schema_registry_username: registry-user
          schema_registry_password: BIZON_ENV_REGISTRY_PASSWORD
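
    To debug Avro decoding outside the pipeline, you can pull one message and deserialize it directly against the registry. A sketch with confluent-kafka’s Schema Registry client (URLs and names are the example values above; add SASL settings for a secured broker):

    from confluent_kafka import Consumer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroDeserializer
    from confluent_kafka.serialization import SerializationContext, MessageField

    registry = SchemaRegistryClient({
        "url": "https://registry.example.com/apis/ccompat/v7",
        "basic.auth.user.info": "registry-user:registry-password",
    })
    deserializer = AvroDeserializer(registry)

    consumer = Consumer({
        "bootstrap.servers": "kafka:9092",
        "group.id": "avro-debug",
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["events"])

    # Fetch a single message and decode it against the registered schema
    msg = consumer.poll(10)
    if msg is not None and msg.error() is None:
        record = deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
        print(record)
    consumer.close()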
  10. Add production engine configuration

    Configure a persistent backend for checkpointing:

    engine:
      backend:
        type: postgres
        config:
          host: db.example.com
          port: 5432
          database: bizon
          schema: public
          username: bizon
          password: BIZON_ENV_DB_PASSWORD
          syncCursorInDBEvery: 5
      runner:
        type: stream
        log_level: INFO
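
    Before the first run, confirm the backend database is reachable with the same credentials. A minimal sketch with psycopg2 (host, database, and user mirror the example config above):

    import os
    import psycopg2

    # Connection details mirror the engine.backend config
    conn = psycopg2.connect(
        host="db.example.com",
        port=5432,
        dbname="bizon",
        user="bizon",
        password=os.environ["BIZON_ENV_DB_PASSWORD"],
    )
    with conn.cursor() as cur:
        cur.execute("SELECT version();")
        print(cur.fetchone()[0])
    conn.close()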
  11. Create the complete production configuration

    Here’s the full production.yml:

    name: kafka-to-bigquery-production
    source:
      name: kafka
      stream: topic
      sync_mode: stream
      bootstrap_servers: kafka.example.com:9092
      group_id: bizon-production
      message_encoding: avro
      batch_size: 500
      consumer_timeout: 30
      skip_message_empty_value: true
      topics:
        - name: user-events
          destination_id: my-project.analytics.user_events
        - name: order-events
          destination_id: my-project.analytics.order_events
      authentication:
        type: basic
        params:
          username: BIZON_ENV_KAFKA_USER
          password: BIZON_ENV_KAFKA_PASSWORD
          schema_registry_type: apicurio
          schema_registry_url: https://registry.example.com/apis/ccompat/v7
          schema_registry_username: registry-user
          schema_registry_password: BIZON_ENV_REGISTRY_PASSWORD
      consumer_config:
        auto.offset.reset: earliest
        session.timeout.ms: 60000
    transforms:
      - label: extract-common-fields
        python: |
          import json
          from datetime import datetime

          value = data.get('value', {})

          # Common fields for all events
          data['kafka_topic'] = data.get('topic')
          data['kafka_offset'] = data.get('offset')
          data['kafka_partition'] = data.get('partition')
          data['event_timestamp'] = data.get('timestamp')

          # Flatten the value into data
          if isinstance(value, dict):
              for key, val in value.items():
                  if isinstance(val, (dict, list)):
                      data[key] = json.dumps(val)
                  else:
                      data[key] = val
    destination:
      name: bigquery_streaming_v2
      config:
        project_id: my-project
        dataset_id: analytics
        dataset_location: US
        buffer_size: 100
        buffer_flush_timeout: 300
        bq_max_rows_per_request: 5000
        unnest: true
        time_partitioning:
          type: DAY
          field: _bizon_loaded_at
        record_schemas:
          - destination_id: my-project.analytics.user_events
            record_schema:
              - name: user_id
                type: STRING
                mode: REQUIRED
              - name: action
                type: STRING
                mode: REQUIRED
              - name: properties
                type: JSON
                mode: NULLABLE
              - name: event_timestamp
                type: TIMESTAMP
                mode: REQUIRED
              - name: kafka_offset
                type: INTEGER
                mode: NULLABLE
            clustering_keys:
              - user_id
              - action
          - destination_id: my-project.analytics.order_events
            record_schema:
              - name: order_id
                type: STRING
                mode: REQUIRED
              - name: customer_id
                type: STRING
                mode: REQUIRED
              - name: amount
                type: FLOAT64
                mode: REQUIRED
              - name: status
                type: STRING
                mode: REQUIRED
              - name: event_timestamp
                type: TIMESTAMP
                mode: REQUIRED
            clustering_keys:
              - status
              - customer_id
        authentication:
          service_account_key: BIZON_ENV_GCP_SA_KEY
    engine:
      backend:
        type: postgres
        config:
          host: db.example.com
          port: 5432
          database: bizon
          schema: public
          username: bizon
          password: BIZON_ENV_DB_PASSWORD
          syncCursorInDBEvery: 5
      runner:
        type: stream
        log_level: INFO
  12. Run the production pipeline

    bizon run production.yml --runner stream

    The pipeline will:

    • Connect to Kafka and consume messages
    • Decode Avro messages using Schema Registry
    • Transform and route to appropriate BigQuery tables
    • Checkpoint progress to PostgreSQL
    • Resume from last checkpoint on restart

To package the pipeline for deployment, create a Dockerfile:

FROM python:3.11-slim
WORKDIR /app
RUN pip install "bizon[kafka,bigquery]"
COPY production.yml .
CMD ["bizon", "run", "production.yml", "--runner", "stream"]

To run it on Kubernetes, create a deployment.yaml that injects the credentials from secrets:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: bizon-kafka-bigquery
spec:
  replicas: 1
  selector:
    matchLabels:
      app: bizon-kafka-bigquery
  template:
    metadata:
      labels:
        app: bizon-kafka-bigquery
    spec:
      containers:
        - name: bizon
          image: your-registry/bizon-kafka-bigquery:latest
          env:
            - name: BIZON_ENV_KAFKA_USER
              valueFrom:
                secretKeyRef:
                  name: kafka-secrets
                  key: username
            - name: BIZON_ENV_KAFKA_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-secrets
                  key: password
            - name: BIZON_ENV_GCP_SA_KEY
              valueFrom:
                secretKeyRef:
                  name: gcp-secrets
                  key: service-account
            - name: BIZON_ENV_DB_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: postgres-secrets
                  key: password
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"

To monitor the pipeline, check recent loads directly in BigQuery:

-- Recent records loaded
SELECT
  _bizon_loaded_at,
  COUNT(*) AS records
FROM `my-project.analytics.user_events`
WHERE _bizon_loaded_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
GROUP BY 1
ORDER BY 1 DESC;

-- Records by type
SELECT
  action,
  COUNT(*) AS count
FROM `my-project.analytics.user_events`
WHERE _bizon_loaded_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY 1
ORDER BY 2 DESC;
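
You can also run the freshness check from a script and alert when nothing has loaded recently. A sketch with google-cloud-bigquery (the table and the _bizon_loaded_at column come from the queries above):

from google.cloud import bigquery

client = bigquery.Client(project="my-project")

# Count rows loaded in the last hour
query = """
    SELECT COUNT(*) AS records
    FROM `my-project.analytics.user_events`
    WHERE _bizon_loaded_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
"""
records = list(client.query(query).result())[0].records
if records == 0:
    raise SystemExit("No records loaded in the last hour - check the pipeline")
print(f"{records} records loaded in the last hour")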

If you see increasing consumer lag:

# Increase batch size and consumer timeout on the source
source:
  batch_size: 1000
  consumer_timeout: 60

# Reduce the buffer flush timeout on the destination
destination:
  config:
    buffer_flush_timeout: 60
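
To quantify the lag, compare the group’s committed offsets with the topic high watermarks. A sketch using confluent-kafka (group id and topic names are the example values from this tutorial; add broker authentication as needed):

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "bizon-production",
    "enable.auto.commit": False,
})

total_lag = 0
for topic in ["user-events", "order-events"]:
    # Look up the topic's partitions, then the group's committed offsets
    metadata = consumer.list_topics(topic, timeout=10)
    partitions = [TopicPartition(topic, p) for p in metadata.topics[topic].partitions]
    committed = consumer.committed(partitions, timeout=10)
    for tp in committed:
        low, high = consumer.get_watermark_offsets(tp, timeout=10)
        current = tp.offset if tp.offset >= 0 else low
        total_lag += high - current

print(f"Total consumer lag: {total_lag}")
consumer.close()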

If schema lookups fail, ensure the Schema Registry URL includes the compatibility API path (Apicurio serves its Confluent-compatible API under /apis/ccompat/v7):

# Apicurio
schema_registry_url: https://registry.example.com/apis/ccompat/v7
# Confluent
schema_registry_url: https://registry.example.com
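
A quick way to verify the registry URL and credentials is to list the registered subjects. A sketch with confluent-kafka’s SchemaRegistryClient (example URL from above; credentials are placeholders):

from confluent_kafka.schema_registry import SchemaRegistryClient

registry = SchemaRegistryClient({
    "url": "https://registry.example.com/apis/ccompat/v7",
    "basic.auth.user.info": "registry-user:registry-password",
})

# Raises an error if the URL or credentials are wrong
print(registry.get_subjects())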

If the consumer keeps getting dropped from the group or requests time out, increase the Kafka client timeouts:

consumer_config:
  session.timeout.ms: 90000
  request.timeout.ms: 60000
  max.poll.interval.ms: 600000