Custom Sources
Create custom source connectors to extract data from any API or system not covered by built-in connectors.
Overview
A custom source consists of:
- A config class extending `SourceConfig`
- A source class implementing `AbstractSource`
Quick Start
Section titled “Quick Start”-
Create the source file
Create
my_source.py:from typing import List, Optional, Tuple, Typefrom requests.auth import AuthBasefrom bizon.source.source import AbstractSourcefrom bizon.source.config import SourceConfigfrom bizon.source.models import SourceIteration, SourceRecordfrom bizon.source.auth.builder import AuthBuilderfrom pydantic import Fieldclass MySourceConfig(SourceConfig):"""Configuration for MySource."""api_url: str = Field(..., description="API base URL")page_size: int = Field(100, description="Records per page")class MySource(AbstractSource):@staticmethoddef streams() -> List[str]:return ["users", "orders"]@staticmethoddef get_config_class() -> Type[SourceConfig]:return MySourceConfigdef get_authenticator(self) -> Optional[AuthBase]:if self.config.authentication:return AuthBuilder.from_config(self.config.authentication)return Nonedef check_connection(self) -> Tuple[bool, Optional[str]]:try:response = self.session.get(f"{self.config.api_url}/health")return response.ok, Noneexcept Exception as e:return False, str(e)def get_total_records_count(self) -> Optional[int]:return None # Unknowndef get(self, pagination: dict = None) -> SourceIteration:page = pagination.get("page", 1) if pagination else 1response = self.session.get(f"{self.config.api_url}/{self.config.stream}",params={"page": page, "limit": self.config.page_size})data = response.json()records = [SourceRecord(id=str(item["id"]), data=item)for item in data["items"]]has_more = len(records) == self.config.page_sizenext_pagination = {"page": page + 1} if has_more else {}return SourceIteration(records=records, next_pagination=next_pagination) -
Create the config file
Create
config.yml:name: my-custom-pipelinesource:name: my_sourcestream: usersapi_url: https://api.example.compage_size: 100authentication:type: api_keyparams:token: BIZON_ENV_API_TOKENdestination:name: loggerconfig: {} -
Run the pipeline
Terminal window bizon run config.yml --custom-source ./my_source.py
Config Class
Define source-specific configuration by extending `SourceConfig`:
from pydantic import Fieldfrom bizon.source.config import SourceConfig
class MySourceConfig(SourceConfig): # Required fields api_url: str = Field(..., description="API base URL")
# Optional fields with defaults page_size: int = Field(100, description="Records per page") timeout: int = Field(30, description="Request timeout in seconds") include_deleted: bool = Field(False, description="Include deleted records")Inherited Fields
Your config automatically includes these base fields:
| Field | Type | Default | Description |
|---|---|---|---|
| `name` | string | Required | Source identifier |
| `stream` | string | Required | Stream to sync |
| `sync_mode` | enum | `full_refresh` | Sync mode |
| `authentication` | object | None | Auth config |
| `force_ignore_checkpoint` | bool | false | Reset checkpoints |
| `max_iterations` | int | None | Limit iterations |
Source Class
Implement `AbstractSource` with the required methods:
from bizon.source.source import AbstractSource
class MySource(AbstractSource):
@staticmethod def streams() -> List[str]: """Return available stream names.""" return ["users", "orders", "products"]
@staticmethod def get_config_class() -> Type[SourceConfig]: """Return the config class.""" return MySourceConfig
def get_authenticator(self) -> Optional[AuthBase]: """Return authenticator for the session.""" if self.config.authentication: return AuthBuilder.from_config(self.config.authentication) return None
def check_connection(self) -> Tuple[bool, Optional[str]]: """Test connectivity. Return (success, error_message).""" try: response = self.session.get(f"{self.config.api_url}/ping") return response.ok, None except Exception as e: return False, str(e)
def get_total_records_count(self) -> Optional[int]: """Return total record count if known.""" response = self.session.get(f"{self.config.api_url}/count") return response.json().get("total")
def get(self, pagination: dict = None) -> SourceIteration: """Fetch next batch of records.""" # Implementation here passPagination Patterns
Section titled “Pagination Patterns”Cursor-Based
Section titled “Cursor-Based”def get(self, pagination: dict = None) -> SourceIteration: cursor = pagination.get("cursor") if pagination else None
params = {"limit": self.config.page_size} if cursor: params["cursor"] = cursor
response = self.session.get( f"{self.config.api_url}/{self.config.stream}", params=params ) data = response.json()
records = [ SourceRecord(id=str(item["id"]), data=item) for item in data["items"] ]
next_cursor = data.get("next_cursor") return SourceIteration( records=records, next_pagination={"cursor": next_cursor} if next_cursor else {} )Offset-Based
Section titled “Offset-Based”def get(self, pagination: dict = None) -> SourceIteration: offset = pagination.get("offset", 0) if pagination else 0
response = self.session.get( f"{self.config.api_url}/{self.config.stream}", params={"offset": offset, "limit": self.config.page_size} ) data = response.json()
records = [ SourceRecord(id=str(item["id"]), data=item) for item in data["items"] ]
if len(records) < self.config.page_size: return SourceIteration(records=records, next_pagination={})
return SourceIteration( records=records, next_pagination={"offset": offset + self.config.page_size} )Page-Based
Section titled “Page-Based”def get(self, pagination: dict = None) -> SourceIteration: page = pagination.get("page", 1) if pagination else 1
response = self.session.get( f"{self.config.api_url}/{self.config.stream}", params={"page": page, "per_page": self.config.page_size} ) data = response.json()
records = [ SourceRecord(id=str(item["id"]), data=item) for item in data["items"] ]
total_pages = data.get("total_pages", 1) if page >= total_pages: return SourceIteration(records=records, next_pagination={})
return SourceIteration( records=records, next_pagination={"page": page + 1} )Incremental Sync
Support incremental mode by implementing `get_records_after`:
from bizon.source.models import SourceIncrementalState
class MySource(AbstractSource):
def get_records_after( self, source_state: SourceIncrementalState, pagination: dict = None ) -> SourceIteration: """Fetch records updated after last sync."""
cursor = pagination.get("cursor") if pagination else None since = source_state.last_run.isoformat()
params = { "updated_since": since, "limit": self.config.page_size } if cursor: params["cursor"] = cursor
response = self.session.get( f"{self.config.api_url}/{self.config.stream}", params=params ) data = response.json()
records = [ SourceRecord(id=str(item["id"]), data=item) for item in data["items"] ]
next_cursor = data.get("next_cursor") return SourceIteration( records=records, next_pagination={"cursor": next_cursor} if next_cursor else {} )Then use incremental mode:
source: name: my_source stream: users sync_mode: incrementalMulti-Table Routing
Route records to different destinations by setting `destination_id` on each record:
def get(self, pagination: dict = None) -> SourceIteration: # Fetch data...
records = [] for item in data["items"]: record = SourceRecord( id=str(item["id"]), data=item, destination_id=f"project.dataset.{item['type']}_events" ) records.append(record)
return SourceIteration(records=records, next_pagination={})Authentication
Use Bizon’s built-in authenticators:
from bizon.source.auth.builder import AuthBuilder
def get_authenticator(self) -> Optional[AuthBase]: if self.config.authentication: return AuthBuilder.from_config(self.config.authentication) return NoneOr create a custom authenticator:
from requests.auth import AuthBase
class CustomAuth(AuthBase):
    """Authenticator that adds an API key and a request signature header.

    Intended to be returned from a source's ``get_authenticator``; it is
    invoked with each outgoing request and returns that request after
    setting the ``X-API-Key`` and ``X-Signature`` headers.
    """

    def __init__(self, api_key: str, secret: str):
        self.api_key = api_key
        self.secret = secret

    def __call__(self, request):
        # Compute the signature first so a signing failure leaves the
        # request headers untouched.
        signature = self._sign(request)
        request.headers["X-API-Key"] = self.api_key
        request.headers["X-Signature"] = signature
        return request

    def _sign(self, request):
        """Return the signature string for ``request``.

        Placeholder for the API-specific signing logic. It raises instead of
        silently returning ``None``, which would otherwise be written into
        the ``X-Signature`` header.
        """
        # Custom signing logic
        raise NotImplementedError(
            "Implement _sign() with the signing scheme required by your API"
        )
