# Creating a Custom Source Connector

In this tutorial, you’ll build a complete source connector that extracts data from a REST API. We’ll use the PokéAPI as our example: a free, open API with data about Pokémon.
## What You’ll Build

A source connector that:

- Fetches Pokémon, types, and abilities from PokéAPI
- Supports offset-based pagination
- Works with Bizon’s sync modes
- Can be used with any destination
## Prerequisites

- Python 3.9+
- Bizon installed: `pip install bizon`
- Basic Python and YAML knowledge
## Tutorial

### 1. Create the project structure

Create a new directory for your custom source:

```bash
mkdir pokeapi-source
cd pokeapi-source
```

Create these files:

- `source.py` - The source connector code
- `config.yml` - Pipeline configuration
### 2. Define the config class

Open `source.py` and add the configuration class:

```python
from typing import List, Optional, Tuple, Type

from pydantic import Field
from requests.auth import AuthBase

from bizon.source.source import AbstractSource
from bizon.source.config import SourceConfig
from bizon.source.models import SourceIteration, SourceRecord


class PokeAPIConfig(SourceConfig):
    """Configuration for PokéAPI source."""

    base_url: str = Field(
        default="https://pokeapi.co/api/v2",
        description="API base URL",
    )
    page_size: int = Field(
        default=20,
        description="Number of records per page (max 100)",
    )
```

The config class:

- Extends `SourceConfig` to inherit base fields (`name`, `stream`, `sync_mode`, etc.)
- Adds source-specific fields using Pydantic’s `Field` (see the standalone sketch just below for how defaults and overrides behave)
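If the `Field(...)` pattern is new to you, here is a minimal standalone sketch of how defaults and overrides work. `ExampleConfig` is a throwaway class for illustration only; it is not part of Bizon:

```python
from pydantic import BaseModel, Field


# Throwaway illustration of the Field(...) pattern used above; not part of Bizon.
class ExampleConfig(BaseModel):
    base_url: str = Field(default="https://pokeapi.co/api/v2", description="API base URL")
    page_size: int = Field(default=20, description="Number of records per page (max 100)")


cfg = ExampleConfig(page_size=5)  # override a default, as config.yml will do later
print(cfg.base_url)   # https://pokeapi.co/api/v2
print(cfg.page_size)  # 5
```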
### 3. Implement the source class skeleton

Add the source class with the required static methods:

```python
class PokeAPISource(AbstractSource):
    """Source connector for PokéAPI."""

    @staticmethod
    def streams() -> List[str]:
        """Return available stream names."""
        return ["pokemon", "type", "ability", "move", "item"]

    @staticmethod
    def get_config_class() -> Type[SourceConfig]:
        """Return the config class for this source."""
        return PokeAPIConfig

    def get_authenticator(self) -> Optional[AuthBase]:
        """Return an authenticator (None for public APIs)."""
        return None

    def check_connection(self) -> Tuple[bool, Optional[str]]:
        """Test the API connection."""
        try:
            response = self.session.get(f"{self.config.base_url}/pokemon/1")
            if response.ok:
                return True, None
            return False, f"API returned status {response.status_code}"
        except Exception as e:
            return False, str(e)

    def get_total_records_count(self) -> Optional[int]:
        """Return the total record count from the API."""
        try:
            response = self.session.get(
                f"{self.config.base_url}/{self.config.stream}",
                params={"limit": 1},
            )
            return response.json().get("count")
        except Exception:
            return None
```
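You can sanity-check the two endpoints these methods rely on with plain `requests`, outside of Bizon (the exact count changes as the API grows):

```python
import requests

BASE_URL = "https://pokeapi.co/api/v2"

# check_connection() expects a single known resource to return HTTP 200
print(requests.get(f"{BASE_URL}/pokemon/1").status_code)  # 200

# get_total_records_count() reads the "count" field from the list endpoint
print(requests.get(f"{BASE_URL}/pokemon", params={"limit": 1}).json().get("count"))
```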
### 4. Implement the `get` method

Add the main data extraction logic. PokéAPI returns a list of resources with URLs, so we fetch the details for each one:

```python
class PokeAPISource(AbstractSource):
    # ...methods from the previous step...

    def get(self, pagination: dict = None) -> SourceIteration:
        """Fetch the next batch of records."""
        # Get the current offset from the pagination state
        offset = pagination.get("offset", 0) if pagination else 0

        # Fetch the list of resources
        response = self.session.get(
            f"{self.config.base_url}/{self.config.stream}",
            params={"offset": offset, "limit": self.config.page_size},
        )
        response.raise_for_status()
        data = response.json()

        # Fetch details for each resource
        records = []
        for item in data.get("results", []):
            # Get the ID from the URL (e.g., .../pokemon/25/ -> 25)
            item_id = item["url"].rstrip("/").split("/")[-1]

            # Fetch full details
            detail_response = self.session.get(item["url"])
            if detail_response.ok:
                records.append(
                    SourceRecord(id=item_id, data=detail_response.json())
                )

        # Check whether there are more pages
        if data.get("next"):
            next_pagination = {"offset": offset + self.config.page_size}
        else:
            next_pagination = {}

        return SourceIteration(records=records, next_pagination=next_pagination)
```

Key points:

- `pagination` contains state from the previous iteration (the sketch after this list shows the resulting loop)
- PokéAPI provides a `next` URL when more data is available
- Return an empty `next_pagination={}` when there is no more data
- Each record needs a unique `id`
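Bizon’s runner calls `get()` for you; conceptually, the contract boils down to the loop below. This is only a sketch to make the contract concrete, not Bizon’s actual runner code, and it assumes `source` is an already-constructed `PokeAPISource` instance:

```python
# Sketch of the iteration contract only; Bizon's runner does this for you.
# Assumes `source` is an already-constructed PokeAPISource instance.
pagination = None
while True:
    iteration = source.get(pagination)
    print(f"Fetched {len(iteration.records)} records")

    if not iteration.next_pagination:  # empty dict means no more pages
        break
    pagination = iteration.next_pagination
```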
### 5. Create the complete source file

Your complete `source.py` should look like this:

```python
from typing import List, Optional, Tuple, Type

from pydantic import Field
from requests.auth import AuthBase

from bizon.source.source import AbstractSource
from bizon.source.config import SourceConfig
from bizon.source.models import SourceIteration, SourceRecord


class PokeAPIConfig(SourceConfig):
    """Configuration for PokéAPI source."""

    base_url: str = Field(
        default="https://pokeapi.co/api/v2",
        description="API base URL",
    )
    page_size: int = Field(
        default=20,
        description="Number of records per page (max 100)",
    )


class PokeAPISource(AbstractSource):
    """Source connector for PokéAPI."""

    @staticmethod
    def streams() -> List[str]:
        return ["pokemon", "type", "ability", "move", "item"]

    @staticmethod
    def get_config_class() -> Type[SourceConfig]:
        return PokeAPIConfig

    def get_authenticator(self) -> Optional[AuthBase]:
        return None

    def check_connection(self) -> Tuple[bool, Optional[str]]:
        try:
            response = self.session.get(f"{self.config.base_url}/pokemon/1")
            if response.ok:
                return True, None
            return False, f"API returned status {response.status_code}"
        except Exception as e:
            return False, str(e)

    def get_total_records_count(self) -> Optional[int]:
        try:
            response = self.session.get(
                f"{self.config.base_url}/{self.config.stream}",
                params={"limit": 1},
            )
            return response.json().get("count")
        except Exception:
            return None

    def get(self, pagination: dict = None) -> SourceIteration:
        offset = pagination.get("offset", 0) if pagination else 0

        response = self.session.get(
            f"{self.config.base_url}/{self.config.stream}",
            params={"offset": offset, "limit": self.config.page_size},
        )
        response.raise_for_status()
        data = response.json()

        records = []
        for item in data.get("results", []):
            item_id = item["url"].rstrip("/").split("/")[-1]
            detail_response = self.session.get(item["url"])
            if detail_response.ok:
                records.append(
                    SourceRecord(id=item_id, data=detail_response.json())
                )

        if data.get("next"):
            next_pagination = {"offset": offset + self.config.page_size}
        else:
            next_pagination = {}

        return SourceIteration(records=records, next_pagination=next_pagination)
```
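Before wiring it into a pipeline, you can sanity-check the static pieces from a Python shell (assuming you run it inside the `pokeapi-source` directory so `source.py` is importable); no Bizon runner or class instance is needed:

```python
from source import PokeAPISource

print(PokeAPISource.streams())           # ['pokemon', 'type', 'ability', 'move', 'item']
print(PokeAPISource.get_config_class())  # <class 'source.PokeAPIConfig'>
```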
### 6. Create the pipeline configuration

Create `config.yml`:

```yaml
name: pokeapi-pokemon

source:
  name: pokeapi
  stream: pokemon
  page_size: 5

destination:
  name: logger
  config: {}
```
### 7. Run the pipeline

Test your source:

```bash
bizon run config.yml --custom-source ./source.py --log-level DEBUG
```

You should see Pokémon being fetched and logged:

```
INFO - Fetching records from source...
INFO - Retrieved 5 records in iteration 0
INFO - Retrieved 5 records in iteration 1
...
INFO - Pipeline finished successfully.
```
### 8. Test different streams

Update `config.yml` to sync Pokémon types:

```yaml
source:
  name: pokeapi
  stream: type
  page_size: 10
```

Run again:

```bash
bizon run config.yml --custom-source ./source.py
```
## Adding Authentication

For APIs that require authentication, implement `get_authenticator`:

```python
from bizon.source.auth.builder import AuthBuilder


class MyAuthenticatedSource(AbstractSource):
    def get_authenticator(self) -> Optional[AuthBase]:
        if self.config.authentication:
            return AuthBuilder.from_config(self.config.authentication)
        return None
```

Then configure auth in YAML:

```yaml
source:
  name: my_source
  stream: data
  authentication:
    type: api_key
    params:
      token: BIZON_ENV_API_TOKEN
```
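`AuthBuilder` covers common schemes; since `get_authenticator` returns a `requests`-compatible `AuthBase`, you can also supply your own. The sketch below is hypothetical: the class, header name, and environment variable are placeholders, and it presumes Bizon attaches the returned authenticator to the HTTP session it uses for requests:

```python
import os
from typing import Optional

from requests.auth import AuthBase

from bizon.source.source import AbstractSource


class CustomHeaderAuth(AuthBase):
    """Hypothetical authenticator that adds a custom header to every request."""

    def __init__(self, token: str):
        self.token = token

    def __call__(self, request):
        request.headers["X-Api-Key"] = self.token
        return request


class MyCustomAuthSource(AbstractSource):
    def get_authenticator(self) -> Optional[AuthBase]:
        # Hypothetical: read the token from an environment variable
        return CustomHeaderAuth(os.environ["MY_API_TOKEN"])
```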
## Adding Incremental Sync

Support incremental mode by implementing `get_records_after`:

```python
from bizon.source.models import SourceIncrementalState


class MySource(AbstractSource):
    def get_records_after(
        self,
        source_state: SourceIncrementalState,
        pagination: dict = None,
    ) -> SourceIteration:
        """Fetch records updated after the last sync."""
        offset = pagination.get("offset", 0) if pagination else 0
        since = source_state.last_run.isoformat()

        response = self.session.get(
            f"{self.config.base_url}/{self.config.stream}",
            params={
                "updated_since": since,
                "offset": offset,
                "limit": self.config.page_size,
            },
        )
        # ... rest of implementation
```

Use it with:

```yaml
source:
  name: my_source
  stream: data
  sync_mode: incremental
```
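Not every API exposes an `updated_since`-style parameter. A common fallback is to fetch pages as usual and filter client-side against `source_state.last_run`. A rough sketch, assuming each record carries an ISO-8601 `updated_at` value whose parsed datetime is comparable to `last_run`:

```python
from datetime import datetime

from bizon.source.models import SourceIncrementalState


def filter_new_records(items: list, source_state: SourceIncrementalState) -> list:
    """Client-side fallback: keep only items updated after the last run.

    Hypothetical helper; assumes each item has an ISO-8601 'updated_at' field
    comparable to source_state.last_run.
    """
    return [
        item
        for item in items
        if datetime.fromisoformat(item["updated_at"]) > source_state.last_run
    ]
```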
## Loading to BigQuery

Update your config to load Pokémon data to BigQuery:

```yaml
name: pokeapi-to-bigquery

source:
  name: pokeapi
  stream: pokemon
  page_size: 50

destination:
  name: bigquery
  config:
    project_id: my-project
    dataset_id: raw_data
    dataset_location: US
    gcs_buffer_bucket: my-staging-bucket

transforms:
  - label: extract-pokemon-data
    python: |
      # Extract key fields from the Pokemon data
      pokemon = data
      data = {
          'id': pokemon.get('id'),
          'name': pokemon.get('name'),
          'height': pokemon.get('height'),
          'weight': pokemon.get('weight'),
          'base_experience': pokemon.get('base_experience'),
          'types': [t['type']['name'] for t in pokemon.get('types', [])],
          'abilities': [a['ability']['name'] for a in pokemon.get('abilities', [])]
      }
```
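The transform body is plain Python that reads and then reassigns `data`, so you can prototype the same logic as an ordinary function before pasting it into the YAML. A small sketch with a made-up payload:

```python
def extract_pokemon_fields(data: dict) -> dict:
    """Same field extraction as the transform above, written as a testable function."""
    return {
        "id": data.get("id"),
        "name": data.get("name"),
        "height": data.get("height"),
        "weight": data.get("weight"),
        "base_experience": data.get("base_experience"),
        "types": [t["type"]["name"] for t in data.get("types", [])],
        "abilities": [a["ability"]["name"] for a in data.get("abilities", [])],
    }


sample = {
    "id": 25,
    "name": "pikachu",
    "types": [{"type": {"name": "electric"}}],
    "abilities": [{"ability": {"name": "static"}}],
}
print(extract_pokemon_fields(sample))
```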
## Complete Example with All Features

```python
from datetime import datetime
from typing import List, Optional, Tuple, Type

from pydantic import Field
from pytz import UTC
from requests.auth import AuthBase

from bizon.source.auth.builder import AuthBuilder
from bizon.source.config import SourceConfig
from bizon.source.models import (
    SourceIncrementalState,
    SourceIteration,
    SourceRecord,
)
from bizon.source.source import AbstractSource


class AdvancedAPIConfig(SourceConfig):
    """Configuration for Advanced API source."""

    base_url: str = Field(..., description="API base URL")
    page_size: int = Field(100, description="Records per page")
    timeout: int = Field(30, description="Request timeout in seconds")
    include_metadata: bool = Field(False, description="Include API metadata")


class AdvancedAPISource(AbstractSource):
    """Advanced API source with full feature support."""

    @staticmethod
    def streams() -> List[str]:
        return ["users", "orders", "products", "events"]

    @staticmethod
    def get_config_class() -> Type[SourceConfig]:
        return AdvancedAPIConfig

    def get_authenticator(self) -> Optional[AuthBase]:
        if self.config.authentication:
            return AuthBuilder.from_config(self.config.authentication)
        return None

    def check_connection(self) -> Tuple[bool, Optional[str]]:
        try:
            response = self.session.get(
                f"{self.config.base_url}/health",
                timeout=self.config.timeout,
            )
            return response.ok, None
        except Exception as e:
            return False, str(e)

    def get_total_records_count(self) -> Optional[int]:
        try:
            response = self.session.get(
                f"{self.config.base_url}/{self.config.stream}/count",
                timeout=self.config.timeout,
            )
            return response.json().get("total")
        except Exception:
            return None

    def get(self, pagination: dict = None) -> SourceIteration:
        cursor = pagination.get("cursor") if pagination else None

        params = {"limit": self.config.page_size}
        if cursor:
            params["after"] = cursor

        response = self.session.get(
            f"{self.config.base_url}/{self.config.stream}",
            params=params,
            timeout=self.config.timeout,
        )
        response.raise_for_status()
        data = response.json()

        records = [
            SourceRecord(
                id=str(item["id"]),
                data=item,
                timestamp=datetime.fromisoformat(
                    item.get("updated_at", datetime.now(UTC).isoformat())
                ),
            )
            for item in data.get("items", [])
        ]

        next_cursor = data.get("next_cursor")
        return SourceIteration(
            records=records,
            next_pagination={"cursor": next_cursor} if next_cursor else {},
        )

    def get_records_after(
        self,
        source_state: SourceIncrementalState,
        pagination: dict = None,
    ) -> SourceIteration:
        cursor = pagination.get("cursor") if pagination else None

        params = {
            "limit": self.config.page_size,
            "updated_since": source_state.last_run.isoformat(),
        }
        if cursor:
            params["after"] = cursor

        response = self.session.get(
            f"{self.config.base_url}/{self.config.stream}",
            params=params,
            timeout=self.config.timeout,
        )
        response.raise_for_status()
        data = response.json()

        records = [
            SourceRecord(
                id=str(item["id"]),
                data=item,
                timestamp=datetime.fromisoformat(item["updated_at"]),
            )
            for item in data.get("items", [])
        ]

        next_cursor = data.get("next_cursor")
        return SourceIteration(
            records=records,
            next_pagination={"cursor": next_cursor} if next_cursor else {},
        )
```
## Next Steps

- Custom Sources Reference - Complete API reference
- Sources Overview - `AbstractSource` documentation
- Kafka to BigQuery Tutorial - Build a streaming pipeline