Skip to content

Creating a Custom Source Connector

In this tutorial, you’ll build a complete source connector that extracts data from a REST API. We’ll use the PokéAPI as our example - a free, open API with data about Pokémon.

What you’ll build: a source connector that:

  • Fetches Pokémon, types, and abilities from PokéAPI
  • Supports offset-based pagination
  • Works with Bizon’s sync modes
  • Can be used with any destination

Prerequisites:

  • Python 3.9+
  • Bizon installed: pip install bizon
  • Basic Python and YAML knowledge
  1. Create the project structure

    Create a new directory for your custom source:

    Terminal window
    mkdir pokeapi-source
    cd pokeapi-source

    Create these files:

    • source.py - The source connector code
    • config.yml - Pipeline configuration
  2. Define the config class

    Open source.py and add the configuration class:

    from typing import List, Optional, Tuple, Type
    from pydantic import Field
    from requests.auth import AuthBase
    from bizon.source.source import AbstractSource
    from bizon.source.config import SourceConfig
    from bizon.source.models import SourceIteration, SourceRecord
    class PokeAPIConfig(SourceConfig):
        """Configuration for PokéAPI source.

        Extends ``SourceConfig`` (name, stream, sync_mode, ...) with the
        PokéAPI-specific settings below.
        """

        base_url: str = Field(
            default="https://pokeapi.co/api/v2",
            description="API base URL",
        )
        # page_size is sent as the `limit` query parameter; reject non-positive
        # values up front since they are not valid page limits and can stall
        # offset-based pagination.
        page_size: int = Field(
            default=20,
            gt=0,
            description="Number of records per page (max 100)",
        )

    The config class:

    • Extends SourceConfig to inherit base fields (name, stream, sync_mode, etc.)
    • Adds source-specific fields using Pydantic’s Field
  3. Implement the source class skeleton

    Add the source class with required static methods:

    class PokeAPISource(AbstractSource):
        """Source connector for PokéAPI."""

        @staticmethod
        def streams() -> List[str]:
            """Return available stream names."""
            return ["pokemon", "type", "ability", "move", "item"]

        @staticmethod
        def get_config_class() -> Type[SourceConfig]:
            """Return the config class for this source."""
            return PokeAPIConfig

        def get_authenticator(self) -> Optional[AuthBase]:
            """Return authenticator (None for public APIs)."""
            return None

        def check_connection(self) -> Tuple[bool, Optional[str]]:
            """Test the API connection.

            Returns:
                ``(True, None)`` when a sample resource can be fetched,
                otherwise ``(False, reason)``.
            """
            try:
                response = self.session.get(f"{self.config.base_url}/pokemon/1")
            except Exception as e:
                return False, str(e)
            if response.ok:
                return True, None
            return False, f"API returned status {response.status_code}"

        def get_total_records_count(self) -> Optional[int]:
            """Return the total record count reported by the API.

            The list endpoint's ``count`` field is read from a single
            ``limit=1`` request. Returns ``None`` if the request fails, the
            status is not 2xx, or the body cannot be decoded.
            """
            try:
                response = self.session.get(
                    f"{self.config.base_url}/{self.config.stream}",
                    params={"limit": 1},
                )
                response.raise_for_status()
                return response.json().get("count")
            except Exception:
                # Best-effort: report an unknown count instead of failing.
                # (Not a bare `except:`, so Ctrl-C / SystemExit still propagate.)
                return None
  4. Implement the get method

    Add the main data extraction logic. PokéAPI returns a list of resources with URLs, so we’ll fetch the details for each:

    def get(self, pagination: Optional[dict] = None) -> SourceIteration:
        """Fetch the next batch of records.

        Args:
            pagination: State returned by the previous iteration
                (``{"offset": int}``), or ``None``/``{}`` for the first page.

        Returns:
            A ``SourceIteration`` with this page's records and the pagination
            state for the next call (``{}`` once there is no more data).
        """
        # Get current offset from pagination state
        offset = pagination.get("offset", 0) if pagination else 0

        # Fetch list of resources; each entry carries the URL of its details
        response = self.session.get(
            f"{self.config.base_url}/{self.config.stream}",
            params={"offset": offset, "limit": self.config.page_size},
        )
        response.raise_for_status()
        data = response.json()
        results = data.get("results", [])

        # Fetch details for each resource
        records = []
        for item in results:
            # Get the ID from the URL (e.g., .../pokemon/25/ -> 25)
            item_id = item["url"].rstrip("/").split("/")[-1]
            detail_response = self.session.get(item["url"])
            # NOTE: a resource whose detail request fails is skipped, not
            # retried; call detail_response.raise_for_status() to fail instead.
            if detail_response.ok:
                records.append(SourceRecord(id=item_id, data=detail_response.json()))

        # Advance by the number of results actually returned, not the requested
        # page size, so a server-side cap on `limit` cannot skip records. The
        # `results` check stops on an empty page, which would not advance.
        if data.get("next") and results:
            next_pagination = {"offset": offset + len(results)}
        else:
            next_pagination = {}

        return SourceIteration(records=records, next_pagination=next_pagination)

    Key points:

    • pagination contains state from the previous iteration
    • PokéAPI provides a next URL when more data is available
    • Return empty next_pagination={} when there’s no more data
    • Each record needs a unique id
  5. Create the complete source file

    Your complete source.py should look like:

    from typing import List, Optional, Tuple, Type
    from pydantic import Field
    from requests.auth import AuthBase
    from bizon.source.source import AbstractSource
    from bizon.source.config import SourceConfig
    from bizon.source.models import SourceIteration, SourceRecord
    class PokeAPIConfig(SourceConfig):
        """Configuration for PokéAPI source.

        Extends ``SourceConfig`` (name, stream, sync_mode, ...) with the
        PokéAPI-specific settings below.
        """

        base_url: str = Field(
            default="https://pokeapi.co/api/v2",
            description="API base URL",
        )
        # page_size is sent as the `limit` query parameter; reject non-positive
        # values up front since they are not valid page limits and can stall
        # offset-based pagination.
        page_size: int = Field(
            default=20,
            gt=0,
            description="Number of records per page (max 100)",
        )
    class PokeAPISource(AbstractSource):
        """Source connector for PokéAPI."""

        @staticmethod
        def streams() -> List[str]:
            """Return available stream names."""
            return ["pokemon", "type", "ability", "move", "item"]

        @staticmethod
        def get_config_class() -> Type[SourceConfig]:
            """Return the config class for this source."""
            return PokeAPIConfig

        def get_authenticator(self) -> Optional[AuthBase]:
            """Return authenticator (None for public APIs)."""
            return None

        def check_connection(self) -> Tuple[bool, Optional[str]]:
            """Test the API connection.

            Returns:
                ``(True, None)`` when a sample resource can be fetched,
                otherwise ``(False, reason)``.
            """
            try:
                response = self.session.get(f"{self.config.base_url}/pokemon/1")
            except Exception as e:
                return False, str(e)
            if response.ok:
                return True, None
            return False, f"API returned status {response.status_code}"

        def get_total_records_count(self) -> Optional[int]:
            """Return the total record count reported by the API, or ``None``.

            Reads the list endpoint's ``count`` field from one ``limit=1``
            request; any failure (network, non-2xx, bad JSON) yields ``None``.
            """
            try:
                response = self.session.get(
                    f"{self.config.base_url}/{self.config.stream}",
                    params={"limit": 1},
                )
                response.raise_for_status()
                return response.json().get("count")
            except Exception:
                # Best-effort: report an unknown count instead of failing.
                # (Not a bare `except:`, so Ctrl-C / SystemExit still propagate.)
                return None

        def get(self, pagination: Optional[dict] = None) -> SourceIteration:
            """Fetch the next batch of records.

            Args:
                pagination: State returned by the previous iteration
                    (``{"offset": int}``), or ``None``/``{}`` for the first page.

            Returns:
                A ``SourceIteration`` with this page's records and the
                pagination state for the next call (``{}`` when done).
            """
            offset = pagination.get("offset", 0) if pagination else 0

            response = self.session.get(
                f"{self.config.base_url}/{self.config.stream}",
                params={"offset": offset, "limit": self.config.page_size},
            )
            response.raise_for_status()
            data = response.json()
            results = data.get("results", [])

            records = []
            for item in results:
                # The resource ID is the last path segment of its URL
                item_id = item["url"].rstrip("/").split("/")[-1]
                detail_response = self.session.get(item["url"])
                # NOTE: failed detail requests are skipped, not retried.
                if detail_response.ok:
                    records.append(SourceRecord(id=item_id, data=detail_response.json()))

            # Advance by the results actually returned (robust to a server-side
            # cap on `limit`); an empty page ends pagination instead of stalling.
            if data.get("next") and results:
                next_pagination = {"offset": offset + len(results)}
            else:
                next_pagination = {}

            return SourceIteration(records=records, next_pagination=next_pagination)
  6. Create the pipeline configuration

    Create config.yml:

    name: pokeapi-pokemon
    source:
      name: pokeapi
      stream: pokemon
      page_size: 5
    destination:
      name: logger
      config: {}
  7. Run the pipeline

    Test your source:

    Terminal window
    bizon run config.yml --custom-source ./source.py --log-level DEBUG

    You should see Pokémon being fetched and logged:

    INFO - Fetching records from source...
    INFO - Retrieved 5 records in iteration 0
    INFO - Retrieved 5 records in iteration 1
    ...
    INFO - Pipeline finished successfully.
  8. Test different streams

    Update config.yml to sync Pokémon types:

    source:
      name: pokeapi
      stream: type
      page_size: 10

    Run again:

    Terminal window
    bizon run config.yml --custom-source ./source.py

