# Data Ingestion Pipeline

## Overview
A generic data ingestion framework for loading data from various sources into the data lake.
## Supported Sources
| Source Type  | Connector | Status    |
|--------------|-----------|-----------|
| BigQuery     |           | ✅ Active |
| App Script   |           | ✅ Active |
| Lookerstudio |           | ✅ Active |
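New source types plug in through connectors. As a minimal sketch of how dispatch on `source_type` might work (the registry, `register_connector`, and `BigQueryConnector` names are hypothetical, not the framework's actual API):

```python
from typing import Dict

# Hypothetical connector registry; every name here is illustrative,
# not part of the ingestion framework's real API.
CONNECTORS: Dict[str, type] = {}

def register_connector(source_type: str):
    """Class decorator mapping a source_type string to its connector."""
    def decorator(cls):
        CONNECTORS[source_type] = cls
        return cls
    return decorator

@register_connector("bigquery")
class BigQueryConnector:
    """Placeholder connector for the BigQuery source."""
    def extract(self, query: str, params: dict):
        raise NotImplementedError  # would call the BigQuery client here

def get_connector(source_type: str):
    """Look up and instantiate the connector for a source type."""
    try:
        return CONNECTORS[source_type]()
    except KeyError:
        raise ValueError(f"Unsupported source type: {source_type!r}")
```

Decorator-based registration keeps each connector self-contained: adding a source means adding one class, with no central dispatch code to edit.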
## Usage Example
```python
from ingestion import DataIngester

# Initialize the ingester
ingester = DataIngester(
    source_type="postgresql",
    connection_string="postgresql://user:pass@host:5432/db",
)

# Define extraction query
query = """
SELECT *
FROM customers
WHERE updated_at > %(last_run)s
"""

# Run incremental ingestion
result = ingester.extract(
    query=query,
    params={"last_run": "2024-01-01"},
    target_path="s3://data-lake/raw/customers/",
)

print(f"Ingested {result.row_count} rows")
```
## Data Flow
```mermaid
sequenceDiagram
    participant S as Source
    participant I as Ingester
    participant L as Data Lake
    participant C as Catalog
    I->>S: Connect & Extract
    S-->>I: Return Data
    I->>I: Validate Schema
    I->>L: Write Parquet
    I->>C: Update Metadata
    C-->>I: Confirm
```
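The "Validate Schema" and "Write Parquet" steps are not exposed in the framework's public API above. As a minimal sketch of how they could be implemented with pyarrow (the `EXPECTED` schema and `validate_and_write` helper are assumptions, and extracted rows are assumed to arrive as plain dicts):

```python
from datetime import datetime

import pyarrow as pa
import pyarrow.parquet as pq

# Illustrative expected schema for the customers feed.
EXPECTED = pa.schema([
    ("id", pa.int64()),
    ("email", pa.string()),
    ("updated_at", pa.timestamp("us")),
])

def validate_and_write(rows: list, path: str) -> int:
    """Coerce rows into the expected schema, then write a Parquet file."""
    # from_pylist raises an ArrowInvalid error if a value cannot be
    # converted to its declared column type, which serves as validation.
    table = pa.Table.from_pylist(rows, schema=EXPECTED)
    pq.write_table(table, path)
    return table.num_rows

rows = [{"id": 1, "email": "a@example.com", "updated_at": datetime(2024, 1, 2)}]
print(validate_and_write(rows, "customers.parquet"), "rows written")
```

Validating against a declared schema at write time keeps bad upstream data out of the lake: a type drift in the source fails the run instead of silently landing in `raw/`.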