You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
motief/.mindmodel/examples/pipeline-example.py

217 lines
6.6 KiB

"""Example: Pipeline phase execution - from pipeline/run_pipeline.py and actual codebase."""
import argparse
from datetime import date, timedelta
from typing import List, Tuple
# Import pipeline modules
from pipeline.fetch_mp_metadata import fetch_mp_metadata
from pipeline.extract_mp_votes import extract_mp_votes
from pipeline.svd_pipeline import run_svd_pipeline
from pipeline.text_pipeline import run_text_pipeline
from pipeline.fusion import run_fusion
from database import MotionDatabase
# =============================================================================
# Example 1: Running full pipeline
# =============================================================================
def example_full_pipeline():
    """Run the complete data ingestion pipeline end to end.

    Builds the same argument parser a CLI entry point would, but parses an
    explicit empty argument list, so the example always runs with the
    defaults declared below and ignores the real command line.

    The phases run in the order announced by the printed banners:

    1. MP metadata      -- ``fetch_mp_metadata``
    2. Vote extraction  -- ``extract_mp_votes``
    3. SVD pipeline     -- ``generate_windows`` followed by ``run_svd_pipeline``
    4. Text embeddings  -- ``run_text_pipeline``
    5. Fusion           -- ``run_fusion``

    Returns:
        None. Progress is reported with ``print``.
    """
    # Parse arguments like the CLI would.
    parser = argparse.ArgumentParser(description="Pipeline runner")
    parser.add_argument("--db-path", default="data/motions.db")
    parser.add_argument("--start-date", default=None)
    parser.add_argument("--end-date", default=None)
    parser.add_argument(
        "--window-size", choices=["quarterly", "annual"], default="quarterly"
    )
    parser.add_argument("--svd-k", type=int, default=50)
    # Passing [] keeps sys.argv out of the example: only defaults are used.
    args = parser.parse_args([])

    # Resolve dates: the end defaults to today, the start to 730 days earlier.
    end_date = date.fromisoformat(args.end_date) if args.end_date else date.today()
    start_date = (
        date.fromisoformat(args.start_date)
        if args.start_date
        else end_date - timedelta(days=730)
    )

    # The two dates need a separator; printed back to back they are unreadable.
    print(f"Running pipeline: {start_date} to {end_date}")
    print(f"Window size: {args.window_size}")
    print(f"DB path: {args.db_path}")

    # Initialize database
    db = MotionDatabase(args.db_path)

    # Phase 1: Fetch MP metadata
    print("\n=== Phase 1: MP Metadata ===")
    n_mp = fetch_mp_metadata(db_path=args.db_path)
    print(f"Processed {n_mp} MPs")

    # Phase 2: Extract MP votes
    print("\n=== Phase 2: Extract Votes ===")
    n_votes = extract_mp_votes(db_path=args.db_path)
    print(f"Extracted {n_votes} vote records")

    # Phase 3: SVD pipeline -- first generate the time windows ...
    print("\n=== Phase 3: SVD Pipeline ===")
    windows = generate_windows(start_date, end_date, args.window_size)
    print(f"Generated {len(windows)} windows: {windows}")

    # ... then run the SVD over those windows.
    run_svd_pipeline(db, windows, args.svd_k)
    print(f"Computed SVD for {len(windows)} windows")

    # Phase 4: Text embeddings
    print("\n=== Phase 4: Text Embeddings ===")
    run_text_pipeline(args.db_path, batch_size=50)
    print("Text embeddings completed")

    # Phase 5: Fusion
    print("\n=== Phase 5: Fusion ===")
    run_fusion(args.db_path, windows)
    print("Fusion completed")

    print("\n=== Pipeline Complete ===")
