ETL Pipeline - Example

Overview

This pipeline extracts data from source systems, transforms it, and loads it into the data warehouse.

Pipeline Status

Schedule: Daily at 7:00 AM UTC
Owner: Prash
SLA: 4 hours

Architecture

```mermaid
flowchart TD
    subgraph Sources
        A[App Script]
        B[App Script 2]
        C[Excel Sheets]
    end

    subgraph PS["Processing/Storage"]
        D[BigQuery]
        E[Scheduled Queries]
    end

    subgraph Dashboard
        F[Looker Studio]
        G[BI Tools]
    end

    A --> D
    B --> D
    C --> D
    D --> E
    E --> F

    style D fill:#ffecb3
    style G fill:#c8e6c9
```

Configuration

```python
# pipeline_config.py
from dataclasses import dataclass
from datetime import datetime


@dataclass
class PipelineConfig:
    """Configuration for the Sales ETL Pipeline."""

    source_database: str = "postgres://sales_db"
    target_schema: str = "warehouse.sales"
    batch_size: int = 10000
    retry_attempts: int = 3

    def get_partition_date(self) -> str:
        """Get the partition date for the current run."""
        return datetime.now().strftime("%Y-%m-%d")
```
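
The `retry_attempts` and `batch_size` fields above are meant to drive the extract step. A minimal sketch of how they might be consumed (the `run_with_retries` helper and `flaky_extract` function are illustrative assumptions, not part of the pipeline code):

```python
# Hypothetical retry wrapper showing how retry_attempts and batch_size
# from PipelineConfig might be consumed.

def run_with_retries(extract, retry_attempts: int = 3, batch_size: int = 10000):
    """Call `extract(batch_size=...)` up to `retry_attempts` times.

    Returns the first successful result; re-raises after the last failure.
    """
    last_error = None
    for attempt in range(1, retry_attempts + 1):
        try:
            return extract(batch_size=batch_size)
        except Exception as exc:  # in practice, catch the DB driver's error type
            last_error = exc
    raise RuntimeError(f"extract failed after {retry_attempts} attempts") from last_error


# Usage: an extract function that fails once, then succeeds on retry.
calls = {"n": 0}

def flaky_extract(batch_size):
    calls["n"] += 1
    if calls["n"] < 2:
        raise ConnectionError("transient failure")
    return [{"amount": 100}] * min(batch_size, 3)

rows = run_with_retries(flaky_extract, retry_attempts=3, batch_size=10000)
```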

Key SQL Transformations

Sales Aggregation

```sql
-- Aggregate daily sales by region
WITH daily_sales AS (
    SELECT
        DATE_TRUNC('day', order_date) AS sale_date,
        region_id,
        SUM(amount) AS total_sales,
        COUNT(DISTINCT customer_id) AS unique_customers
    FROM raw.orders
    WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
    GROUP BY 1, 2
)
SELECT
    sale_date,
    r.region_name,
    total_sales,
    unique_customers,
    total_sales / NULLIF(unique_customers, 0) AS avg_order_value
FROM daily_sales ds
JOIN dim.regions r ON ds.region_id = r.region_id
ORDER BY sale_date DESC, total_sales DESC;
```
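
The aggregation logic above can be sketched in plain Python for unit-testing outside the warehouse (column names mirror the SQL; rows are assumed to be dicts, and the `if n else None` guard mirrors `NULLIF(unique_customers, 0)`):

```python
from collections import defaultdict

def aggregate_daily_sales(orders):
    """Per (sale_date, region_id): sum amount and count distinct customers."""
    totals = defaultdict(float)
    customers = defaultdict(set)
    for o in orders:
        key = (o["order_date"], o["region_id"])
        totals[key] += o["amount"]
        customers[key].add(o["customer_id"])

    result = []
    for (sale_date, region_id), total in totals.items():
        n = len(customers[(sale_date, region_id)])
        result.append({
            "sale_date": sale_date,
            "region_id": region_id,
            "total_sales": total,
            "unique_customers": n,
            # NULLIF(unique_customers, 0) guard from the SQL
            "avg_order_value": total / n if n else None,
        })
    return result


# Usage with a small sample of orders:
sample = [
    {"order_date": "2024-01-01", "region_id": 1, "amount": 10.0, "customer_id": "a"},
    {"order_date": "2024-01-01", "region_id": 1, "amount": 20.0, "customer_id": "b"},
    {"order_date": "2024-01-01", "region_id": 2, "amount": 5.0, "customer_id": "a"},
]
result = aggregate_daily_sales(sample)
```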

Monitoring

| Metric        | Threshold | Alert |
|---------------|-----------|-------|
| Runtime       | > 4 hours | Email |
| Tracking Date | NA        | Slack |
| Error Rate    | > 1%      | Email |

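The monitoring thresholds can be checked with a small helper like the following sketch. The metric keys (`runtime_hours`, `error_rate`, `tracking_date`) are assumed names, and treating a missing tracking date as the "Tracking Date / NA" condition is an interpretation of the table above:

```python
def check_thresholds(metrics):
    """Return (metric, channel) alerts for metrics breaching the monitoring table.

    Assumed keys: runtime_hours (float), tracking_date (str or None),
    error_rate (fraction, e.g. 0.01 == 1%).
    """
    alerts = []
    if metrics.get("runtime_hours", 0) > 4:
        alerts.append(("Runtime", "Email"))
    if metrics.get("tracking_date") is None:
        alerts.append(("Tracking Date", "Slack"))
    if metrics.get("error_rate", 0) > 0.01:
        alerts.append(("Error Rate", "Email"))
    return alerts


# Usage: only the runtime threshold is breached here.
alerts = check_thresholds(
    {"runtime_hours": 5.2, "tracking_date": "2024-01-01", "error_rate": 0.002}
)
```
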
Warning

If the pipeline fails, check the App Script logs first, then review the BigQuery Scheduled Queries and Notebooks.