# Destinations Overview
Destinations receive extracted data and load it into target systems like data warehouses, files, or logging systems. Bizon buffers records for efficient batch writes.
## Built-in Destinations

| Destination | Type | Use Case | Installation |
|---|---|---|---|
| `bigquery` | Batch Load | Large batch loads via GCS | `pip install bizon[bigquery]` |
| `bigquery_streaming` | Real-time | Low-latency streaming | `pip install bizon[bigquery]` |
| `bigquery_streaming_v2` | Real-time | High-throughput streaming | `pip install bizon[bigquery]` |
| `file` | Local File | Testing, local storage | Built-in |
| `logger` | Console | Debugging, testing | Built-in |
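If you just want to see records flowing, a minimal sketch using the built-in `logger` destination could look like the following. It assumes the logger needs no destination-specific options; `buffer_size` is one of the base options described below.

```yaml
# Hedged sketch: print records to the console for quick debugging.
destination:
  name: logger
  config:
    buffer_size: 0  # assumption: flush immediately so records appear as they arrive
```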
## Destination Configuration

```yaml
destination:
  name: bigquery
  config:
    project_id: my-project
    dataset_id: my_dataset
    dataset_location: US
    buffer_size: 50            # Buffer size in MB
    buffer_flush_timeout: 600  # Max buffer age in seconds
```

### Base Configuration Fields
All destinations inherit these options:
| Field | Type | Default | Description |
|---|---|---|---|
| `buffer_size` | int | 50 | Buffer size in MB before flushing. Set to 0 for immediate writes. |
| `buffer_flush_timeout` | int | 600 | Max seconds to buffer before force-flushing |
| `max_concurrent_threads` | int | 10 | Parallel write threads |
| `unnest` | bool | false | Flatten nested JSON to columns |
| `record_schemas` | list | - | Schema definitions (required if `unnest: true`) |
| `destination_id` | string | - | Default destination table identifier |
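As a hedged sketch, the base options can be combined on any destination; the values and the table identifier below are illustrative:

```yaml
destination:
  name: bigquery
  config:
    project_id: my-project
    dataset_id: my_dataset
    destination_id: my-project.my_dataset.events  # illustrative default target table
    buffer_size: 100             # flush once ~100 MB is buffered
    buffer_flush_timeout: 300    # ...or after 300 seconds, whichever comes first
    max_concurrent_threads: 4    # cap parallel write threads
```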
## Buffer Behavior

Bizon buffers records in memory before writing to optimize throughput:

```text
Buffer Strategy

Records flow in → Buffer accumulates → Flush on trigger

Flush triggers:
  1. Buffer reaches size limit (buffer_size MB)
  2. Buffer age exceeds timeout (buffer_flush_timeout sec)
  3. Source indicates last iteration
```
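To make the trigger logic concrete, here is a purely illustrative Python sketch of how a flush decision could be made. The names (`BufferState`, `should_flush`) are hypothetical and not Bizon internals.

```python
import time
from dataclasses import dataclass, field

# Illustrative only: a simplified model of the three flush triggers above.
# BufferState and should_flush are hypothetical names, not Bizon's actual code.


@dataclass
class BufferState:
    size_limit_bytes: int    # buffer_size (MB) converted to bytes
    flush_timeout_s: int     # buffer_flush_timeout
    current_size_bytes: int = 0
    created_at: float = field(default_factory=time.monotonic)


def should_flush(buffer: BufferState, is_last_iteration: bool) -> bool:
    # 1. Buffer reached its size limit (buffer_size: 0 means flush immediately)
    if buffer.current_size_bytes >= buffer.size_limit_bytes:
        return True
    # 2. Buffer age exceeded buffer_flush_timeout
    if time.monotonic() - buffer.created_at >= buffer.flush_timeout_s:
        return True
    # 3. Source signalled its last iteration
    return is_last_iteration
```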
Disable buffering for streaming workloads:

```yaml
destination:
  name: bigquery_streaming_v2
  config:
    buffer_size: 0  # Write immediately
```

## Schema Configuration
For structured writes, define schemas with `record_schemas`:
```yaml
destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    unnest: true
    record_schemas:
      - destination_id: my-project.analytics.users
        record_schema:
          - name: user_id
            type: STRING
            mode: REQUIRED
          - name: email
            type: STRING
            mode: NULLABLE
          - name: created_at
            type: TIMESTAMP
            mode: REQUIRED
        clustering_keys:
          - user_id
```

### Schema Fields
| Field | Type | Description |
|---|---|---|
| `destination_id` | string | Full table identifier (`project.dataset.table`) |
| `record_schema` | list | Column definitions |
| `clustering_keys` | list | Columns to cluster by (BigQuery) |
### Column Definition

| Field | Type | Description |
|---|---|---|
| `name` | string | Column name |
| `type` | string | Data type (STRING, INTEGER, TIMESTAMP, JSON, etc.) |
| `mode` | string | NULLABLE, REQUIRED, or REPEATED |
| `description` | string | Optional column description |
| `default_value_expression` | string | Default value SQL expression |
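For instance, a hedged sketch of a column entry that uses the optional fields might look like this (the `CURRENT_TIMESTAMP()` default assumes BigQuery semantics and is purely illustrative):

```yaml
record_schema:
  - name: ingested_at
    type: TIMESTAMP
    mode: NULLABLE
    description: Load time assigned by the warehouse
    default_value_expression: CURRENT_TIMESTAMP()  # illustrative BigQuery default
```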
## Multi-Table Routing

Route records to different tables using `destination_id` on each record:
```yaml
source:
  name: kafka
  stream: topic
  topics:
    - name: user-events
      destination_id: project.dataset.user_events
    - name: system-events
      destination_id: project.dataset.system_events

destination:
  name: bigquery_streaming_v2
  config:
    project_id: my-project
    dataset_id: analytics
    record_schemas:
      - destination_id: project.dataset.user_events
        record_schema: [...]
      - destination_id: project.dataset.system_events
        record_schema: [...]
```

## AbstractDestination API
Implement `AbstractDestination` to create custom destinations.
```python
from bizon.destination.destination import AbstractDestination
import polars as pl


class MyDestination(AbstractDestination):

    def check_connection(self) -> bool:
        """Test connectivity to the destination."""
        return True

    def write_records(
        self, df_destination_records: pl.DataFrame
    ) -> tuple[bool, str | None]:
        """
        Write a batch of records.

        Returns:
            (success, error_message)
        """
        try:
            # df_destination_records contains columns:
            # - bizon_id: str
            # - bizon_extracted_at: datetime
            # - bizon_loaded_at: datetime
            # - source_data: str (JSON)
            # - source_timestamp: datetime
            # - destination_id: str
            for row in df_destination_records.iter_rows(named=True):
                # write_row: your own per-row write logic
                # (not part of AbstractDestination's documented API)
                self.write_row(row)

            return True, None
        except Exception as e:
            return False, str(e)

    def finalize(self) -> bool:
        """Optional: Post-write operations (e.g., temp to main table)."""
        return True
```
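As a more concrete, purely illustrative sketch, the destination below appends each record's `source_data` payload to a local JSON Lines file. The class name, output path, and JSONL approach are assumptions for the example, not a built-in Bizon destination; a real implementation would read its settings from config.

```python
import json
from pathlib import Path

import polars as pl

from bizon.destination.destination import AbstractDestination


class JsonlFileDestination(AbstractDestination):
    """Illustrative sketch: append records to a local JSON Lines file."""

    OUTPUT_PATH = Path("/tmp/bizon_records.jsonl")  # hypothetical, hard-coded for brevity

    def check_connection(self) -> bool:
        # "Connectivity" here just means the target directory exists.
        return self.OUTPUT_PATH.parent.exists()

    def write_records(
        self, df_destination_records: pl.DataFrame
    ) -> tuple[bool, str | None]:
        try:
            with self.OUTPUT_PATH.open("a", encoding="utf-8") as fh:
                for row in df_destination_records.iter_rows(named=True):
                    # source_data is the original record serialized as a JSON string.
                    payload = json.loads(row["source_data"])
                    line = {
                        "bizon_id": row["bizon_id"],
                        "destination_id": row["destination_id"],
                        "data": payload,
                    }
                    fh.write(json.dumps(line, default=str) + "\n")
            return True, None
        except Exception as e:
            return False, str(e)
```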
### Required Methods

| Method | Returns | Description |
|---|---|---|
| `check_connection()` | bool | Test destination connectivity |
| `write_records(df)` | `tuple[bool, str \| None]` | Write batch, return (success, error) |
### Optional Methods

| Method | Returns | Description |
|---|---|---|
| `finalize()` | bool | Post-sync cleanup operations |
### Destination Record Schema

Records passed to `write_records` contain:
| Column | Type | Description |
|---|---|---|
| `bizon_id` | str | Unique record identifier |
| `bizon_extracted_at` | datetime | When record was extracted |
| `bizon_loaded_at` | datetime | When record is being loaded |
| `source_data` | str | Original JSON data as string |
| `source_timestamp` | datetime | Timestamp from source |
| `destination_id` | str | Target table identifier |
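When a custom destination needs to honor multi-table routing, one hedged approach is to group each batch by `destination_id` before writing. The helper below is a sketch built on polars' `partition_by`; it is not part of Bizon's API.

```python
import polars as pl


def split_by_destination(df_destination_records: pl.DataFrame) -> dict[str, pl.DataFrame]:
    """Illustrative helper: map each destination_id to the records bound for it,
    so write_records can issue one write per target table."""
    return {
        str(part["destination_id"][0]): part
        for part in df_destination_records.partition_by("destination_id")
    }
```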
## Next Steps

- BigQuery Destinations - Load to BigQuery
- Configuration Reference - Full YAML options
- Sync Modes - Understand data flow patterns