
Sources Overview

Sources extract data from external systems like APIs, databases, and message queues. Bizon provides built-in sources and an extensible API for creating custom connectors.

| Source | Type | Incremental | Installation |
|---|---|---|---|
| `dummy` | Test data | No | Built-in |
| `kafka` | Message queue | N/A (stream) | `pip install bizon[kafka]` |
| `hubspot` | REST API | Yes | `pip install bizon[hubspot]` |
| `notion` | REST API | No | `pip install bizon[notion]` |
| `gsheets` | REST API | No | `pip install bizon[gsheets]` |
| `pokeapi` | REST API | No | Built-in |

Check available sources with:

```shell
bizon source list
```

Every source shares these base configuration fields:

```yaml
source:
  name: hubspot            # Source connector name
  stream: contacts         # Stream to sync
  sync_mode: full_refresh  # full_refresh, incremental, or stream
  authentication:          # Auth config (varies by source)
    type: api_key
    params:
      token: your-token
```
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| `name` | string | Yes | - | Source connector identifier |
| `stream` | string | Yes | - | Stream name to sync |
| `sync_mode` | enum | No | `full_refresh` | Sync mode: `full_refresh`, `incremental`, `stream` |
| `authentication` | object | Varies | - | Authentication configuration |
| `source_file_path` | string | No | - | Path to a custom source file |
| `force_ignore_checkpoint` | bool | No | `false` | Reset and ignore existing checkpoints |
| `max_iterations` | int | No | - | Limit the number of iterations (for testing) |
| `api_config.retry_limit` | int | No | `10` | Maximum API retry attempts |
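To make the defaults in the table concrete, here is a minimal sketch that models the base fields as a plain Python dataclass and fills in the documented defaults. `SourceSettings`, `ApiConfig`, and `load_source_settings` are hypothetical names; this is not bizon's own `SourceConfig` class, and it assumes the `source:` mapping from the YAML file has already been parsed into a dict.

```python
from dataclasses import dataclass, field
from typing import Any, Optional

# The three sync modes listed in the table above
VALID_SYNC_MODES = {"full_refresh", "incremental", "stream"}


@dataclass
class ApiConfig:
    retry_limit: int = 10  # documented default: 10 retries


@dataclass
class SourceSettings:
    """Hypothetical stand-in for the documented base fields (not bizon's class)."""

    name: str
    stream: str
    sync_mode: str = "full_refresh"
    authentication: Optional[dict[str, Any]] = None
    source_file_path: Optional[str] = None
    force_ignore_checkpoint: bool = False
    max_iterations: Optional[int] = None
    api_config: ApiConfig = field(default_factory=ApiConfig)

    def __post_init__(self) -> None:
        # Reject typos such as "incremntal" as early as possible
        if self.sync_mode not in VALID_SYNC_MODES:
            raise ValueError(f"unknown sync_mode: {self.sync_mode!r}")


def load_source_settings(raw: dict[str, Any]) -> SourceSettings:
    """Build settings from an already-parsed `source:` mapping."""
    raw = dict(raw)  # copy so the caller's dict is not mutated
    api = ApiConfig(**raw.pop("api_config", {}))
    return SourceSettings(api_config=api, **raw)
```

For example, `load_source_settings({"name": "hubspot", "stream": "contacts"})` yields `sync_mode == "full_refresh"` and `api_config.retry_limit == 10`. The validation rules bizon itself applies may differ.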

Implement AbstractSource to create custom source connectors.

```python
from typing import Any

from bizon.source.config import SourceConfig
from bizon.source.models import SourceIteration, SourceRecord
from bizon.source.source import AbstractSource


class MySourceConfig(SourceConfig):
    """Config for MySource; declare source-specific fields here."""


class MySource(AbstractSource):
    @staticmethod
    def streams() -> list[str]:
        """Return available stream names."""
        return ["users", "orders"]

    @staticmethod
    def get_config_class() -> type[SourceConfig]:
        """Return the config class for this source."""
        return MySourceConfig

    def get_authenticator(self):
        """Return a requests.auth.AuthBase instance or None."""
        return None

    def check_connection(self) -> tuple[bool, Any]:
        """Test the connection. Return (success, error_or_none)."""
        return True, None

    def get_total_records_count(self) -> int | None:
        """Return total record count if known, else None."""
        return None

    def get(self, pagination: dict | None = None) -> SourceIteration:
        """Fetch the next batch of records."""
        records = [
            SourceRecord(id="1", data={"name": "Alice"}),
            SourceRecord(id="2", data={"name": "Bob"}),
        ]
        return SourceIteration(
            records=records,
            next_pagination={},  # Empty dict = no more data
        )
```
Required methods:

| Method | Returns | Description |
|---|---|---|
| `streams()` | `list[str]` | Static method returning available stream names |
| `get_config_class()` | `type[SourceConfig]` | Static method returning the config class |
| `get_authenticator()` | `AuthBase \| None` | Return the authenticator for the requests session |
| `check_connection()` | `tuple[bool, Any]` | Test connectivity |
| `get_total_records_count()` | `int \| None` | Total records available (used for progress reporting) |
| `get(pagination)` | `SourceIteration` | Fetch the next batch of records |
Optional methods:

| Method | Returns | Description |
|---|---|---|
| `get_records_after(state, pagination)` | `SourceIteration` | Incremental fetch (for incremental mode) |
| `commit()` | `None` | Commit progress to the source (for Kafka offsets) |
| `set_streams_config(streams)` | `None` | Accept stream routing configuration |
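The table does not say when `commit()` is called. A common pattern for queue-backed sources, assumed here and not confirmed for bizon, is to commit only after the destination write succeeds, which gives at-least-once delivery: a failed write leaves the offset untouched, so the batch is read again. `InMemoryQueueSource` and `run_once` are hypothetical names for a self-contained illustration that needs no Kafka.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryQueueSource:
    """Hypothetical queue source: get() reads from the committed offset,
    commit() records how far the pipeline has safely written."""

    messages: list[str]
    committed_offset: int = 0
    _pending_offset: int = field(default=0, init=False)

    def get(self, batch_size: int = 2) -> list[str]:
        start = self.committed_offset
        batch = self.messages[start : start + batch_size]
        # Remember where this batch ends, but do not commit yet
        self._pending_offset = start + len(batch)
        return batch

    def commit(self) -> None:
        # Only called once the destination has accepted the batch
        self.committed_offset = self._pending_offset


def run_once(source: InMemoryQueueSource, write) -> None:
    batch = source.get()
    write(batch)     # if this raises, commit() below is skipped...
    source.commit()  # ...so the same batch is re-read on the next run
```

Committing before the write would instead risk losing a batch when the destination fails (at-most-once).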

Individual record extracted from the source:

```python
class SourceRecord:
    id: str              # Unique identifier
    data: dict           # JSON payload
    timestamp: datetime  # Extraction timestamp (default: now)
    destination_id: str  # Target table (for multi-table routing)
```
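`destination_id` names the target table when one source feeds several tables. How bizon dispatches records internally is not described here; the sketch below only shows the grouping idea, assuming (hypothetically) that records without a `destination_id` fall back to a default table. `Record` is a stand-in mirroring the `SourceRecord` fields so the code runs without bizon installed.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Record:
    """Stand-in mirroring the SourceRecord fields above."""

    id: str
    data: dict
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    destination_id: Optional[str] = None


def group_by_destination(records: list[Record], default_table: str) -> dict[str, list[Record]]:
    """Bucket records per target table, preserving their original order."""
    buckets: dict[str, list[Record]] = defaultdict(list)
    for record in records:
        # Assumption: a missing destination_id routes to the default table
        buckets[record.destination_id or default_table].append(record)
    return dict(buckets)
```

Each bucket can then be written to its own table in one batch.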

Batch of records with pagination state:

```python
class SourceIteration:
    records: list[SourceRecord]  # Records in this batch
    next_pagination: dict        # State for next iteration
```

Pagination behavior:

  • Return an empty dict ({}) as next_pagination to signal that there is no more data
  • Return pagination state (a cursor, offset, or page number) to keep fetching
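These rules imply a simple driver loop: call `get()` with the previous `next_pagination` until it comes back empty. The sketch below is an assumption about how such a loop can look, not bizon's actual runner. `Iteration` is a hypothetical stand-in for `SourceIteration` so the code runs on its own, and the `max_iterations` cap mirrors the config field of the same name.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class Iteration:
    """Stand-in for SourceIteration."""

    records: list
    next_pagination: dict = field(default_factory=dict)


def drain(
    get: Callable[[Optional[dict]], Iteration],
    max_iterations: Optional[int] = None,
) -> list:
    """Call get() repeatedly, feeding back next_pagination, until it is empty."""
    collected: list = []
    pagination: Optional[dict] = None  # the first call receives no state
    iterations = 0
    while True:
        batch = get(pagination)
        collected.extend(batch.records)
        iterations += 1
        if not batch.next_pagination:  # {} signals no more data
            break
        if max_iterations is not None and iterations >= max_iterations:
            break  # test-style cap, like the max_iterations setting
        pagination = batch.next_pagination
    return collected
```

Passing a source's bound `get` method (or any function with the same shape) returns every record across all pages.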

State for incremental syncs:

```python
class SourceIncrementalState:
    last_run: datetime  # Timestamp of last successful sync
    state: dict         # Custom state from previous sync
```
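An incremental `get_records_after(state, pagination)` typically uses `state.last_run` to ask only for rows changed since the last successful sync. A minimal sketch, assuming a hypothetical API that accepts an `updated_after` filter and returns an `updated_at` field; real APIs use their own names, and `IncrementalState` is a stand-in for `SourceIncrementalState`. A client-side filter is included for APIs that cannot filter server-side.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class IncrementalState:
    """Stand-in for SourceIncrementalState (hypothetical, for illustration)."""

    last_run: datetime
    state: dict = field(default_factory=dict)


def incremental_params(state: IncrementalState, pagination: Optional[dict] = None) -> dict:
    """Query params for an API that filters server-side by modification time."""
    # "updated_after" is a hypothetical filter name
    params: dict = {"updated_after": state.last_run.isoformat()}
    if pagination and "cursor" in pagination:
        params["cursor"] = pagination["cursor"]  # keep paging inside the window
    return params


def records_after(
    rows: list[dict], state: IncrementalState, updated_field: str = "updated_at"
) -> list[dict]:
    """Client-side fallback: keep rows modified strictly after the last sync."""
    return [row for row in rows if row[updated_field] > state.last_run]
```

Filtering on the server is usually preferable, since the client-side fallback still downloads every row.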
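Cursor-based pagination, where the API returns a `next_cursor` token:

```python
def get(self, pagination: dict | None = None) -> SourceIteration:
    cursor = pagination.get("cursor") if pagination else None
    response = self.session.get(
        "https://api.example.com/users",
        params={"cursor": cursor},  # requests omits params whose value is None
    )
    response.raise_for_status()  # fail on HTTP errors instead of parsing an error body
    data = response.json()
    records = [
        SourceRecord(id=str(u["id"]), data=u)
        for u in data["users"]
    ]
    next_cursor = data.get("next_cursor")
    return SourceIteration(
        records=records,
        # No next_cursor means the last page was reached
        next_pagination={"cursor": next_cursor} if next_cursor else {},
    )
```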
Offset-based pagination, stopping when a page comes back shorter than `limit`:

```python
def get(self, pagination: dict | None = None) -> SourceIteration:
    offset = pagination.get("offset", 0) if pagination else 0
    limit = 100
    response = self.session.get(
        "https://api.example.com/users",
        params={"offset": offset, "limit": limit},
    )
    response.raise_for_status()
    data = response.json()  # this endpoint returns a bare JSON list of users
    records = [SourceRecord(id=str(u["id"]), data=u) for u in data]
    # A short (or empty) page means there is nothing left to fetch
    if len(records) < limit:
        return SourceIteration(records=records, next_pagination={})
    return SourceIteration(
        records=records,
        next_pagination={"offset": offset + limit},
    )
```
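The pagination rules also mention page numbers. Below is a page-based variant of the two examples above, written against a hypothetical `fetch_page(page, page_size)` callable (injected so the sketch runs without a network) that returns a body with `results` and `total_pages`. `Iteration` again stands in for `SourceIteration`; inside a real source, `fetch_page` would wrap `self.session.get`.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class Iteration:
    """Stand-in for SourceIteration."""

    records: list
    next_pagination: dict = field(default_factory=dict)


def get_page(
    fetch_page: Callable[[int, int], dict],
    pagination: Optional[dict] = None,
    page_size: int = 100,
) -> Iteration:
    """Fetch one page; pages are 1-indexed in this hypothetical API."""
    page = pagination.get("page", 1) if pagination else 1
    body = fetch_page(page, page_size)
    records = body["results"]
    # Stop on the last page, or defensively on an empty page
    if page >= body["total_pages"] or not records:
        return Iteration(records=records, next_pagination={})
    return Iteration(records=records, next_pagination={"page": page + 1})
```

Relying on `total_pages` avoids one extra empty request at the end, at the cost of trusting the server's count.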