"""SyncFeed-based motion content enrichment.

Walks four SyncFeed entity types (Besluit, Zaak, Document, DocumentVersie),
joins them in memory to map each motion's besluit_id to a Zaak.Onderwerp
title and an ExterneIdentifier, then fetches body text from
officielebekendmakingen.nl and updates the motions table.

Usage:
    .venv/bin/python scripts/sync_motion_content.py --db-path data/motions.db
"""
import argparse
import logging
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Iterator, List, Optional, Tuple

try:
    import duckdb
except Exception:  # pragma: no cover - environment may not have duckdb installed
    duckdb = None

import requests
import time
import re

# Module-level logger; configured by the script entry point (not visible here).
_logger = logging.getLogger(__name__)

# Namespaces
ATOM_NS = "http://www.w3.org/2005/Atom"
NS_TK = "http://www.tweedekamer.nl/xsd/tkData/v1-0"

# SyncFeed endpoint (Atom feed, paged via rel='next' links) and the template
# used to resolve an ExterneIdentifier to a readable HTML page.
SYNCFEED_BASE = "https://gegevensmagazijn.tweedekamer.nl/SyncFeed/2.0/Feed"
BODY_TEXT_BASE = "https://zoek.officielebekendmakingen.nl/{ext_id}.html"

# Default number of concurrent body fetch workers
MAX_BODY_WORKERS = 10


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _local(tag: str) -> str:
    """Strip XML namespace from a tag name."""
    return tag.split("}", 1)[1] if tag.startswith("{") else tag


def _is_deleted(element: ET.Element) -> bool:
    """Return True when the entity carries tk:verwijderd="true" (a tombstone)."""
    return element.attrib.get(f"{{{NS_TK}}}verwijderd", "false").lower() == "true"


# ---------------------------------------------------------------------------
# Parsers (accept ET.Element; public API also accepts XML string for tests)
# ---------------------------------------------------------------------------


def parse_besluit(element) -> Dict:
    """Parse a Besluit element (ET.Element or XML string).

    Returns dict with: id, verwijderd, zaak_refs (list of uuid strings).
    """
    if isinstance(element, str):
        element = ET.fromstring(element)
    return {
        "id": element.attrib.get("id"),
        "verwijderd": _is_deleted(element),
        # Child <zaak ref="..."/> elements link this decision to its case(s);
        # only children that actually carry a ref attribute are collected.
        "zaak_refs": [
            c.attrib["ref"]
            for c in element
            if _local(c.tag).lower() == "zaak" and "ref" in c.attrib
        ],
    }


def parse_zaak(element) -> Dict:
    """Parse a Zaak element (ET.Element or XML string).

    Returns dict with: id, verwijderd, onderwerp, soort.
    """
    if isinstance(element, str):
        element = ET.fromstring(element)
    # Flatten children into {localname: stripped text}. NOTE: a tag that is
    # absent altogether leaves the key missing, so .get() yields None below.
    children = {_local(c.tag).lower(): (c.text or "").strip() for c in element}
    return {
        "id": element.attrib.get("id"),
        "verwijderd": _is_deleted(element),
        "onderwerp": children.get("onderwerp"),
        "soort": children.get("soort"),
    }


def parse_document(element) -> Dict:
    """Parse a Document element (ET.Element or XML string).

    Returns dict with: id, verwijderd, zaak_refs.
    """
    if isinstance(element, str):
        element = ET.fromstring(element)
    return {
        "id": element.attrib.get("id"),
        "verwijderd": _is_deleted(element),
        # Same zaak-ref extraction as parse_besluit: documents also link to cases.
        "zaak_refs": [
            c.attrib["ref"]
            for c in element
            if _local(c.tag).lower() == "zaak" and "ref" in c.attrib
        ],
    }


