You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
motief/analysis/right_wing/overton_breakpoint_analysis.py

1020 lines
40 KiB

#!/usr/bin/env python3
"""U2: Quantify the 2024 Overton Window breakpoint in Dutch parliament.
Descriptive analysis of centrist support, pass rates, and content extremity
for right-wing motions — with coalition control via opposition-only filtering,
domain decomposition, and a baseline comparison.
Usage:
uv run python analysis/right_wing/overton_breakpoint_analysis.py
Output:
reports/overton_window/breakpoint_analysis.md
reports/overton_window/breakpoint_figure_1.png
reports/overton_window/breakpoint_figure_2.png
reports/overton_window/breakpoint_figure_3.png
"""
from __future__ import annotations
import json
import logging
import random
import re
import sys
from pathlib import Path
from typing import Any
import duckdb
import matplotlib
import numpy as np
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT, PARTY_COLOURS
# Project root: the directory that holds ``analysis/``, ``data/`` and
# ``reports/``. This script lives in ``<root>/analysis/right_wing/``, so the
# root is two levels above the script's own directory.
# NOTE(review): ROOT was referenced below but never defined in this module;
# the location is inferred from the documented usage path — confirm.
ROOT = Path(__file__).resolve().parents[2]

# Parties counted as "centrist" when measuring support for right-wing motions.
CANONICAL_CENTRIST = frozenset({"VVD", "D66", "CDA", "NSC", "BBB", "CU"})

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

# DuckDB database file and the directory that receives the report and figures.
DB_PATH = str(ROOT / "data" / "motions.db")
REPORTS_DIR = ROOT / "reports" / "overton_window"
# Import-time side effect: make sure the output directory exists.
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

CANONICAL_CENTRIST_SET = set(CANONICAL_CENTRIST)

# Display order of the extremity buckets; labels match _extremity_bucket().
EXTREMITY_BUCKET_ORDER = ["1-2 (mild)", "2-3 (moderate)", "3-4 (high)", "4-5 (extreme)"]
def _extremity_bucket(score: float) -> str:
if score < 2:
return "1-2 (mild)"
elif score < 3:
return "2-3 (moderate)"
elif score < 4:
return "3-4 (high)"
else:
return "4-5 (extreme)"
CANONICAL_LEFT_SET = set(CANONICAL_LEFT)
CANONICAL_RIGHT_SET = set(CANONICAL_RIGHT)

# (first year, last year, governing parties) for each cabinet as coded in this
# analysis. Calendar years are assigned wholesale to a single cabinet.
_CABINET_SPANS = (
    (2016, 2017, ("VVD", "PvdA")),
    (2018, 2021, ("VVD", "CDA", "D66", "CU")),
    (2022, 2023, ("VVD", "D66", "CDA", "CU")),
    (2024, 2026, ("PVV", "VVD", "NSC", "BBB")),
)

# Year -> set of coalition parties; every year gets its own set object.
COALITION: dict[int, set[str]] = {
    year: set(parties)
    for first_year, last_year, parties in _CABINET_SPANS
    for year in range(first_year, last_year + 1)
}

# Human-readable description of the coding above, embedded in the report.
COALITION_NOTE = (
    "2016-2017: Rutte II (VVD/PvdA). "
    "2018-2021: Rutte III (VVD/CDA/D66/CU). "
    "2022-2023: Rutte IV (VVD/D66/CDA/CU). "
    "2024-2026: Schoof (PVV/VVD/NSC/BBB). "
    "2024 ambiguous: Schoof cabinet started July 2024; all 2024 motions are coded "
    "to the Schoof coalition. Coalition effect may be overestimated for early 2024."
)

# Inclusive analysis window and the first year of the "post" period.
YEAR_MIN = 2016
YEAR_MAX = 2026
BREAK_YEAR = 2024
def _conn(read_only: bool = True) -> duckdb.DuckDBPyConnection:
    """Open a DuckDB connection to the database file at ``DB_PATH``.

    Args:
        read_only: Forwarded to ``duckdb.connect``; defaults to True.

    Returns:
        The connection object returned by ``duckdb.connect``.
    """
    connection = duckdb.connect(DB_PATH, read_only=read_only)
    return connection
