"""Example: Pipeline phase execution - from pipeline/run_pipeline.py and actual codebase."""

import argparse
import calendar
import logging
from datetime import date, timedelta
from typing import List, Tuple

# Import pipeline modules
from pipeline.fetch_mp_metadata import fetch_mp_metadata
from pipeline.extract_mp_votes import extract_mp_votes
from pipeline.svd_pipeline import run_svd_pipeline
from pipeline.text_pipeline import run_text_pipeline
from pipeline.fusion import run_fusion
from database import MotionDatabase


# =============================================================================
# Example 1: Running full pipeline
# =============================================================================


def example_full_pipeline():
    """Run the complete data ingestion pipeline with default CLI settings.

    Builds the same argument parser the CLI uses and parses an empty argument
    list, so every option takes its default. The date range defaults to the
    730 days ending today. Phases run in order: MP metadata, vote
    extraction, per-window SVD, text embeddings, fusion.
    """
    # Parse arguments like the CLI would; ``[]`` means "use all defaults".
    parser = argparse.ArgumentParser(description="Pipeline runner")
    parser.add_argument("--db-path", default="data/motions.db")
    parser.add_argument("--start-date", default=None)
    parser.add_argument("--end-date", default=None)
    parser.add_argument(
        "--window-size", choices=["quarterly", "annual"], default="quarterly"
    )
    parser.add_argument("--svd-k", type=int, default=50)
    args = parser.parse_args([])

    # Resolve dates: end defaults to today, start to 730 days before end.
    end_date = date.fromisoformat(args.end_date) if args.end_date else date.today()
    start_date = (
        date.fromisoformat(args.start_date)
        if args.start_date
        else end_date - timedelta(days=730)
    )

    print(f"Running pipeline: {start_date} → {end_date}")
    print(f"Window size: {args.window_size}")
    print(f"DB path: {args.db_path}")

    # Initialize database. Only the SVD phase receives the handle; the other
    # phases are given the path.
    db = MotionDatabase(args.db_path)

    # Phase 1: Fetch MP metadata
    print("\n=== Phase 1: MP Metadata ===")
    n_mp = fetch_mp_metadata(db_path=args.db_path)
    print(f"Processed {n_mp} MPs")

    # Phase 2: Extract MP votes
    print("\n=== Phase 2: Extract Votes ===")
    n_votes = extract_mp_votes(db_path=args.db_path)
    print(f"Extracted {n_votes} vote records")

    # Phase 3: SVD pipeline - build the time windows, then compute SVD per window
    print("\n=== Phase 3: SVD Pipeline ===")
    windows = generate_windows(start_date, end_date, args.window_size)
    print(f"Generated {len(windows)} windows: {windows}")
    run_svd_pipeline(db, windows, args.svd_k)
    print(f"Computed SVD for {len(windows)} windows")

    # Phase 4: Text embeddings
    print("\n=== Phase 4: Text Embeddings ===")
    run_text_pipeline(args.db_path, batch_size=50)
    print("Text embeddings completed")

    # Phase 5: Fusion (reuses the windows from phase 3)
    print("\n=== Phase 5: Fusion ===")
    run_fusion(args.db_path, windows)
    print("Fusion completed")

    print("\n=== Pipeline Complete ===")


# =============================================================================
# Example 2: Generate time windows
# =============================================================================


