You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
motief/scripts/download_past_year.py

289 lines
8.9 KiB

"""download_past_year.py — One-shot data download: parliamentary motions for a date range.
Fetches Stemming records from the OData API in chunks (default 90-day windows),
stores motions into data/motions.db using MotionDatabase.batch_insert_motions().
Skips AI summarisation — this is a raw data fetch for the embedding pipeline.
Usage:
uv run python scripts/download_past_year.py [--db-path data/motions.db] [--days 365]
uv run python scripts/download_past_year.py --start-date 2019-01-01 --end-date 2022-01-01
uv run python scripts/download_past_year.py --update-existing --start-date 2016-01-01 --end-date 2018-12-31
"""
import argparse
import sys
import time
from datetime import datetime, timedelta
from typing import Optional, Tuple
from urllib.parse import urlparse
sys.path.insert(0, ".") # run from project root
from api_client import TweedeKamerAPI
from database import MotionDatabase
_STEMMINGSUITSLAGEN_PREFIX = "/kamerstukken/stemmingsuitslagen/"
def extract_besluit_id(url: str) -> Optional[str]:
    """Pull the besluit_id (final path segment) out of a motion URL.

    Expected shape:
    https://www.tweedekamer.nl/kamerstukken/stemmingsuitslagen/{besluit_id}

    Returns None for empty input, URLs that are not stemmingsuitslagen
    links, or URLs where no id segment remains.
    """
    if not url:
        return None
    try:
        # Drop any trailing slash so the id is always the last segment.
        path = urlparse(url).path.rstrip("/")
        if "/kamerstukken/stemmingsuitslagen" not in path:
            return None
        segment = path.rsplit("/", 1)[-1]
        return segment or None
    except Exception:
        # Malformed URLs (e.g. invalid port) are treated as "no id".
        return None
def update_existing_motions(
    db_path: str,
    api: "TweedeKamerAPI",
    start_date: str,
    end_date: str,
    delay: float = 1.0,
) -> Tuple[int, int]:
    """Backfill body_text for motions that are missing it.

    Queries for motions with NULL/empty body_text in the given date range,
    extracts the besluit_id from each motion URL, fetches details via the
    API, and updates the row in place.

    Args:
        db_path: Path to the DuckDB database file.
        api: TweedeKamerAPI instance (uses api._get_motion_details).
        start_date: Start date string (YYYY-MM-DD).
        end_date: End date string (YYYY-MM-DD).
        delay: Seconds to wait between API calls.

    Returns:
        (updated_count, skipped_count) tuple.
    """
    import duckdb

    # Short-lived read-only connection to enumerate candidates; try/finally
    # guarantees it is closed even if the query raises.
    conn_read = duckdb.connect(db_path, read_only=True)
    try:
        rows = conn_read.execute(
            """
            SELECT id, url, title, description
            FROM motions
            WHERE date BETWEEN ? AND ?
              AND (body_text IS NULL OR body_text = '')
            """,
            (start_date, end_date),
        ).fetchall()
    finally:
        conn_read.close()

    updated = 0
    skipped = 0
    if not rows:
        return updated, skipped

    # One write connection for the whole batch. The original reopened and
    # closed the database for every updated row, paying connect/lock
    # overhead per motion; try/finally keeps the handle closed even when an
    # API call raises mid-loop.
    conn_write = duckdb.connect(db_path, read_only=False)
    try:
        for motion_id, url, title, description in rows:
            besluit_id = extract_besluit_id(url or "")
            if not besluit_id:
                print(f" Skipping motion {motion_id}: cannot extract besluit_id from URL")
                skipped += 1
                continue
            print(f" Fetching details for motion {motion_id} (besluit_id={besluit_id})...")
            details = api._get_motion_details(besluit_id)
            if not details or not details.get("body_text"):
                print(f" Skipping motion {motion_id}: no body_text returned")
                skipped += 1
                continue
            # Always set body_text; refresh title/description only when they
            # still hold placeholder values from the initial bulk import
            # (e.g. "Motion abc12345" or "No description available").
            new_body = details["body_text"]
            new_title = title
            new_desc = description
            if title and title.startswith(("Motion ", "Besluit ")):
                new_title = details.get("title") or title
            if description in (
                None,
                "",
                "No description available",
                "Geen beschrijving beschikbaar",
            ):
                new_desc = details.get("description") or description
            conn_write.execute(
                """
                UPDATE motions
                SET body_text = ?, title = ?, description = ?
                WHERE id = ?
                """,
                (new_body, new_title, new_desc, motion_id),
            )
            updated += 1
            print(f" Updated motion {motion_id}")
            # Rate-limit between API calls, but not after the final row.
            if delay > 0 and updated + skipped < len(rows):
                time.sleep(delay)
    finally:
        conn_write.close()
    return updated, skipped
