You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
277 lines
8.7 KiB
277 lines
8.7 KiB
"""Backfill missing mp_votes.party values from mp_metadata and co-voting inference.
|
|
|
|
Multi-tier strategy:
|
|
1) Tussenvoegsel-aware name match against mp_metadata.
|
|
2) Majority party already recorded in mp_votes for the same MP.
|
|
3) Looser last-name-token match against mp_metadata.
|
|
4) Co-voting inference: for MPs still unresolved, find which party's MPs
|
|
they vote identically with most often, ranked by raw agreement count (with a minimum overlap and a 20% lead over the runner-up).
|
|
|
|
Usage:
|
|
uv run python3 scripts/fill_mp_votes_parties.py --db data/motions.db
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import re
|
|
import unicodedata
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
import duckdb
|
|
|
|
# Module-level logger shared by every function in this script. It uses a fixed
# name instead of __name__, so records carry "fill_mp_votes_parties" even when
# the file is executed directly as a script.
logger = logging.getLogger("fill_mp_votes_parties")
|
|
|
|
|
|
# Dutch surname particles ("tussenvoegsels") that may appear either before or
# after the surname depending on the data source.
_TUSSENVOEGSEL = {
    "van de",
    "van den",
    "van der",
    "van het",
    "van",
    "de",
    "den",
    "der",
    "het",
    "ter",
    "ten",
    "el",
    "al",
    "in 't",
}

# Regex matching any known tussenvoegsel as a whole word. Alternatives are
# ordered longest first so that "van" cannot eat the "van" in "van der"; the
# secondary alphabetical key makes the pattern source identical on every run
# (set iteration order alone is not stable across interpreter invocations).
_TV_PATTERN = re.compile(
    r"\b("
    + "|".join(
        re.escape(tv)
        for tv in sorted(_TUSSENVOEGSEL, key=lambda tv: (-len(tv), tv))
    )
    + r")\b",
    re.IGNORECASE,
)


def normalize_mp_key(name: str) -> str:
    """Produce a canonical key that matches regardless of tussenvoegsel position.

    Both "Burg van der, E." (mp_votes style) and "Van der Burg, E."
    (mp_metadata style) produce the same key. Diacritics are stripped so
    "Kostić, I." matches "Kostic, I.", and parenthetical first names such as
    "(Christine)" are ignored.

    Strategy: split on the first comma into a surname part and an initials
    part. Tussenvoegsel tokens are pulled out of the surname part; what is
    left is the bare surname. The canonical key is
    "surname tussenvoegsel initials", lowercased, with empty components
    omitted so the key never starts or ends with a stray space.

    Args:
        name: Raw MP name, typically "Surname, I." with an optional particle
            before or after the surname.

    Returns:
        The canonical key, or "" when *name* is empty.
    """
    if not name:
        return ""

    # Strip diacritics: NFD-decompose, then drop the combining marks.
    decomposed = unicodedata.normalize("NFD", name)
    text = "".join(ch for ch in decomposed if unicodedata.category(ch) != "Mn")

    # Remove parenthetical full names, e.g. "(Christine)".
    text = re.sub(r"\s*\(.*?\)", "", text).strip()

    # Everything before the first comma is the surname part, the rest initials.
    last_part, _, initials_part = text.partition(",")

    # Drop dots AND whitespace so "E. A." and "E.A." yield the same initials.
    initials = re.sub(r"[.\s]+", "", initials_part).lower()

    # Collapse whitespace first so "van  der" is still recognised as the single
    # particle "van der" instead of two separately sorted tokens.
    last_lower = " ".join(last_part.lower().split())

    found_tv = [match.group(0) for match in _TV_PATTERN.finditer(last_lower)]
    surname = " ".join(_TV_PATTERN.sub("", last_lower).split())

    # Sort the particles so their original order does not influence the key.
    tv_str = " ".join(sorted(found_tv))

    return " ".join(part for part in (surname, tv_str, initials) if part)
|
|
|
|
|
|
def _coerce_start_date(value):
    """Return *value* as a ``date``, or ``None`` when it cannot be interpreted.

    Accepts ISO-8601 strings as well as ``date``/``datetime`` objects, since a
    database driver may hand back either form for a date column.
    """
    # Local import: the module itself only imports ``datetime``.
    from datetime import date

    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    if isinstance(value, str) and value:
        try:
            return datetime.fromisoformat(value).date()
        except ValueError:
            return None
    return None


def pick_preferred_party(records: list) -> str | None:
    """Choose the most plausible party from an MP's membership records.

    Preference order:
      1. the first record that is still active (``tot`` is ``None``) and has
         a party;
      2. the record with a party whose ``van`` (start date) is most recent;
      3. the first record that has any party at all.

    Args:
        records: Dicts with the keys ``party``, ``van`` and ``tot``. ``van``
            may be an ISO-8601 string or a ``date``/``datetime`` object.

    Returns:
        The chosen party, or ``None`` when no record carries a party.
    """
    # 1) Prefer an active membership.
    for record in records:
        if record.get("tot") is None and record.get("party"):
            return record.get("party")

    # 2) Otherwise take the most recently started membership. Records without
    #    a party are skipped so an empty newest record cannot mask an older,
    #    usable one.
    latest_party = None
    latest_start = None
    for record in records:
        party = record.get("party")
        if not party:
            continue
        start = _coerce_start_date(record.get("van"))
        if start is not None and (latest_start is None or start > latest_start):
            latest_start = start
            latest_party = party
    if latest_party:
        return latest_party

    # 3) Fall back to any party present (e.g. when no start date is usable).
    for record in records:
        if record.get("party"):
            return record.get("party")
    return None
|
|
|
|
|
|
def _infer_party_by_covoting(conn, mp_name: str, min_overlap: int = 10) -> str | None:
    """Guess an MP's party from the parties of MPs who vote the same way.

    The query pairs every vote of *mp_name* with the votes of other individual
    MPs (names containing a comma) on the same motion that are identical and
    already carry a party, then totals those pairs per party. Each agreeing
    MP counts once per motion, so the total is a pair count rather than a
    count of motions.

    The top-ranked party is returned only if its total reaches *min_overlap*
    and is at least 20% higher than the runner-up's total.

    NOTE(review): because the total is per (motion, MP) pair, parties with
    more recorded MPs accumulate agreement faster -- confirm this is intended.

    Args:
        conn: Open database connection exposing ``execute(sql, params)``.
        mp_name: Name of the MP exactly as stored in ``mp_votes.mp_name``.
        min_overlap: Minimum agreement total the best party must reach.

    Returns:
        The inferred party, or ``None`` when there is no data, too little
        overlap, or no clear lead over the second-best party.
    """
    ranked = conn.execute(
        """
        SELECT other.party, COUNT(*) AS agreement
        FROM mp_votes me
        JOIN mp_votes other
          ON me.motion_id = other.motion_id
         AND me.vote = other.vote
        WHERE me.mp_name = ?
          AND other.mp_name != ?
          AND other.party IS NOT NULL
          AND other.party != ''
          AND other.mp_name LIKE '%,%'
        GROUP BY other.party
        ORDER BY agreement DESC
        LIMIT 5
        """,
        (mp_name, mp_name),
    ).fetchall()

    if not ranked:
        return None

    top_party, top_count = ranked[0]
    if top_count < min_overlap:
        return None

    # Demand a clear lead: the winner needs at least 20% more agreement than
    # the runner-up, otherwise the assignment is considered ambiguous.
    runners_up = ranked[1:]
    if runners_up:
        runner_party, runner_count = runners_up[0]
        if top_count < runner_count * 1.2:
            logger.debug(
                "Co-voting ambiguous for %s: %s=%d vs %s=%d",
                mp_name,
                top_party,
                top_count,
                runner_party,
                runner_count,
            )
            return None

    logger.info(
        "Co-voting inferred %s -> %s (agreement=%d)",
        mp_name,
        top_party,
        top_count,
    )
    return top_party
|
|
|
|
|
|
def main(argv=None) -> int:
    """Backfill empty ``mp_votes.party`` values in the database given by ``--db``.

    Only rows for individual MPs (``mp_name`` contains a comma) with a NULL or
    empty party are touched. Each row is resolved through, in order:

      1. an exact normalized-name match against ``mp_metadata``;
      2. the majority party already recorded in ``mp_votes`` for that name;
      3. a looser match on the surname token against ``mp_metadata``;
      4. co-voting inference, run once per still-unresolved MP after all
         rows have gone through tiers 1-3.

    Args:
        argv: Argument list handed to argparse; ``None`` lets argparse read
            the process arguments.

    Returns:
        ``0`` on completion.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--db", default="data/motions.db")
    args = parser.parse_args(argv)

    # Without a handler the INFO-level summary would be dropped by the default
    # WARNING threshold. basicConfig does nothing if logging is already set up.
    logging.basicConfig(level=logging.INFO)

    conn = duckdb.connect(args.db)
    updated = 0
    try:
        # Load mp_metadata, grouped by canonical name key.
        md_rows = conn.execute(
            "SELECT mp_name, party, van, tot_en_met FROM mp_metadata"
        ).fetchall()

        metadata = defaultdict(list)
        for mp_name, party, van, tot in md_rows:
            metadata[normalize_mp_key(mp_name)].append(
                {"mp_name": mp_name, "party": party, "van": van, "tot": tot}
            )

        # Tier-3 index: surname token -> party of the first metadata entry (in
        # load order) with that surname that yields a party. Built once instead
        # of rescanning all metadata per row. Keys that normalize to "" have no
        # surname token and are skipped rather than indexed.
        party_by_lastname: dict[str, str] = {}
        for meta_key, recs in metadata.items():
            meta_tokens = meta_key.split()
            if not meta_tokens or meta_tokens[0] in party_by_lastname:
                continue
            candidate = pick_preferred_party(recs)
            if candidate:
                party_by_lastname[meta_tokens[0]] = candidate

        # Build majority-party mapping from existing mp_votes (non-null parties).
        party_counts = defaultdict(lambda: defaultdict(int))
        rows_counts = conn.execute(
            "SELECT mp_name, party, COUNT(*) FROM mp_votes WHERE party IS NOT NULL AND party != '' GROUP BY mp_name, party"
        ).fetchall()
        for mp_name, party, cnt in rows_counts:
            party_counts[normalize_mp_key(mp_name)][party] += cnt

        majority_by_norm = {
            key: max(counts.items(), key=lambda kv: kv[1])[0]
            for key, counts in party_counts.items()
        }

        # Target rows: individual MPs (contain comma) with NULL or empty party.
        target_rows = conn.execute(
            "SELECT id, mp_name FROM mp_votes WHERE (party IS NULL OR party = '') AND mp_name LIKE '%,%'"
        ).fetchall()

        # MPs left for co-voting inference (tier 4): mp_name -> [row ids].
        covote_candidates: dict[str, list[int]] = defaultdict(list)

        for id_, mp_name in target_rows:
            key = normalize_mp_key(mp_name)
            chosen_party = None

            # 1) exact normalized metadata match
            if key in metadata:
                chosen_party = pick_preferred_party(metadata[key])

            # 2) fall back to the majority observed in mp_votes
            if not chosen_party:
                chosen_party = majority_by_norm.get(key)

            # 3) looser match on the surname token only
            if not chosen_party:
                tokens = key.split()
                if tokens:
                    chosen_party = party_by_lastname.get(tokens[0])

            if chosen_party:
                conn.execute(
                    "UPDATE mp_votes SET party = ? WHERE id = ?", (chosen_party, id_)
                )
                updated += 1
            else:
                covote_candidates[mp_name].append(id_)

        # 4) Co-voting inference for the MPs that are still unresolved.
        for mp_name, ids in covote_candidates.items():
            inferred = _infer_party_by_covoting(conn, mp_name)
            if not inferred:
                continue
            for id_ in ids:
                conn.execute(
                    "UPDATE mp_votes SET party = ? WHERE id = ?", (inferred, id_)
                )
                updated += 1
    finally:
        # Release the database file even when a query fails midway.
        conn.close()

    logger.info("Updated %d mp_votes rows with party info", updated)
    return 0
|
|
|
|
|
|
# Script entry point: run the backfill and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)
|
|
|