diff --git a/.env b/.env new file mode 100644 index 0000000..0e9bba6 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +OPENROUTER_API_KEY="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" diff --git a/analysis/political_axis.py b/analysis/political_axis.py index 8b0bb4b..7580c9a 100644 --- a/analysis/political_axis.py +++ b/analysis/political_axis.py @@ -14,9 +14,10 @@ Both modes return a dict mapping mp_name → scalar score for the given window. import json import logging -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple import numpy as np +from . import trajectory as _trajectory import duckdb _logger = logging.getLogger(__name__) @@ -125,3 +126,162 @@ def compute_anchor_axis( axis = axis / norm return {name: float(np.dot(vec, axis)) for name, vec in mp_vecs.items()} + + +def compute_2d_axes( + db_path: str, + window_ids: Optional[List[str]] = None, + method: str = "pca", + anchor_kwargs: Optional[Dict] = None, +) -> Tuple[Dict[str, Dict[str, Tuple[float, float]]], Dict[str, np.ndarray]]: + """Compute 2D coordinates for MPs per window. + + Args: + db_path: path to duckdb + window_ids: optional ordered list of windows (defaults to all) + method: 'pca' or 'anchor' + anchor_kwargs: when method=='anchor' must provide + { + 'left_parties': List[str], + 'right_parties': List[str], + 'prog_parties': List[str], + 'cons_parties': List[str], + } + + Returns: + positions_by_window, axis_def + - positions_by_window: {window_id: {mp_name: (x,y)}} + - axis_def: {'x_axis': np.ndarray, 'y_axis': np.ndarray, 'method': str} + + Notes: + This function expects aligned SVD vectors produced by + trajectory._procrustes_align_windows. It will call trajectory helpers + to load and align windows so the returned coordinates are consistent + across windows. + """ + if window_ids is None: + window_ids = _trajectory._load_window_ids(db_path) + + # Load per-window raw vectors using the trajectory helper and align them + raw_window_vecs: Dict[str, Dict[str, np.ndarray]] = {} + for wid in window_ids: + raw_window_vecs[wid] = _trajectory._load_mp_vectors_for_window(db_path, wid) + + aligned_window_vecs = _trajectory._procrustes_align_windows(raw_window_vecs) + + # Stack all vectors across windows into a single matrix for PCA if needed + all_vecs = [] + entity_index = [] # parallel list of (window_id, entity) + for wid, d in aligned_window_vecs.items(): + for ent, v in d.items(): + all_vecs.append(v) + entity_index.append((wid, ent)) + + if len(all_vecs) == 0: + _logger.info("No vectors loaded for windows %s", window_ids) + return ({}, {}) + + M = np.vstack(all_vecs) + + if method == "pca": + # centre globally + Mc = M - M.mean(axis=0) + try: + _, _, Vt = np.linalg.svd(Mc, full_matrices=False) + except np.linalg.LinAlgError: + _logger.exception("SVD failed in compute_2d_axes (pca)") + return ({}, {}) + # take top-2 components as axes (shape k,) + comp1 = Vt[0] + comp2 = Vt[1] if Vt.shape[0] > 1 else np.zeros_like(comp1) + axes = { + "x_axis": comp1 / (np.linalg.norm(comp1) + 1e-12), + "y_axis": comp2 / (np.linalg.norm(comp2) + 1e-12), + "method": "pca", + } + + # project per-window vectors (centre by global mean) + global_mean = M.mean(axis=0) + positions_by_window: Dict[str, Dict[str, Tuple[float, float]]] = { + wid: {} for wid in window_ids + } + for (wid, ent), vec in zip(entity_index, M): + v_centered = vec - global_mean + x = float(np.dot(v_centered, axes["x_axis"])) + y = float(np.dot(v_centered, axes["y_axis"])) + positions_by_window[wid][ent] = (x, y) + + return positions_by_window, axes + + elif method == "anchor": + if not anchor_kwargs: + raise ValueError("anchor_kwargs required for method='anchor'") + left = set(anchor_kwargs.get("left_parties", [])) + right = set(anchor_kwargs.get("right_parties", [])) + prog = set(anchor_kwargs.get("prog_parties", [])) + cons = set(anchor_kwargs.get("cons_parties", [])) + + # collect vectors across all windows for each anchor group + def collect_for_party_set(party_set: set) -> List[np.ndarray]: + res: List[np.ndarray] = [] + # party-level entities (entity_id equals party name) + for wid, d in aligned_window_vecs.items(): + for ent, v in d.items(): + if ent in party_set: + res.append(v) + # MP-level via mp_metadata party affiliation + conn = duckdb.connect(db_path) + rows = conn.execute("SELECT mp_name, party FROM mp_metadata").fetchall() + conn.close() + for mp_name, party in rows: + if party in party_set: + # take all vectors for this MP across windows if present + for wid, d in aligned_window_vecs.items(): + if mp_name in d: + res.append(d[mp_name]) + return res + + left_vecs = collect_for_party_set(left) + right_vecs = collect_for_party_set(right) + prog_vecs = collect_for_party_set(prog) + cons_vecs = collect_for_party_set(cons) + + if not left_vecs or not right_vecs or not prog_vecs or not cons_vecs: + _logger.warning("Insufficient anchor vectors for requested parties") + return ({}, {}) + + left_centroid = np.mean(np.vstack(left_vecs), axis=0) + right_centroid = np.mean(np.vstack(right_vecs), axis=0) + prog_centroid = np.mean(np.vstack(prog_vecs), axis=0) + cons_centroid = np.mean(np.vstack(cons_vecs), axis=0) + + lr = right_centroid - left_centroid + pc = cons_centroid - prog_centroid + + # Gram-Schmidt: make pc orthogonal to lr + lr_norm = np.linalg.norm(lr) + if lr_norm < 1e-12: + raise ValueError("Left-right anchor axis has near-zero norm") + lr_hat = lr / lr_norm + # remove projection of pc on lr + pc = pc - np.dot(pc, lr_hat) * lr_hat + pc_norm = np.linalg.norm(pc) + if pc_norm < 1e-12: + raise ValueError( + "Progressive-conservative anchor axis degenerate after orthogonalisation" + ) + pc_hat = pc / pc_norm + + axes = {"x_axis": lr_hat, "y_axis": pc_hat, "method": "anchor"} + + positions_by_window = {wid: {} for wid in window_ids} + for wid, d in aligned_window_vecs.items(): + for ent, v in d.items(): + x = float(np.dot(v, axes["x_axis"])) + y = float(np.dot(v, axes["y_axis"])) + positions_by_window[wid][ent] = (x, y) + + return positions_by_window, axes + + else: + raise ValueError("Unknown method '%s'" % method) diff --git a/analysis/trajectory.py b/analysis/trajectory.py index ef4e782..cfe119a 100644 --- a/analysis/trajectory.py +++ b/analysis/trajectory.py @@ -195,6 +195,62 @@ def compute_trajectories( return result +def compute_2d_trajectories( + db_path: str, method: str = "pca", anchor_kwargs: Optional[Dict] = None +) -> Dict[str, Dict]: + """Compute 2D trajectory positions for MPs using compute_2d_axes. + + Returns dict keyed by mp_name with: + { + 'windows': [window_ids...], + 'coords': [[x,y], ...], + 'step_vectors': [[dx,dy], ...], + 'step_magnitudes': [float,...], + 'total_magnitude': float, + } + + Only MPs present in >=2 windows are included. + """ + from .political_axis import compute_2d_axes + + window_ids = _load_window_ids(db_path) + if len(window_ids) < 2: + _logger.info("Fewer than 2 windows — no 2D trajectories to compute") + return {} + + positions_by_window, axes = compute_2d_axes( + db_path, window_ids=window_ids, method=method, anchor_kwargs=anchor_kwargs + ) + + # Build per-MP time-ordered coords + mp_data: Dict[str, Dict] = {} + for wid in window_ids: + pos = positions_by_window.get(wid, {}) + for mp_name, coord in pos.items(): + if mp_name not in mp_data: + mp_data[mp_name] = {"windows": [], "coords": []} + mp_data[mp_name]["windows"].append(wid) + mp_data[mp_name]["coords"].append(tuple(coord)) + + result: Dict[str, Dict] = {} + for mp_name, data in mp_data.items(): + if len(data["windows"]) < 2: + continue + coords = [np.array(c, dtype=float) for c in data["coords"]] + step_vecs = [coords[i + 1] - coords[i] for i in range(len(coords) - 1)] + mags = [float(np.linalg.norm(v)) for v in step_vecs] + result[mp_name] = { + "windows": data["windows"], + "coords": [[float(c[0]), float(c[1])] for c in coords], + "step_vectors": [[float(v[0]), float(v[1])] for v in step_vecs], + "step_magnitudes": mags, + "total_magnitude": float(sum(mags)), + } + + _logger.info("2D trajectories computed for %d MPs", len(result)) + return result + + def top_drifters(trajectories: Dict[str, Dict], n: int = 10) -> List[Dict]: """Return the top-n MPs by total drift, sorted descending. diff --git a/analysis/visualize.py b/analysis/visualize.py index 595624a..e93bc78 100644 --- a/analysis/visualize.py +++ b/analysis/visualize.py @@ -9,9 +9,10 @@ Functions: """ import logging -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple import numpy as np +from typing import Any _logger = logging.getLogger(__name__) @@ -161,3 +162,124 @@ def plot_political_axis( fig.write_html(output_path, include_plotlyjs="cdn") _logger.info("Political axis chart written to %s", output_path) return output_path + + +def plot_political_compass( + positions_by_window: Dict, + window_id: str, + party_of: Optional[Dict] = None, + output_path: str = "analysis_compass.html", +) -> str: + """Plot 2D political compass scatter for a single window. + + Args: + positions_by_window: {window_id: {mp_name: (x,y)}} + window_id: which window to plot + party_of: optional mapping mp_name -> party for colouring + output_path: HTML output path + + Returns output_path + """ + go, px = _require_plotly() + + pos = positions_by_window.get(window_id, {}) + xs = [v[0] for v in pos.values()] + ys = [v[1] for v in pos.values()] + names = list(pos.keys()) + + # If no party mapping provided, try to load from data/motions.db (duckdb) + if party_of is None: + try: + import duckdb # type: ignore + + try: + conn = duckdb.connect(database="data/motions.db", read_only=True) + df = conn.execute("SELECT mp_name, party FROM mp_metadata").fetchdf() + # convert to dict + party_of = { + row[0]: row[1] for row in df.itertuples(index=False, name=None) + } + _logger.info( + "Loaded party mapping for %d MPs from data/motions.db", + len(party_of), + ) + finally: + try: + conn.close() + except Exception: + pass + except ImportError: + _logger.debug("duckdb not installed; proceeding without party mapping") + except Exception as e: + _logger.debug("Could not load party mapping from data/motions.db: %s", e) + + parties = [party_of.get(n, "Unknown") if party_of else "Unknown" for n in names] + + fig = px.scatter( + x=xs, + y=ys, + color=parties, + hover_name=names, + title=f"Political Compass ({window_id})", + labels={ + "x": "Left ← — → Right", + "y": "Progressive ← — → Conservative", + "color": "Party", + }, + ) + fig.update_traces(marker=dict(size=8, opacity=0.8)) + fig.write_html(output_path, include_plotlyjs="cdn") + _logger.info("Political compass written to %s", output_path) + return output_path + + +def plot_2d_trajectories( + positions_by_window: Dict, + mp_names: Optional[List[str]] = None, + output_path: str = "analysis_trajectories_compass.html", +) -> str: + """Plot MP trajectories across windows on the 2D compass. + + Args: + positions_by_window: {window_id: {mp_name: (x,y)}} + mp_names: list of MPs to plot (default: all found in positions) + output_path: output HTML path + """ + go, px = _require_plotly() + + # collect window order + window_ids = list(positions_by_window.keys()) + # build per-MP time-ordered coords + # mp_coords maps mp_name -> list of (window_id, (x,y)) + mp_coords: Dict[str, List[Tuple[str, Tuple[float, float]]]] = {} + for wid in window_ids: + for mp, coord in positions_by_window.get(wid, {}).items(): + mp_coords.setdefault(mp, []).append((wid, coord)) + + if mp_names is None: + mp_names = list(mp_coords.keys()) + + fig = go.Figure() + for mp in mp_names: + if mp not in mp_coords: + continue + items = mp_coords[mp] + # ensure sorted by window order + items_sorted = sorted(items, key=lambda it: window_ids.index(it[0])) + xs = [c[1][0] for c in items_sorted] + ys = [c[1][1] for c in items_sorted] + text = [f"{mp} ({w})" for w, _ in items_sorted] + fig.add_trace( + go.Scatter( + x=xs, y=ys, mode="lines+markers", name=mp, text=text, hoverinfo="text" + ) + ) + + fig.update_layout( + title="MP Trajectories on Political Compass", + xaxis_title="Left ← — → Right", + yaxis_title="Progressive ← — → Conservative", + ) + fig.write_html(output_path, include_plotlyjs="cdn") + _logger.info("2D trajectories compass written to %s", output_path) + return output_path diff --git a/data/motions.db b/data/motions.db new file mode 100644 index 0000000..927a418 Binary files /dev/null and b/data/motions.db differ diff --git a/tests/test_political_compass.py b/tests/test_political_compass.py new file mode 100644 index 0000000..fa71908 --- /dev/null +++ b/tests/test_political_compass.py @@ -0,0 +1,44 @@ +import numpy as np +import types +import sys + +import pytest + + +def test_compute_2d_axes_pca_synthetic(monkeypatch): + """Synthetic test for compute_2d_axes using patched alignment helper.""" + + # Create a fake trajectory module with required helpers + fake_traj = types.SimpleNamespace() + + # _load_window_ids should return ordered windows + fake_traj._load_window_ids = lambda db: ["w1", "w2"] + + # _load_mp_vectors_for_window is not used because we patch _procrustes_align_windows + fake_traj._load_mp_vectors_for_window = lambda db, w: {} + + # Provide aligned vectors directly + aligned = { + "w1": {"Alice": np.array([1.0, 0.0, 0.0]), "Bob": np.array([0.0, 1.0, 0.0])}, + "w2": {"Alice": np.array([0.8, 0.2, 0.0]), "Bob": np.array([0.1, 0.9, 0.0])}, + } + + fake_traj._procrustes_align_windows = lambda x: aligned + + # Insert fake module into sys.modules for import by analysis.political_axis + monkeypatch.setitem(sys.modules, "analysis.trajectory", fake_traj) + + # Now import the function under test + from analysis.political_axis import compute_2d_axes + + positions_by_window, axis_def = compute_2d_axes( + db_path="dummy", window_ids=["w1", "w2"], method="pca" + ) + + assert "w1" in positions_by_window and "w2" in positions_by_window + for wid in ("w1", "w2"): + for name, coord in positions_by_window[wid].items(): + assert len(coord) == 2 + assert np.isfinite(coord[0]) and np.isfinite(coord[1]) + + assert axis_def.get("method") == "pca"