fix(pipeline): fix API pagination, add skip_details fast path, bulk mp_votes insert

- _get_voting_records returns (records, besluit_meta) tuple; paginate via Besluit?$expand=Stemming (469/mo vs 8400)
- get_motions(skip_details=True) bypasses per-motion detail chain (3 HTTP calls/motion)
- extract_mp_votes rewritten: bulk DataFrame insert (80k rows in 1.9s), includes party-level actors
- run_pipeline.py fixed: pass db_path not db, handle dict/int return types
- download_past_year.py: skip_details=True default, limit-per-chunk default 50000
main
Sven Geboers 1 month ago
parent f2a831dfcf
commit 847b783877
  1. 168
      api_client.py
  2. 76
      pipeline/extract_mp_votes.py
  3. 20
      pipeline/run_pipeline.py
  4. 112
      scripts/download_past_year.py

@ -21,19 +21,33 @@ class TweedeKamerAPI:
)
def get_motions(
self, start_date: datetime = None, end_date: datetime = None, limit: int = 500
self,
start_date: datetime = None,
end_date: datetime = None,
limit: int = 500,
skip_details: bool = False,
) -> List[Dict]:
"""Get motions with voting results using OData API"""
"""Get motions with voting results using OData API.
Args:
skip_details: If True, skip per-motion detail fetching (Zaak/Document/body text).
Uses BesluitTekst from the Besluit record instead. Much faster for
bulk historical downloads where AI summarisation is not needed.
"""
if not start_date:
start_date = datetime.now() - timedelta(days=730) # 2 years ago
try:
# Get voting records
voting_records = self._get_voting_records(start_date, end_date, limit)
voting_records, besluit_meta = self._get_voting_records(
start_date, end_date, limit
)
print(f"Fetched {len(voting_records)} voting records from API")
# Group by Besluit_Id (decision/motion) and get motion details
motions = self._process_voting_records(voting_records)
motions = self._process_voting_records(
voting_records, besluit_meta, skip_details=skip_details
)
print(f"Processed into {len(motions)} unique motions")
return motions
@ -43,52 +57,109 @@ class TweedeKamerAPI:
return []
def _get_voting_records(
    self, start_date: datetime, end_date: datetime = None, limit: int = 50000
) -> tuple:
    """Get individual voting records from the API via Besluit?$expand=Stemming.

    Uses Besluit (decisions) with embedded Stemming (votes) to avoid the
    expensive per-record pagination of the flat Stemming endpoint.
    Only returns Besluit records with StemmingsSoort set (actual votes).

    Args:
        start_date: Earliest GewijzigdOp timestamp to include.
        end_date: Optional latest GewijzigdOp timestamp to include.
        limit: Maximum number of Besluit (decision) records to collect.

    Returns:
        (voting_records, besluit_meta) where:
        - voting_records: flat list of Stemming dicts with Besluit_Id set
        - besluit_meta: dict of besluit_id -> {title, date, besluit_tekst}
    """
    # Format date properly for OData
    start_date_str = start_date.strftime("%Y-%m-%d")
    filter_query = (
        f"GewijzigdOp ge {start_date_str}T00:00:00Z"
        " and StemmingsSoort ne null"
        " and Verwijderd eq false"
    )
    if end_date:
        end_date_str = end_date.strftime("%Y-%m-%d")
        filter_query += f" and GewijzigdOp le {end_date_str}T23:59:59Z"

    page_size = 250  # API caps $top at 250
    base_url = f"{self.odata_base_url}/Besluit"
    base_params = {
        "$filter": filter_query,
        "$top": page_size,
        "$expand": "Stemming",
        "$orderby": "GewijzigdOp desc",
    }

    all_records: List[Dict] = []
    besluit_meta: Dict[str, Dict] = {}
    skip = 0
    try:
        # Page through results until we hit `limit` decisions or the last page.
        while len(besluit_meta) < limit:
            params = {**base_params, "$skip": skip}
            response = self.session.get(
                base_url, params=params, timeout=config.API_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()

            besluit_page = data.get("value", [])
            if not besluit_page:
                break

            # Flatten: for each Besluit, capture metadata and emit each Stemming record
            for besluit in besluit_page:
                besluit_id = besluit.get("Id")
                if not besluit_id:
                    continue
                date_str = besluit.get("GewijzigdOp", "")
                date = date_str.split("T")[0] if date_str else ""
                besluit_meta[besluit_id] = {
                    "title": besluit.get("BesluitTekst")
                    or f"Besluit {besluit_id[:8]}",
                    "date": date,
                    "besluit_tekst": besluit.get("BesluitTekst") or "",
                }
                for stemming in besluit.get("Stemming", []):
                    stemming["Besluit_Id"] = besluit_id
                    all_records.append(stemming)

            if len(besluit_page) < page_size:
                break  # last page
            skip += page_size

        print(
            f"Retrieved {len(all_records)} voting records from {len(besluit_meta)} decisions"
        )
        return all_records, besluit_meta
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        if hasattr(e, "response") and e.response is not None:
            print(f"Response status: {e.response.status_code}")
            print(f"Response text: {e.response.text[:500]}")
        return all_records, besluit_meta  # return whatever we got before failure
def _process_voting_records(
self,
records: List[Dict],
besluit_meta: Dict[str, Dict] = None,
skip_details: bool = False,
) -> List[Dict]:
"""Process individual voting records into grouped motions.
def _process_voting_records(self, records: List[Dict]) -> List[Dict]:
"""Process individual voting records into grouped motions"""
Args:
records: Flat Stemming records, each with Besluit_Id set.
besluit_meta: Pre-fetched dict of besluit_id -> {title, date, besluit_tekst}.
If provided and skip_details=True, avoids per-motion HTTP calls.
skip_details: Skip fetching Zaak/Document/body text per motion.
"""
if besluit_meta is None:
besluit_meta = {}
# Group records by Besluit_Id (decision/motion)
motion_groups = defaultdict(
@ -146,18 +217,41 @@ class TweedeKamerAPI:
if len(motion_data["votes"]) < 3: # Skip motions with too few votes
continue
# Get motion details
motion_details = self._get_motion_details(besluit_id)
if not motion_details:
# Create basic motion data if we can't get details
# Get motion details — use pre-fetched meta if skip_details=True
if skip_details and besluit_id in besluit_meta:
meta = besluit_meta[besluit_id]
motion_details = {
"title": f"Motion {besluit_id[:8]}",
"description": "No description available",
"date": motion_data["latest_date"].split("T")[0]
if motion_data["latest_date"]
else datetime.now().strftime("%Y-%m-%d"),
"title": meta["title"],
"description": meta["besluit_tekst"] or meta["title"],
"date": meta["date"],
"externe_identifier": None,
"body_text": None,
}
else:
motion_details = self._get_motion_details(besluit_id)
if not motion_details:
# Fall back to besluit_meta if available, else generic placeholder
if besluit_id in besluit_meta:
meta = besluit_meta[besluit_id]
motion_details = {
"title": meta["title"],
"description": meta["besluit_tekst"] or meta["title"],
"date": meta["date"],
"externe_identifier": None,
"body_text": None,
}
else:
latest = motion_data["latest_date"] or ""
motion_details = {
"title": f"Motion {besluit_id[:8]}",
"description": "No description available",
"date": latest.split("T")[0]
if latest
else datetime.now().strftime("%Y-%m-%d"),
"externe_identifier": None,
"body_text": None,
}
# Calculate winning margin
voting_results = motion_data["votes"]

@ -3,6 +3,7 @@ import logging
from typing import Optional
import duckdb
import pandas as pd
from database import MotionDatabase
@ -13,6 +14,11 @@ def extract_mp_votes(db_path: Optional[str] = None, limit: Optional[int] = None)
"""Extract individual MP votes from motions.voting_results and store them
in the mp_votes table.
Handles both individual MP votes (ActorNaam contains comma) and party-level
votes (ActorNaam has no comma) by treating party names as actors.
Uses a single DuckDB connection with DataFrame bulk insert for performance.
Returns a dict with summary counts:
- motions_scanned: number of motions inspected
- mp_rows_inserted: number of mp_votes rows inserted
@ -31,42 +37,56 @@ def extract_mp_votes(db_path: Optional[str] = None, limit: Optional[int] = None)
rows = conn.execute(
"SELECT id, voting_results, date FROM motions"
).fetchall()
finally:
except Exception as e:
conn.close()
raise e
mp_rows_inserted = 0
motions_skipped = 0
motions_scanned = 0
motions_skipped = 0
batch = []
for motion_id, voting_results_json, date in rows:
motions_scanned += 1
try:
if db.mp_votes_exists_for_motion(motion_id):
_logger.debug(
"Skipping motion %s because mp_votes already exist", motion_id
)
motions_skipped += 1
continue
# voting_results may be stored as JSON text or as native JSON; ensure it's a dict
if isinstance(voting_results_json, str):
voting_results = json.loads(voting_results_json)
else:
voting_results = voting_results_json
for actor, vote in (voting_results or {}).items():
# Individual MP names contain a comma (e.g. "Last, F.")
if "," not in actor:
continue
inserted_id = db.insert_mp_vote(
motion_id=motion_id, mp_name=actor, vote=vote, date=date, party=None
)
if inserted_id and inserted_id > 0:
mp_rows_inserted += 1
# Check if mp_votes already exist for this motion
existing = conn.execute(
"SELECT COUNT(*) FROM mp_votes WHERE motion_id = ?", (motion_id,)
).fetchone()
if existing and existing[0] > 0:
_logger.debug("Skipping motion %s, mp_votes already exist", motion_id)
motions_skipped += 1
continue
# voting_results may be stored as JSON text or as native JSON; ensure it's a dict
if isinstance(voting_results_json, str):
voting_results = json.loads(voting_results_json)
else:
voting_results = voting_results_json
for actor, vote in (voting_results or {}).items():
# Individual MP names contain a comma (e.g. "Last, F.")
# Party names have no comma (e.g. "VVD", "GroenLinks-PvdA")
party = None if "," in actor else actor
batch.append((motion_id, actor, party, vote, str(date) if date else None))
# Bulk insert via DataFrame for performance (avoids per-row connection overhead)
mp_rows_inserted = 0
if batch:
try:
df = pd.DataFrame(
batch, columns=["motion_id", "mp_name", "party", "vote", "date"]
)
conn.execute(
"INSERT INTO mp_votes (motion_id, mp_name, party, vote, date) SELECT * FROM df"
)
mp_rows_inserted = len(batch)
_logger.info("Bulk inserted %d mp_votes rows", mp_rows_inserted)
except Exception as e:
_logger.error("Error processing motion %s: %s", motion_id, e)
_logger.error("Bulk insert failed: %s", e)
else:
_logger.info("No new mp_votes rows to insert")
conn.close()
return {
"motions_scanned": motions_scanned,

@ -115,8 +115,8 @@ def run(args: argparse.Namespace) -> int:
if not dry_run:
from pipeline.fetch_mp_metadata import fetch_mp_metadata
fetched, skipped = fetch_mp_metadata(db)
_logger.info(" mp_metadata: fetched=%d skipped=%d", fetched, skipped)
n = fetch_mp_metadata(db_path=db.db_path)
_logger.info(" mp_metadata: processed=%d", n)
else:
_logger.info(" [dry-run] would call fetch_mp_metadata(db)")
else:
@ -128,9 +128,12 @@ def run(args: argparse.Namespace) -> int:
if not dry_run:
from pipeline.extract_mp_votes import extract_mp_votes
inserted, skipped = extract_mp_votes(db)
result = extract_mp_votes(db_path=db.db_path)
_logger.info(
" mp_votes: inserted=%d motions skipped=%d", inserted, skipped
" mp_votes: inserted=%d motions_scanned=%d skipped=%d",
result["mp_rows_inserted"],
result["motions_scanned"],
result["motions_skipped"],
)
else:
_logger.info(" [dry-run] would call extract_mp_votes(db)")
@ -199,11 +202,12 @@ def run(args: argparse.Namespace) -> int:
model=args.text_model,
)
_logger.info(
" window %s: fused=%d skipped_no_svd=%d skipped_no_text=%d",
" window %s: fused=%d skipped_no_svd=%d skipped_no_text=%d errors=%d",
window_id,
result["fused"],
result.get("skipped_no_svd", 0),
result.get("skipped_no_text", 0),
result.get("inserted", 0),
result.get("skipped_missing_svd", 0),
result.get("skipped_missing_text", 0),
result.get("errors", 0),
)
else:
_logger.info(" [dry-run] would fuse window %s", window_id)

@ -0,0 +1,112 @@
"""download_past_year.py — One-shot data download: past year of parliamentary motions.
Fetches Stemming records from the OData API in quarterly chunks (90-day windows),
stores motions into data/motions.db using MotionDatabase.insert_motion().
Skips AI summarisation — this is a raw data fetch for the embedding pipeline.
Usage:
uv run python scripts/download_past_year.py [--db-path data/motions.db] [--days 365]
"""
import argparse
import sys
import time
from datetime import datetime, timedelta
sys.path.insert(0, ".") # run from project root
from api_client import TweedeKamerAPI
from database import MotionDatabase
def main():
    """Download the past N days of motions in chunks and store them in the DB.

    Fetches motions via TweedeKamerAPI.get_motions(skip_details=True) in
    --chunk-days windows, inserting each motion with
    MotionDatabase.insert_motion() (falsy return means duplicate skipped).
    Prints per-chunk and total counts; individual chunk failures are logged
    and the download continues with the next window.
    """
    parser = argparse.ArgumentParser(description="Download past year of motions")
    parser.add_argument("--db-path", default="data/motions.db")
    parser.add_argument(
        "--days", type=int, default=365, help="How many days back to fetch"
    )
    parser.add_argument("--chunk-days", type=int, default=90, help="Days per API chunk")
    parser.add_argument(
        "--limit-per-chunk",
        type=int,
        default=50000,
        help="Max motions (Besluit) per chunk",
    )
    parser.add_argument(
        "--delay", type=float, default=2.0, help="Seconds between chunks"
    )
    args = parser.parse_args()

    api = TweedeKamerAPI()
    db = MotionDatabase(args.db_path)

    end_date = datetime.now()
    start_date = end_date - timedelta(days=args.days)
    print(
        f"Downloading motions from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}"
    )
    print(f"DB: {args.db_path}")
    print()

    # Test connectivity first. A network/DNS failure raises from session.get,
    # so catch it and abort cleanly instead of dumping a traceback.
    test_url = f"{api.odata_base_url}/Stemming"
    try:
        r = api.session.get(test_url, params={"$top": 1}, timeout=10)
    except Exception as e:
        print(f"ERROR: API unreachable ({e}). Aborting.")
        sys.exit(1)
    if r.status_code != 200:
        print(f"ERROR: API returned {r.status_code}. Aborting.")
        sys.exit(1)
    print("✅ API connection OK\n")

    chunk_start = start_date
    chunk_num = 0
    total_fetched = 0
    total_inserted = 0
    total_duplicates = 0

    while chunk_start < end_date:
        chunk_end = min(chunk_start + timedelta(days=args.chunk_days), end_date)
        chunk_num += 1
        label = f"{chunk_start.strftime('%Y-%m-%d')} → {chunk_end.strftime('%Y-%m-%d')}"
        print(f"[Chunk {chunk_num}] {label}")
        try:
            motions = api.get_motions(
                start_date=chunk_start,
                end_date=chunk_end,
                limit=args.limit_per_chunk,
                skip_details=True,
            )
            print(f"  Fetched {len(motions)} motions")
            total_fetched += len(motions)

            inserted = 0
            duplicates = 0
            for m in motions:
                # insert_motion returns truthy on insert, falsy on duplicate.
                if db.insert_motion(m):
                    inserted += 1
                else:
                    duplicates += 1
            total_inserted += inserted
            total_duplicates += duplicates
            print(f"  Inserted {inserted} new | {duplicates} duplicates skipped")
        except Exception as e:
            # Best-effort bulk download: log the failed chunk and keep going.
            print(f"  ERROR: {e}")

        chunk_start = chunk_end
        if chunk_start < end_date:
            # Be polite to the API between windows.
            print(f"  Waiting {args.delay}s before next chunk…")
            time.sleep(args.delay)

    print()
    print("=" * 50)
    print(f"Done. Total fetched: {total_fetched}")
    print(f"  Inserted: {total_inserted}")
    print(f"  Duplicates: {total_duplicates}")
    print("=" * 50)


if __name__ == "__main__":
    main()
Loading…
Cancel
Save