#!/usr/bin/env python3 """Hybrid motion classifier: identify right-wing motions via keywords + voting patterns. Usage: uv run python analysis/right_wing/classify_motions.py """ from __future__ import annotations import argparse import json import logging import re import sys from pathlib import Path from typing import Any import duckdb ROOT = Path(__file__).parent.parent.parent.resolve() if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) # Centrist parties for cross-ideological metrics CANONICAL_CENTRIST = frozenset({"VVD", "D66", "CDA", "NSC", "BBB", "CU"}) def _load_keywords(keywords_path: str) -> tuple[list[str], list[str]]: """Load right-wing and left-wing keywords from JSON.""" with open(keywords_path, "r", encoding="utf-8") as f: data = json.load(f) right = [item["term"] for item in data.get("right_keywords", [])] left = [item["term"] for item in data.get("left_keywords", [])] return right, left def _build_keyword_pattern(keywords: list[str]) -> re.Pattern | None: """Build case-insensitive whole-word regex from keyword list.""" if not keywords: return None escaped = [re.escape(kw) for kw in keywords] pattern = r"\b(?:" + "|".join(escaped) + r")\b" return re.compile(pattern, re.IGNORECASE) def _compute_party_metrics( motion_votes: dict[str, dict[str, int]], ) -> tuple[float, float, float]: """Compute right_support, left_opposition, centrist_support for a motion. Returns: (right_support, left_opposition, centrist_support) Each is a float 0.0-1.0, or None if no relevant parties voted. """ def _support_ratio(votes: dict[str, int], parties: frozenset[str]) -> float | None: total = 0 supportive = 0 for party, pv in votes.items(): if party not in parties: continue tv = pv.get("voor", 0) + pv.get("tegen", 0) + pv.get("afwezig", 0) if tv == 0: continue total += 1 # For right/centrist, "support" = voor; for left, "opposition" = tegen if pv.get("voor", 0) / tv >= 0.5: supportive += 1 if total == 0: return None return supportive / total def _opposition_ratio(votes: dict[str, int], parties: frozenset[str]) -> float | None: total = 0 opposed = 0 for party, pv in votes.items(): if party not in parties: continue tv = pv.get("voor", 0) + pv.get("tegen", 0) + pv.get("afwezig", 0) if tv == 0: continue total += 1 if pv.get("tegen", 0) / tv >= 0.5: opposed += 1 if total == 0: return None return opposed / total right_support = _support_ratio(motion_votes, CANONICAL_RIGHT) left_opposition = _opposition_ratio(motion_votes, CANONICAL_LEFT) centrist_support = _support_ratio(motion_votes, CANONICAL_CENTRIST) return right_support, left_opposition, centrist_support def _match_keywords(text: str, pattern: re.Pattern | None) -> list[str]: """Return list of matched keywords in text.""" if pattern is None or not text: return [] return pattern.findall(text) def classify_motions( db_path: str = "data/motions.db", keywords_path: str = "analysis/right_wing/right_wing_keywords.json", right_support_threshold: float = 0.60, left_opposition_threshold: float = 0.40, require_keywords: bool = True, keyword_min_matches: int = 1, ) -> dict[str, Any]: """Classify motions and write results to `right_wing_motions` table. Returns stats dict with counts. """ db = Path(db_path) if not db.exists(): raise FileNotFoundError(f"Database not found: {db}") kw_path = Path(keywords_path) if not kw_path.exists(): raise FileNotFoundError(f"Keywords file not found: {kw_path}") right_kws, left_kws = _load_keywords(str(kw_path)) right_pattern = _build_keyword_pattern(right_kws) left_pattern = _build_keyword_pattern(left_kws) con = duckdb.connect(str(db)) try: # Create output table con.execute("DROP TABLE IF EXISTS right_wing_motions") con.execute( """ CREATE TABLE right_wing_motions ( motion_id INTEGER PRIMARY KEY, year INTEGER, title VARCHAR, right_support DOUBLE, left_opposition DOUBLE, centrist_support DOUBLE, right_keyword_matches INTEGER, left_keyword_matches INTEGER, classified BOOLEAN ) """ ) # Load all motion texts and dates rows = con.execute( "SELECT id, title, body_text, date FROM motions" ).fetchall() motion_texts = {mid: (title or "") + " " + (body_text or "") for mid, title, body_text, _ in rows} motion_years = {mid: date.year if date else None for mid, _, _, date in rows} # Load party votes vote_rows = con.execute( """ SELECT motion_id, party, vote, COUNT(*) as n FROM mp_votes WHERE party IS NOT NULL GROUP BY motion_id, party, vote """ ).fetchall() motion_votes: dict[int, dict[str, dict[str, int]]] = {} for motion_id, party, vote, n in vote_rows: mv = motion_votes.setdefault(motion_id, {}) pv = mv.setdefault(party, {"voor": 0, "tegen": 0, "afwezig": 0}) pv[vote] = pv.get(vote, 0) + n classified_count = 0 total_processed = 0 for motion_id, votes in motion_votes.items(): text = motion_texts.get(motion_id, "") year = motion_years.get(motion_id) right_support, left_opposition, centrist_support = _compute_party_metrics(votes) right_kw_matches = len(_match_keywords(text, right_pattern)) left_kw_matches = len(_match_keywords(text, left_pattern)) # Classification logic passes_votes = ( right_support is not None and right_support >= right_support_threshold and left_opposition is not None and left_opposition >= left_opposition_threshold ) passes_keywords = right_kw_matches >= keyword_min_matches is_classified = passes_votes and (not require_keywords or passes_keywords) con.execute( """ INSERT INTO right_wing_motions (motion_id, year, title, right_support, left_opposition, centrist_support, right_keyword_matches, left_keyword_matches, classified) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( motion_id, year, motion_texts.get(motion_id, "")[:300], right_support, left_opposition, centrist_support, right_kw_matches, left_kw_matches, is_classified, ), ) total_processed += 1 if is_classified: classified_count += 1 con.commit() logger.info( "Processed %d motions, classified %d as right-wing (%.1f%%)", total_processed, classified_count, 100 * classified_count / total_processed if total_processed else 0, ) return { "total_processed": total_processed, "classified": classified_count, "right_keywords_loaded": len(right_kws), "left_keywords_loaded": len(left_kws), } finally: con.close() def main() -> int: parser = argparse.ArgumentParser(description="Classify right-wing motions") parser.add_argument("--db", default="data/motions.db") parser.add_argument("--keywords", default="analysis/right_wing/right_wing_keywords.json") parser.add_argument("--right-threshold", type=float, default=0.60) parser.add_argument("--left-threshold", type=float, default=0.40) parser.add_argument("--require-keywords", action="store_true", default=True) parser.add_argument("--no-require-keywords", dest="require_keywords", action="store_false") parser.add_argument("--keyword-min-matches", type=int, default=1) args = parser.parse_args() result = classify_motions( db_path=args.db, keywords_path=args.keywords, right_support_threshold=args.right_threshold, left_opposition_threshold=args.left_threshold, require_keywords=args.require_keywords, keyword_min_matches=args.keyword_min_matches, ) print(json.dumps(result, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())