# Transforms
Transforms allow you to modify, enrich, or filter data as it flows through the pipeline. Each transform runs Python code on individual records.
## Configuration

Add transforms to your pipeline configuration:

```yaml
name: my-pipeline

source:
  name: kafka
  stream: topic

destination:
  name: bigquery
  config:
    project_id: my-project
    dataset_id: analytics

transforms:
  - label: parse-json
    python: |
      import json
      data['payload'] = json.loads(data['payload'])

  - label: extract-fields
    python: |
      payload = data['payload']
      data = {
          'user_id': payload['user_id'],
          'event_type': payload['type'],
          'timestamp': payload['ts']
      }
```

## Transform Structure
Each transform has two fields:
| Field | Type | Description |
|---|---|---|
| `label` | string | Identifier for logging and debugging |
| `python` | string | Python code to execute |
## Execution Context

Your transform code has access to:
| Variable | Type | Description |
|---|---|---|
| `data` | dict | The record's data payload (read/write) |
| `record` | dict | Full record with metadata (read-only) |
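The runtime itself is internal to the pipeline, but its contract can be approximated in plain Python: the transform body executes with `data` and `record` in scope, and whatever `data` refers to afterward becomes the record's new payload. A rough sketch (the `run_transform` helper is hypothetical, not part of the product):

```python
import copy

def run_transform(python_code: str, data: dict, record: dict) -> dict:
    # Hypothetical illustration of the execution contract: the transform
    # body sees `data` and `record` as variables, and the value bound to
    # `data` afterward becomes the record's new payload.
    namespace = {'data': data, 'record': copy.deepcopy(record)}  # copy: record is read-only
    exec(python_code, namespace)
    return namespace['data']

# In-place mutation and wholesale reassignment both work:
out = run_transform("data['source_id'] = record['id']", {'v': 1}, {'id': 42})
# out == {'v': 1, 'source_id': 42}
```

This also explains why `data = {...}` (reassignment) and `data['key'] = ...` (mutation) behave the same from the pipeline's point of view: either way, the runner reads back whatever `data` names after the code finishes.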
### The `data` Variable

The `data` dict contains the source record payload. Modify it in place or reassign it entirely:
```yaml
transforms:
  # Modify in place
  - label: add-field
    python: |
      data['processed_at'] = '2024-01-01'

  # Reassign entirely
  - label: reshape
    python: |
      data = {
          'id': data['user_id'],
          'name': data['full_name']
      }
```

### The `record` Variable

The `record` dict provides read-only access to record metadata:

```yaml
transforms:
  - label: add-metadata
    python: |
      data['source_id'] = record['id']
      data['extracted'] = str(record['timestamp'])
```

## Common Patterns
### JSON Parsing
```yaml
transforms:
  - label: parse-json-value
    python: |
      import json
      if isinstance(data.get('value'), str):
          data['value'] = json.loads(data['value'])
```

### Field Extraction
```yaml
transforms:
  - label: flatten-nested
    python: |
      user = data.get('user', {})
      data['user_id'] = user.get('id')
      data['user_email'] = user.get('email')
      data.pop('user', None)  # Remove nested object (no error if absent)
```

### Type Conversion
```yaml
transforms:
  - label: convert-types
    python: |
      data['amount'] = float(data.get('amount', 0))
      data['quantity'] = int(data.get('quantity', 0))
      data['is_active'] = bool(data.get('is_active'))
```

### Conditional Logic
```yaml
transforms:
  - label: categorize
    python: |
      amount = data.get('amount', 0)
      if amount > 1000:
          data['tier'] = 'premium'
      elif amount > 100:
          data['tier'] = 'standard'
      else:
          data['tier'] = 'basic'
```

### Date Formatting
```yaml
transforms:
  - label: format-dates
    python: |
      from datetime import datetime

      ts = data.get('created_at')
      if ts:
          dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
          data['created_date'] = dt.strftime('%Y-%m-%d')
          data['created_hour'] = dt.hour
```

### Filtering (Set to Empty)
To skip records, set `data` to an empty dict:

```yaml
transforms:
  - label: filter-test-users
    python: |
      if data.get('email', '').endswith('@test.com'):
          data = {}  # Skip this record
```

### Adding Computed Fields
```yaml
transforms:
  - label: compute-metrics
    python: |
      data['total'] = data.get('price', 0) * data.get('quantity', 1)
      data['discount_amount'] = data['total'] * data.get('discount_pct', 0) / 100
      data['final_total'] = data['total'] - data['discount_amount']
```

## Multiple Transforms

Transforms execute in order. Each transform receives the output of the previous one:
```yaml
transforms:
  # Step 1: Parse JSON
  - label: parse
    python: |
      import json
      data = json.loads(data['raw_json'])

  # Step 2: Extract fields (receives parsed data)
  - label: extract
    python: |
      data = {
          'id': data['user']['id'],
          'action': data['event']['type']
      }

  # Step 3: Enrich (receives extracted data)
  - label: enrich
    python: |
      data['processed'] = True
```

## Available Imports
Standard library modules are available:
```yaml
transforms:
  - label: complex-transform
    python: |
      import json
      import re
      from datetime import datetime, timedelta
      from urllib.parse import urlparse

      # Use imported modules
      url = urlparse(data.get('page_url', ''))
      data['domain'] = url.netloc
```

## Best Practices
- Keep transforms simple - complex logic is harder to debug
- Handle missing data - use `.get()` with defaults
- Log for debugging - use `print()` statements (visible at the DEBUG log level)
- Test locally - use the `logger` destination to verify transforms
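One way to follow the last tip before deploying is a tiny local harness that runs a transform body against sample records (a sketch using Python's built-in `exec`; this harness is an illustration, not part of the product):

```python
# The transform body, exactly as it would appear under `python: |`
transform_body = """
if data.get('email', '').endswith('@test.com'):
    data = {}  # Skip this record
"""

samples = [{'email': 'qa@test.com'}, {'email': 'user@example.com'}]

for sample in samples:
    # Give each run its own copy of the payload, plus a stub record
    namespace = {'data': dict(sample), 'record': {'id': 0}}
    exec(transform_body, namespace)
    print(sample, '->', namespace['data'])
# {'email': 'qa@test.com'} -> {}
# {'email': 'user@example.com'} -> {'email': 'user@example.com'}
```

An empty dict in the output confirms the record would be skipped, without having to run the full pipeline.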
## Example: Defensive Transform

```yaml
transforms:
  - label: safe-extract
    python: |
      # Handle missing nested data
      user = data.get('user') or {}
      address = user.get('address') or {}

      data['user_id'] = user.get('id', 'unknown')
      data['city'] = address.get('city', '')
      data['country'] = address.get('country', 'US')
```

## Performance Considerations
- Transforms run on every record - keep them fast
- Avoid expensive operations (HTTP calls, file I/O)
- Pre-compile regex patterns if used repeatedly:

```yaml
transforms:
  - label: extract-ids
    python: |
      import re

      # Pattern is compiled once, not per-record
      pattern = re.compile(r'ID:(\d+)')
      match = pattern.search(data.get('message', ''))
      data['extracted_id'] = match.group(1) if match else None
```

## Next Steps
- Configuration Reference - complete YAML options
- Sources Overview - configure data extraction
- Destinations Overview - configure data loading