From 847b783877b02234829741a085efd858b1e517d8 Mon Sep 17 00:00:00 2001 From: Sven Geboers Date: Sat, 21 Mar 2026 23:24:06 +0100 Subject: [PATCH] fix(pipeline): fix API pagination, add skip_details fast path, bulk mp_votes insert - _get_voting_records returns (records, besluit_meta) tuple; paginate via Besluit?expand=Stemming (469/mo vs 8400) - get_motions(skip_details=True) bypasses per-motion detail chain (3 HTTP calls/motion) - extract_mp_votes rewritten: bulk DataFrame insert (80k rows in 1.9s), includes party-level actors - run_pipeline.py fixed: pass db_path not db, handle dict/int return types - download_past_year.py: skip_details=True default, limit-per-chunk default 50000 --- api_client.py | 168 ++++++++++++++++++++++++++-------- pipeline/extract_mp_votes.py | 76 +++++++++------ pipeline/run_pipeline.py | 20 ++-- scripts/download_past_year.py | 112 +++++++++++++++++++++++ 4 files changed, 303 insertions(+), 73 deletions(-) create mode 100644 scripts/download_past_year.py diff --git a/api_client.py b/api_client.py index b34d213..3c245ae 100644 --- a/api_client.py +++ b/api_client.py @@ -21,19 +21,33 @@ class TweedeKamerAPI: ) def get_motions( - self, start_date: datetime = None, end_date: datetime = None, limit: int = 500 + self, + start_date: datetime = None, + end_date: datetime = None, + limit: int = 500, + skip_details: bool = False, ) -> List[Dict]: - """Get motions with voting results using OData API""" + """Get motions with voting results using OData API. + + Args: + skip_details: If True, skip per-motion detail fetching (Zaak/Document/body text). + Uses BesluitTekst from the Besluit record instead. Much faster for + bulk historical downloads where AI summarisation is not needed. + """ if not start_date: start_date = datetime.now() - timedelta(days=730) # 2 years ago try: # Get voting records - voting_records = self._get_voting_records(start_date, end_date, limit) + voting_records, besluit_meta = self._get_voting_records( + start_date, end_date, limit + ) print(f"Fetched {len(voting_records)} voting records from API") # Group by Besluit_Id (decision/motion) and get motion details - motions = self._process_voting_records(voting_records) + motions = self._process_voting_records( + voting_records, besluit_meta, skip_details=skip_details + ) print(f"Processed into {len(motions)} unique motions") return motions @@ -43,52 +57,109 @@ class TweedeKamerAPI: return [] def _get_voting_records( - self, start_date: datetime, end_date: datetime = None, limit: int = 500 - ) -> List[Dict]: - """Get individual voting records from the API""" + self, start_date: datetime, end_date: datetime = None, limit: int = 50000 + ) -> tuple: + """Get individual voting records from the API via Besluit?$expand=Stemming. + + Uses Besluit (decisions) with embedded Stemming (votes) to avoid the + expensive per-record pagination of the flat Stemming endpoint. + Only returns Besluit records with StemmingsSoort set (actual votes). + + Returns: + (voting_records, besluit_meta) where: + - voting_records: flat list of Stemming dicts with Besluit_Id set + - besluit_meta: dict of besluit_id → {title, date, besluit_tekst} + """ # Format date properly for OData start_date_str = start_date.strftime("%Y-%m-%d") - filter_query = f"GewijzigdOp ge {start_date_str}T00:00:00Z" + filter_query = ( + f"GewijzigdOp ge {start_date_str}T00:00:00Z" + " and StemmingsSoort ne null" + " and Verwijderd eq false" + ) if end_date: end_date_str = end_date.strftime("%Y-%m-%d") filter_query += f" and GewijzigdOp le {end_date_str}T23:59:59Z" - # Add filter to exclude deleted records - filter_query += " and Verwijderd eq false" - - url = f"{self.odata_base_url}/Stemming" - params = { + page_size = 250 # API caps $top at 250 + base_url = f"{self.odata_base_url}/Besluit" + base_params = { "$filter": filter_query, - "$top": limit, + "$top": page_size, + "$expand": "Stemming", "$orderby": "GewijzigdOp desc", } + all_records: List[Dict] = [] + besluit_meta: Dict[str, Dict] = {} + skip = 0 + try: - response = self.session.get(url, params=params, timeout=config.API_TIMEOUT) - response.raise_for_status() - data = response.json() + while len(besluit_meta) < limit: + params = {**base_params, "$skip": skip} + response = self.session.get( + base_url, params=params, timeout=config.API_TIMEOUT + ) + response.raise_for_status() + data = response.json() - voting_records = data.get("value", []) + besluit_page = data.get("value", []) + if not besluit_page: + break - # If we got the maximum, there might be more data - if len(voting_records) == limit: - print( - f"Retrieved maximum {limit} records, there might be more data available" - ) + # Flatten: for each Besluit, capture metadata and emit each Stemming record + for besluit in besluit_page: + besluit_id = besluit.get("Id") + if not besluit_id: + continue - return voting_records + date_str = besluit.get("GewijzigdOp", "") + date = date_str.split("T")[0] if date_str else "" + besluit_meta[besluit_id] = { + "title": besluit.get("BesluitTekst") + or f"Besluit {besluit_id[:8]}", + "date": date, + "besluit_tekst": besluit.get("BesluitTekst") or "", + } + + for stemming in besluit.get("Stemming", []): + stemming["Besluit_Id"] = besluit_id + all_records.append(stemming) + + if len(besluit_page) < page_size: + break # last page + skip += page_size + + print( + f"Retrieved {len(all_records)} voting records from {len(besluit_meta)} decisions" + ) + return all_records, besluit_meta except requests.exceptions.RequestException as e: print(f"API request failed: {e}") if hasattr(e, "response") and e.response is not None: print(f"Response status: {e.response.status_code}") print(f"Response text: {e.response.text[:500]}") - return [] + return all_records, besluit_meta # return whatever we got before failure + + def _process_voting_records( + self, + records: List[Dict], + besluit_meta: Dict[str, Dict] = None, + skip_details: bool = False, + ) -> List[Dict]: + """Process individual voting records into grouped motions. - def _process_voting_records(self, records: List[Dict]) -> List[Dict]: - """Process individual voting records into grouped motions""" + Args: + records: Flat Stemming records, each with Besluit_Id set. + besluit_meta: Pre-fetched dict of besluit_id → {title, date, besluit_tekst}. + If provided and skip_details=True, avoids per-motion HTTP calls. + skip_details: Skip fetching Zaak/Document/body text per motion. + """ + if besluit_meta is None: + besluit_meta = {} # Group records by Besluit_Id (decision/motion) motion_groups = defaultdict( @@ -146,18 +217,41 @@ class TweedeKamerAPI: if len(motion_data["votes"]) < 3: # Skip motions with too few votes continue - # Get motion details - motion_details = self._get_motion_details(besluit_id) - - if not motion_details: - # Create basic motion data if we can't get details + # Get motion details — use pre-fetched meta if skip_details=True + if skip_details and besluit_id in besluit_meta: + meta = besluit_meta[besluit_id] motion_details = { - "title": f"Motion {besluit_id[:8]}", - "description": "No description available", - "date": motion_data["latest_date"].split("T")[0] - if motion_data["latest_date"] - else datetime.now().strftime("%Y-%m-%d"), + "title": meta["title"], + "description": meta["besluit_tekst"] or meta["title"], + "date": meta["date"], + "externe_identifier": None, + "body_text": None, } + else: + motion_details = self._get_motion_details(besluit_id) + + if not motion_details: + # Fall back to besluit_meta if available, else generic placeholder + if besluit_id in besluit_meta: + meta = besluit_meta[besluit_id] + motion_details = { + "title": meta["title"], + "description": meta["besluit_tekst"] or meta["title"], + "date": meta["date"], + "externe_identifier": None, + "body_text": None, + } + else: + latest = motion_data["latest_date"] or "" + motion_details = { + "title": f"Motion {besluit_id[:8]}", + "description": "No description available", + "date": latest.split("T")[0] + if latest + else datetime.now().strftime("%Y-%m-%d"), + "externe_identifier": None, + "body_text": None, + } # Calculate winning margin voting_results = motion_data["votes"] diff --git a/pipeline/extract_mp_votes.py b/pipeline/extract_mp_votes.py index 99a953c..3ff5d94 100644 --- a/pipeline/extract_mp_votes.py +++ b/pipeline/extract_mp_votes.py @@ -3,6 +3,7 @@ import logging from typing import Optional import duckdb +import pandas as pd from database import MotionDatabase @@ -13,6 +14,11 @@ def extract_mp_votes(db_path: Optional[str] = None, limit: Optional[int] = None) """Extract individual MP votes from motions.voting_results and store them in the mp_votes table. + Handles both individual MP votes (ActorNaam contains comma) and party-level + votes (ActorNaam has no comma) by treating party names as actors. + + Uses a single DuckDB connection with DataFrame bulk insert for performance. + Returns a dict with summary counts: - motions_scanned: number of motions inspected - mp_rows_inserted: number of mp_votes rows inserted @@ -31,42 +37,56 @@ def extract_mp_votes(db_path: Optional[str] = None, limit: Optional[int] = None) rows = conn.execute( "SELECT id, voting_results, date FROM motions" ).fetchall() - finally: + except Exception as e: conn.close() + raise e - mp_rows_inserted = 0 - motions_skipped = 0 motions_scanned = 0 + motions_skipped = 0 + batch = [] for motion_id, voting_results_json, date in rows: motions_scanned += 1 - try: - if db.mp_votes_exists_for_motion(motion_id): - _logger.debug( - "Skipping motion %s because mp_votes already exist", motion_id - ) - motions_skipped += 1 - continue - - # voting_results may be stored as JSON text or as native JSON; ensure it's a dict - if isinstance(voting_results_json, str): - voting_results = json.loads(voting_results_json) - else: - voting_results = voting_results_json - - for actor, vote in (voting_results or {}).items(): - # Individual MP names contain a comma (e.g. "Last, F.") - if "," not in actor: - continue - - inserted_id = db.insert_mp_vote( - motion_id=motion_id, mp_name=actor, vote=vote, date=date, party=None - ) - if inserted_id and inserted_id > 0: - mp_rows_inserted += 1 + # Check if mp_votes already exist for this motion + existing = conn.execute( + "SELECT COUNT(*) FROM mp_votes WHERE motion_id = ?", (motion_id,) + ).fetchone() + if existing and existing[0] > 0: + _logger.debug("Skipping motion %s, mp_votes already exist", motion_id) + motions_skipped += 1 + continue + + # voting_results may be stored as JSON text or as native JSON; ensure it's a dict + if isinstance(voting_results_json, str): + voting_results = json.loads(voting_results_json) + else: + voting_results = voting_results_json + + for actor, vote in (voting_results or {}).items(): + # Individual MP names contain a comma (e.g. "Last, F.") + # Party names have no comma (e.g. "VVD", "GroenLinks-PvdA") + party = None if "," in actor else actor + batch.append((motion_id, actor, party, vote, str(date) if date else None)) + + # Bulk insert via DataFrame for performance (avoids per-row connection overhead) + mp_rows_inserted = 0 + if batch: + try: + df = pd.DataFrame( + batch, columns=["motion_id", "mp_name", "party", "vote", "date"] + ) + conn.execute( + "INSERT INTO mp_votes (motion_id, mp_name, party, vote, date) SELECT * FROM df" + ) + mp_rows_inserted = len(batch) + _logger.info("Bulk inserted %d mp_votes rows", mp_rows_inserted) except Exception as e: - _logger.error("Error processing motion %s: %s", motion_id, e) + _logger.error("Bulk insert failed: %s", e) + else: + _logger.info("No new mp_votes rows to insert") + + conn.close() return { "motions_scanned": motions_scanned, diff --git a/pipeline/run_pipeline.py b/pipeline/run_pipeline.py index 8f855d9..0fb306d 100644 --- a/pipeline/run_pipeline.py +++ b/pipeline/run_pipeline.py @@ -115,8 +115,8 @@ def run(args: argparse.Namespace) -> int: if not dry_run: from pipeline.fetch_mp_metadata import fetch_mp_metadata - fetched, skipped = fetch_mp_metadata(db) - _logger.info(" mp_metadata: fetched=%d skipped=%d", fetched, skipped) + n = fetch_mp_metadata(db_path=db.db_path) + _logger.info(" mp_metadata: processed=%d", n) else: _logger.info(" [dry-run] would call fetch_mp_metadata(db)") else: @@ -128,9 +128,12 @@ def run(args: argparse.Namespace) -> int: if not dry_run: from pipeline.extract_mp_votes import extract_mp_votes - inserted, skipped = extract_mp_votes(db) + result = extract_mp_votes(db_path=db.db_path) _logger.info( - " mp_votes: inserted=%d motions skipped=%d", inserted, skipped + " mp_votes: inserted=%d motions_scanned=%d skipped=%d", + result["mp_rows_inserted"], + result["motions_scanned"], + result["motions_skipped"], ) else: _logger.info(" [dry-run] would call extract_mp_votes(db)") @@ -199,11 +202,12 @@ def run(args: argparse.Namespace) -> int: model=args.text_model, ) _logger.info( - " window %s: fused=%d skipped_no_svd=%d skipped_no_text=%d", + " window %s: fused=%d skipped_no_svd=%d skipped_no_text=%d errors=%d", window_id, - result["fused"], - result.get("skipped_no_svd", 0), - result.get("skipped_no_text", 0), + result.get("inserted", 0), + result.get("skipped_missing_svd", 0), + result.get("skipped_missing_text", 0), + result.get("errors", 0), ) else: _logger.info(" [dry-run] would fuse window %s", window_id) diff --git a/scripts/download_past_year.py b/scripts/download_past_year.py new file mode 100644 index 0000000..03aa419 --- /dev/null +++ b/scripts/download_past_year.py @@ -0,0 +1,112 @@ +"""download_past_year.py — One-shot data download: past year of parliamentary motions. + +Fetches Stemming records from the OData API in quarterly chunks (90-day windows), +stores motions into data/motions.db using MotionDatabase.insert_motion(). + +Skips AI summarisation — this is a raw data fetch for the embedding pipeline. + +Usage: + uv run python scripts/download_past_year.py [--db-path data/motions.db] [--days 365] +""" + +import argparse +import sys +import time +from datetime import datetime, timedelta + +sys.path.insert(0, ".") # run from project root + +from api_client import TweedeKamerAPI +from database import MotionDatabase + + +def main(): + parser = argparse.ArgumentParser(description="Download past year of motions") + parser.add_argument("--db-path", default="data/motions.db") + parser.add_argument( + "--days", type=int, default=365, help="How many days back to fetch" + ) + parser.add_argument("--chunk-days", type=int, default=90, help="Days per API chunk") + parser.add_argument( + "--limit-per-chunk", + type=int, + default=50000, + help="Max motions (Besluit) per chunk", + ) + parser.add_argument( + "--delay", type=float, default=2.0, help="Seconds between chunks" + ) + args = parser.parse_args() + + api = TweedeKamerAPI() + db = MotionDatabase(args.db_path) + + end_date = datetime.now() + start_date = end_date - timedelta(days=args.days) + + print( + f"Downloading motions from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}" + ) + print(f"DB: {args.db_path}") + print() + + # Test connectivity first + test_url = f"{api.odata_base_url}/Stemming" + r = api.session.get(test_url, params={"$top": 1}, timeout=10) + if r.status_code != 200: + print(f"ERROR: API returned {r.status_code}. Aborting.") + sys.exit(1) + print("✅ API connection OK\n") + + chunk_start = start_date + chunk_num = 0 + total_fetched = 0 + total_inserted = 0 + total_duplicates = 0 + + while chunk_start < end_date: + chunk_end = min(chunk_start + timedelta(days=args.chunk_days), end_date) + chunk_num += 1 + label = f"{chunk_start.strftime('%Y-%m-%d')} → {chunk_end.strftime('%Y-%m-%d')}" + print(f"[Chunk {chunk_num}] {label}") + + try: + motions = api.get_motions( + start_date=chunk_start, + end_date=chunk_end, + limit=args.limit_per_chunk, + skip_details=True, + ) + print(f" Fetched {len(motions)} motions") + total_fetched += len(motions) + + inserted = 0 + duplicates = 0 + for m in motions: + if db.insert_motion(m): + inserted += 1 + else: + duplicates += 1 + + total_inserted += inserted + total_duplicates += duplicates + print(f" Inserted {inserted} new | {duplicates} duplicates skipped") + + except Exception as e: + print(f" ERROR: {e}") + + chunk_start = chunk_end + if chunk_start < end_date: + print(f" Waiting {args.delay}s before next chunk…") + time.sleep(args.delay) + + print() + print("=" * 50) + print(f"Done. Total fetched: {total_fetched}") + print(f" Inserted: {total_inserted}") + print(f" Duplicates: {total_duplicates}") + print("=" * 50) + + +if __name__ == "__main__": + main()