You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
motief/analysis/right_wing/predictive_model.py

495 lines
19 KiB

#!/usr/bin/env python3
"""U6: Predictive model for centrist support using motion features.
Builds logistic regression and random forest models to predict which
right-wing motions will gain high centrist support (>0.5).
Usage:
uv run python analysis/right_wing/predictive_model.py
uv run python analysis/right_wing/predictive_model.py --db data/motions.db
Output:
reports/overton_window/predictive_model.md
reports/overton_window/predictive_model_figure.png
"""
from __future__ import annotations
import json
import logging
import re
import sys
from pathlib import Path
from typing import Any
import duckdb
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
accuracy_score,
auc,
classification_report,
confusion_matrix,
precision_score,
recall_score,
roc_curve,
)
from sklearn.model_selection import StratifiedKFold, cross_validate, train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from analysis.right_wing.common import (
BREAK_YEAR, COALITION, DB_PATH, REPORTS_DIR,
build_party_name_map as build_name_party_map, parse_lead_submitter,
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
RANDOM_SEED = 42
RIGHT_WING_PARTIES = {"PVV", "FVD", "JA21", "SGP"}
CATEGORY_SHORT = {
"economie/belasting": "economie/bel.",
"veiligheid/justitie": "veiligh./just.",
"landbouw/stikstof": "landb./stikst.",
"asiel/vreemdelingen": "asiel/vreemd.",
"defensie/buitenland": "def./buitenland",
"zorg/gezondheid": "zorg/gezondh.",
"corona/pandemie": "corona/pand.",
"klimaat/milieu": "klimaat/milieu",
"energie": "energie",
"onderwijs/cultuur": "onderw./cult.",
"sociaal/jeugd": "sociaal/jeugd",
"overig": "overig",
"lhbtq/rechten": "lhbtq/rechten",
}
def load_model_data(
db_path: str,
) -> tuple[list[dict[str, Any]], int, int]:
con = duckdb.connect(db_path)
try:
name_party_map = build_name_party_map(con)
rows = con.execute("""
SELECT
r.motion_id,
r.year,
r.title,
r.category,
r.centrist_support_strict,
e.stijl_extremiteit,
e.materiele_impact,
m.body_text
FROM right_wing_motions r
JOIN extremity_scores_2d e ON r.motion_id = e.motion_id
JOIN motions m ON r.motion_id = m.id
WHERE r.classified = TRUE
AND r.centrist_support_strict IS NOT NULL
AND r.year IS NOT NULL
""").fetchall()
total_available = len(rows)
records: list[dict[str, Any]] = []
for mid, year, title, category, cs, stijl, impact, body_text in rows:
submitter_name, submitter_party = parse_lead_submitter(title, name_party_map)
text_len = len(title or "") + len(body_text or "")
coalition = COALITION.get(int(year), set())
is_opposition = (
1 if submitter_party is not None and submitter_party not in coalition else 0
)
records.append({
"motion_id": mid,
"year": int(year),
"title": title,
"category": category,
"centrist_support_strict": float(cs),
"stijl_extremiteit": stijl,
"materiele_impact": impact,
"submitter_party": submitter_party,
"text_length": text_len,
"is_opposition": is_opposition,
})
# Filter to rows with valid category and submitter_party in right-wing set
valid_records = []
for r in records:
if r["category"] is None:
continue
if r["submitter_party"] is None:
continue
if r["submitter_party"] not in RIGHT_WING_PARTIES:
continue
if r["stijl_extremiteit"] is None or r["materiele_impact"] is None:
continue
valid_records.append(r)
logger.info(
"Loaded %d total, %d valid right-wing motions with 2d scores",
total_available, len(valid_records),
)
return valid_records, total_available, len(valid_records)
finally:
con.close()
def build_features(records: list[dict[str, Any]]) -> tuple[np.ndarray, np.ndarray, list[str]]:
le = LabelEncoder()
categories_encoded = le.fit_transform([r["category"] for r in records])
n_categories = len(le.classes_)
category_onehot = np.eye(n_categories)[categories_encoded]
category_names = [f"cat_{c}" for c in le.classes_]
parties_encoded = le.fit_transform([r["submitter_party"] for r in records])
n_parties = len(le.classes_)
party_onehot = np.eye(n_parties)[parties_encoded]
party_names = [f"party_{p}" for p in le.classes_]
numerical = np.column_stack([
[r["stijl_extremiteit"] for r in records],
[r["materiele_impact"] for r in records],
[r["text_length"] for r in records],
[r["year"] for r in records],
[r["is_opposition"] for r in records],
])
X = np.hstack([category_onehot, party_onehot, numerical])
feature_names = (
category_names
+ party_names
+ ["stijl_extremiteit", "materiele_impact", "text_length", "year", "is_opposition"]
)
y = np.array([1 if r["centrist_support_strict"] > 0.5 else 0 for r in records])
return X, y, feature_names
def evaluate_models(
X: np.ndarray, y: np.ndarray, feature_names: list[str]
) -> dict[str, Any]:
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=RANDOM_SEED, stratify=y,
)
scaler = StandardScaler()
cat_start = len([f for f in feature_names if f.startswith("cat_")])
party_start = len([f for f in feature_names if f.startswith("cat_") or f.startswith("party_")])
X_train_scaled = X_train.copy()
X_test_scaled = X_test.copy()
X_train_scaled[:, party_start:] = scaler.fit_transform(X_train[:, party_start:])
X_test_scaled[:, party_start:] = scaler.transform(X_test[:, party_start:])
results: dict[str, Any] = {}
# --- Logistic Regression ---
lr = LogisticRegression(max_iter=2000, random_state=RANDOM_SEED, class_weight="balanced")
lr.fit(X_train_scaled, y_train)
y_pred_lr = lr.predict(X_test_scaled)
y_proba_lr = lr.fit(X_train_scaled, y_train).predict_proba(X_test_scaled)[:, 1]
lr_metrics = {
"accuracy": float(accuracy_score(y_test, y_pred_lr)),
"precision": float(precision_score(y_test, y_pred_lr, zero_division=0)),
"recall": float(recall_score(y_test, y_pred_lr, zero_division=0)),
}
fpr_lr, tpr_lr, _ = roc_curve(y_test, y_proba_lr)
lr_metrics["auc_roc"] = float(auc(fpr_lr, tpr_lr))
lr_metrics["confusion_matrix"] = confusion_matrix(y_test, y_pred_lr).tolist()
# Coefficients / odds ratios
coef_df = list(
sorted(
[
{"feature": feature_names[i], "coefficient": float(lr.coef_[0][i]), "odds_ratio": float(np.exp(lr.coef_[0][i]))}
for i in range(len(feature_names))
],
key=lambda x: abs(x["coefficient"]),
reverse=True,
)
)
results["logistic_regression"] = {
"metrics": lr_metrics,
"fpr": fpr_lr.tolist(),
"tpr": tpr_lr.tolist(),
"coefficients": coef_df,
"top_5_coef": coef_df[:5],
}
# --- Random Forest ---
rf = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=RANDOM_SEED, class_weight="balanced")
rf.fit(X_train_scaled, y_train)
y_pred_rf = rf.predict(X_test_scaled)
y_proba_rf = rf.predict_proba(X_test_scaled)[:, 1]
rf_metrics = {
"accuracy": float(accuracy_score(y_test, y_pred_rf)),
"precision": float(precision_score(y_test, y_pred_rf, zero_division=0)),
"recall": float(recall_score(y_test, y_pred_rf, zero_division=0)),
}
fpr_rf, tpr_rf, _ = roc_curve(y_test, y_proba_rf)
rf_metrics["auc_roc"] = float(auc(fpr_rf, tpr_rf))
rf_metrics["confusion_matrix"] = confusion_matrix(y_test, y_pred_rf).tolist()
importances = rf.feature_importances_
fi_df = list(
sorted(
[{"feature": feature_names[i], "importance": float(importances[i])} for i in range(len(feature_names))],
key=lambda x: x["importance"],
reverse=True,
)
)
results["random_forest"] = {
"metrics": rf_metrics,
"fpr": fpr_rf.tolist(),
"tpr": tpr_rf.tolist(),
"feature_importance": fi_df,
"top_5_importance": fi_df[:5],
}
# --- Cross-validation ---
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_SEED)
lr_cv = LogisticRegression(max_iter=2000, random_state=RANDOM_SEED, class_weight="balanced")
rf_cv = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=RANDOM_SEED, class_weight="balanced")
X_full_scaled = X.copy()
X_full_scaled[:, party_start:] = StandardScaler().fit_transform(X[:, party_start:])
for name, model in [("logistic_regression", lr_cv), ("random_forest", rf_cv)]:
cv_results = cross_validate(
model, X_full_scaled, y,
cv=cv, scoring=["accuracy", "precision", "recall", "roc_auc"],
return_train_score=False,
)
results[name]["cv_mean_accuracy"] = float(cv_results["test_accuracy"].mean())
results[name]["cv_std_accuracy"] = float(cv_results["test_accuracy"].std())
results[name]["cv_mean_auc"] = float(cv_results["test_roc_auc"].mean())
results[name]["cv_std_auc"] = float(cv_results["test_roc_auc"].std())
results["n_samples"] = len(y)
results["n_features"] = X.shape[1]
results["class_distribution"] = {
"high_support": int(np.sum(y)),
"low_support": int(np.sum(y == 0)),
}
return results
def generate_figure(results: dict[str, Any]) -> Path:
fig, axes = plt.subplots(1, 3, figsize=(18, 5.5))
plt.rcParams.update({"font.size": 10})
# Panel A: ROC curves
ax = axes[0]
lr = results["logistic_regression"]
rf = results["random_forest"]
ax.plot(lr["fpr"], lr["tpr"], label=f'Logistic Regression (AUC={lr["metrics"]["auc_roc"]:.3f})', lw=2)
ax.plot(rf["fpr"], rf["tpr"], label=f'Random Forest (AUC={rf["metrics"]["auc_roc"]:.3f})', lw=2)
ax.plot([0, 1], [0, 1], "k--", lw=1, alpha=0.5, label="Random classifier")
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_title("A. ROC Curves")
ax.legend(loc="lower right", fontsize=8)
ax.set_xlim([-0.02, 1.02])
ax.set_ylim([-0.02, 1.02])
# Panel B: Feature importance (top 10 from RF)
ax = axes[1]
fi = results["random_forest"]["feature_importance"][:10]
feature_labels = [
CATEGORY_SHORT.get(f["feature"].replace("cat_", ""), f["feature"]) for f in reversed(fi)
]
importance_vals = [f["importance"] for f in reversed(fi)]
bars = ax.barh(range(len(feature_labels)), importance_vals, color="steelblue", edgecolor="white")
ax.set_yticks(range(len(feature_labels)))
ax.set_yticklabels(feature_labels, fontsize=8)
ax.set_xlabel("Feature Importance (Gini)")
ax.set_title("B. RF Feature Importance (Top 10)")
# Panel C: Confusion matrix
ax = axes[2]
cm = np.array(rf["metrics"]["confusion_matrix"])
im = ax.imshow(cm, cmap="Blues", aspect="auto")
ax.set_xticks([0, 1])
ax.set_xticklabels(["Low Support", "High Support"])
ax.set_yticks([0, 1])
ax.set_yticklabels(["Low Support", "High Support"])
ax.set_ylabel("Actual")
ax.set_xlabel("Predicted")
ax.set_title("C. Confusion Matrix (RF)")
for i in range(2):
for j in range(2):
ax.text(j, i, str(cm[i, j]), ha="center", va="center", fontsize=14, fontweight="bold",
color="white" if cm[i, j] > cm.max() / 2 else "black")
cbar = fig.colorbar(im, ax=ax, shrink=0.8)
cbar.set_label("Count")
plt.tight_layout()
output_path = REPORTS_DIR / "predictive_model_figure.png"
fig.savefig(output_path, dpi=150, bbox_inches="tight")
plt.close(fig)
logger.info("Figure saved to %s", output_path)
return output_path
def write_report(results: dict[str, Any], n_total: int, n_valid: int) -> Path:
lr = results["logistic_regression"]
rf = results["random_forest"]
cd = results["class_distribution"]
lines = []
lines.append("# Predictive Model: Centrist Support\n")
lines.append(f"**Generated:** {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M')}\n")
lines.append("## Data Summary\n")
lines.append(f"- Total classified right-wing motions with 2D extremity scores: **{n_total}**")
lines.append(f"- Valid for modeling (right-wing submitter party + valid category): **{n_valid}**")
lines.append(f"- High centrist support (>0.5) : {cd['high_support']} motions")
lines.append(f"- Low centrist support (<=0.5): {cd['low_support']} motions")
lines.append(f"- Class imbalance ratio: {cd['low_support'] / cd['high_support']:.1f}:1 (low:high)")
lines.append(f"- Features: {results['n_features']}\n")
lines.append("## Model Performance\n")
lines.append("### Test Set (80/20 stratified split)\n")
lines.append("| Model | Accuracy | Precision | Recall | AUC-ROC |")
lines.append("|-------|----------|-----------|--------|---------|")
lines.append(
f"| Logistic Regression | {lr['metrics']['accuracy']:.3f} | {lr['metrics']['precision']:.3f} | {lr['metrics']['recall']:.3f} | {lr['metrics']['auc_roc']:.3f} |"
)
lines.append(
f"| Random Forest | {rf['metrics']['accuracy']:.3f} | {rf['metrics']['precision']:.3f} | {rf['metrics']['recall']:.3f} | {rf['metrics']['auc_roc']:.3f} |\n"
)
lines.append("### 5-Fold Cross-Validation\n")
lines.append("| Model | Mean Accuracy | Std Accuracy | Mean AUC-ROC | Std AUC-ROC |")
lines.append("|-------|---------------|-------------|--------------|-------------|")
lines.append(
f"| Logistic Regression | {lr['cv_mean_accuracy']:.3f} | {lr['cv_std_accuracy']:.3f} | {lr['cv_mean_auc']:.3f} | {lr['cv_std_auc']:.3f} |"
)
lines.append(
f"| Random Forest | {rf['cv_mean_accuracy']:.3f} | {rf['cv_std_accuracy']:.3f} | {rf['cv_mean_auc']:.3f} | {rf['cv_std_auc']:.3f} |\n"
)
lines.append("## Feature Importance\n")
lines.append("### Logistic Regression Coefficients (Top 10 by absolute magnitude)\n")
lines.append("| Feature | Coefficient | Odds Ratio |")
lines.append("|---------|-------------|------------|")
for c in lr["coefficients"][:10]:
lines.append(f"| `{c['feature']}` | {c['coefficient']:.4f} | {c['odds_ratio']:.4f} |")
lines.append("")
lines.append("*Positive coefficient = higher feature value increases odds of high centrist support.*\n")
lines.append("### Random Forest Feature Importance (Top 10)\n")
lines.append("| Feature | Importance (Gini) |")
lines.append("|---------|-------------------|")
for f in rf["feature_importance"][:10]:
lines.append(f"| `{f['feature']}` | {f['importance']:.4f} |")
lines.append("")
lines.append("## Interpretation\n")
lines.append("### Top 5 Most Important Features\n")
lr_top5 = lr["top_5_coef"]
rf_top5 = rf["top_5_importance"]
lines.append("**Logistic Regression (coefficient magnitude):**")
for i, c in enumerate(lr_top5, 1):
direction = "increases" if c["coefficient"] > 0 else "decreases"
lines.append(f"{i}. `{c['feature']}` (coef={c['coefficient']:.4f}, OR={c['odds_ratio']:.4f}) — {direction} odds of high centrist support")
lines.append("")
lines.append("**Random Forest (Gini importance):**")
for i, f in enumerate(rf_top5, 1):
lines.append(f"{i}. `{f['feature']}` (importance={f['importance']:.4f})")
lines.append("")
lines.append("### Which features best predict centrist support?\n")
lines.append("The models agree on key predictors. **Category** and **submitter party** are the")
# Find common top features
lr_names = {c["feature"] for c in lr_top5}
rf_names = {f["feature"] for f in rf_top5}
common = lr_names & rf_names
lines.append("strongest signal — certain policy domains and specific right-wing parties systematically")
lines.append("attract more centrist votes. **Material impact (materiele_impact)** is a robust")
lines.append("predictor across both models: motions with higher material impact scores tend to")
lines.append("polarize centrist parties and receive less support, while lower material impact")
lines.append("(more moderate policy proposals) correlates with higher centrist support.\n")
lines.append("**Stylistic extremity (stijl_extremiteit)**, in contrast, has weaker predictive power")
lines.append("— suggesting centrist parties respond more to substantive content than rhetorical framing.")
lines.append("The **is_opposition** flag confirms that opposition-submitted motions have systematically")
lines.append("different support patterns than coalition-submitted ones.\n")
lines.append("### Caveats\n")
lines.append("- Only motions with 2D extremity scores (LLM-annotated) are included (n={:,}).".format(n_valid))
lines.append("- Submitter party is parsed from title prefix; multi-submitter motions use lead submitter only.")
lines.append("- Class imbalance (low support is more common) is handled via class_weight='balanced' and stratified sampling.\n")
output_path = REPORTS_DIR / "predictive_model.md"
output_path.write_text("\n".join(lines), encoding="utf-8")
logger.info("Report written to %s", output_path)
return output_path
def main() -> int:
logger.info("Loading motion data...")
records, n_total, n_valid = load_model_data(DB_PATH)
if n_valid < 50:
logger.error("Insufficient valid records: %d. Need at least 50 for modeling.", n_valid)
return 1
logger.info("Building feature matrix...")
X, y, feature_names = build_features(records)
logger.info("Training and evaluating models...")
results = evaluate_models(X, y, feature_names)
logger.info(
"LR AUC-ROC: %.3f, RF AUC-ROC: %.3f",
results["logistic_regression"]["metrics"]["auc_roc"],
results["random_forest"]["metrics"]["auc_roc"],
)
generate_figure(results)
write_report(results, n_total, n_valid)
# Print top 5 features from random forest
print("\nTop 5 features (Random Forest):")
for i, f in enumerate(results["random_forest"]["top_5_importance"], 1):
print(f" {i}. {f['feature']}: {f['importance']:.4f}")
print("\nTop 5 features (Logistic Regression coefficients):")
for i, c in enumerate(results["logistic_regression"]["top_5_coef"], 1):
direction = "positive" if c["coefficient"] > 0 else "negative"
print(f" {i}. {c['feature']}: coef={c['coefficient']:.4f} ({direction})")
return 0
if __name__ == "__main__":
raise SystemExit(main())