def compute_nd_axes(
    db_path: str,
    window_ids: Optional[List[str]] = None,
    n_components: int = 10,
    normalize_vectors: bool = True,
) -> Tuple[Dict[str, Dict[str, np.ndarray]], Dict]:
    """Compute aligned PCA projections onto N components for MPs per window.

    This extends compute_2d_axes to return projections onto the first
    ``n_components`` principal components (not just the first 2), so the
    explorer can position entities consistently on SVD components 1-10.

    Pipeline: load the per-window vectors, zero-pad them to a common length,
    align the windows with ``analysis.trajectory._procrustes_align_windows``,
    stack every (window, entity) vector into one matrix, centre it on the
    global mean and take its SVD. Each kept component is then oriented so that
    the centroid of CANONICAL_RIGHT projects at or above the centroid of
    CANONICAL_LEFT.

    Args:
        db_path: path to duckdb
        window_ids: optional ordered list of windows (defaults to all)
        n_components: number of PCA components to compute (default 10).
            Clamped to the range [0, number of available components].
        normalize_vectors: whether to scale each vector to unit length before
            PCA (default True). Vectors with norm <= 1e-10 are left unscaled.

    Returns:
        scores_by_window, axes_def
        - scores_by_window: {window_id: {entity: np.ndarray of shape (k,)}}
          where k is the number of components actually kept.
        - axes_def: dict with 'components' (list of unit component vectors),
          'explained_variance_ratio', 'global_mean', 'flip_signs' and
          'n_components' (= k).
        Both are empty dicts when no vectors could be loaded or the SVD fails.
    """
    import importlib

    # Resolved at call time rather than at module import.
    _trajectory = importlib.import_module("analysis.trajectory")

    if window_ids is None:
        window_ids = _trajectory._load_window_ids(db_path)

    # Load per-window raw vectors.
    raw_window_vecs: Dict[str, Dict[str, np.ndarray]] = {
        wid: _trajectory._load_mp_vectors_for_window(db_path, wid)
        for wid in window_ids
    }

    # Bail out before padding/alignment when there is nothing to work with.
    # Without this guard, max() over an empty sequence raises ValueError
    # whenever windows exist but none of them holds any vector.
    dims = [v.shape[0] for d in raw_window_vecs.values() for v in d.values()]
    if not dims:
        _logger.info("No vectors loaded for windows %s", window_ids)
        return ({}, {})

    # Zero-pad all vectors to the maximum dimension across windows.
    max_dim = max(dims)
    raw_window_vecs = {
        wid: {
            e: np.pad(v, (0, max_dim - v.shape[0])) if v.shape[0] < max_dim else v
            for e, v in d.items()
        }
        for wid, d in raw_window_vecs.items()
    }

    aligned_window_vecs = _trajectory._procrustes_align_windows(raw_window_vecs)

    # Stack all aligned vectors across windows; entity_index[i] names row i.
    all_vecs = []
    entity_index = []  # parallel list of (window_id, entity)
    for wid, d in aligned_window_vecs.items():
        for ent, v in d.items():
            if normalize_vectors:
                norm = np.linalg.norm(v)
                if norm > 1e-10:
                    v = v / norm
            all_vecs.append(v)
            entity_index.append((wid, ent))

    if not all_vecs:
        _logger.info("No vectors loaded for windows %s", window_ids)
        return ({}, {})

    M = np.vstack(all_vecs)
    global_mean = M.mean(axis=0)

    # PCA: centre globally and compute SVD.
    Mc = M - global_mean
    try:
        _, s, Vt = np.linalg.svd(Mc, full_matrices=False)
    except np.linalg.LinAlgError:
        _logger.exception("SVD failed in compute_nd_axes")
        return ({}, {})

    # One clamped count drives both the variance list and the component list,
    # so they always have the same length (also for negative n_components).
    n_keep = max(0, min(n_components, Vt.shape[0]))

    # Explained variance ratio for each kept component.
    sv2 = s**2
    evr = sv2 / (sv2.sum() + 1e-20)
    explained_variance_ratio = evr[:n_keep].tolist()

    # Component directions (normalized).
    components = [Vt[i] / (np.linalg.norm(Vt[i]) + 1e-12) for i in range(n_keep)]

    # Entity -> vector mapping used for the party centroids. It is keyed by
    # entity only, so an entity present in several windows keeps the vector of
    # the last window iterated.
    ent_to_vec = {ent: vec for (_wid, ent), vec in zip(entity_index, M)}

    # Read the MP -> party table once. The lookup is best-effort: on failure
    # the centroids fall back to entities that are literally named after a
    # party. The connection is closed even when the query raises.
    try:
        conn = duckdb.connect(db_path)
        try:
            party_rows = conn.execute(
                "SELECT mp_name, party FROM mp_metadata"
            ).fetchall()
        finally:
            conn.close()
    except Exception:
        _logger.warning(
            "Could not read mp_metadata from %s; using party-named entities only",
            db_path,
            exc_info=True,
        )
        party_rows = []

    def _centroid_for_party_set(party_set):
        """Mean vector of the party entities and member MPs, or None if empty."""
        vecs = [ent_to_vec[p] for p in party_set if p in ent_to_vec]
        vecs.extend(
            ent_to_vec[mp_name]
            for mp_name, party in party_rows
            if party in party_set and mp_name in ent_to_vec
        )
        if not vecs:
            return None
        return np.mean(np.vstack(vecs), axis=0)

    left_cent = _centroid_for_party_set(CANONICAL_LEFT)
    right_cent = _centroid_for_party_set(CANONICAL_RIGHT)

    # Per-component flip signs: flip when the right centroid projects lower
    # than the left one (we want RIGHT > LEFT). Without both centroids the
    # raw SVD orientation is kept.
    if left_cent is not None and right_cent is not None:
        left_off = left_cent - global_mean
        right_off = right_cent - global_mean
        flip_signs = [
            -1.0
            if float(np.dot(right_off, comp)) < float(np.dot(left_off, comp))
            else 1.0
            for comp in components
        ]
    else:
        flip_signs = [1.0] * len(components)

    # Project every centred row onto every oriented component in one product.
    if components:
        oriented = np.vstack(components) * np.asarray(flip_signs)[:, None]
        all_scores = np.asarray(Mc @ oriented.T, dtype=float)
    else:
        all_scores = np.zeros((M.shape[0], 0), dtype=float)

    scores_by_window: Dict[str, Dict[str, np.ndarray]] = {
        wid: {} for wid in window_ids
    }
    for (wid, ent), row in zip(entity_index, all_scores):
        # setdefault tolerates a window key that alignment returned but that
        # was not in window_ids; copy() gives each entity its own array.
        scores_by_window.setdefault(wid, {})[ent] = row.copy()

    axes_def = {
        "components": components,
        "explained_variance_ratio": explained_variance_ratio,
        "global_mean": global_mean,
        "flip_signs": flip_signs,
        "n_components": len(components),
    }

    return scores_by_window, axes_def