For APIs that require authentication, implement get_authenticator:

from bizon.source.auth.builder import AuthBuilder
class MyAuthenticatedSource(AbstractSource):
    """Example source whose requests use the configured authentication."""

    def get_authenticator(self) -> Optional[AuthBase]:
        """Build an authenticator from ``config.authentication``; ``None`` if unset."""
        auth_config = self.config.authentication
        return AuthBuilder.from_config(auth_config) if auth_config else None

Then configure auth in YAML:

source:
  name: my_source
  stream: data
  authentication:
    type: api_key
    params:
      token: BIZON_ENV_API_TOKEN

Support incremental mode by implementing get_records_after:

from bizon.source.models import SourceIncrementalState
class MySource(AbstractSource):
    """Illustrative incremental source (body intentionally elided below)."""

    def get_records_after(
        self,
        source_state: SourceIncrementalState,
        pagination: dict = None
    ) -> SourceIteration:
        """Fetch records updated after last sync.

        Args:
            source_state: Incremental state; its ``last_run`` timestamp is
                sent to the API as the lower bound of the update window.
            pagination: State from the previous iteration (``{"offset": int}``),
                or ``None`` for the first page.

        Returns:
            A ``SourceIteration`` (construction elided in this excerpt).
        """
        # Resume from the previous iteration's offset (0 on the first page)
        offset = pagination.get("offset", 0) if pagination else 0
        # Serialize the last-run timestamp as ISO-8601 for the API filter
        since = source_state.last_run.isoformat()
        # NOTE: `updated_since` is an example filter name; use whatever
        # parameter your API supports for "changed since" queries.
        response = self.session.get(
            f"{self.config.base_url}/{self.config.stream}",
            params={
                "updated_since": since,
                "offset": offset,
                "limit": self.config.page_size
            }
        )
        # ... rest of implementation
        # (check the response, build SourceRecords, and return a SourceIteration
        # with the next pagination state, as in get())

