"""Backfill missing mp_votes.party values from mp_metadata and co-voting inference.

Multi-tier strategy:
  1) Tussenvoegsel-aware name match against mp_metadata.
  2) Majority party already recorded in mp_votes for the same MP.
  3) Looser last-name-token match against mp_metadata, accepted only when
     every metadata entry sharing that last name resolves to the same party.
  4) Co-voting inference: for MPs still unresolved, find which party's MPs
     they vote identically with most often (raw agreement count, with a
     minimum-overlap and a margin-over-runner-up check).

Usage:
    uv run python3 scripts/fill_mp_votes_parties.py --db data/motions.db
"""

from __future__ import annotations

import argparse
import logging
import re
import unicodedata
from collections import defaultdict
from datetime import date, datetime

try:
    import duckdb
except ImportError:  # pragma: no cover - exercised only without the driver
    # Keep the pure name-matching helpers importable (e.g. from unit tests)
    # on machines without DuckDB; main() reports the missing dependency.
    duckdb = None  # type: ignore[assignment]

logger = logging.getLogger("fill_mp_votes_parties")

_TUSSENVOEGSEL = {
    "van de",
    "van den",
    "van der",
    "van het",
    "van",
    "de",
    "den",
    "der",
    "het",
    "ter",
    "ten",
    "el",
    "al",
    "in 't",
}

# Build a regex that matches any known tussenvoegsel (longest first to avoid
# partial matches like "van" eating the "van" in "van der").
_TV_PATTERN = re.compile(
    r"\b("
    + "|".join(
        re.escape(tv) for tv in sorted(_TUSSENVOEGSEL, key=len, reverse=True)
    )
    + r")\b",
    re.IGNORECASE,
)


def normalize_mp_key(name: str) -> str:
    """Produce a canonical key that matches regardless of tussenvoegsel position.

    Both "Burg van der, E." (mp_votes style) and "Van der Burg, E."
    (mp_metadata style) produce the same key. Diacritics are stripped so
    "Kostić, I." matches "Kostic, I.".

    Strategy: split on the first comma into a last-name part and an initials
    part. From the last-name part, pull out every tussenvoegsel; what is left
    is the bare last name. The key is "lastname tussenvoegsel initials",
    lowercased, with empty components omitted.

    Args:
        name: Raw MP name; may be empty or None.

    Returns:
        The canonical key, or "" when *name* is falsy.
    """
    if not name:
        return ""

    # Strip diacritics: NFD decompose, then drop combining marks.
    decomposed = unicodedata.normalize("NFD", name)
    s = "".join(c for c in decomposed if unicodedata.category(c) != "Mn")

    # Remove parenthetical full names, e.g. "(Christine)".
    s = re.sub(r"\s*\(.*?\)", "", s).strip()

    last_part, _, initials_part = s.partition(",")

    # Drop dots AND whitespace so "J. J." and "J.J." give the same initials.
    initials = re.sub(r"[.\s]+", "", initials_part).lower()

    last_lower = last_part.strip().lower()
    found_tv = [m.group(0).lower() for m in _TV_PATTERN.finditer(last_lower)]

    # Whatever remains after removing the tussenvoegsels is the bare last name.
    remaining = _TV_PATTERN.sub("", last_lower)
    remaining = re.sub(r"\s+", " ", remaining).strip()

    # Sorted so the key does not depend on the order they appeared in.
    tv_str = " ".join(sorted(found_tv))

    # Skip empty components so the key never carries stray spaces.
    return " ".join(part for part in (remaining, tv_str, initials) if part)


def _parse_start_date(value) -> date | None:
    """Coerce a membership start value to a ``date``; None when unusable.

    Accepts ``date``/``datetime`` objects as well as ISO-8601 strings, since
    the database driver may hand back either depending on the column type.
    """
    if isinstance(value, datetime):  # check first: datetime subclasses date
        return value.date()
    if isinstance(value, date):
        return value
    if not value:
        return None
    try:
        return datetime.fromisoformat(str(value)).date()
    except ValueError:
        return None


