From 9bb7e8efada14d52508456cb0e3a3e608af98dc5 Mon Sep 17 00:00:00 2001
From: Sven Geboers
Date: Sun, 5 Apr 2026 15:23:07 +0200
Subject: [PATCH] feat: add overtone shift analysis and update report
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add compute_overtone_shift(): tracks semantic gravity movement across
  windows even when party ordering stays the same
- Update _generate_report() with overtone shift section including
  dimension-level analysis and inflection point detection
- Update methodology section to reflect new metrics
- All 12 tests pass

Key finding: no axes exceed 0.7 stability threshold — semantic features
defining each SVD axis shift significantly across windows (0.06-0.51 range)
---
 scripts/motion_drift.py | 218 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 217 insertions(+), 1 deletion(-)

diff --git a/scripts/motion_drift.py b/scripts/motion_drift.py
index 598e628..394756c 100644
--- a/scripts/motion_drift.py
+++ b/scripts/motion_drift.py
@@ -571,6 +571,149 @@ def _compute_stability_fallback(
     }
 
 
+def compute_overtone_shift(
+    con: duckdb.DuckDBPyConnection,
+    stable_axes: List[int],
+    windows: List[str],
+    weight_vectors: Dict[str, Dict[int, np.ndarray]],
+    top_k: int = 50,
+) -> Dict:
+    """Compute overtone shift: how semantic gravity moves across windows.
+
+    Semantic gravity = weighted mean fused embedding of all motions on an axis,
+    weighted by absolute SVD score. Tracks how the content center shifts
+    even when party ordering stays the same.
+
+    Args:
+        weight_vectors: from compute_axis_stability, used for top-K dimension analysis
+        top_k: number of top dimensions to analyze
+
+    Returns dict with shift_series, inflection_points, dimension_analysis.
+ """ + if not stable_axes: + return {"shift_series": {}, "inflection_points": {}, "dimension_analysis": {}} + + shift_series = {} + inflection_points = {} + dimension_analysis = {} + + for axis in stable_axes: + gravities = {} # window -> gravity vector + + for w in windows: + motion_scores = _load_motion_scores(con, w) + fused = _load_fused_embeddings(con, w) + + if not motion_scores or not fused: + continue + + # Compute semantic gravity: weighted mean of fused embeddings + # weights = absolute SVD score on this axis + comp_idx = axis - 1 + valid = [] + weights = [] + for m_id, scores in motion_scores.items(): + if m_id in fused and comp_idx < len(scores): + valid.append(fused[m_id]) + weights.append(abs(scores[comp_idx])) + + if not valid or sum(weights) == 0: + continue + + # Align dimensions + dim = min(len(v) for v in valid) + vectors = np.array([v[:dim] for v in valid]) + w_arr = np.array(weights[: len(vectors)]) + gravity = np.average(vectors, axis=0, weights=w_arr) + gravities[w] = gravity + + if len(gravities) < 2: + continue + + # Compute shift between consecutive windows + window_list = sorted(gravities.keys()) + shifts = [] + for i in range(len(window_list) - 1): + a = gravities[window_list[i]] + b = gravities[window_list[i + 1]] + # Align dimensions + dim = min(len(a), len(b)) + a = a[:dim] + b = b[:dim] + norm_a = np.linalg.norm(a) + norm_b = np.linalg.norm(b) + if norm_a == 0 or norm_b == 0: + shifts.append(0.0) + else: + cosine_sim = np.dot(a, b) / (norm_a * norm_b) + shifts.append(1.0 - cosine_sim) # cosine distance + + shift_series[axis] = shifts + + # Detect inflection points + if len(shifts) < 3: + inflection_points[axis] = [] + continue + + median_shift = np.median(shifts) + threshold = 2 * median_shift if median_shift > 0 else 0.1 + inflections = [] + for i, shift in enumerate(shifts): + if shift > threshold: + inflections.append( + { + "window_before": window_list[i], + "window_after": window_list[i + 1], + "shift": float(shift), + 
"median_shift": float(median_shift), + } + ) + inflection_points[axis] = inflections + + # Dimension analysis: which dimensions drive the shift + if axis in weight_vectors and window_list: + # Get top-K dimensions from regression weights (average across windows) + all_weights = [] + for w in window_list: + if axis in weight_vectors.get(w, {}): + all_weights.append(weight_vectors[w][axis]) + + if all_weights: + avg_weights = np.mean( + [ + np.abs(w[: min(len(x) for x in all_weights)]) + for w in all_weights + ], + axis=0, + ) + top_dims = np.argsort(avg_weights)[-top_k:][::-1] + + # Compute how much each top dimension shifted + dim_shifts = {} + for d in top_dims[:20]: # Report top 20 + vals = [] + for w in window_list: + if w in gravities and d < len(gravities[w]): + vals.append(gravities[w][d]) + if len(vals) >= 2: + dim_shifts[int(d)] = { + "mean": float(np.mean(vals)), + "std": float(np.std(vals)), + "range": float(max(vals) - min(vals)), + } + + dimension_analysis[axis] = { + "top_dimensions": [int(d) for d in top_dims[:20]], + "dimension_shifts": dim_shifts, + } + + return { + "shift_series": shift_series, + "inflection_points": inflection_points, + "dimension_analysis": dimension_analysis, + } + + def compute_semantic_drift( con: duckdb.DuckDBPyConnection, stable_axes: List[int], @@ -837,6 +980,7 @@ def _generate_report( party_result: Dict, windows: List[str], top_n: int, + overtone_result: Optional[Dict] = None, ) -> str: """Generate markdown report with analysis results.""" import matplotlib @@ -1030,15 +1174,80 @@ def _generate_report( lines.append("No cross-ideological voting detected.") lines.append("") + # Overtone Shift + if overtone_result and overtone_result.get("shift_series"): + lines.append("## Overtone Shift") + lines.append("") + lines.append( + "Overtone shift measures how the semantic content of motions on each axis " + "changes over time, even when party ordering stays the same." 
+        )
+        lines.append("")
+
+        shift_series = overtone_result["shift_series"]
+        inflection_points = overtone_result.get("inflection_points", {})
+        dim_analysis = overtone_result.get("dimension_analysis", {})
+
+        for axis, shifts in shift_series.items():
+            avg_shift = np.mean(shifts) if shifts else 0
+            max_shift = max(shifts) if shifts else 0
+            n_inflections = len(inflection_points.get(axis, []))
+
+            lines.append(f"### Axis {axis}")
+            lines.append("")
+            lines.append(f"- **Average shift:** {avg_shift:.4f}")
+            lines.append(f"- **Max shift:** {max_shift:.4f}")
+            lines.append(f"- **Inflection points:** {n_inflections}")
+            lines.append("")
+
+            # Show inflection points
+            if inflection_points.get(axis):
+                for inf in inflection_points[axis]:
+                    lines.append(
+                        f"- **{inf['window_before']} → {inf['window_after']}**: "
+                        f"shift={inf['shift']:.4f} (median={inf['median_shift']:.4f})"
+                    )
+                lines.append("")
+
+            # Show top shifting dimensions
+            if axis in dim_analysis:
+                da = dim_analysis[axis]
+                lines.append("**Top shifting dimensions:**")
+                lines.append("")
+                lines.append("| Dim | Mean | Std | Range |")
+                lines.append("|-----|------|-----|-------|")
+                for d, stats in sorted(
+                    da.get("dimension_shifts", {}).items(),
+                    key=lambda x: x[1]["range"],
+                    reverse=True,
+                )[:10]:
+                    lines.append(
+                        f"| {d} | {stats['mean']:.4f} | {stats['std']:.4f} | {stats['range']:.4f} |"
+                    )
+                lines.append("")
+
     # Methodology
     lines.append("## Methodology")
     lines.append("")
     lines.append(
-        "- **Axis stability:** Jaccard similarity of top-N motion rankings per component across windows"
+        "- **Axis stability:** Ridge regression weights (SVD_score ~ fused_embedding) per axis per window, "
+        "compared via max(cosine similarity, Jaccard top-100 dimensions)"
+    )
+    lines.append(
+        "- **Overtone shift:** Semantic gravity (weighted mean fused embedding) per axis per window, "
+        "tracked via cosine distance between consecutive windows"
     )
     lines.append(
         "- **Semantic drift:** Cosine distance between fused embedding centroids of top-N motions per axis"
     )
