You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
motief/scripts/motion_drift.py

1131 lines
38 KiB

"""motion_drift.py — Analyze semantic drift of SVD axes over time.
Tracks how motion semantics evolve across annual windows (2016-2024):
- Axis stability: which SVD components are comparable across time
- Semantic drift: how top motions on each axis change semantically
- Party voting: how parties move along axes and cross-ideological voting
Usage:
uv run python scripts/motion_drift.py --db data/motions.db --output reports/drift
uv run python scripts/motion_drift.py --db data/motions.db --windows 2019 2020 2021 --top-n 30
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
import duckdb
import numpy as np
# Make the repository root importable (this script lives in scripts/, one
# level below it) so that project modules such as analysis.config resolve
# when the file is run directly.
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)

# Module-level logger; basicConfig is acceptable here because this file is a
# command-line script, not an importable library.
logger = logging.getLogger("motion_drift")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
def _validate_schema(con: duckdb.DuckDBPyConnection) -> List[str]:
    """Return the names of required tables missing from the database.

    An empty list means the schema is complete.
    """
    needed = {"svd_vectors", "fused_embeddings", "mp_votes", "motions"}
    present = {row[0] for row in con.execute("SHOW TABLES").fetchall()}
    return [name for name in needed if name not in present]
def _load_vectors(
    con: duckdb.DuckDBPyConnection, table: str, window_id: str, entity_type: str
) -> Dict[str, np.ndarray]:
    """Load vectors from a table for a given window and entity type.

    Returns {entity_id: vector_array}. Stored vectors may be JSON text, raw
    bytes, a native list, or any other iterable; None elements become 0.0.
    """
    fetched = con.execute(
        f"SELECT entity_id, vector FROM {table} WHERE window_id = ? AND entity_type = ?",
        [window_id, entity_type],
    ).fetchall()
    out: Dict[str, np.ndarray] = {}
    for entity_id, payload in fetched:
        if isinstance(payload, str):
            values = json.loads(payload)
        elif isinstance(payload, (bytes, bytearray)):
            values = json.loads(payload.decode())
        elif isinstance(payload, list):
            values = payload
        else:
            values = list(payload)
        out[entity_id] = np.array([0.0 if v is None else float(v) for v in values])
    return out
def _load_motion_scores(
    con: duckdb.DuckDBPyConnection, window_id: str
) -> Dict[int, np.ndarray]:
    """Load motion SVD scores for a window. Returns {motion_id: score_array}.

    NOTE(review): the key type is whatever `svd_vectors.entity_id` holds in
    the database — possibly str despite the Dict[int, ...] annotation.
    Callers that index these scores by int vs str(id) must agree; verify
    against the schema.
    """
    return _load_vectors(con, "svd_vectors", window_id, "motion")
def _load_fused_embeddings(
    con: duckdb.DuckDBPyConnection, window_id: str
) -> Dict[str, np.ndarray]:
    """Load fused embeddings for a window. Returns {str(motion_id): embedding_array}."""
    fetched = con.execute(
        "SELECT motion_id, vector FROM fused_embeddings WHERE window_id = ?",
        [window_id],
    ).fetchall()
    embeddings: Dict[str, np.ndarray] = {}
    for motion_id, payload in fetched:
        # Vectors may be stored as JSON text, raw bytes, a list, or another iterable.
        if isinstance(payload, str):
            parsed = json.loads(payload)
        elif isinstance(payload, (bytes, bytearray)):
            parsed = json.loads(payload.decode())
        elif isinstance(payload, list):
            parsed = payload
        else:
            parsed = list(payload)
        # Keys are normalized to str so lookups don't depend on the DB's id type.
        embeddings[str(motion_id)] = np.array(
            [0.0 if v is None else float(v) for v in parsed]
        )
    return embeddings
def _get_annual_windows(con: duckdb.DuckDBPyConnection) -> List[str]:
    """Get list of annual windows (non-quarterly) that have fused embeddings, sorted by year."""
    query = """
        SELECT DISTINCT f.window_id
        FROM fused_embeddings f
        JOIN svd_vectors s ON f.window_id = s.window_id AND s.entity_type = 'motion'
        WHERE f.window_id NOT LIKE '%-Q%'
        ORDER BY f.window_id
    """
    return [window_id for (window_id,) in con.execute(query).fetchall()]
def _get_top_motions_per_component(
motion_scores: Dict[int, np.ndarray], top_n: int, n_components: int = 10
) -> Dict[int, List[int]]:
"""Get top N motions per component by absolute loading.
Returns {component_idx: [motion_ids]}.
"""
# Build matrix: motions × components
motion_ids = list(motion_scores.keys())
if not motion_ids:
return {}
matrix = np.array([motion_scores[mid] for mid in motion_ids])
# Pad or truncate to n_components
if matrix.shape[1] < n_components:
matrix = np.pad(matrix, ((0, 0), (0, n_components - matrix.shape[1])))
matrix = matrix[:, :n_components]
result = {}
for comp_idx in range(n_components):
abs_scores = np.abs(matrix[:, comp_idx])
top_indices = np.argsort(abs_scores)[-top_n:][::-1]
result[comp_idx + 1] = [motion_ids[i] for i in top_indices]
return result
def compute_axis_stability(
    con: duckdb.DuckDBPyConnection,
    windows: List[str],
    top_n: int = 20,
    n_components: int = 10,
    stability_threshold: float = 0.7,
) -> Dict:
    """Compute axis stability across windows using Procrustes-aligned SVD scores.

    Aligns motion score matrices across windows using orthogonal Procrustes
    to handle SVD sign ambiguity, then computes cosine similarity of per-component
    centroids. Falls back to _compute_stability_fallback when windows share
    too few common motions for alignment.

    Returns dict with stability_matrix (windows x windows x components),
    avg_stability, stable_axes, reordered_axes, unstable_axes, windows.
    Axis indices in the returned lists are 1-based.
    """
    from scipy.linalg import orthogonal_procrustes

    # Load motion scores per window; windows with no motion vectors are skipped.
    window_scores: Dict[str, Dict[int, np.ndarray]] = {}
    for w in windows:
        motion_scores = _load_motion_scores(con, w)
        if not motion_scores:
            continue
        window_scores[w] = motion_scores
    if len(window_scores) < 2:
        # Not enough windows to compare: report everything as unstable.
        return {
            "stability_matrix": np.array([]),
            "avg_stability": np.zeros(n_components),
            "stable_axes": [],
            "reordered_axes": [],
            "unstable_axes": list(range(1, n_components + 1)),
            "windows": list(window_scores.keys()),
        }
    # Build motion score matrices per window (motions × components).
    # Use common motions across all windows for alignment.
    common_motions = None
    for w, scores in window_scores.items():
        motions = set(scores.keys())
        if common_motions is None:
            common_motions = motions
        else:
            common_motions = common_motions & motions
    if not common_motions or len(common_motions) < n_components:
        # Fallback: use sign consistency based on canonical party scores.
        return _compute_stability_fallback(
            con, windows, n_components, stability_threshold
        )
    # Sorting fixes a single row order shared by every window's matrix, so
    # row i refers to the same motion in all matrices.
    common_motions = sorted(common_motions)
    # Build matrices: each row is a motion's first n_components scores.
    matrices = {}
    for w, scores in window_scores.items():
        mat = np.array(
            [scores[m][:n_components] for m in common_motions if m in scores]
        )
        if mat.shape[0] >= len(common_motions) * 0.5:  # At least 50% coverage
            matrices[w] = mat
    if len(matrices) < 2:
        return {
            "stability_matrix": np.array([]),
            "avg_stability": np.zeros(n_components),
            "stable_axes": [],
            "reordered_axes": [],
            "unstable_axes": list(range(1, n_components + 1)),
            "windows": list(window_scores.keys()),
        }
    # Align all matrices to the first (earliest-sorted) window using Procrustes.
    window_list = sorted(matrices.keys())
    ref_matrix = matrices[window_list[0]]
    aligned_matrices = {window_list[0]: ref_matrix}
    for w in window_list[1:]:
        mat = matrices[w]
        # Orthogonal Procrustes: find rotation R that best aligns mat to ref.
        R, _ = orthogonal_procrustes(mat, ref_matrix)
        aligned_matrices[w] = mat @ R
    # Pairwise cosine similarity of each component's score column across windows.
    stability_matrix = np.zeros((len(window_list), len(window_list), n_components))
    for i, w1 in enumerate(window_list):
        for j, w2 in enumerate(window_list):
            if i == j:
                # Diagonal: a window is perfectly similar to itself on all components.
                stability_matrix[i, j] = 1.0
                continue
            for comp in range(n_components):
                a = aligned_matrices[w1][:, comp]
                b = aligned_matrices[w2][:, comp]
                norm_a = np.linalg.norm(a)
                norm_b = np.linalg.norm(b)
                if norm_a == 0 or norm_b == 0:
                    stability_matrix[i, j, comp] = 0.0
                else:
                    stability_matrix[i, j, comp] = np.dot(a, b) / (norm_a * norm_b)
    # Average stability across off-diagonal window pairs for each component.
    n_windows = len(window_list)
    avg_stability = np.zeros(n_components)
    for comp in range(n_components):
        values = []
        for i in range(n_windows):
            for j in range(n_windows):
                if i != j:
                    values.append(stability_matrix[i, j, comp])
        avg_stability[comp] = np.mean(values) if values else 0.0
    # Classify axes (1-based) into three bands around the threshold.
    stable_axes = [
        c + 1 for c in range(n_components) if avg_stability[c] >= stability_threshold
    ]
    unstable_axes = [
        c + 1
        for c in range(n_components)
        if avg_stability[c] < stability_threshold * 0.5
    ]
    reordered_axes = [
        c + 1
        for c in range(n_components)
        if stability_threshold * 0.5 <= avg_stability[c] < stability_threshold
    ]
    return {
        "stability_matrix": stability_matrix,
        "avg_stability": avg_stability,
        "stable_axes": stable_axes,
        "reordered_axes": reordered_axes,
        "unstable_axes": unstable_axes,
        "windows": window_list,
    }
def _compute_stability_fallback(
    con: duckdb.DuckDBPyConnection,
    windows: List[str],
    n_components: int,
    stability_threshold: float,
) -> Dict:
    """Fallback: use sign consistency of canonical party scores for stability.

    Computes mean SVD scores per party per window, then checks whether the
    canonical right-left score difference keeps the same sign across windows.
    Used by compute_axis_stability when windows share too few common motions
    for Procrustes alignment.

    Returns the same dict shape as compute_axis_stability().

    (Fix: removed ~150 lines of unreachable duplicated code that previously
    followed this function's return statement and referenced undefined names.)
    """
    from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT

    # {window_id: {component (1-based): mean(right) - mean(left)}}
    party_axes: Dict[str, Dict[int, float]] = {}
    for w in windows:
        # Get MP vectors with their party affiliation.
        rows = con.execute(
            """
            SELECT m.party, s.vector
            FROM svd_vectors s
            JOIN mp_metadata m ON s.entity_id = m.mp_name
            WHERE s.window_id = ? AND s.entity_type = 'mp' AND m.party IS NOT NULL
            """,
            [w],
        ).fetchall()
        party_vectors: Dict[str, List[np.ndarray]] = {}
        for party, raw_vec in rows:
            vec = json.loads(raw_vec) if isinstance(raw_vec, str) else list(raw_vec)
            party = party.strip()
            party_vectors.setdefault(party, []).append(
                np.array([float(v) if v is not None else 0.0 for v in vec])
            )
        # Mean right-minus-left score per component for canonical parties.
        comp_means: Dict[int, float] = {}
        for comp in range(n_components):
            right_vals = []
            left_vals = []
            for party, vectors in party_vectors.items():
                mean_score = np.mean([v[comp] for v in vectors])
                if party in CANONICAL_RIGHT:
                    right_vals.append(mean_score)
                elif party in CANONICAL_LEFT:
                    left_vals.append(mean_score)
            if right_vals and left_vals:
                comp_means[comp + 1] = np.mean(right_vals) - np.mean(left_vals)
        if comp_means:
            party_axes[w] = comp_means
    if len(party_axes) < 2:
        # Not enough usable windows: report everything as unstable.
        return {
            "stability_matrix": np.array([]),
            "avg_stability": np.zeros(n_components),
            "stable_axes": [],
            "reordered_axes": [],
            "unstable_axes": list(range(1, n_components + 1)),
            "windows": list(party_axes.keys()),
        }
    # Sign consistency across window pairs: 1.0 when both windows place the
    # canonical right bloc on the same side of the axis, else 0.0.
    window_list = sorted(party_axes.keys())
    stability_matrix = np.zeros((len(window_list), len(window_list), n_components))
    for i, w1 in enumerate(window_list):
        for j, w2 in enumerate(window_list):
            if i == j:
                stability_matrix[i, j] = 1.0
                continue
            for comp in range(1, n_components + 1):
                s1 = np.sign(party_axes[w1].get(comp, 0))
                s2 = np.sign(party_axes[w2].get(comp, 0))
                stability_matrix[i, j, comp - 1] = 1.0 if s1 == s2 and s1 != 0 else 0.0
    # Average over off-diagonal pairs per component.
    n_windows = len(window_list)
    avg_stability = np.zeros(n_components)
    for comp in range(n_components):
        off_diag = [
            stability_matrix[i, j, comp]
            for i in range(n_windows)
            for j in range(n_windows)
            if i != j
        ]
        avg_stability[comp] = np.mean(off_diag) if off_diag else 0.0
    # Classify axes (1-based) into three bands around the threshold.
    stable_axes = [
        c + 1 for c in range(n_components) if avg_stability[c] >= stability_threshold
    ]
    unstable_axes = [
        c + 1
        for c in range(n_components)
        if avg_stability[c] < stability_threshold * 0.5
    ]
    reordered_axes = [
        c + 1
        for c in range(n_components)
        if stability_threshold * 0.5 <= avg_stability[c] < stability_threshold
    ]
    return {
        "stability_matrix": stability_matrix,
        "avg_stability": avg_stability,
        "stable_axes": stable_axes,
        "reordered_axes": reordered_axes,
        "unstable_axes": unstable_axes,
        "windows": window_list,
    }
def compute_semantic_drift(
    con: duckdb.DuckDBPyConnection,
    stable_axes: List[int],
    windows: List[str],
    top_n: int = 20,
    n_components: int = 10,
) -> Dict:
    """Compute semantic drift for stable axes.

    For each stable axis, the top-N motions per window are averaged in fused
    embedding space into a per-window centroid; drift is the cosine distance
    between consecutive centroids. Transitions exceeding 2x the median drift
    are flagged as inflection points.

    Returns dict with drift_series, inflection_points, example_motions per axis.
    """
    if not stable_axes:
        return {"drift_series": {}, "inflection_points": {}, "example_motions": {}}
    drift_series = {}
    inflection_points = {}
    example_motions = {}
    for axis in stable_axes:
        centroids = []
        window_order: List[str] = []  # windows that contributed, in input order
        window_motions: Dict[str, List] = {}
        for w in windows:
            motion_scores = _load_motion_scores(con, w)
            fused = _load_fused_embeddings(con, w)
            if not motion_scores or not fused:
                continue
            # Top N motions for this axis in this window.
            top_motions = _get_top_motions_per_component(
                motion_scores, top_n, n_components
            ).get(axis, [])
            # Fused embeddings are keyed by str(motion_id) while SVD motion ids
            # may be ints — normalize with str() for the lookup (fix: the raw
            # `m in fused` test could silently never match for int ids).
            valid_motions = [m for m in top_motions if str(m) in fused]
            if not valid_motions:
                continue
            vectors = np.array([fused[str(m)] for m in valid_motions])
            centroids.append(np.mean(vectors, axis=0))
            window_order.append(w)
            window_motions[w] = valid_motions
        if len(centroids) < 2:
            continue
        # Cosine distance between consecutive centroids.
        drift_values = []
        for i in range(len(centroids) - 1):
            a, b = centroids[i], centroids[i + 1]
            norm_a = np.linalg.norm(a)
            norm_b = np.linalg.norm(b)
            if norm_a == 0 or norm_b == 0:
                drift_values.append(0.0)
            else:
                cosine_sim = np.dot(a, b) / (norm_a * norm_b)
                drift_values.append(1.0 - cosine_sim)
        drift_series[axis] = drift_values
        # Detect inflection points (drift > 2× median; 0.1 floor if median is 0).
        median_drift = np.median(drift_values) if drift_values else 0
        threshold = 2 * median_drift if median_drift > 0 else 0.1
        inflections = []
        for i, drift in enumerate(drift_values):
            if drift > threshold:
                inflections.append(
                    {
                        "window_before": window_order[i],
                        "window_after": window_order[i + 1],
                        "drift": float(drift),
                        "median_drift": float(median_drift),
                    }
                )
        inflection_points[axis] = inflections
        # Collect a few example motions around each inflection point.
        examples = {}
        for inf in inflections:
            w_before = inf["window_before"]
            w_after = inf["window_after"]
            examples[f"{w_before}_to_{w_after}"] = {
                "before": window_motions.get(w_before, [])[:3],
                "after": window_motions.get(w_after, [])[:3],
            }
        example_motions[axis] = examples
    return {
        "drift_series": drift_series,
        "inflection_points": inflection_points,
        "example_motions": example_motions,
    }
def compute_party_voting(
    con: duckdb.DuckDBPyConnection,
    stable_axes: List[int],
    windows: List[str],
) -> Dict:
    """Compute party voting centroids and detect cross-ideological voting.

    For each window, parties' 'voor' votes are mapped to motion SVD scores;
    the mean score per stable axis gives a party trajectory. A motion whose
    score lies on the opposite side of the axis from a party's ideological
    bloc counts as a cross-ideological vote.

    Returns dict with party_trajectories, cross_voting, examples.
    """
    if not stable_axes:
        return {"party_trajectories": {}, "cross_voting": {}, "examples": {}}
    # Import canonical party sets from the project (ROOT added to sys.path).
    sys.path.insert(0, ROOT)
    from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT

    party_trajectories = {}
    cross_voting = {}
    examples = {}
    for w in windows:
        motion_scores = _load_motion_scores(con, w)
        if not motion_scores:
            continue
        # Normalize score keys to str once: SVD entity ids may be stored as
        # int or str, while vote motion_ids are ints. (Fix: the original mixed
        # str(m) and raw-int lookups, so one of the two code paths could
        # silently never match.)
        scores_by_id = {str(k): v for k, v in motion_scores.items()}
        # Window year for date filtering; handles both "2019" and "2019-Q1".
        year = int(w.split("-")[0])
        year_start = f"{year}-01-01"
        year_end = f"{year}-12-31"
        votes = con.execute(
            """
            SELECT party, motion_id, vote
            FROM mp_votes
            WHERE date >= ? AND date <= ? AND vote = 'voor'
            """,
            [year_start, year_end],
        ).fetchall()
        if not votes:
            continue
        # Group supported motions by party.
        party_motions = defaultdict(set)
        for party, motion_id, vote in votes:
            party_motions[party].add(int(motion_id))
        # Compute party centroids along stable axes.
        for party, motion_ids in party_motions.items():
            valid_motions = [m for m in motion_ids if str(m) in scores_by_id]
            if not valid_motions:
                continue
            vectors = np.array([scores_by_id[str(m)] for m in valid_motions])
            centroid = np.mean(vectors, axis=0)
            if party not in party_trajectories:
                party_trajectories[party] = {}
            # Only include axes that fit within the centroid vector.
            valid_axes = [a for a in stable_axes if a - 1 < len(centroid)]
            party_trajectories[party][w] = {
                "centroid": centroid,
                "n_motions": len(valid_motions),
                "axes": {axis: float(centroid[axis - 1]) for axis in valid_axes},
            }
        # Determine axis polarity: on which side do canonical right parties score?
        axis_polarity = {}
        for axis in stable_axes:
            right_scores = []
            for party in CANONICAL_RIGHT:
                if party in party_trajectories and w in party_trajectories[party]:
                    right_scores.append(
                        party_trajectories[party][w]["axes"].get(axis, 0)
                    )
            if right_scores:
                mean_right = np.mean(right_scores)
                axis_polarity[axis] = (
                    "right_positive" if mean_right > 0 else "right_negative"
                )
        # Find cross-ideological votes.
        for party in party_motions:
            is_left = party in CANONICAL_LEFT or party in {
                "SP",
                "PvdA",
                "GL",
                "GroenLinks",
                "GroenLinks-PvdA",
                "DENK",
                "PvdD",
                "Volt",
            }
            is_right = party in CANONICAL_RIGHT or party in {
                "PVV",
                "FVD",
                "JA21",
                "SGP",
                "VVD",
                "CDA",
                "BBB",
            }
            if not (is_left or is_right):
                continue
            for motion_id in party_motions[party]:
                scores = scores_by_id.get(str(motion_id))
                if scores is None:
                    continue
                # Check whether the motion sits on the opposite side of each axis.
                for axis in stable_axes:
                    comp_idx = axis - 1
                    if comp_idx >= len(scores):
                        continue  # axis beyond the stored component count
                    score = scores[comp_idx]
                    polarity = axis_polarity.get(axis, "right_positive")
                    # If polarity is right_positive, a high score marks a right-wing motion.
                    is_right_motion = (
                        score > 0 if polarity == "right_positive" else score < 0
                    )
                    if (is_left and is_right_motion) or (
                        is_right and not is_right_motion
                    ):
                        if party not in cross_voting:
                            cross_voting[party] = defaultdict(lambda: defaultdict(list))
                        cross_voting[party][w][axis].append(motion_id)
    # Collect up to five example motions per (party, window, axis).
    for party, windows_data in cross_voting.items():
        for w, axes_data in windows_data.items():
            for axis, motion_ids in axes_data.items():
                if motion_ids:
                    key = f"{party}_{w}_axis{axis}"
                    examples[key] = {
                        "party": party,
                        "window": w,
                        "axis": axis,
                        "motion_ids": motion_ids[:5],  # Top 5 examples
                    }
    return {
        "party_trajectories": party_trajectories,
        "cross_voting": cross_voting,
        "examples": examples,
    }
def _generate_report(
    output_dir: str,
    stability_result: Dict,
    drift_result: Dict,
    party_result: Dict,
    windows: List[str],
    top_n: int,
) -> str:
    """Generate a markdown report (plus PNG charts) with analysis results.

    Writes axis_stability.png, semantic_drift.png, party_trajectories.png and
    report.md into output_dir (created if needed). Returns the report path.
    """
    import matplotlib

    matplotlib.use("Agg")  # headless backend: no display required
    import matplotlib.pyplot as plt

    os.makedirs(output_dir, exist_ok=True)
    lines = [
        "# Motion Semantic Drift Analysis Report",
        "",
        f"**Windows analyzed:** {', '.join(windows)}",
        f"**Top N motions per axis:** {top_n}",
        "",
        "---",
        "",
    ]
    # Summary
    stable = stability_result.get("stable_axes", [])
    inflections = drift_result.get("inflection_points", {})
    cross_voting = party_result.get("cross_voting", {})
    lines.append("## Summary")
    lines.append("")
    lines.append(f"- **Stable axes:** {stable if stable else 'None'}")
    lines.append(
        f"- **Axes with inflection points:** {len([k for k, v in inflections.items() if v])}"
    )
    lines.append(f"- **Parties with cross-ideological voting:** {len(cross_voting)}")
    lines.append("")
    # Axis Stability
    lines.append("## Axis Stability")
    lines.append("")
    if (
        stability_result.get("stability_matrix") is not None
        and stability_result["stability_matrix"].size > 0
        and stable
    ):
        # Use the windows that actually entered the stability matrix — these
        # can be a subset of `windows` when some had no vectors. (Fix: tick
        # labels previously used the full list and could mismatch the matrix
        # dimensions.)
        hm_windows = stability_result.get("windows", windows)
        n_plots = len(stable)
        fig, axes = plt.subplots(
            1,
            n_plots,
            figsize=(6 * max(1, n_plots), 4),
        )
        if n_plots == 1:
            axes = [axes]
        for i, axis in enumerate(stable):
            ax = axes[i]
            matrix = stability_result["stability_matrix"][:, :, axis - 1]
            im = ax.imshow(matrix, cmap="RdYlGn", vmin=0, vmax=1)
            ax.set_xticks(range(len(hm_windows)))
            ax.set_xticklabels(hm_windows, rotation=45, ha="right")
            ax.set_yticks(range(len(hm_windows)))
            ax.set_yticklabels(hm_windows)
            ax.set_title(f"Axis {axis} Stability")
            # Generic label: the primary path uses Procrustes-aligned cosine
            # similarity; the fallback uses sign consistency, not Jaccard.
            fig.colorbar(im, ax=ax, label="Similarity")
        plt.tight_layout()
        fig_path = os.path.join(output_dir, "axis_stability.png")
        plt.savefig(fig_path, dpi=150, bbox_inches="tight")
        plt.close()
        lines.append("![Axis Stability Heatmap](axis_stability.png)")
        lines.append("")
    lines.append(f"**Stable axes (similarity > 0.7):** {stable if stable else 'None'}")
    lines.append(f"**Reordered axes:** {stability_result.get('reordered_axes', [])}")
    lines.append(f"**Unstable axes:** {stability_result.get('unstable_axes', [])}")
    lines.append("")
    # Semantic Drift
    lines.append("## Semantic Drift")
    lines.append("")
    drift_series = drift_result.get("drift_series", {})
    if drift_series:
        fig, axes = plt.subplots(
            len(drift_series), 1, figsize=(10, 3 * len(drift_series))
        )
        if len(drift_series) == 1:
            axes = [axes]
        for i, (axis, values) in enumerate(drift_series.items()):
            ax = axes[i]
            x = range(1, len(values) + 1)
            ax.plot(x, values, marker="o", label=f"Axis {axis}")
            # Mark inflection points at the transition where the spike occurred.
            # (Fix: the vline x was previously the axis's position in the dict,
            # not the drifting transition.)
            axis_inflections = drift_result.get("inflection_points", {}).get(axis, [])
            for inf in axis_inflections:
                for t, v in enumerate(values):
                    if abs(v - inf["drift"]) < 1e-9:
                        ax.axvline(x=t + 1, color="red", alpha=0.3)
                        break
            ax.set_xlabel("Window Transition")
            ax.set_ylabel("Cosine Distance")
            ax.set_title(f"Axis {axis} Semantic Drift")
            ax.legend()
        plt.tight_layout()
        fig_path = os.path.join(output_dir, "semantic_drift.png")
        plt.savefig(fig_path, dpi=150, bbox_inches="tight")
        plt.close()
        lines.append("![Semantic Drift Timeline](semantic_drift.png)")
        lines.append("")
        # Inflection point details
        for axis, axis_inflections in drift_result.get("inflection_points", {}).items():
            if axis_inflections:
                lines.append(f"### Axis {axis} Inflection Points")
                lines.append("")
                for inf in axis_inflections:
                    # Fix: separator between the two window names was missing.
                    lines.append(
                        f"- **{inf['window_before']} → {inf['window_after']}**: "
                        f"drift={inf['drift']:.4f} (median={inf['median_drift']:.4f})"
                    )
                lines.append("")
    else:
        lines.append("No drift data available (no stable axes or insufficient data).")
        lines.append("")
    # Party Voting Analysis
    lines.append("## Party Voting Analysis")
    lines.append("")
    party_trajectories = party_result.get("party_trajectories", {})
    if party_trajectories:
        lines.append(f"**Parties tracked:** {len(party_trajectories)}")
        lines.append("")
        # Party trajectory plot in the plane of the first two stable axes.
        fig, ax = plt.subplots(figsize=(10, 6))
        sample_axes = next(iter(party_trajectories.values()))
        sample_window = next(iter(sample_axes.values()))
        available_axes = sorted(sample_window.get("axes", {}).keys())
        for party, trajectory in party_trajectories.items():
            sorted_windows = sorted(trajectory.keys())
            axes_scores = [trajectory[w]["axes"] for w in sorted_windows]
            # Plot first two available axes
            if len(available_axes) >= 2:
                x = [
                    axes_scores[i].get(available_axes[0], 0)
                    for i in range(len(sorted_windows))
                ]
                y = [
                    axes_scores[i].get(available_axes[1], 0)
                    for i in range(len(sorted_windows))
                ]
                ax.plot(x, y, marker="o", label=party)
                for j, w in enumerate(sorted_windows):
                    ax.annotate(w, (x[j], y[j]), fontsize=8, alpha=0.7)
        ax.set_xlabel(f"Axis {available_axes[0]}")
        ax.set_ylabel(f"Axis {available_axes[1]}")
        ax.set_title("Party Voting Trajectories")
        ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
        ax.axhline(y=0, color="gray", alpha=0.3)
        ax.axvline(x=0, color="gray", alpha=0.3)
        plt.tight_layout()
        fig_path = os.path.join(output_dir, "party_trajectories.png")
        plt.savefig(fig_path, dpi=150, bbox_inches="tight")
        plt.close()
        lines.append("![Party Trajectories](party_trajectories.png)")
        lines.append("")
    # Cross-ideological voting
    cross_voting = party_result.get("cross_voting", {})
    if cross_voting:
        lines.append("### Cross-Ideological Voting")
        lines.append("")
        lines.append("| Party | Window | Axis | Example Motion IDs |")
        lines.append("|-------|--------|------|-------------------|")
        for party, windows_data in sorted(cross_voting.items()):
            for w, axes_data in sorted(windows_data.items()):
                for axis, motion_ids in sorted(axes_data.items()):
                    lines.append(
                        f"| {party} | {w} | {axis} | {', '.join(str(m) for m in motion_ids[:3])} |"
                    )
        lines.append("")
    else:
        lines.append("No cross-ideological voting detected.")
        lines.append("")
    # Methodology
    lines.append("## Methodology")
    lines.append("")
    # Fix: the previous text described a Jaccard-of-rankings method that does
    # not match the implemented Procrustes / sign-consistency computation.
    lines.append(
        "- **Axis stability:** Procrustes-aligned cosine similarity of per-component motion scores across windows (sign-consistency fallback when windows share too few motions)"
    )
    lines.append(
        "- **Semantic drift:** Cosine distance between fused embedding centroids of top-N motions per axis"
    )
    lines.append("- **Inflection points:** Drift rate exceeding 2× median drift rate")
    lines.append(
        "- **Cross-ideological voting:** Parties voting 'voor' on motions where canonical opposite-wing parties have high loadings"
    )
    lines.append("")
    report_path = os.path.join(output_dir, "report.md")
    with open(report_path, "w") as f:
        f.write("\n".join(lines))
    return report_path
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point. Returns a process exit code (0 = success)."""
    parser = argparse.ArgumentParser(
        description="Analyze semantic drift of SVD axes over time"
    )
    parser.add_argument("--db", default="data/motions.db", help="Path to motions database")
    parser.add_argument(
        "--windows",
        nargs="+",
        default=None,
        help="Specific windows to analyze (default: all annual windows)",
    )
    parser.add_argument(
        "--top-n",
        type=int,
        default=20,
        help="Number of top motions per axis (default: 20)",
    )
    parser.add_argument(
        "--output",
        default="reports/drift",
        help="Output directory for report and charts",
    )
    parser.add_argument(
        "--stability-threshold",
        type=float,
        default=0.7,
        help="Similarity threshold for axis stability (default: 0.7)",
    )
    opts = parser.parse_args(argv)

    if not os.path.exists(opts.db):
        logger.error("Database not found: %s", opts.db)
        return 1

    connection = duckdb.connect(database=opts.db, read_only=True)
    try:
        # Bail out early if the database lacks any required table.
        missing = _validate_schema(connection)
        if missing:
            logger.error("Missing required tables: %s", missing)
            return 1

        # Explicit window list wins; otherwise discover annual windows.
        if opts.windows:
            selected = opts.windows
        else:
            selected = _get_annual_windows(connection)
            logger.info("Found %d annual windows: %s", len(selected), selected)
        if len(selected) < 2:
            logger.warning(
                "Need at least 2 windows for drift analysis. Found: %s", selected
            )
            return 1

        # Run the three analysis stages in order; drift and party voting
        # both consume the stable axes found in stage one.
        logger.info("Computing axis stability...")
        stability = compute_axis_stability(
            connection, selected, opts.top_n, stability_threshold=opts.stability_threshold
        )
        logger.info("Stable axes: %s", stability["stable_axes"])

        logger.info("Computing semantic drift...")
        drift = compute_semantic_drift(
            connection, stability["stable_axes"], selected, opts.top_n
        )

        logger.info("Computing party voting analysis...")
        party = compute_party_voting(connection, stability["stable_axes"], selected)

        logger.info("Generating report to %s...", opts.output)
        report_path = _generate_report(
            opts.output, stability, drift, party, selected, opts.top_n
        )
        logger.info("Report generated: %s", report_path)
        return 0
    finally:
        connection.close()
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())