You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
549 lines
19 KiB
549 lines
19 KiB
#!/usr/bin/env python3
|
|
"""Quantify Overton window shift via Procrustes-aligned center drift.
|
|
|
|
Uses Procrustes-aligned, PCA-rotated 2D party positions from
|
|
load_party_scores_all_windows_aligned() to measure rightward drift
|
|
of the centrist center of gravity on a common reference frame.
|
|
Axes are aligned across all windows — no stability validation needed.
|
|
|
|
Usage:
|
|
uv run python analysis/right_wing/overton_svd_drift.py
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List
|
|
|
|
import matplotlib
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
|
|
matplotlib.use("Agg")
|
|
|
|
ROOT = Path(__file__).parent.parent.parent.resolve()
|
|
if str(ROOT) not in sys.path:
|
|
sys.path.insert(0, str(ROOT))
|
|
|
|
from analysis.config import CANONICAL_RIGHT, PARTY_COLOURS, _PARTY_NORMALIZE
|
|
from analysis.explorer_data import (
|
|
get_uniform_dim_windows,
|
|
load_party_scores_all_windows_aligned,
|
|
)
|
|
|
|
# Script-wide logging: timestamped records at INFO level and above.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("overton_svd_drift")

# Party labels counted as the centrist bloc. Both "CU" and "ChristenUnie"
# are listed so either spelling is matched directly.
CANONICAL_CENTRIST = frozenset(
    ("VVD", "D66", "CDA", "NSC", "BBB", "CU", "ChristenUnie")
)

# Input database and report output directory, both located under ROOT.
REPORTS_DIR = ROOT / "reports" / "overton_window"
DB_PATH = str(ROOT.joinpath("data", "motions.db"))
|
|
|
|
|
|
def _normalize_party(raw: str) -> str:
    """Look up *raw* in ``_PARTY_NORMALIZE`` and return the mapped name.

    A label with no entry in the table is returned unchanged.
    """
    canonical = _PARTY_NORMALIZE.get(raw, raw)
    return canonical
|
|
|
|
|
|
def _party_in_set(party: str, canonical_set: frozenset) -> bool:
    """Tell whether *party* belongs to *canonical_set* under either spelling.

    The label is tried as given first. Only when that misses is it
    normalized via ``_normalize_party`` and tried again, so a spelling
    variant can still match a set that holds its normalized form.
    """
    if party not in canonical_set:
        alias = _normalize_party(party)
        # An alias identical to the input has already been ruled out above.
        return alias != party and alias in canonical_set
    return True
|
|
|
|
|
|
def _fmt_axis(val: float | None) -> str:
    """Render an axis coordinate as text.

    Returns the value with four decimal places, or the literal ``"N/A"``
    when *val* is ``None``.
    """
    if val is None:
        return "N/A"
    return format(val, ".4f")
|
|
|
|
|
|
def _mean_or_none(values: List[float]) -> float | None:
    """Return the arithmetic mean of *values* as a float, or None if empty."""
    return float(np.mean(values)) if values else None


def compute_aligned_centers(
    scores: Dict[str, List[List[float]]],
    windows: List[str],
    annual_indices: List[int],
) -> List[Dict[str, Any]]:
    """Compute centrist and right-wing centers of gravity per window.

    Args:
        scores: Maps a party label to its per-window ``[axis1, axis2]``
            positions; entry ``i`` is read for ``windows[i]``.
        windows: Window identifiers, in the order used to index *scores*.
        annual_indices: Positions in *windows* that count as annual windows.

    Returns:
        One dict per window with the mean axis-1/axis-2 position of the
        centrist and right-wing parties (``None`` when no such party has a
        position in that window), the sorted labels and counts of the
        parties that contributed, and an ``is_annual`` flag.

    A party is skipped for a window when its score list is too short to
    reach that window, when its entry is ``None``, or when either
    coordinate is ``None``; each mean is taken over the remaining parties.
    """
    # Build once so the per-window membership test is O(1).
    annual_lookup = frozenset(annual_indices)
    results: List[Dict[str, Any]] = []

    for idx, window_id in enumerate(windows):
        centrist_a1: List[float] = []
        centrist_a2: List[float] = []
        right_a1: List[float] = []
        right_a2: List[float] = []
        centrist_present: List[str] = []
        right_present: List[str] = []

        for party, window_scores in scores.items():
            if idx >= len(window_scores):
                continue
            position = window_scores[idx]
            if position is None:
                # Placeholder for a party that is absent from this window.
                continue
            a1, a2 = position
            if a1 is None or a2 is None:
                continue

            # The two checks are independent: a label found in both sets
            # contributes to both centers.
            if _party_in_set(party, CANONICAL_CENTRIST):
                centrist_a1.append(a1)
                centrist_a2.append(a2)
                centrist_present.append(party)
            if _party_in_set(party, CANONICAL_RIGHT):
                right_a1.append(a1)
                right_a2.append(a2)
                right_present.append(party)

        results.append(
            {
                "window_id": window_id,
                "centrist_mean_axis1": _mean_or_none(centrist_a1),
                "centrist_mean_axis2": _mean_or_none(centrist_a2),
                "right_mean_axis1": _mean_or_none(right_a1),
                "right_mean_axis2": _mean_or_none(right_a2),
                "centrist_parties_present": sorted(centrist_present),
                "right_parties_present": sorted(right_present),
                "centrist_count": len(centrist_present),
                "right_count": len(right_present),
                "is_annual": idx in annual_lookup,
            }
        )

    return results
|
|
|
|
|
|
def compute_drift_metrics(
    annual_centers: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Summarise how the centrist center moved across the given windows.

    Windows whose ``centrist_mean_axis1`` is ``None`` are ignored. With
    fewer than two usable windows, ``euclidean_steps`` is empty and every
    other metric is ``None``.

    Returns a dict with:
        euclidean_steps: one entry per consecutive pair of usable windows
            (``window_pair``, ``distance``, ``dx``, ``dy``)
        net_displacement, net_dx, net_dy: first-to-last movement
        angular_direction_deg: arctan2(net_dy, net_dx) in degrees
        approach_to_right: centrist-to-right distance in the first and
            last usable window, the change between them and a direction
            label; ``None`` unless both windows have a right-wing center
        right_net: first-to-last movement of the right-wing center, or
            ``None`` when fewer than two windows have one

    Distances and deltas are rounded to 6 decimals, the angle to 2.
    """

    def _length(dx: float, dy: float) -> float:
        # Euclidean norm of a 2D displacement.
        return float(np.sqrt(dx**2 + dy**2))

    def _gap_to_right(center: Dict[str, Any]) -> float:
        # Distance between the centrist and right-wing centers in one window.
        return _length(
            center["centrist_mean_axis1"] - center["right_mean_axis1"],
            center["centrist_mean_axis2"] - center["right_mean_axis2"],
        )

    usable = [c for c in annual_centers if c["centrist_mean_axis1"] is not None]
    if len(usable) < 2:
        return {
            "euclidean_steps": [],
            **dict.fromkeys(
                (
                    "net_displacement",
                    "net_dx",
                    "net_dy",
                    "angular_direction_deg",
                    "approach_to_right",
                    "right_net",
                )
            ),
        }

    # Window-to-window movement of the centrist center.
    steps = []
    for earlier, later in zip(usable, usable[1:]):
        step_dx = later["centrist_mean_axis1"] - earlier["centrist_mean_axis1"]
        step_dy = later["centrist_mean_axis2"] - earlier["centrist_mean_axis2"]
        step_len = _length(step_dx, step_dy)
        steps.append(
            {
                "window_pair": f"{earlier['window_id']}-{later['window_id']}",
                "distance": round(step_len, 6),
                "dx": round(step_dx, 6),
                "dy": round(step_dy, 6),
            }
        )

    # Overall movement between the first and last usable window.
    start, end = usable[0], usable[-1]
    total_dx = end["centrist_mean_axis1"] - start["centrist_mean_axis1"]
    total_dy = end["centrist_mean_axis2"] - start["centrist_mean_axis2"]
    total_len = _length(total_dx, total_dy)
    heading_deg = float(np.degrees(np.arctan2(total_dy, total_dx)))

    # Same first-to-last measure for the right-wing center, as a reference.
    right_usable = [c for c in annual_centers if c["right_mean_axis1"] is not None]
    right_summary = None
    if len(right_usable) >= 2:
        right_start, right_end = right_usable[0], right_usable[-1]
        right_dx = right_end["right_mean_axis1"] - right_start["right_mean_axis1"]
        right_dy = right_end["right_mean_axis2"] - right_start["right_mean_axis2"]
        right_summary = {
            "net_displacement": round(_length(right_dx, right_dy), 6),
            "net_dx": round(right_dx, 6),
            "net_dy": round(right_dy, 6),
        }

    # Did the gap between the two centers shrink or grow?
    approach = None
    has_right_at_both_ends = (
        start.get("right_mean_axis1") is not None
        and end.get("right_mean_axis1") is not None
    )
    if has_right_at_both_ends:
        gap_start = _gap_to_right(start)
        gap_end = _gap_to_right(end)
        change = gap_end - gap_start
        if abs(change) < 1e-9:
            trend = "unchanged"
        else:
            trend = "toward right" if change < 0 else "away from right"
        approach = {
            "first_distance": round(gap_start, 6),
            "last_distance": round(gap_end, 6),
            "delta_distance": round(change, 6),
            "direction": trend,
        }

    return {
        "euclidean_steps": steps,
        "net_displacement": round(total_len, 6),
        "net_dx": round(total_dx, 6),
        "net_dy": round(total_dy, 6),
        "angular_direction_deg": round(heading_deg, 2),
        "approach_to_right": approach,
        "right_net": right_summary,
    }