def build_parser() -> argparse.ArgumentParser:
    """Create and return the CLI argument parser for this download script."""
    p = argparse.ArgumentParser(description="Download motions for a date range")
    # Storage target and lookback window.
    p.add_argument("--db-path", default="data/motions.db")
    p.add_argument("--days", type=int, default=365,
                   help="How many days back to fetch (ignored if --start-date given)")
    # Explicit date range overrides --days.
    p.add_argument("--start-date", type=str, default=None,
                   help="Explicit start date YYYY-MM-DD (overrides --days)")
    p.add_argument("--end-date", type=str, default=None,
                   help="Explicit end date YYYY-MM-DD (default: today)")
    # Chunking / rate-limiting knobs.
    p.add_argument("--chunk-days", type=int, default=90, help="Days per API chunk")
    p.add_argument("--limit-per-chunk", type=int, default=50000,
                   help="Max motions (Besluit) per chunk")
    p.add_argument("--delay", type=float, default=2.0, help="Seconds between chunks")
    # Mode switches.
    p.add_argument("--skip-details", action=argparse.BooleanOptionalAction, default=True,
                   help="Skip fetching per-motion details (default: True). Use --no-skip-details to fetch body text.")
    p.add_argument("--update-existing", action="store_true", default=False,
                   help="Backfill body_text for existing motions with missing text. Skips normal download.")
    return p
def main():
    """CLI entry point: download motions in date-range chunks, or backfill text."""
    parser = build_parser()
    args = parser.parse_args()

    api = TweedeKamerAPI()
    db = MotionDatabase(args.db_path)

    # Resolve the date window: explicit dates win over the --days lookback.
    end_date = (
        datetime.strptime(args.end_date, "%Y-%m-%d")
        if args.end_date
        else datetime.now()
    )
    if args.start_date:
        start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
    else:
        start_date = end_date - timedelta(days=args.days)

    print(
        f"Downloading motions from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}"
    )
    print(f"DB: {args.db_path}")
    print()

    # --- Update-existing mode: backfill body_text, then exit ---
    if args.update_existing:
        print("Mode: update-existing (backfilling body_text for existing motions)\n")
        updated, skipped = update_existing_motions(
            db_path=args.db_path,
            api=api,
            start_date=start_date.strftime("%Y-%m-%d"),
            end_date=end_date.strftime("%Y-%m-%d"),
            delay=args.delay,
        )
        print()
        print("=" * 50)
        print(f"Done. Updated: {updated} | Skipped: {skipped}")
        print("=" * 50)
        return

    # --- Normal download mode ---
    # Cheap connectivity probe before committing to a long chunked run.
    test_url = f"{api.odata_base_url}/Stemming"
    r = api.session.get(test_url, params={"$top": 1}, timeout=10)
    if r.status_code != 200:
        print(f"ERROR: API returned {r.status_code}. Aborting.")
        sys.exit(1)
    print("✅ API connection OK\n")

    chunk_start = start_date
    chunk_num = 0
    total_fetched = 0
    total_inserted = 0
    total_duplicates = 0

    while chunk_start < end_date:
        chunk_end = min(chunk_start + timedelta(days=args.chunk_days), end_date)
        chunk_num += 1
        # Fix: the two dates were concatenated with no separator, printing
        # e.g. "2024-01-012024-03-31".
        label = f"{chunk_start.strftime('%Y-%m-%d')} → {chunk_end.strftime('%Y-%m-%d')}"
        print(f"[Chunk {chunk_num}] {label}")
        try:
            motions = api.get_motions(
                start_date=chunk_start,
                end_date=chunk_end,
                limit=args.limit_per_chunk,
                skip_details=args.skip_details,
            )
            print(f" Fetched {len(motions)} motions")
            total_fetched += len(motions)
            # (Removed dead `inserted = 0; duplicates = 0` pre-assignments
            # that were immediately overwritten.)
            inserted, duplicates = db.batch_insert_motions(motions)
            total_inserted += inserted
            total_duplicates += duplicates
            print(f" Inserted {inserted} new | {duplicates} duplicates skipped")
        except Exception as e:
            # Best-effort: report a failed chunk and keep going so the
            # remaining chunks still download.
            print(f" ERROR: {e}")

        chunk_start = chunk_end
        if chunk_start < end_date:
            print(f" Waiting {args.delay}s before next chunk…")
            time.sleep(args.delay)

    print()
    print("=" * 50)
    print(f"Done. Total fetched: {total_fetched}")
    print(f" Inserted: {total_inserted}")
    print(f" Duplicates: {total_duplicates}")
    print("=" * 50)
# Script entry point: only run when executed directly, not on import.
if __name__ == "__main__":
    main()