
Pipeline Management

Pipelines are the core unit of work in Bizon Platform. Each pipeline defines a data flow from a source to a destination.

  1. Navigate to Pipelines

    Click “Pipelines” in the sidebar, then “Create Pipeline”.

  2. Configure Source

    Select a source connector and configure authentication:

    • Choose from available sources (HubSpot, Kafka, etc.)
    • Select the stream to sync (contacts, orders, etc.)
    • Enter authentication credentials
  3. Configure Destination

    Select where data should be written:

    • Choose destination (BigQuery, Logger, etc.)
    • Configure connection settings
    • Set buffer options for performance
  4. Set Schedule (Optional)

    Define when the pipeline runs:

    • Cron expression for recurring runs
    • Leave empty for manual-only execution
  5. Review and Create

    Review the configuration and create the pipeline.

A pipeline configuration consists of:

    name: "hubspot-contacts-to-bigquery"
    source:
      name: hubspot
      stream: contacts
      authentication:
        type: api_key
        params:
          token: "pat-xxx"
    destination:
      name: bigquery
      config:
        project_id: "my-project"
        dataset: "raw_data"
      buffer_size: 100
      buffer_flush_timeout: 300
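
The same shape can be modeled with plain Python dataclasses — a sketch for illustration only (the class names mirror the YAML keys, and the defaults follow the field tables below; this is not Bizon's actual API):

```python
from dataclasses import dataclass

@dataclass
class Authentication:
    type: str
    params: dict

@dataclass
class Source:
    name: str
    stream: str
    authentication: Authentication

@dataclass
class Destination:
    name: str
    config: dict
    buffer_size: int = 50            # MB, default per the field table
    buffer_flush_timeout: int = 600  # seconds, default per the field table

@dataclass
class Pipeline:
    name: str
    source: Source
    destination: Destination

# The example configuration above, expressed as objects:
pipeline = Pipeline(
    name="hubspot-contacts-to-bigquery",
    source=Source("hubspot", "contacts",
                  Authentication("api_key", {"token": "pat-xxx"})),
    destination=Destination("bigquery",
                            {"project_id": "my-project", "dataset": "raw_data"},
                            buffer_size=100, buffer_flush_timeout=300),
)
```

Fields left out of a destination fall back to their documented defaults, e.g. `Destination("logger", {})` gets `buffer_size=50`.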
Source fields:

    Field           Required  Description
    name            Yes       Source connector name
    stream          Yes       Data stream to sync
    authentication  Yes       Auth configuration

Destination fields:

    Field                   Required  Description
    name                    Yes       Destination connector name
    config                  Yes       Destination-specific settings
    buffer_size             No        Buffer size in MB (default: 50)
    buffer_flush_timeout    No        Max seconds before flush (default: 600)
    max_concurrent_threads  No        Parallel write threads (default: 10)
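
The required/optional split in these tables can be enforced with a small check before submitting a configuration — a minimal sketch (the `validate` helper is illustrative, not part of Bizon):

```python
# Required fields per section, and defaults for the optional destination
# fields, taken from the tables above.
REQUIRED = {
    "source": {"name", "stream", "authentication"},
    "destination": {"name", "config"},
}
DEFAULTS = {"buffer_size": 50, "buffer_flush_timeout": 600,
            "max_concurrent_threads": 10}

def validate(section: str, cfg: dict) -> dict:
    """Reject configs missing required fields; fill in documented defaults."""
    missing = REQUIRED[section] - cfg.keys()
    if missing:
        raise ValueError(f"{section} missing required fields: {sorted(missing)}")
    if section == "destination":
        cfg = {**DEFAULTS, **cfg}  # explicit values override the defaults
    return cfg
```

For example, a destination with only `name` and `config` passes validation and comes back with `buffer_size` set to 50.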

Click “Run” on any pipeline to trigger immediate execution.

Set a cron expression for automatic runs:

    Expression   Schedule
    0 * * * *    Every hour
    0 */6 * * *  Every 6 hours
    0 0 * * *    Daily at midnight
    0 0 * * 0    Weekly on Sunday
    0 0 1 * *    Monthly on the 1st
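
To illustrate how these five-field expressions are read, here is a deliberately minimal matcher that covers only the forms in the table above (`*`, `*/n`, and plain numbers) — a sketch, not the scheduler Bizon actually uses:

```python
from datetime import datetime

def field_matches(expr: str, value: int) -> bool:
    if expr == "*":
        return True
    if expr.startswith("*/"):            # step values, e.g. */6
        return value % int(expr[2:]) == 0
    return value == int(expr)

def cron_matches(expr: str, dt: datetime) -> bool:
    """True if dt falls on the schedule. Fields: minute hour day month weekday."""
    minute, hour, dom, month, dow = expr.split()
    return (field_matches(minute, dt.minute)
            and field_matches(hour, dt.hour)
            and field_matches(dom, dt.day)
            and field_matches(month, dt.month)
            # cron numbers weekdays with Sunday = 0; Python's weekday()
            # uses Monday = 0, so shift accordingly
            and field_matches(dow, (dt.weekday() + 1) % 7))
```

So `0 */6 * * *` matches 06:00, 12:00, 18:00, and `0 0 * * 0` matches midnight on Sundays only.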

Each run has one of the following statuses:

    Status     Description
    pending    Queued, waiting for a worker
    running    Currently executing
    success    Completed successfully
    failed     Failed with an error
    cancelled  Manually cancelled
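
When polling runs programmatically, it helps to know when to stop. A small sketch — note that which statuses are terminal is inferred from the descriptions above, not stated by Bizon:

```python
from enum import Enum

class RunStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    CANCELLED = "cancelled"

# Inferred: success, failed and cancelled are final states;
# pending and running can still change.
TERMINAL = {RunStatus.SUCCESS, RunStatus.FAILED, RunStatus.CANCELLED}

def is_terminal(status: RunStatus) -> bool:
    """True once a run can no longer change state."""
    return status in TERMINAL
```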

The run history lists every run for a pipeline, showing:

  • Status and duration
  • Records processed
  • Error messages (if failed)
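
As a sketch of how these per-run fields roll up, the history could be summarized like this (the record keys `status` and `records_processed` are illustrative, not Bizon's actual schema):

```python
def summarize(runs: list[dict]) -> dict:
    """Aggregate run records into the headline figures shown in the UI."""
    return {
        "total": len(runs),
        "failed": sum(1 for r in runs if r["status"] == "failed"),
        "records": sum(r.get("records_processed", 0) for r in runs),
    }
```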

Access detailed logs for debugging:

  • Step-by-step execution trace
  • Record counts per batch
  • Error stack traces

For file-based destinations, download output files directly from the UI.

Apply Python transformations to records:

    transforms:
      - label: "Normalize email"
        python: |
          record['email'] = record.get('email', '').lower()
          return record
      - label: "Add timestamp"
        python: |
          from datetime import datetime
          record['synced_at'] = datetime.utcnow().isoformat()
          return record
  • Each transform receives a record dict
  • Must return the modified record
  • Has access to standard library (datetime, json, re)
  • Dangerous imports are blocked for security
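
The contract above — each transform takes a record dict and must return it — can be sketched as plain functions chained in order (function names are illustrative; this runs outside Bizon's sandbox, which is why it can use a timezone-aware timestamp instead of `datetime.utcnow()`):

```python
from datetime import datetime, timezone

def normalize_email(record: dict) -> dict:
    record['email'] = record.get('email', '').lower()
    return record

def add_timestamp(record: dict) -> dict:
    record['synced_at'] = datetime.now(timezone.utc).isoformat()
    return record

def apply_transforms(record: dict, transforms) -> dict:
    """Apply each transform in order; each must return the (modified) record."""
    for fn in transforms:
        record = fn(record)
    return record

out = apply_transforms({'email': 'Jane@Example.COM'},
                       [normalize_email, add_timestamp])
```

After the chain runs, `out['email']` is lowercased and `out['synced_at']` carries the sync timestamp.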

Control checkpoint behavior:

    engine:
      syncCursorInDBEvery: 50  # Lower = more durable, slower
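
The effect of `syncCursorInDBEvery` can be sketched as follows — the cursor is persisted once every N processed records, so a lower value means less rework after a crash at the cost of more database writes (the in-memory list here is a stand-in for Bizon's actual cursor store):

```python
def process(records, sync_every: int = 50) -> list[int]:
    """Process records, checkpointing the cursor every `sync_every` records."""
    checkpoints = []                 # stand-in for cursor rows written to the DB
    for i, record in enumerate(records, start=1):
        # ... write record to the destination ...
        if i % sync_every == 0:      # lower sync_every = more durable, slower
            checkpoints.append(i)    # persist "processed up to record i"
    return checkpoints
```

For example, 120 records with `sync_every=50` persists the cursor after records 50 and 100; a crash at record 110 would resume from 100 rather than from the start.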

Organize pipelines by team:

    domain_id: "marketing-team-uuid"

  1. Use saved connectors - Store credentials once, reuse across pipelines
  2. Set appropriate buffers - Balance memory vs. write frequency
  3. Add transforms carefully - Keep them simple and fast
  4. Monitor run history - Check for failures regularly
  5. Use domains - Organize by team for easier management