You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
motief/analysis/right_wing/overton_svd_drift.py

565 lines
19 KiB

#!/usr/bin/env python3
"""Quantify Overton window shift via Procrustes-aligned center drift.
Uses Procrustes-aligned, PCA-rotated 2D party positions from
load_party_scores_all_windows_aligned() to measure rightward drift
of the centrist center of gravity on a common reference frame.
Axes are aligned across all windows — no stability validation needed.
Usage:
uv run python analysis/right_wing/overton_svd_drift.py
"""
from __future__ import annotations
import json
import logging
import os
import sys
from pathlib import Path
from typing import Any, Dict, List
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
matplotlib.use("Agg")
ROOT = Path(__file__).parent.parent.parent.resolve()
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from analysis.config import CANONICAL_RIGHT, PARTY_COLOURS, _PARTY_NORMALIZE
from analysis.explorer_data import (
get_uniform_dim_windows,
load_party_scores_all_windows_aligned,
)
# Configure root logging at import time: INFO level, timestamped records.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Module logger shared by every function in this script.
logger = logging.getLogger("overton_svd_drift")
# Party labels counted toward the centrist center of gravity. Both "CU" and
# "ChristenUnie" are listed so that either spelling is matched directly.
CANONICAL_CENTRIST = frozenset(
{"VVD", "D66", "CDA", "NSC", "BBB", "CU", "ChristenUnie"}
)
# Database location (as a string) handed to the explorer_data loaders in main().
DB_PATH = str(ROOT / "data" / "motions.db")
# Output directory for the drift chart and the Markdown report.
REPORTS_DIR = ROOT / "reports" / "overton_window"
def _normalize_party(raw: str) -> str:
    """Map a raw party label onto its canonical abbreviation.

    Labels without an entry in ``_PARTY_NORMALIZE`` are returned unchanged.
    """
    canonical = _PARTY_NORMALIZE.get(raw, raw)
    return canonical
def _party_in_set(party: str, canonical_set: frozenset) -> bool:
"""Check party membership against a canonical set.
Checks the raw party name and its normalized form so that both
'CU' and 'ChristenUnie' match a set containing either variant.
"""
if party in canonical_set:
return True
normalized = _normalize_party(party)
return normalized != party and normalized in canonical_set
def compute_aligned_centers(
    scores: Dict[str, List[List[float]]],
    windows: List[str],
    annual_indices: List[int],
) -> List[Dict[str, Any]]:
    """Compute centrist and right-wing centers of gravity per window.

    Args:
        scores: Mapping of party label to a per-window list of
            ``[axis1, axis2]`` positions. Entry ``i`` of each list is read
            as that party's position in ``windows[i]``.
        windows: Window identifiers, in the order the score lists are indexed.
        annual_indices: Indices into ``windows`` that count as annual windows.

    Returns:
        One dict per window, in ``windows`` order, with the keys
        ``window_id``, ``centrist_mean_axis1/2``, ``right_mean_axis1/2``
        (``None`` when no party of that group is present),
        ``centrist_parties_present``, ``right_parties_present`` (sorted),
        ``centrist_count``, ``right_count`` and ``is_annual``.

    A party is left out of a window when its score list is too short, when
    its entry for that window is ``None``, or when either coordinate is
    ``None`` or non-finite. The mean is then taken over the remaining parties.
    """
    # Set lookup instead of scanning the index list once per window.
    annual_lookup = set(annual_indices)

    def _mean_or_none(values):
        return float(np.mean(values)) if values else None

    results: List[Dict[str, Any]] = []
    for idx, window_id in enumerate(windows):
        centrist_a1: List[float] = []
        centrist_a2: List[float] = []
        right_a1: List[float] = []
        right_a2: List[float] = []
        centrist_present: List[str] = []
        right_present: List[str] = []

        for party, window_scores in scores.items():
            if idx >= len(window_scores):
                continue
            position = window_scores[idx]
            # NOTE(review): the loader's placeholder convention for absent
            # parties is not visible here; None and NaN are both treated as
            # "missing" so one gap cannot crash the run or turn a window
            # mean into NaN.
            if position is None:
                continue
            a1, a2 = position
            if a1 is None or a2 is None:
                continue
            if not (np.isfinite(a1) and np.isfinite(a2)):
                continue

            # Two independent checks: a party listed in both canonical sets
            # contributes to both centers.
            if _party_in_set(party, CANONICAL_CENTRIST):
                centrist_a1.append(a1)
                centrist_a2.append(a2)
                centrist_present.append(party)
            if _party_in_set(party, CANONICAL_RIGHT):
                right_a1.append(a1)
                right_a2.append(a2)
                right_present.append(party)

        results.append(
            {
                "window_id": window_id,
                "centrist_mean_axis1": _mean_or_none(centrist_a1),
                "centrist_mean_axis2": _mean_or_none(centrist_a2),
                "right_mean_axis1": _mean_or_none(right_a1),
                "right_mean_axis2": _mean_or_none(right_a2),
                "centrist_parties_present": sorted(centrist_present),
                "right_parties_present": sorted(right_present),
                "centrist_count": len(centrist_present),
                "right_count": len(right_present),
                "is_annual": idx in annual_lookup,
            }
        )
    return results
