feat(analysis): add 2D political compass and 2D trajectories

- compute_2d_axes (PCA + anchor)
- compute_2d_trajectories
- plot_political_compass, plot_2d_trajectories
- unit test: tests/test_political_compass.py
main
Sven Geboers 1 month ago
parent f7d806dc3a
commit 3551a82f83
  1. 1
      .env
  2. 162
      analysis/political_axis.py
  3. 56
      analysis/trajectory.py
  4. 124
      analysis/visualize.py
  5. BIN
      data/motions.db
  6. 44
      tests/test_political_compass.py

@ -0,0 +1 @@
OPENROUTER_API_KEY="<REDACTED: rotate this key and load it from an untracked .env>"

@ -14,9 +14,10 @@ Both modes return a dict mapping mp_name → scalar score for the given window.
import json
import logging
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple
import numpy as np
from . import trajectory as _trajectory
import duckdb
_logger = logging.getLogger(__name__)
@ -125,3 +126,162 @@ def compute_anchor_axis(
axis = axis / norm
return {name: float(np.dot(vec, axis)) for name, vec in mp_vecs.items()}
def compute_2d_axes(
    db_path: str,
    window_ids: Optional[List[str]] = None,
    method: str = "pca",
    anchor_kwargs: Optional[Dict] = None,
) -> Tuple[Dict[str, Dict[str, Tuple[float, float]]], Dict[str, np.ndarray]]:
    """Compute 2D coordinates for MPs per window.

    Per-window vectors are loaded with
    ``trajectory._load_mp_vectors_for_window`` and passed through
    ``trajectory._procrustes_align_windows`` before any projection, so the
    same pair of axes is applied to every window.

    Args:
        db_path: path to duckdb
        window_ids: optional ordered list of windows (defaults to the result
            of ``trajectory._load_window_ids(db_path)``)
        method: 'pca' or 'anchor'
        anchor_kwargs: when method=='anchor' must provide
            {
                'left_parties': List[str],
                'right_parties': List[str],
                'prog_parties': List[str],
                'cons_parties': List[str],
            }

    Returns:
        positions_by_window, axis_def
        - positions_by_window: {window_id: {mp_name: (x, y)}}
        - axis_def: {'x_axis': np.ndarray, 'y_axis': np.ndarray, 'method': str}

        ``({}, {})`` is returned when no vectors were loaded, when the SVD
        fails (pca), or when any anchor group has no vectors (anchor).

    Raises:
        ValueError: unknown ``method``; ``anchor_kwargs`` missing/empty for
            method='anchor'; or a degenerate (near-zero) anchor axis.

    Notes:
        - 'pca' centres all vectors by the global mean before projecting;
          'anchor' projects the aligned vectors as-is (no centring).
        - 'anchor' reads the ``mp_metadata`` table (mp_name, party) from
          ``db_path`` to map MPs onto the anchor parties.
    """
    if window_ids is None:
        window_ids = _trajectory._load_window_ids(db_path)

    # Load per-window raw vectors using the trajectory helper and align them
    raw_window_vecs: Dict[str, Dict[str, np.ndarray]] = {
        wid: _trajectory._load_mp_vectors_for_window(db_path, wid)
        for wid in window_ids
    }
    aligned_window_vecs = _trajectory._procrustes_align_windows(raw_window_vecs)

    # Flatten all vectors across windows; entity_index is the parallel list
    # of (window_id, entity) identifying each row of the stacked matrix.
    all_vecs: List[np.ndarray] = []
    entity_index: List[Tuple[str, str]] = []
    for wid, vecs in aligned_window_vecs.items():
        for ent, v in vecs.items():
            all_vecs.append(v)
            entity_index.append((wid, ent))

    if not all_vecs:
        _logger.info("No vectors loaded for windows %s", window_ids)
        return ({}, {})

    M = np.vstack(all_vecs)

    if method == "pca":
        # Centre globally; the same mean is reused for the projection below.
        global_mean = M.mean(axis=0)
        Mc = M - global_mean
        try:
            _, _, Vt = np.linalg.svd(Mc, full_matrices=False)
        except np.linalg.LinAlgError:
            _logger.exception("SVD failed in compute_2d_axes (pca)")
            return ({}, {})

        # Top-2 right-singular vectors are the axes. With a single input row
        # there is no second component, so fall back to a zero y-axis.
        comp1 = Vt[0]
        comp2 = Vt[1] if Vt.shape[0] > 1 else np.zeros_like(comp1)
        axes = {
            # epsilon guards the division when a component is all zeros
            "x_axis": comp1 / (np.linalg.norm(comp1) + 1e-12),
            "y_axis": comp2 / (np.linalg.norm(comp2) + 1e-12),
            "method": "pca",
        }

        positions_by_window: Dict[str, Dict[str, Tuple[float, float]]] = {
            wid: {} for wid in window_ids
        }
        for (wid, ent), centred in zip(entity_index, Mc):
            x = float(np.dot(centred, axes["x_axis"]))
            y = float(np.dot(centred, axes["y_axis"]))
            # setdefault: tolerate an aligner that returns a window id that
            # was not in window_ids instead of failing with KeyError.
            positions_by_window.setdefault(wid, {})[ent] = (x, y)
        return positions_by_window, axes

    elif method == "anchor":
        if not anchor_kwargs:
            raise ValueError("anchor_kwargs required for method='anchor'")

        left = set(anchor_kwargs.get("left_parties", []))
        right = set(anchor_kwargs.get("right_parties", []))
        prog = set(anchor_kwargs.get("prog_parties", []))
        cons = set(anchor_kwargs.get("cons_parties", []))

        # Party affiliation is the same for all four anchor groups, so query
        # it once; try/finally guarantees the connection is released even if
        # the query raises.
        conn = duckdb.connect(db_path)
        try:
            party_rows = conn.execute(
                "SELECT mp_name, party FROM mp_metadata"
            ).fetchall()
        finally:
            conn.close()

        def collect_for_party_set(party_set: set) -> List[np.ndarray]:
            """Gather every aligned vector (all windows) belonging to party_set."""
            # party-level entities (entity_id equals party name)
            res: List[np.ndarray] = [
                v
                for vecs in aligned_window_vecs.values()
                for ent, v in vecs.items()
                if ent in party_set
            ]
            # MP-level via mp_metadata party affiliation
            for mp_name, party in party_rows:
                if party in party_set:
                    res.extend(
                        vecs[mp_name]
                        for vecs in aligned_window_vecs.values()
                        if mp_name in vecs
                    )
            return res

        left_vecs = collect_for_party_set(left)
        right_vecs = collect_for_party_set(right)
        prog_vecs = collect_for_party_set(prog)
        cons_vecs = collect_for_party_set(cons)

        if not left_vecs or not right_vecs or not prog_vecs or not cons_vecs:
            _logger.warning("Insufficient anchor vectors for requested parties")
            return ({}, {})

        left_centroid = np.mean(np.vstack(left_vecs), axis=0)
        right_centroid = np.mean(np.vstack(right_vecs), axis=0)
        prog_centroid = np.mean(np.vstack(prog_vecs), axis=0)
        cons_centroid = np.mean(np.vstack(cons_vecs), axis=0)

        lr = right_centroid - left_centroid
        pc = cons_centroid - prog_centroid

        # Gram-Schmidt: make pc orthogonal to lr
        lr_norm = np.linalg.norm(lr)
        if lr_norm < 1e-12:
            raise ValueError("Left-right anchor axis has near-zero norm")
        lr_hat = lr / lr_norm

        # remove projection of pc on lr
        pc = pc - np.dot(pc, lr_hat) * lr_hat
        pc_norm = np.linalg.norm(pc)
        if pc_norm < 1e-12:
            raise ValueError(
                "Progressive-conservative anchor axis degenerate after orthogonalisation"
            )
        pc_hat = pc / pc_norm

        axes = {"x_axis": lr_hat, "y_axis": pc_hat, "method": "anchor"}

        positions_by_window = {wid: {} for wid in window_ids}
        for wid, vecs in aligned_window_vecs.items():
            # setdefault for the same reason as in the pca branch
            window_positions = positions_by_window.setdefault(wid, {})
            for ent, v in vecs.items():
                x = float(np.dot(v, lr_hat))
                y = float(np.dot(v, pc_hat))
                window_positions[ent] = (x, y)
        return positions_by_window, axes

    else:
        raise ValueError("Unknown method '%s'" % method)

@ -195,6 +195,62 @@ def compute_trajectories(
return result
def compute_2d_trajectories(
    db_path: str, method: str = "pca", anchor_kwargs: Optional[Dict] = None
) -> Dict[str, Dict]:
    """Build per-MP 2D trajectories from the output of compute_2d_axes.

    Window ids come from ``_load_window_ids(db_path)``; fewer than two
    windows yields an empty dict. Each returned entry is keyed by mp_name:
        {
            'windows': [window_ids...],      # windows the MP appears in
            'coords': [[x, y], ...],         # position per listed window
            'step_vectors': [[dx, dy], ...], # difference of consecutive coords
            'step_magnitudes': [float, ...], # Euclidean length of each step
            'total_magnitude': float,        # sum of step magnitudes
        }

    Only MPs present in >=2 windows are included.
    """
    # Imported lazily inside the function (political_axis imports this module).
    from .political_axis import compute_2d_axes

    window_ids = _load_window_ids(db_path)
    if len(window_ids) < 2:
        _logger.info("Fewer than 2 windows — no 2D trajectories to compute")
        return {}

    positions_by_window, _axes = compute_2d_axes(
        db_path, window_ids=window_ids, method=method, anchor_kwargs=anchor_kwargs
    )

    # Walk the windows in order so each MP's lists come out time-ordered.
    per_mp: Dict[str, Dict] = {}
    for wid in window_ids:
        for mp_name, coord in positions_by_window.get(wid, {}).items():
            entry = per_mp.setdefault(mp_name, {"windows": [], "coords": []})
            entry["windows"].append(wid)
            entry["coords"].append(tuple(coord))

    result: Dict[str, Dict] = {}
    for mp_name, entry in per_mp.items():
        seen_windows = entry["windows"]
        if len(seen_windows) < 2:
            continue

        points = [np.array(c, dtype=float) for c in entry["coords"]]
        # Pair each point with its successor to get the per-step displacement.
        deltas = [nxt - cur for cur, nxt in zip(points, points[1:])]
        lengths = [float(np.linalg.norm(d)) for d in deltas]

        result[mp_name] = {
            "windows": seen_windows,
            "coords": [[float(p[0]), float(p[1])] for p in points],
            "step_vectors": [[float(d[0]), float(d[1])] for d in deltas],
            "step_magnitudes": lengths,
            "total_magnitude": float(sum(lengths)),
        }

    _logger.info("2D trajectories computed for %d MPs", len(result))
    return result
def top_drifters(trajectories: Dict[str, Dict], n: int = 10) -> List[Dict]:
"""Return the top-n MPs by total drift, sorted descending.

@ -9,9 +9,10 @@ Functions:
"""
import logging
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple
import numpy as np
from typing import Any
_logger = logging.getLogger(__name__)
@ -161,3 +162,124 @@ def plot_political_axis(
fig.write_html(output_path, include_plotlyjs="cdn")
_logger.info("Political axis chart written to %s", output_path)
return output_path
def _load_party_mapping(db_path: str) -> Optional[Dict[str, str]]:
    """Best-effort load of the mp_name -> party mapping from a duckdb file.

    Never raises: returns None when duckdb is not importable or when the
    connection/query fails, so the caller can plot without party colours.
    """
    try:
        import duckdb  # type: ignore
    except ImportError:
        _logger.debug("duckdb not installed; proceeding without party mapping")
        return None

    conn = None
    try:
        conn = duckdb.connect(database=db_path, read_only=True)
        # fetchall() keeps this path free of a pandas dependency
        rows = conn.execute("SELECT mp_name, party FROM mp_metadata").fetchall()
    except Exception as e:  # deliberate best-effort: colouring is optional
        _logger.debug("Could not load party mapping from %s: %s", db_path, e)
        return None
    finally:
        # conn stays None when connect() itself failed
        if conn is not None:
            try:
                conn.close()
            except Exception as e:
                _logger.debug("Error closing %s: %s", db_path, e)

    mapping = dict(rows)
    _logger.info("Loaded party mapping for %d MPs from %s", len(mapping), db_path)
    return mapping


def plot_political_compass(
    positions_by_window: Dict,
    window_id: str,
    party_of: Optional[Dict] = None,
    output_path: str = "analysis_compass.html",
    db_path: str = "data/motions.db",
) -> str:
    """Plot 2D political compass scatter for a single window.

    Args:
        positions_by_window: {window_id: {mp_name: (x, y)}}
        window_id: which window to plot; a window missing from
            positions_by_window produces a chart with no points
        party_of: optional mapping mp_name -> party for colouring. When None,
            the mapping is loaded on a best-effort basis from the
            ``mp_metadata`` table of ``db_path``; MPs without a mapping are
            labelled "Unknown".
        output_path: HTML output path
        db_path: duckdb file consulted only when party_of is None

    Returns:
        output_path
    """
    _, px = _require_plotly()

    pos = positions_by_window.get(window_id, {})
    names = list(pos.keys())
    xs = [v[0] for v in pos.values()]
    ys = [v[1] for v in pos.values()]

    # If no party mapping provided, try to load it from the duckdb file
    if party_of is None:
        party_of = _load_party_mapping(db_path)

    # A missing or empty mapping labels every MP "Unknown"
    mapping = party_of or {}
    parties = [mapping.get(n, "Unknown") for n in names]

    fig = px.scatter(
        x=xs,
        y=ys,
        color=parties,
        hover_name=names,
        title=f"Political Compass ({window_id})",
        labels={
            "x": "Left ← — → Right",
            "y": "Progressive ← — → Conservative",
            "color": "Party",
        },
    )
    fig.update_traces(marker=dict(size=8, opacity=0.8))
    fig.write_html(output_path, include_plotlyjs="cdn")
    _logger.info("Political compass written to %s", output_path)
    return output_path
def plot_2d_trajectories(
    positions_by_window: Dict,
    mp_names: Optional[List[str]] = None,
    output_path: str = "analysis_trajectories_compass.html",
) -> str:
    """Plot MP trajectories across windows on the 2D compass.

    Window order is the iteration (insertion) order of positions_by_window;
    each MP becomes one lines+markers trace through its positions in that
    order.

    Args:
        positions_by_window: {window_id: {mp_name: (x, y)}}
        mp_names: list of MPs to plot (default: all found in positions);
            names with no positions are skipped
        output_path: output HTML path

    Returns:
        output_path
    """
    go, _ = _require_plotly()

    # mp_coords maps mp_name -> list of (window_id, (x, y)). Windows are
    # visited in order, so every per-MP list is already time-ordered and
    # needs no further sorting.
    mp_coords: Dict[str, List[Tuple[str, Tuple[float, float]]]] = {}
    for wid, positions in positions_by_window.items():
        for mp, coord in positions.items():
            mp_coords.setdefault(mp, []).append((wid, coord))

    if mp_names is None:
        mp_names = list(mp_coords.keys())

    fig = go.Figure()
    for mp in mp_names:
        items = mp_coords.get(mp)
        if items is None:
            continue
        xs = [coord[0] for _, coord in items]
        ys = [coord[1] for _, coord in items]
        text = [f"{mp} ({w})" for w, _ in items]
        fig.add_trace(
            go.Scatter(
                x=xs, y=ys, mode="lines+markers", name=mp, text=text, hoverinfo="text"
            )
        )

    fig.update_layout(
        title="MP Trajectories on Political Compass",
        xaxis_title="Left ← — → Right",
        yaxis_title="Progressive ← — → Conservative",
    )
    fig.write_html(output_path, include_plotlyjs="cdn")
    _logger.info("2D trajectories compass written to %s", output_path)
    return output_path

Binary file not shown.

@ -0,0 +1,44 @@
import numpy as np
import types
import sys
import pytest
def test_compute_2d_axes_pca_synthetic(monkeypatch):
    """Synthetic test for compute_2d_axes (pca) with stubbed trajectory helpers.

    The stub is installed on the ``_trajectory`` attribute of the already
    imported ``analysis.political_axis`` module. Patching ``sys.modules``
    instead would be ignored if the module had been imported earlier, and
    would leave the fake bound inside the cached module for later tests.
    monkeypatch restores the attribute when the test finishes.
    """
    import analysis.political_axis as political_axis

    # Aligned vectors handed back by the stubbed alignment helper
    aligned = {
        "w1": {"Alice": np.array([1.0, 0.0, 0.0]), "Bob": np.array([0.0, 1.0, 0.0])},
        "w2": {"Alice": np.array([0.8, 0.2, 0.0]), "Bob": np.array([0.1, 0.9, 0.0])},
    }

    fake_traj = types.SimpleNamespace(
        # ordered windows
        _load_window_ids=lambda db: ["w1", "w2"],
        # raw vectors are irrelevant: the alignment stub ignores its input
        _load_mp_vectors_for_window=lambda db, w: {},
        _procrustes_align_windows=lambda raw: aligned,
    )
    monkeypatch.setattr(political_axis, "_trajectory", fake_traj)

    positions_by_window, axis_def = political_axis.compute_2d_axes(
        db_path="dummy", window_ids=["w1", "w2"], method="pca"
    )

    assert "w1" in positions_by_window and "w2" in positions_by_window
    for wid in ("w1", "w2"):
        # guard against the coordinate loop passing vacuously on empty output
        assert set(positions_by_window[wid]) == {"Alice", "Bob"}
        for name, coord in positions_by_window[wid].items():
            assert len(coord) == 2
            assert np.isfinite(coord[0]) and np.isfinite(coord[1])
    assert axis_def.get("method") == "pca"
Loading…
Cancel
Save