"""political_axis.py — Project MP SVD vectors onto an ideological axis. Two modes: 1. PCA mode (default): compute the first principal component of all MP SVD vectors for a window and project each MP onto it. The sign is arbitrary but consistent within a window. 2. Anchor mode: define the axis as the vector from the centroid of ``left_parties`` to the centroid of ``right_parties``. Project all MPs onto this normalised anchor axis. Both modes return a dict mapping mp_name → scalar score for the given window. """ import json import logging from typing import Dict, List, Optional, Tuple import numpy as np from . import trajectory as _trajectory import duckdb _logger = logging.getLogger(__name__) def _load_mp_svd_vectors(db_path: str, window_id: str) -> Dict[str, np.ndarray]: """Load all MP SVD vectors for a window from svd_vectors table.""" conn = duckdb.connect(db_path) rows = conn.execute( "SELECT entity_id, vector FROM svd_vectors WHERE window_id = ? AND entity_type = 'mp'", (window_id,), ).fetchall() conn.close() result = {} for mp_name, vec_json in rows: try: result[mp_name] = np.array(json.loads(vec_json), dtype=float) except Exception: _logger.warning("Could not parse SVD vector for MP %s", mp_name) return result def compute_pca_axis(db_path: str, window_id: str) -> Dict[str, float]: """Project MP SVD vectors onto their first principal component. Returns {mp_name: score}. Returns empty dict if fewer than 2 MPs. """ mp_vecs = _load_mp_svd_vectors(db_path, window_id) if len(mp_vecs) < 2: _logger.warning( "window %s has only %d MPs; skipping PCA axis", window_id, len(mp_vecs) ) return {} names = list(mp_vecs.keys()) mat = np.vstack([mp_vecs[n] for n in names]) # (n_mps, k) # Centre mat_centred = mat - mat.mean(axis=0) # First PC via SVD try: _, _, Vt = np.linalg.svd(mat_centred, full_matrices=False) axis = Vt[0] # (k,) except np.linalg.LinAlgError: _logger.exception("SVD failed in compute_pca_axis for window %s", window_id) return {} projections = mat_centred.dot(axis) return {name: float(score) for name, score in zip(names, projections)} def compute_anchor_axis( db_path: str, window_id: str, left_parties: List[str], right_parties: List[str], ) -> Dict[str, float]: """Project MP SVD vectors onto a left↔right anchor axis. The axis runs from the centroid of ``left_parties`` to the centroid of ``right_parties``. Positive scores are toward the right. Returns {mp_name: score}. """ mp_vecs = _load_mp_svd_vectors(db_path, window_id) if not mp_vecs: return {} left_set = set(left_parties) right_set = set(right_parties) # 1. Party-level actors whose entity_id IS a party name (e.g. "GroenLinks-PvdA") left_vecs = [mp_vecs[p] for p in left_set if p in mp_vecs] right_vecs = [mp_vecs[p] for p in right_set if p in mp_vecs] # 2. Individual MPs via mp_metadata party affiliation conn = duckdb.connect(db_path) rows = conn.execute("SELECT mp_name, party FROM mp_metadata").fetchall() conn.close() for mp_name, party in rows: if mp_name not in mp_vecs: continue if party in left_set and mp_name not in left_set: left_vecs.append(mp_vecs[mp_name]) elif party in right_set and mp_name not in right_set: right_vecs.append(mp_vecs[mp_name]) if not left_vecs or not right_vecs: _logger.warning( "window %s: insufficient anchor parties (left=%d, right=%d)", window_id, len(left_vecs), len(right_vecs), ) return {} left_centroid = np.mean(left_vecs, axis=0) right_centroid = np.mean(right_vecs, axis=0) axis = right_centroid - left_centroid norm = np.linalg.norm(axis) if norm < 1e-10: _logger.warning("Anchor axis has near-zero norm for window %s", window_id) return {} axis = axis / norm return {name: float(np.dot(vec, axis)) for name, vec in mp_vecs.items()} def compute_2d_axes( db_path: str, window_ids: Optional[List[str]] = None, method: str = "pca", anchor_kwargs: Optional[Dict] = None, normalize_vectors: bool = True, pca_residual: bool = False, ) -> Tuple[Dict[str, Dict[str, Tuple[float, float]]], Dict[str, np.ndarray]]: """Compute 2D coordinates for MPs per window. Args: db_path: path to duckdb window_ids: optional ordered list of windows (defaults to all) method: 'pca' or 'anchor' anchor_kwargs: when method=='anchor' must provide { 'left_parties': List[str], 'right_parties': List[str], 'prog_parties': List[str], 'cons_parties': List[str], } Returns: positions_by_window, axis_def - positions_by_window: {window_id: {mp_name: (x,y)}} - axis_def: {'x_axis': np.ndarray, 'y_axis': np.ndarray, 'method': str} Notes: This function expects aligned SVD vectors produced by trajectory._procrustes_align_windows. It will call trajectory helpers to load and align windows so the returned coordinates are consistent across windows. """ # Import trajectory helper at runtime so tests can monkeypatch sys.modules import importlib _trajectory = importlib.import_module("analysis.trajectory") if window_ids is None: window_ids = _trajectory._load_window_ids(db_path) # Load per-window raw vectors using the trajectory helper and align them raw_window_vecs: Dict[str, Dict[str, np.ndarray]] = {} for wid in window_ids: raw_window_vecs[wid] = _trajectory._load_mp_vectors_for_window(db_path, wid) # Pad all vectors to the maximum dimension across windows before Procrustes. # Some windows (e.g. 2017 with only 30 motions) have lower-rank SVD output # (dim=29 instead of 50). Padding with zeros lets Procrustes treat all vectors # as the same dimension, preserving the alignment chain. if raw_window_vecs: max_dim = max(v.shape[0] for d in raw_window_vecs.values() for v in d.values()) padded: Dict[str, Dict[str, np.ndarray]] = {} for wid, d in raw_window_vecs.items(): padded[wid] = { e: np.pad(v, (0, max_dim - v.shape[0])) if v.shape[0] < max_dim else v for e, v in d.items() } raw_window_vecs = padded aligned_window_vecs = _trajectory._procrustes_align_windows(raw_window_vecs) # Stack all vectors across windows into a single matrix for PCA if needed all_vecs = [] entity_index = [] # parallel list of (window_id, entity) for wid, d in aligned_window_vecs.items(): for ent, v in d.items(): if normalize_vectors: n = np.linalg.norm(v) all_vecs.append(v / n if n > 1e-10 else v) else: all_vecs.append(v) entity_index.append((wid, ent)) if len(all_vecs) == 0: _logger.info("No vectors loaded for windows %s", window_ids) return ({}, {}) M = np.vstack(all_vecs) if method == "pca": # centre globally Mc = M - M.mean(axis=0) try: U, s, Vt = np.linalg.svd(Mc, full_matrices=False) except np.linalg.LinAlgError: _logger.exception("SVD failed in compute_2d_axes (pca)") return ({}, {}) # explained variance ratio from singular values sv2 = s**2 evr = sv2 / (sv2.sum() + 1e-20) evr1 = float(evr[0]) if evr.size > 0 else 0.0 evr2 = float(evr[1]) if evr.size > 1 else 0.0 # take top-1 component as primary axis comp1 = Vt[0] comp1_hat = comp1 / (np.linalg.norm(comp1) + 1e-12) # By default take the second Vt row as the second axis, but optionally # compute the second axis from residuals (PCA on data with PC1 removed) comp2 = Vt[1] if Vt.shape[0] > 1 else np.zeros_like(comp1) # compute residual-based second component if requested or if PC1 dominates if pca_residual or evr1 > 0.85: # Project out PC1 from centred data and compute PCA on residuals proj1 = (Mc.dot(comp1_hat)).reshape(-1, 1) * comp1_hat.reshape(1, -1) residual = Mc - proj1 try: _, s_res, Vt_res = np.linalg.svd(residual, full_matrices=False) comp2 = Vt_res[0] _logger.info( "Using residual PCA for second axis (pca_residual=%s)", pca_residual ) except Exception: _logger.exception( "Residual PCA failed; falling back to raw second component" ) comp2_hat = comp2 / (np.linalg.norm(comp2) + 1e-12) axes = { "x_axis": comp1_hat, "y_axis": comp2_hat, "method": "pca", "explained_variance_ratio": [evr1, evr2], "pca_residual_used": bool(pca_residual or evr1 > 0.85), } # Ensure consistent left/right and progressive/conservative orientation # by checking canonical party centroids and flipping axis signs if needed. try: right_parties = { "PVV", "VVD", "FVD", "BBB", "JA21", "Nieuw Sociaal Contract", } left_parties = {"SP", "PvdA", "GL", "GroenLinks", "GroenLinks-PvdA", "DENK"} cons_parties = { "PVV", "VVD", "FVD", "CDA", "SGP", "BBB", "JA21", "Nieuw Sociaal Contract", } prog_parties = { "GL", "GroenLinks", "PvdA", "PvdD", "SP", "GroenLinks-PvdA", "DENK", } # Build mapping of entity -> vector from stacked matrix M ent_to_vec = {ent: vec for (wid, ent), vec in zip(entity_index, M)} def _centroid_for_party_set(party_set): vecs = [] # Party-level centroid vectors (entity_id == party name directly). for p in party_set: if p in ent_to_vec: vecs.append(ent_to_vec[p]) # MP-level vectors: mp_metadata stores mp_name in the same # "Lastname, Initials" format as entity_id in svd_vectors. try: conn = duckdb.connect(db_path) rows = conn.execute( "SELECT mp_name, party FROM mp_metadata" ).fetchall() conn.close() except Exception: rows = [] for mp_name, party in rows: if party in party_set and mp_name in ent_to_vec: vecs.append(ent_to_vec[mp_name]) if not vecs: return None return np.mean(np.vstack(vecs), axis=0) # X-axis: left vs right left_cent = _centroid_for_party_set(left_parties) right_cent = _centroid_for_party_set(right_parties) if left_cent is not None and right_cent is not None: left_proj = float(np.dot(left_cent - M.mean(axis=0), comp1_hat)) right_proj = float(np.dot(right_cent - M.mean(axis=0), comp1_hat)) if right_proj < left_proj: _logger.info( "Flipping PCA x-axis to match canonical left/right orientation (right_proj=%.3f left_proj=%.3f)", right_proj, left_proj, ) axes["x_axis"] = -axes["x_axis"] # Y-axis: progressive vs conservative — positive Y = progressive prog_cent = _centroid_for_party_set(prog_parties) cons_cent = _centroid_for_party_set(cons_parties) if prog_cent is not None and cons_cent is not None: prog_proj = float(np.dot(prog_cent - M.mean(axis=0), comp2_hat)) cons_proj = float(np.dot(cons_cent - M.mean(axis=0), comp2_hat)) # We want positive Y to mean 'progressive'. If the progressive # centroid currently projects lower than the conservative centroid, # flip the sign so progressive > conservative. if prog_proj < cons_proj: _logger.info( "Flipping PCA y-axis so positive Y corresponds to progressive (prog_proj=%.3f cons_proj=%.3f)", prog_proj, cons_proj, ) axes["y_axis"] = -axes["y_axis"] except Exception: _logger.debug("Could not auto-orient PCA axes; leaving signs as-is") # warn if PCA is effectively 1-D if evr1 > 0.85 and not pca_residual: _logger.warning( "PCA first component explains %.1f%% of variance — data is near-1D;\n" "consider using pca_residual=True or the anchor method for a second axis", evr1 * 100, ) # project per-window vectors (centre by global mean) global_mean = M.mean(axis=0) positions_by_window: Dict[str, Dict[str, Tuple[float, float]]] = { wid: {} for wid in window_ids } for (wid, ent), vec in zip(entity_index, M): v_centered = vec - global_mean x = float(np.dot(v_centered, axes["x_axis"])) y = float(np.dot(v_centered, axes["y_axis"])) positions_by_window[wid][ent] = (x, y) return positions_by_window, axes elif method == "anchor": if not anchor_kwargs: raise ValueError("anchor_kwargs required for method='anchor'") left = set(anchor_kwargs.get("left_parties", [])) right = set(anchor_kwargs.get("right_parties", [])) prog = set(anchor_kwargs.get("prog_parties", [])) cons = set(anchor_kwargs.get("cons_parties", [])) # collect vectors across all windows for each anchor group def collect_for_party_set(party_set: set) -> List[np.ndarray]: res: List[np.ndarray] = [] # party-level entities (entity_id equals party name) for wid, d in aligned_window_vecs.items(): for ent, v in d.items(): if ent in party_set: res.append(v) # MP-level via mp_metadata party affiliation conn = duckdb.connect(db_path) rows = conn.execute("SELECT mp_name, party FROM mp_metadata").fetchall() conn.close() for mp_name, party in rows: if party in party_set: # take all vectors for this MP across windows if present for wid, d in aligned_window_vecs.items(): if mp_name in d: res.append(d[mp_name]) return res left_vecs = collect_for_party_set(left) right_vecs = collect_for_party_set(right) prog_vecs = collect_for_party_set(prog) cons_vecs = collect_for_party_set(cons) if not left_vecs or not right_vecs or not prog_vecs or not cons_vecs: _logger.warning("Insufficient anchor vectors for requested parties") return ({}, {}) left_centroid = np.mean(np.vstack(left_vecs), axis=0) right_centroid = np.mean(np.vstack(right_vecs), axis=0) prog_centroid = np.mean(np.vstack(prog_vecs), axis=0) cons_centroid = np.mean(np.vstack(cons_vecs), axis=0) lr = right_centroid - left_centroid # Construct progressive-conservative axis so that positive Y corresponds # to *progressive* positions (consistent with PCA branch where we flip # the sign to make progressive > conservative). Use prog - cons so a # positive dot product means closer to the progressive centroid. pc = prog_centroid - cons_centroid # Gram-Schmidt: make pc orthogonal to lr lr_norm = np.linalg.norm(lr) if lr_norm < 1e-12: raise ValueError("Left-right anchor axis has near-zero norm") lr_hat = lr / lr_norm # remove projection of pc on lr pc = pc - np.dot(pc, lr_hat) * lr_hat pc_norm = np.linalg.norm(pc) if pc_norm < 1e-12: raise ValueError( "Progressive-conservative anchor axis degenerate after orthogonalisation" ) pc_hat = pc / pc_norm axes = {"x_axis": lr_hat, "y_axis": pc_hat, "method": "anchor"} positions_by_window = {wid: {} for wid in window_ids} for wid, d in aligned_window_vecs.items(): for ent, v in d.items(): x = float(np.dot(v, axes["x_axis"])) y = float(np.dot(v, axes["y_axis"])) positions_by_window[wid][ent] = (x, y) return positions_by_window, axes else: raise ValueError("Unknown method '%s'" % method)