def pick_preferred_party(records: list) -> str | None:
    """Choose the most plausible party from an MP's membership records.

    Args:
        records: Dicts with keys ``party``, ``van`` (start) and ``tot`` (end).

    Returns:
        In order of preference: the party of the first record with no end
        date; the party of the record with the latest parseable start date;
        the first party present at all; otherwise None. Records without a
        party are ignored throughout, so a party-less "most recent" record
        cannot mask an older record that does name a party.
    """
    with_party = [r for r in records if r.get("party")]

    # Prefer an active membership (no end date).
    for r in with_party:
        if r.get("tot") is None:
            return r["party"]

    # Otherwise pick the most recent start date (first one wins on ties).
    best = None
    best_date = None
    for r in with_party:
        start = _parse_start_date(r.get("van"))
        if start is not None and (best_date is None or start > best_date):
            best, best_date = r, start
    if best is not None:
        return best["party"]

    # Fall back to any party present.
    return with_party[0]["party"] if with_party else None


def _infer_party_by_covoting(conn, mp_name: str, min_overlap: int = 10) -> str | None:
    """Infer party by finding which known-party MPs vote identically most often.

    For each motion where *mp_name* voted, every other individual MP (name
    contains a comma) who cast the same vote and already has a party counts
    as one agreement for that party. The party with the highest agreement
    count wins, provided the count is at least *min_overlap* and at least
    20% above the runner-up.

    NOTE(review): agreement is a raw count over (motion, other MP) pairs and
    is not normalised by party size, so parties with more MPs accumulate
    agreement faster. Confirm this bias is acceptable for the data at hand.

    Args:
        conn: Open database connection exposing ``execute(sql, params)``.
        mp_name: Exact ``mp_votes.mp_name`` of the MP to classify.
        min_overlap: Minimum agreement count required to accept a party.

    Returns:
        The inferred party, or None when there is no data, too little
        overlap, or the result is ambiguous.
    """
    rows = conn.execute(
        """
        SELECT other.party, COUNT(*) AS agreement
        FROM mp_votes me
        JOIN mp_votes other
          ON me.motion_id = other.motion_id
         AND me.vote = other.vote
        WHERE me.mp_name = ?
          AND other.mp_name != ?
          AND other.party IS NOT NULL
          AND other.party != ''
          AND other.mp_name LIKE '%,%'
        GROUP BY other.party
        ORDER BY agreement DESC
        LIMIT 5
        """,
        (mp_name, mp_name),
    ).fetchall()

    if not rows:
        return None

    best_party, best_count = rows[0]
    if best_count < min_overlap:
        return None

    # Require meaningful margin over second-best to avoid ambiguous assignment.
    if len(rows) > 1:
        second_party, second_count = rows[1]
        # Best must have at least 20% more agreement than runner-up.
        if best_count < second_count * 1.2:
            logger.debug(
                "Co-voting ambiguous for %s: %s=%d vs %s=%d",
                mp_name,
                best_party,
                best_count,
                second_party,
                second_count,
            )
            return None

    logger.info(
        "Co-voting inferred %s -> %s (agreement=%d)",
        mp_name,
        best_party,
        best_count,
    )
    return best_party


def _load_metadata(conn) -> dict[str, list[dict]]:
    """Group mp_metadata membership records by normalized MP key."""
    md_rows = conn.execute(
        "SELECT mp_name, party, van, tot_en_met FROM mp_metadata"
    ).fetchall()
    metadata: dict[str, list[dict]] = defaultdict(list)
    for mp_name, party, van, tot in md_rows:
        key = normalize_mp_key(mp_name)
        if not key:
            # Nameless rows cannot identify anyone; skip them.
            continue
        metadata[key].append(
            {"mp_name": mp_name, "party": party, "van": van, "tot": tot}
        )
    return metadata


def _load_majority_parties(conn) -> dict[str, str]:
    """Map normalized MP key -> party most often recorded in mp_votes."""
    rows_counts = conn.execute(
        "SELECT mp_name, party, COUNT(*) FROM mp_votes "
        "WHERE party IS NOT NULL AND party != '' GROUP BY mp_name, party"
    ).fetchall()
    party_counts: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for mp_name, party, cnt in rows_counts:
        key = normalize_mp_key(mp_name)
        if key:
            party_counts[key][party] += cnt
    return {
        key: max(counts.items(), key=lambda kv: kv[1])[0]
        for key, counts in party_counts.items()
    }