# Load aligned scores for ALL components 1-10 using PCA on aligned vectors.
# This ensures consistency between compass and SVD components tab.
def _get_aligned_party_scores(window: str, n_comps: int = 10) -> Dict[str, np.ndarray]:
    """Get party scores for the first ``n_comps`` components from aligned PCA.

    Calls ``compute_nd_axes`` on the enclosing ``db_path``, takes the MP
    scores of ``window``, maps each MP to a party with ``load_party_map`` and
    averages the MP score vectors per party.

    Args:
        window: window id whose scores are wanted.
        n_comps: number of components requested from ``compute_nd_axes`` and
            kept per MP (default 10). A single value is used for both so the
            two can never disagree.

    Returns:
        {party: mean score vector}; empty when the window has no scores.
    """
    from analysis.political_axis import compute_nd_axes

    scores_by_window, _ = compute_nd_axes(db_path, n_components=n_comps)
    window_scores = scores_by_window.get(window, {})
    if not window_scores:
        return {}

    # Load party map to convert MP names to parties
    _party_map = load_party_map(db_path)

    # Aggregate MP scores to party centroids per component
    party_scores_agg: Dict[str, List[np.ndarray]] = {}
    for mp_name, scores in window_scores.items():
        if mp_name in _party_map:
            party = _party_map[mp_name]
        else:
            # Fall back to the name with any "(...)" suffix stripped; only
            # computed when the exact name is not in the map.
            party = _party_map.get(mp_name.split("(")[0].strip(), None)
        if party:
            party_scores_agg.setdefault(party, []).append(scores[:n_comps])

    # Compute mean scores per party for each component. Every list holds at
    # least one entry because it is created on first append.
    return {
        party: np.mean(np.vstack(score_list), axis=0)
        for party, score_list in party_scores_agg.items()
    }


# Extract 1D scores for this component using aligned PCA scores
party_1d_coords: dict = {}
aligned_all_scores = _get_aligned_party_scores(svd_window)
idx = comp_sel - 1  # 0-indexed
for party, all_scores in aligned_all_scores.items():
    if idx < len(all_scores):
        party_1d_coords[party] = (float(all_scores[idx]),)

# Auto-compute flip directions for ALL components 1-10 based on aligned party centroids.
# Since we now use aligned PCA scores for all components, compute flip directly from
# aligned scores to ensure canonical right parties (PVV, FVD, JA21, SGP) appear on RIGHT.
computed_flips: Dict[int, bool] = {} try: - from analysis.svd_labels import compute_flip_direction - - for comp in range(1, 11): - computed_flips[comp] = compute_flip_direction(comp, party_scores) + from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT + + # Compute flip for each component based on aligned party scores + for comp_idx in range(10): + right_scores = [] + left_scores = [] + for party, scores in aligned_all_scores.items(): + if party in CANONICAL_RIGHT: + right_scores.append(scores[comp_idx]) + elif party in CANONICAL_LEFT: + left_scores.append(scores[comp_idx]) + + if right_scores and left_scores: + right_avg = np.mean(right_scores) + left_avg = np.mean(left_scores) + # Flip if right parties score lower than left (we want RIGHT > LEFT) + computed_flips[comp_idx + 1] = right_avg < left_avg + else: + computed_flips[comp_idx + 1] = False except Exception: # If flip computation fails, keep existing flip values from SVD_THEMES pass @@ -2657,9 +2693,8 @@ def build_svd_components_tab(db_path: str) -> None: has_current = "current_parliament" in available_windows all_windows = year_windows + (["current_parliament"] if has_current else []) - # For components 1-2, use aligned PCA positions for consistency with compass. - # For components 3-10, use raw SVD scores. - # Per-window flip computation handles orientation alignment for the trajectory. + # TODO: For full consistency, this should also use aligned PCA scores for all windows. + # Currently uses raw SVD scores for trajectory - single-window view uses aligned scores. party_scores_by_window = load_party_scores_all_windows(db_path, all_windows) _render_svd_time_trajectory(