"""political_axis.py — Project MP SVD vectors onto an ideological axis. Two modes: 1. PCA mode (default): compute the first principal component of all MP SVD vectors for a window and project each MP onto it. The sign is arbitrary but consistent within a window. 2. Anchor mode: define the axis as the vector from the centroid of ``left_parties`` to the centroid of ``right_parties``. Project all MPs onto this normalised anchor axis. Both modes return a dict mapping mp_name → scalar score for the given window. """ import json import logging from typing import Dict, List, Optional, Tuple import numpy as np from . import trajectory as _trajectory try: import duckdb except Exception: # pragma: no cover - allow importing module in lightweight test envs duckdb = None # type: ignore # Import canonical party sets from config for consistent orientation with SVD components tab try: from .config import CANONICAL_LEFT, CANONICAL_RIGHT except Exception: # pragma: no cover - fallback for test environments CANONICAL_LEFT: frozenset = frozenset() CANONICAL_RIGHT: frozenset = frozenset() _logger = logging.getLogger(__name__) def _load_mp_svd_vectors(db_path: str, window_id: str) -> Dict[str, np.ndarray]: """Load all MP SVD vectors for a window from svd_vectors table.""" conn = duckdb.connect(db_path) rows = conn.execute( "SELECT entity_id, vector FROM svd_vectors WHERE window_id = ? AND entity_type = 'mp'", (window_id,), ).fetchall() conn.close() result = {} for mp_name, vec_json in rows: try: result[mp_name] = np.array(json.loads(vec_json), dtype=float) except Exception: _logger.warning("Could not parse SVD vector for MP %s", mp_name) return result def compute_pca_axis(db_path: str, window_id: str) -> Dict[str, float]: """Project MP SVD vectors onto their first principal component. Returns {mp_name: score}. Returns empty dict if fewer than 2 MPs. """ mp_vecs = _load_mp_svd_vectors(db_path, window_id) if len(mp_vecs) < 2: _logger.warning( "window %s has only %d MPs; skipping PCA axis", window_id, len(mp_vecs) ) return {} names = list(mp_vecs.keys()) mat = np.vstack([mp_vecs[n] for n in names]) # (n_mps, k) # Centre mat_centred = mat - mat.mean(axis=0) # First PC via SVD try: _, _, Vt = np.linalg.svd(mat_centred, full_matrices=False) axis = Vt[0] # (k,) except np.linalg.LinAlgError: _logger.exception("SVD failed in compute_pca_axis for window %s", window_id) return {} projections = mat_centred.dot(axis) return {name: float(score) for name, score in zip(names, projections)} def compute_anchor_axis( db_path: str, window_id: str, left_parties: List[str], right_parties: List[str], ) -> Dict[str, float]: """Project MP SVD vectors onto a left↔right anchor axis. The axis runs from the centroid of ``left_parties`` to the centroid of ``right_parties``. Positive scores are toward the right. Returns {mp_name: score}. """ mp_vecs = _load_mp_svd_vectors(db_path, window_id) if not mp_vecs: return {} left_set = set(left_parties) right_set = set(right_parties) # 1. Party-level actors whose entity_id IS a party name (e.g. "GroenLinks-PvdA") left_vecs = [mp_vecs[p] for p in left_set if p in mp_vecs] right_vecs = [mp_vecs[p] for p in right_set if p in mp_vecs] # 2. Individual MPs via mp_metadata party affiliation conn = duckdb.connect(db_path) rows = conn.execute("SELECT mp_name, party FROM mp_metadata").fetchall() conn.close() for mp_name, party in rows: if mp_name not in mp_vecs: continue if party in left_set and mp_name not in left_set: left_vecs.append(mp_vecs[mp_name]) elif party in right_set and mp_name not in right_set: right_vecs.append(mp_vecs[mp_name]) if not left_vecs or not right_vecs: _logger.warning( "window %s: insufficient anchor parties (left=%d, right=%d)", window_id, len(left_vecs), len(right_vecs), ) return {} left_centroid = np.mean(left_vecs, axis=0) right_centroid = np.mean(right_vecs, axis=0) axis = right_centroid - left_centroid norm = np.linalg.norm(axis) if norm < 1e-10: _logger.warning("Anchor axis has near-zero norm for window %s", window_id) return {} axis = axis / norm return {name: float(np.dot(vec, axis)) for name, vec in mp_vecs.items()} def compute_2d_axes( db_path: str, window_ids: Optional[List[str]] = None, method: str = "pca", anchor_kwargs: Optional[Dict] = None, normalize_vectors: bool = True, pca_residual: bool = False, ) -> Tuple[Dict[str, Dict[str, Tuple[float, float]]], Dict[str, np.ndarray]]: """Compute 2D coordinates for MPs per window. Args: db_path: path to duckdb window_ids: optional ordered list of windows (defaults to all) method: 'pca' or 'anchor' anchor_kwargs: when method=='anchor' must provide { 'left_parties': List[str], 'right_parties': List[str], 'prog_parties': List[str], 'cons_parties': List[str], } Returns: positions_by_window, axis_def - positions_by_window: {window_id: {mp_name: (x,y)}} - axis_def: {'x_axis': np.ndarray, 'y_axis': np.ndarray, 'method': str} Notes: This function expects aligned SVD vectors produced by trajectory._procrustes_align_windows. It will call trajectory helpers to load and align windows so the returned coordinates are consistent across windows. """ # Import trajectory helper at runtime so tests can monkeypatch sys.modules import importlib _trajectory = importlib.import_module("analysis.trajectory") if window_ids is None: window_ids = _trajectory._load_window_ids(db_path) # Load per-window raw vectors using the trajectory helper and align them raw_window_vecs: Dict[str, Dict[str, np.ndarray]] = {} for wid in window_ids: raw_window_vecs[wid] = _trajectory._load_mp_vectors_for_window(db_path, wid) # Pad all vectors to the maximum dimension across windows before Procrustes. # Some windows (e.g. 2017 with only 30 motions) have lower-rank SVD output # (dim=29 instead of 50). Padding with zeros lets Procrustes treat all vectors # as the same dimension, preserving the alignment chain. if raw_window_vecs: max_dim = max(v.shape[0] for d in raw_window_vecs.values() for v in d.values()) padded: Dict[str, Dict[str, np.ndarray]] = {} for wid, d in raw_window_vecs.items(): padded[wid] = { e: np.pad(v, (0, max_dim - v.shape[0])) if v.shape[0] < max_dim else v for e, v in d.items() } raw_window_vecs = padded aligned_window_vecs = _trajectory._procrustes_align_windows(raw_window_vecs) # Stack all vectors across windows into a single matrix for PCA if needed all_vecs = [] entity_index = [] # parallel list of (window_id, entity) for wid, d in aligned_window_vecs.items(): for ent, v in d.items(): if normalize_vectors: n = np.linalg.norm(v) all_vecs.append(v / n if n > 1e-10 else v) else: all_vecs.append(v) entity_index.append((wid, ent)) if len(all_vecs) == 0: _logger.info("No vectors loaded for windows %s", window_ids) return ({}, {}) M = np.vstack(all_vecs) if method == "pca": # centre globally Mc = M - M.mean(axis=0) try: U, s, Vt = np.linalg.svd(Mc, full_matrices=False) except np.linalg.LinAlgError: _logger.exception("SVD failed in compute_2d_axes (pca)") return ({}, {}) # explained variance ratio from singular values sv2 = s**2 evr = sv2 / (sv2.sum() + 1e-20) evr1 = float(evr[0]) if evr.size > 0 else 0.0 evr2 = float(evr[1]) if evr.size > 1 else 0.0 # take top-1 component as primary axis comp1 = Vt[0] comp1_hat = comp1 / (np.linalg.norm(comp1) + 1e-12) # By default take the second Vt row as the second axis, but optionally # compute the second axis from residuals (PCA on data with PC1 removed) comp2 = Vt[1] if Vt.shape[0] > 1 else np.zeros_like(comp1) # compute residual-based second component if requested or if PC1 dominates if pca_residual or evr1 > 0.85: # Project out PC1 from centred data and compute PCA on residuals proj1 = (Mc.dot(comp1_hat)).reshape(-1, 1) * comp1_hat.reshape(1, -1) residual = Mc - proj1 try: _, s_res, Vt_res = np.linalg.svd(residual, full_matrices=False) comp2 = Vt_res[0] _logger.info( "Using residual PCA for second axis (pca_residual=%s)", pca_residual ) except Exception: _logger.exception( "Residual PCA failed; falling back to raw second component" ) comp2_hat = comp2 / (np.linalg.norm(comp2) + 1e-12) axes = { "x_axis": comp1_hat, "y_axis": comp2_hat, "method": "pca", "explained_variance_ratio": [evr1, evr2], "pca_residual_used": bool(pca_residual or evr1 > 0.85), } # Canonical party sets used for axis orientation (global and per-window). # Use CANONICAL_LEFT/RIGHT from config for consistency with SVD components tab. # X-axis: CANONICAL_RIGHT (right) vs CANONICAL_LEFT (left) # Y-axis: CANONICAL_LEFT (progressive) vs cons_parties (conservative) right_parties = CANONICAL_RIGHT left_parties = CANONICAL_LEFT cons_parties = CANONICAL_RIGHT # Conservative = right-leaning parties prog_parties = CANONICAL_LEFT # Progressive = left-leaning parties # Ensure consistent left/right and progressive/conservative orientation # by checking canonical party centroids and flipping axis signs if needed. try: # Build mapping of entity -> vector from stacked matrix M ent_to_vec = {ent: vec for (wid, ent), vec in zip(entity_index, M)} def _centroid_for_party_set(party_set): vecs = [] # Party-level centroid vectors (entity_id == party name directly). for p in party_set: if p in ent_to_vec: vecs.append(ent_to_vec[p]) # MP-level vectors: mp_metadata stores mp_name in the same # "Lastname, Initials" format as entity_id in svd_vectors. try: conn = duckdb.connect(db_path) rows = conn.execute( "SELECT mp_name, party FROM mp_metadata" ).fetchall() conn.close() except Exception: rows = [] for mp_name, party in rows: if party in party_set and mp_name in ent_to_vec: vecs.append(ent_to_vec[mp_name]) if not vecs: return None return np.mean(np.vstack(vecs), axis=0) # X-axis: left vs right left_cent = _centroid_for_party_set(left_parties) right_cent = _centroid_for_party_set(right_parties) if left_cent is not None and right_cent is not None: left_proj = float(np.dot(left_cent - M.mean(axis=0), comp1_hat)) right_proj = float(np.dot(right_cent - M.mean(axis=0), comp1_hat)) if right_proj < left_proj: _logger.info( "Flipping PCA x-axis to match canonical left/right orientation (right_proj=%.3f left_proj=%.3f)", right_proj, left_proj, ) axes["x_axis"] = -axes["x_axis"] # Y-axis: progressive vs conservative — positive Y = progressive prog_cent = _centroid_for_party_set(prog_parties) cons_cent = _centroid_for_party_set(cons_parties) if prog_cent is not None and cons_cent is not None: prog_proj = float(np.dot(prog_cent - M.mean(axis=0), comp2_hat)) cons_proj = float(np.dot(cons_cent - M.mean(axis=0), comp2_hat)) # We want positive Y to mean 'progressive'. If the progressive # centroid currently projects lower than the conservative centroid, # flip the sign so progressive > conservative. if prog_proj < cons_proj: _logger.info( "Flipping PCA y-axis so positive Y corresponds to progressive (prog_proj=%.3f cons_proj=%.3f)", prog_proj, cons_proj, ) axes["y_axis"] = -axes["y_axis"] except Exception: _logger.debug("Could not auto-orient PCA axes; leaving signs as-is") # warn if PCA is effectively 1-D if evr1 > 0.85 and not pca_residual: _logger.warning( "PCA first component explains %.1f%% of variance — data is near-1D;\n" "consider using pca_residual=True or the anchor method for a second axis", evr1 * 100, ) # project per-window vectors (centre by global mean) global_mean = M.mean(axis=0) axes["global_mean"] = global_mean positions_by_window: Dict[str, Dict[str, Tuple[float, float]]] = { wid: {} for wid in window_ids } for (wid, ent), vec in zip(entity_index, M): v_centered = vec - global_mean x = float(np.dot(v_centered, axes["x_axis"])) y = float(np.dot(v_centered, axes["y_axis"])) positions_by_window[wid][ent] = (x, y) # Per-window Y-axis correction: ensure "positive Y = progressive" holds # for EACH window individually. The global orientation check above uses # centroids averaged across all windows, so individual windows (e.g. an # election year with few returning MPs) can still be inverted. We check # each window and flip its Y values if conservative parties sit above # progressive ones. try: # Fetch mp_metadata once for the per-window check _mp_meta_rows: List[Tuple[str, str]] = [] try: conn = duckdb.connect(db_path) _mp_meta_rows = conn.execute( "SELECT mp_name, party FROM mp_metadata" ).fetchall() conn.close() except Exception: pass # no DB available (e.g. unit tests without metadata) # Map mp_name -> party _mp_party: Dict[str, str] = {r[0]: r[1] for r in _mp_meta_rows} y_flipped_windows: set = set() for wid, pos_dict in positions_by_window.items(): prog_ys = [] cons_ys = [] for ent, (x_val, y_val) in pos_dict.items(): # direct party entity if ent in prog_parties: prog_ys.append(y_val) elif ent in cons_parties: cons_ys.append(y_val) # individual MP via metadata lookup party = _mp_party.get(ent) if party is not None: if party in prog_parties: prog_ys.append(y_val) elif party in cons_parties: cons_ys.append(y_val) if prog_ys and cons_ys: prog_avg = float(np.mean(prog_ys)) cons_avg = float(np.mean(cons_ys)) if cons_avg > prog_avg: _logger.info( "Per-window Y flip for window %s: " "prog_avg_y=%.3f cons_avg_y=%.3f — negating Y", wid, prog_avg, cons_avg, ) positions_by_window[wid] = { ent: (x_val, -y_val) for ent, (x_val, y_val) in pos_dict.items() } y_flipped_windows.add(wid) axes["y_flipped_windows"] = y_flipped_windows # Per-window X-axis correction: mirror the Y-axis logic above. # The global X-flip uses centroids averaged across all windows, so # individual windows can still have left/right inverted. x_flipped_windows: set = set() for wid, pos_dict in positions_by_window.items(): right_xs = [] left_xs = [] for ent, (x_val, y_val) in pos_dict.items(): # direct party entity if ent in right_parties: right_xs.append(x_val) elif ent in left_parties: left_xs.append(x_val) # individual MP via metadata lookup party = _mp_party.get(ent) if party is not None: if party in right_parties: right_xs.append(x_val) elif party in left_parties: left_xs.append(x_val) if right_xs and left_xs: right_avg = float(np.mean(right_xs)) left_avg = float(np.mean(left_xs)) if left_avg > right_avg: _logger.info( "Per-window X flip for window %s: " "right_avg_x=%.3f left_avg_x=%.3f — negating X", wid, right_avg, left_avg, ) positions_by_window[wid] = { ent: (-x_val, y_val) for ent, (x_val, y_val) in pos_dict.items() } x_flipped_windows.add(wid) axes["x_flipped_windows"] = x_flipped_windows except Exception: _logger.debug( "Per-window orientation check failed; leaving per-window axes as-is" ) return positions_by_window, axes elif method == "anchor": if not anchor_kwargs: raise ValueError("anchor_kwargs required for method='anchor'") left = set(anchor_kwargs.get("left_parties", [])) right = set(anchor_kwargs.get("right_parties", [])) prog = set(anchor_kwargs.get("prog_parties", [])) cons = set(anchor_kwargs.get("cons_parties", [])) # collect vectors across all windows for each anchor group def collect_for_party_set(party_set: set) -> List[np.ndarray]: res: List[np.ndarray] = [] # party-level entities (entity_id equals party name) for wid, d in aligned_window_vecs.items(): for ent, v in d.items(): if ent in party_set: res.append(v) # MP-level via mp_metadata party affiliation conn = duckdb.connect(db_path) rows = conn.execute("SELECT mp_name, party FROM mp_metadata").fetchall() conn.close() for mp_name, party in rows: if party in party_set: # take all vectors for this MP across windows if present for wid, d in aligned_window_vecs.items(): if mp_name in d: res.append(d[mp_name]) return res left_vecs = collect_for_party_set(left) right_vecs = collect_for_party_set(right) prog_vecs = collect_for_party_set(prog) cons_vecs = collect_for_party_set(cons) if not left_vecs or not right_vecs or not prog_vecs or not cons_vecs: _logger.warning("Insufficient anchor vectors for requested parties") return ({}, {}) left_centroid = np.mean(np.vstack(left_vecs), axis=0) right_centroid = np.mean(np.vstack(right_vecs), axis=0) prog_centroid = np.mean(np.vstack(prog_vecs), axis=0) cons_centroid = np.mean(np.vstack(cons_vecs), axis=0) lr = right_centroid - left_centroid # Construct progressive-conservative axis so that positive Y corresponds # to *progressive* positions (consistent with PCA branch where we flip # the sign to make progressive > conservative). Use prog - cons so a # positive dot product means closer to the progressive centroid. pc = prog_centroid - cons_centroid # Gram-Schmidt: make pc orthogonal to lr lr_norm = np.linalg.norm(lr) if lr_norm < 1e-12: raise ValueError("Left-right anchor axis has near-zero norm") lr_hat = lr / lr_norm # remove projection of pc on lr pc = pc - np.dot(pc, lr_hat) * lr_hat pc_norm = np.linalg.norm(pc) if pc_norm < 1e-12: raise ValueError( "Progressive-conservative anchor axis degenerate after orthogonalisation" ) pc_hat = pc / pc_norm axes = {"x_axis": lr_hat, "y_axis": pc_hat, "method": "anchor"} positions_by_window = {wid: {} for wid in window_ids} for wid, d in aligned_window_vecs.items(): for ent, v in d.items(): x = float(np.dot(v, axes["x_axis"])) y = float(np.dot(v, axes["y_axis"])) positions_by_window[wid][ent] = (x, y) return positions_by_window, axes else: raise ValueError("Unknown method '%s'" % method) def compute_nd_axes( db_path: str, window_ids: Optional[List[str]] = None, n_components: int = 10, normalize_vectors: bool = True, ) -> Tuple[Dict[str, Dict[str, np.ndarray]], Dict]: """Compute aligned PCA projections onto N components for MPs per window. This extends compute_2d_axes to return projections onto all N principal components (not just the first 2), enabling consistent aligned positioning for SVD components 1-10 in the explorer. Args: db_path: path to duckdb window_ids: optional ordered list of windows (defaults to all) n_components: number of PCA components to compute (default 10) normalize_vectors: whether to normalize vectors before PCA (default True) Returns: scores_by_window, axes_def - scores_by_window: {window_id: {entity: np.ndarray of shape (n_components,)}} - axes_def: dict with 'components' (list of component vectors), 'explained_variance_ratio', 'global_mean', etc. """ import importlib _trajectory = importlib.import_module("analysis.trajectory") if window_ids is None: window_ids = _trajectory._load_window_ids(db_path) # Load per-window raw vectors and align them raw_window_vecs: Dict[str, Dict[str, np.ndarray]] = {} for wid in window_ids: raw_window_vecs[wid] = _trajectory._load_mp_vectors_for_window(db_path, wid) # Pad all vectors to maximum dimension across windows if raw_window_vecs: max_dim = max(v.shape[0] for d in raw_window_vecs.values() for v in d.values()) padded: Dict[str, Dict[str, np.ndarray]] = {} for wid, d in raw_window_vecs.items(): padded[wid] = { e: np.pad(v, (0, max_dim - v.shape[0])) if v.shape[0] < max_dim else v for e, v in d.items() } raw_window_vecs = padded aligned_window_vecs = _trajectory._procrustes_align_windows(raw_window_vecs) # Stack all aligned vectors across windows all_vecs = [] entity_index = [] # parallel list of (window_id, entity) for wid, d in aligned_window_vecs.items(): for ent, v in d.items(): if normalize_vectors: n = np.linalg.norm(v) all_vecs.append(v / n if n > 1e-10 else v) else: all_vecs.append(v) entity_index.append((wid, ent)) if len(all_vecs) == 0: _logger.info("No vectors loaded for windows %s", window_ids) return ({}, {}) M = np.vstack(all_vecs) global_mean = M.mean(axis=0) # PCA: centre globally and compute SVD Mc = M - global_mean try: U, s, Vt = np.linalg.svd(Mc, full_matrices=False) except np.linalg.LinAlgError: _logger.exception("SVD failed in compute_nd_axes") return ({}, {}) # Explained variance ratio for each component sv2 = s**2 evr = sv2 / (sv2.sum() + 1e-20) explained_variance_ratio = evr[:n_components].tolist() # Component directions (normalized) components = [ Vt[i] / (np.linalg.norm(Vt[i]) + 1e-12) for i in range(min(n_components, Vt.shape[0])) ] # Build entity -> vector mapping ent_to_vec = {ent: vec for (wid, ent), vec in zip(entity_index, M)} # Per-component flip directions using canonical party centroids right_parties = CANONICAL_RIGHT left_parties = CANONICAL_LEFT def _centroid_for_party_set(party_set): vecs = [] for p in party_set: if p in ent_to_vec: vecs.append(ent_to_vec[p]) try: conn = duckdb.connect(db_path) rows = conn.execute("SELECT mp_name, party FROM mp_metadata").fetchall() conn.close() except Exception: rows = [] for mp_name, party in rows: if party in party_set and mp_name in ent_to_vec: vecs.append(ent_to_vec[mp_name]) if not vecs: return None return np.mean(np.vstack(vecs), axis=0) left_cent = _centroid_for_party_set(left_parties) right_cent = _centroid_for_party_set(right_parties) # Compute flip signs per component flip_signs = [] if left_cent is not None and right_cent is not None: for i, comp in enumerate(components): left_proj = float(np.dot(left_cent - global_mean, comp)) right_proj = float(np.dot(right_cent - global_mean, comp)) # Flip if right parties project lower than left (we want RIGHT > LEFT) flip_signs.append(-1.0 if right_proj < left_proj else 1.0) else: flip_signs = [1.0] * len(components) # Project all entities onto all components scores_by_window: Dict[str, Dict[str, np.ndarray]] = {wid: {} for wid in window_ids} for (wid, ent), vec in zip(entity_index, M): v_centered = vec - global_mean scores = np.array( [ flip_signs[i] * float(np.dot(v_centered, components[i])) for i in range(len(components)) ] ) scores_by_window[wid][ent] = scores axes_def = { "components": components, "explained_variance_ratio": explained_variance_ratio, "global_mean": global_mean, "flip_signs": flip_signs, "n_components": len(components), } return scores_by_window, axes_def def compute_svd_spectrum( db_path: str, window_ids: Optional[List[str]] = None, normalize_vectors: bool = True, ) -> List[float]: """Return explained variance ratios (%) for all SVD components, sorted descending. Uses the same Procrustes-aligned multi-window matrix as compute_2d_axes so the scree plot is consistent with the compass axes. Args: db_path: path to duckdb window_ids: optional ordered list of windows (defaults to all) normalize_vectors: whether to L2-normalise each MP vector before stacking Returns: List of EVR percentages sorted descending (e.g. [24.1, 10.4, 7.2, ...]) """ import importlib _trajectory = importlib.import_module("analysis.trajectory") if window_ids is None: window_ids = _trajectory._load_window_ids(db_path) raw_window_vecs: Dict[str, Dict[str, np.ndarray]] = {} for wid in window_ids: raw_window_vecs[wid] = _trajectory._load_mp_vectors_for_window(db_path, wid) if not raw_window_vecs: return [] # Pad to uniform dimension before Procrustes alignment max_dim = max(v.shape[0] for d in raw_window_vecs.values() for v in d.values()) padded: Dict[str, Dict[str, np.ndarray]] = {} for wid, d in raw_window_vecs.items(): padded[wid] = { e: np.pad(v, (0, max_dim - v.shape[0])) if v.shape[0] < max_dim else v for e, v in d.items() } aligned = _trajectory._procrustes_align_windows(padded) all_vecs = [] for d in aligned.values(): for v in d.values(): if normalize_vectors: n = np.linalg.norm(v) all_vecs.append(v / n if n > 1e-10 else v) else: all_vecs.append(v) if not all_vecs: return [] M = np.vstack(all_vecs) Mc = M - M.mean(axis=0) try: _, s, _ = np.linalg.svd(Mc, full_matrices=False) except np.linalg.LinAlgError: _logger.exception("SVD failed in compute_svd_spectrum") return [] sv2 = s**2 evr = sv2 / (sv2.sum() + 1e-20) * 100 return list(evr) # already sorted descending by SVD def compute_party_bootstrap_cis( party_vectors: Dict[str, List[np.ndarray]], n_boot: int = 1000, ci: float = 95.0, seed: int = 42, ) -> Dict[str, Dict]: """Compute bootstrap confidence intervals for party centroid vectors. For each party, resamples its MP vectors with replacement to build a distribution of centroid estimates, then extracts percentile-based confidence intervals per dimension. Args: party_vectors: mapping of party name → list of individual MP vectors (each a numpy array of consistent length, e.g. 50 dimensions). n_boot: number of bootstrap replicates. ci: confidence level as a percentage (e.g. 95.0 for 95% CI). seed: random seed for reproducibility (used with ``np.random.default_rng``). Returns: Dict mapping party name → dict with keys ``centroid``, ``ci_lower``, ``ci_upper``, ``std``, and ``n_mps``. Parties with no MPs (empty list) are excluded from the output. """ alpha = 100.0 - ci lo_pct = alpha / 2.0 hi_pct = 100.0 - lo_pct rng = np.random.default_rng(seed) result: Dict[str, Dict] = {} for party, vectors in party_vectors.items(): n_mps = len(vectors) if n_mps == 0: continue mat = np.vstack(vectors) # (n_mps, dim) centroid = np.mean(mat, axis=0) if n_mps == 1: result[party] = { "centroid": centroid, "ci_lower": centroid.copy(), "ci_upper": centroid.copy(), "std": np.zeros_like(centroid), "n_mps": 1, } continue idx = rng.integers(0, n_mps, size=(n_boot, n_mps)) boot_centroids = mat[idx].mean(axis=1) # (n_boot, dim) ci_lower = np.percentile(boot_centroids, lo_pct, axis=0) ci_upper = np.percentile(boot_centroids, hi_pct, axis=0) std = np.std(boot_centroids, axis=0) result[party] = { "centroid": centroid, "ci_lower": ci_lower, "ci_upper": ci_upper, "std": std, "n_mps": n_mps, } _logger.info( "Bootstrap CIs computed for %d parties (n_boot=%d, ci=%.1f%%)", len(result), n_boot, ci, ) return result