def _index_parties_by_lastname(metadata: dict[str, list[dict]]) -> dict[str, set]:
    """Map bare last name -> set of preferred parties of MPs with that name."""
    index: dict[str, set] = defaultdict(set)
    for meta_key, recs in metadata.items():
        tokens = meta_key.split()
        if not tokens:
            continue
        party = pick_preferred_party(recs)
        if party:
            index[tokens[0]].add(party)
    return index


def _resolve_party(
    key: str,
    metadata: dict[str, list[dict]],
    majority_by_norm: dict[str, str],
    parties_by_lastname: dict[str, set],
) -> str | None:
    """Run tiers 1-3 for one normalized key; None when all of them fail."""
    if not key:
        return None

    # 1) Exact normalized metadata match.
    if key in metadata:
        party = pick_preferred_party(metadata[key])
        if party:
            return party

    # 2) Majority party already observed in mp_votes.
    party = majority_by_norm.get(key)
    if party:
        return party

    # 3) Looser match on the last-name token. Several MPs can share a last
    #    name across parties, so only accept an unambiguous result.
    candidates = parties_by_lastname.get(key.split()[0], ())
    if len(candidates) == 1:
        return next(iter(candidates))
    if candidates:
        logger.debug(
            "Last-name match ambiguous for %s: %s", key, sorted(candidates)
        )
    return None


def _set_party(conn, ids: list, party: str) -> int:
    """Write *party* to the given mp_votes rows; return how many were written."""
    for id_ in ids:
        conn.execute("UPDATE mp_votes SET party = ? WHERE id = ?", (party, id_))
    return len(ids)


def _backfill(conn) -> int:
    """Fill in missing parties on *conn*; return the number of rows updated."""
    metadata = _load_metadata(conn)
    majority_by_norm = _load_majority_parties(conn)
    parties_by_lastname = _index_parties_by_lastname(metadata)

    # Target rows: individual MPs (name contains a comma) with no party.
    target_rows = conn.execute(
        "SELECT id, mp_name FROM mp_votes "
        "WHERE (party IS NULL OR party = '') AND mp_name LIKE '%,%'"
    ).fetchall()

    # Resolve once per distinct name instead of once per vote row.
    ids_by_name: dict[str, list[int]] = defaultdict(list)
    for id_, mp_name in target_rows:
        ids_by_name[mp_name].append(id_)

    updated = 0
    covote_candidates: dict[str, list[int]] = {}  # mp_name -> [ids]
    for mp_name, ids in ids_by_name.items():
        party = _resolve_party(
            normalize_mp_key(mp_name),
            metadata,
            majority_by_norm,
            parties_by_lastname,
        )
        if party:
            updated += _set_party(conn, ids, party)
        else:
            covote_candidates[mp_name] = ids

    # 4) Co-voting inference runs after tiers 1-3 so that it can build on the
    #    parties those tiers just wrote.
    for mp_name, ids in covote_candidates.items():
        inferred = _infer_party_by_covoting(conn, mp_name)
        if inferred:
            updated += _set_party(conn, ids, inferred)

    return updated


def main(argv=None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list; defaults to ``sys.argv[1:]`` when None.

    Returns:
        Process exit code (0 on success).
    """
    p = argparse.ArgumentParser(
        description="Backfill missing mp_votes.party values."
    )
    p.add_argument("--db", default="data/motions.db")
    args = p.parse_args(argv)

    # Without a handler the INFO-level summary below would never be shown.
    logging.basicConfig(level=logging.INFO)

    if duckdb is None:
        raise SystemExit("The 'duckdb' package is required to run this script.")

    conn = duckdb.connect(args.db)
    try:
        updated = _backfill(conn)
    finally:
        # Always release the database file, even when a query fails.
        conn.close()

    logger.info("Updated %d mp_votes rows with party info", updated)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())