You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
motief/api_client.py

498 lines
18 KiB

# api_client.py (complete updated version)
import requests
import json
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from config import config
import time
from collections import defaultdict
class TweedeKamerAPI:
def __init__(self):
self.odata_base_url = "https://gegevensmagazijn.tweedekamer.nl/OData/v4/2.0"
self.session = requests.Session()
self.session.headers.update(
{
"Accept": "application/json",
"User-Agent": "Dutch-Political-Compass-Tool/1.0",
}
)
def get_motions(
self,
start_date: datetime = None,
end_date: datetime = None,
limit: int = 500,
skip_details: bool = False,
) -> List[Dict]:
"""Get motions with voting results using OData API.
Args:
skip_details: If True, skip per-motion detail fetching (Zaak/Document/body text).
Uses BesluitTekst from the Besluit record instead. Much faster for
bulk historical downloads where AI summarisation is not needed.
"""
if not start_date:
start_date = datetime.now() - timedelta(days=730) # 2 years ago
try:
# Get voting records
voting_records, besluit_meta = self._get_voting_records(
start_date, end_date, limit
)
print(f"Fetched {len(voting_records)} voting records from API")
# Group by Besluit_Id (decision/motion) and get motion details
motions = self._process_voting_records(
voting_records, besluit_meta, skip_details=skip_details
)
print(f"Processed into {len(motions)} unique motions")
return motions
except Exception as e:
print(f"Error fetching motions from API: {e}")
return []
def _get_voting_records(
self, start_date: datetime, end_date: datetime = None, limit: int = 50000
) -> tuple:
"""Get individual voting records from the API via Besluit?$expand=Stemming.
Uses Besluit (decisions) with embedded Stemming (votes) to avoid the
expensive per-record pagination of the flat Stemming endpoint.
Only returns Besluit records with StemmingsSoort set (actual votes).
Returns:
(voting_records, besluit_meta) where:
- voting_records: flat list of Stemming dicts with Besluit_Id set
- besluit_meta: dict of besluit_id → {title, date, besluit_tekst}
"""
# Format date properly for OData
start_date_str = start_date.strftime("%Y-%m-%d")
filter_query = (
f"GewijzigdOp ge {start_date_str}T00:00:00Z"
" and StemmingsSoort ne null"
" and Verwijderd eq false"
)
if end_date:
end_date_str = end_date.strftime("%Y-%m-%d")
filter_query += f" and GewijzigdOp le {end_date_str}T23:59:59Z"
page_size = 250 # API caps $top at 250
base_url = f"{self.odata_base_url}/Besluit"
base_params = {
"$filter": filter_query,
"$top": page_size,
"$expand": "Stemming",
"$orderby": "GewijzigdOp desc",
}
all_records: List[Dict] = []
besluit_meta: Dict[str, Dict] = {}
skip = 0
try:
while len(besluit_meta) < limit:
params = {**base_params, "$skip": skip}
response = self.session.get(
base_url, params=params, timeout=config.API_TIMEOUT
)
response.raise_for_status()
data = response.json()
besluit_page = data.get("value", [])
if not besluit_page:
break
# Flatten: for each Besluit, capture metadata and emit each Stemming record
for besluit in besluit_page:
besluit_id = besluit.get("Id")
if not besluit_id:
continue
date_str = besluit.get("GewijzigdOp", "")
date = date_str.split("T")[0] if date_str else ""
besluit_meta[besluit_id] = {
"title": besluit.get("BesluitTekst")
or f"Besluit {besluit_id[:8]}",
"date": date,
"besluit_tekst": besluit.get("BesluitTekst") or "",
}
for stemming in besluit.get("Stemming", []):
stemming["Besluit_Id"] = besluit_id
all_records.append(stemming)
if len(besluit_page) < page_size:
break # last page
skip += page_size
print(
f"Retrieved {len(all_records)} voting records from {len(besluit_meta)} decisions"
)
return all_records, besluit_meta
except requests.exceptions.RequestException as e:
print(f"API request failed: {e}")
if hasattr(e, "response") and e.response is not None:
print(f"Response status: {e.response.status_code}")
print(f"Response text: {e.response.text[:500]}")
return all_records, besluit_meta # return whatever we got before failure
def _process_voting_records(
self,
records: List[Dict],
besluit_meta: Dict[str, Dict] = None,
skip_details: bool = False,
) -> List[Dict]:
"""Process individual voting records into grouped motions.
Args:
records: Flat Stemming records, each with Besluit_Id set.
besluit_meta: Pre-fetched dict of besluit_id → {title, date, besluit_tekst}.
If provided and skip_details=True, avoids per-motion HTTP calls.
skip_details: Skip fetching Zaak/Document/body text per motion.
"""
if besluit_meta is None:
besluit_meta = {}
# Group records by Besluit_Id (decision/motion)
motion_groups = defaultdict(
lambda: {
"votes": {},
"mp_vote_parties": {},
"besluit_id": None,
"latest_date": None,
}
)
for record in records:
besluit_id = record.get("Besluit_Id")
if not besluit_id:
continue
# Extract party and vote information
party_name = record.get("ActorNaam")
# Some records have Soort explicitly set to None; guard against that
vote_type = str(record.get("Soort") or "").lower()
record_date = record.get("GewijzigdOp", "")
if not party_name:
continue
# Map vote types to our format
if vote_type == "voor":
vote = "voor"
elif vote_type == "tegen":
vote = "tegen"
else:
vote = "afwezig"
# Store the vote
motion_groups[besluit_id]["votes"][party_name] = vote
motion_groups[besluit_id]["besluit_id"] = besluit_id
# For individual MPs (ActorNaam contains comma), also capture their party
if "," in party_name:
actor_fractie = record.get("ActorFractie")
if actor_fractie:
motion_groups[besluit_id]["mp_vote_parties"][party_name] = (
actor_fractie
)
# Track the latest date for this motion
if (
not motion_groups[besluit_id]["latest_date"]
or record_date > motion_groups[besluit_id]["latest_date"]
):
motion_groups[besluit_id]["latest_date"] = record_date
# Now get motion details for each unique Besluit_Id
motions = []
for besluit_id, motion_data in motion_groups.items():
if len(motion_data["votes"]) < 3: # Skip motions with too few votes
continue
# Get motion details — use pre-fetched meta if skip_details=True
if skip_details and besluit_id in besluit_meta:
meta = besluit_meta[besluit_id]
motion_details = {
"title": meta["title"],
"description": meta["besluit_tekst"] or meta["title"],
"date": meta["date"],
"externe_identifier": None,
"body_text": None,
}
else:
motion_details = self._get_motion_details(besluit_id)
if not motion_details:
# Fall back to besluit_meta if available, else generic placeholder
if besluit_id in besluit_meta:
meta = besluit_meta[besluit_id]
motion_details = {
"title": meta["title"],
"description": meta["besluit_tekst"] or meta["title"],
"date": meta["date"],
"externe_identifier": None,
"body_text": None,
}
else:
latest = motion_data["latest_date"] or ""
motion_details = {
"title": f"Motion {besluit_id[:8]}",
"description": "No description available",
"date": latest.split("T")[0]
if latest
else datetime.now().strftime("%Y-%m-%d"),
"externe_identifier": None,
"body_text": None,
}
# Calculate winning margin
voting_results = motion_data["votes"]
total_votes = sum(
1 for vote in voting_results.values() if vote in ["voor", "tegen"]
)
if total_votes == 0:
continue
votes_for = sum(1 for vote in voting_results.values() if vote == "voor")
winning_margin = abs(votes_for - (total_votes - votes_for)) / total_votes
motion = {
"title": motion_details["title"],
"description": motion_details["description"],
"date": motion_details["date"],
"policy_area": self._determine_policy_area(
motion_details["title"], motion_details["description"]
),
"voting_results": voting_results,
"mp_vote_parties": motion_data["mp_vote_parties"],
"winning_margin": winning_margin,
"url": f"https://www.tweedekamer.nl/kamerstukken/stemmingsuitslagen/{besluit_id}",
"externe_identifier": motion_details.get("externe_identifier"),
"body_text": motion_details.get("body_text"),
}
motions.append(motion)
return motions
def _get_motion_details(self, besluit_id: str) -> Optional[Dict]:
"""Get motion details from Besluit endpoint.
Fetches Zaak.Onderwerp for the human-readable title, then follows the
Zaak → Document → DocumentVersie chain to get the ExterneIdentifier,
which is used to scrape the full motion body text from
zoek.officielebekendmakingen.nl.
"""
try:
# Step 1: Besluit → Zaak (title) + Zaak.Id for document lookup
url = f"{self.odata_base_url}/Besluit({besluit_id})"
params = {"$expand": "Zaak($select=Id,Onderwerp)"}
response = self.session.get(url, params=params, timeout=config.API_TIMEOUT)
response.raise_for_status()
record = response.json()
zaak_list = record.get("Zaak", [])
onderwerp = None
zaak_id = None
if zaak_list:
onderwerp = zaak_list[0].get("Onderwerp")
zaak_id = zaak_list[0].get("Id")
besluit_tekst = record.get("BesluitTekst") or ""
date_str = record.get("GewijzigdOp", "")
date = (
date_str.split("T")[0]
if date_str
else datetime.now().strftime("%Y-%m-%d")
)
title = onderwerp or f"Motion {besluit_id[:8]}"
description = onderwerp or besluit_tekst or "Geen beschrijving beschikbaar"
# Step 2: Fetch ExterneIdentifier via Zaak → Document → DocumentVersie
externe_identifier = None
body_text = None
if zaak_id:
externe_identifier = self._get_externe_identifier(zaak_id)
if externe_identifier:
body_text = self._fetch_body_text(externe_identifier)
return {
"title": title,
"description": body_text or description,
"date": date,
"externe_identifier": externe_identifier,
"body_text": body_text,
}
except Exception as e:
print(f"Error getting motion details for {besluit_id}: {e}")
return None
def _get_externe_identifier(self, zaak_id: str) -> Optional[str]:
"""Fetch the ExterneIdentifier for the first non-deleted DocumentVersie of a Zaak."""
try:
url = f"{self.odata_base_url}/Zaak({zaak_id})"
params = {
"$expand": "Document($expand=DocumentVersie($select=Id,ExterneIdentifier,Extensie,Verwijderd))"
}
response = self.session.get(url, params=params, timeout=config.API_TIMEOUT)
response.raise_for_status()
data = response.json()
for doc in data.get("Document", []):
for versie in doc.get("DocumentVersie", []):
if versie.get("Verwijderd"):
continue
ext_id = versie.get("ExterneIdentifier")
if ext_id:
return ext_id
except Exception as e:
print(f"Error fetching ExterneIdentifier for zaak {zaak_id}: {e}")
return None
def _fetch_body_text(self, externe_identifier: str) -> Optional[str]:
"""Scrape full motion body text from zoek.officielebekendmakingen.nl."""
try:
url = f"https://zoek.officielebekendmakingen.nl/{externe_identifier}.html"
response = self.session.get(url, timeout=config.API_TIMEOUT)
response.raise_for_status()
html = response.text
# Strip tags
text = re.sub(r"<[^>]+>", " ", html)
text = re.sub(r"&[a-z]+;", " ", text)
text = re.sub(r"\s+", " ", text).strip()
# Find the motion body starting at the first relevant keyword
start_keywords = [
"constaterende",
"overwegende",
"verzoekt",
"spreekt uit",
"roept op",
"de kamer,",
]
start_pos = len(text)
for kw in start_keywords:
pos = text.lower().find(kw)
if pos != -1 and pos < start_pos:
start_pos = pos
if start_pos == len(text):
return None # No motion body found
body = text[start_pos:]
# Trim at end markers
end_markers = [
"gaat over tot de orde van de dag",
"naar boven",
"deze motie is",
"nr.",
]
for marker in end_markers:
pos = body.lower().find(marker)
if pos != -1:
body = body[:pos]
body = body.strip()
return body if len(body) > 50 else None
except Exception as e:
print(f"Error fetching body text for {externe_identifier}: {e}")
return None
def _determine_policy_area(self, title: str, description: str) -> str:
"""Determine policy area from motion title and description"""
text = (title + " " + description).lower()
# Policy area keyword mapping
policy_mapping = {
"Economie": [
"economie",
"belasting",
"budget",
"financiën",
"werkgelegenheid",
"bedrijven",
"economisch",
],
"Klimaat": [
"klimaat",
"co2",
"duurzaam",
"energie",
"milieu",
"uitstoot",
"klimaatverandering",
],
"Immigratie": [
"migratie",
"asiel",
"vreemdeling",
"integratie",
"naturalisatie",
"immigratie",
],
"Zorg": [
"zorg",
"gezondheid",
"ziekenhuis",
"medicijn",
"arts",
"patiënt",
"gezondheidszorg",
],
"Onderwijs": [
"onderwijs",
"school",
"universiteit",
"student",
"leraar",
"educatie",
],
"Defensie": [
"defensie",
"militair",
"veiligheid",
"oorlog",
"leger",
"veiligheidsdienst",
],
}
for area, keywords in policy_mapping.items():
if any(keyword in text for keyword in keywords):
return area
return "Algemeen"
def test_api_connection(self) -> bool:
"""Test if API is accessible"""
try:
url = f"{self.odata_base_url}/Stemming"
params = {"$top": 1}
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
return len(data.get("value", [])) > 0
except Exception as e:
print(f"API connection test failed: {e}")
return False