"""motion_drift.py — Analyze semantic drift of SVD axes over time. Tracks how motion semantics evolve across annual windows (2016-2024): - Axis stability: which SVD components are comparable across time - Semantic drift: how top motions on each axis change semantically - Party voting: how parties move along axes and cross-ideological voting Usage: uv run python scripts/motion_drift.py --db data/motions.db --output reports/drift uv run python scripts/motion_drift.py --db data/motions.db --windows 2019 2020 2021 --top-n 30 """ from __future__ import annotations import argparse import json import logging import os import sys from collections import defaultdict from typing import Dict, List, Optional, Tuple import duckdb import numpy as np ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT not in sys.path: sys.path.insert(0, ROOT) logger = logging.getLogger("motion_drift") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") def _validate_schema(con: duckdb.DuckDBPyConnection) -> List[str]: """Check that required tables exist. Returns list of missing tables.""" required = {"svd_vectors", "fused_embeddings", "mp_votes", "motions"} tables = {r[0] for r in con.execute("SHOW TABLES").fetchall()} return list(required - tables) def _load_vectors( con: duckdb.DuckDBPyConnection, table: str, window_id: str, entity_type: str ) -> Dict[str, np.ndarray]: """Load vectors from a table for a given window and entity type. Returns {entity_id: vector_array}. """ rows = con.execute( f"SELECT entity_id, vector FROM {table} WHERE window_id = ? AND entity_type = ?", [window_id, entity_type], ).fetchall() result = {} for entity_id, raw_vec in rows: if isinstance(raw_vec, str): vec = json.loads(raw_vec) elif isinstance(raw_vec, (bytes, bytearray)): vec = json.loads(raw_vec.decode()) elif isinstance(raw_vec, list): vec = raw_vec else: vec = list(raw_vec) result[entity_id] = np.array([float(v) if v is not None else 0.0 for v in vec]) return result def _load_motion_scores( con: duckdb.DuckDBPyConnection, window_id: str ) -> Dict[int, np.ndarray]: """Load motion SVD scores for a window. Returns {motion_id: score_array}.""" return _load_vectors(con, "svd_vectors", window_id, "motion") def _load_fused_embeddings( con: duckdb.DuckDBPyConnection, window_id: str ) -> Dict[str, np.ndarray]: """Load fused embeddings for a window. Returns {str(motion_id): embedding_array}.""" rows = con.execute( "SELECT motion_id, vector FROM fused_embeddings WHERE window_id = ?", [window_id], ).fetchall() result = {} for motion_id, raw_vec in rows: if isinstance(raw_vec, str): vec = json.loads(raw_vec) elif isinstance(raw_vec, (bytes, bytearray)): vec = json.loads(raw_vec.decode()) elif isinstance(raw_vec, list): vec = raw_vec else: vec = list(raw_vec) result[str(motion_id)] = np.array( [float(v) if v is not None else 0.0 for v in vec] ) return result def _get_annual_windows(con: duckdb.DuckDBPyConnection) -> List[str]: """Get list of annual windows (non-quarterly) that have fused embeddings, sorted by year.""" rows = con.execute( """ SELECT DISTINCT f.window_id FROM fused_embeddings f JOIN svd_vectors s ON f.window_id = s.window_id AND s.entity_type = 'motion' WHERE f.window_id NOT LIKE '%-Q%' ORDER BY f.window_id """ ).fetchall() return [r[0] for r in rows] def _get_top_motions_per_component( motion_scores: Dict[int, np.ndarray], top_n: int, n_components: int = 10 ) -> Dict[int, List[int]]: """Get top N motions per component by absolute loading. Returns {component_idx: [motion_ids]}. """ # Build matrix: motions × components motion_ids = list(motion_scores.keys()) if not motion_ids: return {} matrix = np.array([motion_scores[mid] for mid in motion_ids]) # Pad or truncate to n_components if matrix.shape[1] < n_components: matrix = np.pad(matrix, ((0, 0), (0, n_components - matrix.shape[1]))) matrix = matrix[:, :n_components] result = {} for comp_idx in range(n_components): abs_scores = np.abs(matrix[:, comp_idx]) top_indices = np.argsort(abs_scores)[-top_n:][::-1] result[comp_idx + 1] = [motion_ids[i] for i in top_indices] return result def compute_axis_stability( con: duckdb.DuckDBPyConnection, windows: List[str], top_n: int = 20, n_components: int = 10, stability_threshold: float = 0.7, regression_alpha: float = 1.0, ) -> Dict: """Compute axis stability across windows using Ridge regression weights. For each SVD axis and each window, fits Ridge regression: SVD_score ~ fused_embedding The weight vector (2610 dims) is the semantic signature of the axis. Stability = cosine similarity of weight vectors across window pairs. Falls back to party-based sign consistency for windows with < 50 motions. Returns dict with stability_matrix, stable_axes, reordered_axes, unstable_axes, and weight_vectors for downstream interpretation. """ from sklearn.linear_model import Lasso from sklearn.preprocessing import StandardScaler # Load data per window window_data: Dict[str, Tuple[np.ndarray, np.ndarray]] = {} for w in windows: motion_scores = _load_motion_scores(con, w) fused = _load_fused_embeddings(con, w) if not motion_scores or not fused: continue common = [m for m in motion_scores if m in fused] if len(common) < 50: continue dim = min(len(fused[m]) for m in common) X = np.array([fused[m][:dim] for m in common]) Y = np.array([motion_scores[m][:n_components] for m in common]) window_data[w] = (X, Y) if len(window_data) < 2: return _compute_stability_fallback( con, windows, n_components, stability_threshold ) # Fit Lasso regression per axis per window # Lasso (L1) produces sparse weight vectors, concentrating on the # most important embedding dimensions for each axis weight_vectors: Dict[str, Dict[int, np.ndarray]] = {} window_list = sorted(window_data.keys()) for w in window_list: X, Y = window_data[w] scaler = StandardScaler() X_scaled = scaler.fit_transform(X) weights = {} for comp_idx in range(n_components): y = Y[:, comp_idx] model = Lasso(alpha=regression_alpha, max_iter=5000) model.fit(X_scaled, y) weights[comp_idx + 1] = model.coef_ weight_vectors[w] = weights # Compute pairwise stability of weight vectors per component # Use both cosine similarity and Jaccard of top-K dimensions top_k = 100 # Compare top 100 dimensions by absolute weight stability_matrix = np.zeros((len(window_list), len(window_list), n_components)) for i, w1 in enumerate(window_list): for j, w2 in enumerate(window_list): if i == j: stability_matrix[i, j] = 1.0 continue for comp in range(1, n_components + 1): if comp not in weight_vectors[w1] or comp not in weight_vectors[w2]: stability_matrix[i, j, comp - 1] = 0.0 continue a = weight_vectors[w1][comp] b = weight_vectors[w2][comp] # Align dimensions dim = min(len(a), len(b)) a = a[:dim] b = b[:dim] # Method 1: Cosine similarity of full weight vectors norm_a = np.linalg.norm(a) norm_b = np.linalg.norm(b) if norm_a == 0 or norm_b == 0: cosine_sim = 0.0 else: cosine_sim = np.dot(a, b) / (norm_a * norm_b) # Method 2: Jaccard similarity of top-K dimensions top_a = set(np.argsort(np.abs(a))[-top_k:]) top_b = set(np.argsort(np.abs(b))[-top_k:]) jaccard = ( len(top_a & top_b) / len(top_a | top_b) if top_a | top_b else 0.0 ) # Use the maximum of both methods (more robust) stability_matrix[i, j, comp - 1] = max(cosine_sim, jaccard) # Average stability across window pairs for each component n_windows = len(window_list) avg_stability = np.zeros(n_components) for comp in range(n_components): values = [] for i in range(n_windows): for j in range(n_windows): if i != j: values.append(stability_matrix[i, j, comp]) avg_stability[comp] = np.mean(values) if values else 0.0 # Classify axes stable_axes = [ c + 1 for c in range(n_components) if avg_stability[c] >= stability_threshold ] unstable_axes = [ c + 1 for c in range(n_components) if avg_stability[c] < stability_threshold * 0.5 ] reordered_axes = [ c + 1 for c in range(n_components) if stability_threshold * 0.5 <= avg_stability[c] < stability_threshold ] return { "stability_matrix": stability_matrix, "avg_stability": avg_stability, "stable_axes": stable_axes, "reordered_axes": reordered_axes, "unstable_axes": unstable_axes, "windows": window_list, "weight_vectors": weight_vectors, } def _compute_stability_fallback( con: duckdb.DuckDBPyConnection, windows: List[str], n_components: int, stability_threshold: float, ) -> Dict: """Fallback: use sign consistency of canonical party scores for stability. Computes mean SVD scores per party per window, then checks whether canonical right-left score differences have consistent signs across windows. """ from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT party_axes: Dict[str, Dict[int, float]] = {} for w in windows: # Get MP vectors with party mapping try: rows = con.execute( """ SELECT m.party, s.vector FROM svd_vectors s JOIN mp_metadata m ON s.entity_id = m.mp_name WHERE s.window_id = ? AND s.entity_type = 'mp' AND m.party IS NOT NULL """, [w], ).fetchall() except Exception: # mp_metadata may not exist in test DBs continue party_vectors = {} for party, raw_vec in rows: if isinstance(raw_vec, str): vec = json.loads(raw_vec) else: vec = list(raw_vec) party = party.strip() if party not in party_vectors: party_vectors[party] = [] party_vectors[party].append( np.array([float(v) if v is not None else 0.0 for v in vec]) ) # Compute mean score per component for canonical parties comp_means = {} for comp in range(n_components): right_vals = [] left_vals = [] for party, vectors in party_vectors.items(): mean_score = np.mean([v[comp] for v in vectors]) if party in CANONICAL_RIGHT: right_vals.append(mean_score) elif party in CANONICAL_LEFT: left_vals.append(mean_score) if right_vals and left_vals: comp_means[comp + 1] = np.mean(right_vals) - np.mean(left_vals) if comp_means: party_axes[w] = comp_means if len(party_axes) < 2: return { "stability_matrix": np.array([]), "avg_stability": np.zeros(n_components), "stable_axes": [], "reordered_axes": [], "unstable_axes": list(range(1, n_components + 1)), "windows": list(party_axes.keys()), } # Compute sign consistency across windows window_list = sorted(party_axes.keys()) stability_matrix = np.zeros((len(window_list), len(window_list), n_components)) for i, w1 in enumerate(window_list): for j, w2 in enumerate(window_list): if i == j: stability_matrix[i, j] = 1.0 continue for comp in range(1, n_components + 1): s1 = np.sign(party_axes[w1].get(comp, 0)) s2 = np.sign(party_axes[w2].get(comp, 0)) stability_matrix[i, j, comp - 1] = 1.0 if s1 == s2 and s1 != 0 else 0.0 n_windows = len(window_list) avg_stability = np.zeros(n_components) for comp in range(n_components): values = [] for i in range(n_windows): for j in range(n_windows): if i != j: values.append(stability_matrix[i, j, comp]) avg_stability[comp] = np.mean(values) if values else 0.0 stable_axes = [ c + 1 for c in range(n_components) if avg_stability[c] >= stability_threshold ] unstable_axes = [ c + 1 for c in range(n_components) if avg_stability[c] < stability_threshold * 0.5 ] reordered_axes = [ c + 1 for c in range(n_components) if stability_threshold * 0.5 <= avg_stability[c] < stability_threshold ] return { "stability_matrix": stability_matrix, "avg_stability": avg_stability, "stable_axes": stable_axes, "reordered_axes": reordered_axes, "unstable_axes": unstable_axes, "windows": window_list, } def compute_overtone_shift( con: duckdb.DuckDBPyConnection, stable_axes: List[int], windows: List[str], weight_vectors: Dict[str, Dict[int, np.ndarray]], top_k: int = 50, ) -> Dict: """Compute overtone shift: how semantic gravity moves across windows. Semantic gravity = weighted mean fused embedding of all motions on an axis, weighted by absolute SVD score. Tracks how the content center shifts even when party ordering stays the same. Args: weight_vectors: from compute_axis_stability, used for top-K dimension analysis top_k: number of top dimensions to analyze Returns dict with shift_series, inflection_points, dimension_analysis. """ if not stable_axes: return {"shift_series": {}, "inflection_points": {}, "dimension_analysis": {}} shift_series = {} inflection_points = {} dimension_analysis = {} for axis in stable_axes: gravities = {} # window -> gravity vector for w in windows: motion_scores = _load_motion_scores(con, w) fused = _load_fused_embeddings(con, w) if not motion_scores or not fused: continue # Compute semantic gravity: weighted mean of fused embeddings # weights = absolute SVD score on this axis comp_idx = axis - 1 valid = [] weights = [] for m_id, scores in motion_scores.items(): if m_id in fused and comp_idx < len(scores): valid.append(fused[m_id]) weights.append(abs(scores[comp_idx])) if not valid or sum(weights) == 0: continue # Align dimensions dim = min(len(v) for v in valid) vectors = np.array([v[:dim] for v in valid]) w_arr = np.array(weights[: len(vectors)]) gravity = np.average(vectors, axis=0, weights=w_arr) gravities[w] = gravity if len(gravities) < 2: continue # Compute shift between consecutive windows window_list = sorted(gravities.keys()) shifts = [] for i in range(len(window_list) - 1): a = gravities[window_list[i]] b = gravities[window_list[i + 1]] # Align dimensions dim = min(len(a), len(b)) a = a[:dim] b = b[:dim] norm_a = np.linalg.norm(a) norm_b = np.linalg.norm(b) if norm_a == 0 or norm_b == 0: shifts.append(0.0) else: cosine_sim = np.dot(a, b) / (norm_a * norm_b) shifts.append(1.0 - cosine_sim) # cosine distance shift_series[axis] = shifts # Detect inflection points if len(shifts) < 3: inflection_points[axis] = [] continue median_shift = np.median(shifts) threshold = 2 * median_shift if median_shift > 0 else 0.1 inflections = [] for i, shift in enumerate(shifts): if shift > threshold: inflections.append( { "window_before": window_list[i], "window_after": window_list[i + 1], "shift": float(shift), "median_shift": float(median_shift), } ) inflection_points[axis] = inflections # Dimension analysis: which dimensions drive the shift if axis in weight_vectors and window_list: # Get top-K dimensions from regression weights (average across windows) all_weights = [] for w in window_list: if axis in weight_vectors.get(w, {}): all_weights.append(weight_vectors[w][axis]) if all_weights: avg_weights = np.mean( [ np.abs(w[: min(len(x) for x in all_weights)]) for w in all_weights ], axis=0, ) top_dims = np.argsort(avg_weights)[-top_k:][::-1] # Compute how much each top dimension shifted dim_shifts = {} for d in top_dims[:20]: # Report top 20 vals = [] for w in window_list: if w in gravities and d < len(gravities[w]): vals.append(gravities[w][d]) if len(vals) >= 2: dim_shifts[int(d)] = { "mean": float(np.mean(vals)), "std": float(np.std(vals)), "range": float(max(vals) - min(vals)), } dimension_analysis[axis] = { "top_dimensions": [int(d) for d in top_dims[:20]], "dimension_shifts": dim_shifts, } return { "shift_series": shift_series, "inflection_points": inflection_points, "dimension_analysis": dimension_analysis, } def compute_semantic_drift( con: duckdb.DuckDBPyConnection, stable_axes: List[int], windows: List[str], top_n: int = 20, n_components: int = 10, ) -> Dict: """Compute semantic drift for stable axes. Returns dict with drift_series, inflection_points, example_motions per axis. """ if not stable_axes: return {"drift_series": {}, "inflection_points": {}, "example_motions": {}} drift_series = {} inflection_points = {} example_motions = {} for axis in stable_axes: comp_idx = axis - 1 centroids = [] window_centroids = {} for w in windows: motion_scores = _load_motion_scores(con, w) fused = _load_fused_embeddings(con, w) if not motion_scores or not fused: continue # Get top N motions for this axis top_motions = _get_top_motions_per_component( motion_scores, top_n, n_components ).get(axis, []) # Get fused embeddings for top motions valid_motions = [m for m in top_motions if m in fused] if not valid_motions: continue # Compute centroid (align dimensions) vectors = [fused[m] for m in valid_motions] dim = min(len(v) for v in vectors) vectors = np.array([v[:dim] for v in vectors]) centroid = np.mean(vectors, axis=0) centroids.append(centroid) window_centroids[w] = { "centroid": centroid, "motions": valid_motions, } if len(centroids) < 2: continue # Compute cosine distance between consecutive centroids drift_values = [] for i in range(len(centroids) - 1): a, b = centroids[i], centroids[i + 1] # Align dimensions dim = min(len(a), len(b)) a = a[:dim] b = b[:dim] norm_a = np.linalg.norm(a) norm_b = np.linalg.norm(b) if norm_a == 0 or norm_b == 0: drift_values.append(0.0) else: cosine_sim = np.dot(a, b) / (norm_a * norm_b) drift_values.append(1.0 - cosine_sim) # cosine distance drift_series[axis] = drift_values # Detect inflection points (drift > 2× median) median_drift = np.median(drift_values) if drift_values else 0 threshold = 2 * median_drift if median_drift > 0 else 0.1 inflections = [] for i, drift in enumerate(drift_values): if drift > threshold: w_before = list(window_centroids.keys())[i] w_after = list(window_centroids.keys())[i + 1] inflections.append( { "window_before": w_before, "window_after": w_after, "drift": float(drift), "median_drift": float(median_drift), "transition_index": i + 1, } ) inflection_points[axis] = inflections # Collect example motions for inflection points examples = {} for inf in inflections: w_before = inf["window_before"] w_after = inf["window_after"] examples[f"{w_before}_to_{w_after}"] = { "before": window_centroids.get(w_before, {}).get("motions", [])[:3], "after": window_centroids.get(w_after, {}).get("motions", [])[:3], } example_motions[axis] = examples return { "drift_series": drift_series, "inflection_points": inflection_points, "example_motions": example_motions, } def compute_party_voting( con: duckdb.DuckDBPyConnection, stable_axes: List[int], windows: List[str], ) -> Dict: """Compute party voting centroids and detect cross-ideological voting. Returns dict with party_trajectories, cross_voting, examples. """ if not stable_axes: return {"party_trajectories": {}, "cross_voting": {}, "examples": {}} # Import canonical party sets sys.path.insert(0, ROOT) from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT party_trajectories = {} cross_voting = {} examples = {} for w in windows: motion_scores = _load_motion_scores(con, w) if not motion_scores: continue # Get party votes for this window # Parse window year for date filtering year = int(w.split("-")[0]) year_start = f"{year}-01-01" year_end = f"{year}-12-31" votes = con.execute( """ SELECT party, motion_id, vote FROM mp_votes WHERE date >= ? AND date <= ? AND vote = 'voor' """, [year_start, year_end], ).fetchall() if not votes: continue # Group motions by party party_motions = defaultdict(set) for party, motion_id, vote in votes: party_motions[party].add(int(motion_id)) # Compute party centroids along stable axes for party, motion_ids in party_motions.items(): valid_motions = [m for m in motion_ids if str(m) in motion_scores] if not valid_motions: continue vectors = np.array([motion_scores[str(m)] for m in valid_motions]) centroid = np.mean(vectors, axis=0) if party not in party_trajectories: party_trajectories[party] = {} # Only include axes that fit within the centroid vector valid_axes = [a for a in stable_axes if a - 1 < len(centroid)] party_trajectories[party][w] = { "centroid": centroid, "n_motions": len(valid_motions), "axes": {axis: float(centroid[axis - 1]) for axis in valid_axes}, } # Detect cross-ideological voting # Determine axis polarity: where do canonical right parties score? axis_polarity = {} for axis in stable_axes: comp_idx = axis - 1 right_scores = [] for party in CANONICAL_RIGHT: if party in party_trajectories and w in party_trajectories[party]: right_scores.append( party_trajectories[party][w]["axes"].get(axis, 0) ) if right_scores: mean_right = np.mean(right_scores) axis_polarity[axis] = ( "right_positive" if mean_right > 0 else "right_negative" ) # Find cross-ideological votes for party in party_motions: is_left = party in CANONICAL_LEFT or party in { "SP", "PvdA", "GL", "GroenLinks", "GroenLinks-PvdA", "DENK", "PvdD", "Volt", } is_right = party in CANONICAL_RIGHT or party in { "PVV", "FVD", "JA21", "SGP", "VVD", "CDA", "BBB", } if not (is_left or is_right): continue for motion_id in party_motions[party]: if str(motion_id) not in motion_scores: continue scores = motion_scores[str(motion_id)] # Check if motion is ideologically opposite for axis in stable_axes: comp_idx = axis - 1 score = scores[comp_idx] polarity = axis_polarity.get(axis, "right_positive") # If polarity is right_positive, high score = right-wing motion is_right_motion = ( score > 0 if polarity == "right_positive" else score < 0 ) if is_left and is_right_motion: if party not in cross_voting: cross_voting[party] = defaultdict(lambda: defaultdict(list)) cross_voting[party][w][axis].append(motion_id) elif is_right and not is_right_motion: if party not in cross_voting: cross_voting[party] = defaultdict(lambda: defaultdict(list)) cross_voting[party][w][axis].append(motion_id) # Collect examples for party, windows_data in cross_voting.items(): for w, axes_data in windows_data.items(): for axis, motion_ids in axes_data.items(): if motion_ids: key = f"{party}_{w}_axis{axis}" examples[key] = { "party": party, "window": w, "axis": axis, "motion_ids": motion_ids[:5], # Top 5 examples } return { "party_trajectories": party_trajectories, "cross_voting": cross_voting, "examples": examples, } def _generate_report( output_dir: str, stability_result: Dict, drift_result: Dict, party_result: Dict, windows: List[str], top_n: int, overtone_result: Optional[Dict] = None, ) -> str: """Generate markdown report with analysis results.""" import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt os.makedirs(output_dir, exist_ok=True) lines = [ "# Motion Semantic Drift Analysis Report", "", f"**Windows analyzed:** {', '.join(windows)}", f"**Top N motions per axis:** {top_n}", "", "---", "", ] # Summary stable = stability_result.get("stable_axes", []) inflections = drift_result.get("inflection_points", {}) cross_voting = party_result.get("cross_voting", {}) lines.append("## Summary") lines.append("") lines.append(f"- **Stable axes:** {stable if stable else 'None'}") lines.append( f"- **Axes with inflection points:** {len([k for k, v in inflections.items() if v])}" ) lines.append(f"- **Parties with cross-ideological voting:** {len(cross_voting)}") lines.append("") # Axis Stability lines.append("## Axis Stability") lines.append("") if ( stability_result.get("stability_matrix") is not None and stability_result["stability_matrix"].size > 0 and stable ): n_plots = len(stable) fig, axes = plt.subplots( 1, n_plots, figsize=(6 * max(1, n_plots), 4), ) if n_plots == 1: axes = [axes] for i, axis in enumerate(stable): ax = axes[i] matrix = stability_result["stability_matrix"][:, :, axis - 1] im = ax.imshow(matrix, cmap="RdYlGn", vmin=0, vmax=1) ax.set_xticks(range(len(windows))) ax.set_xticklabels(windows, rotation=45, ha="right") ax.set_yticks(range(len(windows))) ax.set_yticklabels(windows) ax.set_title(f"Axis {axis} Stability") fig.colorbar(im, ax=ax, label="Stability (cosine + Jaccard)") plt.tight_layout() fig_path = os.path.join(output_dir, "axis_stability.png") plt.savefig(fig_path, dpi=150, bbox_inches="tight") plt.close() lines.append(f"![Axis Stability Heatmap](axis_stability.png)") lines.append("") lines.append(f"**Stable axes (similarity > 0.7):** {stable if stable else 'None'}") lines.append(f"**Reordered axes:** {stability_result.get('reordered_axes', [])}") lines.append(f"**Unstable axes:** {stability_result.get('unstable_axes', [])}") lines.append("") # Semantic Drift lines.append("## Semantic Drift") lines.append("") drift_series = drift_result.get("drift_series", {}) if drift_series: fig, axes = plt.subplots( len(drift_series), 1, figsize=(10, 3 * len(drift_series)) ) if len(drift_series) == 1: axes = [axes] for i, (axis, values) in enumerate(drift_series.items()): ax = axes[i] x = range(1, len(values) + 1) ax.plot(x, values, marker="o", label=f"Axis {axis}") # Mark inflection points inflections = drift_result.get("inflection_points", {}).get(axis, []) for inf in inflections: x_pos = inf.get("transition_index", 1) ax.axvline(x=x_pos, color="red", alpha=0.3, linestyle="--") ax.set_xlabel("Window Transition") ax.set_ylabel("Cosine Distance") ax.set_title(f"Axis {axis} Semantic Drift") ax.legend() plt.tight_layout() fig_path = os.path.join(output_dir, "semantic_drift.png") plt.savefig(fig_path, dpi=150, bbox_inches="tight") plt.close() lines.append(f"![Semantic Drift Timeline](semantic_drift.png)") lines.append("") # Inflection point details for axis, inflections in drift_result.get("inflection_points", {}).items(): if inflections: lines.append(f"### Axis {axis} Inflection Points") lines.append("") for inf in inflections: lines.append( f"- **{inf['window_before']} → {inf['window_after']}**: drift={inf['drift']:.4f} (median={inf['median_drift']:.4f})" ) lines.append("") else: lines.append("No drift data available (no stable axes or insufficient data).") lines.append("") # Party Voting Analysis lines.append("## Party Voting Analysis") lines.append("") party_trajectories = party_result.get("party_trajectories", {}) if party_trajectories: lines.append(f"**Parties tracked:** {len(party_trajectories)}") lines.append("") # Party trajectory plot fig, ax = plt.subplots(figsize=(10, 6)) # Get stable axes from party trajectory data sample_axes = next(iter(party_trajectories.values())) sample_window = next(iter(sample_axes.values())) available_axes = sorted(sample_window.get("axes", {}).keys()) for party, trajectory in party_trajectories.items(): sorted_windows = sorted(trajectory.keys()) axes_scores = [trajectory[w]["axes"] for w in sorted_windows] # Plot first two available axes if len(available_axes) >= 2: x = [ axes_scores[i].get(available_axes[0], 0) for i in range(len(sorted_windows)) ] y = [ axes_scores[i].get(available_axes[1], 0) for i in range(len(sorted_windows)) ] ax.plot(x, y, marker="o", label=party) for j, w in enumerate(sorted_windows): ax.annotate(w, (x[j], y[j]), fontsize=8, alpha=0.7) ax.set_xlabel(f"Axis {available_axes[0]}") ax.set_ylabel(f"Axis {available_axes[1]}") ax.set_title("Party Voting Trajectories") ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left") ax.axhline(y=0, color="gray", alpha=0.3) ax.axvline(x=0, color="gray", alpha=0.3) plt.tight_layout() fig_path = os.path.join(output_dir, "party_trajectories.png") plt.savefig(fig_path, dpi=150, bbox_inches="tight") plt.close() lines.append(f"![Party Trajectories](party_trajectories.png)") lines.append("") # Cross-ideological voting cross_voting = party_result.get("cross_voting", {}) if cross_voting: lines.append("### Cross-Ideological Voting") lines.append("") lines.append("| Party | Window | Axis | Example Motion IDs |") lines.append("|-------|--------|------|-------------------|") for party, windows_data in sorted(cross_voting.items()): for w, axes_data in sorted(windows_data.items()): for axis, motion_ids in sorted(axes_data.items()): lines.append( f"| {party} | {w} | {axis} | {', '.join(str(m) for m in motion_ids[:3])} |" ) lines.append("") else: lines.append("No cross-ideological voting detected.") lines.append("") # Overtone Shift if overtone_result and overtone_result.get("shift_series"): lines.append("## Overtone Shift") lines.append("") lines.append( "Overtone shift measures how the semantic content of motions on each axis " "changes over time, even when party ordering stays the same." ) lines.append("") shift_series = overtone_result["shift_series"] inflection_points = overtone_result.get("inflection_points", {}) dim_analysis = overtone_result.get("dimension_analysis", {}) for axis, shifts in shift_series.items(): avg_shift = np.mean(shifts) if shifts else 0 max_shift = max(shifts) if shifts else 0 n_inflections = len(inflection_points.get(axis, [])) lines.append(f"### Axis {axis}") lines.append("") lines.append(f"- **Average shift:** {avg_shift:.4f}") lines.append(f"- **Max shift:** {max_shift:.4f}") lines.append(f"- **Inflection points:** {n_inflections}") lines.append("") # Show inflection points if inflection_points.get(axis): for inf in inflection_points[axis]: lines.append( f"- **{inf['window_before']} → {inf['window_after']}**: " f"shift={inf['shift']:.4f} (median={inf['median_shift']:.4f})" ) lines.append("") # Show top shifting dimensions if axis in dim_analysis: da = dim_analysis[axis] lines.append("**Top shifting dimensions:**") lines.append("") lines.append("| Dim | Mean | Std | Range |") lines.append("|-----|------|-----|-------|") for d, stats in sorted( da.get("dimension_shifts", {}).items(), key=lambda x: x[1]["range"], reverse=True, )[:10]: lines.append( f"| {d} | {stats['mean']:.4f} | {stats['std']:.4f} | {stats['range']:.4f} |" ) lines.append("") # Methodology lines.append("## Methodology") lines.append("") lines.append( "- **Axis stability:** Ridge regression weights (SVD_score ~ fused_embedding) per axis per window, " "compared via max(cosine similarity, Jaccard top-100 dimensions)" ) lines.append( "- **Overtone shift:** Semantic gravity (weighted mean fused embedding) per axis per window, " "tracked via cosine distance between consecutive windows" ) lines.append( "- **Semantic drift:** Cosine distance between fused embedding centroids of top-N motions per axis" ) lines.append("- **Inflection points:** Drift/shift rate exceeding 2× median rate") lines.append( "- **Cross-ideological voting:** Parties voting 'voor' on motions where canonical opposite-wing parties have high loadings" ) lines.append("") lines.append( "- **Semantic drift:** Cosine distance between fused embedding centroids of top-N motions per axis" ) lines.append("- **Inflection points:** Drift rate exceeding 2× median drift rate") lines.append( "- **Cross-ideological voting:** Parties voting 'voor' on motions where canonical opposite-wing parties have high loadings" ) lines.append("") report_path = os.path.join(output_dir, "report.md") with open(report_path, "w") as f: f.write("\n".join(lines)) return report_path def main(argv: Optional[List[str]] = None) -> int: p = argparse.ArgumentParser( description="Analyze semantic drift of SVD axes over time" ) p.add_argument("--db", default="data/motions.db", help="Path to motions database") p.add_argument( "--windows", nargs="+", default=None, help="Specific windows to analyze (default: all annual windows)", ) p.add_argument( "--top-n", type=int, default=20, help="Number of top motions per axis (default: 20)", ) p.add_argument( "--output", default="reports/drift", help="Output directory for report and charts", ) p.add_argument( "--stability-threshold", type=float, default=0.7, help="Similarity threshold for axis stability (default: 0.7)", ) p.add_argument( "--regression-alpha", type=float, default=0.1, help="Lasso regression regularization strength (default: 0.1)", ) args = p.parse_args(argv) if not os.path.exists(args.db): logger.error("Database not found: %s", args.db) return 1 con = duckdb.connect(database=args.db, read_only=True) try: # Validate schema missing = _validate_schema(con) if missing: logger.error("Missing required tables: %s", missing) return 1 # Determine windows if args.windows: windows = args.windows else: windows = _get_annual_windows(con) logger.info("Found %d annual windows: %s", len(windows), windows) if len(windows) < 2: logger.warning( "Need at least 2 windows for drift analysis. Found: %s", windows ) return 1 # Run analysis logger.info("Computing axis stability...") stability_result = compute_axis_stability( con, windows, args.top_n, stability_threshold=args.stability_threshold, regression_alpha=args.regression_alpha, ) logger.info("Stable axes: %s", stability_result["stable_axes"]) logger.info("Computing semantic drift...") drift_result = compute_semantic_drift( con, stability_result["stable_axes"], windows, args.top_n ) logger.info("Computing overtone shift...") weight_vectors = stability_result.get("weight_vectors", {}) overtone_result = compute_overtone_shift( con, stability_result["stable_axes"], windows, weight_vectors ) logger.info("Computing party voting analysis...") party_result = compute_party_voting( con, stability_result["stable_axes"], windows ) # Generate report logger.info("Generating report to %s...", args.output) report_path = _generate_report( args.output, stability_result, drift_result, party_result, windows, args.top_n, overtone_result=overtone_result, ) logger.info("Report generated: %s", report_path) return 0 finally: con.close() if __name__ == "__main__": raise SystemExit(main())