Advanced Quick Start
This guide walks you through creating a production-ready streaming pipeline that consumes events from Kafka and loads them into BigQuery.
Prerequisites
- Python 3.9+
- Access to a Kafka cluster
- Google Cloud project with BigQuery enabled
- Service account with BigQuery Data Editor role
Installation
Install Bizon with Kafka and BigQuery support:
```bash
pip install "bizon[kafka,bigquery]"
```
Build the Pipeline
Set up environment variables
Create environment variables for your credentials:
```bash
export BIZON_ENV_KAFKA_USER="your-kafka-username"
export BIZON_ENV_KAFKA_PASSWORD="your-kafka-password"
export BIZON_ENV_GCP_SA_KEY='{"type":"service_account","project_id":"...}'
```
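Before moving on, you can optionally confirm the variables are set and the service account key parses as JSON. This is a minimal sketch that only assumes the three BIZON_ENV_* names exported above:

```python
import json
import os

# Sanity check: confirm the exported variables exist and the service
# account key is valid JSON before the pipeline tries to use them.
required = ["BIZON_ENV_KAFKA_USER", "BIZON_ENV_KAFKA_PASSWORD", "BIZON_ENV_GCP_SA_KEY"]
missing = [name for name in required if not os.environ.get(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")

sa_key = json.loads(os.environ["BIZON_ENV_GCP_SA_KEY"])
print(f"Service account key loaded for project: {sa_key.get('project_id')}")
```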
Create the pipeline configuration
Create streaming-pipeline.yml:

```yaml
name: events-to-bigquery

source:
  name: kafka
  stream: topic
  sync_mode: stream
  bootstrap_servers: kafka.example.com:9092
  group_id: bizon-events
  message_encoding: utf-8
  batch_size: 500
  consumer_timeout: 30
  topics:
    - name: events
      destination_id: my-project.analytics.events
  authentication:
    type: basic
    params:
      username: BIZON_ENV_KAFKA_USER
      password: BIZON_ENV_KAFKA_PASSWORD

transforms:
  - label: parse-event
    python: |
      import json
      value = data.get('value', {})
      if isinstance(value, str):
          value = json.loads(value)
      data = {
          'event_id': str(data.get('offset')),
          'event_type': value.get('type', 'unknown'),
          'user_id': value.get('user_id'),
          'payload': json.dumps(value),
          'timestamp': data.get('timestamp')
      }

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    dataset_location: US
    buffer_size: 50
    buffer_flush_timeout: 300
    unnest: true
    record_schemas:
      - destination_id: my-project.analytics.events
        record_schema:
          - name: event_id
            type: STRING
            mode: REQUIRED
          - name: event_type
            type: STRING
            mode: REQUIRED
          - name: user_id
            type: STRING
            mode: NULLABLE
          - name: payload
            type: JSON
            mode: NULLABLE
          - name: timestamp
            type: TIMESTAMP
            mode: REQUIRED
        clustering_keys:
          - event_type
  authentication:
    service_account_key: BIZON_ENV_GCP_SA_KEY

engine:
  runner:
    type: stream
    log_level: INFO
```
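The transforms entry is plain Python that runs against each record's data dictionary. As a rough illustration only (the exact record shape Bizon passes to transforms is an assumption here, inferred from the keys the transform reads), the snippet below applies the same logic to a sample Kafka message so you can see the row that lands in BigQuery:

```python
import json

# Sample record, assuming the Kafka message is exposed under keys such as
# 'value', 'offset' and 'timestamp', as the transform above implies.
data = {
    "offset": 42,
    "timestamp": "2024-01-01T12:00:00Z",
    "value": json.dumps({"type": "page_view", "user_id": "u-123", "path": "/home"}),
}

# Same logic as the parse-event transform in the configuration above.
value = data.get("value", {})
if isinstance(value, str):
    value = json.loads(value)
data = {
    "event_id": str(data.get("offset")),
    "event_type": value.get("type", "unknown"),
    "user_id": value.get("user_id"),
    "payload": json.dumps(value),
    "timestamp": data.get("timestamp"),
}
print(data)  # {'event_id': '42', 'event_type': 'page_view', 'user_id': 'u-123', ...}
```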
Run the pipeline
Start the streaming pipeline:
```bash
bizon run streaming-pipeline.yml --runner stream
```

The pipeline will:
- Connect to your Kafka cluster
- Consume messages from the events topic
- Transform each message
- Buffer and load to BigQuery
- Run continuously until stopped
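If the events topic is empty, you can publish a few test messages that match the shape the transform expects. The sketch below uses the confluent-kafka Python client; the SASL settings are an assumption and should mirror however your cluster actually authenticates:

```python
import json
import os

from confluent_kafka import Producer

# Assumes SASL/PLAIN over TLS; adjust to match your cluster's listeners.
producer = Producer({
    "bootstrap.servers": "kafka.example.com:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": os.environ["BIZON_ENV_KAFKA_USER"],
    "sasl.password": os.environ["BIZON_ENV_KAFKA_PASSWORD"],
})

# Each value is JSON with the fields the parse-event transform reads.
for i in range(5):
    event = {"type": "page_view", "user_id": f"u-{i}", "path": "/home"}
    producer.produce("events", value=json.dumps(event).encode("utf-8"))

producer.flush()
print("Published 5 test events")
```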
Verify in BigQuery
Query your data:
```sql
SELECT *
FROM `my-project.analytics.events`
WHERE _bizon_loaded_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
ORDER BY timestamp DESC
LIMIT 100;
```
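The same check can be scripted with the BigQuery Python client, for example as a smoke test in CI. This assumes google-cloud-bigquery is installed and credentials are available via application-default credentials or GOOGLE_APPLICATION_CREDENTIALS:

```python
from google.cloud import bigquery

# Counts recently loaded rows per event type, using the same filter as the
# verification query above.
client = bigquery.Client(project="my-project")

query = """
    SELECT event_type, COUNT(*) AS events
    FROM `my-project.analytics.events`
    WHERE _bizon_loaded_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
    GROUP BY event_type
    ORDER BY events DESC
"""

for row in client.query(query).result():
    print(f"{row.event_type}: {row.events}")
```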
Production Configuration
For production deployments, add a persistent backend for checkpointing:
Using Postgres as the state backend:

```yaml
engine:
  backend:
    type: postgres
    config:
      host: db.example.com
      port: 5432
      database: bizon
      schema: public
      username: bizon
      password: BIZON_ENV_DB_PASSWORD
      syncCursorInDBEvery: 5
  runner:
    type: stream
    log_level: INFO
```

Or using BigQuery as the state backend:

```yaml
engine:
  backend:
    type: bigquery
    config:
      project_id: my-project
      dataset_id: bizon_state
      database: bizon
      schema: public
      syncCursorInDBEvery: 5
  runner:
    type: stream
    log_level: INFO
```

Multi-Topic Routing
Route different topics to different BigQuery tables:
```yaml
source:
  name: kafka
  stream: topic
  topics:
    - name: user-events
      destination_id: my-project.analytics.user_events
    - name: order-events
      destination_id: my-project.analytics.order_events

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    unnest: true
    record_schemas:
      - destination_id: my-project.analytics.user_events
        record_schema:
          - name: user_id
            type: STRING
            mode: REQUIRED
          - name: action
            type: STRING
            mode: REQUIRED
      - destination_id: my-project.analytics.order_events
        record_schema:
          - name: order_id
            type: STRING
            mode: REQUIRED
          - name: amount
            type: FLOAT64
            mode: REQUIRED
```
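Each topic's messages must yield rows that satisfy the schema declared for its table. As a small, hypothetical pre-deployment check (the helper and sample events below are illustrative, not part of Bizon), you can assert that sample events carry every REQUIRED field:

```python
# Required fields taken from the record_schemas declared above.
REQUIRED_FIELDS = {
    "my-project.analytics.user_events": ["user_id", "action"],
    "my-project.analytics.order_events": ["order_id", "amount"],
}

# Illustrative sample events for each topic's destination table.
samples = {
    "my-project.analytics.user_events": {"user_id": "u-1", "action": "login"},
    "my-project.analytics.order_events": {"order_id": "o-9", "amount": 42.5},
}

for table, fields in REQUIRED_FIELDS.items():
    missing = [f for f in fields if samples[table].get(f) is None]
    if missing:
        raise SystemExit(f"{table}: sample event is missing {missing}")
    print(f"{table}: OK")
```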
Avro Messages
For Avro-encoded messages with Schema Registry:
```yaml
source:
  name: kafka
  stream: topic
  message_encoding: avro
  authentication:
    type: basic
    params:
      username: BIZON_ENV_KAFKA_USER
      password: BIZON_ENV_KAFKA_PASSWORD
      schema_registry_type: apicurio
      schema_registry_url: https://registry.example.com/apis/ccompat/v7
      schema_registry_username: registry-user
      schema_registry_password: BIZON_ENV_REGISTRY_PASSWORD
```
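Before starting the pipeline, you may want to confirm the registry is reachable with these credentials. The sketch below assumes the ccompat endpoint behaves like the Confluent Schema Registry REST API, so listing subjects is enough to prove connectivity and authentication:

```python
import os

import requests

# Assumption: the Apicurio ccompat endpoint exposes the Confluent-compatible
# GET /subjects route, which lists the registered schema subjects.
registry_url = "https://registry.example.com/apis/ccompat/v7"
response = requests.get(
    f"{registry_url}/subjects",
    auth=("registry-user", os.environ["BIZON_ENV_REGISTRY_PASSWORD"]),
    timeout=10,
)
response.raise_for_status()
print("Registered subjects:", response.json())
```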
Deployment
Docker
Create a Dockerfile:

```dockerfile
FROM python:3.11-slim

RUN pip install "bizon[kafka,bigquery]"

COPY streaming-pipeline.yml /app/
WORKDIR /app

CMD ["bizon", "run", "streaming-pipeline.yml", "--runner", "stream"]
```

Build and run the image:

```bash
docker build -t bizon-pipeline .
docker run -e BIZON_ENV_KAFKA_USER=... -e BIZON_ENV_KAFKA_PASSWORD=... bizon-pipeline
```

What’s Next?
- Kafka to BigQuery Tutorial - Complete walkthrough
- Kafka Source Reference - All Kafka options
- BigQuery Destinations - BigQuery variants
- Engine Configuration - Backends and runners