def cohens_d(x: np.ndarray, y: np.ndarray) -> float:
    """Return Cohen's d for the shift from sample ``x`` to sample ``y``.

    The result is positive when ``mean(y) > mean(x)``. The standardiser is
    the pooled standard deviation with each sample variance weighted by its
    degrees of freedom, so groups of unequal size are handled correctly
    (for equal sizes this equals the plain average of the two variances).

    Args:
        x: Reference ("before") sample.
        y: Comparison ("after") sample.

    Returns:
        The effect size as a Python float; NaN if either sample has fewer
        than two observations (variance undefined); 0.0 if both samples
        have zero spread.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    n_x, n_y = x.size, y.size
    # ddof=1 variance needs at least two points; bail out without warnings.
    if n_x < 2 or n_y < 2:
        return float("nan")
    pooled_var = (
        (n_x - 1) * np.var(x, ddof=1) + (n_y - 1) * np.var(y, ddof=1)
    ) / (n_x + n_y - 2)
    pooled_sd = np.sqrt(pooled_var)
    if pooled_sd == 0:
        return 0.0
    return float((np.mean(y) - np.mean(x)) / pooled_sd)
def compute_yearly_rw_metrics(con: duckdb.DuckDBPyConnection) -> dict[int, dict]:
    """Collect per-motion values for classified right-wing motions, by year.

    Joins ``right_wing_motions`` with ``extremity_scores`` and ``motions`` and
    returns, for every year in ``YEAR_MIN..YEAR_MAX``, a dict of parallel
    lists (one entry per motion). Missing numeric values are stored as NaN;
    the ``passed`` entry comes from ``_motion_passed``.
    """
    query = """
        SELECT
            r.motion_id,
            r.year,
            r.title,
            r.centrist_support_strict,
            r.center_right_support,
            r.right_support,
            r.left_opposition,
            r.category,
            e.text_score AS extremity_score,
            m.voting_results,
            m.winning_margin
        FROM right_wing_motions r
        JOIN extremity_scores e ON r.motion_id = e.motion_id
        JOIN motions m ON r.motion_id = m.id
        WHERE r.classified = TRUE
          AND r.year IS NOT NULL
          AND e.text_score IS NOT NULL
    """
    list_keys = (
        "centrist_support_strict",
        "center_right_support",
        "right_support",
        "left_opposition",
        "extremity",
        "passed",
        "categories",
        "titles",
        "motion_ids",
    )
    yearly: dict[int, dict[str, Any]] = {
        year: {key: [] for key in list_keys}
        for year in range(YEAR_MIN, YEAR_MAX + 1)
    }

    for record in con.execute(query).fetchall():
        (
            motion_id,
            year,
            title,
            strict_support,
            center_right,
            right,
            left_opp,
            category,
            extremity,
            voting_raw,
            margin,
        ) = record
        if year is None or year < YEAR_MIN or year > YEAR_MAX:
            continue

        bucket = yearly[year]
        numeric_fields = (
            ("centrist_support_strict", strict_support),
            ("center_right_support", center_right),
            ("right_support", right),
            ("left_opposition", left_opp),
            ("extremity", extremity),
        )
        for key, value in numeric_fields:
            bucket[key].append(np.nan if value is None else value)
        bucket["categories"].append(category or "other")
        bucket["titles"].append(title or "")
        bucket["motion_ids"].append(motion_id)

        # voting_results may arrive as a JSON string or an already-decoded value.
        if voting_raw is None:
            voting = {}
        elif isinstance(voting_raw, str):
            voting = json.loads(voting_raw)
        else:
            voting = voting_raw
        bucket["passed"].append(_motion_passed(voting, margin))

    return yearly
def compute_yearly_baseline(con: duckdb.DuckDBPyConnection) -> dict[int, dict]:
    """Centrist support ratio for every motion in ``mp_votes``, grouped by year.

    Vote rows are tallied per motion and party, then ``_support_ratio`` is
    applied with the centrist party set. Motions for which no ratio can be
    computed are left out. The result maps each year in
    ``YEAR_MIN..YEAR_MAX`` to ``{"centrist_support": [...]}``.
    """
    baseline: dict[int, dict] = {
        year: {"centrist_support": []} for year in range(YEAR_MIN, YEAR_MAX + 1)
    }

    query = """
        SELECT
            mv.motion_id,
            EXTRACT(YEAR FROM mv.date) AS year,
            mv.party,
            COUNT(*) AS n,
            mv.vote
        FROM mp_votes mv
        WHERE mv.party IS NOT NULL
          AND mv.date IS NOT NULL
        GROUP BY mv.motion_id, EXTRACT(YEAR FROM mv.date), mv.party, mv.vote
    """

    tallies_by_motion: dict[int, dict[str, dict[str, int]]] = {}
    year_by_motion: dict[int, int] = {}
    for motion_id, raw_year, party, count, vote in con.execute(query).fetchall():
        vote_year = int(raw_year)
        if vote_year < YEAR_MIN or vote_year > YEAR_MAX:
            continue
        if motion_id not in tallies_by_motion:
            tallies_by_motion[motion_id] = {}
        per_party = tallies_by_motion[motion_id]
        if party not in per_party:
            per_party[party] = {"voor": 0, "tegen": 0, "afwezig": 0}
        tally = per_party[party]
        tally[vote] = tally.get(vote, 0) + count
        # If a motion has rows in several years, the last row seen wins.
        year_by_motion[motion_id] = vote_year

    for motion_id, per_party in tallies_by_motion.items():
        vote_year = year_by_motion.get(motion_id)
        if vote_year is None:
            continue
        ratio = _support_ratio(per_party, CANONICAL_CENTRIST_SET)
        if ratio is None:
            continue
        baseline[vote_year]["centrist_support"].append(ratio)

    return baseline
def _motion_passed(
voting: dict[str, str], winning_margin: float | None = None
) -> bool | None:
"""Determine if a motion passed from voting_results or winning_margin."""
if winning_margin is not None:
return winning_margin > 0
voor = sum(1 for v in voting.values() if v == "voor")
tegen = sum(1 for v in voting.values() if v == "tegen")
if voor + tegen == 0:
return None
return voor > tegen
def _support_ratio(
votes: dict[str, dict[str, int]], parties: set[str]
) -> float | None:
"""Compute support ratio (fraction of parties voting 'voor')."""
total = 0
supportive = 0
for party, pv in votes.items():
if party not in parties:
continue
tv = pv.get("voor", 0) + pv.get("tegen", 0) + pv.get("afwezig", 0)
if tv == 0:
continue
total += 1
if pv.get("voor", 0) / tv >= 0.5:
supportive += 1
if total == 0:
return None
return supportive / total
def build_party_name_map(con: duckdb.DuckDBPyConnection) -> dict[str, str]:
    """Build a surname -> party lookup from ``mp_metadata``.

    The surname is the part of ``mp_name`` before the first comma. Rows are
    read in the query's order (``tot_en_met`` then ``van``, both descending
    with NULLs last) and the first row seen for a surname wins, so later
    rows with the same surname are ignored.
    NOTE(review): with NULLS LAST, rows whose ``tot_en_met`` is NULL lose to
    rows with an end date — confirm this is the intended precedence.

    Rows with a NULL or blank name are skipped: the query only filters on
    ``party``, and such rows cannot yield a usable lookup key.

    Args:
        con: Open connection exposing the ``mp_metadata`` table.

    Returns:
        Mapping from surname to party.
    """
    rows = con.execute("""
        SELECT mp_name, party, van, tot_en_met
        FROM mp_metadata
        WHERE party IS NOT NULL
        ORDER BY tot_en_met DESC NULLS LAST, van DESC NULLS LAST
    """).fetchall()

    last_to_party: dict[str, str] = {}
    for mp_name, party, _van, _tot in rows:
        if not mp_name:
            continue
        surname = mp_name.partition(",")[0].strip()
        if not surname:
            continue
        # setdefault keeps the first (highest-priority) party for a surname.
        last_to_party.setdefault(surname, party)
    return last_to_party
# Title prefixes that introduce the submitter list, tried in this order.
_SUBMITTER_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r"(?:Gewijzigde|Nader\s+gewijzigde)?\s*Motie\s+van\s+het\s+lid\s+(.+?)\s+(?:c\.s\.\s+)?over\b",
        r"(?:Gewijzigde|Nader\s+gewijzigde)?\s*Motie\s+van\s+de\s+leden\s+(.+?)\s+(?:c\.s\.\s+)?over\b",
        r"Amendement\s+van\s+het\s+lid\s+(.+?)\s+over\b",
        r"Amendement\s+van\s+de\s+leden\s+(.+?)\s+over\b",
    )
)
# Submitter lists read "A en B" or "A, B en C": split on commas and on "en".
_SUBMITTER_SEPARATOR = re.compile(r"\s*,\s*|\s+en\s+")


def parse_lead_submitter(
    title: str, name_party_map: dict[str, str]
) -> tuple[str | None, str | None]:
    """Extract the lead (first-named) submitter from a motion title.

    The first title pattern that matches supplies the submitter list; the
    lead submitter is the first name in that list, with any trailing
    "c.s." removed. Lists separated by commas ("A, B en C") as well as by
    "en" ("A en B") are supported, so the lead of a three-name list is "A"
    rather than "A, B".

    Args:
        title: Motion or amendment title.
        name_party_map: Surname -> party lookup.

    Returns:
        ``(name, party)``; ``party`` is None when the name is not in the
        lookup. ``(None, None)`` when the title is empty or matches no
        known prefix.
    """
    if not title:
        return None, None

    for pattern in _SUBMITTER_PATTERNS:
        match = pattern.search(title)
        if match is None:
            continue
        submitters = match.group(1).strip()
        lead = _SUBMITTER_SEPARATOR.split(submitters, maxsplit=1)[0]
        lead = re.sub(r"\s+c\.s\.", "", lead).strip()
        if not lead:
            continue
        return lead, name_party_map.get(lead)
    return None, None
def compute_opposition_metrics(
    yearly_raw: dict[int, dict], name_party_map: dict[str, str]
) -> dict[int, dict]:
    """Restrict the yearly right-wing data to opposition-submitted motions.

    A motion is kept when its lead submitter (parsed from the title) maps to
    a party that is not in that year's ``COALITION`` entry. Motions whose
    submitter party cannot be determined are dropped.
    """
    opposition: dict[int, dict[str, list]] = {
        year: {
            "centrist_support_strict": [],
            "extremity": [],
            "passed": [],
            "n": 0,
        }
        for year in range(YEAR_MIN, YEAR_MAX + 1)
    }

    for year, data in yearly_raw.items():
        governing = COALITION.get(year, set())
        kept = opposition[year] if data["titles"] else None
        for idx, title in enumerate(data["titles"]):
            _name, party = parse_lead_submitter(title, name_party_map)
            if party is None or party in governing:
                continue
            for key in ("centrist_support_strict", "extremity", "passed"):
                kept[key].append(data[key][idx])
            kept["n"] += 1

    return opposition
def compute_domain_metrics(
    yearly_raw: dict[int, dict],
) -> tuple[dict[int, dict], dict[int, dict]]:
    """Partition the yearly data into migration and non-migration motions.

    A motion is "migration" when its category equals ``asiel/vreemdelingen``;
    every other category goes to the non-migration side. Returns
    ``(migration, non_migration)``, each keyed by year.
    """
    copied_keys = ("centrist_support_strict", "extremity", "passed")

    def _empty_years() -> dict[int, dict[str, list]]:
        return {
            year: {"centrist_support_strict": [], "extremity": [], "passed": [], "n": 0}
            for year in range(YEAR_MIN, YEAR_MAX + 1)
        }

    migration = _empty_years()
    other = _empty_years()

    for year, data in yearly_raw.items():
        for idx, _title in enumerate(data["titles"]):
            is_migration = data["categories"][idx] == "asiel/vreemdelingen"
            destination = (migration if is_migration else other)[year]
            for key in copied_keys:
                destination[key].append(data[key][idx])
            destination["n"] += 1

    return migration, other
def compute_extremity_stratified(
    yearly_raw: dict[int, dict],
) -> dict[str, dict[str, list]]:
    """Group strict centrist support by extremity bucket and period.

    Years before ``BREAK_YEAR`` go to "pre-2024"; ``BREAK_YEAR`` and later go
    to "post-2024". Motions with a NaN extremity or a missing/NaN support
    value are skipped.
    """
    grouped: dict[str, dict[str, list]] = {
        period: {bucket: [] for bucket in EXTREMITY_BUCKET_ORDER}
        for period in ("pre-2024", "post-2024")
    }

    for year, data in yearly_raw.items():
        target = grouped["pre-2024" if year < BREAK_YEAR else "post-2024"]
        for idx, _title in enumerate(data["titles"]):
            extremity = data["extremity"][idx]
            support = data["centrist_support_strict"][idx]
            if np.isnan(extremity):
                continue
            if support is None:
                continue
            if isinstance(support, float) and np.isnan(support):
                continue
            target[_extremity_bucket(extremity)].append(support)

    return grouped
def compute_left_support_yearly(con: duckdb.DuckDBPyConnection) -> dict[int, dict]:
    """Yearly average of ``left_support_mp`` over classified right-wing motions.

    Rows without a year are excluded (in the query and again defensively
    when reading the result), because a NULL year cannot be converted to an
    integer key.
    NOTE(review): unlike the other metrics, years are not clipped to
    ``YEAR_MIN..YEAR_MAX`` here — confirm whether that is intended.

    Args:
        con: Open connection exposing the ``right_wing_motions`` table.

    Returns:
        ``{year: {"mean_left_support": avg, "n": count}}``.
    """
    rows = con.execute("""
        SELECT year, AVG(left_support_mp), COUNT(*)
        FROM right_wing_motions
        WHERE classified = TRUE
          AND left_support_mp IS NOT NULL
          AND year IS NOT NULL
        GROUP BY year ORDER BY year
    """).fetchall()

    result: dict[int, dict] = {}
    for year, avg, n in rows:
        if year is None:
            continue
        result[int(year)] = {"mean_left_support": avg, "n": n}
    return result
def yearly_summary(yearly: dict[int, dict]) -> dict[int, dict]:
    """Reduce per-year raw lists to per-year means, pass rate and count.

    For every year the result holds ``mean_<key>`` for the five right-wing
    metrics (NaN when the list is absent or has no usable value),
    ``pass_rate`` over the non-None ``passed`` entries, and ``n``.

    Baseline data stores its values under ``centrist_support`` rather than
    ``centrist_support_strict``; when that key is present it is summarised
    as ``mean_centrist_support`` and also used for ``n``. Without this the
    baseline series would consist solely of missing values.

    Args:
        yearly: ``{year: {metric_name: [values...], ...}}``.

    Returns:
        ``{year: {"mean_<metric>": float, "pass_rate": float, "n": int}}``.
    """
    metric_keys = (
        "centrist_support_strict",
        "center_right_support",
        "right_support",
        "left_opposition",
        "extremity",
    )
    # First key present decides how many motions the year contains.
    size_keys = ("motion_ids", "centrist_support_strict", "centrist_support")

    summary: dict[int, dict] = {}
    for year, data in yearly.items():
        stats: dict[str, Any] = {}

        keys = list(metric_keys)
        if "centrist_support" in data:
            keys.append("centrist_support")
        for key in keys:
            values = [
                v
                for v in data.get(key, [])
                if v is not None and not (isinstance(v, float) and np.isnan(v))
            ]
            stats[f"mean_{key}"] = np.mean(values) if values else float("nan")

        outcomes = [p for p in data.get("passed", []) if p is not None]
        stats["pass_rate"] = sum(outcomes) / len(outcomes) if outcomes else float("nan")

        stats["n"] = next((len(data[key]) for key in size_keys if key in data), 0)
        summary[year] = stats
    return summary
def sample_audit(yearly_raw: dict[int, dict]) -> list[dict]:
    """Draw up to five motions per extremity bucket for manual review.

    Motions with a NaN extremity are ignored. Sampling uses a generator
    seeded with 42, so the selection is reproducible for identical input.
    The result is sorted by bucket label and then by extremity score.
    """
    catalogue: list[dict] = []
    positions_by_bucket: dict[str, list[int]] = {
        label: [] for label in EXTREMITY_BUCKET_ORDER
    }

    for year, data in yearly_raw.items():
        for idx, title in enumerate(data["titles"]):
            score = data["extremity"][idx]
            if np.isnan(score):
                continue
            positions_by_bucket[_extremity_bucket(score)].append(len(catalogue))
            catalogue.append(
                {
                    "year": year,
                    "title": title,
                    "category": data["categories"][idx],
                    "extremity": score,
                }
            )

    rng = random.Random(42)
    picked: list[dict] = []
    for label, positions in positions_by_bucket.items():
        if not positions:
            continue
        for position in rng.sample(positions, min(5, len(positions))):
            picked.append({**catalogue[position], "bucket": label})

    picked.sort(key=lambda motion: (motion["bucket"], motion["extremity"]))
    return picked
def print_audit(sampled: list[dict]) -> None:
    """Print the sampled motions, grouped by bucket, as an audit worksheet.

    ``sampled`` is expected to be ordered by bucket, since consecutive
    entries with the same bucket label are grouped together.
    """
    from itertools import groupby

    rule = "=" * 80
    header = (
        "\n" + rule,
        " MANUAL EXTREMITY AUDIT",
        rule,
        "",
        "For each motion below, judge whether you agree with the LLM-assigned extremity bucket.",
        "Also note: does the score reflect stylistic extremity (language) or material impact (policy)?",
        "",
    )
    for line in header:
        print(line)

    for bucket, members in groupby(sampled, key=lambda motion: motion["bucket"]):
        entries = list(members)
        print(f"\n--- {bucket} (n={len(entries)} sampled) ---")
        for position, motion in enumerate(entries, start=1):
            # Long titles are cut to 120 characters to keep the listing compact.
            short_title = motion["title"][:120]
            print(f"\n [{position}] Year={motion['year']} | Category={motion['category']}")
            print(f" LLM Score: {motion['extremity']}")
            print(f" Title: {short_title}")
            print(" Agree? [Y/N] Driven by: Language / Policy / Both")

    print("\n" + rule)
    print(" END OF AUDIT — Record agreement rate and note systematic biases")
    print(rule)
def create_figure_1(
    yearly_sum: dict[int, dict],
    opp_sum: dict[int, dict],
    mig_sum: dict[int, dict],
    non_mig_sum: dict[int, dict],
    baseline_sum: dict[int, dict],
) -> str:
    """Figure 1: centrist support over time in a single panel.

    Plots one line per summary (all right-wing, opposition-only, migration,
    non-migration, all-motions baseline, center-right support), marks the
    break year, saves the PNG under ``REPORTS_DIR`` and returns its path.
    """
    years = sorted(yearly_sum.keys())
    years_arr = np.array(years)

    def _series(summary, key):
        return np.array([summary[y].get(key, np.nan) for y in years])

    fig, ax = plt.subplots(figsize=(12, 6))

    strict_key = "mean_centrist_support_strict"
    # (summary, metric key, line style) in drawing/legend order.
    line_specs = [
        (yearly_sum, strict_key,
         dict(marker="o", color="#002366", linewidth=2, label="All right-wing", zorder=5)),
        (opp_sum, strict_key,
         dict(marker="s", color="#4A90D9", linewidth=1.5, linestyle="--",
              label="Opposition-only", zorder=4)),
        (mig_sum, strict_key,
         dict(marker="^", color="#E53935", linewidth=1.5, linestyle=":",
              label="Migration", zorder=3)),
        (non_mig_sum, strict_key,
         dict(marker="v", color="#4CAF50", linewidth=1.5, linestyle="-.",
              label="Non-migration", zorder=2)),
        (baseline_sum, "mean_centrist_support",
         dict(color="#9E9E9E", linewidth=1, linestyle="dashed", alpha=0.7, zorder=1,
              label="All motions (baseline)")),
        (yearly_sum, "mean_center_right_support",
         dict(marker="D", color="#FF8F00", linewidth=1.5, linestyle="--",
              label="Center-right (VVD/BBB)", zorder=3)),
    ]
    for summary, key, style in line_specs:
        ax.plot(years_arr, _series(summary, key), **style)

    # Dotted divider between the last pre-break year and the break year.
    ax.axvline(x=BREAK_YEAR - 0.5, color="black", linestyle=":", alpha=0.5, linewidth=1)
    y_top = ax.get_ylim()[1]
    label_y = y_top * 0.95 if y_top > 0 else 0.95
    ax.annotate("2024", xy=(BREAK_YEAR - 0.3, label_y),
                fontsize=9, color="black", alpha=0.7)

    # NOTE(review): the effect sizes in this box are hard-coded text, not
    # computed from the data passed in — confirm they are still current.
    ax.text(0.02, 0.98, "Cohen\u2019s d\nOverall: d=+0.68\nOpposition-only: d=+0.85",
            transform=ax.transAxes, fontsize=9, verticalalignment="top",
            bbox=dict(boxstyle="round", facecolor="white", alpha=0.8))

    ax.set_xlabel("Year")
    ax.set_ylabel("Centrist support (strict — fraction of parties)")
    ax.set_title("Centrist Support (Strict) for Right-Wing Motions Over Time", fontweight="bold")
    ax.legend(loc="lower right", fontsize=8, ncol=2)
    ax.set_ylim(0, 1.05)
    ax.grid(True, alpha=0.3)
    ax.set_xticks(years_arr)
    ax.set_xticklabels([str(y) for y in years], rotation=45)

    plt.tight_layout()
    path = str(REPORTS_DIR / "breakpoint_figure_1.png")
    fig.savefig(path, dpi=150, bbox_inches="tight")
    plt.close(fig)
    logger.info("Saved Figure 1 to %s", path)
    return path
def create_figure_2(
    yearly_sum: dict[int, dict],
    opp_sum: dict[int, dict],
    mig_sum: dict[int, dict],
    non_mig_sum: dict[int, dict],
    ext_stratified: dict[str, dict[str, list]],
) -> str:
    """Figure 2: extremity over time, and extremity-stratified centrist support.

    Left panel: mean extremity per year for all right-wing, opposition-only,
    migration and non-migration motions. Right panel: mean centrist support
    per extremity bucket, pre vs post break year, with error bars spanning
    the 25th to 75th percentile and the sample size above each bar.

    The horizontal reference line is the motion-weighted mean of the yearly
    means. Years without motions (NaN mean or zero count) are left out of
    that average, since a single NaN would otherwise turn the whole average
    into NaN; the line is omitted when no year has usable data.

    Returns:
        Path of the saved PNG under ``REPORTS_DIR``.
    """
    years = sorted(yearly_sum.keys())
    years_arr = np.array(years)

    def _vals(summary, key):
        return np.array([summary[y].get(key, np.nan) for y in years])

    def _bucket_stats(period):
        """Per-bucket mean, count and asymmetric (P25, P75) error bars."""
        means, counts, p25s, p75s = [], [], [], []
        for bucket in EXTREMITY_BUCKET_ORDER:
            values = np.array(ext_stratified[period].get(bucket, []))
            counts.append(len(values))
            if len(values) > 0:
                means.append(np.mean(values))
                p25s.append(np.percentile(values, 25))
                p75s.append(np.percentile(values, 75))
            else:
                # Empty buckets are drawn as zero-height bars with no error bar.
                means.append(0)
                p25s.append(0)
                p75s.append(0)
        means_arr = np.array(means)
        # Error-bar lengths must be non-negative; the mean can fall outside
        # the interquartile range, hence the clipping at zero.
        lower = np.maximum(means_arr - np.array(p25s), 0)
        upper = np.maximum(np.array(p75s) - means_arr, 0)
        return means_arr, counts, np.vstack([lower, upper])

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

    # --- Left panel: extremity over time -------------------------------
    ax1.plot(years_arr, _vals(yearly_sum, "mean_extremity"),
             marker="o", color="#002366", linewidth=2, label="All right-wing", zorder=5)
    ax1.plot(years_arr, _vals(opp_sum, "mean_extremity"),
             marker="s", color="#E53935", linewidth=1.5, linestyle="--",
             label="Opposition-only RW", zorder=4)
    ax1.plot(years_arr, _vals(mig_sum, "mean_extremity"),
             marker="^", color="#6A1B9A", linewidth=1.5, linestyle=":",
             label="Migration", zorder=3)
    ax1.plot(years_arr, _vals(non_mig_sum, "mean_extremity"),
             marker="v", color="#4CAF50", linewidth=1.5, linestyle="-.",
             label="Non-migration", zorder=2)
    ax1.axvline(x=BREAK_YEAR - 0.5, color="black", linestyle=":", alpha=0.5, linewidth=1)
    y_top = ax1.get_ylim()[1]
    ax1.annotate("2024", xy=(BREAK_YEAR - 0.3, y_top * 0.95 if y_top > 0 else 4.5),
                 fontsize=9, color="black", alpha=0.7)
    ax1.set_xlabel("Year")
    ax1.set_ylabel("Mean Extremity Score")
    ax1.set_title("Content Extremity Over Time", fontweight="bold")
    ax1.legend(loc="upper left", fontsize=8)
    ax1.grid(True, alpha=0.3)
    ax1.set_xticks(years_arr)
    ax1.set_xticklabels([str(y) for y in years], rotation=45)

    # --- Right panel: centrist support per extremity bucket ------------
    bucket_labels = ["1-2\nmild", "2-3\nmoderate", "3-4\nhigh", "4-5\nextreme"]
    x = np.arange(len(EXTREMITY_BUCKET_ORDER))
    width = 0.35

    pre_means, pre_ns, pre_yerr = _bucket_stats("pre-2024")
    post_means, post_ns, post_yerr = _bucket_stats("post-2024")

    bars_pre = ax2.bar(x - width / 2, pre_means, width, label="Pre-2024 (2016-2023)",
                       yerr=pre_yerr, capsize=4,
                       color="#90CAF9", edgecolor="black", alpha=0.9)
    bars_post = ax2.bar(x + width / 2, post_means, width, label="Post-2024 (2024-2026)",
                        yerr=post_yerr, capsize=4,
                        color="#1E88E5", edgecolor="black", alpha=0.9)
    for bars, counts in ((bars_pre, pre_ns), (bars_post, post_ns)):
        for bar, n in zip(bars, counts):
            ax2.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01,
                     f"N={n}", ha="center", va="bottom", fontsize=8, fontweight="bold")

    yearly_means = _vals(yearly_sum, "mean_centrist_support_strict").astype(float)
    yearly_counts = _vals(yearly_sum, "n").astype(float)
    usable = ~np.isnan(yearly_means) & (yearly_counts > 0)
    if usable.any():
        overall_cs_mean = float(
            np.average(yearly_means[usable], weights=yearly_counts[usable])
        )
        ax2.axhline(y=overall_cs_mean, color="grey", linestyle="--", alpha=0.7, linewidth=1,
                    label=f"All-year mean ({overall_cs_mean:.2f})")

    ax2.set_xticks(x)
    ax2.set_xticklabels(bucket_labels)
    ax2.set_ylabel("Centrist Support")
    ax2.set_title("Extremity-Stratified Centrist Support\nPre vs Post 2024", fontweight="bold")
    ax2.legend(fontsize=8)
    ax2.set_ylim(0, 1.05)
    ax2.grid(True, alpha=0.3, axis="y")

    plt.tight_layout()
    path = str(REPORTS_DIR / "breakpoint_figure_2.png")
    fig.savefig(path, dpi=150, bbox_inches="tight")
    plt.close(fig)
    logger.info("Saved Figure 2 to %s", path)
    return path
def create_figure_3(
    left_yearly: dict[int, dict],
) -> str:
    """Figure 3: bar chart of left-party support for right-wing motions.

    One bar per year (height = ``mean_left_support``, annotated with ``n``),
    a dashed line at the n-weighted mean across years, and a dotted divider
    before the break year. Saves the PNG under ``REPORTS_DIR`` and returns
    its path.
    """
    years = sorted(left_yearly.keys())
    years_arr = np.array(years)
    yearly_means = np.array([left_yearly[y]["mean_left_support"] for y in years])
    yearly_counts = np.array([left_yearly[y]["n"] for y in years])

    # Fall back to 0.0 when there is nothing to weight by.
    if yearly_counts.sum() > 0:
        overall_mean = np.average(yearly_means, weights=yearly_counts)
    else:
        overall_mean = 0.0

    fig, ax = plt.subplots(figsize=(12, 6))
    bars = ax.bar(years_arr, yearly_means, color="#1565C0", edgecolor="white", alpha=0.9)
    for bar, count in zip(bars, yearly_counts):
        centre_x = bar.get_x() + bar.get_width() / 2
        ax.text(centre_x, bar.get_height() + 0.005,
                f"N={int(count)}", ha="center", va="bottom", fontsize=8)

    ax.axhline(y=overall_mean, color="#D32F2F", linestyle="--", alpha=0.8, linewidth=1,
               label=f"Weighted mean ({overall_mean:.3f})")
    ax.axvline(x=BREAK_YEAR - 0.5, color="black", linestyle=":", alpha=0.5, linewidth=1)
    label_y = ax.get_ylim()[1] * 0.95
    ax.annotate("2024", xy=(BREAK_YEAR - 0.3, label_y),
                fontsize=9, color="black", alpha=0.7)

    ax.set_xlabel("Year")
    ax.set_ylabel("Mean left_support_mp")
    ax.set_title("Left-wing party support for right-wing motions", fontweight="bold")
    ax.legend(fontsize=9)
    ax.set_xticks(years_arr)
    ax.set_xticklabels([str(y) for y in years], rotation=45)
    ax.grid(True, alpha=0.3, axis="y")

    plt.tight_layout()
    path = str(REPORTS_DIR / "breakpoint_figure_3.png")
    fig.savefig(path, dpi=150, bbox_inches="tight")
    plt.close(fig)
    logger.info("Saved Figure 3 to %s", path)
    return path
def _split_pre_post(raw: dict[int, dict]) -> tuple[list, list, list, list]:
    """Pool motion-level support and extremity into pre-/post-break lists.

    NaN entries are dropped. Returns ``(pre_cs, post_cs, pre_ext, post_ext)``
    where "pre" means years before ``BREAK_YEAR``.
    """
    pre_cs: list = []
    post_cs: list = []
    pre_ext: list = []
    post_ext: list = []
    for year, data in raw.items():
        is_pre = year < BREAK_YEAR
        for idx, cs in enumerate(data.get("centrist_support_strict", [])):
            ext = data["extremity"][idx]
            if not (isinstance(cs, float) and np.isnan(cs)):
                (pre_cs if is_pre else post_cs).append(cs)
            if not (isinstance(ext, float) and np.isnan(ext)):
                (pre_ext if is_pre else post_ext).append(ext)
    return pre_cs, post_cs, pre_ext, post_ext


def _mean_or_nan(values: list) -> float:
    """Mean of ``values``; NaN for an empty list (avoids NumPy's empty-mean warning)."""
    return float(np.mean(values)) if len(values) else float("nan")


def generate_report(
    yearly_sum: dict[int, dict],
    opp_sum: dict[int, dict],
    mig_sum: dict[int, dict],
    non_mig_sum: dict[int, dict],
    baseline_sum: dict[int, dict],
    ext_stratified: dict[str, dict[str, list]],
    yearly_raw: dict[int, dict],
    opp_raw: dict[int, dict],
    left_yearly: dict[int, dict],
    fig1_path: str,
    fig2_path: str,
    fig3_path: str,
    audit_sample: list[dict],
    audit_notes: str = "",
) -> str:
    """Write the breakpoint analysis as Markdown and return the file path.

    Args:
        yearly_sum: Per-year summary for all right-wing motions.
        opp_sum: Per-year summary for opposition-only motions (currently not
            used in the report body).
        mig_sum: Per-year summary for migration motions.
        non_mig_sum: Per-year summary for non-migration motions.
        baseline_sum: Per-year summary for all motions (currently not used in
            the report body).
        ext_stratified: Centrist support per period and extremity bucket.
        yearly_raw: Motion-level lists for all right-wing motions.
        opp_raw: Motion-level lists for opposition-only motions.
        left_yearly: Per-year mean left support and count.
        fig1_path: Path of Figure 1; only the file name is embedded.
        fig2_path: Path of Figure 2; only the file name is embedded.
        fig3_path: Path of Figure 3; only the file name is embedded.
        audit_sample: Motions selected for the manual extremity audit.
        audit_notes: Free text placed above the audit table.

    Returns:
        Path of ``breakpoint_analysis.md`` under ``REPORTS_DIR``.
    """
    years = sorted(yearly_sum.keys())

    def _val(summary, year, key):
        return summary[year].get(key, np.nan)

    pre_years = [y for y in years if y < BREAK_YEAR]
    post_years = [y for y in years if y >= BREAK_YEAR]

    # Motion-level pools for the pre/post comparison and effect sizes.
    rw_pre_cs, rw_post_cs, rw_pre_ext, rw_post_ext = _split_pre_post(yearly_raw)
    opp_pre_cs, opp_post_cs, opp_pre_ext, opp_post_ext = _split_pre_post(opp_raw)

    d_cs = cohens_d(np.array(rw_pre_cs), np.array(rw_post_cs))
    d_ext = cohens_d(np.array(rw_pre_ext), np.array(rw_post_ext))
    d_opp_cs = (
        cohens_d(np.array(opp_pre_cs), np.array(opp_post_cs))
        if opp_pre_cs and opp_post_cs
        else float("nan")
    )
    d_opp_ext = (
        cohens_d(np.array(opp_pre_ext), np.array(opp_post_ext))
        if opp_pre_ext and opp_post_ext
        else float("nan")
    )

    rw_pre_cs_mean, rw_post_cs_mean = _mean_or_nan(rw_pre_cs), _mean_or_nan(rw_post_cs)
    rw_pre_ext_mean, rw_post_ext_mean = _mean_or_nan(rw_pre_ext), _mean_or_nan(rw_post_ext)
    opp_pre_cs_mean, opp_post_cs_mean = _mean_or_nan(opp_pre_cs), _mean_or_nan(opp_post_cs)
    opp_pre_ext_mean, opp_post_ext_mean = _mean_or_nan(opp_pre_ext), _mean_or_nan(opp_post_ext)

    # --- Section 1: yearly table ----------------------------------------
    yearly_table = "| Year | N (RW) | Centrist Support (Strict) | Extremity | Right Support | Left Opp. |\n"
    yearly_table += "|------|--------|---------------------------|-----------|---------------|----------|\n"
    for y in years:
        n = _val(yearly_sum, y, "n")
        cs = _val(yearly_sum, y, "mean_centrist_support_strict")
        ext = _val(yearly_sum, y, "mean_extremity")
        rs = _val(yearly_sum, y, "mean_right_support")
        lo = _val(yearly_sum, y, "mean_left_opposition")
        cs_str = f"{cs:.3f}" if not np.isnan(cs) else "N/A"
        ext_str = f"{ext:.2f}" if not np.isnan(ext) else "N/A"
        rs_str = f"{rs:.3f}" if not np.isnan(rs) else "N/A"
        lo_str = f"{lo:.3f}" if not np.isnan(lo) else "N/A"
        yearly_table += f"| {y} | {int(n)} | {cs_str} | {ext_str} | {rs_str} | {lo_str} |\n"

    # --- Section 5: extremity-stratified table --------------------------
    def _bucket_row_stats(values):
        """(n, mean, median, p25, p75) for one bucket; NaNs when empty."""
        arr = np.array(values)
        if len(arr) == 0:
            nan = float("nan")
            return 0, nan, nan, nan, nan
        p25, p75 = np.percentile(arr, [25, 75])
        return len(arr), np.mean(arr), np.median(arr), p25, p75

    ext_table = "| Bucket | Period | N | Mean CS | Median CS | P25 | P75 |\n"
    ext_table += "|--------|--------|---|---------|-----------|---|-----|\n"
    for b in EXTREMITY_BUCKET_ORDER:
        n_pre, p_mean, p_med, p_p25, p_p75 = _bucket_row_stats(
            ext_stratified["pre-2024"].get(b, [])
        )
        n_post, pt_mean, pt_med, pt_p25, pt_p75 = _bucket_row_stats(
            ext_stratified["post-2024"].get(b, [])
        )
        ext_table += (
            f"| {b} | Pre-2024 | {n_pre} | {p_mean:.3f} | {p_med:.3f} | "
            f"{p_p25:.3f} | {p_p75:.3f} |\n"
        )
        ext_table += (
            f"| | Post-2024 | {n_post} | {pt_mean:.3f} | {pt_med:.3f} | "
            f"{pt_p25:.3f} | {pt_p75:.3f} |\n"
        )

    # --- Section 7: audit table (agreement columns left blank) ----------
    audit_table = "| # | Year | Category | LLM Score | Bucket | Agreed? | Driver |\n"
    audit_table += "|---|------|----------|-----------|--------|---------|--------|\n"
    for i, m in enumerate(audit_sample, 1):
        audit_table += f"| {i} | {m['year']} | {m['category']} | {m['extremity']} | {m['bucket']} | | |\n"

    lines = [
        "# Overton Window Breakpoint Analysis",
        "",
        "**Goal:** Quantify the 2024 structural break in centrist support",
        "and content extremity for right-wing motions in the Tweede Kamer.",
        "",
        "**Analysis period:** 2016–2026",
        "**Right-wing parties:** PVV, FVD, JA21, SGP",
        "**Centrist parties:** VVD, D66, CDA, NSC, BBB, CU",
        "**Left parties:** PvdA, GL, SP, PvdD, Volt, DENK, Bij1",
        "",
        "---",
        "",
        "## 1. Yearly Aggregate Metrics (All Right-Wing Motions)",
        "",
        yearly_table,
        "",
        "## 2. Pre/Post 2024 Comparison",
        "",
        f"**Break year:** {BREAK_YEAR}",
        "",
        "### All right-wing motions",
        "",
        "| Metric | Pre-2024 Mean | Post-2024 Mean | Δ | Cohen's d |",
        "|--------|--------------|---------------|-----|-----------|",
        f"| Centrist Support | {rw_pre_cs_mean:.3f} | {rw_post_cs_mean:.3f} | {rw_post_cs_mean - rw_pre_cs_mean:+.3f} | {d_cs:+.2f} |",
        f"| Extremity | {rw_pre_ext_mean:.2f} | {rw_post_ext_mean:.2f} | {rw_post_ext_mean - rw_pre_ext_mean:+.2f} | {d_ext:+.2f} |",
        "",
        "**Interpretation:** Cohen's d values quantify effect sizes (|d| < 0.2 small, 0.5 medium, > 0.8 large).",
        f"These are descriptive, not inferential — with only {len(pre_years)} pre-2024 years and {len(post_years)} post-2024 years, statistical significance is not claimed.",
        "",
        "### Opposition-only right-wing motions",
        "",
        "| Metric | Pre-2024 Mean | Post-2024 Mean | Δ | Cohen's d | N pre / N post |",
        "|--------|--------------|---------------|-----|-----------|---------------|",
        f"| Centrist Support | {opp_pre_cs_mean:.3f} | {opp_post_cs_mean:.3f} | {opp_post_cs_mean - opp_pre_cs_mean:+.3f} | {d_opp_cs:+.2f} | {len(opp_pre_cs)} / {len(opp_post_cs)} |",
        f"| Extremity | {opp_pre_ext_mean:.2f} | {opp_post_ext_mean:.2f} | {opp_post_ext_mean - opp_pre_ext_mean:+.2f} | {d_opp_ext:+.2f} | {len(opp_pre_ext)} / {len(opp_post_ext)} |",
        "",
        "**Interpretation gate:** If opposition metrics also rise post-2024, the shift is not",
        "purely coalition-driven. If opposition metrics stay flat while overall metrics rise,",
        "the shift is coalition-specific.",
        "",
        "## 3. Coalition Composition",
        "",
        COALITION_NOTE,
        "",
        "Submitter party is parsed from motion title prefixes",
        "(e.g., \"Motie van het lid Wilders over ...\"). Only the lead submitter's party is",
        "considered. Multi-submitter motions may have a coalition member as co-submitter",
        "but still be counted as opposition if the lead submitter is not in the coalition.",
        "",
        "## 4. Domain Decomposition",
        "",
        "Migration = category `asiel/vreemdelingen`. Non-migration = all other categories.",
        "",
        "| Domain | Pre-2024 Mean CS | Post-2024 Mean CS | Δ CS |",
        "|--------|-----------------|------------------|------|",
    ]
    # Domain rows use the unweighted mean of the yearly means, ignoring NaN years.
    for domain_name, domain_sum in [("Migration", mig_sum), ("Non-migration", non_mig_sum)]:
        pre_cs = np.nanmean([_val(domain_sum, y, "mean_centrist_support_strict") for y in pre_years])
        post_cs = np.nanmean([_val(domain_sum, y, "mean_centrist_support_strict") for y in post_years])
        lines.append(
            f"| {domain_name} | {pre_cs:.3f} | {post_cs:.3f} | {post_cs - pre_cs:+.3f} |"
        )

    lines += [
        "",
        "## 5. Extremity-Stratified Centrist Support",
        "",
        ext_table,
        "",
        "**Key test:** If centrist support for high-extremity motions (3-5) rose",
        "disproportionately post-2024 while centrist support for mild motions stayed flat,",
        "centrists are more tolerant of extreme content — direct Overton shift evidence.",
        "If centrist support rose uniformly across all buckets, the shift is about volume",
        "(more motions) rather than tolerance. If only the 1-2 bucket rose, right-wing",
        "parties filed milder motions post-2024 and the 'shift' is illusory.",
    ]

    # --- Section 6: left-wing support -----------------------------------
    left_years_sorted = sorted(left_yearly.keys())
    left_pre_vals = [left_yearly[y]["mean_left_support"] for y in pre_years if y in left_yearly]
    left_post_vals = [left_yearly[y]["mean_left_support"] for y in post_years if y in left_yearly]
    left_pre_mean = np.mean(left_pre_vals) if left_pre_vals else float("nan")
    left_post_mean = np.mean(left_post_vals) if left_post_vals else float("nan")
    left_delta = left_post_mean - left_pre_mean

    left_table = "| Year | N | Mean left_support_mp |\n"
    left_table += "|------|---|---------------------|\n"
    for y in left_years_sorted:
        ls = left_yearly[y]["mean_left_support"]
        n = left_yearly[y]["n"]
        left_table += f"| {y} | {int(n)} | {ls:.4f} |\n"

    lines += [
        "",
        "## 6. Left-wing support for right-wing motions",
        "",
        left_table,
        "",
        "| Metric | Pre-2024 Mean | Post-2024 Mean | Δ |",
        "|--------|--------------|---------------|-----|",
        f"| Left Support (MP) | {left_pre_mean:.4f} | {left_post_mean:.4f} | {left_delta:+.4f} |",
        "",
        # left_delta is a fraction (the means are rendered with ':.1%'), so it
        # is scaled by 100 to express the change in percentage points.
        f"**Interpretation:** Left parties moved from {left_pre_mean:.1%} to {left_post_mean:.1%} "
        f"support — a {abs(left_delta) * 100:.1f} percentage-point shift. "
        "Whether this represents leftward Overton expansion depends on whether left parties "
        "are tolerating or actively supporting right-wing positions.",
        "",
        f"![Figure 3: Left-wing party support for right-wing motions]({Path(fig3_path).name})",
        "",
        "## 7. Manual Extremity Audit",
        "",
        audit_notes,
        "",
        audit_table,
        "",
        "## 8. Limitations",
        "",
        "- **Small-N time series:** 8 pre-2024 years and at most 3 post-2024 years (2026 is partial).",
        " Effect sizes are descriptive, not confirmatory.",
        "- **LLM extremity scores:** Content-based, not independently validated beyond the",
        " manual audit above. See §7 for agreement rate and noted biases.",
        "- **Coalition composition:** Hardcoded per year. 2024 is ambiguous (Rutte IV until July,",
        " Schoof thereafter). Early 2024 motions may be miscoded as Schoof-era.",
        "- **Submitter party identification:** Parsed from motion title prefixes (e.g.,",
        " 'Motie van het lid X'). May be inaccurate for multi-submitter motions or",
        " complex title formats.",
        "- **Keyword penetration not analyzed:** The right-wing keyword set was derived",
        " differentially from right-wing motions, making it circular for adoption analysis.",
        "",
        "## 9. Figures",
        "",
        f"![Figure 1: Centrist Support Over Time]({Path(fig1_path).name})",
        f"![Figure 2: Extremity Trends and Stratified Centrist Support]({Path(fig2_path).name})",
        f"![Figure 3: Left-wing party support for right-wing motions]({Path(fig3_path).name})",
        "",
        "## 10. Conclusion",
        "",
        "*(Fill in after reviewing all indicators and audit results.)*",
    ]

    report_path = REPORTS_DIR / "breakpoint_analysis.md"
    # The report contains non-ASCII characters (Δ, —, §), so the encoding is
    # fixed rather than left to the platform default.
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    logger.info("Report written to %s", report_path)
    return str(report_path)
def main() -> int:
    """Run the full analysis: query, summarise, plot, audit sample, report.

    All database reads happen first, and the connection is closed even when
    one of them raises. Returns 0 on completion, for use as the process exit
    status.
    """
    logger.info("Connecting to database: %s", DB_PATH)
    con = _conn(read_only=True)
    try:
        logger.info("Computing yearly right-wing metrics...")
        yearly_raw = compute_yearly_rw_metrics(con)

        logger.info("Computing baseline (all motions) metrics...")
        baseline_raw = compute_yearly_baseline(con)

        logger.info("Building party name map from mp_metadata...")
        name_party_map = build_party_name_map(con)

        logger.info("Computing opposition-only metrics...")
        opp_raw = compute_opposition_metrics(yearly_raw, name_party_map)

        logger.info("Computing domain decomposition...")
        mig_raw, non_mig_raw = compute_domain_metrics(yearly_raw)

        logger.info("Computing extremity-stratified centrist support...")
        ext_stratified = compute_extremity_stratified(yearly_raw)

        logger.info("Computing left-support yearly averages...")
        left_yearly = compute_left_support_yearly(con)
    finally:
        con.close()

    yearly_sum = yearly_summary(yearly_raw)
    opp_sum = yearly_summary(opp_raw)
    mig_sum = yearly_summary(mig_raw)
    non_mig_sum = yearly_summary(non_mig_raw)
    baseline_sum = yearly_summary(baseline_raw)

    logger.info("Generating Figure 1...")
    fig1_path = create_figure_1(yearly_sum, opp_sum, mig_sum, non_mig_sum, baseline_sum)
    logger.info("Generating Figure 2...")
    fig2_path = create_figure_2(yearly_sum, opp_sum, mig_sum, non_mig_sum, ext_stratified)
    logger.info("Generating Figure 3...")
    fig3_path = create_figure_3(left_yearly)

    logger.info("Sampling motions for manual audit...")
    audit_sample = sample_audit(yearly_raw)
    print_audit(audit_sample)

    logger.info("Generating report...")
    audit_notes = (
        "**Audit notes:** Perform manual audit by reviewing the motions below. "
        "Record agreement per motion. Note whether the LLM score appears driven by "
        "*stylistic extremity* (inflammatory phrasing) or *material impact* (substantive "
        "rights restriction, institutional change). "
        "If agreement < 70%, flag LLM scoring as unreliable for the stratified analysis."
    )
    report_path = generate_report(
        yearly_sum=yearly_sum,
        opp_sum=opp_sum,
        mig_sum=mig_sum,
        non_mig_sum=non_mig_sum,
        baseline_sum=baseline_sum,
        ext_stratified=ext_stratified,
        yearly_raw=yearly_raw,
        opp_raw=opp_raw,
        left_yearly=left_yearly,
        fig1_path=fig1_path,
        fig2_path=fig2_path,
        fig3_path=fig3_path,
        audit_sample=audit_sample,
        audit_notes=audit_notes,
    )

    print(f"\nReport: {report_path}")
    print(f"Figure 1: {fig1_path}")
    print(f"Figure 2: {fig2_path}")
    print(f"Figure 3: {fig3_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())