diff --git a/analysis/right_wing/classify_motions.py b/analysis/right_wing/classify_motions.py new file mode 100644 index 0000000..10946f0 --- /dev/null +++ b/analysis/right_wing/classify_motions.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 +"""Hybrid motion classifier: identify right-wing motions via keywords + voting patterns. + +Usage: + uv run python analysis/right_wing/classify_motions.py +""" + +from __future__ import annotations + +import argparse +import json +import logging +import re +import sys +from pathlib import Path +from typing import Any + +import duckdb + +ROOT = Path(__file__).parent.parent.parent.resolve() +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +logger = logging.getLogger(__name__) + +# Centrist parties for cross-ideological metrics +CANONICAL_CENTRIST = frozenset({"VVD", "D66", "CDA", "NSC", "BBB", "CU"}) + + +def _load_keywords(keywords_path: str) -> tuple[list[str], list[str]]: + """Load right-wing and left-wing keywords from JSON.""" + with open(keywords_path, "r", encoding="utf-8") as f: + data = json.load(f) + right = [item["term"] for item in data.get("right_keywords", [])] + left = [item["term"] for item in data.get("left_keywords", [])] + return right, left + + +def _build_keyword_pattern(keywords: list[str]) -> re.Pattern | None: + """Build case-insensitive whole-word regex from keyword list.""" + if not keywords: + return None + escaped = [re.escape(kw) for kw in keywords] + pattern = r"\b(?:" + "|".join(escaped) + r")\b" + return re.compile(pattern, re.IGNORECASE) + + +def _compute_party_metrics( + motion_votes: dict[str, dict[str, int]], +) -> tuple[float, float, float]: + """Compute right_support, left_opposition, centrist_support for a motion. + + Returns: + (right_support, left_opposition, centrist_support) + Each is a float 0.0-1.0, or None if no relevant parties voted. + """ + + def _support_ratio(votes: dict[str, int], parties: frozenset[str]) -> float | None: + total = 0 + supportive = 0 + for party, pv in votes.items(): + if party not in parties: + continue + tv = pv.get("voor", 0) + pv.get("tegen", 0) + pv.get("afwezig", 0) + if tv == 0: + continue + total += 1 + # For right/centrist, "support" = voor; for left, "opposition" = tegen + if pv.get("voor", 0) / tv >= 0.5: + supportive += 1 + if total == 0: + return None + return supportive / total + + def _opposition_ratio(votes: dict[str, int], parties: frozenset[str]) -> float | None: + total = 0 + opposed = 0 + for party, pv in votes.items(): + if party not in parties: + continue + tv = pv.get("voor", 0) + pv.get("tegen", 0) + pv.get("afwezig", 0) + if tv == 0: + continue + total += 1 + if pv.get("tegen", 0) / tv >= 0.5: + opposed += 1 + if total == 0: + return None + return opposed / total + + right_support = _support_ratio(motion_votes, CANONICAL_RIGHT) + left_opposition = _opposition_ratio(motion_votes, CANONICAL_LEFT) + centrist_support = _support_ratio(motion_votes, CANONICAL_CENTRIST) + return right_support, left_opposition, centrist_support + + +def _match_keywords(text: str, pattern: re.Pattern | None) -> list[str]: + """Return list of matched keywords in text.""" + if pattern is None or not text: + return [] + return pattern.findall(text) + + +def classify_motions( + db_path: str = "data/motions.db", + keywords_path: str = "analysis/right_wing/right_wing_keywords.json", + right_support_threshold: float = 0.60, + left_opposition_threshold: float = 0.40, + require_keywords: bool = True, + keyword_min_matches: int = 1, +) -> dict[str, Any]: + """Classify motions and write results to `right_wing_motions` table. + + Returns stats dict with counts. + """ + db = Path(db_path) + if not db.exists(): + raise FileNotFoundError(f"Database not found: {db}") + + kw_path = Path(keywords_path) + if not kw_path.exists(): + raise FileNotFoundError(f"Keywords file not found: {kw_path}") + + right_kws, left_kws = _load_keywords(str(kw_path)) + right_pattern = _build_keyword_pattern(right_kws) + left_pattern = _build_keyword_pattern(left_kws) + + con = duckdb.connect(str(db)) + try: + # Create output table + con.execute("DROP TABLE IF EXISTS right_wing_motions") + con.execute( + """ + CREATE TABLE right_wing_motions ( + motion_id INTEGER PRIMARY KEY, + year INTEGER, + title VARCHAR, + right_support DOUBLE, + left_opposition DOUBLE, + centrist_support DOUBLE, + right_keyword_matches INTEGER, + left_keyword_matches INTEGER, + classified BOOLEAN + ) + """ + ) + + # Load all motion texts and dates + rows = con.execute( + "SELECT id, title, body_text, date FROM motions" + ).fetchall() + motion_texts = {mid: (title or "") + " " + (body_text or "") for mid, title, body_text, _ in rows} + motion_years = {mid: date.year if date else None for mid, _, _, date in rows} + + # Load party votes + vote_rows = con.execute( + """ + SELECT motion_id, party, vote, COUNT(*) as n + FROM mp_votes + WHERE party IS NOT NULL + GROUP BY motion_id, party, vote + """ + ).fetchall() + + motion_votes: dict[int, dict[str, dict[str, int]]] = {} + for motion_id, party, vote, n in vote_rows: + mv = motion_votes.setdefault(motion_id, {}) + pv = mv.setdefault(party, {"voor": 0, "tegen": 0, "afwezig": 0}) + pv[vote] = pv.get(vote, 0) + n + + classified_count = 0 + total_processed = 0 + + for motion_id, votes in motion_votes.items(): + text = motion_texts.get(motion_id, "") + year = motion_years.get(motion_id) + + right_support, left_opposition, centrist_support = _compute_party_metrics(votes) + + right_kw_matches = len(_match_keywords(text, right_pattern)) + left_kw_matches = len(_match_keywords(text, left_pattern)) + + # Classification logic + passes_votes = ( + right_support is not None + and right_support >= right_support_threshold + and left_opposition is not None + and left_opposition >= left_opposition_threshold + ) + passes_keywords = right_kw_matches >= keyword_min_matches + + is_classified = passes_votes and (not require_keywords or passes_keywords) + + con.execute( + """ + INSERT INTO right_wing_motions + (motion_id, year, title, right_support, left_opposition, centrist_support, + right_keyword_matches, left_keyword_matches, classified) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + motion_id, + year, + motion_texts.get(motion_id, "")[:300], + right_support, + left_opposition, + centrist_support, + right_kw_matches, + left_kw_matches, + is_classified, + ), + ) + total_processed += 1 + if is_classified: + classified_count += 1 + + con.commit() + logger.info( + "Processed %d motions, classified %d as right-wing (%.1f%%)", + total_processed, + classified_count, + 100 * classified_count / total_processed if total_processed else 0, + ) + + return { + "total_processed": total_processed, + "classified": classified_count, + "right_keywords_loaded": len(right_kws), + "left_keywords_loaded": len(left_kws), + } + finally: + con.close() + + +def main() -> int: + parser = argparse.ArgumentParser(description="Classify right-wing motions") + parser.add_argument("--db", default="data/motions.db") + parser.add_argument("--keywords", default="analysis/right_wing/right_wing_keywords.json") + parser.add_argument("--right-threshold", type=float, default=0.60) + parser.add_argument("--left-threshold", type=float, default=0.40) + parser.add_argument("--require-keywords", action="store_true", default=True) + parser.add_argument("--no-require-keywords", dest="require_keywords", action="store_false") + parser.add_argument("--keyword-min-matches", type=int, default=1) + args = parser.parse_args() + + result = classify_motions( + db_path=args.db, + keywords_path=args.keywords, + right_support_threshold=args.right_threshold, + left_opposition_threshold=args.left_threshold, + require_keywords=args.require_keywords, + keyword_min_matches=args.keyword_min_matches, + ) + print(json.dumps(result, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())