You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
motief/scripts/motion_drift.py

1374 lines
47 KiB

"""motion_drift.py — Analyze semantic drift of SVD axes over time.
Tracks how motion semantics evolve across annual windows (2016-2024):
- Axis stability: which SVD components are comparable across time
- Semantic drift: how top motions on each axis change semantically
- Party voting: how parties move along axes and cross-ideological voting
Usage:
uv run python scripts/motion_drift.py --db data/motions.db --output reports/drift
uv run python scripts/motion_drift.py --db data/motions.db --windows 2019 2020 2021 --top-n 30
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
import duckdb
import numpy as np
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT not in sys.path:
sys.path.insert(0, ROOT)
logger = logging.getLogger("motion_drift")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
def _validate_schema(con: duckdb.DuckDBPyConnection) -> List[str]:
"""Check that required tables exist. Returns list of missing tables."""
required = {"svd_vectors", "fused_embeddings", "mp_votes", "motions"}
tables = {r[0] for r in con.execute("SHOW TABLES").fetchall()}
return list(required - tables)
def _load_vectors(
con: duckdb.DuckDBPyConnection, table: str, window_id: str, entity_type: str
) -> Dict[str, np.ndarray]:
"""Load vectors from a table for a given window and entity type.
Returns {entity_id: vector_array}.
"""
rows = con.execute(
f"SELECT entity_id, vector FROM {table} WHERE window_id = ? AND entity_type = ?",
[window_id, entity_type],
).fetchall()
result = {}
for entity_id, raw_vec in rows:
if isinstance(raw_vec, str):
vec = json.loads(raw_vec)
elif isinstance(raw_vec, (bytes, bytearray)):
vec = json.loads(raw_vec.decode())
elif isinstance(raw_vec, list):
vec = raw_vec
else:
vec = list(raw_vec)
result[entity_id] = np.array([float(v) if v is not None else 0.0 for v in vec])
return result
def _load_motion_scores(
con: duckdb.DuckDBPyConnection, window_id: str
) -> Dict[int, np.ndarray]:
"""Load motion SVD scores for a window. Returns {motion_id: score_array}."""
return _load_vectors(con, "svd_vectors", window_id, "motion")
def _load_fused_embeddings(
con: duckdb.DuckDBPyConnection, window_id: str
) -> Dict[str, np.ndarray]:
"""Load fused embeddings for a window. Returns {str(motion_id): embedding_array}."""
rows = con.execute(
"SELECT motion_id, vector FROM fused_embeddings WHERE window_id = ?",
[window_id],
).fetchall()
result = {}
for motion_id, raw_vec in rows:
if isinstance(raw_vec, str):
vec = json.loads(raw_vec)
elif isinstance(raw_vec, (bytes, bytearray)):
vec = json.loads(raw_vec.decode())
elif isinstance(raw_vec, list):
vec = raw_vec
else:
vec = list(raw_vec)
result[str(motion_id)] = np.array(
[float(v) if v is not None else 0.0 for v in vec]
)
return result
def _get_annual_windows(con: duckdb.DuckDBPyConnection) -> List[str]:
"""Get list of annual windows (non-quarterly) that have fused embeddings, sorted by year."""
rows = con.execute(
"""
SELECT DISTINCT f.window_id
FROM fused_embeddings f
JOIN svd_vectors s ON f.window_id = s.window_id AND s.entity_type = 'motion'
WHERE f.window_id NOT LIKE '%-Q%'
ORDER BY f.window_id
"""
).fetchall()
return [r[0] for r in rows]
def _get_top_motions_per_component(
motion_scores: Dict[int, np.ndarray], top_n: int, n_components: int = 10
) -> Dict[int, List[int]]:
"""Get top N motions per component by absolute loading.
Returns {component_idx: [motion_ids]}.
"""
# Build matrix: motions × components
motion_ids = list(motion_scores.keys())
if not motion_ids:
return {}
matrix = np.array([motion_scores[mid] for mid in motion_ids])
# Pad or truncate to n_components
if matrix.shape[1] < n_components:
matrix = np.pad(matrix, ((0, 0), (0, n_components - matrix.shape[1])))
matrix = matrix[:, :n_components]
result = {}
for comp_idx in range(n_components):
abs_scores = np.abs(matrix[:, comp_idx])
top_indices = np.argsort(abs_scores)[-top_n:][::-1]
result[comp_idx + 1] = [motion_ids[i] for i in top_indices]
return result
def compute_axis_stability(
con: duckdb.DuckDBPyConnection,
windows: List[str],
top_n: int = 20,
n_components: int = 10,
stability_threshold: float = 0.7,
regression_alpha: float = 1.0,
) -> Dict:
"""Compute axis stability across windows using Ridge regression weights.
For each SVD axis and each window, fits Ridge regression:
SVD_score ~ fused_embedding
The weight vector (2610 dims) is the semantic signature of the axis.
Stability = cosine similarity of weight vectors across window pairs.
Falls back to party-based sign consistency for windows with < 50 motions.
Returns dict with stability_matrix, stable_axes, reordered_axes, unstable_axes,
and weight_vectors for downstream interpretation.
"""
from sklearn.linear_model import Lasso
from sklearn.preprocessing import StandardScaler
# Load data per window
window_data: Dict[str, Tuple[np.ndarray, np.ndarray]] = {}
for w in windows:
motion_scores = _load_motion_scores(con, w)
fused = _load_fused_embeddings(con, w)
if not motion_scores or not fused:
continue
common = [m for m in motion_scores if m in fused]
if len(common) < 50:
continue
dim = min(len(fused[m]) for m in common)
X = np.array([fused[m][:dim] for m in common])
Y = np.array([motion_scores[m][:n_components] for m in common])
window_data[w] = (X, Y)
if len(window_data) < 2:
return _compute_stability_fallback(
con, windows, n_components, stability_threshold
)
# Fit Lasso regression per axis per window
# Lasso (L1) produces sparse weight vectors, concentrating on the
# most important embedding dimensions for each axis
weight_vectors: Dict[str, Dict[int, np.ndarray]] = {}
window_list = sorted(window_data.keys())
for w in window_list:
X, Y = window_data[w]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
weights = {}
for comp_idx in range(n_components):
y = Y[:, comp_idx]
model = Lasso(alpha=regression_alpha, max_iter=5000)
model.fit(X_scaled, y)
weights[comp_idx + 1] = model.coef_
weight_vectors[w] = weights
# Compute pairwise stability of weight vectors per component
# Use both cosine similarity and Jaccard of top-K dimensions
top_k = 100 # Compare top 100 dimensions by absolute weight
stability_matrix = np.zeros((len(window_list), len(window_list), n_components))
for i, w1 in enumerate(window_list):
for j, w2 in enumerate(window_list):
if i == j:
stability_matrix[i, j] = 1.0
continue
for comp in range(1, n_components + 1):
if comp not in weight_vectors[w1] or comp not in weight_vectors[w2]:
stability_matrix[i, j, comp - 1] = 0.0
continue
a = weight_vectors[w1][comp]
b = weight_vectors[w2][comp]
# Align dimensions
dim = min(len(a), len(b))
a = a[:dim]
b = b[:dim]
# Method 1: Cosine similarity of full weight vectors
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0 or norm_b == 0:
cosine_sim = 0.0
else:
cosine_sim = np.dot(a, b) / (norm_a * norm_b)
# Method 2: Jaccard similarity of top-K dimensions
top_a = set(np.argsort(np.abs(a))[-top_k:])
top_b = set(np.argsort(np.abs(b))[-top_k:])
jaccard = (
len(top_a & top_b) / len(top_a | top_b) if top_a | top_b else 0.0
)
# Use the maximum of both methods (more robust)
stability_matrix[i, j, comp - 1] = max(cosine_sim, jaccard)
# Average stability across window pairs for each component
n_windows = len(window_list)
avg_stability = np.zeros(n_components)
for comp in range(n_components):
values = []
for i in range(n_windows):
for j in range(n_windows):
if i != j:
values.append(stability_matrix[i, j, comp])
avg_stability[comp] = np.mean(values) if values else 0.0
# Classify axes
stable_axes = [
c + 1 for c in range(n_components) if avg_stability[c] >= stability_threshold
]
unstable_axes = [
c + 1
for c in range(n_components)
if avg_stability[c] < stability_threshold * 0.5
]
reordered_axes = [
c + 1
for c in range(n_components)
if stability_threshold * 0.5 <= avg_stability[c] < stability_threshold
]
return {
"stability_matrix": stability_matrix,
"avg_stability": avg_stability,
"stable_axes": stable_axes,
"reordered_axes": reordered_axes,
"unstable_axes": unstable_axes,
"windows": window_list,
"weight_vectors": weight_vectors,
}
def _compute_stability_fallback(
con: duckdb.DuckDBPyConnection,
windows: List[str],
n_components: int,
stability_threshold: float,
) -> Dict:
"""Fallback: use sign consistency of canonical party scores for stability.
Computes mean SVD scores per party per window, then checks whether
canonical right-left score differences have consistent signs across windows.
"""
from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT
party_axes: Dict[str, Dict[int, float]] = {}
for w in windows:
# Get MP vectors with party mapping
try:
rows = con.execute(
"""
SELECT m.party, s.vector
FROM svd_vectors s
JOIN mp_metadata m ON s.entity_id = m.mp_name
WHERE s.window_id = ? AND s.entity_type = 'mp' AND m.party IS NOT NULL
""",
[w],
).fetchall()
except Exception:
# mp_metadata may not exist in test DBs
continue
party_vectors = {}
for party, raw_vec in rows:
if isinstance(raw_vec, str):
vec = json.loads(raw_vec)
else:
vec = list(raw_vec)
party = party.strip()
if party not in party_vectors:
party_vectors[party] = []
party_vectors[party].append(
np.array([float(v) if v is not None else 0.0 for v in vec])
)
# Compute mean score per component for canonical parties
comp_means = {}
for comp in range(n_components):
right_vals = []
left_vals = []
for party, vectors in party_vectors.items():
mean_score = np.mean([v[comp] for v in vectors])
if party in CANONICAL_RIGHT:
right_vals.append(mean_score)
elif party in CANONICAL_LEFT:
left_vals.append(mean_score)
if right_vals and left_vals:
comp_means[comp + 1] = np.mean(right_vals) - np.mean(left_vals)
if comp_means:
party_axes[w] = comp_means
if len(party_axes) < 2:
return {
"stability_matrix": np.array([]),
"avg_stability": np.zeros(n_components),
"stable_axes": [],
"reordered_axes": [],
"unstable_axes": list(range(1, n_components + 1)),
"windows": list(party_axes.keys()),
}
# Compute sign consistency across windows
window_list = sorted(party_axes.keys())
stability_matrix = np.zeros((len(window_list), len(window_list), n_components))
for i, w1 in enumerate(window_list):
for j, w2 in enumerate(window_list):
if i == j:
stability_matrix[i, j] = 1.0
continue
for comp in range(1, n_components + 1):
s1 = np.sign(party_axes[w1].get(comp, 0))
s2 = np.sign(party_axes[w2].get(comp, 0))
stability_matrix[i, j, comp - 1] = 1.0 if s1 == s2 and s1 != 0 else 0.0
n_windows = len(window_list)
avg_stability = np.zeros(n_components)
for comp in range(n_components):
values = []
for i in range(n_windows):
for j in range(n_windows):
if i != j:
values.append(stability_matrix[i, j, comp])
avg_stability[comp] = np.mean(values) if values else 0.0
stable_axes = [
c + 1 for c in range(n_components) if avg_stability[c] >= stability_threshold
]
unstable_axes = [
c + 1
for c in range(n_components)
if avg_stability[c] < stability_threshold * 0.5
]
reordered_axes = [
c + 1
for c in range(n_components)
if stability_threshold * 0.5 <= avg_stability[c] < stability_threshold
]
return {
"stability_matrix": stability_matrix,
"avg_stability": avg_stability,
"stable_axes": stable_axes,
"reordered_axes": reordered_axes,
"unstable_axes": unstable_axes,
"windows": window_list,
}
# Compute sign consistency across windows
window_list = sorted(party_axes.keys())
stability_matrix = np.zeros((len(window_list), len(window_list), n_components))
for i, w1 in enumerate(window_list):
for j, w2 in enumerate(window_list):
if i == j:
stability_matrix[i, j] = 1.0
continue
for comp in range(1, n_components + 1):
s1 = np.sign(party_axes[w1].get(comp, 0))
s2 = np.sign(party_axes[w2].get(comp, 0))
stability_matrix[i, j, comp - 1] = 1.0 if s1 == s2 and s1 != 0 else 0.0
n_windows = len(window_list)
avg_stability = np.zeros(n_components)
for comp in range(n_components):
values = []
for i in range(n_windows):
for j in range(n_windows):
if i != j:
values.append(stability_matrix[i, j, comp])
avg_stability[comp] = np.mean(values) if values else 0.0
stable_axes = [
c + 1 for c in range(n_components) if avg_stability[c] >= stability_threshold
]
unstable_axes = [
c + 1
for c in range(n_components)
if avg_stability[c] < stability_threshold * 0.5
]
reordered_axes = [
c + 1
for c in range(n_components)
if stability_threshold * 0.5 <= avg_stability[c] < stability_threshold
]
return {
"stability_matrix": stability_matrix,
"avg_stability": avg_stability,
"stable_axes": stable_axes,
"reordered_axes": reordered_axes,
"unstable_axes": unstable_axes,
"windows": window_list,
}
# Compute pairwise cosine similarity between window centroids per component
window_list = list(window_centroids.keys())
stability_matrix = np.zeros((len(window_list), len(window_list), n_components))
for i, w1 in enumerate(window_list):
for j, w2 in enumerate(window_list):
if i == j:
stability_matrix[i, j] = 1.0
continue
for comp in range(1, n_components + 1):
if comp not in window_centroids[w1] or comp not in window_centroids[w2]:
stability_matrix[i, j, comp - 1] = 0.0
continue
a = window_centroids[w1][comp]
b = window_centroids[w2][comp]
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0 or norm_b == 0:
stability_matrix[i, j, comp - 1] = 0.0
else:
stability_matrix[i, j, comp - 1] = np.dot(a, b) / (norm_a * norm_b)
# Average stability across window pairs for each component
n_windows = len(window_list)
avg_stability = np.zeros(n_components)
for comp in range(n_components):
values = []
for i in range(n_windows):
for j in range(n_windows):
if i != j:
values.append(stability_matrix[i, j, comp])
avg_stability[comp] = np.mean(values) if values else 0.0
# Classify axes
stable_axes = [
c + 1 for c in range(n_components) if avg_stability[c] >= stability_threshold
]
unstable_axes = [
c + 1
for c in range(n_components)
if avg_stability[c] < stability_threshold * 0.5
]
reordered_axes = [
c + 1
for c in range(n_components)
if stability_threshold * 0.5 <= avg_stability[c] < stability_threshold
]
return {
"stability_matrix": stability_matrix,
"avg_stability": avg_stability,
"stable_axes": stable_axes,
"reordered_axes": reordered_axes,
"unstable_axes": unstable_axes,
"windows": window_list,
}
# Compute pairwise stability between windows
window_list = list(window_rankings.keys())
stability_matrix = np.zeros((len(window_list), len(window_list), n_components))
for i, w1 in enumerate(window_list):
for j, w2 in enumerate(window_list):
if i == j:
stability_matrix[i, j] = 1.0
continue
for comp in range(1, n_components + 1):
motions_1 = set(window_rankings[w1].get(comp, []))
motions_2 = set(window_rankings[w2].get(comp, []))
if not motions_1 or not motions_2:
stability_matrix[i, j, comp - 1] = 0.0
continue
# Jaccard similarity of top-N motion sets
intersection = len(motions_1 & motions_2)
union = len(motions_1 | motions_2)
stability_matrix[i, j, comp - 1] = (
intersection / union if union > 0 else 0.0
)
# Average stability across window pairs for each component
# Exclude diagonal (self-similarity = 1.0)
n_windows = len(window_list)
avg_stability = np.zeros(n_components)
for comp in range(n_components):
values = []
for i in range(n_windows):
for j in range(n_windows):
if i != j:
values.append(stability_matrix[i, j, comp])
avg_stability[comp] = np.mean(values) if values else 0.0
# Classify axes
stable_axes = [
c + 1 for c in range(n_components) if avg_stability[c] >= stability_threshold
]
unstable_axes = [
c + 1
for c in range(n_components)
if avg_stability[c] < stability_threshold * 0.5
]
reordered_axes = [
c + 1
for c in range(n_components)
if stability_threshold * 0.5 <= avg_stability[c] < stability_threshold
]
return {
"stability_matrix": stability_matrix,
"avg_stability": avg_stability,
"stable_axes": stable_axes,
"reordered_axes": reordered_axes,
"unstable_axes": unstable_axes,
"windows": window_list,
}
def compute_overtone_shift(
con: duckdb.DuckDBPyConnection,
stable_axes: List[int],
windows: List[str],
weight_vectors: Dict[str, Dict[int, np.ndarray]],
top_k: int = 50,
) -> Dict:
"""Compute overtone shift: how semantic gravity moves across windows.
Semantic gravity = weighted mean fused embedding of all motions on an axis,
weighted by absolute SVD score. Tracks how the content center shifts
even when party ordering stays the same.
Args:
weight_vectors: from compute_axis_stability, used for top-K dimension analysis
top_k: number of top dimensions to analyze
Returns dict with shift_series, inflection_points, dimension_analysis.
"""
if not stable_axes:
return {"shift_series": {}, "inflection_points": {}, "dimension_analysis": {}}
shift_series = {}
inflection_points = {}
dimension_analysis = {}
for axis in stable_axes:
gravities = {} # window -> gravity vector
for w in windows:
motion_scores = _load_motion_scores(con, w)
fused = _load_fused_embeddings(con, w)
if not motion_scores or not fused:
continue
# Compute semantic gravity: weighted mean of fused embeddings
# weights = absolute SVD score on this axis
comp_idx = axis - 1
valid = []
weights = []
for m_id, scores in motion_scores.items():
if m_id in fused and comp_idx < len(scores):
valid.append(fused[m_id])
weights.append(abs(scores[comp_idx]))
if not valid or sum(weights) == 0:
continue
# Align dimensions
dim = min(len(v) for v in valid)
vectors = np.array([v[:dim] for v in valid])
w_arr = np.array(weights[: len(vectors)])
gravity = np.average(vectors, axis=0, weights=w_arr)
gravities[w] = gravity
if len(gravities) < 2:
continue
# Compute shift between consecutive windows
window_list = sorted(gravities.keys())
shifts = []
for i in range(len(window_list) - 1):
a = gravities[window_list[i]]
b = gravities[window_list[i + 1]]
# Align dimensions
dim = min(len(a), len(b))
a = a[:dim]
b = b[:dim]
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0 or norm_b == 0:
shifts.append(0.0)
else:
cosine_sim = np.dot(a, b) / (norm_a * norm_b)
shifts.append(1.0 - cosine_sim) # cosine distance
shift_series[axis] = shifts
# Detect inflection points
if len(shifts) < 3:
inflection_points[axis] = []
continue
median_shift = np.median(shifts)
threshold = 2 * median_shift if median_shift > 0 else 0.1
inflections = []
for i, shift in enumerate(shifts):
if shift > threshold:
inflections.append(
{
"window_before": window_list[i],
"window_after": window_list[i + 1],
"shift": float(shift),
"median_shift": float(median_shift),
}
)
inflection_points[axis] = inflections
# Dimension analysis: which dimensions drive the shift
if axis in weight_vectors and window_list:
# Get top-K dimensions from regression weights (average across windows)
all_weights = []
for w in window_list:
if axis in weight_vectors.get(w, {}):
all_weights.append(weight_vectors[w][axis])
if all_weights:
avg_weights = np.mean(
[
np.abs(w[: min(len(x) for x in all_weights)])
for w in all_weights
],
axis=0,
)
top_dims = np.argsort(avg_weights)[-top_k:][::-1]
# Compute how much each top dimension shifted
dim_shifts = {}
for d in top_dims[:20]: # Report top 20
vals = []
for w in window_list:
if w in gravities and d < len(gravities[w]):
vals.append(gravities[w][d])
if len(vals) >= 2:
dim_shifts[int(d)] = {
"mean": float(np.mean(vals)),
"std": float(np.std(vals)),
"range": float(max(vals) - min(vals)),
}
dimension_analysis[axis] = {
"top_dimensions": [int(d) for d in top_dims[:20]],
"dimension_shifts": dim_shifts,
}
return {
"shift_series": shift_series,
"inflection_points": inflection_points,
"dimension_analysis": dimension_analysis,
}
def compute_semantic_drift(
con: duckdb.DuckDBPyConnection,
stable_axes: List[int],
windows: List[str],
top_n: int = 20,
n_components: int = 10,
) -> Dict:
"""Compute semantic drift for stable axes.
Returns dict with drift_series, inflection_points, example_motions per axis.
"""
if not stable_axes:
return {"drift_series": {}, "inflection_points": {}, "example_motions": {}}
drift_series = {}
inflection_points = {}
example_motions = {}
for axis in stable_axes:
comp_idx = axis - 1
centroids = []
window_centroids = {}
for w in windows:
motion_scores = _load_motion_scores(con, w)
fused = _load_fused_embeddings(con, w)
if not motion_scores or not fused:
continue
# Get top N motions for this axis
top_motions = _get_top_motions_per_component(
motion_scores, top_n, n_components
).get(axis, [])
# Get fused embeddings for top motions
valid_motions = [m for m in top_motions if m in fused]
if not valid_motions:
continue
# Compute centroid (align dimensions)
vectors = [fused[m] for m in valid_motions]
dim = min(len(v) for v in vectors)
vectors = np.array([v[:dim] for v in vectors])
centroid = np.mean(vectors, axis=0)
centroids.append(centroid)
window_centroids[w] = {
"centroid": centroid,
"motions": valid_motions,
}
if len(centroids) < 2:
continue
# Compute cosine distance between consecutive centroids
drift_values = []
for i in range(len(centroids) - 1):
a, b = centroids[i], centroids[i + 1]
# Align dimensions
dim = min(len(a), len(b))
a = a[:dim]
b = b[:dim]
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0 or norm_b == 0:
drift_values.append(0.0)
else:
cosine_sim = np.dot(a, b) / (norm_a * norm_b)
drift_values.append(1.0 - cosine_sim) # cosine distance
drift_series[axis] = drift_values
# Detect inflection points (drift > 2× median)
median_drift = np.median(drift_values) if drift_values else 0
threshold = 2 * median_drift if median_drift > 0 else 0.1
inflections = []
for i, drift in enumerate(drift_values):
if drift > threshold:
w_before = list(window_centroids.keys())[i]
w_after = list(window_centroids.keys())[i + 1]
inflections.append(
{
"window_before": w_before,
"window_after": w_after,
"drift": float(drift),
"median_drift": float(median_drift),
}
)
inflection_points[axis] = inflections
# Collect example motions for inflection points
examples = {}
for inf in inflections:
w_before = inf["window_before"]
w_after = inf["window_after"]
examples[f"{w_before}_to_{w_after}"] = {
"before": window_centroids.get(w_before, {}).get("motions", [])[:3],
"after": window_centroids.get(w_after, {}).get("motions", [])[:3],
}
example_motions[axis] = examples
return {
"drift_series": drift_series,
"inflection_points": inflection_points,
"example_motions": example_motions,
}
def compute_party_voting(
con: duckdb.DuckDBPyConnection,
stable_axes: List[int],
windows: List[str],
) -> Dict:
"""Compute party voting centroids and detect cross-ideological voting.
Returns dict with party_trajectories, cross_voting, examples.
"""
if not stable_axes:
return {"party_trajectories": {}, "cross_voting": {}, "examples": {}}
# Import canonical party sets
sys.path.insert(0, ROOT)
from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT
party_trajectories = {}
cross_voting = {}
examples = {}
for w in windows:
motion_scores = _load_motion_scores(con, w)
if not motion_scores:
continue
# Get party votes for this window
# Parse window year for date filtering
year = int(w.split("-")[0]) if "-" not in w else int(w.split("-Q")[0])
year_start = f"{year}-01-01"
year_end = f"{year}-12-31"
votes = con.execute(
"""
SELECT party, motion_id, vote
FROM mp_votes
WHERE date >= ? AND date <= ? AND vote = 'voor'
""",
[year_start, year_end],
).fetchall()
if not votes:
continue
# Group motions by party
party_motions = defaultdict(set)
for party, motion_id, vote in votes:
party_motions[party].add(int(motion_id))
# Compute party centroids along stable axes
for party, motion_ids in party_motions.items():
valid_motions = [m for m in motion_ids if str(m) in motion_scores]
if not valid_motions:
continue
vectors = np.array([motion_scores[str(m)] for m in valid_motions])
centroid = np.mean(vectors, axis=0)
if party not in party_trajectories:
party_trajectories[party] = {}
# Only include axes that fit within the centroid vector
valid_axes = [a for a in stable_axes if a - 1 < len(centroid)]
party_trajectories[party][w] = {
"centroid": centroid,
"n_motions": len(valid_motions),
"axes": {axis: float(centroid[axis - 1]) for axis in valid_axes},
}
# Detect cross-ideological voting
# Determine axis polarity: where do canonical right parties score?
axis_polarity = {}
for axis in stable_axes:
comp_idx = axis - 1
right_scores = []
for party in CANONICAL_RIGHT:
if party in party_trajectories and w in party_trajectories[party]:
right_scores.append(
party_trajectories[party][w]["axes"].get(axis, 0)
)
if right_scores:
mean_right = np.mean(right_scores)
axis_polarity[axis] = (
"right_positive" if mean_right > 0 else "right_negative"
)
# Find cross-ideological votes
for party in party_motions:
is_left = party in CANONICAL_LEFT or party in {
"SP",
"PvdA",
"GL",
"GroenLinks",
"GroenLinks-PvdA",
"DENK",
"PvdD",
"Volt",
}
is_right = party in CANONICAL_RIGHT or party in {
"PVV",
"FVD",
"JA21",
"SGP",
"VVD",
"CDA",
"BBB",
}
if not (is_left or is_right):
continue
for motion_id in party_motions[party]:
if motion_id not in motion_scores:
continue
scores = motion_scores[motion_id]
# Check if motion is ideologically opposite
for axis in stable_axes:
comp_idx = axis - 1
score = scores[comp_idx]
polarity = axis_polarity.get(axis, "right_positive")
# If polarity is right_positive, high score = right-wing motion
is_right_motion = (
score > 0 if polarity == "right_positive" else score < 0
)
if is_left and is_right_motion:
if party not in cross_voting:
cross_voting[party] = defaultdict(lambda: defaultdict(list))
cross_voting[party][w][axis].append(motion_id)
elif is_right and not is_right_motion:
if party not in cross_voting:
cross_voting[party] = defaultdict(lambda: defaultdict(list))
cross_voting[party][w][axis].append(motion_id)
# Collect examples
for party, windows_data in cross_voting.items():
for w, axes_data in windows_data.items():
for axis, motion_ids in axes_data.items():
if motion_ids:
key = f"{party}_{w}_axis{axis}"
examples[key] = {
"party": party,
"window": w,
"axis": axis,
"motion_ids": motion_ids[:5], # Top 5 examples
}
return {
"party_trajectories": party_trajectories,
"cross_voting": cross_voting,
"examples": examples,
}
def _generate_report(
output_dir: str,
stability_result: Dict,
drift_result: Dict,
party_result: Dict,
windows: List[str],
top_n: int,
overtone_result: Optional[Dict] = None,
) -> str:
"""Generate markdown report with analysis results."""
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
os.makedirs(output_dir, exist_ok=True)
lines = [
"# Motion Semantic Drift Analysis Report",
"",
f"**Windows analyzed:** {', '.join(windows)}",
f"**Top N motions per axis:** {top_n}",
"",
"---",
"",
]
# Summary
stable = stability_result.get("stable_axes", [])
inflections = drift_result.get("inflection_points", {})
cross_voting = party_result.get("cross_voting", {})
lines.append("## Summary")
lines.append("")
lines.append(f"- **Stable axes:** {stable if stable else 'None'}")
lines.append(
f"- **Axes with inflection points:** {len([k for k, v in inflections.items() if v])}"
)
lines.append(f"- **Parties with cross-ideological voting:** {len(cross_voting)}")
lines.append("")
# Axis Stability
lines.append("## Axis Stability")
lines.append("")
if (
stability_result.get("stability_matrix") is not None
and stability_result["stability_matrix"].size > 0
and stable
):
n_plots = len(stable)
fig, axes = plt.subplots(
1,
n_plots,
figsize=(6 * max(1, n_plots), 4),
)
if n_plots == 1:
axes = [axes]
for i, axis in enumerate(stable):
ax = axes[i]
matrix = stability_result["stability_matrix"][:, :, axis - 1]
im = ax.imshow(matrix, cmap="RdYlGn", vmin=0, vmax=1)
ax.set_xticks(range(len(windows)))
ax.set_xticklabels(windows, rotation=45, ha="right")
ax.set_yticks(range(len(windows)))
ax.set_yticklabels(windows)
ax.set_title(f"Axis {axis} Stability")
fig.colorbar(im, ax=ax, label="Jaccard Similarity")
plt.tight_layout()
fig_path = os.path.join(output_dir, "axis_stability.png")
plt.savefig(fig_path, dpi=150, bbox_inches="tight")
plt.close()
lines.append(f"![Axis Stability Heatmap](axis_stability.png)")
lines.append("")
lines.append(f"**Stable axes (similarity > 0.7):** {stable if stable else 'None'}")
lines.append(f"**Reordered axes:** {stability_result.get('reordered_axes', [])}")
lines.append(f"**Unstable axes:** {stability_result.get('unstable_axes', [])}")
lines.append("")
# Semantic Drift
lines.append("## Semantic Drift")
lines.append("")
drift_series = drift_result.get("drift_series", {})
if drift_series:
fig, axes = plt.subplots(
len(drift_series), 1, figsize=(10, 3 * len(drift_series))
)
if len(drift_series) == 1:
axes = [axes]
for i, (axis, values) in enumerate(drift_series.items()):
ax = axes[i]
x = range(1, len(values) + 1)
ax.plot(x, values, marker="o", label=f"Axis {axis}")
# Mark inflection points
inflections = drift_result.get("inflection_points", {}).get(axis, [])
for inf in inflections:
ax.axvline(
x=list(drift_series.keys()).index(axis) + 1, color="red", alpha=0.3
)
ax.set_xlabel("Window Transition")
ax.set_ylabel("Cosine Distance")
ax.set_title(f"Axis {axis} Semantic Drift")
ax.legend()
plt.tight_layout()
fig_path = os.path.join(output_dir, "semantic_drift.png")
plt.savefig(fig_path, dpi=150, bbox_inches="tight")
plt.close()
lines.append(f"![Semantic Drift Timeline](semantic_drift.png)")
lines.append("")
# Inflection point details
for axis, inflections in drift_result.get("inflection_points", {}).items():
if inflections:
lines.append(f"### Axis {axis} Inflection Points")
lines.append("")
for inf in inflections:
lines.append(
f"- **{inf['window_before']}{inf['window_after']}**: drift={inf['drift']:.4f} (median={inf['median_drift']:.4f})"
)
lines.append("")
else:
lines.append("No drift data available (no stable axes or insufficient data).")
lines.append("")
# Party Voting Analysis
lines.append("## Party Voting Analysis")
lines.append("")
party_trajectories = party_result.get("party_trajectories", {})
if party_trajectories:
lines.append(f"**Parties tracked:** {len(party_trajectories)}")
lines.append("")
# Party trajectory plot
fig, ax = plt.subplots(figsize=(10, 6))
# Get stable axes from party trajectory data
sample_axes = next(iter(party_trajectories.values()))
sample_window = next(iter(sample_axes.values()))
available_axes = sorted(sample_window.get("axes", {}).keys())
for party, trajectory in party_trajectories.items():
sorted_windows = sorted(trajectory.keys())
axes_scores = [trajectory[w]["axes"] for w in sorted_windows]
# Plot first two available axes
if len(available_axes) >= 2:
x = [
axes_scores[i].get(available_axes[0], 0)
for i in range(len(sorted_windows))
]
y = [
axes_scores[i].get(available_axes[1], 0)
for i in range(len(sorted_windows))
]
ax.plot(x, y, marker="o", label=party)
for j, w in enumerate(sorted_windows):
ax.annotate(w, (x[j], y[j]), fontsize=8, alpha=0.7)
ax.set_xlabel(f"Axis {available_axes[0]}")
ax.set_ylabel(f"Axis {available_axes[1]}")
ax.set_title("Party Voting Trajectories")
ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
ax.axhline(y=0, color="gray", alpha=0.3)
ax.axvline(x=0, color="gray", alpha=0.3)
plt.tight_layout()
fig_path = os.path.join(output_dir, "party_trajectories.png")
plt.savefig(fig_path, dpi=150, bbox_inches="tight")
plt.close()
lines.append(f"![Party Trajectories](party_trajectories.png)")
lines.append("")
# Cross-ideological voting
cross_voting = party_result.get("cross_voting", {})
if cross_voting:
lines.append("### Cross-Ideological Voting")
lines.append("")
lines.append("| Party | Window | Axis | Example Motion IDs |")
lines.append("|-------|--------|------|-------------------|")
for party, windows_data in sorted(cross_voting.items()):
for w, axes_data in sorted(windows_data.items()):
for axis, motion_ids in sorted(axes_data.items()):
lines.append(
f"| {party} | {w} | {axis} | {', '.join(str(m) for m in motion_ids[:3])} |"
)
lines.append("")
else:
lines.append("No cross-ideological voting detected.")
lines.append("")
# Overtone Shift
if overtone_result and overtone_result.get("shift_series"):
lines.append("## Overtone Shift")
lines.append("")
lines.append(
"Overtone shift measures how the semantic content of motions on each axis "
"changes over time, even when party ordering stays the same."
)
lines.append("")
shift_series = overtone_result["shift_series"]
inflection_points = overtone_result.get("inflection_points", {})
dim_analysis = overtone_result.get("dimension_analysis", {})
for axis, shifts in shift_series.items():
avg_shift = np.mean(shifts) if shifts else 0
max_shift = max(shifts) if shifts else 0
n_inflections = len(inflection_points.get(axis, []))
lines.append(f"### Axis {axis}")
lines.append("")
lines.append(f"- **Average shift:** {avg_shift:.4f}")
lines.append(f"- **Max shift:** {max_shift:.4f}")
lines.append(f"- **Inflection points:** {n_inflections}")
lines.append("")
# Show inflection points
if inflection_points.get(axis):
for inf in inflection_points[axis]:
lines.append(
f"- **{inf['window_before']}{inf['window_after']}**: "
f"shift={inf['shift']:.4f} (median={inf['median_shift']:.4f})"
)
lines.append("")
# Show top shifting dimensions
if axis in dim_analysis:
da = dim_analysis[axis]
lines.append("**Top shifting dimensions:**")
lines.append("")
lines.append("| Dim | Mean | Std | Range |")
lines.append("|-----|------|-----|-------|")
for d, stats in sorted(
da.get("dimension_shifts", {}).items(),
key=lambda x: x[1]["range"],
reverse=True,
)[:10]:
lines.append(
f"| {d} | {stats['mean']:.4f} | {stats['std']:.4f} | {stats['range']:.4f} |"
)
lines.append("")
# Methodology
lines.append("## Methodology")
lines.append("")
lines.append(
"- **Axis stability:** Ridge regression weights (SVD_score ~ fused_embedding) per axis per window, "
"compared via max(cosine similarity, Jaccard top-100 dimensions)"
)
lines.append(
"- **Overtone shift:** Semantic gravity (weighted mean fused embedding) per axis per window, "
"tracked via cosine distance between consecutive windows"
)
lines.append(
"- **Semantic drift:** Cosine distance between fused embedding centroids of top-N motions per axis"
)
lines.append("- **Inflection points:** Drift/shift rate exceeding 2× median rate")
lines.append(
"- **Cross-ideological voting:** Parties voting 'voor' on motions where canonical opposite-wing parties have high loadings"
)
lines.append("")
lines.append(
"- **Semantic drift:** Cosine distance between fused embedding centroids of top-N motions per axis"
)
lines.append("- **Inflection points:** Drift rate exceeding 2× median drift rate")
lines.append(
"- **Cross-ideological voting:** Parties voting 'voor' on motions where canonical opposite-wing parties have high loadings"
)
lines.append("")
report_path = os.path.join(output_dir, "report.md")
with open(report_path, "w") as f:
f.write("\n".join(lines))
return report_path
def main(argv: Optional[List[str]] = None) -> int:
p = argparse.ArgumentParser(
description="Analyze semantic drift of SVD axes over time"
)
p.add_argument("--db", default="data/motions.db", help="Path to motions database")
p.add_argument(
"--windows",
nargs="+",
default=None,
help="Specific windows to analyze (default: all annual windows)",
)
p.add_argument(
"--top-n",
type=int,
default=20,
help="Number of top motions per axis (default: 20)",
)
p.add_argument(
"--output",
default="reports/drift",
help="Output directory for report and charts",
)
p.add_argument(
"--stability-threshold",
type=float,
default=0.7,
help="Similarity threshold for axis stability (default: 0.7)",
)
p.add_argument(
"--regression-alpha",
type=float,
default=0.1,
help="Lasso regression regularization strength (default: 0.1)",
)
args = p.parse_args(argv)
if not os.path.exists(args.db):
logger.error("Database not found: %s", args.db)
return 1
con = duckdb.connect(database=args.db, read_only=True)
try:
# Validate schema
missing = _validate_schema(con)
if missing:
logger.error("Missing required tables: %s", missing)
return 1
# Determine windows
if args.windows:
windows = args.windows
else:
windows = _get_annual_windows(con)
logger.info("Found %d annual windows: %s", len(windows), windows)
if len(windows) < 2:
logger.warning(
"Need at least 2 windows for drift analysis. Found: %s", windows
)
return 1
# Run analysis
logger.info("Computing axis stability...")
stability_result = compute_axis_stability(
con,
windows,
args.top_n,
stability_threshold=args.stability_threshold,
regression_alpha=args.regression_alpha,
)
logger.info("Stable axes: %s", stability_result["stable_axes"])
logger.info("Computing semantic drift...")
drift_result = compute_semantic_drift(
con, stability_result["stable_axes"], windows, args.top_n
)
logger.info("Computing overtone shift...")
weight_vectors = stability_result.get("weight_vectors", {})
overtone_result = compute_overtone_shift(
con, stability_result["stable_axes"], windows, weight_vectors
)
logger.info("Computing party voting analysis...")
party_result = compute_party_voting(
con, stability_result["stable_axes"], windows
)
# Generate report
logger.info("Generating report to %s...", args.output)
report_path = _generate_report(
args.output,
stability_result,
drift_result,
party_result,
windows,
args.top_n,
overtone_result=overtone_result,
)
logger.info("Report generated: %s", report_path)
return 0
finally:
con.close()
if __name__ == "__main__":
raise SystemExit(main())