"""political_axis.py — Project MP SVD vectors onto an ideological axis.

Two modes:

1. PCA mode (default): compute the first principal component of all MP SVD
   vectors for a window and project each MP onto it. The sign is arbitrary
   but consistent within a window.
2. Anchor mode: define the axis as the vector from the centroid of
   ``left_parties`` to the centroid of ``right_parties``. Project all MPs
   onto this normalised anchor axis.

Both modes return a dict mapping mp_name → scalar score for the given window.
"""
import json
import logging
from typing import Dict, List, Optional, Tuple
import numpy as np
from . import trajectory as _trajectory
try:
import duckdb
except Exception: # pragma: no cover - allow importing module in lightweight test envs
duckdb = None # type: ignore
# Import canonical party sets from config for consistent orientation with SVD components tab
try:
from .config import CANONICAL_LEFT, CANONICAL_RIGHT
except Exception: # pragma: no cover - fallback for test environments
CANONICAL_LEFT: frozenset = frozenset()
CANONICAL_RIGHT: frozenset = frozenset()
_logger = logging.getLogger(__name__)
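
# The queries in this module read two DuckDB tables (columns as referenced below):
#   svd_vectors(window_id, entity_type, entity_id, vector)  -- 'vector' holds a JSON float array
#   mp_metadata(mp_name, party)                              -- maps individual MPs to their party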
def _load_mp_svd_vectors(db_path: str, window_id: str) -> Dict[str, np.ndarray]:
"""Load all MP SVD vectors for a window from svd_vectors table."""
conn = duckdb.connect(db_path)
rows = conn.execute(
"SELECT entity_id, vector FROM svd_vectors WHERE window_id = ? AND entity_type = 'mp'",
(window_id,),
).fetchall()
conn.close()
result = {}
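    # Each stored vector is a JSON-encoded array of floats; parse it into a 1-D numpy array.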
for mp_name, vec_json in rows:
try:
result[mp_name] = np.array(json.loads(vec_json), dtype=float)
except Exception:
_logger.warning("Could not parse SVD vector for MP %s", mp_name)
return result
def compute_pca_axis(db_path: str, window_id: str) -> Dict[str, float]:
"""Project MP SVD vectors onto their first principal component.
Returns {mp_name: score}. Returns empty dict if fewer than 2 MPs.
"""
mp_vecs = _load_mp_svd_vectors(db_path, window_id)
if len(mp_vecs) < 2:
_logger.warning(
"window %s has only %d MPs; skipping PCA axis", window_id, len(mp_vecs)
)
return {}
names = list(mp_vecs.keys())
mat = np.vstack([mp_vecs[n] for n in names]) # (n_mps, k)
# Centre
mat_centred = mat - mat.mean(axis=0)
# First PC via SVD
try:
_, _, Vt = np.linalg.svd(mat_centred, full_matrices=False)
axis = Vt[0] # (k,)
except np.linalg.LinAlgError:
_logger.exception("SVD failed in compute_pca_axis for window %s", window_id)
return {}
projections = mat_centred.dot(axis)
return {name: float(score) for name, score in zip(names, projections)}
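
# Usage sketch (illustrative only; the database path and window id are hypothetical):
#
#     scores = compute_pca_axis("parliament.duckdb", "2021-2022")
#     # scores maps mp_name -> signed scalar; the sign of the PCA axis is arbitrary,
#     # so only relative ordering within a window is meaningful.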
def compute_anchor_axis(
db_path: str,
window_id: str,
left_parties: List[str],
right_parties: List[str],
) -> Dict[str, float]:
"""Project MP SVD vectors onto a left↔right anchor axis.
The axis runs from the centroid of ``left_parties`` to the centroid of
``right_parties``. Positive scores are toward the right.
Returns {mp_name: score}.
"""
mp_vecs = _load_mp_svd_vectors(db_path, window_id)
if not mp_vecs:
return {}
left_set = set(left_parties)
right_set = set(right_parties)
# 1. Party-level actors whose entity_id IS a party name (e.g. "GroenLinks-PvdA")
left_vecs = [mp_vecs[p] for p in left_set if p in mp_vecs]
right_vecs = [mp_vecs[p] for p in right_set if p in mp_vecs]
# 2. Individual MPs via mp_metadata party affiliation
conn = duckdb.connect(db_path)
rows = conn.execute("SELECT mp_name, party FROM mp_metadata").fetchall()
conn.close()
for mp_name, party in rows:
if mp_name not in mp_vecs:
continue
if party in left_set and mp_name not in left_set:
left_vecs.append(mp_vecs[mp_name])
elif party in right_set and mp_name not in right_set:
right_vecs.append(mp_vecs[mp_name])
if not left_vecs or not right_vecs:
_logger.warning(
"window %s: insufficient anchor parties (left=%d, right=%d)",
window_id,
len(left_vecs),
len(right_vecs),
)
return {}
left_centroid = np.mean(left_vecs, axis=0)
right_centroid = np.mean(right_vecs, axis=0)
axis = right_centroid - left_centroid
norm = np.linalg.norm(axis)
if norm < 1e-10:
_logger.warning("Anchor axis has near-zero norm for window %s", window_id)
return {}
axis = axis / norm
return {name: float(np.dot(vec, axis)) for name, vec in mp_vecs.items()}
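
# Usage sketch (illustrative only; the path, window id and party lists are
# hypothetical placeholders for the canonical sets defined in config):
#
#     scores = compute_anchor_axis(
#         "parliament.duckdb",
#         "2021-2022",
#         left_parties=["GroenLinks-PvdA", "SP"],
#         right_parties=["VVD", "PVV"],
#     )
#     # Positive scores lie toward the right-party centroid.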
def compute_2d_axes(
db_path: str,
window_ids: Optional[List[str]] = None,
method: str = "pca",
anchor_kwargs: Optional[Dict] = None,
normalize_vectors: bool = True,
pca_residual: bool = False,
) -> Tuple[Dict[str, Dict[str, Tuple[float, float]]], Dict]:
"""Compute 2D coordinates for MPs per window.

    Args:
        db_path: path to duckdb
        window_ids: optional ordered list of windows (defaults to all)
        method: 'pca' or 'anchor'
        anchor_kwargs: when method == 'anchor', must provide
            {
                'left_parties': List[str],
                'right_parties': List[str],
                'prog_parties': List[str],
                'cons_parties': List[str],
            }
        normalize_vectors: whether to L2-normalise each MP vector before
            stacking (default True)
        pca_residual: when True (or when the first component dominates),
            derive the second axis from a PCA on the residuals after removing
            the first component; only used with method='pca'

    Returns:
        positions_by_window, axis_def
        - positions_by_window: {window_id: {mp_name: (x, y)}}
        - axis_def: {'x_axis': np.ndarray, 'y_axis': np.ndarray, 'method': str}

    Notes:
        This function expects aligned SVD vectors produced by
        trajectory._procrustes_align_windows. It calls the trajectory helpers
        to load and align windows so the returned coordinates are consistent
        across windows.
    """
# Import trajectory helper at runtime so tests can monkeypatch sys.modules
import importlib
_trajectory = importlib.import_module("analysis.trajectory")
if window_ids is None:
window_ids = _trajectory._load_window_ids(db_path)
# Load per-window raw vectors using the trajectory helper and align them
raw_window_vecs: Dict[str, Dict[str, np.ndarray]] = {}
for wid in window_ids:
raw_window_vecs[wid] = _trajectory._load_mp_vectors_for_window(db_path, wid)
# Pad all vectors to the maximum dimension across windows before Procrustes.
# Some windows (e.g. 2017 with only 30 motions) have lower-rank SVD output
# (dim=29 instead of 50). Padding with zeros lets Procrustes treat all vectors
# as the same dimension, preserving the alignment chain.
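    # For example, np.pad(v, (0, max_dim - v.shape[0])) turns a length-29 vector into a
    # length-50 one by appending 21 zeros; existing entries are left untouched.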
if raw_window_vecs:
max_dim = max(v.shape[0] for d in raw_window_vecs.values() for v in d.values())
padded: Dict[str, Dict[str, np.ndarray]] = {}
for wid, d in raw_window_vecs.items():
padded[wid] = {
e: np.pad(v, (0, max_dim - v.shape[0])) if v.shape[0] < max_dim else v
for e, v in d.items()
}
raw_window_vecs = padded
aligned_window_vecs = _trajectory._procrustes_align_windows(raw_window_vecs)
# Stack all vectors across windows into a single matrix for PCA if needed
all_vecs = []
entity_index = [] # parallel list of (window_id, entity)
for wid, d in aligned_window_vecs.items():
for ent, v in d.items():
if normalize_vectors:
n = np.linalg.norm(v)
all_vecs.append(v / n if n > 1e-10 else v)
else:
all_vecs.append(v)
entity_index.append((wid, ent))
if len(all_vecs) == 0:
_logger.info("No vectors loaded for windows %s", window_ids)
return ({}, {})
M = np.vstack(all_vecs)
if method == "pca":
# centre globally
Mc = M - M.mean(axis=0)
try:
U, s, Vt = np.linalg.svd(Mc, full_matrices=False)
except np.linalg.LinAlgError:
_logger.exception("SVD failed in compute_2d_axes (pca)")
return ({}, {})
# explained variance ratio from singular values
sv2 = s**2
evr = sv2 / (sv2.sum() + 1e-20)
evr1 = float(evr[0]) if evr.size > 0 else 0.0
evr2 = float(evr[1]) if evr.size > 1 else 0.0
# take top-1 component as primary axis
comp1 = Vt[0]
comp1_hat = comp1 / (np.linalg.norm(comp1) + 1e-12)
# By default take the second Vt row as the second axis, but optionally
# compute the second axis from residuals (PCA on data with PC1 removed)
comp2 = Vt[1] if Vt.shape[0] > 1 else np.zeros_like(comp1)
# compute residual-based second component if requested or if PC1 dominates
if pca_residual or evr1 > 0.85:
# Project out PC1 from centred data and compute PCA on residuals
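            # Each row i of proj1 is (Mc[i] · comp1_hat) * comp1_hat, the rank-1 part of that
            # centred vector along PC1; subtracting it leaves the residual used for the second axis.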
proj1 = (Mc.dot(comp1_hat)).reshape(-1, 1) * comp1_hat.reshape(1, -1)
residual = Mc - proj1
try:
_, s_res, Vt_res = np.linalg.svd(residual, full_matrices=False)
comp2 = Vt_res[0]
_logger.info(
"Using residual PCA for second axis (pca_residual=%s)", pca_residual
)
except Exception:
_logger.exception(
"Residual PCA failed; falling back to raw second component"
)
comp2_hat = comp2 / (np.linalg.norm(comp2) + 1e-12)
axes = {
"x_axis": comp1_hat,
"y_axis": comp2_hat,
"method": "pca",
"explained_variance_ratio": [evr1, evr2],
"pca_residual_used": bool(pca_residual or evr1 > 0.85),
}
# Canonical party sets used for axis orientation (global and per-window).
# Use CANONICAL_LEFT/RIGHT from config for consistency with SVD components tab.
# X-axis: CANONICAL_RIGHT (right) vs CANONICAL_LEFT (left)
        # Y-axis: CANONICAL_LEFT (progressive) vs CANONICAL_RIGHT (conservative)
right_parties = CANONICAL_RIGHT
left_parties = CANONICAL_LEFT
cons_parties = CANONICAL_RIGHT # Conservative = right-leaning parties
prog_parties = CANONICAL_LEFT # Progressive = left-leaning parties
# Ensure consistent left/right and progressive/conservative orientation
# by checking canonical party centroids and flipping axis signs if needed.
try:
# Build mapping of entity -> vector from stacked matrix M
ent_to_vec = {ent: vec for (wid, ent), vec in zip(entity_index, M)}
def _centroid_for_party_set(party_set):
vecs = []
# Party-level centroid vectors (entity_id == party name directly).
for p in party_set:
if p in ent_to_vec:
vecs.append(ent_to_vec[p])
# MP-level vectors: mp_metadata stores mp_name in the same
# "Lastname, Initials" format as entity_id in svd_vectors.
try:
conn = duckdb.connect(db_path)
rows = conn.execute(
"SELECT mp_name, party FROM mp_metadata"
).fetchall()
conn.close()
except Exception:
rows = []
for mp_name, party in rows:
if party in party_set and mp_name in ent_to_vec:
vecs.append(ent_to_vec[mp_name])
if not vecs:
return None
return np.mean(np.vstack(vecs), axis=0)
# X-axis: left vs right
left_cent = _centroid_for_party_set(left_parties)
right_cent = _centroid_for_party_set(right_parties)
if left_cent is not None and right_cent is not None:
left_proj = float(np.dot(left_cent - M.mean(axis=0), comp1_hat))
right_proj = float(np.dot(right_cent - M.mean(axis=0), comp1_hat))
if right_proj < left_proj:
_logger.info(
"Flipping PCA x-axis to match canonical left/right orientation (right_proj=%.3f left_proj=%.3f)",
right_proj,
left_proj,
)
axes["x_axis"] = -axes["x_axis"]
# Y-axis: progressive vs conservative — positive Y = progressive
prog_cent = _centroid_for_party_set(prog_parties)
cons_cent = _centroid_for_party_set(cons_parties)
if prog_cent is not None and cons_cent is not None:
prog_proj = float(np.dot(prog_cent - M.mean(axis=0), comp2_hat))
cons_proj = float(np.dot(cons_cent - M.mean(axis=0), comp2_hat))
# We want positive Y to mean 'progressive'. If the progressive
# centroid currently projects lower than the conservative centroid,
# flip the sign so progressive > conservative.
if prog_proj < cons_proj:
_logger.info(
"Flipping PCA y-axis so positive Y corresponds to progressive (prog_proj=%.3f cons_proj=%.3f)",
prog_proj,
cons_proj,
)
axes["y_axis"] = -axes["y_axis"]
except Exception:
_logger.debug("Could not auto-orient PCA axes; leaving signs as-is")
# warn if PCA is effectively 1-D
if evr1 > 0.85 and not pca_residual:
_logger.warning(
"PCA first component explains %.1f%% of variance — data is near-1D;\n"
"consider using pca_residual=True or the anchor method for a second axis",
evr1 * 100,
)
# project per-window vectors (centre by global mean)
global_mean = M.mean(axis=0)
axes["global_mean"] = global_mean
positions_by_window: Dict[str, Dict[str, Tuple[float, float]]] = {
wid: {} for wid in window_ids
}
for (wid, ent), vec in zip(entity_index, M):
v_centered = vec - global_mean
x = float(np.dot(v_centered, axes["x_axis"]))
y = float(np.dot(v_centered, axes["y_axis"]))
positions_by_window[wid][ent] = (x, y)
# Per-window Y-axis correction: ensure "positive Y = progressive" holds
# for EACH window individually. The global orientation check above uses
# centroids averaged across all windows, so individual windows (e.g. an
# election year with few returning MPs) can still be inverted. We check
# each window and flip its Y values if conservative parties sit above
# progressive ones.
try:
# Fetch mp_metadata once for the per-window check
_mp_meta_rows: List[Tuple[str, str]] = []
try:
conn = duckdb.connect(db_path)
_mp_meta_rows = conn.execute(
"SELECT mp_name, party FROM mp_metadata"
).fetchall()
conn.close()
except Exception:
pass # no DB available (e.g. unit tests without metadata)
# Map mp_name -> party
_mp_party: Dict[str, str] = {r[0]: r[1] for r in _mp_meta_rows}
y_flipped_windows: set = set()
for wid, pos_dict in positions_by_window.items():
prog_ys = []
cons_ys = []
for ent, (x_val, y_val) in pos_dict.items():
# direct party entity
if ent in prog_parties:
prog_ys.append(y_val)
elif ent in cons_parties:
cons_ys.append(y_val)
# individual MP via metadata lookup
party = _mp_party.get(ent)
if party is not None:
if party in prog_parties:
prog_ys.append(y_val)
elif party in cons_parties:
cons_ys.append(y_val)
if prog_ys and cons_ys:
prog_avg = float(np.mean(prog_ys))
cons_avg = float(np.mean(cons_ys))
if cons_avg > prog_avg:
_logger.info(
"Per-window Y flip for window %s: "
"prog_avg_y=%.3f cons_avg_y=%.3f — negating Y",
wid,
prog_avg,
cons_avg,
)
positions_by_window[wid] = {
ent: (x_val, -y_val)
for ent, (x_val, y_val) in pos_dict.items()
}
y_flipped_windows.add(wid)
axes["y_flipped_windows"] = y_flipped_windows
# Per-window X-axis correction: mirror the Y-axis logic above.
# The global X-flip uses centroids averaged across all windows, so
# individual windows can still have left/right inverted.
x_flipped_windows: set = set()
for wid, pos_dict in positions_by_window.items():
right_xs = []
left_xs = []
for ent, (x_val, y_val) in pos_dict.items():
# direct party entity
if ent in right_parties:
right_xs.append(x_val)
elif ent in left_parties:
left_xs.append(x_val)
# individual MP via metadata lookup
party = _mp_party.get(ent)
if party is not None:
if party in right_parties:
right_xs.append(x_val)
elif party in left_parties:
left_xs.append(x_val)
if right_xs and left_xs:
right_avg = float(np.mean(right_xs))
left_avg = float(np.mean(left_xs))
if left_avg > right_avg:
_logger.info(
"Per-window X flip for window %s: "
"right_avg_x=%.3f left_avg_x=%.3f — negating X",
wid,
right_avg,
left_avg,
)
positions_by_window[wid] = {
ent: (-x_val, y_val)
for ent, (x_val, y_val) in pos_dict.items()
}
x_flipped_windows.add(wid)
axes["x_flipped_windows"] = x_flipped_windows
except Exception:
_logger.debug(
"Per-window orientation check failed; leaving per-window axes as-is"
)
return positions_by_window, axes
elif method == "anchor":
if not anchor_kwargs:
raise ValueError("anchor_kwargs required for method='anchor'")
left = set(anchor_kwargs.get("left_parties", []))
right = set(anchor_kwargs.get("right_parties", []))
prog = set(anchor_kwargs.get("prog_parties", []))
cons = set(anchor_kwargs.get("cons_parties", []))
# collect vectors across all windows for each anchor group
def collect_for_party_set(party_set: set) -> List[np.ndarray]:
res: List[np.ndarray] = []
# party-level entities (entity_id equals party name)
for wid, d in aligned_window_vecs.items():
for ent, v in d.items():
if ent in party_set:
res.append(v)
# MP-level via mp_metadata party affiliation
conn = duckdb.connect(db_path)
rows = conn.execute("SELECT mp_name, party FROM mp_metadata").fetchall()
conn.close()
for mp_name, party in rows:
if party in party_set:
# take all vectors for this MP across windows if present
for wid, d in aligned_window_vecs.items():
if mp_name in d:
res.append(d[mp_name])
return res
left_vecs = collect_for_party_set(left)
right_vecs = collect_for_party_set(right)
prog_vecs = collect_for_party_set(prog)
cons_vecs = collect_for_party_set(cons)
if not left_vecs or not right_vecs or not prog_vecs or not cons_vecs:
_logger.warning("Insufficient anchor vectors for requested parties")
return ({}, {})
left_centroid = np.mean(np.vstack(left_vecs), axis=0)
right_centroid = np.mean(np.vstack(right_vecs), axis=0)
prog_centroid = np.mean(np.vstack(prog_vecs), axis=0)
cons_centroid = np.mean(np.vstack(cons_vecs), axis=0)
lr = right_centroid - left_centroid
# Construct progressive-conservative axis so that positive Y corresponds
# to *progressive* positions (consistent with PCA branch where we flip
# the sign to make progressive > conservative). Use prog - cons so a
# positive dot product means closer to the progressive centroid.
pc = prog_centroid - cons_centroid
# Gram-Schmidt: make pc orthogonal to lr
lr_norm = np.linalg.norm(lr)
if lr_norm < 1e-12:
raise ValueError("Left-right anchor axis has near-zero norm")
lr_hat = lr / lr_norm
# remove projection of pc on lr
pc = pc - np.dot(pc, lr_hat) * lr_hat
pc_norm = np.linalg.norm(pc)
if pc_norm < 1e-12:
raise ValueError(
"Progressive-conservative anchor axis degenerate after orthogonalisation"
)
pc_hat = pc / pc_norm
axes = {"x_axis": lr_hat, "y_axis": pc_hat, "method": "anchor"}
positions_by_window = {wid: {} for wid in window_ids}
for wid, d in aligned_window_vecs.items():
for ent, v in d.items():
x = float(np.dot(v, axes["x_axis"]))
y = float(np.dot(v, axes["y_axis"]))
positions_by_window[wid][ent] = (x, y)
return positions_by_window, axes
else:
raise ValueError("Unknown method '%s'" % method)
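
# Usage sketch (illustrative only; the path and dictionary keys are hypothetical):
#
#     positions, axes = compute_2d_axes("parliament.duckdb", method="pca")
#     x, y = positions["2021-2022"]["Some MP"]
#     axes["explained_variance_ratio"]   # [evr1, evr2] in the PCA branch
#
# With method="anchor", anchor_kwargs must supply the four party lists and the
# returned axes dict contains only 'x_axis', 'y_axis' and 'method'.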
def compute_nd_axes(
db_path: str,
window_ids: Optional[List[str]] = None,
n_components: int = 10,
normalize_vectors: bool = True,
) -> Tuple[Dict[str, Dict[str, np.ndarray]], Dict]:
"""Compute aligned PCA projections onto N components for MPs per window.

    This extends compute_2d_axes to return projections onto all N principal
    components (not just the first 2), enabling consistent aligned positioning
    for SVD components 1-10 in the explorer.

    Args:
        db_path: path to duckdb
        window_ids: optional ordered list of windows (defaults to all)
        n_components: number of PCA components to compute (default 10)
        normalize_vectors: whether to normalize vectors before PCA (default True)

    Returns:
        scores_by_window, axes_def
        - scores_by_window: {window_id: {entity: np.ndarray of shape (n_components,)}}
        - axes_def: dict with 'components' (list of component vectors),
          'explained_variance_ratio', 'global_mean', etc.
    """
import importlib
_trajectory = importlib.import_module("analysis.trajectory")
if window_ids is None:
window_ids = _trajectory._load_window_ids(db_path)
# Load per-window raw vectors and align them
raw_window_vecs: Dict[str, Dict[str, np.ndarray]] = {}
for wid in window_ids:
raw_window_vecs[wid] = _trajectory._load_mp_vectors_for_window(db_path, wid)
# Pad all vectors to maximum dimension across windows
if raw_window_vecs:
max_dim = max(v.shape[0] for d in raw_window_vecs.values() for v in d.values())
padded: Dict[str, Dict[str, np.ndarray]] = {}
for wid, d in raw_window_vecs.items():
padded[wid] = {
e: np.pad(v, (0, max_dim - v.shape[0])) if v.shape[0] < max_dim else v
for e, v in d.items()
}
raw_window_vecs = padded
aligned_window_vecs = _trajectory._procrustes_align_windows(raw_window_vecs)
# Stack all aligned vectors across windows
all_vecs = []
entity_index = [] # parallel list of (window_id, entity)
for wid, d in aligned_window_vecs.items():
for ent, v in d.items():
if normalize_vectors:
n = np.linalg.norm(v)
all_vecs.append(v / n if n > 1e-10 else v)
else:
all_vecs.append(v)
entity_index.append((wid, ent))
if len(all_vecs) == 0:
_logger.info("No vectors loaded for windows %s", window_ids)
return ({}, {})
M = np.vstack(all_vecs)
global_mean = M.mean(axis=0)
# PCA: centre globally and compute SVD
Mc = M - global_mean
try:
U, s, Vt = np.linalg.svd(Mc, full_matrices=False)
except np.linalg.LinAlgError:
_logger.exception("SVD failed in compute_nd_axes")
return ({}, {})
# Explained variance ratio for each component
sv2 = s**2
evr = sv2 / (sv2.sum() + 1e-20)
explained_variance_ratio = evr[:n_components].tolist()
# Component directions (normalized)
components = [
Vt[i] / (np.linalg.norm(Vt[i]) + 1e-12)
for i in range(min(n_components, Vt.shape[0]))
]
# Build entity -> vector mapping
ent_to_vec = {ent: vec for (wid, ent), vec in zip(entity_index, M)}
# Per-component flip directions using canonical party centroids
right_parties = CANONICAL_RIGHT
left_parties = CANONICAL_LEFT
def _centroid_for_party_set(party_set):
vecs = []
for p in party_set:
if p in ent_to_vec:
vecs.append(ent_to_vec[p])
try:
conn = duckdb.connect(db_path)
rows = conn.execute("SELECT mp_name, party FROM mp_metadata").fetchall()
conn.close()
except Exception:
rows = []
for mp_name, party in rows:
if party in party_set and mp_name in ent_to_vec:
vecs.append(ent_to_vec[mp_name])
if not vecs:
return None
return np.mean(np.vstack(vecs), axis=0)
left_cent = _centroid_for_party_set(left_parties)
right_cent = _centroid_for_party_set(right_parties)
# Compute flip signs per component
flip_signs = []
if left_cent is not None and right_cent is not None:
for i, comp in enumerate(components):
left_proj = float(np.dot(left_cent - global_mean, comp))
right_proj = float(np.dot(right_cent - global_mean, comp))
# Flip if right parties project lower than left (we want RIGHT > LEFT)
flip_signs.append(-1.0 if right_proj < left_proj else 1.0)
else:
flip_signs = [1.0] * len(components)
# Project all entities onto all components
scores_by_window: Dict[str, Dict[str, np.ndarray]] = {wid: {} for wid in window_ids}
for (wid, ent), vec in zip(entity_index, M):
v_centered = vec - global_mean
scores = np.array(
[
flip_signs[i] * float(np.dot(v_centered, components[i]))
for i in range(len(components))
]
)
scores_by_window[wid][ent] = scores
axes_def = {
"components": components,
"explained_variance_ratio": explained_variance_ratio,
"global_mean": global_mean,
"flip_signs": flip_signs,
"n_components": len(components),
}
return scores_by_window, axes_def
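
# Usage sketch (illustrative only; the path and keys are hypothetical):
#
#     scores, axes_def = compute_nd_axes("parliament.duckdb", n_components=10)
#     scores["2021-2022"]["Some MP"]         # np.ndarray of up to 10 aligned PCA scores
#     axes_def["explained_variance_ratio"]   # EVR of the leading components, same order as 'components'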
def compute_svd_spectrum(
db_path: str,
window_ids: Optional[List[str]] = None,
normalize_vectors: bool = True,
) -> List[float]:
"""Return explained variance ratios (%) for all SVD components, sorted descending.

    Uses the same Procrustes-aligned multi-window matrix as compute_2d_axes so the
    scree plot is consistent with the compass axes.

    Args:
        db_path: path to duckdb
        window_ids: optional ordered list of windows (defaults to all)
        normalize_vectors: whether to L2-normalise each MP vector before stacking

    Returns:
        List of EVR percentages sorted descending (e.g. [24.1, 10.4, 7.2, ...])
    """
import importlib
_trajectory = importlib.import_module("analysis.trajectory")
if window_ids is None:
window_ids = _trajectory._load_window_ids(db_path)
raw_window_vecs: Dict[str, Dict[str, np.ndarray]] = {}
for wid in window_ids:
raw_window_vecs[wid] = _trajectory._load_mp_vectors_for_window(db_path, wid)
if not raw_window_vecs:
return []
# Pad to uniform dimension before Procrustes alignment
max_dim = max(v.shape[0] for d in raw_window_vecs.values() for v in d.values())
padded: Dict[str, Dict[str, np.ndarray]] = {}
for wid, d in raw_window_vecs.items():
padded[wid] = {
e: np.pad(v, (0, max_dim - v.shape[0])) if v.shape[0] < max_dim else v
for e, v in d.items()
}
aligned = _trajectory._procrustes_align_windows(padded)
all_vecs = []
for d in aligned.values():
for v in d.values():
if normalize_vectors:
n = np.linalg.norm(v)
all_vecs.append(v / n if n > 1e-10 else v)
else:
all_vecs.append(v)
if not all_vecs:
return []
M = np.vstack(all_vecs)
Mc = M - M.mean(axis=0)
try:
_, s, _ = np.linalg.svd(Mc, full_matrices=False)
except np.linalg.LinAlgError:
_logger.exception("SVD failed in compute_svd_spectrum")
return []
sv2 = s**2
evr = sv2 / (sv2.sum() + 1e-20) * 100
return list(evr) # already sorted descending by SVD
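
# Usage sketch (illustrative only; the path is hypothetical):
#
#     evr_pct = compute_svd_spectrum("parliament.duckdb")
#     # e.g. [24.1, 10.4, 7.2, ...]: percentages summing to ~100, ready for a scree plot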
def compute_party_bootstrap_cis(
party_vectors: Dict[str, List[np.ndarray]],
n_boot: int = 1000,
ci: float = 95.0,
seed: int = 42,
) -> Dict[str, Dict]:
"""Compute bootstrap confidence intervals for party centroid vectors.

    For each party, resamples its MP vectors with replacement to build a
    distribution of centroid estimates, then extracts percentile-based
    confidence intervals per dimension.

    Args:
        party_vectors: mapping of party name → list of individual MP vectors
            (each a numpy array of consistent length, e.g. 50 dimensions).
        n_boot: number of bootstrap replicates.
        ci: confidence level as a percentage (e.g. 95.0 for 95% CI).
        seed: random seed for reproducibility (used with ``np.random.default_rng``).

    Returns:
        Dict mapping party name → dict with keys ``centroid``, ``ci_lower``,
        ``ci_upper``, ``std``, and ``n_mps``. Parties with no MPs (empty
        list) are excluded from the output.
    """
alpha = 100.0 - ci
lo_pct = alpha / 2.0
hi_pct = 100.0 - lo_pct
rng = np.random.default_rng(seed)
result: Dict[str, Dict] = {}
for party, vectors in party_vectors.items():
n_mps = len(vectors)
if n_mps == 0:
continue
mat = np.vstack(vectors) # (n_mps, dim)
centroid = np.mean(mat, axis=0)
if n_mps == 1:
result[party] = {
"centroid": centroid,
"ci_lower": centroid.copy(),
"ci_upper": centroid.copy(),
"std": np.zeros_like(centroid),
"n_mps": 1,
}
continue
idx = rng.integers(0, n_mps, size=(n_boot, n_mps))
boot_centroids = mat[idx].mean(axis=1) # (n_boot, dim)
ci_lower = np.percentile(boot_centroids, lo_pct, axis=0)
ci_upper = np.percentile(boot_centroids, hi_pct, axis=0)
std = np.std(boot_centroids, axis=0)
result[party] = {
"centroid": centroid,
"ci_lower": ci_lower,
"ci_upper": ci_upper,
"std": std,
"n_mps": n_mps,
}
_logger.info(
"Bootstrap CIs computed for %d parties (n_boot=%d, ci=%.1f%%)",
len(result),
n_boot,
ci,
)
return result
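
# Usage sketch with toy data (illustrative only; real party vectors come from the
# aligned SVD matrices handled above):
#
#     rng = np.random.default_rng(0)
#     toy = {
#         "PartyA": [rng.normal(size=5) for _ in range(8)],
#         "PartyB": [rng.normal(loc=1.0, size=5) for _ in range(3)],
#         "PartyC": [],                      # no MPs: excluded from the result
#     }
#     cis = compute_party_bootstrap_cis(toy, n_boot=500)
#     cis["PartyA"]["centroid"], cis["PartyA"]["ci_lower"], cis["PartyA"]["ci_upper"]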