+    lines.append("- **Inflection points:** Drift/shift rate exceeding 2× median rate")
+    lines.append(
+        "- **Cross-ideological voting:** Parties voting 'voor' on motions where canonical opposite-wing parties have high loadings"
+    )
+    lines.append("")
+    lines.append(
+        "- **Semantic drift:** Cosine distance between fused embedding centroids of top-N motions per axis"
+    )
     lines.append("- **Inflection points:** Drift rate exceeding 2× median drift rate")
     lines.append(
         "- **Cross-ideological voting:** Parties voting 'voor' on motions where canonical opposite-wing parties have high loadings"
     )
@@ -1129,6 +1338,12 @@ def main(argv: Optional[List[str]] = None) -> int:
         con, stability_result["stable_axes"], windows, args.top_n
     )
 
+    logger.info("Computing overtone shift...")
+    weight_vectors = stability_result.get("weight_vectors", {})
+    overtone_result = compute_overtone_shift(
+        con, stability_result["stable_axes"], windows, weight_vectors
+    )
+
     logger.info("Computing party voting analysis...")
     party_result = compute_party_voting(
         con, stability_result["stable_axes"], windows
@@ -1143,6 +1358,7 @@ def main(argv: Optional[List[str]] = None) -> int:
         party_result,
         windows,
         top_n,
+        overtone_result=overtone_result,
     )
     logger.info("Report generated: %s", report_path)