"""Generate political compass and 2D trajectories HTML outputs. This script computes 2D axes using residual-PCA (or anchor), applies the party-fill helper to colour MPs, and writes self-contained HTML files into an outputs/ directory. Usage: python scripts/generate_compass.py --db data/motions.db --out outputs --method pca --pca-residual The script is defensive: if required optional libraries (duckdb, plotly, scipy) are missing it will log and exit without raising an uncaught exception. """ from __future__ import annotations import argparse import logging import os import sys from typing import Optional # Ensure project root is on sys.path so `import analysis.*` works when the # script is executed from the repository root or from scripts/ directly. ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT not in sys.path: sys.path.insert(0, ROOT) logger = logging.getLogger("generate_compass") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") def main(argv: Optional[list] = None): p = argparse.ArgumentParser() p.add_argument("--db", default="data/motions.db", help="Path to duckdb database") p.add_argument("--out", default="outputs", help="Output directory") p.add_argument("--method", choices=["pca", "anchor"], default="pca") p.add_argument( "--pca-residual", action="store_true", help="Use residual PCA for second axis" ) p.add_argument( "--y-scale", type=float, default=None, help="Optional manual y-axis scale multiplier", ) args = p.parse_args(argv) # Lazy imports so the script exits gracefully if deps missing try: from analysis.political_axis import compute_2d_axes from analysis.visualize import ( plot_political_compass, plot_2d_trajectories, _load_party_map, ) except Exception as e: # pragma: no cover - runtime helper logger.exception("Required analysis modules could not be imported: %s", e) sys.exit(1) # Ensure output dir exists os.makedirs(args.out, exist_ok=True) logger.info( "Computing 2D axes (method=%s pca_residual=%s)", args.method, args.pca_residual ) try: positions_by_window, axis_def = compute_2d_axes( args.db, method=args.method, pca_residual=args.pca_residual, normalize_vectors=True, ) except Exception as e: # defensive logger.exception("compute_2d_axes failed: %s", e) sys.exit(1) if not positions_by_window: logger.error("No positions produced — aborting") sys.exit(1) # pick latest window (lexicographic order is used elsewhere in codebase) window_id = sorted(positions_by_window.keys())[-1] # Build party mapping to colour points try: party_map = _load_party_map(args.db) except Exception: logger.exception("Failed to build party map; proceeding without it") party_map = None # Output files compass_out = os.path.join( args.out, f"political_compass_{args.method}_{window_id}.html" ) traj_out = os.path.join(args.out, f"trajectories_compass_{args.method}_top50.html") try: plot_political_compass( positions_by_window, window_id=window_id, party_of=party_map, axis_def=axis_def, y_scale=args.y_scale, output_path=compass_out, ) logger.info("Wrote compass to %s", compass_out) except Exception: logger.exception("Failed to write political compass") try: # Build 2D trajectories from the already-computed positions_by_window so # we keep the same PCA/anchor axes (compute_2d_trajectories would call # compute_2d_axes again which may use different defaults). import numpy as _np window_ids = sorted(positions_by_window.keys()) mp_data = {} for wid in window_ids: pos = positions_by_window.get(wid, {}) for mp_name, coord in pos.items(): mp_data.setdefault(mp_name, {"windows": [], "coords": []}) mp_data[mp_name]["windows"].append(wid) mp_data[mp_name]["coords"].append(tuple(coord)) trajs = {} for mp_name, data in mp_data.items(): if len(data["windows"]) < 2: continue coords = [_np.array(c, dtype=float) for c in data["coords"]] step_vecs = [coords[i + 1] - coords[i] for i in range(len(coords) - 1)] mags = [float(_np.linalg.norm(v)) for v in step_vecs] trajs[mp_name] = { "windows": data["windows"], "coords": [[float(c[0]), float(c[1])] for c in coords], "step_vectors": [[float(v[0]), float(v[1])] for v in step_vecs], "step_magnitudes": mags, "total_magnitude": float(sum(mags)), } ranked = sorted( trajs.items(), key=lambda kv: kv[1]["total_magnitude"], reverse=True ) top_names = [mp for mp, _ in ranked[:50]] if ranked else None plot_2d_trajectories( positions_by_window, mp_names=top_names, output_path=traj_out ) logger.info("Wrote trajectories to %s", traj_out) except Exception: logger.exception("Failed to compute/write trajectories") if __name__ == "__main__": main()