Advanced Quick Start

This guide walks you through creating a production-ready streaming pipeline that consumes events from Kafka and loads them into BigQuery.

Prerequisites

  • Python 3.9+
  • Access to a Kafka cluster
  • Google Cloud project with BigQuery enabled
  • Service account with BigQuery Data Editor role

Install Bizon with Kafka and BigQuery support:

pip install "bizon[kafka,bigquery]"
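
If you want to confirm the install before continuing, you can check the package version from Python. This is a generic standard-library check, not a Bizon-specific command:

from importlib.metadata import version

# Prints the installed version; raises PackageNotFoundError if the install failed.
print(version("bizon"))
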
  1. Set up environment variables

    Create environment variables for your credentials:

    export BIZON_ENV_KAFKA_USER="your-kafka-username"
    export BIZON_ENV_KAFKA_PASSWORD="your-kafka-password"
    export BIZON_ENV_GCP_SA_KEY='{"type":"service_account","project_id":"..."}'
  2. Create the pipeline configuration

    Create streaming-pipeline.yml (the parse-event transform it defines is plain Python, so you can test that logic on its own; see the sketch after step 4):

    name: events-to-bigquery
    source:
      name: kafka
      stream: topic
      sync_mode: stream
      bootstrap_servers: kafka.example.com:9092
      group_id: bizon-events
      message_encoding: utf-8
      batch_size: 500
      consumer_timeout: 30
      topics:
        - name: events
          destination_id: my-project.analytics.events
      authentication:
        type: basic
        params:
          username: BIZON_ENV_KAFKA_USER
          password: BIZON_ENV_KAFKA_PASSWORD
    transforms:
      - label: parse-event
        python: |
          import json
          value = data.get('value', {})
          if isinstance(value, str):
              value = json.loads(value)
          data = {
              'event_id': str(data.get('offset')),
              'event_type': value.get('type', 'unknown'),
              'user_id': value.get('user_id'),
              'payload': json.dumps(value),
              'timestamp': data.get('timestamp')
          }
    destination:
      name: bigquery_streaming_v2
      config:
        project_id: my-project
        dataset_id: analytics
        dataset_location: US
        buffer_size: 50
        buffer_flush_timeout: 300
        unnest: true
        record_schemas:
          - destination_id: my-project.analytics.events
            record_schema:
              - name: event_id
                type: STRING
                mode: REQUIRED
              - name: event_type
                type: STRING
                mode: REQUIRED
              - name: user_id
                type: STRING
                mode: NULLABLE
              - name: payload
                type: JSON
                mode: NULLABLE
              - name: timestamp
                type: TIMESTAMP
                mode: REQUIRED
        clustering_keys:
          - event_type
        authentication:
          service_account_key: BIZON_ENV_GCP_SA_KEY
    engine:
      runner:
        type: stream
        log_level: INFO
  3. Run the pipeline

    Start the streaming pipeline:

    bizon run streaming-pipeline.yml --runner stream

    The pipeline will:

    • Connect to your Kafka cluster
    • Consume messages from the events topic
    • Transform each message
    • Buffer and load to BigQuery
    • Run continuously until stopped
  4. Verify in BigQuery

    Query your data:

    SELECT *
    FROM `my-project.analytics.events`
    WHERE _bizon_loaded_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
    ORDER BY timestamp DESC
    LIMIT 100;
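
Because the parse-event transform is plain Python operating on a dict, you can exercise the same logic outside the pipeline before deploying it. The sketch below is illustrative only: the sample record and its values are made up, and it simply mirrors the keys the transform in streaming-pipeline.yml reads (value, offset, timestamp).

import json

# Hypothetical sample record with the same keys the parse-event transform reads.
data = {
    "value": json.dumps({"type": "signup", "user_id": "u-123"}),
    "offset": 42,
    "timestamp": "2024-01-01T00:00:00Z",
}

# Same logic as the parse-event transform in streaming-pipeline.yml.
value = data.get("value", {})
if isinstance(value, str):
    value = json.loads(value)
data = {
    "event_id": str(data.get("offset")),
    "event_type": value.get("type", "unknown"),
    "user_id": value.get("user_id"),
    "payload": json.dumps(value),
    "timestamp": data.get("timestamp"),
}

print(data)  # field names should line up with the BigQuery record schema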

For production deployments, add a persistent backend for checkpointing:

engine:
  backend:
    type: postgres
    config:
      host: db.example.com
      port: 5432
      database: bizon
      schema: public
      username: bizon
      password: BIZON_ENV_DB_PASSWORD
      syncCursorInDBEvery: 5
  runner:
    type: stream
    log_level: INFO
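
Before starting the pipeline, it can be worth confirming that the backend database is reachable with the credentials you plan to use. A minimal connectivity sketch using psycopg2 (installed separately, e.g. pip install psycopg2-binary); the host, database, and user values simply mirror the placeholder config above:

import os

import psycopg2

# Plain connectivity check against the checkpoint backend; values match the engine.backend config.
conn = psycopg2.connect(
    host="db.example.com",
    port=5432,
    dbname="bizon",
    user="bizon",
    password=os.environ["BIZON_ENV_DB_PASSWORD"],
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT 1")
    print("backend reachable:", cur.fetchone()[0] == 1)
conn.close()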

Route different topics to different BigQuery tables:

source:
  name: kafka
  stream: topic
  topics:
    - name: user-events
      destination_id: my-project.analytics.user_events
    - name: order-events
      destination_id: my-project.analytics.order_events
destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    unnest: true
    record_schemas:
      - destination_id: my-project.analytics.user_events
        record_schema:
          - name: user_id
            type: STRING
            mode: REQUIRED
          - name: action
            type: STRING
            mode: REQUIRED
      - destination_id: my-project.analytics.order_events
        record_schema:
          - name: order_id
            type: STRING
            mode: REQUIRED
          - name: amount
            type: FLOAT64
            mode: REQUIRED
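
To check the routing end to end, you can publish one test message to each topic and watch it land in the corresponding table. A minimal sketch using the confluent-kafka producer (installed separately if needed); it assumes a broker reachable without SASL for local testing, so add your security settings to the config dict otherwise:

import json

from confluent_kafka import Producer

# One made-up message per topic, shaped to match the record schemas above.
producer = Producer({"bootstrap.servers": "kafka.example.com:9092"})
producer.produce("user-events", json.dumps({"user_id": "u-1", "action": "login"}))
producer.produce("order-events", json.dumps({"order_id": "o-1", "amount": 19.99}))
producer.flush()  # block until both messages are delivered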

For Avro-encoded messages with Schema Registry:

source:
  name: kafka
  stream: topic
  message_encoding: avro
  authentication:
    type: basic
    params:
      username: BIZON_ENV_KAFKA_USER
      password: BIZON_ENV_KAFKA_PASSWORD
      schema_registry_type: apicurio
      schema_registry_url: https://registry.example.com/apis/ccompat/v7
      schema_registry_username: registry-user
      schema_registry_password: BIZON_ENV_REGISTRY_PASSWORD
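
Registry credentials and the ccompat URL are easy to get wrong, so it can help to query the registry directly before starting the pipeline. A minimal sketch using confluent-kafka's SchemaRegistryClient (a separate install from Bizon); the URL and username mirror the placeholder config above:

import os

from confluent_kafka.schema_registry import SchemaRegistryClient

# Lists registered subjects to confirm the endpoint and basic-auth credentials work.
client = SchemaRegistryClient(
    {
        "url": "https://registry.example.com/apis/ccompat/v7",
        "basic.auth.user.info": f"registry-user:{os.environ['BIZON_ENV_REGISTRY_PASSWORD']}",
    }
)
print(client.get_subjects())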

To run the pipeline in Docker, create a Dockerfile:

FROM python:3.11-slim
RUN pip install "bizon[kafka,bigquery]"
COPY streaming-pipeline.yml /app/
WORKDIR /app
CMD ["bizon", "run", "streaming-pipeline.yml", "--runner", "stream"]

Build and run the image:

docker build -t bizon-pipeline .
docker run -e BIZON_ENV_KAFKA_USER=... -e BIZON_ENV_KAFKA_PASSWORD=... -e BIZON_ENV_GCP_SA_KEY=... bizon-pipeline