"""SyncFeed-based motion content enrichment. Walks four SyncFeed entity types (Besluit, Zaak, Document, DocumentVersie), joins them in memory to map each motion's besluit_id to a Zaak.Onderwerp title and an ExterneIdentifier, then fetches body text from officielebekendmakingen.nl and updates the motions table. Usage: .venv/bin/python scripts/sync_motion_content.py --db-path data/motions.db """ import argparse import logging import xml.etree.ElementTree as ET from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, Iterator, List, Optional, Tuple try: import duckdb except Exception: # pragma: no cover - environment may not have duckdb installed duckdb = None import requests import time import re _logger = logging.getLogger(__name__) # Namespaces ATOM_NS = "http://www.w3.org/2005/Atom" NS_TK = "http://www.tweedekamer.nl/xsd/tkData/v1-0" SYNCFEED_BASE = "https://gegevensmagazijn.tweedekamer.nl/SyncFeed/2.0/Feed" BODY_TEXT_BASE = "https://zoek.officielebekendmakingen.nl/{ext_id}.html" # Default number of concurrent body fetch workers MAX_BODY_WORKERS = 10 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _local(tag: str) -> str: """Strip XML namespace from a tag name.""" return tag.split("}", 1)[1] if tag.startswith("{") else tag def _is_deleted(element: ET.Element) -> bool: return element.attrib.get(f"{{{NS_TK}}}verwijderd", "false").lower() == "true" # --------------------------------------------------------------------------- # Parsers (accept ET.Element; public API also accepts XML string for tests) # --------------------------------------------------------------------------- def parse_besluit(element) -> Dict: """Parse a Besluit element (ET.Element or XML string). Returns dict with: id, verwijderd, zaak_refs (list of uuid strings). """ if isinstance(element, str): element = ET.fromstring(element) return { "id": element.attrib.get("id"), "verwijderd": _is_deleted(element), "zaak_refs": [ c.attrib["ref"] for c in element if _local(c.tag).lower() == "zaak" and "ref" in c.attrib ], } def parse_zaak(element) -> Dict: """Parse a Zaak element (ET.Element or XML string). Returns dict with: id, verwijderd, onderwerp, soort. """ if isinstance(element, str): element = ET.fromstring(element) children = {_local(c.tag).lower(): (c.text or "").strip() for c in element} return { "id": element.attrib.get("id"), "verwijderd": _is_deleted(element), "onderwerp": children.get("onderwerp"), "soort": children.get("soort"), } def parse_document(element) -> Dict: """Parse a Document element (ET.Element or XML string). Returns dict with: id, verwijderd, zaak_refs. """ if isinstance(element, str): element = ET.fromstring(element) return { "id": element.attrib.get("id"), "verwijderd": _is_deleted(element), "zaak_refs": [ c.attrib["ref"] for c in element if _local(c.tag).lower() == "zaak" and "ref" in c.attrib ], } def parse_documentversie(element) -> Dict: """Parse a DocumentVersie element (ET.Element or XML string). Returns dict with: id, verwijderd, document_id, externe_identifier, extensie. 
""" if isinstance(element, str): element = ET.fromstring(element) children = {_local(c.tag).lower(): c for c in element} return { "id": element.attrib.get("id"), "verwijderd": _is_deleted(element), "document_id": ( children["document"].attrib.get("ref") if "document" in children else None ), "externe_identifier": ( (children["externeidentifier"].text or "").strip() if "externeidentifier" in children else None ), "extensie": ( (children["extensie"].text or "").strip() if "extensie" in children else None ), } # --------------------------------------------------------------------------- # Join builders (pure in-memory; tested without HTTP) # --------------------------------------------------------------------------- def build_title_map( besluit_index: Dict[str, Dict], zaak_index: Dict[str, Dict], ) -> Dict[str, str]: """Map besluit_id -> Zaak.onderwerp, preferring soort == 'Motie'.""" out: Dict[str, str] = {} for besluit_id, b in besluit_index.items(): chosen = None for zid in b.get("zaak_refs", []): z = zaak_index.get(zid) if not z: continue if z.get("soort", "").lower() == "motie": chosen = z break if chosen is None: chosen = z if chosen and chosen.get("onderwerp"): out[besluit_id] = chosen["onderwerp"] return out def build_ext_id_map( besluit_index: Dict[str, Dict], zaak_index: Dict[str, Dict], doc_index: Dict[str, Dict], docversie_index: Dict[str, Dict], ) -> Dict[str, str]: """Map besluit_id -> externe_identifier by following document → zaak links.""" # document_id -> externe_identifier (prefer html extension) doc_to_ext: Dict[str, str] = {} for dv in docversie_index.values(): ext = dv.get("externe_identifier") doc_id = dv.get("document_id") if ext and doc_id: # prefer html over pdf when both exist existing = doc_to_ext.get(doc_id) if not existing or dv.get("extensie", "").lower() == "html": doc_to_ext[doc_id] = ext # Build zaak_id -> list of doc_ids zaak_to_docs: Dict[str, List[str]] = {} for doc in doc_index.values(): for zid in doc.get("zaak_refs", []): zaak_to_docs.setdefault(zid, []).append(doc["id"]) out: Dict[str, str] = {} for besluit_id, b in besluit_index.items(): found: Optional[str] = None for zid in b.get("zaak_refs", []): for doc_id in zaak_to_docs.get(zid, []): ext = doc_to_ext.get(doc_id) if ext: found = ext break if found: break if found: out[besluit_id] = found return out # --------------------------------------------------------------------------- # HTTP walker # --------------------------------------------------------------------------- def walk_syncfeed( category: str, session: requests.Session, start_skip_token: Optional[int] = None, ) -> Iterator[ET.Element]: """Yield entity ET.Element objects by walking a SyncFeed category.""" url: Optional[str] = SYNCFEED_BASE + f"?category={category}" if start_skip_token: url += f"&skiptoken={start_skip_token}" pages = 0 while url: try: resp = session.get(url, timeout=30) resp.raise_for_status() except Exception as exc: _logger.error("SyncFeed request failed (%s): %s", url, exc) break try: root = ET.fromstring(resp.text) except ET.ParseError as exc: _logger.error("XML parse error for %s: %s", url, exc) break for entry in root.findall(f"{{{ATOM_NS}}}entry"): content = entry.find(f"{{{ATOM_NS}}}content") if content is None: continue for child in content: yield child next_link = root.find(f".//{{{ATOM_NS}}}link[@rel='next']") url = next_link.attrib.get("href") if next_link is not None else None pages += 1 if pages % 50 == 0: _logger.info(" walked %d pages for category=%s", pages, category) _logger.info("Done walking category=%s (%d 
pages)", category, pages) # --------------------------------------------------------------------------- # Body text fetcher # --------------------------------------------------------------------------- def _extract_motion_text(html: str) -> Optional[str]: """Extract clean motion text from Overheid.nl HTML. Targets
# ---------------------------------------------------------------------------
# Body text fetcher
# ---------------------------------------------------------------------------

def _extract_motion_text(html: str) -> Optional[str]:
    """Extract clean motion text from Overheid.nl HTML.

    Targets the <div id="broodtekst"> element, which contains the actual
    kamerstuk body. Falls back to <div id="content"> if broodtekst is absent.
    Returns plain text with normalised whitespace, capped at 32 000 chars.
    """
    from bs4 import BeautifulSoup

    soup = BeautifulSoup(html, "lxml")
    # Primary target: the kamerstuk body div
    node = soup.find("div", id="broodtekst")
    if node is None:
        # Fallback: main content area
        node = soup.find("div", id="content")
    if node is None:
        # Last resort: the whole <article>, if present
        node = soup.find("article")
    if node is None:
        # Final fallback: strip all tags from the full body
        node = soup.body or soup
    text = node.get_text(separator=" ")
    # Collapse whitespace
    text = re.sub(r"\s+", " ", text).strip()
    return text[:32_000] if text else None
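# The selector cascade above assumes roughly this page skeleton (illustrative
# only; the real pages vary and carry much more markup):
#
#   <body>
#     <article>
#       <div id="content">
#         <div id="broodtekst"> ...kamerstuk text... </div>
#       </div>
#     </article>
#   </body>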
def _fetch_body_text(
    ext_id: str, session: requests.Session, retries: int = 3
) -> Optional[str]:
    """Fetch plain motion text from officielebekendmakingen.nl for ext_id.

    Uses BeautifulSoup to extract only the <div id="broodtekst"> element,
    avoiding JavaScript, navigation, and cookie-banner noise. Retries on
    network errors and HTTP 5xx / 429 with exponential backoff starting at
    0.5 s. Returns None on permanent failure.
    """
    from requests import exceptions as req_exceptions

    url = BODY_TEXT_BASE.format(ext_id=ext_id)
    attempt = 0
    backoff = 0.5
    last_exc: Optional[Exception] = None
    while attempt < retries:
        attempt += 1
        try:
            resp = session.get(url, timeout=30)
            status = getattr(resp, "status_code", None)
            if status == 429 or (status is not None and 500 <= status < 600):
                last_exc = Exception(f"HTTP {status}")
                raise req_exceptions.RequestException(f"HTTP {status}")
            resp.raise_for_status()
            text = _extract_motion_text(resp.text)
            return text
        except req_exceptions.RequestException as exc:
            last_exc = exc
            if attempt < retries:
                _logger.info(
                    "Transient body fetch error for %s (attempt %d/%d): %s; "
                    "retrying in %.1fs",
                    ext_id, attempt, retries, exc, backoff,
                )
                time.sleep(backoff)
                backoff *= 2
                continue
            _logger.warning(
                "Body text fetch permanently failed for %s: %s", ext_id, exc
            )
            try:
                import database

                database.db.append_audit_event(
                    None,
                    "body_fetch_failed",
                    target_type="document",
                    target_id=ext_id,
                    metadata={"attempts": attempt, "error": str(exc)},
                )
            except Exception:
                pass
            return None
        except Exception as exc:  # pragma: no cover
            _logger.exception(
                "Unexpected error fetching body text for %s: %s", ext_id, exc
            )
            last_exc = exc
            break
    return None
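# With the defaults (retries=3, backoff starting at 0.5 s) the schedule is:
#   attempt 1 -> sleep 0.5 s -> attempt 2 -> sleep 1.0 s -> attempt 3 -> give up.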
def fetch_body_texts(
    ext_ids: List[str],
    session: requests.Session,
    max_workers: int = MAX_BODY_WORKERS,
) -> Dict[str, Optional[str]]:
    """Parallel-fetch body texts for a list of externe_identifiers."""
    results: Dict[str, Optional[str]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        future_to_ext = {
            pool.submit(_fetch_body_text, ext_id, session): ext_id
            for ext_id in ext_ids
        }
        done = 0
        total = len(future_to_ext)
        for future in as_completed(future_to_ext):
            ext_id = future_to_ext[future]
            try:
                results[ext_id] = future.result()
            except Exception as exc:
                _logger.warning("Body text future failed for %s: %s", ext_id, exc)
                results[ext_id] = None
            done += 1
            if done % 500 == 0:
                _logger.info("  body text: %d/%d fetched", done, total)
    return results


# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------

def _load_besluit_ids(db_path: str) -> Dict[str, int]:
    """Return {besluit_id: motion_id} for all motions with a besluit_id."""
    if duckdb is None:
        raise RuntimeError("duckdb is not installed; cannot read the motions DB")
    conn = duckdb.connect(db_path, read_only=True)
    try:
        # Check whether the motions table actually has a besluit_id column
        cols = conn.execute("PRAGMA table_info('motions')").fetchall()
        col_names = [c[1] for c in cols]
        if "besluit_id" in col_names:
            rows = conn.execute(
                "SELECT besluit_id, id FROM motions WHERE besluit_id IS NOT NULL"
            ).fetchall()
            return {r[0]: r[1] for r in rows}

        # Fallback: many databases store the besluit id in the URL (last path
        # segment). Try to extract it from the motions.url column.
        rows = conn.execute(
            "SELECT id, url FROM motions WHERE url IS NOT NULL"
        ).fetchall()
        out: Dict[str, int] = {}
        for mid, url in rows:
            if not url:
                continue
            # naive extraction: last path segment
            try:
                seg = url.rstrip("/").split("/")[-1]
            except Exception:
                seg = None
            if not seg:
                continue
            # accept UUID-like segments (contain a dash) or reasonably long
            # hex ids
            if ("-" in seg and len(seg) >= 8) or re.match(r"^[0-9a-fA-F]{8,}$", seg):
                out[seg] = int(mid)
        return out
    finally:
        conn.close()
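# Illustration of the URL fallback (hypothetical URL; the actual column
# contents depend on how the motions table was populated):
#   "https://example.org/besluit/a1b2c3d4-1111-2222-3333-444444444444"
#   -> the last segment contains a dash and is >= 8 chars, so it is accepted
#      as a besluit_id candidate.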
def _update_motions(
    db_path: str,
    updates: List[Tuple[int, Optional[str], Optional[str], Optional[str]]],
) -> int:
    """Batch-update motions with (motion_id, title, body_text, externe_identifier).

    Returns the number of rows updated.
    """
    if not updates:
        return 0
    conn = duckdb.connect(db_path)
    try:
        updated = 0
        for motion_id, title, body_text, ext_id in updates:
            parts = []
            params: List = []
            if title is not None:
                parts.append("title = ?")
                params.append(title)
            if body_text is not None:
                parts.append("body_text = ?")
                params.append(body_text)
            if ext_id is not None:
                parts.append("externe_identifier = ?")
                params.append(ext_id)
            if not parts:
                continue
            params.append(motion_id)
            conn.execute(
                f"UPDATE motions SET {', '.join(parts)} WHERE id = ?", params
            )
            updated += 1
        conn.commit()
        return updated
    finally:
        conn.close()
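# For an update carrying all three fields, the generated statement is:
#   UPDATE motions SET title = ?, body_text = ?, externe_identifier = ? WHERE id = ?
# Fields that are None are simply omitted from the SET clause.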
# ---------------------------------------------------------------------------
# Main sync routine
# ---------------------------------------------------------------------------

def sync_motion_content(db_path: str, skip_body_text: bool = False) -> Dict:
    """Full sync: walk feeds, join, fetch body texts, update DB.

    Returns a summary dict with counts.
    """
    _logger.info("Loading motion besluit_ids from %s ...", db_path)
    besluit_to_motion = _load_besluit_ids(db_path)
    target_besluit_ids = set(besluit_to_motion.keys())
    _logger.info("Found %d motions with besluit_id", len(target_besluit_ids))

    session = requests.Session()
    session.headers["Accept"] = "application/xml"
    # Configure an HTTPAdapter whose connection pool matches the
    # MAX_BODY_WORKERS default, so parallel body-text fetches do not exhaust
    # the pool.
    try:
        from requests.adapters import HTTPAdapter

        adapter = HTTPAdapter(
            pool_connections=MAX_BODY_WORKERS, pool_maxsize=MAX_BODY_WORKERS
        )
        session.mount("https://", adapter)
        session.mount("http://", adapter)
    except Exception:
        _logger.debug("Could not mount HTTPAdapter for connection pooling")

    # -- Walk the Besluit feed (keep only the besluiten we care about) --
    _logger.info("Walking Besluit feed ...")
    besluit_index: Dict[str, Dict] = {}
    for elem in walk_syncfeed("Besluit", session):
        b = parse_besluit(elem)
        if b["id"] and b["id"] in target_besluit_ids and not b["verwijderd"]:
            besluit_index[b["id"]] = b
    _logger.info("Collected %d relevant Besluit records", len(besluit_index))

    # Collect all zaak_ids we need
    needed_zaak_ids: set = set()
    for b in besluit_index.values():
        needed_zaak_ids.update(b["zaak_refs"])

    # -- Walk the Zaak feed --
    _logger.info("Walking Zaak feed ...")
    zaak_index: Dict[str, Dict] = {}
    for elem in walk_syncfeed("Zaak", session):
        z = parse_zaak(elem)
        if z["id"] and z["id"] in needed_zaak_ids and not z["verwijderd"]:
            zaak_index[z["id"]] = z
    _logger.info("Collected %d Zaak records", len(zaak_index))

    # -- Walk the Document feed --
    _logger.info("Walking Document feed ...")
    doc_index: Dict[str, Dict] = {}
    for elem in walk_syncfeed("Document", session):
        d = parse_document(elem)
        if d["id"] and not d["verwijderd"]:
            if any(zid in needed_zaak_ids for zid in d["zaak_refs"]):
                doc_index[d["id"]] = d
    needed_doc_ids = set(doc_index.keys())
    _logger.info("Collected %d Document records", len(doc_index))

    # -- Walk the DocumentVersie feed --
    _logger.info("Walking DocumentVersie feed ...")
    docversie_index: Dict[str, Dict] = {}
    for elem in walk_syncfeed("DocumentVersie", session):
        dv = parse_documentversie(elem)
        if (
            dv["id"]
            and not dv["verwijderd"]
            and dv.get("document_id") in needed_doc_ids
        ):
            docversie_index[dv["id"]] = dv
    _logger.info("Collected %d DocumentVersie records", len(docversie_index))

    # -- Build maps --
    title_map = build_title_map(besluit_index, zaak_index)
    ext_id_map = build_ext_id_map(
        besluit_index, zaak_index, doc_index, docversie_index
    )
    _logger.info(
        "title_map: %d entries, ext_id_map: %d entries",
        len(title_map), len(ext_id_map),
    )

    # -- Fetch body texts --
    body_text_map: Dict[str, Optional[str]] = {}
    if not skip_body_text:
        ext_ids_to_fetch = list(set(ext_id_map.values()))
        _logger.info(
            "Fetching body texts for %d unique ext_ids ...", len(ext_ids_to_fetch)
        )
        body_text_map = fetch_body_texts(ext_ids_to_fetch, session)
        _logger.info("Body text fetch complete")

    # -- Assemble updates --
    updates: List[Tuple[int, Optional[str], Optional[str], Optional[str]]] = []
    for besluit_id, motion_id in besluit_to_motion.items():
        title = title_map.get(besluit_id)
        ext_id = ext_id_map.get(besluit_id)
        body_text = body_text_map.get(ext_id) if ext_id else None
        if title or ext_id or body_text:
            updates.append((motion_id, title, body_text, ext_id))

    _logger.info("Applying %d motion updates to DB ...", len(updates))
    updated = _update_motions(db_path, updates)
    _logger.info("Done. Updated %d motions.", updated)
    return {
        "motions_with_besluit_id": len(target_besluit_ids),
        "besluit_records": len(besluit_index),
        "zaak_records": len(zaak_index),
        "document_records": len(doc_index),
        "docversie_records": len(docversie_index),
        "title_map_entries": len(title_map),
        "ext_id_map_entries": len(ext_id_map),
        "body_texts_fetched": sum(1 for v in body_text_map.values() if v),
        "motions_updated": updated,
    }


# ---------------------------------------------------------------------------
# Body-only re-scrape (uses stored externe_identifier; no SyncFeed walk needed)
# ---------------------------------------------------------------------------

def rescrape_body_texts(
    db_path: str,
    max_workers: int = MAX_BODY_WORKERS,
    batch_size: int = 500,
) -> Dict:
    """Re-fetch and overwrite body_text for every motion with an externe_identifier.

    Reads externe_identifier directly from the DB, so no SyncFeed walk is
    needed. Fetches in parallel (max_workers threads) and commits in batches
    of batch_size to limit memory use and provide progress checkpoints.

    Returns a summary dict with counts.
    """
    conn = duckdb.connect(db_path, read_only=True)
    rows = conn.execute(
        "SELECT id, externe_identifier FROM motions"
        " WHERE externe_identifier IS NOT NULL"
    ).fetchall()
    conn.close()
    total = len(rows)
    _logger.info(
        "Re-scraping body_text for %d motions (workers=%d) ...", total, max_workers
    )

    session = requests.Session()
    session.headers["User-Agent"] = "stemwijzer-scraper/1.0"

    fetched = 0
    failed = 0
    committed = 0
    # Process in batches so we can commit progress and log along the way
    for batch_start in range(0, total, batch_size):
        batch = rows[batch_start : batch_start + batch_size]
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            future_to_row = {
                pool.submit(_fetch_body_text, ext_id, session): (mid, ext_id)
                for mid, ext_id in batch
            }
            updates: List[
                Tuple[int, Optional[str], Optional[str], Optional[str]]
            ] = []
            for future in as_completed(future_to_row):
                mid, ext_id = future_to_row[future]
                try:
                    text = future.result()
                except Exception as exc:
                    _logger.warning(
                        "Future failed for motion %d (%s): %s", mid, ext_id, exc
                    )
                    text = None
                if text:
                    fetched += 1
                    updates.append((mid, None, text, None))
                else:
                    failed += 1
        _update_motions(db_path, updates)
        committed += len(updates)
        done = batch_start + len(batch)
        _logger.info(
            "Progress: %d/%d done; %d fetched, %d failed, %d committed",
            done, total, fetched, failed, committed,
        )

    _logger.info(
        "Re-scrape complete. fetched=%d, failed=%d, total=%d", fetched, failed, total
    )
    return {"total": total, "fetched": fetched, "failed": failed}


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
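# Typical invocations (illustrative; paths as in the module docstring):
#   .venv/bin/python scripts/sync_motion_content.py --db-path data/motions.db
#   .venv/bin/python scripts/sync_motion_content.py --db-path data/motions.db \
#       --body-only --max-body-workers 20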
def _main():
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    parser = argparse.ArgumentParser(description="Sync motion content from SyncFeed")
    parser.add_argument("--db-path", required=True, help="Path to motions.db")
    parser.add_argument(
        "--skip-body-text",
        action="store_true",
        help="Skip fetching body text from officielebekendmakingen.nl",
    )
    parser.add_argument(
        "--body-only",
        action="store_true",
        help=(
            "Skip the SyncFeed walk; re-fetch and overwrite body_text for all "
            "motions that already have an externe_identifier in the DB."
        ),
    )
    parser.add_argument(
        "--max-body-workers",
        type=int,
        default=MAX_BODY_WORKERS,
        help=(
            "Maximum concurrent workers for fetching body text "
            f"(default: {MAX_BODY_WORKERS})"
        ),
    )
    args = parser.parse_args()

    max_workers = args.max_body_workers or MAX_BODY_WORKERS
    if args.body_only:
        summary = rescrape_body_texts(args.db_path, max_workers=max_workers)
    else:
        summary = sync_motion_content(args.db_path, skip_body_text=args.skip_body_text)
    for k, v in summary.items():
        print(f"  {k}: {v}")


if __name__ == "__main__":
    _main()