import json
import logging
from typing import Optional

import duckdb
import pandas as pd

from database import MotionDatabase

_logger = logging.getLogger(__name__)


def extract_mp_votes(db_path: Optional[str] = None, limit: Optional[int] = None) -> dict:
    """Extract individual MP votes from motions.voting_results and store them
    in the mp_votes table.

    Handles both individual MP votes (ActorNaam contains comma) and
    party-level votes (ActorNaam has no comma) by treating party names as
    actors. Uses a single DuckDB connection with DataFrame bulk insert for
    performance.

    Args:
        db_path: Optional path to the motions database; defaults to the
            MotionDatabase default location.
        limit: Optional cap on the number of motions scanned.

    Returns a dict with summary counts:
    - motions_scanned: number of motions inspected
    - mp_rows_inserted: number of mp_votes rows inserted
    - motions_skipped: number of motions skipped because mp_votes already existed
    """
    db = MotionDatabase(db_path=db_path) if db_path else MotionDatabase()
    conn = duckdb.connect(db.db_path)
    try:
        # Support an optional limit to only scan a subset of motions.
        if limit is not None:
            rows = conn.execute(
                "SELECT id, voting_results, date FROM motions LIMIT ?", (limit,)
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT id, voting_results, date FROM motions"
            ).fetchall()

        # Fetch the set of motions that already have mp_votes ONCE, instead of
        # issuing a COUNT(*) query per motion (N+1 pattern in the original).
        already_done = {
            r[0]
            for r in conn.execute(
                "SELECT DISTINCT motion_id FROM mp_votes"
            ).fetchall()
        }

        motions_scanned = 0
        motions_skipped = 0
        batch = []

        for motion_id, voting_results_json, date in rows:
            motions_scanned += 1

            # Skip motions whose mp_votes were already extracted.
            if motion_id in already_done:
                _logger.debug("Skipping motion %s, mp_votes already exist", motion_id)
                motions_skipped += 1
                continue

            # voting_results may be stored as JSON text or as native JSON;
            # ensure it's a dict either way.
            if isinstance(voting_results_json, str):
                voting_results = json.loads(voting_results_json)
            else:
                voting_results = voting_results_json

            for actor, vote in (voting_results or {}).items():
                # Individual MP names contain a comma (e.g. "Last, F.");
                # party names have no comma (e.g. "VVD", "GroenLinks-PvdA").
                party = None if "," in actor else actor
                batch.append(
                    (motion_id, actor, party, vote, str(date) if date else None)
                )

        # Bulk insert via DataFrame for performance (avoids per-row overhead).
        # DuckDB's replacement scan lets the SQL reference the local `df`.
        mp_rows_inserted = 0
        if batch:
            try:
                df = pd.DataFrame(
                    batch, columns=["motion_id", "mp_name", "party", "vote", "date"]
                )
                conn.execute(
                    "INSERT INTO mp_votes (motion_id, mp_name, party, vote, date) "
                    "SELECT * FROM df"
                )
                mp_rows_inserted = len(batch)
                _logger.info("Bulk inserted %d mp_votes rows", mp_rows_inserted)
            except Exception as e:
                # Best-effort, as in the original: log and continue rather than
                # propagate; logger.exception preserves the traceback.
                _logger.exception("Bulk insert failed: %s", e)
        else:
            _logger.info("No new mp_votes rows to insert")
    finally:
        # Release the connection on every path (the original leaked it when an
        # exception escaped after the initial SELECT succeeded).
        conn.close()

    return {
        "motions_scanned": motions_scanned,
        "mp_rows_inserted": mp_rows_inserted,
        "motions_skipped": motions_skipped,
    }