Skip to content

Custom Sources

Create custom source connectors to extract data from any API or system not covered by built-in connectors.

A custom source consists of:

  1. A config class extending SourceConfig
  2. A source class implementing AbstractSource
  1. Create the source file

    Create my_source.py:

    from typing import List, Optional, Tuple, Type
    from requests.auth import AuthBase
    from bizon.source.source import AbstractSource
    from bizon.source.config import SourceConfig
    from bizon.source.models import SourceIteration, SourceRecord
    from bizon.source.auth.builder import AuthBuilder
    from pydantic import Field
    class MySourceConfig(SourceConfig):
        """Settings accepted by MySource, in addition to the base SourceConfig fields."""

        # Required: no default is provided, so the value must be set in the config.
        api_url: str = Field(default=..., description="API base URL")
        # Optional: number of records requested per page.
        page_size: int = Field(default=100, description="Records per page")
    class MySource(AbstractSource):
        """Example source that reads page-numbered JSON from an HTTP API."""

        @staticmethod
        def streams() -> List[str]:
            """Return the stream names this source exposes."""
            return ["users", "orders"]

        @staticmethod
        def get_config_class() -> Type[SourceConfig]:
            """Return the config class for this source."""
            return MySourceConfig

        def get_authenticator(self) -> Optional[AuthBase]:
            """Build an authenticator from the config, or return None if none is configured."""
            if self.config.authentication:
                return AuthBuilder.from_config(self.config.authentication)
            return None

        def check_connection(self) -> Tuple[bool, Optional[str]]:
            """Probe the ``/health`` endpoint.

            Returns:
                ``(True, None)`` on a successful response, otherwise
                ``(False, message)`` describing the failure.
            """
            try:
                response = self.session.get(f"{self.config.api_url}/health")
            except Exception as e:
                return False, str(e)
            if response.ok:
                return True, None
            # A non-2xx reply is a failure too; report it instead of returning no message.
            return False, f"Health check failed with HTTP {response.status_code}"

        def get_total_records_count(self) -> Optional[int]:
            """Return the total number of records, or None when it is unknown."""
            return None  # Unknown

        def get(self, pagination: dict = None) -> SourceIteration:
            """Fetch one page of the configured stream.

            Args:
                pagination: State returned by the previous call (``{"page": n}``),
                    or None/empty on the first call.

            Returns:
                The page's records plus the pagination state for the next call;
                the state is empty when the page came back short.

            Raises:
                requests.HTTPError: If the API answers with an error status.
            """
            page = pagination.get("page", 1) if pagination else 1
            response = self.session.get(
                f"{self.config.api_url}/{self.config.stream}",
                params={"page": page, "limit": self.config.page_size},
            )
            # Fail on error statuses here rather than with a KeyError on "items" below.
            response.raise_for_status()
            data = response.json()
            records = [
                SourceRecord(id=str(item["id"]), data=item)
                for item in data["items"]
            ]
            # A full page is taken as the signal that another page may exist.
            has_more = len(records) == self.config.page_size
            next_pagination = {"page": page + 1} if has_more else {}
            return SourceIteration(records=records, next_pagination=next_pagination)
  2. Create the config file

    Create config.yml:

    name: my-custom-pipeline

    source:
      name: my_source
      stream: users
      api_url: https://api.example.com
      page_size: 100
      authentication:
        type: api_key
        params:
          token: BIZON_ENV_API_TOKEN

    destination:
      name: logger
      config: {}
  3. Run the pipeline

    Terminal window
    bizon run config.yml --custom-source ./my_source.py

Define source-specific configuration by extending SourceConfig:

from pydantic import Field
from bizon.source.config import SourceConfig
class MySourceConfig(SourceConfig):
    """Source-specific settings layered on top of the base SourceConfig fields."""

    # Required fields
    api_url: str = Field(default=..., description="API base URL")

    # Optional fields with defaults
    page_size: int = Field(default=100, description="Records per page")
    timeout: int = Field(default=30, description="Request timeout in seconds")
    include_deleted: bool = Field(default=False, description="Include deleted records")

Your config automatically includes these base fields:

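| Field | Type | Default | Description |
|-------|------|---------|-------------|
| name | string | Required | Source identifier |
| stream | string | Required | Stream to sync |
| sync_mode | enum | full_refresh | Sync mode |
| authentication | object | None | Auth config |
| force_ignore_checkpoint | bool | false | Reset checkpoints |
| max_iterations | int | None | Limit iterations |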

Implement AbstractSource with required methods:

from bizon.source.source import AbstractSource
class MySource(AbstractSource):
    """Skeleton listing the methods a custom source implements."""

    @staticmethod
    def streams() -> List[str]:
        """Return available stream names."""
        return ["users", "orders", "products"]

    @staticmethod
    def get_config_class() -> Type[SourceConfig]:
        """Return the config class."""
        return MySourceConfig

    def get_authenticator(self) -> Optional[AuthBase]:
        """Return authenticator for the session, or None when no authentication is configured."""
        if self.config.authentication:
            return AuthBuilder.from_config(self.config.authentication)
        return None

    def check_connection(self) -> Tuple[bool, Optional[str]]:
        """Test connectivity. Return (success, error_message)."""
        try:
            response = self.session.get(f"{self.config.api_url}/ping")
        except Exception as e:
            return False, str(e)
        if response.ok:
            return True, None
        # Report non-2xx replies with a message instead of (False, None).
        return False, f"Ping failed with HTTP {response.status_code}"

    def get_total_records_count(self) -> Optional[int]:
        """Return total record count if known."""
        response = self.session.get(f"{self.config.api_url}/count")
        return response.json().get("total")

    def get(self, pagination: dict = None) -> SourceIteration:
        """Fetch next batch of records."""
        # Implementation here
        # Raise instead of `pass`, which would silently return None
        # where a SourceIteration is expected.
        raise NotImplementedError("MySource.get() must be implemented")
def get(self, pagination: dict = None) -> SourceIteration:
    """Fetch one page using cursor-based pagination.

    The cursor from the previous call (if any) is sent back to the API, and
    the response's ``next_cursor`` becomes the state for the next call.
    """
    query = {"limit": self.config.page_size}
    resume_from = (pagination or {}).get("cursor")
    if resume_from:
        query["cursor"] = resume_from

    endpoint = f"{self.config.api_url}/{self.config.stream}"
    payload = self.session.get(endpoint, params=query).json()

    records = [
        SourceRecord(id=str(entry["id"]), data=entry)
        for entry in payload["items"]
    ]

    # An absent or empty next_cursor yields empty pagination state.
    token = payload.get("next_cursor")
    if token:
        following = {"cursor": token}
    else:
        following = {}
    return SourceIteration(records=records, next_pagination=following)
def get(self, pagination: dict = None) -> SourceIteration:
    """Fetch one page using offset-based pagination.

    Starts at offset 0 and advances by ``page_size`` per call; a page
    shorter than ``page_size`` yields empty pagination state.
    """
    limit = self.config.page_size
    start = (pagination or {}).get("offset", 0)

    endpoint = f"{self.config.api_url}/{self.config.stream}"
    payload = self.session.get(
        endpoint, params={"offset": start, "limit": limit}
    ).json()

    records = [
        SourceRecord(id=str(entry["id"]), data=entry)
        for entry in payload["items"]
    ]

    following = {} if len(records) < limit else {"offset": start + limit}
    return SourceIteration(records=records, next_pagination=following)
def get(self, pagination: dict = None) -> SourceIteration:
    """Fetch one page using page-number pagination.

    Starts at page 1 and compares against the ``total_pages`` value in the
    response (treated as 1 when absent) to decide whether to continue.
    """
    current = (pagination or {}).get("page", 1)

    endpoint = f"{self.config.api_url}/{self.config.stream}"
    payload = self.session.get(
        endpoint, params={"page": current, "per_page": self.config.page_size}
    ).json()

    records = [
        SourceRecord(id=str(entry["id"]), data=entry)
        for entry in payload["items"]
    ]

    last_page = payload.get("total_pages", 1)
    following = {"page": current + 1} if current < last_page else {}
    return SourceIteration(records=records, next_pagination=following)

Support incremental mode by implementing get_records_after:

from bizon.source.models import SourceIncrementalState
class MySource(AbstractSource):
    def get_records_after(
        self,
        source_state: SourceIncrementalState,
        pagination: dict = None
    ) -> SourceIteration:
        """Fetch records updated after last sync.

        ``source_state.last_run`` is sent as the ``updated_since`` filter;
        paging within the run is cursor-based.
        """
        query = {
            "updated_since": source_state.last_run.isoformat(),
            "limit": self.config.page_size,
        }
        resume_from = (pagination or {}).get("cursor")
        if resume_from:
            query["cursor"] = resume_from

        endpoint = f"{self.config.api_url}/{self.config.stream}"
        payload = self.session.get(endpoint, params=query).json()

        records = [
            SourceRecord(id=str(entry["id"]), data=entry)
            for entry in payload["items"]
        ]

        token = payload.get("next_cursor")
        following = {"cursor": token} if token else {}
        return SourceIteration(records=records, next_pagination=following)

Then use incremental mode:

source:
  name: my_source
  stream: users
  sync_mode: incremental

Route records to different destinations:

def get(self, pagination: dict = None) -> SourceIteration:
    """Return records that each carry their own ``destination_id``.

    The destination is derived from each item's ``type`` field.
    """
    # Fetch data...
    records = [
        SourceRecord(
            id=str(item["id"]),
            data=item,
            destination_id=f"project.dataset.{item['type']}_events",
        )
        for item in data["items"]
    ]
    return SourceIteration(records=records, next_pagination={})

Use Bizon’s built-in authenticators:

from bizon.source.auth.builder import AuthBuilder
def get_authenticator(self) -> Optional[AuthBase]:
    """Build an authenticator from the ``authentication`` config, if one is set."""
    auth_config = self.config.authentication
    if not auth_config:
        return None
    return AuthBuilder.from_config(auth_config)

Or create a custom authenticator:

from requests.auth import AuthBase
class CustomAuth(AuthBase):
    """Authenticator that adds an API key header and a request signature header."""

    def __init__(self, api_key: str, secret: str):
        """Store the credentials used to authenticate each request."""
        self.api_key = api_key
        self.secret = secret

    def __call__(self, request):
        """Attach the ``X-API-Key`` and ``X-Signature`` headers and return the request."""
        signature = self._sign(request)
        request.headers["X-API-Key"] = self.api_key
        request.headers["X-Signature"] = signature
        return request

    def _sign(self, request):
        """Compute the signature for ``request``.

        Raises:
            NotImplementedError: Always, until the signing logic is supplied.
        """
        # Custom signing logic
        # Raise rather than `pass`: returning None would put a None value
        # into the X-Signature header.
        raise NotImplementedError("Implement request signing in CustomAuth._sign()")
class MySource(AbstractSource):
    def get_authenticator(self) -> Optional[AuthBase]:
        """Return a CustomAuth built from the configured key and secret."""
        settings = self.config
        return CustomAuth(api_key=settings.api_key, secret=settings.api_secret)

Handle errors gracefully:

def get(self, pagination: dict = None) -> SourceIteration:
    """Fetch the configured stream, converting request failures into RuntimeError.

    Raises:
        RuntimeError: On timeout, on an HTTP error status (with a dedicated
            message for 429), or on any other failure while fetching or
            decoding the response. The original exception is chained as the cause.
    """
    # Imported here so the `requests.exceptions` names below are bound.
    import requests

    try:
        response = self.session.get(
            f"{self.config.api_url}/{self.config.stream}",
            timeout=self.config.timeout,
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.Timeout as e:
        raise RuntimeError(f"Request timed out after {self.config.timeout}s") from e
    except requests.exceptions.HTTPError as e:
        # Read the status from the exception itself instead of the outer local.
        status = e.response.status_code if e.response is not None else None
        if status == 429:
            # Rate limited - Bizon will retry
            raise RuntimeError("Rate limited, will retry") from e
        raise RuntimeError(f"API error: {e}") from e
    except Exception as e:
        raise RuntimeError(f"Failed to fetch data: {e}") from e
    # Process data...

Test your source with the logger destination:

source:
  name: my_source
  stream: users
  api_url: https://api.example.com
  max_iterations: 3 # Limit for testing

destination:
  name: logger
  config: {}
Terminal window
bizon run config.yml --custom-source ./my_source.py --log-level DEBUG

Full REST API source implementation:

from datetime import datetime
from typing import List, Optional, Tuple, Type
from requests.auth import AuthBase
from pydantic import Field
from pytz import UTC
from bizon.source.source import AbstractSource
from bizon.source.config import SourceConfig
from bizon.source.models import (
SourceIteration,
SourceRecord,
SourceIncrementalState
)
from bizon.source.auth.builder import AuthBuilder
class RestApiSourceConfig(SourceConfig):
    """Configuration for REST API Source."""

    # Required: root URL that every endpoint path is appended to.
    base_url: str = Field(default=..., description="API base URL")
    # Optional tuning knobs with defaults.
    page_size: int = Field(default=100, description="Records per page")
    timeout: int = Field(default=30, description="Request timeout")
class RestApiSource(AbstractSource):
    """Generic REST API source connector."""

    @staticmethod
    def streams() -> List[str]:
        """Return the stream names this source exposes."""
        return ["users", "orders", "products"]

    @staticmethod
    def get_config_class() -> Type[SourceConfig]:
        """Return the config class for this source."""
        return RestApiSourceConfig

    def get_authenticator(self) -> Optional[AuthBase]:
        """Build an authenticator from the config, or return None if none is configured."""
        if self.config.authentication:
            return AuthBuilder.from_config(self.config.authentication)
        return None

    def check_connection(self) -> Tuple[bool, Optional[str]]:
        """Probe the ``/health`` endpoint.

        Returns:
            ``(True, None)`` on a successful response, otherwise
            ``(False, message)`` describing the failure.
        """
        try:
            response = self.session.get(
                f"{self.config.base_url}/health",
                timeout=self.config.timeout,
            )
        except Exception as e:
            return False, str(e)
        if response.ok:
            return True, None
        # Report non-2xx replies with a message instead of (False, None).
        return False, f"Health check failed with HTTP {response.status_code}"

    def get_total_records_count(self) -> Optional[int]:
        """Return the ``total`` reported by the stream's ``/count`` endpoint, or None.

        The count is best-effort: any request or decoding failure yields None.
        """
        try:
            response = self.session.get(
                f"{self.config.base_url}/{self.config.stream}/count",
                timeout=self.config.timeout,
            )
            response.raise_for_status()
            return response.json().get("total")
        except Exception:
            # `except Exception` (not a bare `except`) so KeyboardInterrupt
            # and SystemExit still propagate.
            return None

    def _fetch_page(self, pagination: Optional[dict], extra_params: Optional[dict] = None) -> dict:
        """GET one page of the configured stream and return the decoded JSON body."""
        params = {"limit": self.config.page_size}
        if extra_params:
            params.update(extra_params)
        cursor = pagination.get("cursor") if pagination else None
        if cursor:
            params["after"] = cursor
        response = self.session.get(
            f"{self.config.base_url}/{self.config.stream}",
            params=params,
            timeout=self.config.timeout,
        )
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _build_iteration(data: dict, records: list) -> SourceIteration:
        """Wrap records with the next-page state read from ``pagination.next_cursor``."""
        next_cursor = data.get("pagination", {}).get("next_cursor")
        return SourceIteration(
            records=records,
            next_pagination={"cursor": next_cursor} if next_cursor else {},
        )

    def get(self, pagination: dict = None) -> SourceIteration:
        """Fetch one page of the configured stream (full refresh).

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        data = self._fetch_page(pagination)
        records = []
        for item in data.get("data", []):
            updated_at = item.get("updated_at")
            # Fall back to the current UTC time only when the item has no
            # usable updated_at.
            # NOTE: before Python 3.11, fromisoformat() rejects a trailing "Z".
            timestamp = (
                datetime.fromisoformat(updated_at) if updated_at else datetime.now(UTC)
            )
            records.append(
                SourceRecord(id=str(item["id"]), data=item, timestamp=timestamp)
            )
        return self._build_iteration(data, records)

    def get_records_after(
        self,
        source_state: SourceIncrementalState,
        pagination: dict = None
    ) -> SourceIteration:
        """Fetch one page of records updated since ``source_state.last_run``.

        Every item is expected to carry ``updated_at``; a missing key raises KeyError.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        data = self._fetch_page(
            pagination,
            {"updated_since": source_state.last_run.isoformat()},
        )
        records = [
            SourceRecord(
                id=str(item["id"]),
                data=item,
                timestamp=datetime.fromisoformat(item["updated_at"]),
            )
            for item in data.get("data", [])
        ]
        return self._build_iteration(data, records)