Destinations Overview

Destinations receive extracted data and load it into target systems like data warehouses, files, or logging systems. Bizon buffers records for efficient batch writes.

| Destination | Type | Use Case | Installation |
| --- | --- | --- | --- |
| `bigquery` | Batch Load | Large batch loads via GCS | `pip install bizon[bigquery]` |
| `bigquery_streaming` | Real-time | Low-latency streaming | `pip install bizon[bigquery]` |
| `bigquery_streaming_v2` | Real-time | High-throughput streaming | `pip install bizon[bigquery]` |
| `file` | Local File | Testing, local storage | Built-in |
| `logger` | Console | Debugging, testing | Built-in |

A typical BigQuery destination configuration:

```yaml
destination:
  name: bigquery
  config:
    project_id: my-project
    dataset_id: my_dataset
    dataset_location: US
    buffer_size: 50 # Buffer size in MB
    buffer_flush_timeout: 600 # Max buffer age in seconds
```

All destinations inherit these options:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `buffer_size` | int | 50 | Buffer size in MB before flushing. Set to 0 for immediate writes. |
| `buffer_flush_timeout` | int | 600 | Max seconds to buffer before force-flushing |
| `max_concurrent_threads` | int | 10 | Parallel write threads |
| `unnest` | bool | false | Flatten nested JSON to columns (see the sketch below) |
| `record_schemas` | list | - | Schema definitions (required if `unnest: true`) |
| `destination_id` | string | - | Default destination table identifier |

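As a rough illustration of what `unnest` means (a conceptual sketch of the behavior, not Bizon's actual implementation):

```python
import json

import polars as pl

# A raw record as a source would emit it: nested JSON serialized as a string
source_data = json.dumps({"user_id": "42", "email": "jane@example.com"})

# unnest: false -> the whole JSON string lands in a single column
df_raw = pl.DataFrame({"source_data": [source_data]})

# unnest: true -> top-level keys become individual columns,
# typed according to the matching record_schemas entry
df_unnested = pl.DataFrame([json.loads(source_data)])
print(df_unnested.columns)  # ['user_id', 'email']
```
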
Bizon buffers records in memory before writing to optimize throughput:

Records flow in → the buffer accumulates → the buffer is flushed when one of three triggers fires:

1. The buffer reaches its size limit (`buffer_size` MB)
2. The buffer age exceeds the timeout (`buffer_flush_timeout` seconds)
3. The source indicates its last iteration

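The flush decision can be sketched roughly as follows (an illustrative simplification, not Bizon's actual internals; the `RecordBuffer` class and method names here are hypothetical):

```python
import time


class RecordBuffer:
    """Hypothetical sketch of the flush triggers described above."""

    def __init__(self, buffer_size: int = 50, buffer_flush_timeout: int = 600):
        self.max_size_bytes = buffer_size * 1024 * 1024   # buffer_size is given in MB
        self.buffer_flush_timeout = buffer_flush_timeout  # given in seconds
        self.current_size_bytes = 0
        self.created_at = time.monotonic()

    def should_flush(self, is_last_iteration: bool) -> bool:
        # 1. Buffer reached its size limit (buffer_size MB)
        if self.current_size_bytes >= self.max_size_bytes:
            return True
        # 2. Buffer age exceeded the timeout (buffer_flush_timeout seconds)
        if time.monotonic() - self.created_at >= self.buffer_flush_timeout:
            return True
        # 3. Source indicated its last iteration
        return is_last_iteration
```

With `buffer_size: 0` the size check passes immediately, which is why a zero buffer size results in immediate writes.
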
To disable buffering for streaming workloads, set `buffer_size` to 0:

```yaml
destination:
  name: bigquery_streaming_v2
  config:
    buffer_size: 0 # Write immediately
```

For structured writes, define schemas with record_schemas:

```yaml
destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    unnest: true
    record_schemas:
      - destination_id: my-project.analytics.users
        record_schema:
          - name: user_id
            type: STRING
            mode: REQUIRED
          - name: email
            type: STRING
            mode: NULLABLE
          - name: created_at
            type: TIMESTAMP
            mode: REQUIRED
        clustering_keys:
          - user_id
```

Each entry in `record_schemas` accepts the following fields:

| Field | Type | Description |
| --- | --- | --- |
| `destination_id` | string | Full table identifier (project.dataset.table) |
| `record_schema` | list | Column definitions |
| `clustering_keys` | list | Columns to cluster by (BigQuery) |

Each column definition in `record_schema` accepts:

| Field | Type | Description |
| --- | --- | --- |
| `name` | string | Column name |
| `type` | string | Data type (STRING, INTEGER, TIMESTAMP, JSON, etc.) |
| `mode` | string | NULLABLE, REQUIRED, or REPEATED |
| `description` | string | Optional column description |
| `default_value_expression` | string | Default value SQL expression |

Route records to different tables using destination_id on each record:

```yaml
source:
  name: kafka
  stream: topic
  topics:
    - name: user-events
      destination_id: project.dataset.user_events
    - name: system-events
      destination_id: project.dataset.system_events

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    record_schemas:
      - destination_id: project.dataset.user_events
        record_schema: [...]
      - destination_id: project.dataset.system_events
        record_schema: [...]
```

Implement AbstractDestination to create custom destinations.

```python
from bizon.destination.destination import AbstractDestination
import polars as pl


class MyDestination(AbstractDestination):
    def check_connection(self) -> bool:
        """Test connectivity to the destination."""
        return True

    def write_records(
        self,
        df_destination_records: pl.DataFrame,
    ) -> tuple[bool, str | None]:
        """
        Write a batch of records.

        Returns:
            (success, error_message)
        """
        try:
            # df_destination_records contains columns:
            # - bizon_id: str
            # - bizon_extracted_at: datetime
            # - bizon_loaded_at: datetime
            # - source_data: str (JSON)
            # - source_timestamp: datetime
            # - destination_id: str
            for row in df_destination_records.iter_rows(named=True):
                # write_row is your own helper that sends a single row
                # to the target system
                self.write_row(row)
            return True, None
        except Exception as e:
            return False, str(e)

    def finalize(self) -> bool:
        """Optional: Post-write operations (e.g., temp to main table)."""
        return True
```

Required methods:

| Method | Returns | Description |
| --- | --- | --- |
| `check_connection()` | bool | Test destination connectivity |
| `write_records(df)` | tuple[bool, str \| None] | Write batch, return (success, error) |

Optional methods:

| Method | Returns | Description |
| --- | --- | --- |
| `finalize()` | bool | Post-sync cleanup operations |

Records passed to write_records contain:

| Column | Type | Description |
| --- | --- | --- |
| `bizon_id` | str | Unique record identifier |
| `bizon_extracted_at` | datetime | When the record was extracted |
| `bizon_loaded_at` | datetime | When the record is being loaded |
| `source_data` | str | Original JSON data as a string |
| `source_timestamp` | datetime | Timestamp from the source |
| `destination_id` | str | Target table identifier |
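
For reference, a batch with this shape can be constructed with polars as in the sketch below (values and the instance name are illustrative only):

```python
import json
from datetime import datetime, timezone

import polars as pl

now = datetime.now(timezone.utc)

# One record in the layout described above (illustrative values)
df_destination_records = pl.DataFrame(
    {
        "bizon_id": ["rec-0001"],
        "bizon_extracted_at": [now],
        "bizon_loaded_at": [now],
        "source_data": [json.dumps({"user_id": "42", "email": "jane@example.com"})],
        "source_timestamp": [now],
        "destination_id": ["my-project.analytics.users"],
    }
)

# A custom destination such as MyDestination above would receive this DataFrame:
# success, error = my_destination.write_records(df_destination_records)
```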