Use with:

source:
  name: my_source
  stream: data
  sync_mode: incremental

Update your config to load Pokémon data to BigQuery:

name: pokeapi-to-bigquery
source:
  name: pokeapi
  stream: pokemon
  page_size: 50
destination:
  name: bigquery
  config:
    project_id: my-project
    dataset_id: raw_data
    dataset_location: US
    gcs_buffer_bucket: my-staging-bucket
transforms:
  - label: extract-pokemon-data
    python: |
      # Extract key fields from the Pokemon data
      pokemon = data
      data = {
          'id': pokemon.get('id'),
          'name': pokemon.get('name'),
          'height': pokemon.get('height'),
          'weight': pokemon.get('weight'),
          'base_experience': pokemon.get('base_experience'),
          'types': [t['type']['name'] for t in pokemon.get('types', [])],
          'abilities': [a['ability']['name'] for a in pokemon.get('abilities', [])]
      }
from datetime import datetime
from typing import List, Optional, Tuple, Type
from pydantic import Field
from requests.auth import AuthBase
from pytz import UTC
from bizon.source.source import AbstractSource
from bizon.source.config import SourceConfig
from bizon.source.models import (
SourceIteration,
SourceRecord,
SourceIncrementalState
)
from bizon.source.auth.builder import AuthBuilder
class AdvancedAPIConfig(SourceConfig):
    """Configuration for Advanced API source."""

    base_url: str = Field(..., description="API base URL")
    # Both must be positive: page_size is sent as the request `limit`, and
    # requests/urllib3 reject a timeout of 0 or less at request time, so fail
    # at config validation instead.
    page_size: int = Field(100, gt=0, description="Records per page")
    timeout: int = Field(30, gt=0, description="Request timeout in seconds")
    include_metadata: bool = Field(False, description="Include API metadata")