# =============================================================================
# Example 2: Generate time windows
# =============================================================================
def generate_windows(
    start: date, end: date, granularity: str
) -> List[Tuple[str, str, str]]:
    """Generate calendar-aligned time windows for pipeline processing.

    Args:
        start: First date to cover. The first window is aligned to the start
            of the calendar year ("annual") or calendar quarter containing
            this date, so it may begin before ``start``.
        end: Last date to cover. The final window is clamped to this date.
        granularity: ``"annual"`` for one window per calendar year; any other
            value is treated as quarterly.

    Returns:
        A list of ``(window_id, start_iso, end_iso)`` tuples in chronological
        order. ``window_id`` is ``"YYYY"`` for annual windows and
        ``"YYYY-Qn"`` for quarterly ones; the dates are ISO-8601 strings.
        The list is empty when the aligned start lies after ``end``.
    """
    # Imported once per call rather than on every loop iteration.
    import calendar

    windows: List[Tuple[str, str, str]] = []

    if granularity == "annual":
        cursor = date(start.year, 1, 1)
        while cursor <= end:
            # Clamp the last year so the window never runs past ``end``.
            w_end = min(date(cursor.year, 12, 31), end)
            windows.append((str(cursor.year), cursor.isoformat(), w_end.isoformat()))
            cursor = date(cursor.year + 1, 1, 1)
        return windows

    # Quarterly (also the fallback for any granularity other than "annual").
    quarter_starts = {1: 1, 2: 4, 3: 7, 4: 10}
    quarter_ends = {1: 3, 2: 6, 3: 9, 4: 12}

    # Snap the cursor back to the first day of the quarter containing ``start``.
    q = (start.month - 1) // 3 + 1
    cursor = date(start.year, quarter_starts[q], 1)

    while cursor <= end:
        q = (cursor.month - 1) // 3 + 1
        q_end_month = quarter_ends[q]
        last_day = calendar.monthrange(cursor.year, q_end_month)[1]
        q_end = date(cursor.year, q_end_month, last_day)
        # Clamp the last quarter so the window never runs past ``end``.
        w_end = min(q_end, end)
        windows.append((f"{cursor.year}-Q{q}", cursor.isoformat(), w_end.isoformat()))
        # The day after a quarter's end is the first day of the next quarter.
        cursor = q_end + timedelta(days=1)

    return windows
def example_window_generation():
    """Print the quarterly and annual windows for a fixed 18-month range.

    The range runs from 2023-01-01 to 2024-06-30. Each granularity gets a
    heading followed by one line per generated window.
    """
    range_start, range_end = date(2023, 1, 1), date(2024, 6, 30)

    # (heading, granularity) pairs, in the order they are printed.
    sections = (
        ("Quarterly windows:", "quarterly"),
        ("\nAnnual windows:", "annual"),
    )
    for heading, granularity in sections:
        print(heading)
        for wid, s, e in generate_windows(range_start, range_end, granularity):
            print(f" {wid}: {s} to {e}")
# =============================================================================
# Example 3: Running individual phases
# =============================================================================
def example_individual_phases():
    """Run pipeline phases one at a time, which is handy when debugging.

    Each phase is invoked against ``data/motions.db`` and reports its own
    progress with ``print``. The SVD step is limited to the single window
    2024-Q1, and the text pipeline uses a batch size of 25.
    """
    db_path = "data/motions.db"
    database = MotionDatabase(db_path)

    # MP metadata fetch only.
    print("Fetching MP metadata...")
    mp_count = fetch_mp_metadata(db_path=db_path)
    print(f" {mp_count} MPs processed")

    # Vote extraction only.
    print("Extracting votes...")
    vote_count = extract_mp_votes(db_path=db_path)
    print(f" {vote_count} votes extracted")

    # SVD for one specific window only.
    print("Computing SVD...")
    single_window = [("2024-Q1", "2024-01-01", "2024-03-31")]
    run_svd_pipeline(database, single_window, k=50)
    print(" SVD computed")

    # Text embeddings only; a smaller batch keeps test runs light.
    print("Computing embeddings...")
    run_text_pipeline(db_path, batch_size=25)
    print(" Embeddings computed")
# =============================================================================
# Example 4: Dry run
# =============================================================================
def example_dry_run():
    """Describe what the pipeline would do, without running any phase.

    Generates the quarterly windows for 2024-01-01 through 2024-06-30, lists
    them, and then lists the phases in the order they would run. Only
    ``generate_windows`` is called; no pipeline phase is invoked.
    """
    print("DRY RUN - no writes will be made")

    # Generate the windows for the fixed range and show them.
    planned = generate_windows(date(2024, 1, 1), date(2024, 6, 30), "quarterly")
    print(f"Would process {len(planned)} windows:")
    for wid, s, e in planned:
        print(f" {wid}: {s} to {e}")

    # Phase listing, in execution order.
    print("\nWould run phases:")
    phase_lines = (
        " 1. fetch_mp_metadata",
        " 2. extract_mp_votes",
        " 3. svd_pipeline",
        " 4. text_pipeline",
        " 5. fusion",
    )
    for line in phase_lines:
        print(line)
if __name__ == "__main__":
    import logging

    # Emit timestamped, level-tagged records at INFO and above.
    log_format = "%(asctime)s %(levelname)s %(name)s: %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)

    # Only the window-generation and dry-run examples are executed here;
    # the examples that invoke the pipeline phases are not called.
    print("=== Window Generation ===")
    example_window_generation()

    print("\n=== Dry Run ===")
    example_dry_run()