def generate_windows(
    start: date, end: date, granularity: str
) -> List[Tuple[str, str, str]]:
    """Generate calendar-aligned time windows covering ``start``..``end``.

    Each window is a ``(window_id, window_start, window_end)`` tuple whose
    dates are ISO-formatted strings. Window starts are aligned to the first
    day of the calendar year/quarter containing them, so the first window may
    begin before ``start``. The last window's end is clamped to ``end``.

    Args:
        start: First date to cover.
        end: Last date to cover (inclusive).
        granularity: ``"quarterly"`` (ids like ``"2024-Q1"``) or ``"annual"``
            (ids like ``"2024"``).

    Returns:
        Windows in chronological order; empty when ``start`` is after ``end``.

    Raises:
        ValueError: If ``granularity`` is neither ``"quarterly"`` nor
            ``"annual"``.
    """
    if granularity not in ("quarterly", "annual"):
        raise ValueError(
            f"granularity must be 'quarterly' or 'annual', got {granularity!r}"
        )

    windows: List[Tuple[str, str, str]] = []
    # Without this guard, aligning ``start`` back to its period start could
    # still emit a window for a reversed range.
    if start > end:
        return windows

    if granularity == "annual":
        # Every year from start.year to end.year has its Jan 1 <= end.
        for year in range(start.year, end.year + 1):
            w_start = date(year, 1, 1)
            w_end = min(date(year, 12, 31), end)
            windows.append((str(year), w_start.isoformat(), w_end.isoformat()))
        return windows

    # Quarterly: begin on the first day of the quarter containing ``start``.
    # Quarter q (1-4) spans months 3q-2 .. 3q.
    cursor = date(start.year, 3 * ((start.month - 1) // 3) + 1, 1)
    while cursor <= end:
        quarter = (cursor.month - 1) // 3 + 1
        end_month = 3 * quarter
        last_day = calendar.monthrange(cursor.year, end_month)[1]
        q_end = date(cursor.year, end_month, last_day)
        w_end = min(q_end, end)
        windows.append(
            (f"{cursor.year}-Q{quarter}", cursor.isoformat(), w_end.isoformat())
        )
        cursor = q_end + timedelta(days=1)
    return windows


def example_window_generation():
    """Print quarterly and annual windows for 2023-01-01 .. 2024-06-30."""
    start = date(2023, 1, 1)
    end = date(2024, 6, 30)

    print("Quarterly windows:")
    quarterly = generate_windows(start, end, "quarterly")
    for wid, s, e in quarterly:
        print(f" {wid}: {s} to {e}")

    print("\nAnnual windows:")
    annual = generate_windows(start, end, "annual")
    for wid, s, e in annual:
        print(f" {wid}: {s} to {e}")


# =============================================================================
# Example 3: Running individual phases
# =============================================================================


def example_individual_phases():
    """Run pipeline phases individually for debugging.

    Runs MP metadata, vote extraction, SVD for a single hard-coded window
    (2024-Q1), and text embeddings with a smaller batch. Fusion is not run.
    """
    db_path = "data/motions.db"
    db = MotionDatabase(db_path)

    # Only run MP metadata fetch
    print("Fetching MP metadata...")
    n = fetch_mp_metadata(db_path=db_path)
    print(f" {n} MPs processed")

    # Only run vote extraction
    print("Extracting votes...")
    n = extract_mp_votes(db_path=db_path)
    print(f" {n} votes extracted")

    # Only run SVD for a specific window
    print("Computing SVD...")
    windows = [("2024-Q1", "2024-01-01", "2024-03-31")]
    run_svd_pipeline(db, windows, k=50)
    print(" SVD computed")

    # Only run text embeddings
    print("Computing embeddings...")
    run_text_pipeline(db_path, batch_size=25)  # Smaller batch for testing
    print(" Embeddings computed")


# =============================================================================
# Example 4: Dry run
# =============================================================================


def example_dry_run():
    """Show what the pipeline would do without making changes.

    Only computes and prints the quarterly windows for the first half of
    2024, then lists the phase names; no pipeline function is called.
    """
    print("DRY RUN - no writes will be made")

    start_date = date(2024, 1, 1)
    end_date = date(2024, 6, 30)

    # Generate and show windows
    windows = generate_windows(start_date, end_date, "quarterly")
    print(f"Would process {len(windows)} windows:")
    for wid, s, e in windows:
        print(f" {wid}: {s} to {e}")

    print("\nWould run phases:")
    print(" 1. fetch_mp_metadata")
    print(" 2. extract_mp_votes")
    print(" 3. svd_pipeline")
    print(" 4. text_pipeline")
    print(" 5. fusion")


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    print("=== Window Generation ===")
    example_window_generation()

    print("\n=== Dry Run ===")
    example_dry_run()