#!/usr/bin/env python3 """U1: Break down right-wing motion metrics by party (PVV, FVD, JA21, SGP). Usage: uv run python analysis/right_wing/party_differentiation.py Output: reports/overton_window/party_differentiation.md reports/overton_window/party_differentiation_figure.png """ from __future__ import annotations import logging import re import sys from pathlib import Path from typing import Any import duckdb import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import numpy as np ROOT = Path(__file__).parent.parent.parent.resolve() if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from analysis.right_wing.common import ( BREAK_YEAR, YEAR_MIN, YEAR_MAX, DB_PATH, REPORTS_DIR, _conn, build_party_name_map, ) from analysis.config import CANONICAL_RIGHT, PARTY_COLOURS, _PARTY_NORMALIZE logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) REPORTS_DIR.mkdir(parents=True, exist_ok=True) RIGHT_PARTIES = sorted(CANONICAL_RIGHT) TITLE_PATTERNS = [ r"(?:Gewijzigde|Nader\s+gewijzigde)?\s*Motie\s+van\s+het\s+lid\s+(.+?)\s+(?:c\.s\.\s+)?over\b", r"(?:Gewijzigde|Nader\s+gewijzigde)?\s*Motie\s+van\s+de\s+leden\s+(.+?)\s+(?:c\.s\.\s+)?over\b", r"Amendement\s+van\s+het\s+lid\s+(.+?)\s+over\b", r"Amendement\s+van\s+de\s+leden\s+(.+?)\s+over\b", ] def parse_submitter_party(title: str, name_party_map: dict[str, str]) -> str | None: if not title: return None for pat in TITLE_PATTERNS: m = re.search(pat, title) if m: submitter_str = m.group(1).strip() parts = submitter_str.split(" en ") first_name = parts[0].strip() first_name = re.sub(r"\s+c\.s\.", "", first_name).strip() if not first_name: continue raw_party = name_party_map.get(first_name) if raw_party: return _PARTY_NORMALIZE.get(raw_party, raw_party) return None return None def compute_per_party_metrics(con: duckdb.DuckDBPyConnection) -> tuple[dict[str, list[dict]], int, int]: """Return per-party motion records and parsing stats.""" rows = con.execute(""" SELECT r.motion_id, r.year, r.title, r.centrist_support_strict, r.category, e.stijl_extremiteit, e.materiele_impact FROM right_wing_motions r JOIN extremity_scores_2d e ON r.motion_id = e.motion_id WHERE r.classified = TRUE AND r.year IS NOT NULL AND r.title IS NOT NULL """).fetchall() logger.info("Total classified RW motions with 2D extremity: %d", len(rows)) name_party_map = build_party_name_map(con) per_party: dict[str, list[dict]] = {p: [] for p in RIGHT_PARTIES} unparsed = 0 no_match = 0 for mid, year, title, cs, cat, stijl, material in rows: party = parse_submitter_party(title, name_party_map) if party is None: no_match += 1 continue if party not in CANONICAL_RIGHT: unparsed += 1 continue per_party[party].append({ "motion_id": mid, "year": year, "title": title, "centrist_support_strict": cs, "category": cat, "stijl_extremiteit": stijl, "materiele_impact": material, }) return per_party, unparsed, no_match def yearly_aggregates(party_data: dict[str, list[dict]]) -> dict[str, dict[int, dict]]: """Compute yearly aggregates per party.""" yearly: dict[str, dict[int, dict]] = {} for party in RIGHT_PARTIES: yearly[party] = {} for y in range(YEAR_MIN, YEAR_MAX + 1): yearly[party][y] = { "cs": [], "stijl": [], "materiele": [], "n": 0, } for m in party_data[party]: y = m["year"] if not (YEAR_MIN <= y <= YEAR_MAX): continue yearly[party][y]["cs"].append(m["centrist_support_strict"]) yearly[party][y]["stijl"].append(m["stijl_extremiteit"]) yearly[party][y]["materiele"].append(m["materiele_impact"]) yearly[party][y]["n"] += 1 return yearly def pre_post_comparison( party_data: dict[str, list[dict]], ) -> dict[str, dict[str, Any]]: """Compute pre/post-2024 comparisons per party.""" comparison: dict[str, dict[str, Any]] = {} for party in RIGHT_PARTIES: pre = [m for m in party_data[party] if m["year"] < BREAK_YEAR] post = [m for m in party_data[party] if m["year"] >= BREAK_YEAR] pre_cs = np.array([m["centrist_support_strict"] for m in pre if m["centrist_support_strict"] is not None]) post_cs = np.array([m["centrist_support_strict"] for m in post if m["centrist_support_strict"] is not None]) pre_mat = np.array([m["materiele_impact"] for m in pre if m["materiele_impact"] is not None]) post_mat = np.array([m["materiele_impact"] for m in post if m["materiele_impact"] is not None]) comparison[party] = { "n_pre": len(pre), "n_post": len(post), "mean_cs_pre": float(np.mean(pre_cs)) if len(pre_cs) > 0 else float("nan"), "mean_cs_post": float(np.mean(post_cs)) if len(post_cs) > 0 else float("nan"), "delta_cs": float(np.mean(post_cs) - np.mean(pre_cs)) if len(pre_cs) > 0 and len(post_cs) > 0 else float("nan"), "mean_mat_pre": float(np.mean(pre_mat)) if len(pre_mat) > 0 else float("nan"), "mean_mat_post": float(np.mean(post_mat)) if len(post_mat) > 0 else float("nan"), "delta_mat": float(np.mean(post_mat) - np.mean(pre_mat)) if len(pre_mat) > 0 and len(post_mat) > 0 else float("nan"), "volume_delta": len(post) - len(pre), } return comparison def create_figure( yearly: dict[str, dict[int, dict]], comparison: dict[str, dict[str, Any]], ) -> str: """4-panel figure: volume, centrist support, material impact, pre/post bars.""" years = list(range(YEAR_MIN, YEAR_MAX + 1)) years_arr = np.array(years) party_colours = { "PVV": PARTY_COLOURS.get("PVV", "#002366"), "FVD": PARTY_COLOURS.get("FVD", "#6A1B9A"), "JA21": PARTY_COLOURS.get("JA21", "#7B1FA2"), "SGP": PARTY_COLOURS.get("SGP", "#F4511E"), } marker_map = {"PVV": "o", "FVD": "s", "JA21": "^", "SGP": "D"} fig, axes = plt.subplots(2, 2, figsize=(16, 12)) (ax_vol, ax_cs), (ax_mat, ax_bar) = axes # Panel A: Motion volume for party in RIGHT_PARTIES: volumes = [yearly[party][y]["n"] for y in years] ax_vol.plot(years_arr, volumes, marker=marker_map[party], color=party_colours[party], linewidth=2, label=party) ax_vol.axvline(x=BREAK_YEAR - 0.5, color="black", linestyle=":", alpha=0.5, linewidth=1) ax_vol.set_xlabel("Year") ax_vol.set_ylabel("Motion count") ax_vol.set_title("A: Motion Volume by Party Over Time", fontweight="bold") ax_vol.legend(fontsize=9) ax_vol.grid(True, alpha=0.3) ax_vol.set_xticks(years_arr) ax_vol.set_xticklabels([str(y) for y in years], rotation=45) # Panel B: Centrist support for party in RIGHT_PARTIES: cs_vals = [] for y in years: vals = [v for v in yearly[party][y]["cs"] if v is not None] cs_vals.append(np.mean(vals) if vals else np.nan) ax_cs.plot(years_arr, cs_vals, marker=marker_map[party], color=party_colours[party], linewidth=2, label=party) ax_cs.axvline(x=BREAK_YEAR - 0.5, color="black", linestyle=":", alpha=0.5, linewidth=1) ax_cs.set_xlabel("Year") ax_cs.set_ylabel("Centrist support (strict)") ax_cs.set_title("B: Centrist Support by Party Over Time", fontweight="bold") ax_cs.legend(fontsize=9) ax_cs.set_ylim(0, 1.05) ax_cs.grid(True, alpha=0.3) ax_cs.set_xticks(years_arr) ax_cs.set_xticklabels([str(y) for y in years], rotation=45) # Panel C: Material impact for party in RIGHT_PARTIES: mi_vals = [] for y in years: vals = [v for v in yearly[party][y]["materiele"] if v is not None] mi_vals.append(np.mean(vals) if vals else np.nan) ax_mat.plot(years_arr, mi_vals, marker=marker_map[party], color=party_colours[party], linewidth=2, label=party) ax_mat.axvline(x=BREAK_YEAR - 0.5, color="black", linestyle=":", alpha=0.5, linewidth=1) ax_mat.set_xlabel("Year") ax_mat.set_ylabel("Material impact (1-5)") ax_mat.set_title("C: Material Impact by Party Over Time", fontweight="bold") ax_mat.legend(fontsize=9) ax_mat.grid(True, alpha=0.3) ax_mat.set_xticks(years_arr) ax_mat.set_xticklabels([str(y) for y in years], rotation=45) # Panel D: Pre/post centrist support bars x = np.arange(len(RIGHT_PARTIES)) width = 0.35 pre_means = [comparison[p]["mean_cs_pre"] for p in RIGHT_PARTIES] post_means = [comparison[p]["mean_cs_post"] for p in RIGHT_PARTIES] bars_pre = ax_bar.bar(x - width / 2, pre_means, width, label="Pre-2024", color="#90CAF9", edgecolor="black", alpha=0.9) bars_post = ax_bar.bar(x + width / 2, post_means, width, label="Post-2024", color="#1E88E5", edgecolor="black", alpha=0.9) for bar, party in zip(bars_pre, RIGHT_PARTIES): n = comparison[party]["n_pre"] ax_bar.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.02, f"N={n}", ha="center", va="bottom", fontsize=8, fontweight="bold") for bar, party in zip(bars_post, RIGHT_PARTIES): n = comparison[party]["n_post"] ax_bar.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.02, f"N={n}", ha="center", va="bottom", fontsize=8, fontweight="bold") ax_bar.set_xticks(x) ax_bar.set_xticklabels(RIGHT_PARTIES, fontsize=10) ax_bar.set_ylabel("Centrist support (strict)") ax_bar.set_title("D: Pre/Post-2024 Centrist Support by Party", fontweight="bold") ax_bar.legend(fontsize=9) ax_bar.set_ylim(0, 1.05) ax_bar.grid(True, alpha=0.3, axis="y") plt.tight_layout() path = str(REPORTS_DIR / "party_differentiation_figure.png") fig.savefig(path, dpi=150, bbox_inches="tight") plt.close(fig) logger.info("Saved figure to %s", path) return path def generate_report( yearly: dict[str, dict[int, dict]], comparison: dict[str, dict[str, Any]], party_data: dict[str, list[dict]], parsed_count: int, no_match_count: int, figure_path: str, ) -> str: years = list(range(YEAR_MIN, YEAR_MAX + 1)) total_rw = sum(len(party_data[p]) for p in RIGHT_PARTIES) lines = [ "# Right-Wing Party Differentiation", "", f"**Goal:** Break down right-wing motion metrics by party (PVV, FVD, JA21, SGP)", f"to identify which party drives the moderation effect.", "", f"**Analysis period:** {YEAR_MIN}–{YEAR_MAX}", f"**Right-wing parties:** {', '.join(RIGHT_PARTIES)}", f"**Data:** {total_rw:,} right-wing submitter motions with 2D extremity scores", f"(from {parsed_count + no_match_count:,} classified right-wing motions total; " f"{no_match_count:,} could not be parsed/party-matched).", "", "---", "", "## 1. Motion Volume by Party and Year", "", "| Year | " + " | ".join(RIGHT_PARTIES) + " | Total RW |", "|------|" + "|".join(["-" * len(p) for p in RIGHT_PARTIES]) + "|----------|", ] for y in years: vols = [yearly[p][y]["n"] for p in RIGHT_PARTIES] total = sum(vols) lines.append(f"| {y} | {vols[0]} | {vols[1]} | {vols[2]} | {vols[3]} | {total} |") lines += [ "", "---", "", "## 2. Centrist Support (Strict) by Party and Year", "", "| Year | " + " | ".join(RIGHT_PARTIES) + " |", "|------|" + "|".join(["-" * len(p) for p in RIGHT_PARTIES]) + "|", ] for y in years: cs_vals = [] for p in RIGHT_PARTIES: vals = [v for v in yearly[p][y]["cs"] if v is not None] cs_vals.append(np.mean(vals) if vals else float("nan")) cs_strs = [f"{v:.3f}" if not np.isnan(v) else "N/A" for v in cs_vals] lines.append(f"| {y} | {cs_strs[0]} | {cs_strs[1]} | {cs_strs[2]} | {cs_strs[3]} |") lines += [ "", "---", "", "## 3. Material Impact by Party and Year", "", "| Year | " + " | ".join(RIGHT_PARTIES) + " |", "|------|" + "|".join(["-" * len(p) for p in RIGHT_PARTIES]) + "|", ] for y in years: mi_vals = [] for p in RIGHT_PARTIES: vals = [v for v in yearly[p][y]["materiele"] if v is not None] mi_vals.append(np.mean(vals) if vals else float("nan")) mi_strs = [f"{v:.2f}" if not np.isnan(v) else "N/A" for v in mi_vals] lines.append(f"| {y} | {mi_strs[0]} | {mi_strs[1]} | {mi_strs[2]} | {mi_strs[3]} |") lines += [ "", "---", "", "## 4. Pre/Post-2024 Comparison by Party", "", "| Party | N Pre | N Post | CS Pre | CS Post | Delta CS | Mat. Pre | Mat. Post | Delta Mat. | Vol. Delta |", "|-------|-------|--------|--------|---------|----------|----------|-----------|------------|------------|", ] for party in RIGHT_PARTIES: c = comparison[party] lines.append( f"| {party} | {c['n_pre']} | {c['n_post']} | " f"{c['mean_cs_pre']:.3f} | {c['mean_cs_post']:.3f} | " f"{c['delta_cs']:+.3f} | {c['mean_mat_pre']:.2f} | " f"{c['mean_mat_post']:.2f} | {c['delta_mat']:+.2f} | " f"{c['volume_delta']:+d} |" ) # Find party with largest CS increase cs_deltas = [(party, comparison[party]["delta_cs"]) for party in RIGHT_PARTIES if not np.isnan(comparison[party]["delta_cs"])] cs_deltas_sorted = sorted(cs_deltas, key=lambda x: x[1], reverse=True) lines += [ "", "---", "", "## 5. Key Findings", "", ] if cs_deltas_sorted: lines.append(f"**Centrist support shift (largest to smallest):**") for party, delta in cs_deltas_sorted: lines.append(f"- **{party}**: {delta:+.3f}") lines += [ "", "### Volume", ] for party in RIGHT_PARTIES: c = comparison[party] lines.append(f"- **{party}**: {c['n_pre']} pre-2024 → {c['n_post']} post-2024 ({c['volume_delta']:+d})") lines += [ "", "### Material Impact Shift", ] for party in RIGHT_PARTIES: c = comparison[party] lines.append(f"- **{party}**: {c['mean_mat_pre']:.2f} → {c['mean_mat_post']:.2f} ({c['delta_mat']:+.2f})") lines += [ "", "---", "", "## 6. Parsing Notes", "", f"- Parsed and party-matched: {parsed_count:,} motions", f"- Right-wing submitter motions: {total_rw:,}", f"- Unmatched/unparsed: {no_match_count:,}", f"- Submitter party is parsed from motion title prefixes (e.g. 'Motie van het lid Wilders ...').", f"- Multi-submitter motions use the first listed submitter.", f"- Party names are normalized via `_PARTY_NORMALIZE` (e.g. Groep Markuszower → PVV).", "", "---", "", "## 7. Figure", "", f"![Party differentiation figure]({Path(figure_path).name})", "", ] report_path = REPORTS_DIR / "party_differentiation.md" with open(report_path, "w") as f: f.write("\n".join(lines)) logger.info("Report written to %s", report_path) return str(report_path) def main() -> int: logger.info("Connecting to database: %s", DB_PATH) con = _conn(read_only=True) logger.info("Computing per-party metrics...") party_data, unparsed, no_match = compute_per_party_metrics(con) con.close() total_rw = sum(len(party_data[p]) for p in RIGHT_PARTIES) logger.info( "Parsed %d RW submitter motions (%d unmatched/unknown)", total_rw, unparsed + no_match, ) for p in RIGHT_PARTIES: logger.info(" %s: %d motions", p, len(party_data[p])) logger.info("Computing yearly aggregates...") yearly = yearly_aggregates(party_data) logger.info("Computing pre/post-2024 comparisons...") comparison = pre_post_comparison(party_data) logger.info("Generating figure...") fig_path = create_figure(yearly, comparison) logger.info("Generating report...") report_path = generate_report( yearly, comparison, party_data, total_rw, unparsed + no_match, fig_path, ) print(f"\nReport: {report_path}") print(f"Figure: {fig_path}") return 0 if __name__ == "__main__": raise SystemExit(main())