# Sources Overview
Sources extract data from external systems like APIs, databases, and message queues. Bizon provides built-in sources and an extensible API for creating custom connectors.
## Built-in Sources

| Source | Type | Incremental | Installation |
|---|---|---|---|
| `dummy` | Test data | No | Built-in |
| `kafka` | Message queue | N/A (stream) | `pip install bizon[kafka]` |
| `hubspot` | REST API | Yes | `pip install bizon[hubspot]` |
| `notion` | REST API | No | `pip install bizon[notion]` |
| `gsheets` | REST API | No | `pip install bizon[gsheets]` |
| `pokeapi` | REST API | No | Built-in |
Check available sources with:

```shell
bizon source list
```

## Source Configuration

Every source requires these base configuration fields:

```yaml
source:
  name: hubspot            # Source connector name
  stream: contacts         # Stream to sync
  sync_mode: full_refresh  # full_refresh, incremental, or stream
  authentication:          # Auth config (varies by source)
    type: api_key
    params:
      token: your-token
```

### Base Configuration Fields
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| `name` | string | Yes | - | Source connector identifier |
| `stream` | string | Yes | - | Stream name to sync |
| `sync_mode` | enum | No | `full_refresh` | Sync mode: `full_refresh`, `incremental`, `stream` |
| `authentication` | object | Varies | - | Authentication configuration |
| `source_file_path` | string | No | - | Path to a custom source file |
| `force_ignore_checkpoint` | bool | No | `false` | Reset and ignore existing checkpoints |
| `max_iterations` | int | No | - | Limit iterations (useful for testing) |
| `api_config.retry_limit` | int | No | 10 | Maximum API retry attempts |
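For instance, a test run in incremental mode might combine these fields as follows (a sketch using only the fields from the table above; exact authentication params vary by source):

```yaml
source:
  name: hubspot
  stream: contacts
  sync_mode: incremental
  force_ignore_checkpoint: false
  max_iterations: 5        # cap iterations while testing
  authentication:
    type: api_key
    params:
      token: your-token
```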
## AbstractSource API

Implement `AbstractSource` to create custom source connectors.

```python
from typing import Any

from bizon.source.config import SourceConfig
from bizon.source.models import SourceIteration, SourceRecord
from bizon.source.source import AbstractSource


class MySource(AbstractSource):
    @staticmethod
    def streams() -> list[str]:
        """Return available stream names."""
        return ["users", "orders"]

    @staticmethod
    def get_config_class() -> type[SourceConfig]:
        """Return the config class for this source."""
        return MySourceConfig

    def get_authenticator(self):
        """Return a requests.auth.AuthBase instance or None."""
        return None

    def check_connection(self) -> tuple[bool, Any]:
        """Test the connection. Return (success, error_or_none)."""
        return True, None

    def get_total_records_count(self) -> int | None:
        """Return the total record count if known, else None."""
        return None

    def get(self, pagination: dict = None) -> SourceIteration:
        """Fetch the next batch of records."""
        records = [
            SourceRecord(id="1", data={"name": "Alice"}),
            SourceRecord(id="2", data={"name": "Bob"}),
        ]
        return SourceIteration(
            records=records,
            next_pagination={},  # Empty dict = no more data
        )
```
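The contract between `get()` and the engine can be illustrated with a minimal driver loop that keeps fetching until `next_pagination` comes back empty. The dataclasses below are simplified stand-ins for bizon's models, and `run_extraction` is a sketch of the loop, not the engine's actual code:

```python
from dataclasses import dataclass, field


# Simplified stand-ins for bizon.source.models (illustration only)
@dataclass
class SourceRecord:
    id: str
    data: dict


@dataclass
class SourceIteration:
    records: list
    next_pagination: dict = field(default_factory=dict)


class PagedSource:
    """Toy source: two pages of records, then an empty next_pagination."""

    PAGES = {
        None: (["1", "2"], {"page": 2}),  # first call: no pagination yet
        2: (["3"], {}),                   # last page: empty dict stops the loop
    }

    def get(self, pagination: dict = None) -> SourceIteration:
        key = pagination.get("page") if pagination else None
        ids, next_pagination = self.PAGES[key]
        records = [SourceRecord(id=i, data={"id": i}) for i in ids]
        return SourceIteration(records=records, next_pagination=next_pagination)


def run_extraction(source) -> list:
    """Drive get() until next_pagination comes back empty."""
    collected, pagination = [], None
    while True:
        iteration = source.get(pagination)
        collected.extend(iteration.records)
        if not iteration.next_pagination:
            return collected
        pagination = iteration.next_pagination


ids = [r.id for r in run_extraction(PagedSource())]
print(ids)  # ['1', '2', '3']
```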
### Required Methods

| Method | Returns | Description |
|---|---|---|
| `streams()` | `list[str]` | Static method returning available stream names |
| `get_config_class()` | `type[SourceConfig]` | Static method returning the config class |
| `get_authenticator()` | `AuthBase \| None` | Authenticator for the requests session |
| `check_connection()` | `tuple[bool, Any]` | Test connectivity |
| `get_total_records_count()` | `int \| None` | Total records available (used for progress reporting) |
| `get(pagination)` | `SourceIteration` | Fetch the next batch of records |
### Optional Methods

| Method | Returns | Description |
|---|---|---|
| `get_records_after(state, pagination)` | `SourceIteration` | Incremental fetch (for incremental mode) |
| `commit()` | `None` | Commit progress back to the source (e.g. Kafka offsets) |
| `set_streams_config(streams)` | `None` | Accept stream routing configuration |
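`get_records_after` can be sketched as filtering on a modification timestamp carried in the incremental state. The classes below are simplified stand-ins for bizon's models, and the in-memory `ROWS` with an `updated_at` field are a hypothetical data source:

```python
from dataclasses import dataclass, field
from datetime import datetime


# Simplified stand-ins for bizon.source.models (illustration only)
@dataclass
class SourceRecord:
    id: str
    data: dict


@dataclass
class SourceIteration:
    records: list
    next_pagination: dict = field(default_factory=dict)


@dataclass
class SourceIncrementalState:
    last_run: datetime
    state: dict = field(default_factory=dict)


# Hypothetical upstream rows with a modification timestamp
ROWS = [
    {"id": "1", "updated_at": datetime(2024, 1, 1)},
    {"id": "2", "updated_at": datetime(2024, 3, 1)},
]


class MySource:
    def get_records_after(
        self, state: SourceIncrementalState, pagination: dict = None
    ) -> SourceIteration:
        """Return only rows modified since the last successful sync."""
        fresh = [r for r in ROWS if r["updated_at"] > state.last_run]
        records = [SourceRecord(id=r["id"], data=r) for r in fresh]
        return SourceIteration(records=records, next_pagination={})


state = SourceIncrementalState(last_run=datetime(2024, 2, 1))
iteration = MySource().get_records_after(state)
print([r.id for r in iteration.records])  # ['2']
```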
## Data Models

### SourceRecord

Individual record extracted from the source:

```python
class SourceRecord:
    id: str              # Unique identifier
    data: dict           # JSON payload
    timestamp: datetime  # Extraction timestamp (default: now)
    destination_id: str  # Target table (for multi-table routing)
```
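The `destination_id` field is what drives multi-table routing: each record names its own target table. A sketch, using a simplified stand-in class and a hypothetical `raw_*` table-naming scheme:

```python
from dataclasses import dataclass


# Simplified stand-in for bizon's SourceRecord (illustration only)
@dataclass
class SourceRecord:
    id: str
    data: dict
    destination_id: str = ""


rows = [
    {"id": "1", "type": "user"},
    {"id": "2", "type": "order"},
]

# Route each record to a table named after its type (hypothetical scheme)
records = [
    SourceRecord(id=r["id"], data=r, destination_id=f"raw_{r['type']}s")
    for r in rows
]
print([r.destination_id for r in records])  # ['raw_users', 'raw_orders']
```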
### SourceIteration

Batch of records with pagination state:

```python
class SourceIteration:
    records: list[SourceRecord]  # Records in this batch
    next_pagination: dict        # State for the next iteration
```

Pagination behavior:
- Return an empty dict `{}` to signal no more data
- Return pagination state (cursor, offset, page) to continue fetching
### SourceIncrementalState

State for incremental syncs:
```python
class SourceIncrementalState:
    last_run: datetime  # Timestamp of the last successful sync
    state: dict         # Custom state from the previous sync
```

## Example: Pagination Patterns
### Cursor-based
```python
def get(self, pagination: dict = None) -> SourceIteration:
    cursor = pagination.get("cursor") if pagination else None

    response = self.session.get(
        "https://api.example.com/users",
        params={"cursor": cursor},
    )
    data = response.json()

    records = [
        SourceRecord(id=str(u["id"]), data=u)
        for u in data["users"]
    ]

    next_cursor = data.get("next_cursor")
    return SourceIteration(
        records=records,
        next_pagination={"cursor": next_cursor} if next_cursor else {},
    )
```
### Offset-based

```python
def get(self, pagination: dict = None) -> SourceIteration:
    offset = pagination.get("offset", 0) if pagination else 0
    limit = 100

    response = self.session.get(
        "https://api.example.com/users",
        params={"offset": offset, "limit": limit},
    )
    data = response.json()

    records = [SourceRecord(id=str(u["id"]), data=u) for u in data]

    if len(records) < limit:
        return SourceIteration(records=records, next_pagination={})

    return SourceIteration(
        records=records,
        next_pagination={"offset": offset + limit},
    )
```
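The offset pattern's termination condition (a short page ends the sync) can be sanity-checked with a stubbed session. Everything below is a self-contained sketch with stand-in model classes, not bizon's actual test harness:

```python
from dataclasses import dataclass, field
from types import SimpleNamespace


# Simplified stand-ins for bizon.source.models (illustration only)
@dataclass
class SourceRecord:
    id: str
    data: dict


@dataclass
class SourceIteration:
    records: list
    next_pagination: dict = field(default_factory=dict)


class FakeSession:
    """Stub session serving 150 users in pages of `limit`."""

    USERS = [{"id": i} for i in range(150)]

    def get(self, url, params=None):
        offset, limit = params["offset"], params["limit"]
        page = self.USERS[offset:offset + limit]
        return SimpleNamespace(json=lambda: page)


class OffsetSource:
    def __init__(self):
        self.session = FakeSession()

    def get(self, pagination: dict = None) -> SourceIteration:
        offset = pagination.get("offset", 0) if pagination else 0
        limit = 100
        data = self.session.get(
            "https://api.example.com/users",
            params={"offset": offset, "limit": limit},
        ).json()
        records = [SourceRecord(id=str(u["id"]), data=u) for u in data]
        if len(records) < limit:
            return SourceIteration(records=records, next_pagination={})
        return SourceIteration(
            records=records,
            next_pagination={"offset": offset + limit},
        )


source = OffsetSource()
first = source.get()                        # full page: pagination continues
second = source.get(first.next_pagination)  # short page: pagination stops
print(len(first.records), len(second.records), second.next_pagination)
# 100 50 {}
```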
## Next Steps

- Kafka Source - Stream from Kafka topics
- HubSpot Source - Sync CRM data
- Custom Sources - Build your own connector
- Authentication - Configure auth methods