"""Axis classifier: correlate per-party PCA positions against ideology reference data to assign honest, dynamic labels to political compass axes. Public API: classify_axes(positions_by_window, axes, db_path) -> dict """ import logging from collections import Counter from pathlib import Path from typing import Dict, List, Optional, Tuple import numpy as np import re import json from analysis.svd_labels import get_svd_label, get_fallback_labels _logger = logging.getLogger(__name__) # Module-level caches — loaded once per process lifetime. _ideology_cache: Optional[Dict[str, Dict[str, float]]] = None _coalition_cache: Optional[Dict[str, set]] = None # Correlation threshold above which we consider an axis "explained" by a dimension. _THRESHOLD = 0.65 _LABELS = { "lr": "Verzorgingsstaat–Marktwerking", "eu": "EU-integratie–Nationalisme", "pi": "Populistisch–Institutioneel", "co": "Coalitie–Oppositie", "pc": "Conservatief–Progressief", # When we have no interpretable classifier signal, fall back to the known # SVD component meanings rather than generic "As N" labels. "fallback_x": get_svd_label(1), "fallback_y": get_svd_label(2), } # Module-level helper: map internal/modal labels to user-facing labels. # Remove duplicate lower definition (keep the one at the top) def display_label_for_modal(modal_label: Optional[str], axis: str) -> str: """Return a user-facing axis label for a modal/internal label. Maps numeric fallback names 'As 1' / 'Stempatroon As 1' to the semantic labels from SVD_THEMES. Any other label is returned unchanged. None is treated as the semantic fallback for the axis. 
""" if modal_label is None: # Fallback to component 1 (x) or 2 (y) comp = 1 if axis == "x" else 2 return get_svd_label(comp) # Map "As 1" / "As 2" to semantic labels if axis == "x" and modal_label in ("As 1", "Stempatroon As 1"): return get_svd_label(1) if axis == "y" and modal_label in ("As 2", "Stempatroon As 2"): return get_svd_label(2) return modal_label _INTERPRETATION_TEMPLATES = { "lr": "De {orientation} as weerspiegelt de economische tegenstelling tussen verzorgingsstaat en marktwerking.", "eu": "De {orientation} as weerspiegelt de tegenstelling tussen EU-integratie/internationalisme en nationalisme/soevereiniteit.", "pi": "De {orientation} as scheidt populistisch-nationalistische partijen van het institutioneel-parlementaire establishment.", "co": ( "De {orientation} as weerspiegelt stemgedrag van coalitie- versus " "oppositiepartijen (r={r:.2f}). Ideologische tegenstellingen zijn minder dominant dit jaar." ), "pc": "De {orientation} as weerspiegelt de progressief-conservatieve tegenstelling.", } # Maps motion-path keyword labels to _INTERPRETATION_TEMPLATES keys. # Labels not present here fall back to "fallback". 
_MOTION_LABEL_TEMPLATE_KEY: Dict[str, str] = { "Verzorgingsstaat–Marktwerking": "lr", "EU-integratie–Nationalisme": "eu", "Populistisch–Institutioneel": "pi", "Progressief–Conservatief": "pc", } # Simple keyword-based classifier for motion titles (fallback signal) _KEYWORD_THRESHOLD = 0.4 _KEYWORDS: Dict[str, List[str]] = { "Verzorgingsstaat–Marktwerking": [ # economic / welfare state "belasting", "uitkering", "bijstand", "minimumloon", "cao", "vakbond", "bezuiniging", "privatisering", "subsidie", "pensioen", "aow", "zorg", "huur", "woning", "sociaal", "werkloos", "ww", "arbeidsongeschik", "wao", "gemeentefonds", ], "EU-integratie–Nationalisme": [ # EU and international cooperation "europees", "europese", " eu ", "eu-", "verdrag", "intergouvernementeel", "samenwerking", "internationaal", "navo", "nato", " vn ", "vn-", "sancties", "israël", "vluchteling", "asiel", "soevereiniteit", "nationaal", ], "Populistisch–Institutioneel": [ # Populist/nationalist themes "terugsturen", "syrië", "syrier", "grenzen dicht", "remigratie", "eigen volk", "nederland eerst", "corona", "vaccin", "ivermectine", "hydroxychloroquine", "complot", "deep state", "establishment", "elite", "herstelbetaling", "excuses", ], "Progressief–Conservatief": [ # environment "klimaat", "stikstof", "duurzaam", "duurzaamheid", "co2", "energietransitie", "biodiversiteit", # social "euthanasie", "abortus", "lgbtq", "transgender", "diversiteit", "traditi", "gezin", "religie", "geloof", ], } # Pre-compiled regexes for keyword matching. We escape keywords but do NOT add # word-boundaries because some keywords intentionally match substrings # (e.g. 'traditi' matching 'tradities'). re.IGNORECASE makes lowercasing # unnecessary during matching. 
# Pre-compiled regexes for keyword matching, one alternation per category.
# Keywords are escaped but deliberately NOT wrapped in word boundaries: some
# entries intentionally match substrings (e.g. 'traditi' matching
# 'tradities').  Padding whitespace is preserved: " eu " must only match the
# standalone word, never the "eu" inside e.g. "euthanasie" (the "eu-" entry
# covers hyphenated forms).  re.IGNORECASE makes lowercasing unnecessary
# during matching.
_KEYWORD_REGEXES: Dict[str, "re.Pattern[str]"] = {
    cat: re.compile(
        # BUGFIX: kw.strip() previously removed the deliberate padding spaces,
        # so " eu " matched inside unrelated words of other categories.
        "|".join(re.escape(kw) for kw in kws),
        re.IGNORECASE,
    )
    for cat, kws in _KEYWORDS.items()
}


def _classify_from_titles(titles: List[str]) -> Tuple[Optional[str], float]:
    """Classify a list of motion titles into an axis category via keywords.

    Each title counts once for every category whose regex matches it.

    Args:
        titles: Motion title strings (any case).

    Returns:
        (category_label, confidence) where confidence is the fraction of
        titles matching the winning category.  The label is None when the
        input is empty, the top count is tied between categories, or the
        confidence falls below _KEYWORD_THRESHOLD.
    """
    if not titles:
        return None, 0.0
    counts: Dict[str, int] = {cat: 0 for cat in _KEYWORDS}
    for title in titles:
        for cat, rx in _KEYWORD_REGEXES.items():
            if rx.search(title):
                counts[cat] += 1
    # Be deterministic on ties: if more than one category shares the top
    # count, report ambiguity (None) rather than picking by dict order.
    best_count = max(counts.values())
    best_cats = [cat for cat, cnt in counts.items() if cnt == best_count]
    confidence = best_count / len(titles)
    if len(best_cats) != 1 or confidence < _KEYWORD_THRESHOLD:
        return None, confidence
    return best_cats[0], confidence


def _load_motion_vectors(db_path: str, window_id: str) -> Dict[int, np.ndarray]:
    """Load SVD motion vectors for a given window from DuckDB.

    Args:
        db_path: Path to the DuckDB database file.
        window_id: Window identifier to filter svd_vectors on.

    Returns:
        {motion_id: vector_array}.  Best-effort: {} on any error (logged at
        debug level so a missing table never breaks classification).
    """
    try:
        import duckdb  # local import keeps the module importable without duckdb

        conn = duckdb.connect(db_path, read_only=True)
        try:
            rows = conn.execute(
                "SELECT entity_id, vector FROM svd_vectors "
                "WHERE entity_type = 'motion' AND window_id = ?",
                [window_id],
            ).fetchall()
        finally:
            conn.close()
        result: Dict[int, np.ndarray] = {}
        for entity_id, vector_raw in rows:
            try:
                mid = int(entity_id)
                vec = np.array(json.loads(vector_raw), dtype=float)
                result[mid] = vec
            except Exception:
                # Skip rows with non-numeric ids or malformed JSON vectors.
                continue
        return result
    except Exception as exc:
        _logger.debug("Failed to load motion vectors for window %s: %s", window_id, exc)
        return {}


def _project_motions(
    motion_vecs: Dict[int, np.ndarray],
    x_axis: np.ndarray,
    y_axis: np.ndarray,
    global_mean: np.ndarray,
) -> Dict[int, Tuple[float, float]]:
    """Project motion vectors onto the PCA axes after centering by global_mean.

    Args:
        motion_vecs: {motion_id: vector} as loaded by _load_motion_vectors.
        x_axis / y_axis: 1-D PCA axis vectors.
        global_mean: Mean vector used for centering; array-likes (plain
            lists from the axes dict) are accepted.

    Returns:
        {motion_id: (x_score, y_score)}.  Vectors whose shape does not match
        are silently skipped; {} on unexpected errors.
    """
    try:
        # Generalization: coerce once so a plain-list global_mean also works.
        mean_arr = np.asarray(global_mean, dtype=float)
        projections: Dict[int, Tuple[float, float]] = {}
        for mid, vec in motion_vecs.items():
            try:
                centered = vec - mean_arr
                x_score = float(np.dot(centered, x_axis))
                y_score = float(np.dot(centered, y_axis))
                projections[mid] = (x_score, y_score)
            except Exception:
                # Shape mismatch for this motion only — drop it.
                continue
        return projections
    except Exception as exc:
        _logger.debug("Failed to project motions: %s", exc)
        return {}


def _top_motion_ids(
    projections: Dict[int, Tuple[float, float]],
    axis: str,
    n: int = 5,
) -> Dict[str, List[int]]:
    """Return the top-n motion IDs at each pole of the given axis.

    Args:
        projections: {motion_id: (x_score, y_score)}.
        axis: 'x' or 'y'.
        n: Number of IDs per pole.

    Returns:
        {'+': [motion_ids], '-': [motion_ids]} — highest positive first in
        '+', most negative first in '-'.  NOTE(review): with fewer than 2*n
        motions the two pole lists can overlap.  Returns empty lists on
        invalid input.
    """
    try:
        if axis not in ("x", "y"):
            raise ValueError("axis must be 'x' or 'y'")
        idx = 0 if axis == "x" else 1
        sorted_ids = sorted(projections, key=lambda mid: projections[mid][idx])
        neg_ids = sorted_ids[:n]
        pos_ids = sorted_ids[-n:][::-1]
        return {"+": pos_ids, "-": neg_ids}
    except Exception as exc:
        _logger.debug("Failed to compute top_motion_ids: %s", exc)
        return {"+": [], "-": []}


def _fetch_motion_titles(
    db_path: str,
    motion_ids: List[int],
) -> Dict[int, Tuple[str, str]]:
    """Fetch (title, date) for a list of motion IDs from DuckDB.

    Args:
        db_path: Path to the DuckDB database file.
        motion_ids: IDs to look up (parameterized IN clause — no SQL built
            from values).

    Returns:
        {motion_id: (title, date_str)}.  Missing IDs are omitted; {} on any
        DB error or empty input.
    """
    if not motion_ids:
        return {}
    try:
        import duckdb  # local import keeps the module importable without duckdb

        placeholders = ", ".join("?" for _ in motion_ids)
        conn = duckdb.connect(db_path, read_only=True)
        try:
            rows = conn.execute(
                f"SELECT id, title, date FROM motions WHERE id IN ({placeholders})",
                motion_ids,
            ).fetchall()
        finally:
            conn.close()
        return {int(row[0]): (str(row[1]), str(row[2])) for row in rows}
    except Exception as exc:
        _logger.debug("Failed to fetch motion titles: %s", exc)
        return {}


def _load_ideology(csv_path: Path) -> Dict[str, Dict[str, float]]:
    """Load party ideology scores from CSV (cached for the process lifetime).

    Expects a header containing 'left_right' and 'progressive' columns; the
    first column is the party name.

    Returns:
        {party_name: {"left_right": float, "progressive": float}}; {} on any
        error (caller treats empty as 'skip classification').
    """
    global _ideology_cache
    if _ideology_cache is not None:
        return _ideology_cache
    result: Dict[str, Dict[str, float]] = {}
    try:
        with open(csv_path, encoding="utf-8") as fh:
            lines = fh.read().splitlines()
        header = [h.strip() for h in lines[0].split(",")]
        lr_idx = header.index("left_right")
        pc_idx = header.index("progressive")
        for line in lines[1:]:
            if not line.strip():
                continue
            parts = [p.strip() for p in line.split(",")]
            if len(parts) <= max(lr_idx, pc_idx):
                continue
            try:
                result[parts[0]] = {
                    "left_right": float(parts[lr_idx]),
                    "progressive": float(parts[pc_idx]),
                }
            except ValueError:
                # BUGFIX: previously one malformed numeric cell escaped to the
                # outer handler and discarded the entire file; skip the row.
                _logger.debug("Skipping malformed ideology row: %r", line)
                continue
    except FileNotFoundError:
        _logger.warning(
            "party_ideologies.csv not found at %s — axis labels will be generic",
            csv_path,
        )
        return {}
    except Exception as exc:
        _logger.warning("Failed to load party_ideologies.csv: %s", exc)
        return {}
    _ideology_cache = result
    return result


def _load_coalition(csv_path: Path) -> Dict[str, set]:
    """Load coalition membership from CSV (cached for the process lifetime).

    Rows are (window_id, party_name); the header row is skipped.

    Returns:
        {window_id: set_of_party_names}; {} on any error (coalition
        dimension will be skipped).
    """
    global _coalition_cache
    if _coalition_cache is not None:
        return _coalition_cache
    result: Dict[str, set] = {}
    try:
        with open(csv_path, encoding="utf-8") as fh:
            lines = fh.read().splitlines()
        for line in lines[1:]:
            if not line.strip():
                continue
            parts = [p.strip() for p in line.split(",")]
            if len(parts) < 2:
                continue
            wid, party = parts[0], parts[1]
            result.setdefault(wid, set()).add(party)
    except FileNotFoundError:
        _logger.warning(
            "coalition_membership.csv not found at %s — coalition axis detection disabled",
            csv_path,
        )
        return {}
    except Exception as exc:
        _logger.warning("Failed to load coalition_membership.csv: %s", exc)
        return {}
    _coalition_cache = result
    return result


def _window_year(window_id: str) -> Optional[str]:
    """Extract the year string from a window_id.

    '2016' → '2016', '2016-Q3' → '2016'.  Returns None for
    'current_parliament' (no single year applies).
    """
    if window_id == "current_parliament":
        return None
    return window_id.split("-")[0]


def _pearsonr(x: List[float], y: List[float]) -> float:
    """Pearson r; returns 0.0 for degenerate input (< 3 points or zero variance)."""
    if len(x) < 3:
        return 0.0
    xa = np.array(x, dtype=float)
    ya = np.array(y, dtype=float)
    # Zero-variance guard: corrcoef would emit NaN with a warning.
    if xa.std() < 1e-12 or ya.std() < 1e-12:
        return 0.0
    return float(np.corrcoef(xa, ya)[0, 1])


def _assign_label(
    r_lr: float,
    r_co: float,
    r_pc: float,
    axis: str,
) -> Tuple[str, str, float]:
    """Assign label, interpretation and quality score for one axis.

    Priority: left-right > coalition > progressive > fallback.  (The 'eu' and
    'pi' labels are never assigned here — no reference correlations exist for
    them; they come from the motion-title classifier.)

    Args:
        r_lr / r_co / r_pc: Pearson correlations of the axis against the
            left-right, coalition, and progressive references.
        axis: 'x' or 'y'; selects wording and the semantic fallback label.

    Returns:
        (label, interpretation_string, quality_score) where quality is the
        max absolute correlation across the three references.
    """
    orientation = "horizontale" if axis == "x" else "verticale"
    _x_fallback, _y_fallback = get_fallback_labels()
    fallback_label = _x_fallback if axis == "x" else _y_fallback
    quality = max(abs(r_lr), abs(r_co), abs(r_pc))
    if abs(r_lr) >= _THRESHOLD:
        return (
            _LABELS["lr"],
            _INTERPRETATION_TEMPLATES["lr"].format(orientation=orientation),
            quality,
        )
    if abs(r_co) >= _THRESHOLD:
        return (
            _LABELS["co"],
            _INTERPRETATION_TEMPLATES["co"].format(orientation=orientation, r=r_co),
            quality,
        )
    if abs(r_pc) >= _THRESHOLD:
        return (
            _LABELS["pc"],
            _INTERPRETATION_TEMPLATES["pc"].format(orientation=orientation),
            quality,
        )
    return (
        fallback_label,
        "",  # No interpretation for unclassified axes
        quality,
    )


def classify_axes(
    positions_by_window: Dict[str, Dict[str, Tuple[float, float]]],
    axes: dict,
    db_path: str,
) -> dict:
    """Classify compass axes using motion projection (primary) and ideology CSV (fallback).

    Motion projection path:
    - Requires axes["global_mean"], axes["x_axis"], axes["y_axis"].
    - Loads motion SVD vectors per window, projects onto PCA axes, ranks top
      5+5 motions, applies keyword classifier -> label.

    Fallback path (unchanged):
    - Pearson-r against party_ideologies.csv (left_right, progressive).
    - Pearson-r against coalition_membership.csv dummy.

    Enriches axes with:
        x_label, y_label — global modal label across annual windows
        x_quality, y_quality — {window_id: float} max |r|
        x_interpretation — {window_id: str}
        y_interpretation — {window_id: str}
        x_top_motions, y_top_motions —
            {window_id: {'+': [(title, date), ...], '-': [...]}}
        x_label_confidence — {window_id: float}
        y_label_confidence — {window_id: float}

    Returns:
        A shallow copy of `axes` with the enrichment keys above, or the
        original `axes` dict unchanged when no classification signal exists.
    """
    data_dir = Path(db_path).parent
    ideology = _load_ideology(data_dir / "party_ideologies.csv")
    coalition = _load_coalition(data_dir / "coalition_membership.csv")

    # Determine whether motion projection is possible.
    global_mean = axes.get("global_mean")
    x_axis_arr = np.array(axes.get("x_axis", []))
    y_axis_arr = np.array(axes.get("y_axis", []))
    motion_path_available = (
        global_mean is not None
        and x_axis_arr.ndim == 1
        and x_axis_arr.size > 0
        # BUGFIX: also require a 1-D y axis (previously only size was checked).
        and y_axis_arr.ndim == 1
        and y_axis_arr.size > 0
    )

    # With neither ideology reference data nor motion vectors there is nothing
    # to classify: return the original dict untouched.
    if not ideology and not motion_path_available:
        return axes

    x_quality: Dict[str, float] = {}
    y_quality: Dict[str, float] = {}
    x_interpretation: Dict[str, str] = {}
    y_interpretation: Dict[str, str] = {}
    x_top_motions: Dict[str, Dict] = {}
    y_top_motions: Dict[str, Dict] = {}
    x_label_confidence: Dict[str, float] = {}
    y_label_confidence: Dict[str, float] = {}
    annual_x_labels: List[str] = []
    annual_y_labels: List[str] = []

    for wid, pos_dict in positions_by_window.items():
        year = _window_year(wid)
        # Annual windows (plain '2016') vote in the modal label; quarterly
        # ('2016-Q3') and 'current_parliament' windows do not.
        is_annual = wid != "current_parliament" and "-" not in wid

        # ── Ideology / coalition Pearson-r (fallback signal) ────────────────
        x_lbl_fallback: Optional[str] = None
        y_lbl_fallback: Optional[str] = None
        x_q = 0.0
        y_q = 0.0
        x_int = ""
        y_int = ""
        if ideology:
            parties = [p for p in pos_dict if p in ideology]
            # Need at least 5 overlapping parties for a meaningful correlation.
            if len(parties) >= 5:
                party_x = [pos_dict[p][0] for p in parties]
                party_y = [pos_dict[p][1] for p in parties]
                ref_lr = [ideology[p]["left_right"] for p in parties]
                ref_pc = [ideology[p]["progressive"] for p in parties]
                if year and coalition and year in coalition:
                    gov_set = coalition[year]
                    ref_co = [1.0 if p in gov_set else -1.0 for p in parties]
                else:
                    # No coalition data for this year: zero variance makes
                    # _pearsonr return 0.0, disabling the coalition dimension.
                    ref_co = [0.0] * len(parties)

                r_lr_x = _pearsonr(party_x, ref_lr)
                r_co_x = _pearsonr(party_x, ref_co)
                r_pc_x = _pearsonr(party_x, ref_pc)
                x_lbl_fallback, x_int, x_q = _assign_label(r_lr_x, r_co_x, r_pc_x, "x")

                r_lr_y = _pearsonr(party_y, ref_lr)
                r_co_y = _pearsonr(party_y, ref_co)
                r_pc_y = _pearsonr(party_y, ref_pc)
                y_lbl_fallback, y_int, y_q = _assign_label(r_lr_y, r_co_y, r_pc_y, "y")

        # ── Motion projection (primary signal) ──────────────────────────────
        x_lbl = x_lbl_fallback
        y_lbl = y_lbl_fallback
        x_conf = 0.0
        y_conf = 0.0
        x_tops: Dict[str, List] = {"+": [], "-": []}
        y_tops: Dict[str, List] = {"+": [], "-": []}
        if motion_path_available:
            motion_vecs = _load_motion_vectors(db_path, wid)
            if motion_vecs:
                projections = _project_motions(
                    motion_vecs, x_axis_arr, y_axis_arr, global_mean
                )
                x_ids = _top_motion_ids(projections, "x", n=5)
                y_ids = _top_motion_ids(projections, "y", n=5)
                all_x_ids = x_ids["+"] + x_ids["-"]
                all_y_ids = y_ids["+"] + y_ids["-"]
                titles_map = _fetch_motion_titles(
                    db_path, list(set(all_x_ids + all_y_ids))
                )
                x_title_list = [
                    titles_map[mid][0] for mid in all_x_ids if mid in titles_map
                ]
                y_title_list = [
                    titles_map[mid][0] for mid in all_y_ids if mid in titles_map
                ]
                x_kw_lbl, x_conf = _classify_from_titles(x_title_list)
                y_kw_lbl, y_conf = _classify_from_titles(y_title_list)
                if x_kw_lbl is not None:
                    x_lbl = x_kw_lbl
                    # Only overwrite an empty interpretation: the ideology
                    # path's interpretation (if any) takes precedence.
                    if not x_int:
                        tkey = _MOTION_LABEL_TEMPLATE_KEY.get(x_kw_lbl, "fallback")
                        # BUGFIX: .get() guards against a missing template key
                        # (previously an unmapped label raised KeyError).
                        template = _INTERPRETATION_TEMPLATES.get(tkey)
                        if template:
                            x_int = template.format(orientation="horizontale")
                if y_kw_lbl is not None:
                    y_lbl = y_kw_lbl
                    if not y_int:
                        tkey = _MOTION_LABEL_TEMPLATE_KEY.get(y_kw_lbl, "fallback")
                        template = _INTERPRETATION_TEMPLATES.get(tkey)
                        if template:
                            y_int = template.format(orientation="verticale")
                # Build display lists: [(title, date), ...] per pole.
                for pole, ids in x_ids.items():
                    x_tops[pole] = [titles_map[mid] for mid in ids if mid in titles_map]
                for pole, ids in y_ids.items():
                    y_tops[pole] = [titles_map[mid] for mid in ids if mid in titles_map]

        # ── Final label resolution ──────────────────────────────────────────
        # If both motion and ideology paths produced nothing, use the generic
        # semantic fallback and drop any stale interpretation text.
        _x_fallback, _y_fallback = get_fallback_labels()
        if x_lbl is None:
            x_lbl = _x_fallback
            x_int = ""  # No interpretation for unclassified axes
        if y_lbl is None:
            y_lbl = _y_fallback
            y_int = ""  # No interpretation for unclassified axes

        x_quality[wid] = x_q
        y_quality[wid] = y_q
        x_interpretation[wid] = x_int
        y_interpretation[wid] = y_int
        x_top_motions[wid] = x_tops
        y_top_motions[wid] = y_tops
        x_label_confidence[wid] = x_conf
        y_label_confidence[wid] = y_conf
        if is_annual:
            annual_x_labels.append(x_lbl)
            annual_y_labels.append(y_lbl)

    def _modal(labels: List[str], fallback: str) -> str:
        # Most common label across annual windows; fallback when none voted.
        if not labels:
            return fallback
        return Counter(labels).most_common(1)[0][0]

    enriched = dict(axes)
    # Resolve the modal label across annual windows.  display_label_for_modal
    # converts internal generic component names ("As 1"/"As 2", legacy
    # "Stempatroon As N") into semantic labels so the UI never shows "As N".
    modal_x = _modal(annual_x_labels, "Links\u2013Rechts")
    modal_y = _modal(annual_y_labels, "Progressief\u2013Conservatief")
    enriched["x_label"] = display_label_for_modal(modal_x, "x")
    enriched["y_label"] = display_label_for_modal(modal_y, "y")
    enriched["x_quality"] = x_quality
    enriched["y_quality"] = y_quality
    enriched["x_interpretation"] = x_interpretation
    enriched["y_interpretation"] = y_interpretation
    enriched["x_top_motions"] = x_top_motions
    enriched["y_top_motions"] = y_top_motions
    enriched["x_label_confidence"] = x_label_confidence
    enriched["y_label_confidence"] = y_label_confidence
    return enriched