"""download_past_year.py — One-shot data download: parliamentary motions for a date range. Fetches Stemming records from the OData API in chunks (default 90-day windows), stores motions into data/motions.db using MotionDatabase.batch_insert_motions(). Skips AI summarisation — this is a raw data fetch for the embedding pipeline. Usage: uv run python scripts/download_past_year.py [--db-path data/motions.db] [--days 365] uv run python scripts/download_past_year.py --start-date 2019-01-01 --end-date 2022-01-01 uv run python scripts/download_past_year.py --update-existing --start-date 2016-01-01 --end-date 2018-12-31 """ import argparse import sys import time from datetime import datetime, timedelta from typing import Optional, Tuple from urllib.parse import urlparse import duckdb sys.path.insert(0, ".") # run from project root from api_client import TweedeKamerAPI from database import MotionDatabase _STEMMINGSUITSLAGEN_PREFIX = "/kamerstukken/stemmingsuitslagen/" def extract_besluit_id(url: str) -> Optional[str]: """Extract the besluit_id (last path segment) from a motion URL. Expected format: https://www.tweedekamer.nl/kamerstukken/stemmingsuitslagen/{besluit_id} Returns None if the URL doesn't match expected format. """ if not url: return None try: parsed = urlparse(url) path = parsed.path.rstrip("/") if _STEMMINGSUITSLAGEN_PREFIX.rstrip("/") not in path: return None # Last path segment is the besluit_id besluit_id = path.split("/")[-1] return besluit_id if besluit_id else None except Exception: return None def update_existing_motions( db_path: str, api: "TweedeKamerAPI", start_date: str, end_date: str, delay: float = 1.0, ) -> Tuple[int, int]: """Backfill body_text for motions that are missing it. Queries for motions with NULL/empty body_text in the given date range, extracts besluit_id from the URL, fetches details via the API, and updates the row. Args: db_path: Path to the DuckDB database file. api: TweedeKamerAPI instance (uses api._get_motion_details). start_date: Start date string (YYYY-MM-DD). end_date: End date string (YYYY-MM-DD). delay: Seconds to wait between API calls. Returns: (updated_count, skipped_count) tuple. """ # Read motions with missing body_text conn_read = duckdb.connect(db_path, read_only=True) rows = conn_read.execute( """ SELECT id, url, title, description FROM motions WHERE date BETWEEN ? AND ? AND (body_text IS NULL OR body_text = '') """, (start_date, end_date), ).fetchall() conn_read.close() updated = 0 skipped = 0 conn_write = duckdb.connect(db_path, read_only=False) try: for row in rows: motion_id, url, title, description = row besluit_id = extract_besluit_id(url or "") if not besluit_id: print( f" Skipping motion {motion_id}: cannot extract besluit_id from URL" ) skipped += 1 continue print( f" Fetching details for motion {motion_id} (besluit_id={besluit_id})..." ) details = api._get_motion_details(besluit_id) if not details or not details.get("body_text"): print(f" Skipping motion {motion_id}: no body_text returned") skipped += 1 continue # Build update: always set body_text; also update title/description if # they were placeholder values (e.g. "Motion abc12345" or "No description available") new_body = details["body_text"] new_title = title new_desc = description if title and (title.startswith("Motion ") or title.startswith("Besluit ")): new_title = details.get("title") or title if description in ( None, "", "No description available", "Geen beschrijving beschikbaar", ): new_desc = details.get("description") or description conn_write.execute( """ UPDATE motions SET body_text = ?, title = ?, description = ? WHERE id = ? """, (new_body, new_title, new_desc, motion_id), ) updated += 1 print(f" Updated motion {motion_id}") if delay > 0 and updated + skipped < len(rows): time.sleep(delay) finally: conn_write.close() return updated, skipped def build_parser() -> argparse.ArgumentParser: """Build and return the argument parser for the download script.""" parser = argparse.ArgumentParser(description="Download motions for a date range") parser.add_argument("--db-path", default="data/motions.db") parser.add_argument( "--days", type=int, default=365, help="How many days back to fetch (ignored if --start-date given)", ) parser.add_argument( "--start-date", type=str, default=None, help="Explicit start date YYYY-MM-DD (overrides --days)", ) parser.add_argument( "--end-date", type=str, default=None, help="Explicit end date YYYY-MM-DD (default: today)", ) parser.add_argument("--chunk-days", type=int, default=90, help="Days per API chunk") parser.add_argument( "--limit-per-chunk", type=int, default=50000, help="Max motions (Besluit) per chunk", ) parser.add_argument( "--delay", type=float, default=2.0, help="Seconds between chunks" ) parser.add_argument( "--skip-details", action=argparse.BooleanOptionalAction, default=True, help="Skip fetching per-motion details (default: True). Use --no-skip-details to fetch body text.", ) parser.add_argument( "--update-existing", action="store_true", default=False, help="Backfill body_text for existing motions with missing text. Skips normal download.", ) return parser def main(): parser = build_parser() args = parser.parse_args() api = TweedeKamerAPI() db = MotionDatabase(args.db_path) end_date = ( datetime.strptime(args.end_date, "%Y-%m-%d") if args.end_date else datetime.now() ) if args.start_date: start_date = datetime.strptime(args.start_date, "%Y-%m-%d") else: start_date = end_date - timedelta(days=args.days) print( f"Downloading motions from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}" ) print(f"DB: {args.db_path}") print() # --- Update-existing mode: backfill body_text, then exit --- if args.update_existing: print("Mode: update-existing (backfilling body_text for existing motions)\n") updated, skipped = update_existing_motions( db_path=args.db_path, api=api, start_date=start_date.strftime("%Y-%m-%d"), end_date=end_date.strftime("%Y-%m-%d"), delay=args.delay, ) print() print("=" * 50) print(f"Done. Updated: {updated} | Skipped: {skipped}") print("=" * 50) return # --- Normal download mode --- # Test connectivity first test_url = f"{api.odata_base_url}/Stemming" r = api.session.get(test_url, params={"$top": 1}, timeout=10) if r.status_code != 200: print(f"ERROR: API returned {r.status_code}. Aborting.") sys.exit(1) print("✅ API connection OK\n") chunk_start = start_date chunk_num = 0 total_fetched = 0 total_inserted = 0 total_duplicates = 0 while chunk_start < end_date: chunk_end = min(chunk_start + timedelta(days=args.chunk_days), end_date) chunk_num += 1 label = f"{chunk_start.strftime('%Y-%m-%d')} → {chunk_end.strftime('%Y-%m-%d')}" print(f"[Chunk {chunk_num}] {label}") try: motions = api.get_motions( start_date=chunk_start, end_date=chunk_end, limit=args.limit_per_chunk, skip_details=args.skip_details, ) print(f" Fetched {len(motions)} motions") total_fetched += len(motions) inserted = 0 duplicates = 0 inserted, duplicates = db.batch_insert_motions(motions) total_inserted += inserted total_duplicates += duplicates print(f" Inserted {inserted} new | {duplicates} duplicates skipped") except Exception as e: print(f" ERROR: {e}") chunk_start = chunk_end if chunk_start < end_date: print(f" Waiting {args.delay}s before next chunk…") time.sleep(args.delay) print() print("=" * 50) print(f"Done. Total fetched: {total_fetched}") print(f" Inserted: {total_inserted}") print(f" Duplicates: {total_duplicates}") print("=" * 50) if __name__ == "__main__": main()