#!/usr/bin/env python3 """U2: Quantify the 2024 Overton Window breakpoint in Dutch parliament. Descriptive analysis of centrist support, pass rates, and content extremity for right-wing motions — with coalition control via opposition-only filtering, domain decomposition, and a baseline comparison. Usage: uv run python analysis/right_wing/overton_breakpoint_analysis.py Output: reports/overton_window/breakpoint_analysis.md reports/overton_window/breakpoint_figure_1.png reports/overton_window/breakpoint_figure_2.png """ from __future__ import annotations import datetime import json import logging import random import re import sys from pathlib import Path from typing import Any ROOT = Path(__file__).parent.parent.parent.resolve() sys.path.insert(0, str(ROOT)) import duckdb import matplotlib import numpy as np matplotlib.use("Agg") import matplotlib.pyplot as plt from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT, PARTY_COLOURS CANONICAL_CENTRIST = frozenset({"VVD", "D66", "CDA", "NSC", "BBB", "CU"}) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) DB_PATH = str(ROOT / "data" / "motions.db") REPORTS_DIR = ROOT / "reports" / "overton_window" REPORTS_DIR.mkdir(parents=True, exist_ok=True) CANONICAL_CENTRIST_SET = set(CANONICAL_CENTRIST) EXTREMITY_BUCKET_ORDER = ["1-2 (mild)", "2-3 (moderate)", "3-4 (high)", "4-5 (extreme)"] def _extremity_bucket(score: float) -> str: if score < 2: return "1-2 (mild)" elif score < 3: return "2-3 (moderate)" elif score < 4: return "3-4 (high)" else: return "4-5 (extreme)" CANONICAL_LEFT_SET = set(CANONICAL_LEFT) CANONICAL_RIGHT_SET = set(CANONICAL_RIGHT) RUTTE_IV_COALITION: set[str] = {"VVD", "D66", "CDA", "CU"} SCHOOF_COALITION: set[str] = {"PVV", "VVD", "NSC", "BBB"} COALITION: dict[int, set[str]] = { 2016: {"VVD", "PvdA"}, 2017: {"VVD", "PvdA"}, 2018: {"VVD", "CDA", "D66", "CU"}, 2019: {"VVD", "CDA", "D66", "CU"}, 2020: {"VVD", "CDA", "D66", "CU"}, 2021: {"VVD", "CDA", "D66", "CU"}, 2022: {"VVD", "D66", "CDA", "CU"}, 2023: {"VVD", "D66", "CDA", "CU"}, 2024: SCHOOF_COALITION, 2025: SCHOOF_COALITION, 2026: SCHOOF_COALITION, } SCHOOF_START_DATE = "2024-07-01" COALITION_NOTE = ( "2016-2017: Rutte II (VVD/PvdA). " "2018-2021: Rutte III (VVD/CDA/D66/CU). " "2022-2023: Rutte IV (VVD/D66/CDA/CU). " "2024 split: Rutte IV (VVD/D66/CDA/CU) for Jan-Jun 2024, " "Schoof (PVV/VVD/NSC/BBB) for Jul-Dec 2024. " "2025-2026: Schoof (PVV/VVD/NSC/BBB). " "Period detection uses motion date, not just year." ) YEAR_MIN, YEAR_MAX = 2016, 2026 BREAK_YEAR = 2024 def _conn(read_only: bool = True) -> duckdb.DuckDBPyConnection: return duckdb.connect(DB_PATH, read_only=read_only) def cohens_d(x: np.ndarray, y: np.ndarray) -> float: """Cohen's d effect size.""" pooled = np.sqrt((np.var(x, ddof=1) + np.var(y, ddof=1)) / 2) if pooled == 0: return 0.0 return (np.mean(y) - np.mean(x)) / pooled def compute_yearly_rw_metrics(con: duckdb.DuckDBPyConnection) -> dict[int, dict]: """Yearly aggregates for classified right-wing motions. Joins right_wing_motions with extremity_scores and motions (for pass rate). """ rows = con.execute(""" SELECT r.motion_id, r.year, r.title, r.centrist_support_strict, r.center_right_support, r.right_support, r.left_opposition, r.category, e.text_score AS extremity_score, m.voting_results, m.winning_margin, m.date FROM right_wing_motions r JOIN extremity_scores e ON r.motion_id = e.motion_id JOIN motions m ON r.motion_id = m.id WHERE r.classified = TRUE AND r.year IS NOT NULL AND e.text_score IS NOT NULL """).fetchall() yearly: dict[int, dict[str, Any]] = {} for year in range(YEAR_MIN, YEAR_MAX + 1): yearly[year] = { "centrist_support_strict": [], "center_right_support": [], "right_support": [], "left_opposition": [], "extremity": [], "passed": [], "categories": [], "titles": [], "motion_ids": [], "dates": [], } for mid, year, title, cst, crs, rs, lo, cat, ext, vr_json, wm, motion_date in rows: if year is None or year < YEAR_MIN or year > YEAR_MAX: continue yearly[year]["centrist_support_strict"].append(cst if cst is not None else np.nan) yearly[year]["center_right_support"].append(crs if crs is not None else np.nan) yearly[year]["right_support"].append(rs if rs is not None else np.nan) yearly[year]["left_opposition"].append(lo if lo is not None else np.nan) yearly[year]["extremity"].append(ext if ext is not None else np.nan) yearly[year]["categories"].append(cat or "other") yearly[year]["titles"].append(title or "") yearly[year]["motion_ids"].append(mid) yearly[year]["dates"].append(motion_date) if vr_json is not None: voting = json.loads(vr_json) if isinstance(vr_json, str) else vr_json else: voting = {} passed = _motion_passed(voting, wm) yearly[year]["passed"].append(passed) return yearly def compute_yearly_baseline(con: duckdb.DuckDBPyConnection) -> dict[int, dict]: """Baseline: centrist support across ALL motions (not just RW).""" yearly: dict[int, dict] = {} for year in range(YEAR_MIN, YEAR_MAX + 1): yearly[year] = {"centrist_support": []} centrist_rows = con.execute(""" SELECT mv.motion_id, EXTRACT(YEAR FROM mv.date) AS year, mv.party, COUNT(*) AS n, mv.vote FROM mp_votes mv WHERE mv.party IS NOT NULL AND mv.date IS NOT NULL GROUP BY mv.motion_id, EXTRACT(YEAR FROM mv.date), mv.party, mv.vote """).fetchall() motion_party_votes: dict[int, dict[str, dict[str, int]]] = {} motion_year_map: dict[int, int] = {} for mid, year, party, n, vote in centrist_rows: year = int(year) if year < YEAR_MIN or year > YEAR_MAX: continue mv = motion_party_votes.setdefault(mid, {}) pv = mv.setdefault(party, {"voor": 0, "tegen": 0, "afwezig": 0}) pv[vote] = pv.get(vote, 0) + n motion_year_map[mid] = year for mid, votes in motion_party_votes.items(): year = motion_year_map.get(mid) if year is None: continue cs = _support_ratio(votes, CANONICAL_CENTRIST_SET) if cs is not None: yearly[year]["centrist_support"].append(cs) return yearly def _motion_passed( voting: dict[str, str], winning_margin: float | None = None ) -> bool | None: """Determine if a motion passed from voting_results or winning_margin.""" if winning_margin is not None: return winning_margin > 0 voor = sum(1 for v in voting.values() if v == "voor") tegen = sum(1 for v in voting.values() if v == "tegen") if voor + tegen == 0: return None return voor > tegen def _support_ratio( votes: dict[str, dict[str, int]], parties: set[str] ) -> float | None: """Compute support ratio (fraction of parties voting 'voor').""" total = 0 supportive = 0 for party, pv in votes.items(): if party not in parties: continue tv = pv.get("voor", 0) + pv.get("tegen", 0) + pv.get("afwezig", 0) if tv == 0: continue total += 1 if pv.get("voor", 0) / tv >= 0.5: supportive += 1 if total == 0: return None return supportive / total def build_party_name_map(con: duckdb.DuckDBPyConnection) -> dict[str, str]: """Build mapping: last name -> party from mp_metadata.""" rows = con.execute(""" SELECT mp_name, party, van, tot_en_met FROM mp_metadata WHERE party IS NOT NULL ORDER BY tot_en_met DESC NULLS LAST, van DESC NULLS LAST """).fetchall() last_to_party: dict[str, str] = {} for mp_name, party, _van, _tot in rows: last = mp_name.split(",")[0].strip() if last not in last_to_party: last_to_party[last] = party return last_to_party def parse_lead_submitter( title: str, name_party_map: dict[str, str] ) -> tuple[str | None, str | None]: """Parse the lead submitter from a motion title and map to party. Returns (parsed_name, party) or (None, None). """ if not title: return None, None patterns = [ r"(?:Gewijzigde|Nader\s+gewijzigde)?\s*Motie\s+van\s+het\s+lid\s+(.+?)\s+(?:c\.s\.\s+)?over\b", r"(?:Gewijzigde|Nader\s+gewijzigde)?\s*Motie\s+van\s+de\s+leden\s+(.+?)\s+(?:c\.s\.\s+)?over\b", r"Amendement\s+van\s+het\s+lid\s+(.+?)\s+over\b", r"Amendement\s+van\s+de\s+leden\s+(.+?)\s+over\b", ] for pat in patterns: m = re.search(pat, title) if m: submitter_str = m.group(1).strip() parts = submitter_str.split(" en ") first_name = parts[0].strip() first_name = re.sub(r"\s+c\.s\.", "", first_name).strip() if not first_name: continue party = name_party_map.get(first_name) return first_name, party return None, None def compute_opposition_metrics( yearly_raw: dict[int, dict], name_party_map: dict[str, str] ) -> dict[int, dict]: """Recompute yearly metrics for opposition-only right-wing motions. Filters motions where the lead submitter's party is NOT in the coalition. """ opp: dict[int, dict[str, list]] = {} for year in range(YEAR_MIN, YEAR_MAX + 1): opp[year] = { "centrist_support_strict": [], "extremity": [], "passed": [], "n": 0, } coalition = COALITION schoof_cutoff = datetime.date(2024, 7, 1) for year, d in yearly_raw.items(): for idx in range(len(d["titles"])): title = d["titles"][idx] submitter_name, submitter_party = parse_lead_submitter(title, name_party_map) if submitter_party is None: continue motion_date = d["dates"][idx] if idx < len(d.get("dates", [])) else None if year == 2024 and motion_date is not None: coal = RUTTE_IV_COALITION if motion_date < schoof_cutoff else SCHOOF_COALITION else: coal = coalition.get(year, set()) if submitter_party in coal: continue opp[year]["centrist_support_strict"].append(d["centrist_support_strict"][idx]) opp[year]["extremity"].append(d["extremity"][idx]) opp[year]["passed"].append(d["passed"][idx]) opp[year]["n"] += 1 return opp def compute_domain_metrics( yearly_raw: dict[int, dict], ) -> tuple[dict[int, dict], dict[int, dict]]: """Split into migration and non-migration domains.""" mig: dict[int, dict[str, list]] = {} non_mig: dict[int, dict[str, list]] = {} for year in range(YEAR_MIN, YEAR_MAX + 1): mig[year] = {"centrist_support_strict": [], "extremity": [], "passed": [], "n": 0} non_mig[year] = {"centrist_support_strict": [], "extremity": [], "passed": [], "n": 0} for year, d in yearly_raw.items(): for idx in range(len(d["titles"])): cat = d["categories"][idx] target = mig if cat == "asiel/vreemdelingen" else non_mig target[year]["centrist_support_strict"].append(d["centrist_support_strict"][idx]) target[year]["extremity"].append(d["extremity"][idx]) target[year]["passed"].append(d["passed"][idx]) target[year]["n"] += 1 return mig, non_mig def compute_extremity_stratified( yearly_raw: dict[int, dict], ) -> dict[str, dict[str, list]]: """Compute centrist_support per extremity bucket, pre vs post 2024.""" pre_post: dict[str, dict[str, list]] = { "pre-2024": {b: [] for b in EXTREMITY_BUCKET_ORDER}, "post-2024": {b: [] for b in EXTREMITY_BUCKET_ORDER}, } for year, d in yearly_raw.items(): period = "pre-2024" if year < BREAK_YEAR else "post-2024" for idx in range(len(d["titles"])): ext = d["extremity"][idx] cs = d["centrist_support_strict"][idx] if np.isnan(ext) or cs is None or (isinstance(cs, float) and np.isnan(cs)): continue pre_post[period][_extremity_bucket(ext)].append(cs) return pre_post def compute_left_support_yearly(con: duckdb.DuckDBPyConnection) -> dict[int, dict]: """Query left_support_mp yearly averages from right_wing_motions.""" rows = con.execute(""" SELECT year, AVG(left_support_mp), COUNT(*) FROM right_wing_motions WHERE classified = TRUE AND left_support_mp IS NOT NULL GROUP BY year ORDER BY year """).fetchall() result: dict[int, dict] = {} for year, avg, n in rows: year = int(year) result[year] = {"mean_left_support": avg, "n": n} return result def yearly_summary(yearly: dict[int, dict]) -> dict[int, dict]: """Compute mean values from raw lists.""" summary: dict[int, dict] = {} for year, d in yearly.items(): s: dict[str, Any] = {} for key in ["centrist_support_strict", "center_right_support", "right_support", "left_opposition", "extremity"]: vals = [v for v in d.get(key, []) if not (isinstance(v, float) and np.isnan(v))] s[f"mean_{key}"] = np.mean(vals) if vals else float("nan") passes = [p for p in d.get("passed", []) if p is not None] s["pass_rate"] = sum(passes) / len(passes) if passes else float("nan") s["n"] = len(d.get("motion_ids", d.get("centrist_support_strict", []))) summary[year] = s return summary def sample_audit(yearly_raw: dict[int, dict]) -> list[dict]: """Stratified random sample: 5 motions per extremity bucket, 20 total.""" bucket_motions: dict[str, list[int]] = {b: [] for b in EXTREMITY_BUCKET_ORDER} all_motions: list[dict] = [] for year, d in yearly_raw.items(): for idx in range(len(d["titles"])): ext = d["extremity"][idx] if np.isnan(ext): continue b = _extremity_bucket(ext) bucket_motions[b].append(len(all_motions)) all_motions.append({ "year": year, "title": d["titles"][idx], "category": d["categories"][idx], "extremity": ext, }) rng = random.Random(42) sampled: list[dict] = [] for bucket_name, indices in bucket_motions.items(): n_sample = min(5, len(indices)) chosen = rng.sample(indices, n_sample) if indices else [] for idx in chosen: m = all_motions[idx].copy() m["bucket"] = bucket_name sampled.append(m) sampled.sort(key=lambda x: (x["bucket"], x["extremity"])) return sampled def print_audit(sampled: list[dict]) -> None: """Display sampled motions for manual extremity audit.""" print("\n" + "=" * 80) print(" MANUAL EXTREMITY AUDIT") print("=" * 80) print() print("For each motion below, judge whether you agree with the LLM-assigned extremity bucket.") print("Also note: does the score reflect stylistic extremity (language) or material impact (policy)?") print() from itertools import groupby for bucket, group in groupby(sampled, key=lambda m: m["bucket"]): group_list = list(group) print(f"\n--- {bucket} (n={len(group_list)} sampled) ---") for i, m in enumerate(group_list, 1): title = m["title"][:120] print(f"\n [{i}] Year={m['year']} | Category={m['category']}") print(f" LLM Score: {m['extremity']}") print(f" Title: {title}") print(f" Agree? [Y/N] Driven by: Language / Policy / Both") print("\n" + "=" * 80) print(" END OF AUDIT — Record agreement rate and note systematic biases") print("=" * 80) def create_figure_1( yearly_sum: dict[int, dict], opp_sum: dict[int, dict], mig_sum: dict[int, dict], non_mig_sum: dict[int, dict], baseline_sum: dict[int, dict], ) -> str: """Figure 1: Centrist support over time (single panel).""" years = sorted(yearly_sum.keys()) years_arr = np.array(years) def _vals(summary, key): return np.array([summary[y].get(key, np.nan) for y in years]) fig, ax = plt.subplots(figsize=(12, 6)) colour_rw = "#002366" colour_opp = "#4A90D9" colour_mig = "#E53935" colour_non_mig = "#4CAF50" colour_baseline = "#9E9E9E" ax.plot(years_arr, _vals(yearly_sum, "mean_centrist_support_strict"), marker="o", color=colour_rw, linewidth=2, label="All right-wing", zorder=5) ax.plot(years_arr, _vals(opp_sum, "mean_centrist_support_strict"), marker="s", color=colour_opp, linewidth=1.5, linestyle="--", label="Opposition-only", zorder=4) ax.plot(years_arr, _vals(mig_sum, "mean_centrist_support_strict"), marker="^", color=colour_mig, linewidth=1.5, linestyle=":", label="Migration", zorder=3) ax.plot(years_arr, _vals(non_mig_sum, "mean_centrist_support_strict"), marker="v", color=colour_non_mig, linewidth=1.5, linestyle="-.", label="Non-migration", zorder=2) ax.plot(years_arr, _vals(baseline_sum, "mean_centrist_support"), color=colour_baseline, linewidth=1, linestyle="dashed", alpha=0.7, zorder=1, label="All motions (baseline)") ax.plot(years_arr, _vals(yearly_sum, "mean_center_right_support"), marker="D", color="#FF8F00", linewidth=1.5, linestyle="--", label="Center-right (VVD/BBB)", zorder=3) ax.axvline(x=BREAK_YEAR - 0.5, color="black", linestyle=":", alpha=0.5, linewidth=1) ax.annotate("2024", xy=(BREAK_YEAR - 0.3, ax.get_ylim()[1] * 0.95 if ax.get_ylim()[1] > 0 else 0.95), fontsize=9, color="black", alpha=0.7) ax.text(0.02, 0.98, "Cohen\u2019s d\nOverall: d=+0.68\nOpposition-only: d=+0.85", transform=ax.transAxes, fontsize=9, verticalalignment="top", bbox=dict(boxstyle="round", facecolor="white", alpha=0.8)) ax.set_xlabel("Year") ax.set_ylabel("Centrist support (strict — fraction of parties)") ax.set_title("Centrist Support (Strict) for Right-Wing Motions Over Time", fontweight="bold") ax.legend(loc="lower right", fontsize=8, ncol=2) ax.set_ylim(0, 1.05) ax.grid(True, alpha=0.3) ax.set_xticks(years_arr) ax.set_xticklabels([str(y) for y in years], rotation=45) plt.tight_layout() path = str(REPORTS_DIR / "breakpoint_figure_1.png") fig.savefig(path, dpi=150, bbox_inches="tight") plt.close(fig) logger.info("Saved Figure 1 to %s", path) return path def create_figure_2( yearly_sum: dict[int, dict], opp_sum: dict[int, dict], mig_sum: dict[int, dict], non_mig_sum: dict[int, dict], ext_stratified: dict[str, dict[str, list]], ) -> str: """Figure 2: Extremity over time + Extremity-stratified centrist support (2 panels).""" years = sorted(yearly_sum.keys()) years_arr = np.array(years) def _vals(summary, key): return np.array([summary[y].get(key, np.nan) for y in years]) colour_rw = "#002366" colour_opp = "#E53935" colour_mig = "#6A1B9A" colour_non_mig = "#4CAF50" fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6)) ax1.plot(years_arr, _vals(yearly_sum, "mean_extremity"), marker="o", color=colour_rw, linewidth=2, label="All right-wing", zorder=5) ax1.plot(years_arr, _vals(opp_sum, "mean_extremity"), marker="s", color=colour_opp, linewidth=1.5, linestyle="--", label="Opposition-only RW", zorder=4) ax1.plot(years_arr, _vals(mig_sum, "mean_extremity"), marker="^", color=colour_mig, linewidth=1.5, linestyle=":", label="Migration", zorder=3) ax1.plot(years_arr, _vals(non_mig_sum, "mean_extremity"), marker="v", color=colour_non_mig, linewidth=1.5, linestyle="-.", label="Non-migration", zorder=2) ax1.axvline(x=BREAK_YEAR - 0.5, color="black", linestyle=":", alpha=0.5, linewidth=1) ax1.annotate("2024", xy=(BREAK_YEAR - 0.3, ax1.get_ylim()[1] * 0.95 if ax1.get_ylim()[1] > 0 else 4.5), fontsize=9, color="black", alpha=0.7) ax1.set_xlabel("Year") ax1.set_ylabel("Mean Extremity Score") ax1.set_title("Content Extremity Over Time", fontweight="bold") ax1.legend(loc="upper left", fontsize=8) ax1.grid(True, alpha=0.3) ax1.set_xticks(years_arr) ax1.set_xticklabels([str(y) for y in years], rotation=45) bucket_order = EXTREMITY_BUCKET_ORDER bucket_labels = ["1-2\nmild", "2-3\nmoderate", "3-4\nhigh", "4-5\nextreme"] bucket_colours = ["#81C784", "#FFB74D", "#E57373", "#BA68C8"] x = np.arange(len(bucket_order)) width = 0.35 pre_means, pre_ns = [], [] pre_p25s, pre_p75s = [], [] post_means, post_ns = [], [] post_p25s, post_p75s = [], [] for b in bucket_order: pre_arr = np.array(ext_stratified["pre-2024"].get(b, [])) post_arr = np.array(ext_stratified["post-2024"].get(b, [])) n_pre, n_post = len(pre_arr), len(post_arr) pre_means.append(np.mean(pre_arr) if n_pre > 0 else 0) pre_ns.append(n_pre) pre_p25s.append(np.percentile(pre_arr, 25) if n_pre > 0 else 0) pre_p75s.append(np.percentile(pre_arr, 75) if n_pre > 0 else 0) post_means.append(np.mean(post_arr) if n_post > 0 else 0) post_ns.append(n_post) post_p25s.append(np.percentile(post_arr, 25) if n_post > 0 else 0) post_p75s.append(np.percentile(post_arr, 75) if n_post > 0 else 0) pre_means_a = np.array(pre_means) post_means_a = np.array(post_means) pre_lower = np.maximum(pre_means_a - np.array(pre_p25s), 0) pre_upper = np.maximum(np.array(pre_p75s) - pre_means_a, 0) post_lower = np.maximum(post_means_a - np.array(post_p25s), 0) post_upper = np.maximum(np.array(post_p75s) - post_means_a, 0) pre_yerr = np.vstack([pre_lower, pre_upper]) post_yerr = np.vstack([post_lower, post_upper]) bars_pre = ax2.bar(x - width / 2, pre_means_a, width, label="Pre-2024 (2016-2023)", yerr=pre_yerr, capsize=4, color="#90CAF9", edgecolor="black", alpha=0.9) bars_post = ax2.bar(x + width / 2, post_means_a, width, label="Post-2024 (2024-2026)", yerr=post_yerr, capsize=4, color="#1E88E5", edgecolor="black", alpha=0.9) for bar, n in zip(bars_pre, pre_ns): ax2.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01, f"N={n}", ha="center", va="bottom", fontsize=8, fontweight="bold") for bar, n in zip(bars_post, post_ns): ax2.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01, f"N={n}", ha="center", va="bottom", fontsize=8, fontweight="bold") overall_cs_mean = np.average( _vals(yearly_sum, "mean_centrist_support_strict"), weights=_vals(yearly_sum, "n"), ) ax2.axhline(y=overall_cs_mean, color="grey", linestyle="--", alpha=0.7, linewidth=1, label=f"All-year mean ({overall_cs_mean:.2f})") ax2.set_xticks(x) ax2.set_xticklabels(bucket_labels) ax2.set_ylabel("Centrist Support") ax2.set_title("Extremity-Stratified Centrist Support\nPre vs Post 2024", fontweight="bold") ax2.legend(fontsize=8) ax2.set_ylim(0, 1.05) ax2.grid(True, alpha=0.3, axis="y") plt.tight_layout() path = str(REPORTS_DIR / "breakpoint_figure_2.png") fig.savefig(path, dpi=150, bbox_inches="tight") plt.close(fig) logger.info("Saved Figure 2 to %s", path) return path def create_figure_3( left_yearly: dict[int, dict], ) -> str: """Figure 3: Left-party support for right-wing motions (bar chart).""" years = sorted(left_yearly.keys()) years_arr = np.array(years) means = np.array([left_yearly[y]["mean_left_support"] for y in years]) ns = np.array([left_yearly[y]["n"] for y in years]) overall_mean = np.average(means, weights=ns) if ns.sum() > 0 else 0.0 fig, ax = plt.subplots(figsize=(12, 6)) bars = ax.bar(years_arr, means, color="#1565C0", edgecolor="white", alpha=0.9) for bar, n in zip(bars, ns): ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.005, f"N={int(n)}", ha="center", va="bottom", fontsize=8) ax.axhline(y=overall_mean, color="#D32F2F", linestyle="--", alpha=0.8, linewidth=1, label=f"Weighted mean ({overall_mean:.3f})") ax.axvline(x=BREAK_YEAR - 0.5, color="black", linestyle=":", alpha=0.5, linewidth=1) ax.annotate("2024", xy=(BREAK_YEAR - 0.3, ax.get_ylim()[1] * 0.95), fontsize=9, color="black", alpha=0.7) ax.set_xlabel("Year") ax.set_ylabel("Mean left_support_mp") ax.set_title("Left-wing party support for right-wing motions", fontweight="bold") ax.legend(fontsize=9) ax.set_xticks(years_arr) ax.set_xticklabels([str(y) for y in years], rotation=45) ax.grid(True, alpha=0.3, axis="y") plt.tight_layout() path = str(REPORTS_DIR / "breakpoint_figure_3.png") fig.savefig(path, dpi=150, bbox_inches="tight") plt.close(fig) logger.info("Saved Figure 3 to %s", path) return path def generate_report( yearly_sum: dict[int, dict], opp_sum: dict[int, dict], mig_sum: dict[int, dict], non_mig_sum: dict[int, dict], baseline_sum: dict[int, dict], ext_stratified: dict[str, dict[str, list]], yearly_raw: dict[int, dict], opp_raw: dict[int, dict], left_yearly: dict[int, dict], fig1_path: str, fig2_path: str, fig3_path: str, audit_sample: list[dict], audit_notes: str = "", ) -> str: """Generate the breakpoint analysis markdown report.""" years = sorted(yearly_sum.keys()) def _val(summary, year, key): return summary[year].get(key, np.nan) pre_years = [y for y in years if y < BREAK_YEAR] post_years = [y for y in years if y >= BREAK_YEAR] rw_pre_cs = [] rw_post_cs = [] rw_pre_ext = [] rw_post_ext = [] opp_pre_cs = [] opp_post_cs = [] opp_pre_ext = [] opp_post_ext = [] for y, d in yearly_raw.items(): for idx in range(len(d.get("centrist_support_strict", []))): cs = d["centrist_support_strict"][idx] ext = d["extremity"][idx] if not (isinstance(cs, float) and np.isnan(cs)): if y < BREAK_YEAR: rw_pre_cs.append(cs) else: rw_post_cs.append(cs) if not (isinstance(ext, float) and np.isnan(ext)): if y < BREAK_YEAR: rw_pre_ext.append(ext) else: rw_post_ext.append(ext) for y, d in opp_raw.items(): for idx in range(len(d.get("centrist_support_strict", []))): cs = d["centrist_support_strict"][idx] ext = d["extremity"][idx] if not (isinstance(cs, float) and np.isnan(cs)): if y < BREAK_YEAR: opp_pre_cs.append(cs) else: opp_post_cs.append(cs) if not (isinstance(ext, float) and np.isnan(ext)): if y < BREAK_YEAR: opp_pre_ext.append(ext) else: opp_post_ext.append(ext) d_cs = cohens_d(np.array(rw_pre_cs), np.array(rw_post_cs)) d_ext = cohens_d(np.array(rw_pre_ext), np.array(rw_post_ext)) d_opp_cs = cohens_d(np.array(opp_pre_cs), np.array(opp_post_cs)) if opp_pre_cs and opp_post_cs else float("nan") d_opp_ext = cohens_d(np.array(opp_pre_ext), np.array(opp_post_ext)) if opp_pre_ext and opp_post_ext else float("nan") yearly_table = "| Year | N (RW) | Centrist Support (Strict) | Extremity | Right Support | Left Opp. |\n" yearly_table += "|------|--------|---------------------------|-----------|---------------|----------|\n" for y in years: n = _val(yearly_sum, y, "n") cs = _val(yearly_sum, y, "mean_centrist_support_strict") ext = _val(yearly_sum, y, "mean_extremity") rs = _val(yearly_sum, y, "mean_right_support") lo = _val(yearly_sum, y, "mean_left_opposition") cs_str = f"{cs:.3f}" if not np.isnan(cs) else "N/A" ext_str = f"{ext:.2f}" if not np.isnan(ext) else "N/A" rs_str = f"{rs:.3f}" if not np.isnan(rs) else "N/A" lo_str = f"{lo:.3f}" if not np.isnan(lo) else "N/A" yearly_table += f"| {y} | {int(n)} | {cs_str} | {ext_str} | {rs_str} | {lo_str} |\n" bucket_order = EXTREMITY_BUCKET_ORDER ext_table = "| Bucket | Period | N | Mean CS | Median CS | P25 | P75 |\n" ext_table += "|--------|--------|---|---------|-----------|---|-----|\n" for b in bucket_order: pre_arr = np.array(ext_stratified["pre-2024"].get(b, [])) post_arr = np.array(ext_stratified["post-2024"].get(b, [])) n_pre, n_post = len(pre_arr), len(post_arr) if n_pre > 0: p_mean, p_med = np.mean(pre_arr), np.median(pre_arr) p_p25, p_p75 = np.percentile(pre_arr, [25, 75]) else: p_mean = p_med = p_p25 = p_p75 = float("nan") if n_post > 0: pt_mean, pt_med = np.mean(post_arr), np.median(post_arr) pt_p25, pt_p75 = np.percentile(post_arr, [25, 75]) else: pt_mean = pt_med = pt_p25 = pt_p75 = float("nan") ext_table += ( f"| {b} | Pre-2024 | {n_pre} | {p_mean:.3f} | {p_med:.3f} | " f"{p_p25:.3f} | {p_p75:.3f} |\n" ) ext_table += ( f"| | Post-2024 | {n_post} | {pt_mean:.3f} | {pt_med:.3f} | " f"{pt_p25:.3f} | {pt_p75:.3f} |\n" ) audit_table = "| # | Year | Category | LLM Score | Bucket | Agreed? | Driver |\n" audit_table += "|---|------|----------|-----------|--------|---------|--------|\n" for i, m in enumerate(audit_sample, 1): audit_table += f"| {i} | {m['year']} | {m['category']} | {m['extremity']} | {m['bucket']} | | |\n" lines = [ "# Overton Window Breakpoint Analysis", "", "**Goal:** Quantify the 2024 structural break in centrist support", "and content extremity for right-wing motions in the Tweede Kamer.", "", "**Analysis period:** 2016–2026", "**Right-wing parties:** PVV, FVD, JA21, SGP", "**Centrist parties:** VVD, D66, CDA, NSC, BBB, CU", "**Left parties:** PvdA, GL, SP, PvdD, Volt, DENK, Bij1", "", "---", "", "## 1. Yearly Aggregate Metrics (All Right-Wing Motions)", "", yearly_table, "", "## 2. Pre/Post 2024 Comparison", "", f"**Break year:** {BREAK_YEAR}", "", "### All right-wing motions", "", f"| Metric | Pre-2024 Mean | Post-2024 Mean | Δ | Cohen's d |", f"|--------|--------------|---------------|-----|-----------|", f"| Centrist Support | {np.mean(rw_pre_cs):.3f} | {np.mean(rw_post_cs):.3f} | {np.mean(rw_post_cs) - np.mean(rw_pre_cs):+.3f} | {d_cs:+.2f} |", f"| Extremity | {np.mean(rw_pre_ext):.2f} | {np.mean(rw_post_ext):.2f} | {np.mean(rw_post_ext) - np.mean(rw_pre_ext):+.2f} | {d_ext:+.2f} |", "", f"**Interpretation:** Cohen's d values quantify effect sizes (|d| < 0.2 small, 0.5 medium, > 0.8 large).", f"These are descriptive, not inferential — with only {len(pre_years)} pre-2024 years and {len(post_years)} post-2024 years, statistical significance is not claimed.", "", "### Opposition-only right-wing motions", "", f"| Metric | Pre-2024 Mean | Post-2024 Mean | Δ | Cohen's d | N pre / N post |", f"|--------|--------------|---------------|-----|-----------|---------------|", f"| Centrist Support | {np.mean(opp_pre_cs):.3f} | {np.mean(opp_post_cs):.3f} | {np.mean(opp_post_cs) - np.mean(opp_pre_cs):+.3f} | {d_opp_cs:+.2f} | {len(opp_pre_cs)} / {len(opp_post_cs)} |", f"| Extremity | {np.mean(opp_pre_ext):.2f} | {np.mean(opp_post_ext):.2f} | {np.mean(opp_post_ext) - np.mean(opp_pre_ext):+.2f} | {d_opp_ext:+.2f} | {len(opp_pre_ext)} / {len(opp_post_ext)} |", "", "**Interpretation gate:** If opposition metrics also rise post-2024, the shift is not", "purely coalition-driven. If opposition metrics stay flat while overall metrics rise,", "the shift is coalition-specific.", "", "## 3. Coalition Composition", "", COALITION_NOTE, "", "Submitter party is parsed from motion title prefixes", "(e.g., \"Motie van het lid Wilders over ...\"). Only the lead submitter's party is", "considered. Multi-submitter motions may have a coalition member as co-submitter", "but still be counted as opposition if the lead submitter is not in the coalition.", "", "## 4. Domain Decomposition", "", "Migration = category `asiel/vreemdelingen`. Non-migration = all other categories.", "", "| Domain | Pre-2024 Mean CS | Post-2024 Mean CS | Δ CS |", "|--------|-----------------|------------------|------|", ] for domain_name, domain_sum in [("Migration", mig_sum), ("Non-migration", non_mig_sum)]: pre_cs = np.nanmean([_val(domain_sum, y, "mean_centrist_support_strict") for y in pre_years]) post_cs = np.nanmean([_val(domain_sum, y, "mean_centrist_support_strict") for y in post_years]) lines.append( f"| {domain_name} | {pre_cs:.3f} | {post_cs:.3f} | {post_cs - pre_cs:+.3f} |" ) lines += [ "", "## 5. Extremity-Stratified Centrist Support", "", ext_table, "", "**Key test:** If centrist support for high-extremity motions (3-5) rose", "disproportionately post-2024 while centrist support for mild motions stayed flat,", "centrists are more tolerant of extreme content — direct Overton shift evidence.", "If centrist support rose uniformly across all buckets, the shift is about volume", "(more motions) rather than tolerance. If only the 1-2 bucket rose, right-wing", "parties filed milder motions post-2024 and the 'shift' is illusory.", ] left_years_sorted = sorted(left_yearly.keys()) left_pre_years_list = [y for y in pre_years if y in left_yearly] left_post_years_list = [y for y in post_years if y in left_yearly] left_pre_vals = [left_yearly[y]["mean_left_support"] for y in left_pre_years_list] left_post_vals = [left_yearly[y]["mean_left_support"] for y in left_post_years_list] left_pre_mean = np.mean(left_pre_vals) if left_pre_vals else float("nan") left_post_mean = np.mean(left_post_vals) if left_post_vals else float("nan") left_delta = left_post_mean - left_pre_mean left_table = "| Year | N | Mean left_support_mp |\n" left_table += "|------|---|---------------------|\n" for y in left_years_sorted: ls = left_yearly[y]["mean_left_support"] n = left_yearly[y]["n"] left_table += f"| {y} | {int(n)} | {ls:.4f} |\n" lines += [ "", "## 6. Left-wing support for right-wing motions", "", left_table, "", f"| Metric | Pre-2024 Mean | Post-2024 Mean | Δ |", f"|--------|--------------|---------------|-----|", f"| Left Support (MP) | {left_pre_mean:.4f} | {left_post_mean:.4f} | {left_delta:+.4f} |", "", f"**Interpretation:** Left parties moved from {left_pre_mean:.1%} to {left_post_mean:.1%} " f"support — a {abs(left_delta):.1f} point shift. " "Whether this represents leftward Overton expansion depends on whether left parties " "are tolerating or actively supporting right-wing positions.", "", f"![Figure 3: Left-wing party support for right-wing motions]({Path(fig3_path).name})", "", "## 7. Manual Extremity Audit", "", audit_notes, "", audit_table, "", "## 8. Limitations", "", "- **Small-N time series:** 8 pre-2024 years and at most 3 post-2024 years (2026 is partial).", " Effect sizes are descriptive, not confirmatory.", "- **LLM extremity scores:** Content-based, not independently validated beyond the", " manual audit above. See §7 for agreement rate and noted biases.", "- **Coalition composition:** Hardcoded per year. 2024 is ambiguous (Rutte IV until July,", " Schoof thereafter). Early 2024 motions may be miscoded as Schoof-era.", "- **Submitter party identification:** Parsed from motion title prefixes (e.g.,", " 'Motie van het lid X'). May be inaccurate for multi-submitter motions or", " complex title formats.", "- **Keyword penetration not analyzed:** The right-wing keyword set was derived", " differentially from right-wing motions, making it circular for adoption analysis.", "", "## 9. Figures", "", f"![Figure 1: Centrist Support Over Time]({Path(fig1_path).name})", f"![Figure 2: Extremity Trends and Stratified Centrist Support]({Path(fig2_path).name})", f"![Figure 3: Left-wing party support for right-wing motions]({Path(fig3_path).name})", "", "## 10. Conclusion", "", "*(Fill in after reviewing all indicators and audit results.)*", ] report_path = REPORTS_DIR / "breakpoint_analysis.md" with open(report_path, "w") as f: f.write("\n".join(lines)) logger.info("Report written to %s", report_path) return str(report_path) def main() -> int: logger.info("Connecting to database: %s", DB_PATH) con = _conn(read_only=True) logger.info("Computing yearly right-wing metrics...") yearly_raw = compute_yearly_rw_metrics(con) logger.info("Computing baseline (all motions) metrics...") baseline_raw = compute_yearly_baseline(con) logger.info("Building party name map from mp_metadata...") name_party_map = build_party_name_map(con) logger.info("Computing opposition-only metrics...") opp_raw = compute_opposition_metrics(yearly_raw, name_party_map) logger.info("Computing domain decomposition...") mig_raw, non_mig_raw = compute_domain_metrics(yearly_raw) logger.info("Computing extremity-stratified pass rates...") ext_stratified = compute_extremity_stratified(yearly_raw) logger.info("Computing left-support yearly averages...") left_yearly = compute_left_support_yearly(con) con.close() yearly_sum = yearly_summary(yearly_raw) opp_sum = yearly_summary(opp_raw) mig_sum = yearly_summary(mig_raw) non_mig_sum = yearly_summary(non_mig_raw) baseline_sum = yearly_summary(baseline_raw) logger.info("Generating Figure 1...") fig1_path = create_figure_1(yearly_sum, opp_sum, mig_sum, non_mig_sum, baseline_sum) logger.info("Generating Figure 2...") fig2_path = create_figure_2(yearly_sum, opp_sum, mig_sum, non_mig_sum, ext_stratified) logger.info("Generating Figure 3...") fig3_path = create_figure_3(left_yearly) logger.info("Sampling motions for manual audit...") audit_sample = sample_audit(yearly_raw) print_audit(audit_sample) logger.info("Generating report...") audit_notes = ( "**Audit notes:** Perform manual audit by reviewing the motions below. " "Record agreement per motion. Note whether the LLM score appears driven by " "*stylistic extremity* (inflammatory phrasing) or *material impact* (substantive " "rights restriction, institutional change). " "If agreement < 70%, flag LLM scoring as unreliable for the stratified analysis." ) report_path = generate_report( yearly_sum=yearly_sum, opp_sum=opp_sum, mig_sum=mig_sum, non_mig_sum=non_mig_sum, baseline_sum=baseline_sum, ext_stratified=ext_stratified, yearly_raw=yearly_raw, opp_raw=opp_raw, left_yearly=left_yearly, fig1_path=fig1_path, fig2_path=fig2_path, fig3_path=fig3_path, audit_sample=audit_sample, audit_notes=audit_notes, ) print(f"\nReport: {report_path}") print(f"Figure 1: {fig1_path}") print(f"Figure 2: {fig2_path}") print(f"Figure 3: {fig3_path}") return 0 if __name__ == "__main__": raise SystemExit(main())