def compute_svd_spectrum(
    db_path: str,
    window_ids: Optional[List[str]] = None,
    normalize_vectors: bool = True,
) -> List[float]:
    """Return explained variance ratios (%) for all SVD components, sorted descending.

    MP vectors are loaded per window, zero-padded to a common dimension, aligned
    across windows with ``analysis.trajectory._procrustes_align_windows``, stacked
    into one matrix, mean-centred and decomposed with SVD. The squared singular
    values are normalised to percentages.

    NOTE(review): the original design note says this is the same aligned
    multi-window matrix that ``compute_2d_axes`` uses, so the scree plot stays
    consistent with the compass axes -- confirm against that function.

    Args:
        db_path: path to the duckdb database.
        window_ids: optional ordered list of windows; ``None`` loads all windows
            via ``analysis.trajectory._load_window_ids``.
        normalize_vectors: whether to L2-normalise each MP vector before stacking.

    Returns:
        List of EVR percentages as plain Python floats, sorted descending
        (e.g. ``[24.1, 10.4, 7.2, ...]``). An empty list is returned when there
        are no windows, no vectors in any window, or the SVD does not converge.
    """
    # An explicitly empty window list can never produce vectors; answer before
    # touching the trajectory module or the database.
    if window_ids is not None and not window_ids:
        return []

    # Imported lazily by name rather than at module level.
    # NOTE(review): presumably to avoid a circular import -- confirm.
    import importlib

    _trajectory = importlib.import_module("analysis.trajectory")

    if window_ids is None:
        window_ids = _trajectory._load_window_ids(db_path)

    raw_window_vecs: Dict[str, Dict[str, np.ndarray]] = {
        wid: _trajectory._load_mp_vectors_for_window(db_path, wid)
        for wid in window_ids
    }

    # Collect every vector length first: max() raises ValueError on an empty
    # sequence, which happens when windows exist but none holds any MP vector.
    dims = [v.shape[0] for d in raw_window_vecs.values() for v in d.values()]
    if not dims:
        return []
    max_dim = max(dims)

    # Pad to uniform dimension before Procrustes alignment.
    padded: Dict[str, Dict[str, np.ndarray]] = {
        wid: {
            e: np.pad(v, (0, max_dim - v.shape[0])) if v.shape[0] < max_dim else v
            for e, v in d.items()
        }
        for wid, d in raw_window_vecs.items()
    }

    aligned = _trajectory._procrustes_align_windows(padded)

    all_vecs = []
    for d in aligned.values():
        for v in d.values():
            if normalize_vectors:
                n = np.linalg.norm(v)
                # Leave (near-)zero vectors untouched to avoid dividing by ~0.
                all_vecs.append(v / n if n > 1e-10 else v)
            else:
                all_vecs.append(v)

    if not all_vecs:
        return []

    M = np.vstack(all_vecs)
    Mc = M - M.mean(axis=0)
    try:
        _, s, _ = np.linalg.svd(Mc, full_matrices=False)
    except np.linalg.LinAlgError:
        _logger.exception("SVD failed in compute_svd_spectrum")
        return []

    sv2 = s**2
    # The tiny epsilon keeps the division defined when all variance is zero.
    evr = sv2 / (sv2.sum() + 1e-20) * 100
    # SVD returns singular values in descending order, so no sort is needed;
    # tolist() converts numpy scalars to native floats to match List[float].
    return evr.tolist()
""" try: - con = duckdb.connect(database=db_path, read_only=True) - rows = con.execute( - "SELECT entity_id, vector FROM svd_vectors " - "WHERE entity_type='mp' AND window_id='current_parliament' " - "AND entity_id LIKE '%,%'" - ).fetchall() - vectors: List[List[float]] = [] - for entity_id, raw_vec in rows: - if isinstance(raw_vec, str): - vec = json.loads(raw_vec) - elif isinstance(raw_vec, (bytes, bytearray)): - vec = json.loads(raw_vec.decode()) - elif isinstance(raw_vec, list): - vec = raw_vec - else: - try: - vec = list(raw_vec) - except Exception: - continue - fvec = [float(v) if v is not None else 0.0 for v in vec] - vectors.append(fvec) - if not vectors: - return [] - n_dims = len(vectors[0]) - importances: List[float] = [] - for dim in range(n_dims): - col = [v[dim] for v in vectors if dim < len(v)] - l2 = sum(x**2 for x in col) ** 0.5 - importances.append(l2) - return sorted(importances, reverse=True) + from analysis.political_axis import compute_svd_spectrum + + return compute_svd_spectrum(db_path) except Exception: logger.exception("Failed to load scree data") return [] - finally: - try: - con.close() - except Exception: - pass def _render_scree_plot(importances: List[float], n_show: int = 15) -> None: @@ -547,9 +514,9 @@ def _render_scree_plot(importances: List[float], n_show: int = 15) -> None: """ if not importances: return - total = sum(importances) or 1.0 - raw = importances[:n_show] - data = [v / total * 100 for v in raw] + # importances are already EVR percentages summing to ~100 over all components. + # Slice to n_show for display; cumulative line shows how much variance is covered. + data = list(importances[:n_show]) ranks = list(range(1, len(data) + 1)) # Cumulative variance for the dashed overlay line @@ -573,7 +540,7 @@ def _render_scree_plot(importances: List[float], n_show: int = 15) -> None: x=ranks, y=data, marker_color=bar_colours, - hovertemplate="As %{x}
%{y:.1f}% van totaal", + hovertemplate="As %{x}
%{y:.1f}% verklaarde variantie", showlegend=False, ) )