|
|
|
|
|
|
def plot_trajectory(
    annual_centers: List[Dict[str, Any]],
    output_path: str,
) -> None:
    """Plot the centrist center trajectory with a right-wing reference line.

    Args:
        annual_centers: Per-window center dicts; the ``window_id``,
            ``centrist_mean_axis*`` and ``right_mean_axis*`` keys are read.
        output_path: Destination passed to ``Figure.savefig``.

    Consecutive centrist points are joined by arrows and labelled with
    their window id; the right-wing centers are drawn as a dashed line.
    With fewer than two usable centrist points, a placeholder figure with
    an "insufficient data" message is saved instead. The legend texts are
    fixed strings and do not reflect which parties were actually present.
    """
    # Filter each window as a unit so labels, x and y can never get out of
    # step with one another if only one coordinate is missing.
    centrist_track = [
        (c["window_id"], c["centrist_mean_axis1"], c["centrist_mean_axis2"])
        for c in annual_centers
        if c["centrist_mean_axis1"] is not None
        and c["centrist_mean_axis2"] is not None
    ]
    right_track = [
        (c["right_mean_axis1"], c["right_mean_axis2"])
        for c in annual_centers
        if c["right_mean_axis1"] is not None
        and c["right_mean_axis2"] is not None
    ]

    fig, ax = plt.subplots(figsize=(10, 8))
    try:
        if len(centrist_track) < 2:
            ax.text(
                0.5,
                0.5,
                "Insufficient data for trajectory plot",
                transform=ax.transAxes,
                ha="center",
                va="center",
            )
            fig.savefig(output_path, dpi=150, bbox_inches="tight", facecolor="white")
            return

        cent_x = [x for _, x, _ in centrist_track]
        cent_y = [y for _, _, y in centrist_track]

        # Arrows show the direction of travel between consecutive windows.
        for start, end in zip(centrist_track, centrist_track[1:]):
            ax.annotate(
                "",
                xy=(end[1], end[2]),
                xytext=(start[1], start[2]),
                arrowprops=dict(arrowstyle="->", color="#1E73BE", lw=1.5, alpha=0.6),
            )

        ax.plot(
            cent_x,
            cent_y,
            "o-",
            color="#1E73BE",
            linewidth=2,
            markersize=8,
            label="Centrist center (VVD, D66, CDA, NSC, BBB, CU)",
            zorder=3,
        )

        # Right-wing trajectory (dashed reference)
        if right_track:
            ax.plot(
                [x for x, _ in right_track],
                [y for _, y in right_track],
                "s--",
                color="#6A1B9A",
                linewidth=1.5,
                markersize=6,
                label="Right-wing center (PVV, FVD, JA21, SGP)",
                alpha=0.7,
                zorder=2,
            )

        # Window labels next to each centrist point
        for label, x, y in centrist_track:
            ax.annotate(
                str(label),
                (x, y),
                textcoords="offset points",
                xytext=(7, 7),
                fontsize=8,
                color="#333333",
            )

        ax.axhline(0, color="#CCCCCC", linewidth=0.5, linestyle="-")
        ax.axvline(0, color="#CCCCCC", linewidth=0.5, linestyle="-")

        ax.set_xlabel("PCA Axis 1 (Procrustes-aligned)")
        ax.set_ylabel("PCA Axis 2 (Procrustes-aligned)")
        ax.set_title(
            "Parliamentary Center Trajectory (Procrustes-Aligned PCA)",
            fontsize=11,
        )
        ax.legend(loc="upper left", fontsize=8, framealpha=0.9)
        ax.set_aspect("equal", adjustable="datalim")
        ax.grid(True, alpha=0.3)

        fig.tight_layout()
        fig.savefig(output_path, dpi=150, bbox_inches="tight", facecolor="white")
    finally:
        # Release the figure even when drawing or saving raises.
        plt.close(fig)

    logger.info("Chart saved to %s", output_path)