def compute_drift_metrics(
    annual_centers: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Summarise how the centrist center moves across the annual windows.

    Only entries whose ``centrist_mean_axis1`` is not ``None`` take part in
    the centrist metrics. With fewer than two such entries every metric is
    ``None`` and ``euclidean_steps`` is empty.

    Returns a dict with:
        euclidean_steps: displacement between each pair of consecutive
            usable windows (``window_pair``, ``distance``, ``dx``, ``dy``)
        net_displacement / net_dx / net_dy: first-to-last centrist movement
        angular_direction_deg: ``arctan2(net_dy, net_dx)`` in degrees
        approach_to_right: centrist-to-right distance in the first and last
            usable window and whether that gap shrank, grew or stayed put;
            ``None`` when either window has no right-wing center
        right_net: first-to-last movement of the right-wing center, or
            ``None`` when fewer than two windows have one
    """

    def _length(delta_x, delta_y):
        return float(np.sqrt(delta_x**2 + delta_y**2))

    def _gap_to_right(center):
        # Distance between the centrist and right-wing centers of one window.
        return _length(
            center["centrist_mean_axis1"] - center["right_mean_axis1"],
            center["centrist_mean_axis2"] - center["right_mean_axis2"],
        )

    usable = [c for c in annual_centers if c["centrist_mean_axis1"] is not None]
    if len(usable) < 2:
        return {
            "euclidean_steps": [],
            "net_displacement": None,
            "net_dx": None,
            "net_dy": None,
            "angular_direction_deg": None,
            "approach_to_right": None,
            "right_net": None,
        }

    # Window-to-window steps over consecutive usable entries.
    steps = []
    for earlier, later in zip(usable, usable[1:]):
        step_dx = later["centrist_mean_axis1"] - earlier["centrist_mean_axis1"]
        step_dy = later["centrist_mean_axis2"] - earlier["centrist_mean_axis2"]
        steps.append(
            {
                "window_pair": f"{earlier['window_id']}-{later['window_id']}",
                "distance": round(_length(step_dx, step_dy), 6),
                "dx": round(step_dx, 6),
                "dy": round(step_dy, 6),
            }
        )

    # Net centrist movement between the first and last usable window.
    start, end = usable[0], usable[-1]
    total_dx = end["centrist_mean_axis1"] - start["centrist_mean_axis1"]
    total_dy = end["centrist_mean_axis2"] - start["centrist_mean_axis2"]
    heading_deg = float(np.degrees(np.arctan2(total_dy, total_dx)))

    # Right-wing reference: taken over its own first/last available windows,
    # which need not be the same windows as the centrist endpoints.
    with_right = [c for c in annual_centers if c["right_mean_axis1"] is not None]
    right_summary = None
    if len(with_right) >= 2:
        right_dx = with_right[-1]["right_mean_axis1"] - with_right[0]["right_mean_axis1"]
        right_dy = with_right[-1]["right_mean_axis2"] - with_right[0]["right_mean_axis2"]
        right_summary = {
            "net_displacement": round(_length(right_dx, right_dy), 6),
            "net_dx": round(right_dx, 6),
            "net_dy": round(right_dy, 6),
        }

    # Did the centrist-to-right gap close or widen between the endpoints?
    approach = None
    endpoints_have_right = (
        start.get("right_mean_axis1") is not None
        and end.get("right_mean_axis1") is not None
    )
    if endpoints_have_right:
        gap_start = _gap_to_right(start)
        gap_end = _gap_to_right(end)
        gap_change = gap_end - gap_start
        if abs(gap_change) < 1e-9:
            trend = "unchanged"
        else:
            trend = "toward right" if gap_change < 0 else "away from right"
        approach = {
            "first_distance": round(gap_start, 6),
            "last_distance": round(gap_end, 6),
            "delta_distance": round(gap_change, 6),
            "direction": trend,
        }

    return {
        "euclidean_steps": steps,
        "net_displacement": round(_length(total_dx, total_dy), 6),
        "net_dx": round(total_dx, 6),
        "net_dy": round(total_dy, 6),
        "angular_direction_deg": round(heading_deg, 2),
        "approach_to_right": approach,
        "right_net": right_summary,
    }
def plot_trajectory(
    annual_centers: List[Dict[str, Any]],
    output_path: str,
) -> None:
    """Plot centrist center trajectory with right-wing reference on 2D compass.

    Consecutive annual centrist centers are joined by arrows and labelled
    with their window id. The right-wing centers are drawn as a dashed
    reference line. When fewer than two centrist centers are available, a
    placeholder image with an explanatory message is written instead.

    Args:
        annual_centers: Per-window center dicts with the keys ``window_id``,
            ``centrist_mean_axis1/2`` and ``right_mean_axis1/2``.
        output_path: Destination image file. Its parent directory is created
            if it does not exist yet.
    """
    # Filter both coordinates (and the label) together so a point can never
    # be paired with the wrong window's coordinate or label.
    centrist_points = [
        (c["window_id"], c["centrist_mean_axis1"], c["centrist_mean_axis2"])
        for c in annual_centers
        if c["centrist_mean_axis1"] is not None
        and c["centrist_mean_axis2"] is not None
    ]
    right_points = [
        (c["right_mean_axis1"], c["right_mean_axis2"])
        for c in annual_centers
        if c["right_mean_axis1"] is not None and c["right_mean_axis2"] is not None
    ]

    parent_dir = os.path.dirname(output_path)
    if parent_dir:  # empty for a bare filename; os.makedirs("") would raise
        os.makedirs(parent_dir, exist_ok=True)

    fig, ax = plt.subplots(figsize=(10, 8))
    # try/finally: pyplot keeps every figure registered until it is closed,
    # so close it even when drawing or saving fails.
    try:
        if len(centrist_points) < 2:
            ax.text(
                0.5,
                0.5,
                "Insufficient data for trajectory plot",
                transform=ax.transAxes,
                ha="center",
                va="center",
            )
            fig.savefig(output_path, dpi=150, bbox_inches="tight", facecolor="white")
            return

        cent_x = [x for _, x, _ in centrist_points]
        cent_y = [y for _, _, y in centrist_points]

        # Arrows between consecutive years
        for (_, x0, y0), (_, x1, y1) in zip(centrist_points, centrist_points[1:]):
            ax.annotate(
                "",
                xy=(x1, y1),
                xytext=(x0, y0),
                arrowprops=dict(arrowstyle="->", color="#1E73BE", lw=1.5, alpha=0.6),
            )
        ax.plot(
            cent_x,
            cent_y,
            "o-",
            color="#1E73BE",
            linewidth=2,
            markersize=8,
            label="Centrist center (VVD, D66, CDA, NSC, BBB, CU)",
            zorder=3,
        )

        # Right-wing trajectory (dashed reference)
        if right_points:
            ax.plot(
                [x for x, _ in right_points],
                [y for _, y in right_points],
                "s--",
                color="#6A1B9A",
                linewidth=1.5,
                markersize=6,
                label="Right-wing center (PVV, FVD, JA21, SGP)",
                alpha=0.7,
                zorder=2,
            )

        # Year labels
        for label, x, y in centrist_points:
            ax.annotate(
                str(label),
                (x, y),
                textcoords="offset points",
                xytext=(7, 7),
                fontsize=8,
                color="#333333",
            )

        ax.axhline(0, color="#CCCCCC", linewidth=0.5, linestyle="-")
        ax.axvline(0, color="#CCCCCC", linewidth=0.5, linestyle="-")
        ax.set_xlabel("PCA Axis 1 (Procrustes-aligned)")
        ax.set_ylabel("PCA Axis 2 (Procrustes-aligned)")
        ax.set_title(
            "Parliamentary Center Trajectory (Procrustes-Aligned PCA)",
            fontsize=11,
        )
        ax.legend(loc="upper left", fontsize=8, framealpha=0.9)
        # Equal aspect so distances on the chart are comparable on both axes.
        ax.set_aspect("equal", adjustable="datalim")
        ax.grid(True, alpha=0.3)
        fig.tight_layout()
        fig.savefig(output_path, dpi=150, bbox_inches="tight", facecolor="white")
    finally:
        plt.close(fig)
    logger.info("Chart saved to %s", output_path)
def write_report(
    centers: List[Dict[str, Any]],
    annual_centers: List[Dict[str, Any]],
    drift: Dict[str, Any],
    output_path: str,
    chart_path: str,
    non_annual: List[str],
) -> None:
    """Write the center drift report as Markdown.

    Args:
        centers: Per-window center dicts (all windows); one table row each.
        annual_centers: Annual subset of ``centers``. Not read by this
            function; kept so the call signature stays stable.
        drift: Metrics dict as returned by ``compute_drift_metrics``. When
            its ``net_displacement`` is ``None`` the drift section is
            replaced by an "insufficient data" line.
        output_path: Destination Markdown file. Its parent directory is
            created when missing; a bare filename writes to the current
            working directory.
        chart_path: Path of the chart image. Only its basename is embedded,
            so the chart is expected to sit next to the report.
        non_annual: Window ids excluded from the drift analysis, listed in a
            note when non-empty.
    """

    def _coord(value: Any) -> str:
        # Table cell for one mean coordinate; None marks an absent group.
        return "N/A" if value is None else f"{value:.4f}"

    lines: List[str] = []
    lines.append("# Center Drift Report (Procrustes-Aligned)\n")
    lines.append("## Alignment Method\n")
    lines.append(
        "Party positions are Procrustes-aligned across all windows, then "
        "PCA-rotated to a common 2D reference frame. This ensures that axis "
        "orientation is consistent across time — no stability validation is "
        "needed because all positions live in the same coordinate system.\n"
    )
    lines.append(
        "This is the same alignment used by the Explorer UI compass and "
        "trajectories: 1) zero-padding vectors to max dimension across all "
        "windows, 2) chained Procrustes orthogonal rotation (each window to "
        "the previous aligned one), 3) global PCA on the stacked aligned "
        "matrix, 4) flip-correction per component using canonical left/right "
        "parties.\n"
    )
    if non_annual:
        lines.append(
            f"**Note:** Non-annual windows excluded from drift analysis: "
            f"{', '.join(sorted(non_annual))}\n"
        )

    # Per-window table of both centers and the parties behind them.
    lines.append("## Centrist Center of Gravity\n")
    lines.append(
        "| Window | Centrist Ax1 | Centrist Ax2 | Right Ax1 | Right Ax2 | "
        "Centrist Parties | Right Parties |"
    )
    lines.append("|---|---|---|---|---|---|---|")
    for c in centers:
        cent_a1 = _coord(c["centrist_mean_axis1"])
        cent_a2 = _coord(c["centrist_mean_axis2"])
        right_a1 = _coord(c["right_mean_axis1"])
        right_a2 = _coord(c["right_mean_axis2"])
        cent_parties = ", ".join(c["centrist_parties_present"])
        right_parties = ", ".join(c["right_parties_present"])
        lines.append(
            f"| {c['window_id']} | {cent_a1} | {cent_a2} | "
            f"{right_a1} | {right_a2} | {cent_parties} | {right_parties} |"
        )
    lines.append("")

    # Drift metrics
    lines.append("## Drift Metrics (Annual Windows Only)\n")
    if drift.get("net_displacement") is not None:
        lines.append(
            f"- **Net centrist displacement (first → last):** "
            f"{drift['net_displacement']}"
        )
        lines.append(f" - Δ axis-1: {drift['net_dx']}")
        lines.append(f" - Δ axis-2: {drift['net_dy']}")
        lines.append(
            f"- **Net direction:** {drift['angular_direction_deg']}° "
            f"(arctan2(Δy, Δx))"
        )
        lines.append(" - Positive Δx = rightward on axis 1")
        lines.append(" - Positive Δy = upward on axis 2\n")
        if drift.get("right_net"):
            rn = drift["right_net"]
            lines.append("- **Right-wing net displacement (reference):**")
            lines.append(f" - Net displacement: {rn['net_displacement']}")
            lines.append(f" - Δ axis-1: {rn['net_dx']}")
            lines.append(f" - Δ axis-2: {rn['net_dy']}\n")
        if drift.get("approach_to_right"):
            ar = drift["approach_to_right"]
            lines.append("- **Centrist–right distance:**")
            lines.append(f" - First window: {ar['first_distance']}")
            lines.append(f" - Last window: {ar['last_distance']}")
            lines.append(
                f" - Δ distance: {ar['delta_distance']} "
                f"(centrist center moving **{ar['direction']}**)\n"
            )
        lines.append("### Year-over-Year Drift\n")
        lines.append("| Window Pair | Distance | Δ Axis-1 | Δ Axis-2 |")
        lines.append("|---|---|---|---|")
        # Path length is the sum of the (already rounded) per-step distances.
        total_dist = 0.0
        for step in drift["euclidean_steps"]:
            lines.append(
                f"| {step['window_pair']} | {step['distance']:.6f} "
                f"| {step['dx']:+.6f} | {step['dy']:+.6f} |"
            )
            total_dist += step["distance"]
        lines.append(f"\n**Total path length:** {total_dist:.6f}\n")
    else:
        lines.append("Insufficient annual windows for drift computation.\n")

    lines.append("## Chart\n")
    lines.append(f"![Drift Chart]({os.path.basename(chart_path)})\n")
    lines.append("## Interpretability Statement\n")
    lines.append(
        "Party positions use Procrustes-aligned PCA axes that provide a "
        "common reference frame across all windows. Unlike raw per-window "
        "SVD axes — which may re-orient between windows and cause 9/10 "
        "consecutive window pairs to fail axis stability (Spearman ρ < 0.7) "
        "— this alignment ensures that positional changes reflect genuine "
        "shifts in voting behavior rather than axis re-orientation artifacts. "
        "The centrist center-of-gravity movement on the 2D compass can be "
        "interpreted as a measure of ideological drift.\n"
    )
    lines.append("---\n")
    lines.append(
        "*Note: PCA axes reflect voting patterns, not semantic content. "
        "A shift means voting behavior changed, not that parties changed "
        "their rhetoric. See: docs/solutions/best-practices/"
        "svd-labels-voting-patterns-not-semantics.md*\n"
    )

    # os.path.dirname() is "" for a bare filename and os.makedirs("") raises
    # FileNotFoundError, so only create a directory when there is one to create.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")
    logger.info("Report saved to %s", output_path)
def main() -> Dict[str, Any]:
    """Run the drift analysis end to end and return a summary dict.

    Loads the window list and the aligned party scores, splits windows into
    annual (all-digit ids) and non-annual, computes per-window centers,
    derives drift metrics from the annual ones, then writes the chart and
    the Markdown report into ``REPORTS_DIR``. If no windows or no scores can
    be loaded, an ``{"error": ..., "windows_analyzed": 0}`` dict is returned
    and nothing is written.
    """
    os.makedirs(str(REPORTS_DIR), exist_ok=True)
    logger.info("Loading aligned party positions...")

    windows = get_uniform_dim_windows(DB_PATH)
    if not windows:
        logger.error("No uniform-dim windows found in database")
        return {"error": "No windows found", "windows_analyzed": 0}

    scores = load_party_scores_all_windows_aligned(DB_PATH)
    if not scores:
        logger.error("No aligned party scores loaded")
        return {"error": "No scores loaded", "windows_analyzed": 0}

    logger.info("Found %d total windows: %s", len(windows), windows)
    logger.info(
        "Loaded scores for %d parties: %s",
        len(scores),
        sorted(scores.keys()),
    )

    # A window id made up solely of digits is a calendar year; anything else
    # is kept out of the drift computation.
    annual_indices: List[int] = [
        position for position, name in enumerate(windows) if name.strip().isdigit()
    ]
    non_annual: List[str] = [
        name for name in windows if not name.strip().isdigit()
    ]
    annual_window_ids = [windows[position] for position in annual_indices]
    logger.info("Annual windows (%d): %s", len(annual_window_ids), annual_window_ids)
    if non_annual:
        logger.info(
            "Non-annual windows (excluded from drift): %s", sorted(non_annual)
        )

    # Centers are computed for every window; the report tabulates them all.
    centers = compute_aligned_centers(scores, windows, annual_indices)
    for entry in centers:
        logger.info(
            "Window %s: %d centrist, %d right (annual=%s)",
            entry["window_id"],
            entry["centrist_count"],
            entry["right_count"],
            entry["is_annual"],
        )

    # Drift metrics and the chart only look at the annual windows.
    annual_centers = [entry for entry in centers if entry["is_annual"]]
    drift = compute_drift_metrics(annual_centers)

    chart_path = str(REPORTS_DIR / "svd_drift_chart.png")
    plot_trajectory(annual_centers, chart_path)

    report_path = str(REPORTS_DIR / "svd_stability_report.md")
    write_report(centers, annual_centers, drift, report_path, chart_path, non_annual)

    summary: Dict[str, Any] = {
        "method": "Procrustes-aligned PCA",
        "total_windows": len(windows),
        "annual_windows_analyzed": len(annual_centers),
        "non_annual_skipped": sorted(non_annual),
        "parties_loaded": len(scores),
        "windows": windows,
    }
    # Headline drift figures are copied straight from the metrics dict.
    for key in (
        "net_displacement",
        "net_dx",
        "net_dy",
        "angular_direction_deg",
        "approach_to_right",
    ):
        summary[key] = drift.get(key)
    logger.info("Summary: %s", json.dumps(summary, indent=2))
    return summary
if __name__ == "__main__":
    # Script entry point: run the analysis and echo its summary as JSON.
    print(json.dumps(main(), indent=2))