class MySource(AbstractSource): def get_authenticator(self) -> Optional[AuthBase]: return CustomAuth( api_key=self.config.api_key, secret=self.config.api_secret )Error Handling
Handle errors gracefully:
def get(self, pagination: dict = None) -> SourceIteration:
    """Fetch the configured stream, converting request failures to RuntimeError.

    Every failure is re-raised as ``RuntimeError`` chained to the original
    exception (``raise ... from e``) so the root cause stays in the traceback.
    """
    try:
        response = self.session.get(
            f"{self.config.api_url}/{self.config.stream}",
            timeout=self.config.timeout,
        )
        response.raise_for_status()
        data = response.json()

    except requests.exceptions.Timeout as e:
        raise RuntimeError(
            f"Request timed out after {self.config.timeout}s"
        ) from e

    except requests.exceptions.HTTPError as e:
        # Read the status from the response attached to the exception rather
        # than from the local variable assigned inside the try block.
        failed = e.response
        if failed is not None and failed.status_code == 429:
            # Rate limited - Bizon will retry
            raise RuntimeError("Rate limited, will retry") from e
        raise RuntimeError(f"API error: {e}") from e

    except Exception as e:
        raise RuntimeError(f"Failed to fetch data: {e}") from e
# Process data...Testing
Test your source with the logger destination:
source: name: my_source stream: users api_url: https://api.example.com max_iterations: 3 # Limit for testing
destination: name: logger config: {}bizon run config.yml --custom-source ./my_source.py --log-level DEBUGComplete Example
Full REST API source implementation:
from datetime import datetimefrom typing import List, Optional, Tuple, Typefrom requests.auth import AuthBasefrom pydantic import Fieldfrom pytz import UTC
from bizon.source.source import AbstractSourcefrom bizon.source.config import SourceConfigfrom bizon.source.models import ( SourceIteration, SourceRecord, SourceIncrementalState)from bizon.source.auth.builder import AuthBuilder
class RestApiSourceConfig(SourceConfig):
    """Configuration for REST API Source."""

    # Declared with ``...`` and therefore has no default value.
    base_url: str = Field(..., description="API base URL")

    # Tuning options; each falls back to the default given here.
    page_size: int = Field(100, description="Records per page")
    timeout: int = Field(30, description="Request timeout")
