You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
motief/analysis/right_wing/success_correlation.py

517 lines
19 KiB

#!/usr/bin/env python3
"""U6: Test whether motions with high centrist support actually passed at higher rates.
Computes pass_rate for right-wing motions by centrist_support_strict quartile,
tests for a monotonic relationship (Cochran-Armitage trend test), stratifies by
period and government/opposition, and computes the success premium.
Usage:
uv run python -m analysis.right_wing.success_correlation
Output:
reports/overton_window/success_correlation.md
"""
from __future__ import annotations
import json
import logging
import re
import sys
from pathlib import Path
from typing import Any
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
import duckdb
import numpy as np
from scipy.stats import chi2
from analysis.config import CANONICAL_RIGHT
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
DB_PATH = str(PROJECT_ROOT / "data" / "motions.db")
REPORTS_DIR = PROJECT_ROOT / "reports" / "overton_window"
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
BREAK_YEAR = 2024
COALITION: dict[int, set[str]] = {
2016: {"VVD", "PvdA"},
2017: {"VVD", "PvdA"},
2018: {"VVD", "CDA", "D66", "CU"},
2019: {"VVD", "CDA", "D66", "CU"},
2020: {"VVD", "CDA", "D66", "CU"},
2021: {"VVD", "CDA", "D66", "CU"},
2022: {"VVD", "D66", "CDA", "CU"},
2023: {"VVD", "D66", "CDA", "CU"},
2024: {"PVV", "VVD", "NSC", "BBB"},
2025: {"PVV", "VVD", "NSC", "BBB"},
2026: {"PVV", "VVD", "NSC", "BBB"},
}
def build_party_name_map(con: duckdb.DuckDBPyConnection) -> dict[str, str]:
rows = con.execute("""
SELECT mp_name, party, van, tot_en_met
FROM mp_metadata
WHERE party IS NOT NULL
ORDER BY tot_en_met DESC NULLS LAST, van DESC NULLS LAST
""").fetchall()
last_to_party: dict[str, str] = {}
for mp_name, party, _van, _tot in rows:
last = mp_name.split(",")[0].strip()
if last not in last_to_party:
last_to_party[last] = party
return last_to_party
def parse_lead_submitter(
title: str, name_party_map: dict[str, str]
) -> tuple[str | None, str | None]:
if not title:
return None, None
patterns = [
r"(?:Gewijzigde|Nader\s+gewijzigde)?\s*Motie\s+van\s+het\s+lid\s+(.+?)\s+(?:c\.s\.\s+)?over\b",
r"(?:Gewijzigde|Nader\s+gewijzigde)?\s*Motie\s+van\s+de\s+leden\s+(.+?)\s+(?:c\.s\.\s+)?over\b",
r"Amendement\s+van\s+het\s+lid\s+(.+?)\s+over\b",
r"Amendement\s+van\s+de\s+leden\s+(.+?)\s+over\b",
]
for pat in patterns:
m = re.search(pat, title)
if m:
submitter_str = m.group(1).strip()
parts = submitter_str.split(" en ")
first_name = parts[0].strip()
first_name = re.sub(r"\s+c\.s\.", "", first_name).strip()
if not first_name:
continue
party = name_party_map.get(first_name)
return first_name, party
return None, None
def motion_passed(voting: dict | None, winning_margin: float | None = None) -> bool | None:
if voting is None:
voting = {}
if winning_margin is not None:
return winning_margin > 0
voor = sum(1 for v in voting.values() if v == "voor")
tegen = sum(1 for v in voting.values() if v == "tegen")
if voor + tegen == 0:
return None
return voor > tegen
def cochran_armitage_trend_test(
counts: np.ndarray, totals: np.ndarray, scores: np.ndarray | None = None
) -> dict[str, float]:
"""Cochran-Armitage trend test for monotonic relationship.
counts[i] = number of successes in bin i
totals[i] = total observations in bin i
scores[i] = trend score for bin i (default: 1, 2, 3, ..., k)
"""
k = len(counts)
if scores is None:
scores = np.arange(1, k + 1, dtype=float)
n = totals.sum()
x = counts.sum()
p_hat = x / n if n > 0 else 0.0
expected = totals * p_hat
numerator = np.sum(scores * (counts - expected))
denominator = p_hat * (1 - p_hat) * (np.sum(totals * scores**2) - np.sum(totals * scores) ** 2 / n)
if denominator <= 0 or p_hat in (0.0, 1.0):
return {"statistic": 0.0, "p_value": 1.0, "df": 1}
chi2_stat = numerator**2 / denominator
p_value = 1.0 - chi2.cdf(chi2_stat, 1)
return {"statistic": chi2_stat, "p_value": p_value, "df": 1}
def quartile_bin(cs: float) -> int:
"""Map centrist_support_strict to quartile bin 0-3."""
if cs <= 0.25:
return 0
elif cs <= 0.50:
return 1
elif cs <= 0.75:
return 2
else:
return 3
QUARTILE_LABELS = [
"Q1 [0.00\u20130.25]",
"Q2 (0.25\u20130.50]",
"Q3 (0.50\u20130.75]",
"Q4 (0.75\u20131.00]",
]
def collect_motion_data(
con: duckdb.DuckDBPyConnection, name_party_map: dict[str, str]
) -> list[dict[str, Any]]:
rows = con.execute("""
SELECT
r.motion_id,
r.year,
r.title,
r.centrist_support_strict,
m.voting_results,
m.winning_margin
FROM right_wing_motions r
JOIN motions m ON r.motion_id = m.id
WHERE r.classified = TRUE
AND r.year IS NOT NULL
AND r.centrist_support_strict IS NOT NULL
""").fetchall()
motions: list[dict[str, Any]] = []
for mid, year, title, cs, vr_json, wm in rows:
voting = json.loads(vr_json) if isinstance(vr_json, str) else (vr_json or {})
passed = motion_passed(voting, wm)
submitter_name, submitter_party = parse_lead_submitter(title, name_party_map)
coalition = COALITION.get(int(year), set())
motion_type = None
if submitter_party is not None:
motion_type = "government" if submitter_party in coalition else "opposition"
motions.append({
"motion_id": mid,
"year": int(year),
"centrist_support_strict": float(cs),
"passed": passed,
"submitter_party": submitter_party,
"motion_type": motion_type,
"period": "post-2024" if int(year) >= BREAK_YEAR else "pre-2024",
})
return motions
def compute_quartile_pass_rates(
motions: list[dict], filter_fn=None
) -> dict[str, dict[int, dict[str, Any]]]:
"""Compute pass_rate by centrist_support quartile.
filter_fn: optional (motion) -> bool filter.
Returns dict with keys: 'all', 'pre-2024', 'post-2024', 'government', 'opposition'
when no filter is applied. When filter_fn is given, returns a single key 'filtered'.
"""
if filter_fn is None:
strata = {
"all": lambda m: True,
"pre-2024": lambda m: m["period"] == "pre-2024",
"post-2024": lambda m: m["period"] == "post-2024",
"government": lambda m: m["motion_type"] == "government",
"opposition": lambda m: m["motion_type"] == "opposition",
}
else:
strata = {"filtered": filter_fn}
result: dict[str, dict[int, dict]] = {}
for label, fn in strata.items():
bins: dict[int, dict] = {q: {"passed": 0, "total": 0, "n_determined": 0}
for q in range(4)}
for m in motions:
if not fn(m):
continue
q = quartile_bin(m["centrist_support_strict"])
bins[q]["total"] += 1
if m["passed"] is not None:
bins[q]["n_determined"] += 1
if m["passed"]:
bins[q]["passed"] += 1
for q in range(4):
d = bins[q]
d["pass_rate"] = d["passed"] / d["n_determined"] if d["n_determined"] > 0 else float("nan")
d["undetermined"] = d["total"] - d["n_determined"]
result[label] = bins
return result
def format_pass_rate_table(
strata: dict[str, dict[int, dict]], label_map: dict[str, str] | None = None
) -> str:
if label_map is None:
label_map = {k: k for k in strata}
lines = ["| Stratum | " + " | ".join(QUARTILE_LABELS) + " | N total | Trend \u03c7\u00b2 | p-value |",
"|---------|" + "|".join(["-" * len(lb) for lb in QUARTILE_LABELS]) + "|---------|-----------|---------|"]
for key, bins in strata.items():
prs = []
for q in range(4):
rate = bins[q]["pass_rate"]
nd = bins[q]["n_determined"]
if np.isnan(rate):
prs.append(f"N/A (n={nd})")
else:
prs.append(f"{rate:.1%} (n={nd})")
total = sum(bins[q]["total"] for q in range(4))
nd_total = sum(bins[q]["n_determined"] for q in range(4))
counts = np.array([bins[q]["passed"] for q in range(4)], dtype=float)
totals = np.array([bins[q]["n_determined"] for q in range(4)], dtype=float)
trend = cochran_armitage_trend_test(counts, totals)
label = label_map.get(key, key)
if trend["p_value"] < 0.001:
p_str = "<0.001"
else:
p_str = f"{trend['p_value']:.3f}"
lines.append(
f"| {label} | " + " | ".join(prs) + f" | {nd_total} | {trend['statistic']:.2f} | {p_str} |"
)
return "\n".join(lines)
def compute_success_premium(
strata: dict[str, dict[int, dict]]
) -> dict[str, float]:
premiums: dict[str, float] = {}
for key, bins in strata.items():
low_rate = bins[0]["pass_rate"] # Q1
high_rate = bins[3]["pass_rate"] # Q4
if not np.isnan(low_rate) and not np.isnan(high_rate):
premiums[key] = high_rate - low_rate
else:
premiums[key] = float("nan")
return premiums
def generate_report(
all_strata: dict[str, dict[int, dict]],
premium: dict[str, float],
n_total: int,
n_with_outcome: int,
n_passed: int,
overall_pass_rate: float,
n_government: int,
n_opposition: int,
n_unknown_type: int,
) -> str:
lines = [
"# Motion Success Correlation Analysis",
"",
"**Goal:** Test whether motions with high centrist support actually passed at higher rates,",
"validating that centrist support translates to legislative success.",
"",
f"**Analysis period:** 2016\u20132026",
f"**Total right-wing motions:** {n_total}",
f"**Motions with determinable outcome:** {n_with_outcome}",
f"**Motions passed:** {n_passed} ({overall_pass_rate:.1%})",
f"**Government motions:** {n_government} \u00b7 **Opposition motions:** {n_opposition} \u00b7 **Unknown type:** {n_unknown_type}",
"",
"---",
"",
"## 1. Pass Rate by Centrist Support Quartile",
"",
"Centrist support (strict) is the fraction of centrist parties that voted 'voor'.",
"Quartile bins are: [0-0.25], (0.25-0.50], (0.50-0.75], (0.75-1.0].",
"",
format_pass_rate_table(all_strata),
"",
"**Cochran-Armitage trend test:** Tests for a monotonic trend in pass rates across",
"ordered quartile bins. A significant result (p < 0.05) indicates that pass rates",
"increase or decrease systematically with centrist support level.",
"",
"---",
"",
"## 2. Success Premium",
"",
'The "success premium" is the difference in pass_rate between the highest centrist',
"support quartile (Q4) and the lowest (Q1): pass_rate(Q4) - pass_rate(Q1).",
"",
]
lines.append("| Stratum | Q1 Pass Rate | Q4 Pass Rate | Premium |")
lines.append("|---------|-------------|-------------|---------|")
for key in ["all", "pre-2024", "post-2024", "government", "opposition"]:
if key in all_strata:
q1 = all_strata[key][0]["pass_rate"]
q4 = all_strata[key][3]["pass_rate"]
p = premium[key]
q1s = f"{q1:.1%}" if not np.isnan(q1) else "N/A"
q4s = f"{q4:.1%}" if not np.isnan(q4) else "N/A"
ps = f"{p:+.1%}" if not np.isnan(p) else "N/A"
lines.append(f"| {key} | {q1s} | {q4s} | {ps} |")
lines += [
"",
"Positive premium \u2192 higher centrist support correlates with higher pass rate.",
"Negative premium \u2192 higher centrist support correlates with lower pass rate.",
"",
"---",
"",
"## 3. Period Stratification (Pre vs Post-2024)",
"",
"Pre-2024: 2016\u20132023 (Rutte cabinets II\u2013IV).",
"Post-2024: 2024\u20132026 (Schoof cabinet, PVV in coalition).",
"",
"The post-2024 period has far more right-wing motions (volume surge).",
"If the success premium differs between periods, the structural break",
"affected not just centrist willingness to support but also motion outcomes.",
"",
"---",
"",
"## 4. Government vs Opposition Control",
"",
"Government motions come from coalition party members and generally have higher",
"baseline pass rates. Opposition motions are the true test: if high centrist support",
"predicts passage for opposition motions, centrist backing is decisive.",
"",
"Motion type is determined by parsing the lead submitter from the title prefix",
"(e.g., 'Motie van het lid Wilders over ...').",
"",
"---",
"",
"## 5. Interpretation",
"",
]
all_bins = all_strata["all"]
all_counts = np.array([all_bins[q]["passed"] for q in range(4)], dtype=float)
all_totals_arr = np.array([all_bins[q]["n_determined"] for q in range(4)], dtype=float)
trend = cochran_armitage_trend_test(all_counts, all_totals_arr)
if trend["p_value"] < 0.05:
direction = "positive" if premium.get("all", 0) > 0 else "negative"
lines.append(
f"The Cochran-Armitage trend test is significant (\u03c7\u00b2={trend['statistic']:.2f}, "
f"p={trend['p_value']:.3f}), indicating a {direction} monotonic relationship "
f"between centrist support and pass rate. The success premium is "
f"{premium.get('all', 0):+.1%}."
)
else:
lines.append(
f"The Cochran-Armitage trend test is not significant (\u03c7\u00b2={trend['statistic']:.2f}, "
f"p={trend['p_value']:.3f}). There is no evidence of a monotonic relationship "
f"between centrist support and pass rate. This is consistent with the observation "
f"that virtually all motions pass in the Dutch parliament (ceiling effect)."
)
if "opposition" in all_strata:
opp_bins = all_strata["opposition"]
opp_counts = np.array([opp_bins[q]["passed"] for q in range(4)], dtype=float)
opp_totals_arr = np.array([opp_bins[q]["n_determined"] for q in range(4)], dtype=float)
opp_trend = cochran_armitage_trend_test(opp_counts, opp_totals_arr)
lines.append("")
lines.append(
f"For opposition motions specifically, the trend test "
f"is {'significant' if opp_trend['p_value'] < 0.05 else 'not significant'} "
f"(\u03c7\u00b2={opp_trend['statistic']:.2f}, p={opp_trend['p_value']:.3f})."
)
paths = [p for p in all_strata if p.startswith("pre") or p.startswith("post")]
lines.append("")
lines.append("### Period Comparison")
for p in paths:
bins = all_strata[p]
p_counts = np.array([bins[q]["passed"] for q in range(4)], dtype=float)
p_totals_arr = np.array([bins[q]["n_determined"] for q in range(4)], dtype=float)
p_trend = cochran_armitage_trend_test(p_counts, p_totals_arr)
n = int(p_totals_arr.sum())
lines.append(
f"- **{p}** (n={n}): \u03c7\u00b2={p_trend['statistic']:.2f}, "
f"p={p_trend['p_value']:.3f}, premium={premium.get(p, float('nan')):+.1%}"
)
lines += [
"",
"---",
"",
"## 6. Limitations",
"",
"- **Ceiling effect:** Dutch parliamentary motions pass at very high rates (>95%),",
" leaving little variance to detect correlation with centrist support.",
"- **Undetermined outcomes:** Some motions had equal votes or no voting data,",
" reducing sample size (excluded from pass rate calculation).",
"- **Submitter parsing:** Lead submitter party identification from title prefixes",
" may misclassify some multi-submitter motions.",
"- **Coalition coding:** 2024 is ambiguous (Rutte IV until July, Schoof thereafter).",
"- **Causality direction:** Correlation does not imply causation. High centrist support",
" could reflect motions that were already likely to pass (centrists voting with the",
" majority), rather than centrist support causing passage.",
"",
"---",
"",
"*Report generated by `analysis/right_wing/success_correlation.py`*",
]
report_path = REPORTS_DIR / "success_correlation.md"
with open(report_path, "w") as f:
f.write("\n".join(lines))
logger.info("Report written to %s", report_path)
return str(report_path)
def main() -> int:
logger.info("Connecting to database: %s", DB_PATH)
con = duckdb.connect(DB_PATH, read_only=True)
logger.info("Building party name map...")
name_party_map = build_party_name_map(con)
logger.info("Collecting motion data...")
motions = collect_motion_data(con, name_party_map)
con.close()
n_total = len(motions)
n_with_outcome = sum(1 for m in motions if m["passed"] is not None)
n_passed = sum(1 for m in motions if m["passed"] is True)
overall_pass_rate = n_passed / n_with_outcome if n_with_outcome > 0 else 0.0
n_government = sum(1 for m in motions if m["motion_type"] == "government")
n_opposition = sum(1 for m in motions if m["motion_type"] == "opposition")
n_unknown_type = sum(1 for m in motions if m["motion_type"] is None)
logger.info(
"Total: %d motions, %d with outcome, %d passed (%.1f%%), gov=%d opp=%d unknown=%d",
n_total, n_with_outcome, n_passed, overall_pass_rate * 100,
n_government, n_opposition, n_unknown_type,
)
all_strata = compute_quartile_pass_rates(motions)
premium = compute_success_premium(all_strata)
for key in ["all", "pre-2024", "post-2024", "government", "opposition"]:
if key in premium:
logger.info("Success premium (%s): %+.1f%%", key, premium[key] * 100)
report_path = generate_report(
all_strata=all_strata,
premium=premium,
n_total=n_total,
n_with_outcome=n_with_outcome,
n_passed=n_passed,
overall_pass_rate=overall_pass_rate,
n_government=n_government,
n_opposition=n_opposition,
n_unknown_type=n_unknown_type,
)
print(f"\nReport: {report_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())