Transforms

Transforms allow you to modify, enrich, or filter data as it flows through the pipeline. Each transform runs Python code on individual records.

Add transforms to your pipeline configuration:

name: my-pipeline
source:
  name: kafka
  stream: topic
destination:
  name: bigquery
  config:
    project_id: my-project
    dataset_id: analytics
transforms:
  - label: parse-json
    python: |
      import json
      data['payload'] = json.loads(data['payload'])
  - label: extract-fields
    python: |
      payload = data['payload']
      data = {
          'user_id': payload['user_id'],
          'event_type': payload['type'],
          'timestamp': payload['ts']
      }

Each transform has two fields:

| Field  | Type   | Description                           |
|--------|--------|---------------------------------------|
| label  | string | Identifier for logging and debugging  |
| python | string | Python code to execute                |
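
For reference, a minimal transform entry (the label and logic here are illustrative):

transforms:
  - label: tag-records
    python: |
      # Any Python statements; `data` is the record payload
      data['pipeline'] = 'my-pipeline'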

Your transform code has access to:

| Variable | Type | Description                             |
|----------|------|-----------------------------------------|
| data     | dict | The record's data payload (read/write)  |
| record   | dict | Full record with metadata (read-only)   |

The data dict contains the source record payload. Modify it in place or reassign it entirely:

transforms:
  # Modify in place
  - label: add-field
    python: |
      data['processed_at'] = '2024-01-01'

  # Reassign entirely
  - label: reshape
    python: |
      data = {
          'id': data['user_id'],
          'name': data['full_name']
      }

The record dict provides read-only access to record metadata:

transforms:
  - label: add-metadata
    python: |
      data['source_id'] = record['id']
      data['extracted'] = str(record['timestamp'])
Some common patterns follow. To parse a stringified JSON value:

transforms:
  - label: parse-json-value
    python: |
      import json
      if isinstance(data.get('value'), str):
          data['value'] = json.loads(data['value'])

To flatten a nested object into top-level fields:

transforms:
  - label: flatten-nested
    python: |
      user = data.get('user', {})
      data['user_id'] = user.get('id')
      data['user_email'] = user.get('email')
      data.pop('user', None)  # Remove nested object; no KeyError if absent

To coerce field types:

transforms:
  - label: convert-types
    python: |
      data['amount'] = float(data.get('amount', 0))
      data['quantity'] = int(data.get('quantity', 0))
      data['is_active'] = bool(data.get('is_active'))

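Note that float() and int() raise ValueError or TypeError on malformed input. If your source may contain bad values, a defensive variant (illustrative, reusing the amount field from above) wraps the conversion:

transforms:
  - label: convert-amount-safe
    python: |
      try:
          data['amount'] = float(data.get('amount', 0))
      except (TypeError, ValueError):
          data['amount'] = 0.0  # Fall back to a default on bad input
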
To derive a category from a numeric field:

transforms:
  - label: categorize
    python: |
      amount = data.get('amount', 0)
      if amount > 1000:
          data['tier'] = 'premium'
      elif amount > 100:
          data['tier'] = 'standard'
      else:
          data['tier'] = 'basic'

To reformat timestamps:

transforms:
  - label: format-dates
    python: |
      from datetime import datetime
      ts = data.get('created_at')
      if ts:
          # fromisoformat() accepts a trailing 'Z' only on Python 3.11+
          dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
          data['created_date'] = dt.strftime('%Y-%m-%d')
          data['created_hour'] = dt.hour

To skip records, set data to an empty dict:

transforms:
  - label: filter-test-users
    python: |
      if data.get('email', '').endswith('@test.com'):
          data = {}  # Skip this record

To compute derived fields:

transforms:
  - label: compute-metrics
    python: |
      data['total'] = data.get('price', 0) * data.get('quantity', 1)
      data['discount_amount'] = data['total'] * data.get('discount_pct', 0) / 100
      data['final_total'] = data['total'] - data['discount_amount']
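
As a worked example, with an invented input of price 10.0, quantity 3, and discount_pct 10:

# Input:  {'price': 10.0, 'quantity': 3, 'discount_pct': 10}
# total           = 10.0 * 3         = 30.0
# discount_amount = 30.0 * 10 / 100  = 3.0
# final_total     = 30.0 - 3.0       = 27.0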

Transforms execute in order. Each transform receives the output of the previous one:

transforms:
  # Step 1: Parse JSON
  - label: parse
    python: |
      import json
      data = json.loads(data['raw_json'])

  # Step 2: Extract fields (receives parsed data)
  - label: extract
    python: |
      data = {
          'id': data['user']['id'],
          'action': data['event']['type']
      }

  # Step 3: Enrich (receives extracted data)
  - label: enrich
    python: |
      data['processed'] = True
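
To see the chaining concretely, here is a minimal local sketch that mimics the runtime by exec-ing each step against a shared data dict (the harness and sample payload are ours, not a documented runtime API):

import json

# Each string is the body of one transform's `python` field
steps = [
    "import json\ndata = json.loads(data['raw_json'])",
    "data = {'id': data['user']['id'], 'action': data['event']['type']}",
    "data['processed'] = True",
]

data = {'raw_json': json.dumps({'user': {'id': 42}, 'event': {'type': 'login'}})}
for code in steps:
    scope = {'data': data}
    exec(code, scope)     # run one transform step
    data = scope['data']  # the next step receives this output

print(data)  # {'id': 42, 'action': 'login', 'processed': True}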

Standard library modules are available:

transforms:
  - label: complex-transform
    python: |
      import json
      import re
      from datetime import datetime, timedelta
      from urllib.parse import urlparse

      # Use imported modules
      url = urlparse(data.get('page_url', ''))
      data['domain'] = url.netloc

Best practices:

  1. Keep transforms simple - complex logic is harder to debug
  2. Handle missing data - use .get() with defaults
  3. Log for debugging - use print() statements (visible at the DEBUG log level)
  4. Test locally - use the logger destination to verify transforms (a sketch follows the example below)

For example, to handle missing nested data (practice 2):

transforms:
  - label: safe-extract
    python: |
      # Handle missing nested data
      user = data.get('user') or {}
      address = user.get('address') or {}
      data['user_id'] = user.get('id', 'unknown')
      data['city'] = address.get('city', '')
      data['country'] = address.get('country', 'US')

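And to test locally (practice 4), point the pipeline at the logger destination; a minimal sketch, assuming the logger destination takes no config keys and reusing the kafka source from earlier:

name: test-pipeline
source:
  name: kafka
  stream: topic
destination:
  name: logger  # assumed: emits records to the logs for inspection
transforms:
  - label: safe-extract
    python: |
      user = data.get('user') or {}
      data['user_id'] = user.get('id', 'unknown')
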
Transforms run on every record, so keep them fast:

  • Avoid expensive operations (HTTP calls, file I/O)
  • Pre-compile regex patterns if used repeatedly:

transforms:
  - label: extract-ids
    python: |
      import re
      # re.compile() reuses Python's internal pattern cache,
      # so the pattern isn't recompiled on every record
      pattern = re.compile(r'ID:(\d+)')
      match = pattern.search(data.get('message', ''))
      data['extracted_id'] = match.group(1) if match else None