class RestApiSource(AbstractSource): """Generic REST API source connector."""
@staticmethod def streams() -> List[str]: return ["users", "orders", "products"]
@staticmethod def get_config_class() -> Type[SourceConfig]: return RestApiSourceConfig
def get_authenticator(self) -> Optional[AuthBase]: if self.config.authentication: return AuthBuilder.from_config(self.config.authentication) return None
def check_connection(self) -> Tuple[bool, Optional[str]]: try: response = self.session.get( f"{self.config.base_url}/health", timeout=self.config.timeout ) return response.ok, None except Exception as e: return False, str(e)
def get_total_records_count(self) -> Optional[int]:
    """Return the record count reported by the API, or None if unavailable.

    Requests ``{base_url}/{stream}/count`` and reads the ``total`` key of the
    JSON body. Any request or parsing failure is treated as "count unknown".
    """
    try:
        response = self.session.get(
            f"{self.config.base_url}/{self.config.stream}/count",
            # Same timeout as the other requests made by this source, so a
            # stalled count endpoint cannot hang the run.
            timeout=self.config.timeout,
        )
        return response.json().get("total")
    except Exception:
        # Best-effort lookup. Catching Exception rather than using a bare
        # except lets KeyboardInterrupt and SystemExit propagate.
        return None
def get(self, pagination: dict = None) -> SourceIteration: cursor = pagination.get("cursor") if pagination else None
params = {"limit": self.config.page_size} if cursor: params["after"] = cursor
response = self.session.get( f"{self.config.base_url}/{self.config.stream}", params=params, timeout=self.config.timeout ) response.raise_for_status() data = response.json()
records = [ SourceRecord( id=str(item["id"]), data=item, timestamp=datetime.fromisoformat( item.get("updated_at", datetime.now(UTC).isoformat()) ) ) for item in data.get("data", []) ]
next_cursor = data.get("pagination", {}).get("next_cursor") return SourceIteration( records=records, next_pagination={"cursor": next_cursor} if next_cursor else {} )
def get_records_after( self, source_state: SourceIncrementalState, pagination: dict = None ) -> SourceIteration: cursor = pagination.get("cursor") if pagination else None
params = { "limit": self.config.page_size, "updated_since": source_state.last_run.isoformat() } if cursor: params["after"] = cursor
response = self.session.get( f"{self.config.base_url}/{self.config.stream}", params=params, timeout=self.config.timeout ) response.raise_for_status() data = response.json()
records = [ SourceRecord( id=str(item["id"]), data=item, timestamp=datetime.fromisoformat(item["updated_at"]) ) for item in data.get("data", []) ]
next_cursor = data.get("pagination", {}).get("next_cursor") return SourceIteration( records=records, next_pagination={"cursor": next_cursor} if next_cursor else {} )Next Steps
- Custom Source Tutorial - Step-by-step guide
- Sources Overview - AbstractSource API reference
- Authentication - Auth configuration