"""Derive a right-wing keyword taxonomy from motion titles using TF-IDF. Identifies motions where canonical right-wing parties vote predominantly 'voor', contrasts them with left-wing control motions, and extracts distinctive terms via differential TF-IDF. Usage: uv run python analysis/right_wing/derive_keywords.py uv run python analysis/right_wing/derive_keywords.py --db data/motions.db """ from __future__ import annotations import argparse import json import logging import re import sys from pathlib import Path from typing import Any import duckdb # Ensure project root is on path for imports ROOT = Path(__file__).parent.parent.parent.resolve() if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT, _PARTY_NORMALIZE logger = logging.getLogger("derive_keywords") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") # Dutch stopwords — expanded from derive_svd_labels.py DUTCH_STOPWORDS = frozenset( { "de", "het", "een", "van", "en", "in", "is", "dat", "op", "te", "voor", "met", "zijn", "aan", "niet", "om", "ook", "als", "maar", "bij", "door", "over", "naar", "uit", "dan", "was", "worden", "dit", "die", "zou", "kunnen", "moet", "heeft", "hun", "nog", "wel", "meer", "of", "tegen", "onder", "geen", "alle", "zal", "er", "zich", "na", "tot", "omdat", "hoe", "wat", "wie", "waar", "waarom", "kan", "motie", "lid", "leden", "c.s.", "over", "verzoekt", "regering", "kamer", "vaststelling", "begrotingsstaten", "ministerie", "jaar", "voorstel", "wijziging", "amendement", "gewijzigde", "nader", "gewest", "artikel", "eerste", "tweede", "derde", "vierde", "nummer", "nr", "ontvangen", "datum", "voorgesteld", "beraadslaging", "overwegende", "constaterende", "betreffende", "inzake", "tot", "ten", "aanzien", "verzoeken", "besluiten", "kamerstuk", "procedure", "procedurele", "technische", "parlementaire", "parlement", "staten", "generaal", "minister", "ministers", "staatssecretaris", "staatssecretarissen", "kabinet", # Parliamentary procedural terms "gehoord", "uitspreken", "aangenomen", "spreekt", "roept", "verzoekt", "verzoeken", "stelt", "stellen", "besluiten", "overwegende", "constaterende", "ontvangen", "voorgesteld", # Generic function words "gaat", "dag", "mogelijk", "direct", "per", "open", "hoger", "zien", "zetten", "stoppen", "intrekken", "toestand", "land", "orde", "enz", "nota", "gebruik", "gebruikte", "gebruiken", "moeten", "willen", "kunnen", "zullen", "zou", "zouden", "worden", "wordt", "waren", "was", "werd", "werden", "heeft", "hebben", "had", "hadden", # National/generic terms "nederland", "nederlandse", "nederlands", "nationale", "rijks", "financiën", "financieel", "financiële", # Politician names (right-wing) — filter as noise "wilders", "baudet", "haga", "eerdmans", "plas", "kops", "smolders", "vanderplas", "vangaal", "houwelingen", "bontes", "van", "der", "den", "de", "het", "ten", # More pronouns / generic verbs "wij", "we", "jullie", "u", "jou", "jouw", "weer", "terug", "geven", "voeren", "doen", "maken", "komen", "gaan", "staan", "zitten", "liggen", "brengen", "nemen", "laten", "zien", "houden", "vinden", "worden", # More noise "onze", "taak", "stemmen", "box", "openen", "jong", "voornemens", # More politician names "roon", "maeijer", "emiel", "eppink", } ) # Generic parliamentary terms to filter from final keyword list GENERIC_TERMS = frozenset( { "motie", "amendement", "voorstel", "wijziging", "lid", "leden", "kamer", "regering", "ministerie", "minister", "staatssecretaris", "kabinet", "parlement", "parlementaire", "procedure", "technische", "procedurele", "beraadslaging", "vaststelling", "begrotingsstaten", "artikel", "nummer", "nr", "jaar", "datum", "ontvangen", "voorgesteld", "overwegende", "constaterende", "verzoekt", "verzoeken", "besluiten", "c.s.", "gewest", "eerste", "tweede", "derde", "vierde", "kamerstuk", "staten", "generaal", "ministers", "staatssecretarissen", "gewijzigde", "nader", "gewijzigd", # Additional procedural / generic noise "gehoord", "uitspreken", "aangenomen", "spreekt", "roept", "roeptop", "verzoekt", "verzoeken", "besluiten", "stelt", "stellen", "overwegende", "constaterende", "ontvangen", "voorgesteld", "gaat", "dag", "mogelijk", "direct", "per", "open", "hoger", "zien", "zetten", "stoppen", "intrekken", "toestand", "land", "orde", "enz", "nota", "gebruik", "gebruikte", "gebruiken", "nederland", "nederlandse", "nederlands", "nationale", "rijks", "financiën", "financieel", "financiële", "wilders", "baudet", "haga", "eerdmans", "plas", "kops", "smolders", "vanderplas", "vangaal", } ) def _clean_text(text: str) -> str: """Normalize motion text for TF-IDF: lowercase, strip prefixes, remove noise.""" text = text.lower() # Strip motion prefixes aggressively. # Patterns: # "Motie van het lid [Name] c.s. over " # "Motie van het lid [Name] over " # "Motie van de leden [Name] en [Name] over " # "Gewijzigde motie van het lid [Name] (t.v.v. ...) over " # "Amendement van het lid [Name] over " # "Voorstel tot wijziging van ... over " # Use non-greedy match up to "over" or end of prefix. text = re.sub( r"^(?:gewijzigde\s+|nader\s+gewijzigde\s+)?(?:motie|amendement|voorstel)" r"(?:\s+van\s+(?:het\s+lid|de\s+leden)\s+[^()]*?)(?:\s+c\.s\.)?" r"(?:\s+\(t\.v\.v\.[^)]*\))?\s+over\s+", "", text, ) # Fallback for any remaining "van het lid ..." fragments text = re.sub(r"van\s+(?:het\s+lid|de\s+leden)\s+\w+(?:\s+\w+)*\s+(?:c\.s\.)?\s*", " ", text) # Remove parentheticals, punctuation, digits text = re.sub(r"\(.*?\)", " ", text) text = re.sub(r"[^\w\s]", " ", text) text = re.sub(r"\d+", " ", text) # Collapse whitespace text = re.sub(r"\s+", " ", text) return text.strip() def _tokenize(text: str) -> list[str]: """Split cleaned text into tokens, filtering stopwords and short words.""" return [ w for w in text.split() if len(w) > 2 and w not in DUTCH_STOPWORDS ] def _load_party_votes( con: duckdb.DuckDBPyConnection, ) -> dict[int, dict[str, dict[str, int]]]: """Load aggregated party votes per motion. Returns: {motion_id: {party: {'voor': int, 'tegen': int, 'afwezig': int}}} """ rows = con.execute( """ SELECT motion_id, party, vote, COUNT(*) as n FROM mp_votes WHERE party IS NOT NULL GROUP BY motion_id, party, vote """ ).fetchall() result: dict[int, dict[str, dict[str, int]]] = {} for motion_id, party, vote, n in rows: normalized = _PARTY_NORMALIZE.get(party, party) motion_votes = result.setdefault(motion_id, {}) party_votes = motion_votes.setdefault(normalized, {"voor": 0, "tegen": 0, "afwezig": 0}) party_votes[vote] = party_votes.get(vote, 0) + n return result def _compute_group_support( motion_votes: dict[str, dict[str, int]], party_set: frozenset[str], threshold: float = 0.60, ) -> bool: """Return True if >= threshold of parties in party_set voted 'voor'.""" total_parties = 0 supporting_parties = 0 for party, votes in motion_votes.items(): if party not in party_set: continue total_votes = votes["voor"] + votes["tegen"] + votes["afwezig"] if total_votes == 0: continue total_parties += 1 # A party "supports" if majority of its votes are 'voor' if votes["voor"] / total_votes >= threshold: supporting_parties += 1 if total_parties == 0: return False return supporting_parties / total_parties >= threshold def _load_motion_texts(con: duckdb.DuckDBPyConnection) -> dict[int, str]: """Load motion titles keyed by id.""" rows = con.execute("SELECT id, title, body_text FROM motions").fetchall() result = {} for mid, title, body_text in rows: text = title or "" # Optionally append start of body_text if available if body_text: text = text + " " + body_text[:500] result[mid] = text return result def derive_keywords( db_path: str = "data/motions.db", right_threshold: float = 0.60, left_threshold: float = 0.60, top_n: int = 50, min_df: int = 2, max_df_ratio: float = 0.95, ) -> dict[str, Any]: """Derive right-wing keywords via differential TF-IDF. Returns dict with: - right_keywords: list of (term, score) - left_keywords: list of (term, score) - differential: list of (term, diff_score) # right - left - filtered_keywords: final curated list - stats: motion counts per group """ db = Path(db_path) if not db.exists(): raise FileNotFoundError(f"Database not found: {db}") con = duckdb.connect(str(db), read_only=True) try: logger.info("Loading party votes...") party_votes = _load_party_votes(con) logger.info("Loaded votes for %d motions", len(party_votes)) logger.info("Loading motion texts...") motion_texts = _load_motion_texts(con) logger.info("Loaded texts for %d motions", len(motion_texts)) # Classify motions right_motion_ids = [] left_motion_ids = [] unmatched = [] for motion_id, votes in party_votes.items(): if motion_id not in motion_texts: continue is_right = _compute_group_support(votes, CANONICAL_RIGHT, right_threshold) is_left = _compute_group_support(votes, CANONICAL_LEFT, left_threshold) if is_right and not is_left: right_motion_ids.append(motion_id) elif is_left and not is_right: left_motion_ids.append(motion_id) else: unmatched.append(motion_id) logger.info( "Classified: %d right-wing, %d left-wing, %d unmatched", len(right_motion_ids), len(left_motion_ids), len(unmatched), ) if len(right_motion_ids) < 10 or len(left_motion_ids) < 10: raise ValueError( f"Insufficient motions for TF-IDF: right={len(right_motion_ids)}, left={len(left_motion_ids)}" ) # Build corpus right_texts = [_clean_text(motion_texts[mid]) for mid in right_motion_ids] left_texts = [_clean_text(motion_texts[mid]) for mid in left_motion_ids] # Use sklearn TF-IDF try: from sklearn.feature_extraction.text import TfidfVectorizer except ImportError as exc: raise ImportError("sklearn is required. Install with: uv add scikit-learn") from exc vectorizer = TfidfVectorizer( tokenizer=_tokenize, preprocessor=lambda x: x, # already cleaned token_pattern=None, # use tokenizer instead min_df=min_df, max_df=max_df_ratio, sublinear_tf=True, ) all_texts = right_texts + left_texts tfidf_matrix = vectorizer.fit_transform(all_texts) feature_names = vectorizer.get_feature_names_out() # Split matrices right_matrix = tfidf_matrix[: len(right_texts)] left_matrix = tfidf_matrix[len(right_texts) :] # Compute mean TF-IDF per term per group import numpy as np right_mean = np.asarray(right_matrix.mean(axis=0)).flatten() left_mean = np.asarray(left_matrix.mean(axis=0)).flatten() # Differential score: right_mean - left_mean diff_scores = right_mean - left_mean # Sort by differential score term_scores = list(zip(feature_names, diff_scores, right_mean, left_mean)) term_scores.sort(key=lambda x: x[1], reverse=True) # Filter generic terms from top results filtered = [ (term, float(diff), float(rm), float(lm)) for term, diff, rm, lm in term_scores if term not in GENERIC_TERMS and len(term) > 2 ] result = { "right_keywords": [ {"term": t, "diff": d, "right_tfidf": r, "left_tfidf": l} for t, d, r, l in filtered[:top_n] ], "left_keywords": [ {"term": t, "diff": d, "right_tfidf": r, "left_tfidf": l} for t, d, r, l in filtered[-top_n:][::-1] ], "filtered_terms": [t for t, _, _, _ in filtered[:top_n]], "stats": { "right_motions": len(right_motion_ids), "left_motions": len(left_motion_ids), "unmatched_motions": len(unmatched), "total_motions": len(party_votes), }, } return result finally: con.close() def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description="Derive right-wing keyword taxonomy") parser.add_argument("--db", default="data/motions.db", help="Path to motions.db") parser.add_argument("--output", default="analysis/right_wing/right_wing_keywords.json", help="Output JSON path") parser.add_argument("--top-n", type=int, default=50, help="Number of top keywords to extract") parser.add_argument("--right-threshold", type=float, default=0.60, help="Right-wing support threshold") parser.add_argument("--left-threshold", type=float, default=0.60, help="Left-wing support threshold") args = parser.parse_args(argv) result = derive_keywords( db_path=args.db, right_threshold=args.right_threshold, left_threshold=args.left_threshold, top_n=args.top_n, ) output_path = Path(args.output) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(result, indent=2, ensure_ascii=False)) logger.info("Keywords written to %s", output_path) logger.info("Top 10 right-wing terms: %s", [k["term"] for k in result["right_keywords"][:10]]) return 0 if __name__ == "__main__": raise SystemExit(main())