class AdvancedAPISource(AbstractSource):
    """Advanced API source with full feature support.

    Uses cursor pagination (``after`` request param / ``next_cursor`` in the
    response), optional authentication, per-request timeouts, and incremental
    syncs filtered by ``updated_since``.
    """

    @staticmethod
    def streams() -> List[str]:
        return ["users", "orders", "products", "events"]

    @staticmethod
    def get_config_class() -> Type[SourceConfig]:
        return AdvancedAPIConfig

    def get_authenticator(self) -> Optional[AuthBase]:
        """Build an authenticator from ``config.authentication``; ``None`` if unset."""
        if self.config.authentication:
            return AuthBuilder.from_config(self.config.authentication)
        return None

    def check_connection(self) -> Tuple[bool, Optional[str]]:
        """Probe ``/health``; return ``(True, None)`` or ``(False, reason)``."""
        try:
            response = self.session.get(
                f"{self.config.base_url}/health",
                timeout=self.config.timeout,
            )
        except Exception as e:
            return False, str(e)
        if response.ok:
            return True, None
        # Report why the check failed instead of returning (False, None)
        return False, f"API returned status {response.status_code}"

    def get_total_records_count(self) -> Optional[int]:
        """Return the ``total`` from the stream's ``/count`` endpoint, or ``None``."""
        try:
            response = self.session.get(
                f"{self.config.base_url}/{self.config.stream}/count",
                timeout=self.config.timeout,
            )
            response.raise_for_status()
            return response.json().get("total")
        except Exception:
            # Best-effort: report an unknown count instead of failing.
            return None

    def get(self, pagination: Optional[dict] = None) -> SourceIteration:
        """Fetch one page of the stream (full refresh).

        Items whose ``updated_at`` is missing, null, or empty are stamped with
        the current UTC time.
        """
        data = self._fetch_page({}, pagination)
        records = [
            SourceRecord(
                id=str(item["id"]),
                data=item,
                timestamp=(
                    self._parse_timestamp(item["updated_at"])
                    if item.get("updated_at")
                    else datetime.now(UTC)
                ),
            )
            for item in data.get("items", [])
        ]
        return self._to_iteration(data, records)

    def get_records_after(
        self,
        source_state: SourceIncrementalState,
        pagination: Optional[dict] = None,
    ) -> SourceIteration:
        """Fetch one page of records updated since ``source_state.last_run``.

        Every item is expected to carry ``updated_at``; a missing key raises
        ``KeyError``.
        """
        data = self._fetch_page(
            {"updated_since": source_state.last_run.isoformat()}, pagination
        )
        records = [
            SourceRecord(
                id=str(item["id"]),
                data=item,
                timestamp=self._parse_timestamp(item["updated_at"]),
            )
            for item in data.get("items", [])
        ]
        return self._to_iteration(data, records)

    def _fetch_page(self, filters: dict, pagination: Optional[dict]) -> dict:
        """GET one page of the configured stream and return the decoded JSON body.

        Raises:
            requests.HTTPError: via ``raise_for_status`` on a non-2xx response.
        """
        params = {"limit": self.config.page_size, **filters}
        cursor = pagination.get("cursor") if pagination else None
        if cursor:
            params["after"] = cursor
        response = self.session.get(
            f"{self.config.base_url}/{self.config.stream}",
            params=params,
            timeout=self.config.timeout,
        )
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _to_iteration(data: dict, records: List[SourceRecord]) -> SourceIteration:
        """Wrap records with the next-page cursor (``{}`` on the last page)."""
        next_cursor = data.get("next_cursor")
        return SourceIteration(
            records=records,
            next_pagination={"cursor": next_cursor} if next_cursor else {},
        )

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO-8601 timestamp, accepting a trailing ``Z`` on any Python 3.9+.

        ``datetime.fromisoformat`` only understands the ``Z`` UTC designator
        from Python 3.11 on, so it is rewritten to the equivalent ``+00:00``.
        """
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)