"""SyncFeed-based motion content enrichment. Walks four SyncFeed entity types (Besluit, Zaak, Document, DocumentVersie), joins them in memory to map each motion's besluit_id to a Zaak.Onderwerp title and an ExterneIdentifier, then fetches body text from officielebekendmakingen.nl and updates the motions table. Usage: .venv/bin/python scripts/sync_motion_content.py --db-path data/motions.db """ import argparse import logging import xml.etree.ElementTree as ET from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, Iterator, List, Optional, Tuple try: import duckdb except Exception: # pragma: no cover - environment may not have duckdb installed duckdb = None import requests import time import re _logger = logging.getLogger(__name__) # Namespaces ATOM_NS = "http://www.w3.org/2005/Atom" NS_TK = "http://www.tweedekamer.nl/xsd/tkData/v1-0" SYNCFEED_BASE = "https://gegevensmagazijn.tweedekamer.nl/SyncFeed/2.0/Feed" BODY_TEXT_BASE = "https://zoek.officielebekendmakingen.nl/{ext_id}.html" # Default number of concurrent body fetch workers MAX_BODY_WORKERS = 10 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _local(tag: str) -> str: """Strip XML namespace from a tag name.""" return tag.split("}", 1)[1] if tag.startswith("{") else tag def _is_deleted(element: ET.Element) -> bool: return element.attrib.get(f"{{{NS_TK}}}verwijderd", "false").lower() == "true" # --------------------------------------------------------------------------- # Parsers (accept ET.Element; public API also accepts XML string for tests) # --------------------------------------------------------------------------- def parse_besluit(element) -> Dict: """Parse a Besluit element (ET.Element or XML string). Returns dict with: id, verwijderd, zaak_refs (list of uuid strings). """ if isinstance(element, str): element = ET.fromstring(element) return { "id": element.attrib.get("id"), "verwijderd": _is_deleted(element), "zaak_refs": [ c.attrib["ref"] for c in element if _local(c.tag).lower() == "zaak" and "ref" in c.attrib ], } def parse_zaak(element) -> Dict: """Parse a Zaak element (ET.Element or XML string). Returns dict with: id, verwijderd, onderwerp, soort. """ if isinstance(element, str): element = ET.fromstring(element) children = {_local(c.tag).lower(): (c.text or "").strip() for c in element} return { "id": element.attrib.get("id"), "verwijderd": _is_deleted(element), "onderwerp": children.get("onderwerp"), "soort": children.get("soort"), } def parse_document(element) -> Dict: """Parse a Document element (ET.Element or XML string). Returns dict with: id, verwijderd, zaak_refs. """ if isinstance(element, str): element = ET.fromstring(element) return { "id": element.attrib.get("id"), "verwijderd": _is_deleted(element), "zaak_refs": [ c.attrib["ref"] for c in element if _local(c.tag).lower() == "zaak" and "ref" in c.attrib ], } def parse_documentversie(element) -> Dict: """Parse a DocumentVersie element (ET.Element or XML string). Returns dict with: id, verwijderd, document_id, externe_identifier, extensie. 
""" if isinstance(element, str): element = ET.fromstring(element) children = {_local(c.tag).lower(): c for c in element} return { "id": element.attrib.get("id"), "verwijderd": _is_deleted(element), "document_id": ( children["document"].attrib.get("ref") if "document" in children else None ), "externe_identifier": ( (children["externeidentifier"].text or "").strip() if "externeidentifier" in children else None ), "extensie": ( (children["extensie"].text or "").strip() if "extensie" in children else None ), } # --------------------------------------------------------------------------- # Join builders (pure in-memory; tested without HTTP) # --------------------------------------------------------------------------- def build_title_map( besluit_index: Dict[str, Dict], zaak_index: Dict[str, Dict], ) -> Dict[str, str]: """Map besluit_id -> Zaak.onderwerp, preferring soort == 'Motie'.""" out: Dict[str, str] = {} for besluit_id, b in besluit_index.items(): chosen = None for zid in b.get("zaak_refs", []): z = zaak_index.get(zid) if not z: continue if z.get("soort", "").lower() == "motie": chosen = z break if chosen is None: chosen = z if chosen and chosen.get("onderwerp"): out[besluit_id] = chosen["onderwerp"] return out def build_ext_id_map( besluit_index: Dict[str, Dict], zaak_index: Dict[str, Dict], doc_index: Dict[str, Dict], docversie_index: Dict[str, Dict], ) -> Dict[str, str]: """Map besluit_id -> externe_identifier by following document → zaak links.""" # document_id -> externe_identifier (prefer html extension) doc_to_ext: Dict[str, str] = {} for dv in docversie_index.values(): ext = dv.get("externe_identifier") doc_id = dv.get("document_id") if ext and doc_id: # prefer html over pdf when both exist existing = doc_to_ext.get(doc_id) if not existing or dv.get("extensie", "").lower() == "html": doc_to_ext[doc_id] = ext # Build zaak_id -> list of doc_ids zaak_to_docs: Dict[str, List[str]] = {} for doc in doc_index.values(): for zid in doc.get("zaak_refs", []): zaak_to_docs.setdefault(zid, []).append(doc["id"]) out: Dict[str, str] = {} for besluit_id, b in besluit_index.items(): found: Optional[str] = None for zid in b.get("zaak_refs", []): for doc_id in zaak_to_docs.get(zid, []): ext = doc_to_ext.get(doc_id) if ext: found = ext break if found: break if found: out[besluit_id] = found return out # --------------------------------------------------------------------------- # HTTP walker # --------------------------------------------------------------------------- def walk_syncfeed( category: str, session: requests.Session, start_skip_token: Optional[int] = None, ) -> Iterator[ET.Element]: """Yield entity ET.Element objects by walking a SyncFeed category.""" url: Optional[str] = SYNCFEED_BASE + f"?category={category}" if start_skip_token: url += f"&skiptoken={start_skip_token}" pages = 0 while url: try: resp = session.get(url, timeout=30) resp.raise_for_status() except Exception as exc: _logger.error("SyncFeed request failed (%s): %s", url, exc) break try: root = ET.fromstring(resp.text) except ET.ParseError as exc: _logger.error("XML parse error for %s: %s", url, exc) break for entry in root.findall(f"{{{ATOM_NS}}}entry"): content = entry.find(f"{{{ATOM_NS}}}content") if content is None: continue for child in content: yield child next_link = root.find(f".//{{{ATOM_NS}}}link[@rel='next']") url = next_link.attrib.get("href") if next_link is not None else None pages += 1 if pages % 50 == 0: _logger.info(" walked %d pages for category=%s", pages, category) _logger.info("Done walking category=%s (%d 
pages)", category, pages) # --------------------------------------------------------------------------- # Body text fetcher # --------------------------------------------------------------------------- def _fetch_body_text( ext_id: str, session: requests.Session, retries: int = 3 ) -> Optional[str]: """Fetch plain text body from officielebekendmakingen.nl for ext_id. Retries on network errors and on HTTP 5xx or 429 responses using exponential backoff starting at 0.5s. On permanent failure returns None and records an audit event via database.db.append_audit_event(...). """ import time import re from requests import exceptions as req_exceptions import database url = BODY_TEXT_BASE.format(ext_id=ext_id) attempt = 0 backoff = 0.5 last_exc = None while attempt < retries: attempt += 1 try: resp = session.get(url, timeout=30) # treat 5xx and 429 as transient status = getattr(resp, "status_code", None) if status == 429 or (status is not None and 500 <= status < 600): last_exc = Exception(f"HTTP {status}") raise req_exceptions.RequestException(f"HTTP {status}") resp.raise_for_status() # Very simple text extraction: strip tags text = re.sub(r"<[^>]+>", " ", resp.text) text = re.sub(r"\s+", " ", text).strip() return text[:32_000] if text else None except req_exceptions.RequestException as exc: last_exc = exc # retry for transient errors unless we've exhausted attempts if attempt < retries: _logger.info( "Transient body fetch error for %s (attempt %d/%d): %s; retrying in %.1fs", ext_id, attempt, retries, exc, backoff, ) try: time.sleep(backoff) except Exception: pass backoff *= 2 continue # exhausted retries => permanent failure _logger.warning( "Body text fetch permanently failed for %s: %s", ext_id, exc ) metadata = {"attempts": attempt, "error": str(exc)} try: # MotionDatabase.append_audit_event signature: (actor_id, action, ...) 
def fetch_body_texts(
    ext_ids: List[str],
    session: requests.Session,
    max_workers: int = MAX_BODY_WORKERS,
) -> Dict[str, Optional[str]]:
    """Parallel-fetch body texts for a list of externe_identifiers."""
    results: Dict[str, Optional[str]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        future_to_ext = {
            pool.submit(_fetch_body_text, ext_id, session): ext_id
            for ext_id in ext_ids
        }
        done = 0
        total = len(future_to_ext)
        for future in as_completed(future_to_ext):
            ext_id = future_to_ext[future]
            try:
                results[ext_id] = future.result()
            except Exception as exc:
                _logger.warning("Body text future failed for %s: %s", ext_id, exc)
                results[ext_id] = None
            done += 1
            if done % 500 == 0:
                _logger.info("  body text: %d/%d fetched", done, total)
    return results


# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------

def _load_besluit_ids(db_path: str) -> Dict[str, int]:
    """Return {besluit_id: motion_id} for all motions with a besluit_id."""
    if duckdb is None:
        raise RuntimeError("duckdb is required for database access")
    conn = duckdb.connect(db_path, read_only=True)
    try:
        # Check whether the motions table actually has a besluit_id column
        cols = conn.execute("PRAGMA table_info('motions')").fetchall()
        col_names = [c[1] for c in cols]
        if "besluit_id" in col_names:
            rows = conn.execute(
                "SELECT besluit_id, id FROM motions WHERE besluit_id IS NOT NULL"
            ).fetchall()
            return {r[0]: r[1] for r in rows}

        # Fallback: many databases store the besluit id in the URL (last path
        # segment). Try to extract it from the motions.url column.
        rows = conn.execute(
            "SELECT id, url FROM motions WHERE url IS NOT NULL"
        ).fetchall()
        out: Dict[str, int] = {}
        for mid, url in rows:
            if not url:
                continue
            # naive extraction: last path segment
            seg = url.rstrip("/").split("/")[-1]
            if not seg:
                continue
            # accept UUID-like segments (contain a dash) or reasonably long hex ids
            if ("-" in seg and len(seg) >= 8) or re.match(r"^[0-9a-fA-F]{8,}$", seg):
                out[seg] = int(mid)
        return out
    finally:
        conn.close()
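# Fallback extraction sketch (hypothetical URL; the real scheme depends on how
# the motions table was populated):
#
#     url = "https://example.org/besluit/1a2b3c4d-0000-1111-2222-333344445555"
#     url.rstrip("/").split("/")[-1]
#     # -> "1a2b3c4d-0000-1111-2222-333344445555"  (UUID-like, so accepted)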
""" if not updates: return 0 conn = duckdb.connect(db_path) try: updated = 0 for motion_id, title, body_text, ext_id in updates: parts = [] params: List = [] if title is not None: parts.append("title = ?") params.append(title) if body_text is not None: parts.append("body_text = ?") params.append(body_text) if ext_id is not None: parts.append("externe_identifier = ?") params.append(ext_id) if not parts: continue params.append(motion_id) conn.execute(f"UPDATE motions SET {', '.join(parts)} WHERE id = ?", params) updated += 1 conn.commit() return updated finally: conn.close() # --------------------------------------------------------------------------- # Main sync routine # --------------------------------------------------------------------------- def sync_motion_content(db_path: str, skip_body_text: bool = False) -> Dict: """Full sync: walk feeds, join, fetch body texts, update DB. Returns summary dict with counts. """ _logger.info("Loading motion besluit_ids from %s ...", db_path) besluit_to_motion = _load_besluit_ids(db_path) target_besluit_ids = set(besluit_to_motion.keys()) _logger.info("Found %d motions with besluit_id", len(target_besluit_ids)) session = requests.Session() session.headers["Accept"] = "application/xml" # Configure HTTPAdapter with a pool sized to MAX_BODY_WORKERS. Allows # controlling concurrency for body text fetches via --max-body-workers. try: from requests.adapters import HTTPAdapter adapter = HTTPAdapter( pool_connections=MAX_BODY_WORKERS, pool_maxsize=MAX_BODY_WORKERS ) session.mount("https://", adapter) session.mount("http://", adapter) except Exception: _logger.debug("Could not mount HTTPAdapter for connection pooling") # -- Walk Besluit feed (only keep those we care about) -- _logger.info("Walking Besluit feed ...") besluit_index: Dict[str, Dict] = {} for elem in walk_syncfeed("Besluit", session): b = parse_besluit(elem) if b["id"] and b["id"] in target_besluit_ids and not b["verwijderd"]: besluit_index[b["id"]] = b _logger.info("Collected %d relevant Besluit records", len(besluit_index)) # Collect all zaak_ids we need needed_zaak_ids: set = set() for b in besluit_index.values(): needed_zaak_ids.update(b["zaak_refs"]) # -- Walk Zaak feed -- _logger.info("Walking Zaak feed ...") zaak_index: Dict[str, Dict] = {} for elem in walk_syncfeed("Zaak", session): z = parse_zaak(elem) if z["id"] and z["id"] in needed_zaak_ids and not z["verwijderd"]: zaak_index[z["id"]] = z _logger.info("Collected %d Zaak records", len(zaak_index)) # -- Walk Document feed -- _logger.info("Walking Document feed ...") doc_index: Dict[str, Dict] = {} for elem in walk_syncfeed("Document", session): d = parse_document(elem) if d["id"] and not d["verwijderd"]: if any(zid in needed_zaak_ids for zid in d["zaak_refs"]): doc_index[d["id"]] = d needed_doc_ids = set(doc_index.keys()) _logger.info("Collected %d Document records", len(doc_index)) # -- Walk DocumentVersie feed -- _logger.info("Walking DocumentVersie feed ...") docversie_index: Dict[str, Dict] = {} for elem in walk_syncfeed("DocumentVersie", session): dv = parse_documentversie(elem) if ( dv["id"] and not dv["verwijderd"] and dv.get("document_id") in needed_doc_ids ): docversie_index[dv["id"]] = dv _logger.info("Collected %d DocumentVersie records", len(docversie_index)) # -- Build maps -- title_map = build_title_map(besluit_index, zaak_index) ext_id_map = build_ext_id_map(besluit_index, zaak_index, doc_index, docversie_index) _logger.info( "title_map: %d entries, ext_id_map: %d entries", len(title_map), len(ext_id_map) ) # -- Fetch body 
# ---------------------------------------------------------------------------
# Main sync routine
# ---------------------------------------------------------------------------

def sync_motion_content(db_path: str, skip_body_text: bool = False) -> Dict:
    """Full sync: walk feeds, join, fetch body texts, update DB.

    Returns a summary dict with counts.
    """
    _logger.info("Loading motion besluit_ids from %s ...", db_path)
    besluit_to_motion = _load_besluit_ids(db_path)
    target_besluit_ids = set(besluit_to_motion.keys())
    _logger.info("Found %d motions with besluit_id", len(target_besluit_ids))

    session = requests.Session()
    session.headers["Accept"] = "application/xml"
    # Configure an HTTPAdapter with a pool sized to MAX_BODY_WORKERS so body
    # text fetch concurrency can be controlled via --max-body-workers.
    try:
        from requests.adapters import HTTPAdapter

        adapter = HTTPAdapter(
            pool_connections=MAX_BODY_WORKERS, pool_maxsize=MAX_BODY_WORKERS
        )
        session.mount("https://", adapter)
        session.mount("http://", adapter)
    except Exception:
        _logger.debug("Could not mount HTTPAdapter for connection pooling")

    # -- Walk the Besluit feed (only keep the records we care about) --
    _logger.info("Walking Besluit feed ...")
    besluit_index: Dict[str, Dict] = {}
    for elem in walk_syncfeed("Besluit", session):
        b = parse_besluit(elem)
        if b["id"] and b["id"] in target_besluit_ids and not b["verwijderd"]:
            besluit_index[b["id"]] = b
    _logger.info("Collected %d relevant Besluit records", len(besluit_index))

    # Collect all zaak_ids we need
    needed_zaak_ids: set = set()
    for b in besluit_index.values():
        needed_zaak_ids.update(b["zaak_refs"])

    # -- Walk the Zaak feed --
    _logger.info("Walking Zaak feed ...")
    zaak_index: Dict[str, Dict] = {}
    for elem in walk_syncfeed("Zaak", session):
        z = parse_zaak(elem)
        if z["id"] and z["id"] in needed_zaak_ids and not z["verwijderd"]:
            zaak_index[z["id"]] = z
    _logger.info("Collected %d Zaak records", len(zaak_index))

    # -- Walk the Document feed --
    _logger.info("Walking Document feed ...")
    doc_index: Dict[str, Dict] = {}
    for elem in walk_syncfeed("Document", session):
        d = parse_document(elem)
        if d["id"] and not d["verwijderd"]:
            if any(zid in needed_zaak_ids for zid in d["zaak_refs"]):
                doc_index[d["id"]] = d
    needed_doc_ids = set(doc_index.keys())
    _logger.info("Collected %d Document records", len(doc_index))

    # -- Walk the DocumentVersie feed --
    _logger.info("Walking DocumentVersie feed ...")
    docversie_index: Dict[str, Dict] = {}
    for elem in walk_syncfeed("DocumentVersie", session):
        dv = parse_documentversie(elem)
        if (
            dv["id"]
            and not dv["verwijderd"]
            and dv.get("document_id") in needed_doc_ids
        ):
            docversie_index[dv["id"]] = dv
    _logger.info("Collected %d DocumentVersie records", len(docversie_index))

    # -- Build maps --
    title_map = build_title_map(besluit_index, zaak_index)
    ext_id_map = build_ext_id_map(besluit_index, zaak_index, doc_index, docversie_index)
    _logger.info(
        "title_map: %d entries, ext_id_map: %d entries", len(title_map), len(ext_id_map)
    )

    # -- Fetch body texts --
    body_text_map: Dict[str, Optional[str]] = {}
    if not skip_body_text:
        ext_ids_to_fetch = list(set(ext_id_map.values()))
        _logger.info(
            "Fetching body texts for %d unique ext_ids ...", len(ext_ids_to_fetch)
        )
        body_text_map = fetch_body_texts(ext_ids_to_fetch, session)
        _logger.info("Body text fetch complete")

    # -- Assemble updates --
    updates: List[Tuple[int, Optional[str], Optional[str], Optional[str]]] = []
    for besluit_id, motion_id in besluit_to_motion.items():
        title = title_map.get(besluit_id)
        ext_id = ext_id_map.get(besluit_id)
        body_text = body_text_map.get(ext_id) if ext_id else None
        if title or ext_id or body_text:
            updates.append((motion_id, title, body_text, ext_id))

    _logger.info("Applying %d motion updates to DB ...", len(updates))
    updated = _update_motions(db_path, updates)
    _logger.info("Done. Updated %d motions.", updated)

    return {
        "motions_with_besluit_id": len(target_besluit_ids),
        "besluit_records": len(besluit_index),
        "zaak_records": len(zaak_index),
        "document_records": len(doc_index),
        "docversie_records": len(docversie_index),
        "title_map_entries": len(title_map),
        "ext_id_map_entries": len(ext_id_map),
        "body_texts_fetched": sum(1 for v in body_text_map.values() if v),
        "motions_updated": updated,
    }


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def _main():
    global MAX_BODY_WORKERS  # updated below from --max-body-workers

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    parser = argparse.ArgumentParser(description="Sync motion content from SyncFeed")
    parser.add_argument("--db-path", required=True, help="Path to motions.db")
    parser.add_argument(
        "--skip-body-text",
        action="store_true",
        help="Skip fetching body text from officielebekendmakingen.nl",
    )
    parser.add_argument(
        "--max-body-workers",
        type=int,
        default=MAX_BODY_WORKERS,
        help=(
            "Maximum concurrent workers for fetching body text "
            f"(default: {MAX_BODY_WORKERS})"
        ),
    )
    args = parser.parse_args()

    # Propagate the CLI value to the module-level default. Without the global
    # declaration above, this assignment would make MAX_BODY_WORKERS local to
    # _main and the f-string in the help text would raise UnboundLocalError.
    if args.max_body_workers:
        MAX_BODY_WORKERS = int(args.max_body_workers)

    summary = sync_motion_content(args.db_path, skip_body_text=args.skip_body_text)
    for k, v in summary.items():
        print(f"  {k}: {v}")


if __name__ == "__main__":
    _main()
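# Programmatic use (sketch; the db path is a placeholder):
#
#     summary = sync_motion_content("data/motions.db", skip_body_text=True)
#     print(summary["motions_updated"])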