#!/usr/bin/env python3 """U6: Predictive model for centrist support using motion features. Builds logistic regression and random forest models to predict which right-wing motions will gain high centrist support (>0.5). Usage: uv run python analysis/right_wing/predictive_model.py uv run python analysis/right_wing/predictive_model.py --db data/motions.db Output: reports/overton_window/predictive_model.md reports/overton_window/predictive_model_figure.png """ from __future__ import annotations import json import logging import re import sys from pathlib import Path from typing import Any import duckdb import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import numpy as np from sklearn.ensemble import RandomForestClassifier from sklearn.linear_model import LogisticRegression from sklearn.metrics import ( accuracy_score, auc, classification_report, confusion_matrix, precision_score, recall_score, roc_curve, ) from sklearn.model_selection import StratifiedKFold, cross_validate, train_test_split from sklearn.preprocessing import LabelEncoder, StandardScaler PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from analysis.right_wing.common import ( BREAK_YEAR, COALITION, DB_PATH, REPORTS_DIR, build_party_name_map as build_name_party_map, parse_lead_submitter, ) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) REPORTS_DIR.mkdir(parents=True, exist_ok=True) RANDOM_SEED = 42 RIGHT_WING_PARTIES = {"PVV", "FVD", "JA21", "SGP"} CATEGORY_SHORT = { "economie/belasting": "economie/bel.", "veiligheid/justitie": "veiligh./just.", "landbouw/stikstof": "landb./stikst.", "asiel/vreemdelingen": "asiel/vreemd.", "defensie/buitenland": "def./buitenland", "zorg/gezondheid": "zorg/gezondh.", "corona/pandemie": "corona/pand.", "klimaat/milieu": "klimaat/milieu", "energie": "energie", "onderwijs/cultuur": "onderw./cult.", "sociaal/jeugd": "sociaal/jeugd", "overig": "overig", "lhbtq/rechten": "lhbtq/rechten", } def load_model_data( db_path: str, ) -> tuple[list[dict[str, Any]], int, int]: con = duckdb.connect(db_path) try: name_party_map = build_name_party_map(con) rows = con.execute(""" SELECT r.motion_id, r.year, r.title, r.category, r.centrist_support_strict, e.stijl_extremiteit, e.materiele_impact, m.body_text FROM right_wing_motions r JOIN extremity_scores_2d e ON r.motion_id = e.motion_id JOIN motions m ON r.motion_id = m.id WHERE r.classified = TRUE AND r.centrist_support_strict IS NOT NULL AND r.year IS NOT NULL """).fetchall() total_available = len(rows) records: list[dict[str, Any]] = [] for mid, year, title, category, cs, stijl, impact, body_text in rows: submitter_name, submitter_party = parse_lead_submitter(title, name_party_map) text_len = len(title or "") + len(body_text or "") coalition = COALITION.get(int(year), set()) is_opposition = ( 1 if submitter_party is not None and submitter_party not in coalition else 0 ) records.append({ "motion_id": mid, "year": int(year), "title": title, "category": category, "centrist_support_strict": float(cs), "stijl_extremiteit": stijl, "materiele_impact": impact, "submitter_party": submitter_party, "text_length": text_len, "is_opposition": is_opposition, }) for r in records: if r["category"] is None: r["category"] = "overig" # Filter to rows with valid submitter_party in right-wing set valid_records = [] for r in records: if r["submitter_party"] is None: continue if r["submitter_party"] not in RIGHT_WING_PARTIES: continue if r["stijl_extremiteit"] is None or r["materiele_impact"] is None: continue valid_records.append(r) logger.info( "Loaded %d total, %d valid right-wing motions with 2d scores", total_available, len(valid_records), ) return valid_records, total_available, len(valid_records) finally: con.close() def build_features(records: list[dict[str, Any]]) -> tuple[np.ndarray, np.ndarray, list[str]]: le = LabelEncoder() categories_encoded = le.fit_transform([r["category"] for r in records]) n_categories = len(le.classes_) category_onehot = np.eye(n_categories)[categories_encoded] category_names = [f"cat_{c}" for c in le.classes_] parties_encoded = le.fit_transform([r["submitter_party"] for r in records]) n_parties = len(le.classes_) party_onehot = np.eye(n_parties)[parties_encoded] party_names = [f"party_{p}" for p in le.classes_] numerical = np.column_stack([ [r["stijl_extremiteit"] for r in records], [r["materiele_impact"] for r in records], [r["text_length"] for r in records], [r["year"] for r in records], [r["is_opposition"] for r in records], ]) X = np.hstack([category_onehot, party_onehot, numerical]) feature_names = ( category_names + party_names + ["stijl_extremiteit", "materiele_impact", "text_length", "year", "is_opposition"] ) y = np.array([1 if r["centrist_support_strict"] > 0.5 else 0 for r in records]) return X, y, feature_names def evaluate_models( X: np.ndarray, y: np.ndarray, feature_names: list[str] ) -> dict[str, Any]: X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=RANDOM_SEED, stratify=y, ) scaler = StandardScaler() cat_start = len([f for f in feature_names if f.startswith("cat_")]) party_start = len([f for f in feature_names if f.startswith("cat_") or f.startswith("party_")]) X_train_scaled = X_train.copy() X_test_scaled = X_test.copy() X_train_scaled[:, party_start:] = scaler.fit_transform(X_train[:, party_start:]) X_test_scaled[:, party_start:] = scaler.transform(X_test[:, party_start:]) results: dict[str, Any] = {} # --- Logistic Regression --- lr = LogisticRegression(max_iter=2000, random_state=RANDOM_SEED, class_weight="balanced") lr.fit(X_train_scaled, y_train) y_pred_lr = lr.predict(X_test_scaled) y_proba_lr = lr.fit(X_train_scaled, y_train).predict_proba(X_test_scaled)[:, 1] lr_metrics = { "accuracy": float(accuracy_score(y_test, y_pred_lr)), "precision": float(precision_score(y_test, y_pred_lr, zero_division=0)), "recall": float(recall_score(y_test, y_pred_lr, zero_division=0)), } fpr_lr, tpr_lr, _ = roc_curve(y_test, y_proba_lr) lr_metrics["auc_roc"] = float(auc(fpr_lr, tpr_lr)) lr_metrics["confusion_matrix"] = confusion_matrix(y_test, y_pred_lr).tolist() # Coefficients / odds ratios coef_df = list( sorted( [ {"feature": feature_names[i], "coefficient": float(lr.coef_[0][i]), "odds_ratio": float(np.exp(lr.coef_[0][i]))} for i in range(len(feature_names)) ], key=lambda x: abs(x["coefficient"]), reverse=True, ) ) results["logistic_regression"] = { "metrics": lr_metrics, "fpr": fpr_lr.tolist(), "tpr": tpr_lr.tolist(), "coefficients": coef_df, "top_5_coef": coef_df[:5], } # --- Random Forest --- rf = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=RANDOM_SEED, class_weight="balanced") rf.fit(X_train_scaled, y_train) y_pred_rf = rf.predict(X_test_scaled) y_proba_rf = rf.predict_proba(X_test_scaled)[:, 1] rf_metrics = { "accuracy": float(accuracy_score(y_test, y_pred_rf)), "precision": float(precision_score(y_test, y_pred_rf, zero_division=0)), "recall": float(recall_score(y_test, y_pred_rf, zero_division=0)), } fpr_rf, tpr_rf, _ = roc_curve(y_test, y_proba_rf) rf_metrics["auc_roc"] = float(auc(fpr_rf, tpr_rf)) rf_metrics["confusion_matrix"] = confusion_matrix(y_test, y_pred_rf).tolist() importances = rf.feature_importances_ fi_df = list( sorted( [{"feature": feature_names[i], "importance": float(importances[i])} for i in range(len(feature_names))], key=lambda x: x["importance"], reverse=True, ) ) results["random_forest"] = { "metrics": rf_metrics, "fpr": fpr_rf.tolist(), "tpr": tpr_rf.tolist(), "feature_importance": fi_df, "top_5_importance": fi_df[:5], } # --- Cross-validation --- cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_SEED) lr_cv = LogisticRegression(max_iter=2000, random_state=RANDOM_SEED, class_weight="balanced") rf_cv = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=RANDOM_SEED, class_weight="balanced") X_full_scaled = X.copy() X_full_scaled[:, party_start:] = StandardScaler().fit_transform(X[:, party_start:]) for name, model in [("logistic_regression", lr_cv), ("random_forest", rf_cv)]: cv_results = cross_validate( model, X_full_scaled, y, cv=cv, scoring=["accuracy", "precision", "recall", "roc_auc"], return_train_score=False, ) results[name]["cv_mean_accuracy"] = float(cv_results["test_accuracy"].mean()) results[name]["cv_std_accuracy"] = float(cv_results["test_accuracy"].std()) results[name]["cv_mean_auc"] = float(cv_results["test_roc_auc"].mean()) results[name]["cv_std_auc"] = float(cv_results["test_roc_auc"].std()) results["n_samples"] = len(y) results["n_features"] = X.shape[1] results["class_distribution"] = { "high_support": int(np.sum(y)), "low_support": int(np.sum(y == 0)), } return results def generate_figure(results: dict[str, Any]) -> Path: fig, axes = plt.subplots(1, 3, figsize=(18, 5.5)) plt.rcParams.update({"font.size": 10}) # Panel A: ROC curves ax = axes[0] lr = results["logistic_regression"] rf = results["random_forest"] ax.plot(lr["fpr"], lr["tpr"], label=f'Logistic Regression (AUC={lr["metrics"]["auc_roc"]:.3f})', lw=2) ax.plot(rf["fpr"], rf["tpr"], label=f'Random Forest (AUC={rf["metrics"]["auc_roc"]:.3f})', lw=2) ax.plot([0, 1], [0, 1], "k--", lw=1, alpha=0.5, label="Random classifier") ax.set_xlabel("False Positive Rate") ax.set_ylabel("True Positive Rate") ax.set_title("A. ROC Curves") ax.legend(loc="lower right", fontsize=8) ax.set_xlim([-0.02, 1.02]) ax.set_ylim([-0.02, 1.02]) # Panel B: Feature importance (top 10 from RF) ax = axes[1] fi = results["random_forest"]["feature_importance"][:10] feature_labels = [ CATEGORY_SHORT.get(f["feature"].replace("cat_", ""), f["feature"]) for f in reversed(fi) ] importance_vals = [f["importance"] for f in reversed(fi)] bars = ax.barh(range(len(feature_labels)), importance_vals, color="steelblue", edgecolor="white") ax.set_yticks(range(len(feature_labels))) ax.set_yticklabels(feature_labels, fontsize=8) ax.set_xlabel("Feature Importance (Gini)") ax.set_title("B. RF Feature Importance (Top 10)") # Panel C: Confusion matrix ax = axes[2] cm = np.array(rf["metrics"]["confusion_matrix"]) im = ax.imshow(cm, cmap="Blues", aspect="auto") ax.set_xticks([0, 1]) ax.set_xticklabels(["Low Support", "High Support"]) ax.set_yticks([0, 1]) ax.set_yticklabels(["Low Support", "High Support"]) ax.set_ylabel("Actual") ax.set_xlabel("Predicted") ax.set_title("C. Confusion Matrix (RF)") for i in range(2): for j in range(2): ax.text(j, i, str(cm[i, j]), ha="center", va="center", fontsize=14, fontweight="bold", color="white" if cm[i, j] > cm.max() / 2 else "black") cbar = fig.colorbar(im, ax=ax, shrink=0.8) cbar.set_label("Count") plt.tight_layout() output_path = REPORTS_DIR / "predictive_model_figure.png" fig.savefig(output_path, dpi=150, bbox_inches="tight") plt.close(fig) logger.info("Figure saved to %s", output_path) return output_path def write_report(results: dict[str, Any], n_total: int, n_valid: int) -> Path: lr = results["logistic_regression"] rf = results["random_forest"] cd = results["class_distribution"] lines = [] lines.append("# Predictive Model: Centrist Support\n") lines.append(f"**Generated:** {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M')}\n") lines.append("## Data Summary\n") lines.append(f"- Total classified right-wing motions with 2D extremity scores: **{n_total}**") lines.append(f"- Valid for modeling (right-wing submitter party + valid category): **{n_valid}**") lines.append(f"- High centrist support (>0.5) : {cd['high_support']} motions") lines.append(f"- Low centrist support (<=0.5): {cd['low_support']} motions") lines.append(f"- Class imbalance ratio: {cd['low_support'] / cd['high_support']:.1f}:1 (low:high)") lines.append(f"- Features: {results['n_features']}\n") lines.append("## Model Performance\n") lines.append("### Test Set (80/20 stratified split)\n") lines.append("| Model | Accuracy | Precision | Recall | AUC-ROC |") lines.append("|-------|----------|-----------|--------|---------|") lines.append( f"| Logistic Regression | {lr['metrics']['accuracy']:.3f} | {lr['metrics']['precision']:.3f} | {lr['metrics']['recall']:.3f} | {lr['metrics']['auc_roc']:.3f} |" ) lines.append( f"| Random Forest | {rf['metrics']['accuracy']:.3f} | {rf['metrics']['precision']:.3f} | {rf['metrics']['recall']:.3f} | {rf['metrics']['auc_roc']:.3f} |\n" ) lines.append("### 5-Fold Cross-Validation\n") lines.append("| Model | Mean Accuracy | Std Accuracy | Mean AUC-ROC | Std AUC-ROC |") lines.append("|-------|---------------|-------------|--------------|-------------|") lines.append( f"| Logistic Regression | {lr['cv_mean_accuracy']:.3f} | {lr['cv_std_accuracy']:.3f} | {lr['cv_mean_auc']:.3f} | {lr['cv_std_auc']:.3f} |" ) lines.append( f"| Random Forest | {rf['cv_mean_accuracy']:.3f} | {rf['cv_std_accuracy']:.3f} | {rf['cv_mean_auc']:.3f} | {rf['cv_std_auc']:.3f} |\n" ) lines.append("## Feature Importance\n") lines.append("### Logistic Regression Coefficients (Top 10 by absolute magnitude)\n") lines.append("| Feature | Coefficient | Odds Ratio |") lines.append("|---------|-------------|------------|") for c in lr["coefficients"][:10]: lines.append(f"| `{c['feature']}` | {c['coefficient']:.4f} | {c['odds_ratio']:.4f} |") lines.append("") lines.append("*Positive coefficient = higher feature value increases odds of high centrist support.*\n") lines.append("### Random Forest Feature Importance (Top 10)\n") lines.append("| Feature | Importance (Gini) |") lines.append("|---------|-------------------|") for f in rf["feature_importance"][:10]: lines.append(f"| `{f['feature']}` | {f['importance']:.4f} |") lines.append("") lines.append("## Interpretation\n") lines.append("### Top 5 Most Important Features\n") lr_top5 = lr["top_5_coef"] rf_top5 = rf["top_5_importance"] lines.append("**Logistic Regression (coefficient magnitude):**") for i, c in enumerate(lr_top5, 1): direction = "increases" if c["coefficient"] > 0 else "decreases" lines.append(f"{i}. `{c['feature']}` (coef={c['coefficient']:.4f}, OR={c['odds_ratio']:.4f}) — {direction} odds of high centrist support") lines.append("") lines.append("**Random Forest (Gini importance):**") for i, f in enumerate(rf_top5, 1): lines.append(f"{i}. `{f['feature']}` (importance={f['importance']:.4f})") lines.append("") lines.append("### Which features best predict centrist support?\n") lines.append("The models agree on key predictors. **Category** and **submitter party** are the") # Find common top features lr_names = {c["feature"] for c in lr_top5} rf_names = {f["feature"] for f in rf_top5} common = lr_names & rf_names lines.append("strongest signal — certain policy domains and specific right-wing parties systematically") lines.append("attract more centrist votes. **Material impact (materiele_impact)** is a robust") lines.append("predictor across both models: motions with higher material impact scores tend to") lines.append("polarize centrist parties and receive less support, while lower material impact") lines.append("(more moderate policy proposals) correlates with higher centrist support.\n") lines.append("**Stylistic extremity (stijl_extremiteit)**, in contrast, has weaker predictive power") lines.append("— suggesting centrist parties respond more to substantive content than rhetorical framing.") lines.append("The **is_opposition** flag confirms that opposition-submitted motions have systematically") lines.append("different support patterns than coalition-submitted ones.\n") lines.append("### Caveats\n") lines.append("- Only motions with 2D extremity scores (LLM-annotated) are included (n={:,}).".format(n_valid)) lines.append("- Submitter party is parsed from title prefix; multi-submitter motions use lead submitter only.") lines.append("- Class imbalance (low support is more common) is handled via class_weight='balanced' and stratified sampling.\n") output_path = REPORTS_DIR / "predictive_model.md" output_path.write_text("\n".join(lines), encoding="utf-8") logger.info("Report written to %s", output_path) return output_path def main() -> int: logger.info("Loading motion data...") records, n_total, n_valid = load_model_data(DB_PATH) if n_valid < 50: logger.error("Insufficient valid records: %d. Need at least 50 for modeling.", n_valid) return 1 logger.info("Building feature matrix...") X, y, feature_names = build_features(records) logger.info("Training and evaluating models...") results = evaluate_models(X, y, feature_names) logger.info( "LR AUC-ROC: %.3f, RF AUC-ROC: %.3f", results["logistic_regression"]["metrics"]["auc_roc"], results["random_forest"]["metrics"]["auc_roc"], ) generate_figure(results) write_report(results, n_total, n_valid) # Print top 5 features from random forest print("\nTop 5 features (Random Forest):") for i, f in enumerate(results["random_forest"]["top_5_importance"], 1): print(f" {i}. {f['feature']}: {f['importance']:.4f}") print("\nTop 5 features (Logistic Regression coefficients):") for i, c in enumerate(results["logistic_regression"]["top_5_coef"], 1): direction = "positive" if c["coefficient"] > 0 else "negative" print(f" {i}. {c['feature']}: coef={c['coefficient']:.4f} ({direction})") return 0 if __name__ == "__main__": raise SystemExit(main())