|
|
|
|
|
|
def _centers_table_lines(centers: List[Dict[str, Any]]) -> List[str]:
    """Build the Markdown table listing both centers for every window."""
    rows = [
        "| Window | Centrist Ax1 | Centrist Ax2 | Right Ax1 | Right Ax2 | "
        "Centrist Parties | Right Parties |",
        "|---|---|---|---|---|---|---|",
    ]
    for c in centers:
        cent_a1 = _fmt_axis(c["centrist_mean_axis1"])
        cent_a2 = _fmt_axis(c["centrist_mean_axis2"])
        right_a1 = _fmt_axis(c["right_mean_axis1"])
        right_a2 = _fmt_axis(c["right_mean_axis2"])
        cent_parties = ", ".join(c["centrist_parties_present"])
        right_parties = ", ".join(c["right_parties_present"])
        rows.append(
            f"| {c['window_id']} | {cent_a1} | {cent_a2} | "
            f"{right_a1} | {right_a2} | {cent_parties} | {right_parties} |"
        )
    return rows


def _drift_metric_lines(drift: Dict[str, Any]) -> List[str]:
    """Build the Markdown body of the drift-metrics section."""
    if drift.get("net_displacement") is None:
        return ["Insufficient annual windows for drift computation.\n"]

    # Sub-bullets are indented two spaces: Markdown needs that much to nest
    # them under the preceding "- " item rather than list them as siblings.
    out = [
        f"- **Net centrist displacement (first → last):** "
        f"{drift['net_displacement']}",
        f"  - Δ axis-1: {drift['net_dx']}",
        f"  - Δ axis-2: {drift['net_dy']}",
        f"- **Net direction:** {drift['angular_direction_deg']}° "
        f"(arctan2(Δy, Δx))",
        "  - Positive Δx = rightward on axis 1",
        "  - Positive Δy = upward on axis 2\n",
    ]

    if drift.get("right_net"):
        rn = drift["right_net"]
        out.append("- **Right-wing net displacement (reference):**")
        out.append(f"  - Net displacement: {rn['net_displacement']}")
        out.append(f"  - Δ axis-1: {rn['net_dx']}")
        out.append(f"  - Δ axis-2: {rn['net_dy']}\n")

    if drift.get("approach_to_right"):
        ar = drift["approach_to_right"]
        out.append("- **Centrist–right distance:**")
        out.append(f"  - First window: {ar['first_distance']}")
        out.append(f"  - Last window: {ar['last_distance']}")
        out.append(
            f"  - Δ distance: {ar['delta_distance']} "
            f"(centrist center moving **{ar['direction']}**)\n"
        )

    out.append("### Year-over-Year Drift\n")
    out.append("| Window Pair | Distance | Δ Axis-1 | Δ Axis-2 |")
    out.append("|---|---|---|---|")
    total_dist = 0.0
    for step in drift["euclidean_steps"]:
        out.append(
            f"| {step['window_pair']} | {step['distance']:.6f} "
            f"| {step['dx']:+.6f} | {step['dy']:+.6f} |"
        )
        total_dist += step["distance"]
    out.append(f"\n**Total path length:** {total_dist:.6f}\n")
    return out


