# ETL Pipeline - Example
## Overview

This pipeline extracts data from source systems, transforms it, and loads it into the data warehouse.
**Pipeline Status**

- Schedule: Daily at 7:00 AM UTC
- Owner: Prash
- SLA: 4 hours
## Architecture

```mermaid
flowchart TD
    subgraph Sources
        A[App Script]
        B[App Script 2]
        C[Excel Sheets]
    end
    subgraph Processing["Processing/Storage"]
        D[BigQuery]
        E[Scheduled Queries]
    end
    subgraph Dashboard
        F[Looker Studio]
        G[BI tools]
    end
    A --> D
    B --> D
    C --> D
    D --> E
    E --> F
    style D fill:#ffecb3
    style G fill:#c8e6c9
```
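The flow above can be sketched as plain Python stages. This is a minimal sketch only: the function names are hypothetical stand-ins for the App Script exports, the BigQuery load, and the scheduled-query step, not the pipeline's actual code.

```python
def extract_from_sources() -> list[dict]:
    # Hypothetical stand-in for the App Script / Excel exports feeding BigQuery.
    return [{"order_id": 1, "amount": 100.0}]

def load_to_bigquery(rows: list[dict]) -> int:
    # Hypothetical stand-in for a BigQuery load job; returns rows loaded.
    return len(rows)

def run_scheduled_transforms(loaded: int) -> dict:
    # Hypothetical stand-in for the scheduled queries that feed Looker Studio.
    return {"rows_transformed": loaded, "status": "ok"}

def run_pipeline() -> dict:
    # Orchestrates the three stages in the order shown in the diagram.
    rows = extract_from_sources()
    loaded = load_to_bigquery(rows)
    return run_scheduled_transforms(loaded)
```

Each stage passes its output to the next, mirroring the Sources → BigQuery → Scheduled Queries edges in the diagram.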
## Configuration

```python
# pipeline_config.py
from dataclasses import dataclass
from datetime import datetime


@dataclass
class PipelineConfig:
    """Configuration for the Sales ETL Pipeline."""

    source_database: str = "postgres://sales_db"
    target_schema: str = "warehouse.sales"
    batch_size: int = 10000
    retry_attempts: int = 3

    def get_partition_date(self) -> str:
        """Get the partition date for the current run."""
        return datetime.now().strftime("%Y-%m-%d")
```
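Because `PipelineConfig` is a dataclass with defaults, individual fields can be overridden per environment at construction time. A short usage sketch (the dataclass is repeated here so the snippet stands alone; the override values are illustrative):

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class PipelineConfig:
    """Configuration for the Sales ETL Pipeline."""

    source_database: str = "postgres://sales_db"
    target_schema: str = "warehouse.sales"
    batch_size: int = 10000
    retry_attempts: int = 3

    def get_partition_date(self) -> str:
        return datetime.now().strftime("%Y-%m-%d")


# Override only what differs from the defaults (illustrative value).
config = PipelineConfig(batch_size=5000)
print(config.target_schema)         # warehouse.sales
print(config.get_partition_date())  # e.g. 2024-01-15
```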
## Key SQL Transformations

### Sales Aggregation

```sql
-- Aggregate daily sales by region
WITH daily_sales AS (
    SELECT
        DATE_TRUNC('day', order_date) AS sale_date,
        region_id,
        SUM(amount) AS total_sales,
        COUNT(DISTINCT customer_id) AS unique_customers
    FROM raw.orders
    WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
    GROUP BY 1, 2
)
SELECT
    sale_date,
    r.region_name,
    total_sales,
    unique_customers,
    total_sales / NULLIF(unique_customers, 0) AS avg_order_value
FROM daily_sales ds
JOIN dim.regions r ON ds.region_id = r.region_id
ORDER BY sale_date DESC, total_sales DESC;
```
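The aggregation logic can be sanity-checked in Python on a small in-memory sample before running it against `raw.orders`. The sample rows below are hypothetical; the grouping and the zero-guard mirror the CTE's `GROUP BY 1, 2` and the `NULLIF(unique_customers, 0)` division:

```python
from collections import defaultdict
from datetime import date

# Hypothetical sample rows: (order_date, region_id, customer_id, amount).
orders = [
    (date(2024, 1, 15), 1, "c1", 100.0),
    (date(2024, 1, 15), 1, "c2", 50.0),
    (date(2024, 1, 15), 2, "c3", 200.0),
]

# Group by (sale_date, region_id), as in the CTE's GROUP BY 1, 2.
totals: dict = defaultdict(float)
customers: dict = defaultdict(set)
for order_date, region_id, customer_id, amount in orders:
    key = (order_date, region_id)
    totals[key] += amount
    customers[key].add(customer_id)

# avg_order_value guards against zero customers, like NULLIF(unique_customers, 0).
results = {
    key: {
        "total_sales": totals[key],
        "unique_customers": len(customers[key]),
        "avg_order_value": totals[key] / len(customers[key]) if customers[key] else None,
    }
    for key in totals
}
```

For region 1 on 2024-01-15 this yields total sales of 150.0 across 2 unique customers, i.e. an average order value of 75.0.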
## Monitoring

| Metric | Threshold | Alert |
|---|---|---|
| Runtime | > 4 hours | |
| Tracking Date | NA | Slack |
| Error Rate | > 1% | |
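The numeric thresholds in the table can be encoded as a simple check. This is a sketch under assumptions: the function name, arguments, and message wording are hypothetical, and how alerts are routed (e.g. to Slack) is left out.

```python
def check_pipeline_metrics(runtime_hours: float, error_rate: float) -> list:
    """Return alert messages for any breached threshold from the table above."""
    alerts = []
    if runtime_hours > 4:     # Runtime SLA: 4 hours
        alerts.append(f"Runtime {runtime_hours:.1f}h exceeds the 4h SLA")
    if error_rate > 0.01:     # Error-rate threshold: 1%
        alerts.append(f"Error rate {error_rate:.2%} exceeds 1%")
    return alerts
```

A healthy run (`check_pipeline_metrics(3.0, 0.005)`) returns an empty list; a run breaching both thresholds returns two messages.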
**Warning:** If the pipeline fails, check the App Script logs first, then review the BigQuery scheduled queries/notebooks.