def parse_documentversie(element) -> Dict:
    """Parse a DocumentVersie element (ET.Element or XML string).

    Returns dict with: id, verwijderd, document_id, externe_identifier, extensie.
""" if isinstance(element, str): element = ET.fromstring(element) children = {_local(c.tag).lower(): c for c in element} return { "id": element.attrib.get("id"), "verwijderd": _is_deleted(element), "document_id": ( children["document"].attrib.get("ref") if "document" in children else None ), "externe_identifier": ( (children["externeidentifier"].text or "").strip() if "externeidentifier" in children else None ), "extensie": ( (children["extensie"].text or "").strip() if "extensie" in children else None ), } # --------------------------------------------------------------------------- # Join builders (pure in-memory; tested without HTTP) # --------------------------------------------------------------------------- def build_title_map( besluit_index: Dict[str, Dict], zaak_index: Dict[str, Dict], ) -> Dict[str, str]: """Map besluit_id -> Zaak.onderwerp, preferring soort == 'Motie'.""" out: Dict[str, str] = {} for besluit_id, b in besluit_index.items(): chosen = None for zid in b.get("zaak_refs", []): z = zaak_index.get(zid) if not z: continue if z.get("soort", "").lower() == "motie": chosen = z break if chosen is None: chosen = z if chosen and chosen.get("onderwerp"): out[besluit_id] = chosen["onderwerp"] return out def build_ext_id_map( besluit_index: Dict[str, Dict], zaak_index: Dict[str, Dict], doc_index: Dict[str, Dict], docversie_index: Dict[str, Dict], ) -> Dict[str, str]: """Map besluit_id -> externe_identifier by following document → zaak links.""" # document_id -> externe_identifier (prefer html extension) doc_to_ext: Dict[str, str] = {} for dv in docversie_index.values(): ext = dv.get("externe_identifier") doc_id = dv.get("document_id") if ext and doc_id: # prefer html over pdf when both exist existing = doc_to_ext.get(doc_id) if not existing or dv.get("extensie", "").lower() == "html": doc_to_ext[doc_id] = ext # Build zaak_id -> list of doc_ids zaak_to_docs: Dict[str, List[str]] = {} for doc in doc_index.values(): for zid in doc.get("zaak_refs", []): 
zaak_to_docs.setdefault(zid, []).append(doc["id"]) out: Dict[str, str] = {} for besluit_id, b in besluit_index.items(): found: Optional[str] = None for zid in b.get("zaak_refs", []): for doc_id in zaak_to_docs.get(zid, []): ext = doc_to_ext.get(doc_id) if ext: found = ext break if found: break if found: out[besluit_id] = found return out # --------------------------------------------------------------------------- # HTTP walker # --------------------------------------------------------------------------- def walk_syncfeed( category: str, session: requests.Session, start_skip_token: Optional[int] = None, ) -> Iterator[ET.Element]: """Yield entity ET.Element objects by walking a SyncFeed category.""" url: Optional[str] = SYNCFEED_BASE + f"?category={category}" if start_skip_token: url += f"&skiptoken={start_skip_token}" pages = 0 while url: try: resp = session.get(url, timeout=30) resp.raise_for_status() except Exception as exc: _logger.error("SyncFeed request failed (%s): %s", url, exc) break try: root = ET.fromstring(resp.text) except ET.ParseError as exc: _logger.error("XML parse error for %s: %s", url, exc) break for entry in root.findall(f"{{{ATOM_NS}}}entry"): content = entry.find(f"{{{ATOM_NS}}}content") if content is None: continue for child in content: yield child next_link = root.find(f".//{{{ATOM_NS}}}link[@rel='next']") url = next_link.attrib.get("href") if next_link is not None else None pages += 1 if pages % 50 == 0: _logger.info(" walked %d pages for category=%s", pages, category) _logger.info("Done walking category=%s (%d pages)", category, pages) # --------------------------------------------------------------------------- # Body text fetcher # --------------------------------------------------------------------------- def _extract_motion_text(html: str) -> Optional[str]: """Extract clean motion text from Overheid.nl HTML. Targets