Custom Source Builder

The Custom Source Builder lets you create source connectors for any API, with AI assistance to generate code from documentation.

Custom sources extend Bizon’s capabilities to any data source:

  • AI-Powered - Generate code from API documentation URLs
  • Live Preview - Test your source with real data before saving
  • Validation - Security checks ensure safe execution
  • Reusable - Use custom sources in pipelines like built-in connectors

  1. Open the Builder

    Navigate to Connectors > Custom Sources > Create New.

  2. Choose Your Approach

    • AI Autonomous - Provide an API documentation URL and let AI generate the code
    • Manual - Write the code yourself
  3. Write or Generate Code

    The editor provides:

    • Syntax highlighting
    • Real-time validation
    • Error messages with line numbers
  4. Preview Data

    Test with real credentials:

    • Select a stream
    • Enter your authentication details
    • View sample records
  5. Save and Use

    Save your source and use it in pipelines.

Custom sources must extend AbstractSource:

from typing import Iterator

from bizon.source.base import AbstractSource
from bizon.source.models import SourceRecord


class MyAPISource(AbstractSource):
    """Custom source for My API."""

    def get_streams(self) -> list[str]:
        """Return available data streams."""
        return ["users", "orders", "products"]

    def check_connection(self) -> bool:
        """Verify API credentials are valid."""
        try:
            # Make a test API call
            response = self.http_client.get("/me")
            return response.status_code == 200
        except Exception:
            return False

    def get_records(self, stream: str) -> Iterator[SourceRecord]:
        """Yield records from the specified stream."""
        if stream == "users":
            response = self.http_client.get("/users")
            for user in response.json():
                yield SourceRecord(
                    id=str(user["id"]),
                    data=user
                )

Method                 Description
get_streams()          Return list of available stream names
check_connection()     Return True if credentials are valid
get_records(stream)    Yield SourceRecord objects
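
To see how the three methods fit together, here is a minimal sketch that runs offline. The `AbstractSource` and `SourceRecord` classes below are stand-ins written for this example (the real ones come from `bizon.source`), and `FakeClient`, `FakeResponse`, and the canned payloads are hypothetical. It only assumes what the example above shows: a client with `get()`, and responses with `status_code` and `json()`.

```python
from dataclasses import dataclass
from typing import Any, Iterator


# Stand-ins so the sketch runs without Bizon installed.
# Assumption: the real SourceRecord accepts `id` and `data`, as shown above.
@dataclass
class SourceRecord:
    id: str
    data: dict


class AbstractSource:
    def __init__(self, http_client: Any) -> None:
        self.http_client = http_client


class FakeResponse:
    """Hypothetical response exposing only status_code and json()."""

    def __init__(self, payload: Any, status_code: int = 200) -> None:
        self._payload = payload
        self.status_code = status_code

    def json(self) -> Any:
        return self._payload


class FakeClient:
    """Hypothetical client serving canned payloads keyed by path."""

    def __init__(self, routes: dict) -> None:
        self.routes = routes

    def get(self, path: str, **kwargs: Any) -> FakeResponse:
        if path not in self.routes:
            return FakeResponse({}, status_code=404)
        return FakeResponse(self.routes[path])


class MyAPISource(AbstractSource):
    def get_streams(self) -> list[str]:
        return ["users"]

    def check_connection(self) -> bool:
        try:
            return self.http_client.get("/me").status_code == 200
        except Exception:
            return False

    def get_records(self, stream: str) -> Iterator[SourceRecord]:
        # Fail loudly on unknown streams instead of yielding nothing.
        if stream not in self.get_streams():
            raise ValueError(f"Unknown stream: {stream}")
        for row in self.http_client.get(f"/{stream}").json():
            yield SourceRecord(id=str(row["id"]), data=row)


client = FakeClient({"/me": {"ok": True}, "/users": [{"id": 1}, {"id": 2}]})
source = MyAPISource(client)
connected = source.check_connection()
record_ids = [record.id for record in source.get_records("users")]
```

Raising on an unknown stream is a design choice of this sketch, not documented Bizon behavior; it makes a typo in a stream name visible during preview rather than producing an empty result.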

Define authentication requirements:

from pydantic import BaseModel

from bizon.source.base import AbstractSource


class MyAPIAuth(BaseModel):
    """Authentication configuration."""

    api_key: str
    api_secret: str


class MyAPISource(AbstractSource):
    auth_model = MyAPIAuth

    def get_records(self, stream: str):
        # Access auth via self.authentication
        headers = {
            "X-API-Key": self.authentication.api_key,
            "X-API-Secret": self.authentication.api_secret
        }
        # ...

A secure HTTP client is provided:

# GET request
response = self.http_client.get("/users", params={"page": 1})

# POST request
response = self.http_client.post("/search", json={"query": "test"})

# With headers
response = self.http_client.get(
    "/data",
    headers={"Authorization": f"Bearer {self.authentication.token}"}
)

Handle paginated APIs:

def get_records(self, stream: str) -> Iterator[SourceRecord]:
    page = 1
    while True:
        response = self.http_client.get(
            f"/{stream}",
            params={"page": page, "limit": 100}
        )
        data = response.json()
        if not data["items"]:
            break
        for item in data["items"]:
            yield SourceRecord(id=str(item["id"]), data=item)
        page += 1
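
The pagination loop can be exercised without network access. The sketch below moves the loop into a standalone function and feeds it a hypothetical `PagedClient`. The `items` key and the `page`/`limit` parameters are taken from the example above; `iter_pages`, `PagedClient`, and the `max_pages` safeguard are assumptions added for illustration, and plain dicts stand in for `SourceRecord`.

```python
from typing import Any, Iterator


class PagedResponse:
    """Hypothetical response exposing only json()."""

    def __init__(self, payload: dict) -> None:
        self._payload = payload

    def json(self) -> dict:
        return self._payload


class PagedClient:
    """Hypothetical client that slices a fixed list of rows into pages."""

    def __init__(self, rows: list) -> None:
        self.rows = rows
        self.calls = 0

    def get(self, path: str, params: dict) -> PagedResponse:
        self.calls += 1
        start = (params["page"] - 1) * params["limit"]
        return PagedResponse({"items": self.rows[start:start + params["limit"]]})


def iter_pages(client: Any, stream: str, limit: int = 100,
               max_pages: int = 1000) -> Iterator[dict]:
    """Yield items page by page until the API returns an empty page."""
    for page in range(1, max_pages + 1):
        data = client.get(f"/{stream}", params={"page": page, "limit": limit}).json()
        items = data.get("items", [])  # tolerate a missing key
        if not items:
            return
        yield from items
    # Assumption: an API that never returns an empty page is treated as an error.
    raise RuntimeError(f"Stopped after {max_pages} pages without an empty page")


rows = [{"id": i} for i in range(5)]
client = PagedClient(rows)
fetched = list(iter_pages(client, "users", limit=2))
```

With five rows and a limit of two, the loop makes four requests: three pages with data and one empty page that ends the iteration. The page cap guards against an API that ignores the `page` parameter and returns the same data forever.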

The AI assistant can generate source code from documentation:

  1. Paste the API documentation URL
  2. AI analyzes endpoints, authentication, and data models
  3. Generated code includes:
    • Proper authentication handling
    • Stream detection from endpoints
    • Pagination logic
    • Error handling

URL: https://api.example.com/docs

For safety, these are blocked:

  • Imports: os, sys, subprocess, socket
  • Functions: eval, exec, compile, open
  • Network: Direct urllib/requests (use provided HTTP client)
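
To get a feel for what such checks catch before you paste code into the editor, here is a rough sketch of a static check built on Python's `ast` module. It is not Bizon's validator, whose implementation is not described here; `find_violations` and the message wording are hypothetical, and the block lists are copied from the list above.

```python
import ast

# Assumption: block lists copied from the list above; urllib and requests are
# included because direct network access is also rejected.
BLOCKED_IMPORTS = {"os", "sys", "subprocess", "socket", "urllib", "requests"}
BLOCKED_CALLS = {"eval", "exec", "compile", "open"}


def find_violations(code: str) -> list[str]:
    """Return one message per blocked import or call, ordered by line number."""
    found: list[tuple[int, str]] = []
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            modules = [node.module or ""]
        else:
            modules = []
        for module in modules:
            root = module.split(".")[0]  # "urllib.request" -> "urllib"
            if root in BLOCKED_IMPORTS:
                found.append((node.lineno, f"Import '{root}' is not allowed"))
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id in BLOCKED_CALLS
        ):
            found.append((node.lineno, f"Call to '{node.func.id}' is not allowed"))
    return [f"line {lineno}: {message}" for lineno, message in sorted(found)]


sample = "import os\nfrom urllib import request\nvalue = eval('1 + 1')\nimport json\n"
violations = find_violations(sample)
```

This sketch only matches direct names, so indirect access such as `__import__("os")` would slip through; a production validator would need to be stricter.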

Common validation issues:

Error                                 Solution
“Missing get_streams method”          Add required method
“Import ‘os’ is not allowed”          Remove blocked import
“No AbstractSource subclass found”    Extend AbstractSource
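
The two structural errors in the table can be reproduced locally with a short `ast`-based check. This is a sketch under assumptions, not the Builder's own logic: `check_structure` is a hypothetical helper, and treating all three methods as required is inferred from the methods listed earlier on this page.

```python
import ast

# Assumption: all three methods described earlier are required.
REQUIRED_METHODS = ("get_streams", "check_connection", "get_records")


def check_structure(code: str) -> list[str]:
    """Report a missing AbstractSource subclass or missing required methods."""
    tree = ast.parse(code)
    sources = [
        node
        for node in tree.body
        if isinstance(node, ast.ClassDef)
        and any(
            isinstance(base, ast.Name) and base.id == "AbstractSource"
            for base in node.bases
        )
    ]
    if not sources:
        return ["No AbstractSource subclass found"]

    errors = []
    for cls in sources:
        defined = {item.name for item in cls.body if isinstance(item, ast.FunctionDef)}
        for method in REQUIRED_METHODS:
            if method not in defined:
                errors.append(f"Missing {method} method")
    return errors


no_subclass = check_structure("class Plain:\n    pass\n")
partial = check_structure(
    "class MySource(AbstractSource):\n"
    "    def get_streams(self):\n"
    "        return []\n"
)
```

Because the code is only parsed, never executed, the check works even when `AbstractSource` is not importable on your machine.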

Once saved, use custom sources like built-in connectors:

source:
  name: my_custom_source
  stream: users
  authentication:
    type: custom
    params:
      api_key: "xxx"