def write_report(
    centers: List[Dict[str, Any]],
    annual_centers: List[Dict[str, Any]],
    drift: Dict[str, Any],
    output_path: str,
    chart_path: str,
    non_annual: List[str],
) -> None:
    """Write the center drift report as Markdown.

    Args:
        centers: Per-window center dicts, one table row each.
        annual_centers: Accepted for interface compatibility; not read.
        drift: Metrics dict with the keys produced by
            ``compute_drift_metrics``.
        output_path: Report file to write (UTF-8). Its parent directory is
            created when missing.
        chart_path: Chart image to embed; linked relative to the report's
            directory when a relative path can be formed.
        non_annual: Window ids excluded from the drift analysis; listed in
            a note when non-empty.
    """
    report_dir = os.path.dirname(output_path)

    # Link the chart relative to the report so the pair can be moved
    # together; fall back to the given path where no relative path exists
    # (e.g. a different drive on Windows).
    try:
        chart_ref = os.path.relpath(chart_path, report_dir or ".")
    except ValueError:
        chart_ref = chart_path
    chart_ref = chart_ref.replace(os.sep, "/")

    lines: List[str] = []

    lines.append("# Center Drift Report (Procrustes-Aligned)\n")

    lines.append("## Alignment Method\n")
    lines.append(
        "Party positions are Procrustes-aligned across all windows, then "
        "PCA-rotated to a common 2D reference frame. This ensures that axis "
        "orientation is consistent across time — no stability validation is "
        "needed because all positions live in the same coordinate system.\n"
    )
    lines.append(
        "This is the same alignment used by the Explorer UI compass and "
        "trajectories: 1) zero-padding vectors to max dimension across all "
        "windows, 2) chained Procrustes orthogonal rotation (each window to "
        "the previous aligned one), 3) global PCA on the stacked aligned "
        "matrix, 4) flip-correction per component using canonical left/right "
        "parties.\n"
    )

    if non_annual:
        lines.append(
            f"**Note:** Non-annual windows excluded from drift analysis: "
            f"{', '.join(sorted(non_annual))}\n"
        )

    lines.append("## Centrist Center of Gravity\n")
    lines.extend(_centers_table_lines(centers))
    lines.append("")

    lines.append("## Drift Metrics (Annual Windows Only)\n")
    lines.extend(_drift_metric_lines(drift))

    lines.append("## Chart\n")
    lines.append(f"![Centrist center trajectory]({chart_ref})\n")

    lines.append("## Interpretability Statement\n")
    lines.append(
        "Party positions use Procrustes-aligned PCA axes that provide a "
        "common reference frame across all windows. Unlike raw per-window "
        "SVD axes — which may re-orient between windows and cause 9/10 "
        "consecutive window pairs to fail axis stability (Spearman ρ < 0.7) "
        "— this alignment ensures that positional changes reflect genuine "
        "shifts in voting behavior rather than axis re-orientation artifacts. "
        "The centrist center-of-gravity movement on the 2D compass can be "
        "interpreted as a measure of ideological drift.\n"
    )

    lines.append("---\n")
    lines.append(
        "*Note: PCA axes reflect voting patterns, not semantic content. "
        "A shift means voting behavior changed, not that parties changed "
        "their rhetoric. See: docs/solutions/best-practices/"
        "svd-labels-voting-patterns-not-semantics.md*\n"
    )

    # A bare filename has no directory part, and makedirs("") would raise.
    if report_dir:
        os.makedirs(report_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")
    logger.info("Report saved to %s", output_path)
|
|
|
|
|
|
def main() -> Dict[str, Any]:
    """Run the drift analysis end to end and return a summary dict.

    Loads the windows and aligned party scores from ``DB_PATH``, computes
    the per-window centers, derives drift metrics from the annual windows,
    and writes the chart and Markdown report into ``REPORTS_DIR``. When no
    windows or no scores are found, an error dict with
    ``windows_analyzed`` set to 0 is returned instead.
    """
    os.makedirs(str(REPORTS_DIR), exist_ok=True)

    logger.info("Loading aligned party positions...")
    windows = get_uniform_dim_windows(DB_PATH)
    if not windows:
        logger.error("No uniform-dim windows found in database")
        return {"error": "No windows found", "windows_analyzed": 0}

    scores = load_party_scores_all_windows_aligned(DB_PATH)
    if not scores:
        logger.error("No aligned party scores loaded")
        return {"error": "No scores loaded", "windows_analyzed": 0}

    logger.info("Found %d total windows: %s", len(windows), windows)
    logger.info(
        "Loaded scores for %d parties: %s",
        len(scores),
        sorted(scores.keys()),
    )

    # A window is annual when its id consists of digits only.
    annual_indices: List[int] = [
        position for position, name in enumerate(windows) if name.strip().isdigit()
    ]
    non_annual: List[str] = [
        name for name in windows if not name.strip().isdigit()
    ]

    annual_window_ids = [windows[position] for position in annual_indices]
    logger.info("Annual windows (%d): %s", len(annual_window_ids), annual_window_ids)
    if non_annual:
        logger.info(
            "Non-annual windows (excluded from drift): %s", sorted(non_annual)
        )

    # Centers are computed for every window; only drift is annual-only.
    centers = compute_aligned_centers(scores, windows, annual_indices)
    for entry in centers:
        logger.info(
            "Window %s: %d centrist, %d right (annual=%s)",
            entry["window_id"],
            entry["centrist_count"],
            entry["right_count"],
            entry["is_annual"],
        )

    annual_centers = [entry for entry in centers if entry["is_annual"]]
    drift = compute_drift_metrics(annual_centers)

    # Chart first, so the report can reference it.
    chart_path = str(REPORTS_DIR / "svd_drift_chart.png")
    plot_trajectory(annual_centers, chart_path)

    report_path = str(REPORTS_DIR / "svd_stability_report.md")
    write_report(centers, annual_centers, drift, report_path, chart_path, non_annual)

    summary: Dict[str, Any] = {
        "method": "Procrustes-aligned PCA",
        "total_windows": len(windows),
        "annual_windows_analyzed": len(annual_centers),
        "non_annual_skipped": sorted(non_annual),
        "parties_loaded": len(scores),
        "windows": windows,
    }
    # Copy the headline drift figures across under the same names.
    for metric in (
        "net_displacement",
        "net_dx",
        "net_dy",
        "angular_direction_deg",
        "approach_to_right",
    ):
        summary[metric] = drift.get(metric)

    logger.info("Summary: %s", json.dumps(summary, indent=2))
    return summary
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the analysis and echo its summary as JSON on stdout.
    print(json.dumps(main(), indent=2))
|
|
|