#!/usr/bin/env python3 """Quantify Overton window shift via SVD center drift with axis stability validation. Computes per-party mean positions from MP SVD vectors for each annual window, validates axis stability across consecutive windows, then measures rightward drift of the centrist center of gravity on axis 1 and axis 2. Usage: uv run python analysis/right_wing/overton_svd_drift.py """ from __future__ import annotations import json import logging import os import sys from collections import defaultdict from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import duckdb import matplotlib import matplotlib.pyplot as plt import numpy as np from scipy.stats import spearmanr matplotlib.use("Agg") ROOT = Path(__file__).parent.parent.parent.resolve() if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from analysis.config import CANONICAL_RIGHT, PARTY_COLOURS, _PARTY_NORMALIZE logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger("overton_svd_drift") CANONICAL_CENTRIST = frozenset({"VVD", "D66", "CDA", "NSC", "BBB", "ChristenUnie"}) DB_PATH = str(ROOT / "data" / "motions.db") REPORTS_DIR = ROOT / "reports" / "overton_window" STABILITY_THRESHOLD = 0.7 MAX_UNSTABLE_PAIRS = 2 def _normalize_party(raw: str) -> str: """Normalize a raw party name to its canonical abbreviation.""" return _PARTY_NORMALIZE.get(raw, raw) def compute_party_positions( con: duckdb.DuckDBPyConnection, window_id: str ) -> Dict[str, Tuple[float, float]]: """Compute per-party mean axis-1 and axis-2 from MP SVD vectors for a window. Mirrors the logic of agent_tools/database.py:compute_party_positions_from_vectors. """ rows = con.execute( """ SELECT sv.entity_id, sv.vector, mm.party FROM svd_vectors sv JOIN mp_metadata mm ON sv.entity_id = mm.mp_name WHERE sv.window_id = ? AND sv.entity_type = 'mp' """, (window_id,), ).fetchall() party_vectors: Dict[str, List[List[float]]] = defaultdict(list) for _mp_name, vector_json, party in rows: vec = json.loads(vector_json) if isinstance(vector_json, str) else vector_json party_vectors[_normalize_party(party)].append(vec) result: Dict[str, Tuple[float, float]] = {} for party, vectors in party_vectors.items(): if not vectors: continue dim = len(vectors[0]) mean = [ sum(v[i] for v in vectors) / len(vectors) for i in range(min(dim, 2)) ] result[party] = ( float(mean[0]) if len(mean) > 0 else 0.0, float(mean[1]) if len(mean) > 1 else 0.0, ) return result def get_annual_windows(con: duckdb.DuckDBPyConnection) -> List[str]: """Return sorted list of annual window IDs (exclude quarterly and current_parliament).""" rows = con.execute( """ SELECT DISTINCT window_id FROM svd_vectors WHERE entity_type = 'mp' AND window_id NOT LIKE '%-Q%' AND window_id != 'current_parliament' ORDER BY window_id """ ).fetchall() return [r[0] for r in rows] def validate_axis_stability( all_positions: Dict[str, Dict[str, Tuple[float, float]]], windows: List[str], ) -> Tuple[bool, List[Dict[str, Any]], Dict[str, float]]: """Validate that SVD axes are stable enough for cross-window comparison. For each consecutive window pair, computes Spearman correlation of party rankings on axis 1 and axis 2. If either correlation < threshold, the pair is flagged as unstable. If >2 unstable pairs, the comparison is aborted. Returns (is_stable, stability_details, avg_correlations). """ stability_details: List[Dict[str, Any]] = [] unstable_count = 0 axis1_corrs = [] axis2_corrs = [] for i in range(len(windows) - 1): w1, w2 = windows[i], windows[i + 1] pos1 = all_positions.get(w1, {}) pos2 = all_positions.get(w2, {}) shared = set(pos1.keys()) & set(pos2.keys()) if len(shared) < 3: stability_details.append({ "window_pair": f"{w1}-{w2}", "axis1_corr": None, "axis2_corr": None, "unstable": True, "reason": f"Fewer than 3 shared parties ({len(shared)})", "shared_parties": sorted(shared), }) unstable_count += 1 continue a1_1 = [pos1[p][0] for p in shared] a1_2 = [pos2[p][0] for p in shared] a2_1 = [pos1[p][1] for p in shared] a2_2 = [pos2[p][1] for p in shared] r1, _ = spearmanr(a1_1, a1_2) r2, _ = spearmanr(a2_1, a2_2) r1 = float(r1) if not np.isnan(r1) else 0.0 r2 = float(r2) if not np.isnan(r2) else 0.0 axis1_corrs.append(r1) axis2_corrs.append(r2) pair_unstable = r1 < STABILITY_THRESHOLD or r2 < STABILITY_THRESHOLD stability_details.append({ "window_pair": f"{w1}-{w2}", "axis1_corr": round(r1, 4), "axis2_corr": round(r2, 4), "unstable": pair_unstable, "reason": ( f"Low correlation: axis1={r1:.3f}, axis2={r2:.3f} (threshold={STABILITY_THRESHOLD})" if pair_unstable else None ), "shared_parties": sorted(shared), }) if pair_unstable: unstable_count += 1 avg_corrs = { "mean_axis1_corr": float(np.mean(axis1_corrs)) if axis1_corrs else 0.0, "mean_axis2_corr": float(np.mean(axis2_corrs)) if axis2_corrs else 0.0, } is_stable = unstable_count <= MAX_UNSTABLE_PAIRS return is_stable, stability_details, avg_corrs def compute_centers( all_positions: Dict[str, Dict[str, Tuple[float, float]]], windows: List[str], ) -> List[Dict[str, Any]]: """Compute centrist and right-wing centers of gravity per window. Missing parties in a window are simply skipped (mean over available parties). """ results: List[Dict[str, Any]] = [] for window_id in windows: pos = all_positions.get(window_id, {}) centrist_a1 = [] centrist_a2 = [] right_a1 = [] right_a2 = [] for party, (a1, a2) in pos.items(): if party in CANONICAL_CENTRIST: centrist_a1.append(a1) centrist_a2.append(a2) if party in CANONICAL_RIGHT: right_a1.append(a1) right_a2.append(a2) centrist_mean_a1 = float(np.mean(centrist_a1)) if centrist_a1 else None centrist_mean_a2 = float(np.mean(centrist_a2)) if centrist_a2 else None right_mean_a1 = float(np.mean(right_a1)) if right_a1 else None right_mean_a2 = float(np.mean(right_a2)) if right_a2 else None results.append({ "window_id": window_id, "centrist_mean_axis1": centrist_mean_a1, "centrist_mean_axis2": centrist_mean_a2, "right_mean_axis1": right_mean_a1, "right_mean_axis2": right_mean_a2, "centrist_parties_present": sorted( p for p in pos if p in CANONICAL_CENTRIST ), "right_parties_present": sorted( p for p in pos if p in CANONICAL_RIGHT ), }) return results def create_table( con: duckdb.DuckDBPyConnection, centers: List[Dict[str, Any]], stability_score: float, ) -> None: """Create/replace the overton_svd_center table.""" con.execute("DROP TABLE IF EXISTS overton_svd_center") con.execute(""" CREATE TABLE overton_svd_center ( window_id VARCHAR PRIMARY KEY, centrist_mean_axis1 DOUBLE, centrist_mean_axis2 DOUBLE, right_mean_axis1 DOUBLE, right_mean_axis2 DOUBLE, stability_score DOUBLE ) """) for row in centers: con.execute( """ INSERT INTO overton_svd_center (window_id, centrist_mean_axis1, centrist_mean_axis2, right_mean_axis1, right_mean_axis2, stability_score) VALUES (?, ?, ?, ?, ?, ?) """, ( row["window_id"], row["centrist_mean_axis1"], row["centrist_mean_axis2"], row["right_mean_axis1"], row["right_mean_axis2"], stability_score, ), ) def plot_trajectory( centers: List[Dict[str, Any]], stability_details: List[Dict[str, Any]], avg_corrs: Dict[str, float], output_path: str, ) -> None: """Plot centrist center trajectory with right-wing reference on 2D compass.""" fig, ax = plt.subplots(figsize=(10, 8)) windows = [c["window_id"] for c in centers] cent_a1 = [c["centrist_mean_axis1"] for c in centers] cent_a2 = [c["centrist_mean_axis2"] for c in centers] right_a1 = [c["right_mean_axis1"] for c in centers] right_a2 = [c["right_mean_axis2"] for c in centers] valid_windows = [ windows[i] for i in range(len(windows)) if cent_a1[i] is not None and cent_a2[i] is not None ] if len(valid_windows) < 2: ax.text( 0.5, 0.5, "Insufficient data for trajectory plot", transform=ax.transAxes, ha="center", va="center", ) fig.savefig(output_path, dpi=150, bbox_inches="tight", facecolor="white") plt.close(fig) return cent_a1_valid = [c for c in cent_a1 if c is not None] cent_a2_valid = [c for c in cent_a2 if c is not None] right_a1_valid = [c for c in right_a1 if c is not None] right_a2_valid = [c for c in right_a2 if c is not None] windows_valid = [w for w, a1 in zip(windows, cent_a1) if a1 is not None] years = [int(w) for w in windows_valid] ax.plot(cent_a1_valid, cent_a2_valid, "o-", color="#1E73BE", linewidth=2, markersize=8, label="Centrist center (VVD, D66, CDA, NSC, BBB, CU)", zorder=3) if right_a1_valid and right_a2_valid: ax.plot(right_a1_valid, right_a2_valid, "s--", color="#6A1B9A", linewidth=1.5, markersize=6, label="Right-wing center (PVV, FVD, JA21, SGP)", alpha=0.7, zorder=2) for i, year in enumerate(years): if i < len(cent_a1_valid) and cent_a1_valid[i] is not None: ax.annotate( str(year), (cent_a1_valid[i], cent_a2_valid[i]), textcoords="offset points", xytext=(7, 7), fontsize=8, color="#333333", ) ax.axhline(0, color="#CCCCCC", linewidth=0.5, linestyle="-") ax.axvline(0, color="#CCCCCC", linewidth=0.5, linestyle="-") ax.set_xlabel("SVD Axis 1") ax.set_ylabel("SVD Axis 2") ax.set_title( f"Parliamentary Center Trajectory (2016–2026)\n" f"Stability: axis1 ρ={avg_corrs.get('mean_axis1_corr', 0):.3f}, " f"axis2 ρ={avg_corrs.get('mean_axis2_corr', 0):.3f}", fontsize=11, ) ax.legend(loc="upper left", fontsize=8, framealpha=0.9) ax.set_aspect("equal", adjustable="datalim") ax.grid(True, alpha=0.3) fig.tight_layout() fig.savefig(output_path, dpi=150, bbox_inches="tight", facecolor="white") plt.close(fig) logger.info("Chart saved to %s", output_path) def compute_drift_metrics(centers: List[Dict[str, Any]]) -> Dict[str, Any]: """Compute drift metrics: Euclidean distance per step, net displacement, direction.""" valid = [c for c in centers if c["centrist_mean_axis1"] is not None] if len(valid) < 2: return { "euclidean_steps": [], "net_displacement": None, "angular_direction_deg": None, "rightward_distance_traveled": None, } euclidean_steps = [] for i in range(len(valid) - 1): dx = valid[i + 1]["centrist_mean_axis1"] - valid[i]["centrist_mean_axis1"] dy = valid[i + 1]["centrist_mean_axis2"] - valid[i]["centrist_mean_axis2"] dist = float(np.sqrt(dx**2 + dy**2)) euclidean_steps.append({ "window_pair": f"{valid[i]['window_id']}-{valid[i+1]['window_id']}", "distance": round(dist, 6), "dx": round(dx, 6), "dy": round(dy, 6), }) first = valid[0] last = valid[-1] dx_net = last["centrist_mean_axis1"] - first["centrist_mean_axis1"] dy_net = last["centrist_mean_axis2"] - first["centrist_mean_axis2"] net_disp = float(np.sqrt(dx_net**2 + dy_net**2)) angle_rad = np.arctan2(dy_net, dx_net) angle_deg = float(np.degrees(angle_rad)) return { "euclidean_steps": euclidean_steps, "net_displacement": round(net_disp, 6), "net_dx": round(dx_net, 6), "net_dy": round(dy_net, 6), "angular_direction_deg": round(angle_deg, 2), } def write_report( is_stable: bool, stability_details: List[Dict[str, Any]], avg_corrs: Dict[str, float], centers: List[Dict[str, Any]], drift: Dict[str, Any], output_path: str, chart_path: str, ) -> None: """Write the SVD stability and drift report as Markdown.""" lines: List[str] = [] lines.append("# SVD Center Drift & Axis Stability Report\n") lines.append("## Axis Stability Validation\n") lines.append( f"**Stability threshold:** Spearman ρ ≥ {STABILITY_THRESHOLD} for both axes. " f"Maximum unstable pairs allowed: {MAX_UNSTABLE_PAIRS}.\n" ) unstable_count = sum(1 for d in stability_details if d.get("unstable")) lines.append( f"**Result:** {unstable_count} unstable pair(s) out of " f"{len(stability_details)} consecutive window pairs.\n" ) if not is_stable: lines.append( "**CONCLUSION: SVD axes are too unstable for longitudinal comparison. " "Positions may reflect re-orientation rather than genuine drift. " "The following drift metrics and chart should be interpreted with extreme caution.**\n" ) lines.append(f"- Mean axis-1 correlation: {avg_corrs['mean_axis1_corr']:.4f}") lines.append(f"- Mean axis-2 correlation: {avg_corrs['mean_axis2_corr']:.4f}\n") lines.append("### Per-Pair Stability Details\n") lines.append("| Window Pair | Axis 1 ρ | Axis 2 ρ | Unstable | Shared Parties |") lines.append("|---|---|---|---|---|") for d in stability_details: r1 = f"{d['axis1_corr']:.3f}" if d["axis1_corr"] is not None else "N/A" r2 = f"{d['axis2_corr']:.3f}" if d["axis2_corr"] is not None else "N/A" flag = "**YES**" if d.get("unstable") else "no" parties = ", ".join(d.get("shared_parties", [])) lines.append(f"| {d['window_pair']} | {r1} | {r2} | {flag} | {parties} |") lines.append("") lines.append("## Centrist Center of Gravity\n") lines.append( "| Window | Centrist Ax1 | Centrist Ax2 | Right Ax1 | Right Ax2 | " "Centrist Parties Present | Right Parties Present |" ) lines.append("|---|---|---|---|---|---|---|") for c in centers: cent_a1 = f"{c['centrist_mean_axis1']:.4f}" if c["centrist_mean_axis1"] is not None else "N/A" cent_a2 = f"{c['centrist_mean_axis2']:.4f}" if c["centrist_mean_axis2"] is not None else "N/A" right_a1 = f"{c['right_mean_axis1']:.4f}" if c["right_mean_axis1"] is not None else "N/A" right_a2 = f"{c['right_mean_axis2']:.4f}" if c["right_mean_axis2"] is not None else "N/A" cent_parties = ", ".join(c["centrist_parties_present"]) right_parties = ", ".join(c["right_parties_present"]) lines.append( f"| {c['window_id']} | {cent_a1} | {cent_a2} | {right_a1} | {right_a2} " f"| {cent_parties} | {right_parties} |" ) lines.append("") if is_stable: lines.append("## Drift Metrics\n") lines.append(f"- **Net displacement (first → last):** {drift['net_displacement']}") lines.append(f" - Δ axis-1: {drift['net_dx']}") lines.append(f" - Δ axis-2: {drift['net_dy']}") lines.append(f"- **Net direction:** {drift['angular_direction_deg']}° " f"(arctan2(Δy, Δx))") lines.append(f" - Positive Δx = rightward on axis 1") lines.append(f" - Positive Δy = upward on axis 2\n") lines.append("### Year-over-Year Drift\n") lines.append("| Window Pair | Euclidean Distance | Δ Axis-1 | Δ Axis-2 |") lines.append("|---|---|---|---|") total_dist = 0.0 for step in drift["euclidean_steps"]: lines.append( f"| {step['window_pair']} | {step['distance']:.6f} " f"| {step['dx']:+.6f} | {step['dy']:+.6f} |" ) total_dist += step["distance"] lines.append(f"\n**Total path length:** {total_dist:.6f}\n") else: lines.append("## Drift Metrics (UNRELIABLE — Axes Unstable)\n") lines.append( "Drift metrics were computed but are unreliable due to axis instability. " "Cross-window comparisons on unstable axes conflate positional change " "with axis re-orientation.\n" ) lines.append(f"## Chart\n") lines.append(f"![SVD Drift Chart]({os.path.basename(chart_path)})\n") lines.append("## Interpretability Statement\n") if is_stable: lines.append( "The SVD axes show sufficient stability for cross-window comparison. " "The parliamentary center trajectory reflects genuine shifts in voting " "behavior rather than axis re-orientation artifact. The centrist center-of-gravity " "movement on the 2D compass can be interpreted as a measure of ideological drift.\n" ) else: lines.append( "SVD axes are too unstable for longitudinal comparison. The trajectory " "plotted above may reflect axis re-orientation (each SVD window independently " "determines its principal axes) rather than genuine ideological drift. " "We recommend against drawing conclusions from this analysis.\n" ) lines.append("---\n") lines.append( "*Note: SVD axes reflect voting patterns, not semantic content. " "A shift means voting behavior changed, not that parties changed their rhetoric. " "See: docs/solutions/best-practices/svd-labels-voting-patterns-not-semantics.md*\n" ) os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, "w", encoding="utf-8") as f: f.write("\n".join(lines) + "\n") logger.info("Report saved to %s", output_path) def main() -> None: os.makedirs(str(REPORTS_DIR), exist_ok=True) con = duckdb.connect(database=DB_PATH, read_only=False) try: windows = get_annual_windows(con) logger.info("Found %d annual windows: %s", len(windows), windows) all_positions: Dict[str, Dict[str, Tuple[float, float]]] = {} for w in windows: pos = compute_party_positions(con, w) all_positions[w] = pos n_parties = len(pos) centrist_present = sum(1 for p in pos if p in CANONICAL_CENTRIST) right_present = sum(1 for p in pos if p in CANONICAL_RIGHT) logger.info( "Window %s: %d parties, %d centrist, %d right", w, n_parties, centrist_present, right_present, ) is_stable, stability_details, avg_corrs = validate_axis_stability( all_positions, windows ) unstable_count = sum(1 for d in stability_details if d.get("unstable")) logger.info( "Stability: %s (%d/%d unstable pairs), mean axis1 ρ=%.3f, mean axis2 ρ=%.3f", "STABLE" if is_stable else "UNSTABLE", unstable_count, len(stability_details), avg_corrs["mean_axis1_corr"], avg_corrs["mean_axis2_corr"], ) for d in stability_details: if d.get("unstable"): logger.warning( "Unstable pair %s: axis1=%.3f, axis2=%.3f, reason=%s", d["window_pair"], d["axis1_corr"] or 0, d["axis2_corr"] or 0, d.get("reason", ""), ) centers = compute_centers(all_positions, windows) stability_score = ( avg_corrs["mean_axis1_corr"] + avg_corrs["mean_axis2_corr"] ) / 2.0 for c_row in centers: c_row["stability_score"] = stability_score create_table(con, centers, stability_score) n_rows = con.execute("SELECT COUNT(*) FROM overton_svd_center").fetchone()[0] logger.info("Created overton_svd_center table with %d rows", n_rows) chart_path = str(REPORTS_DIR / "svd_drift_chart.png") plot_trajectory(centers, stability_details, avg_corrs, chart_path) drift = compute_drift_metrics(centers) report_path = str(REPORTS_DIR / "svd_stability_report.md") write_report( is_stable, stability_details, avg_corrs, centers, drift, report_path, chart_path, ) summary = { "stability_status": "STABLE" if is_stable else "UNSTABLE", "unstable_pairs": unstable_count, "total_pairs": len(stability_details), "mean_axis1_corr": round(avg_corrs["mean_axis1_corr"], 4), "mean_axis2_corr": round(avg_corrs["mean_axis2_corr"], 4), "windows": len(windows), "table_rows": n_rows, "net_displacement": drift.get("net_displacement"), "net_dx": drift.get("net_dx"), "net_dy": drift.get("net_dy"), "angular_direction_deg": drift.get("angular_direction_deg"), } logger.info("Summary: %s", json.dumps(summary, indent=2)) return summary finally: con.close() if __name__ == "__main__": result = main() print(json.dumps(result, indent=2))