Data Processing Utilities

Reusable Python functions for data processing tasks.

DataFrame Operations

Load and Clean Data

import pandas as pd
from typing import List, Optional

def load_and_clean(
    filepath: str,
    date_columns: Optional[List[str]] = None,
    drop_duplicates: bool = True
) -> pd.DataFrame:
    """
    Load CSV and perform standard cleaning operations.
    
    Args:
        filepath: Path to the CSV file
        date_columns: Columns to parse as dates
        drop_duplicates: Whether to remove duplicate rows
        
    Returns:
        Cleaned DataFrame
    """
    df = pd.read_csv(
        filepath,
        parse_dates=date_columns or []
    )
    
    # Strip whitespace from string columns
    str_columns = df.select_dtypes(include=['object']).columns
    df[str_columns] = df[str_columns].apply(lambda x: x.str.strip())
    
    if drop_duplicates:
        df = df.drop_duplicates()
    
    return df

Validate Schema

def validate_schema(
    df: pd.DataFrame,
    expected_columns: List[str],
    raise_on_error: bool = True
) -> bool:
    """
    Validate that DataFrame has expected columns.
    
    Args:
        df: DataFrame to validate
        expected_columns: List of required column names
        raise_on_error: Raise exception if validation fails
        
    Returns:
        True if valid, False otherwise
    """
    missing = set(expected_columns) - set(df.columns)
    
    if missing:
        if raise_on_error:
            raise ValueError(f"Missing columns: {missing}")
        return False
    
    return True

Database Utilities

Connection Manager

from contextlib import contextmanager
import sqlalchemy as sa

@contextmanager
def get_connection(connection_string: str):
    """
    Context manager for database connections.
    
    Usage:
        with get_connection("postgresql://...") as conn:
            df = pd.read_sql(query, conn)
    """
    engine = sa.create_engine(connection_string)
    connection = engine.connect()
    try:
        yield connection
    finally:
        connection.close()
        engine.dispose()

Note

Always use context managers for database connections to ensure proper cleanup.