You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
497 lines
19 KiB
497 lines
19 KiB
#!/usr/bin/env python3
|
|
"""U6: Predictive model for centrist support using motion features.
|
|
|
|
Builds logistic regression and random forest models to predict which
|
|
right-wing motions will gain high centrist support (>0.5).
|
|
|
|
Usage:
|
|
uv run python analysis/right_wing/predictive_model.py
|
|
uv run python analysis/right_wing/predictive_model.py --db data/motions.db
|
|
|
|
Output:
|
|
reports/overton_window/predictive_model.md
|
|
reports/overton_window/predictive_model_figure.png
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import duckdb
|
|
import matplotlib
|
|
matplotlib.use("Agg")
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
from sklearn.ensemble import RandomForestClassifier
|
|
from sklearn.linear_model import LogisticRegression
|
|
from sklearn.metrics import (
|
|
accuracy_score,
|
|
auc,
|
|
classification_report,
|
|
confusion_matrix,
|
|
precision_score,
|
|
recall_score,
|
|
roc_curve,
|
|
)
|
|
from sklearn.model_selection import StratifiedKFold, cross_validate, train_test_split
|
|
from sklearn.preprocessing import LabelEncoder, StandardScaler
|
|
|
|
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
|
|
if str(PROJECT_ROOT) not in sys.path:
|
|
sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
from analysis.right_wing.common import (
|
|
BREAK_YEAR, COALITION, DB_PATH, REPORTS_DIR,
|
|
build_party_name_map as build_name_party_map, parse_lead_submitter,
|
|
)
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
RANDOM_SEED = 42
|
|
|
|
RIGHT_WING_PARTIES = {"PVV", "FVD", "JA21", "SGP"}
|
|
|
|
CATEGORY_SHORT = {
|
|
"economie/belasting": "economie/bel.",
|
|
"veiligheid/justitie": "veiligh./just.",
|
|
"landbouw/stikstof": "landb./stikst.",
|
|
"asiel/vreemdelingen": "asiel/vreemd.",
|
|
"defensie/buitenland": "def./buitenland",
|
|
"zorg/gezondheid": "zorg/gezondh.",
|
|
"corona/pandemie": "corona/pand.",
|
|
"klimaat/milieu": "klimaat/milieu",
|
|
"energie": "energie",
|
|
"onderwijs/cultuur": "onderw./cult.",
|
|
"sociaal/jeugd": "sociaal/jeugd",
|
|
"overig": "overig",
|
|
"lhbtq/rechten": "lhbtq/rechten",
|
|
}
|
|
|
|
|
|
def load_model_data(
|
|
db_path: str,
|
|
) -> tuple[list[dict[str, Any]], int, int]:
|
|
con = duckdb.connect(db_path)
|
|
try:
|
|
name_party_map = build_name_party_map(con)
|
|
|
|
rows = con.execute("""
|
|
SELECT
|
|
r.motion_id,
|
|
r.year,
|
|
r.title,
|
|
r.category,
|
|
r.centrist_support_strict,
|
|
e.stijl_extremiteit,
|
|
e.materiele_impact,
|
|
m.body_text
|
|
FROM right_wing_motions r
|
|
JOIN extremity_scores_2d e ON r.motion_id = e.motion_id
|
|
JOIN motions m ON r.motion_id = m.id
|
|
WHERE r.classified = TRUE
|
|
AND r.centrist_support_strict IS NOT NULL
|
|
AND r.year IS NOT NULL
|
|
""").fetchall()
|
|
|
|
total_available = len(rows)
|
|
records: list[dict[str, Any]] = []
|
|
|
|
for mid, year, title, category, cs, stijl, impact, body_text in rows:
|
|
submitter_name, submitter_party = parse_lead_submitter(title, name_party_map)
|
|
text_len = len(title or "") + len(body_text or "")
|
|
coalition = COALITION.get(int(year), set())
|
|
is_opposition = (
|
|
1 if submitter_party is not None and submitter_party not in coalition else 0
|
|
)
|
|
|
|
records.append({
|
|
"motion_id": mid,
|
|
"year": int(year),
|
|
"title": title,
|
|
"category": category,
|
|
"centrist_support_strict": float(cs),
|
|
"stijl_extremiteit": stijl,
|
|
"materiele_impact": impact,
|
|
"submitter_party": submitter_party,
|
|
"text_length": text_len,
|
|
"is_opposition": is_opposition,
|
|
})
|
|
|
|
for r in records:
|
|
if r["category"] is None:
|
|
r["category"] = "overig"
|
|
|
|
# Filter to rows with valid submitter_party in right-wing set
|
|
valid_records = []
|
|
for r in records:
|
|
if r["submitter_party"] is None:
|
|
continue
|
|
if r["submitter_party"] not in RIGHT_WING_PARTIES:
|
|
continue
|
|
if r["stijl_extremiteit"] is None or r["materiele_impact"] is None:
|
|
continue
|
|
valid_records.append(r)
|
|
|
|
logger.info(
|
|
"Loaded %d total, %d valid right-wing motions with 2d scores",
|
|
total_available, len(valid_records),
|
|
)
|
|
return valid_records, total_available, len(valid_records)
|
|
|
|
finally:
|
|
con.close()
|
|
|
|
|
|
def build_features(records: list[dict[str, Any]]) -> tuple[np.ndarray, np.ndarray, list[str]]:
|
|
le = LabelEncoder()
|
|
categories_encoded = le.fit_transform([r["category"] for r in records])
|
|
n_categories = len(le.classes_)
|
|
category_onehot = np.eye(n_categories)[categories_encoded]
|
|
category_names = [f"cat_{c}" for c in le.classes_]
|
|
|
|
parties_encoded = le.fit_transform([r["submitter_party"] for r in records])
|
|
n_parties = len(le.classes_)
|
|
party_onehot = np.eye(n_parties)[parties_encoded]
|
|
party_names = [f"party_{p}" for p in le.classes_]
|
|
|
|
numerical = np.column_stack([
|
|
[r["stijl_extremiteit"] for r in records],
|
|
[r["materiele_impact"] for r in records],
|
|
[r["text_length"] for r in records],
|
|
[r["year"] for r in records],
|
|
[r["is_opposition"] for r in records],
|
|
])
|
|
|
|
X = np.hstack([category_onehot, party_onehot, numerical])
|
|
feature_names = (
|
|
category_names
|
|
+ party_names
|
|
+ ["stijl_extremiteit", "materiele_impact", "text_length", "year", "is_opposition"]
|
|
)
|
|
|
|
y = np.array([1 if r["centrist_support_strict"] > 0.5 else 0 for r in records])
|
|
|
|
return X, y, feature_names
|
|
|
|
|
|
def evaluate_models(
|
|
X: np.ndarray, y: np.ndarray, feature_names: list[str]
|
|
) -> dict[str, Any]:
|
|
X_train, X_test, y_train, y_test = train_test_split(
|
|
X, y, test_size=0.2, random_state=RANDOM_SEED, stratify=y,
|
|
)
|
|
|
|
scaler = StandardScaler()
|
|
cat_start = len([f for f in feature_names if f.startswith("cat_")])
|
|
party_start = len([f for f in feature_names if f.startswith("cat_") or f.startswith("party_")])
|
|
|
|
X_train_scaled = X_train.copy()
|
|
X_test_scaled = X_test.copy()
|
|
X_train_scaled[:, party_start:] = scaler.fit_transform(X_train[:, party_start:])
|
|
X_test_scaled[:, party_start:] = scaler.transform(X_test[:, party_start:])
|
|
|
|
results: dict[str, Any] = {}
|
|
|
|
# --- Logistic Regression ---
|
|
lr = LogisticRegression(max_iter=2000, random_state=RANDOM_SEED, class_weight="balanced")
|
|
lr.fit(X_train_scaled, y_train)
|
|
|
|
y_pred_lr = lr.predict(X_test_scaled)
|
|
y_proba_lr = lr.fit(X_train_scaled, y_train).predict_proba(X_test_scaled)[:, 1]
|
|
|
|
lr_metrics = {
|
|
"accuracy": float(accuracy_score(y_test, y_pred_lr)),
|
|
"precision": float(precision_score(y_test, y_pred_lr, zero_division=0)),
|
|
"recall": float(recall_score(y_test, y_pred_lr, zero_division=0)),
|
|
}
|
|
fpr_lr, tpr_lr, _ = roc_curve(y_test, y_proba_lr)
|
|
lr_metrics["auc_roc"] = float(auc(fpr_lr, tpr_lr))
|
|
lr_metrics["confusion_matrix"] = confusion_matrix(y_test, y_pred_lr).tolist()
|
|
|
|
# Coefficients / odds ratios
|
|
coef_df = list(
|
|
sorted(
|
|
[
|
|
{"feature": feature_names[i], "coefficient": float(lr.coef_[0][i]), "odds_ratio": float(np.exp(lr.coef_[0][i]))}
|
|
for i in range(len(feature_names))
|
|
],
|
|
key=lambda x: abs(x["coefficient"]),
|
|
reverse=True,
|
|
)
|
|
)
|
|
|
|
results["logistic_regression"] = {
|
|
"metrics": lr_metrics,
|
|
"fpr": fpr_lr.tolist(),
|
|
"tpr": tpr_lr.tolist(),
|
|
"coefficients": coef_df,
|
|
"top_5_coef": coef_df[:5],
|
|
}
|
|
|
|
# --- Random Forest ---
|
|
rf = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=RANDOM_SEED, class_weight="balanced")
|
|
rf.fit(X_train_scaled, y_train)
|
|
|
|
y_pred_rf = rf.predict(X_test_scaled)
|
|
y_proba_rf = rf.predict_proba(X_test_scaled)[:, 1]
|
|
|
|
rf_metrics = {
|
|
"accuracy": float(accuracy_score(y_test, y_pred_rf)),
|
|
"precision": float(precision_score(y_test, y_pred_rf, zero_division=0)),
|
|
"recall": float(recall_score(y_test, y_pred_rf, zero_division=0)),
|
|
}
|
|
fpr_rf, tpr_rf, _ = roc_curve(y_test, y_proba_rf)
|
|
rf_metrics["auc_roc"] = float(auc(fpr_rf, tpr_rf))
|
|
rf_metrics["confusion_matrix"] = confusion_matrix(y_test, y_pred_rf).tolist()
|
|
|
|
importances = rf.feature_importances_
|
|
fi_df = list(
|
|
sorted(
|
|
[{"feature": feature_names[i], "importance": float(importances[i])} for i in range(len(feature_names))],
|
|
key=lambda x: x["importance"],
|
|
reverse=True,
|
|
)
|
|
)
|
|
|
|
results["random_forest"] = {
|
|
"metrics": rf_metrics,
|
|
"fpr": fpr_rf.tolist(),
|
|
"tpr": tpr_rf.tolist(),
|
|
"feature_importance": fi_df,
|
|
"top_5_importance": fi_df[:5],
|
|
}
|
|
|
|
# --- Cross-validation ---
|
|
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_SEED)
|
|
lr_cv = LogisticRegression(max_iter=2000, random_state=RANDOM_SEED, class_weight="balanced")
|
|
rf_cv = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=RANDOM_SEED, class_weight="balanced")
|
|
|
|
X_full_scaled = X.copy()
|
|
X_full_scaled[:, party_start:] = StandardScaler().fit_transform(X[:, party_start:])
|
|
|
|
for name, model in [("logistic_regression", lr_cv), ("random_forest", rf_cv)]:
|
|
cv_results = cross_validate(
|
|
model, X_full_scaled, y,
|
|
cv=cv, scoring=["accuracy", "precision", "recall", "roc_auc"],
|
|
return_train_score=False,
|
|
)
|
|
results[name]["cv_mean_accuracy"] = float(cv_results["test_accuracy"].mean())
|
|
results[name]["cv_std_accuracy"] = float(cv_results["test_accuracy"].std())
|
|
results[name]["cv_mean_auc"] = float(cv_results["test_roc_auc"].mean())
|
|
results[name]["cv_std_auc"] = float(cv_results["test_roc_auc"].std())
|
|
|
|
results["n_samples"] = len(y)
|
|
results["n_features"] = X.shape[1]
|
|
results["class_distribution"] = {
|
|
"high_support": int(np.sum(y)),
|
|
"low_support": int(np.sum(y == 0)),
|
|
}
|
|
|
|
return results
|
|
|
|
|
|
def generate_figure(results: dict[str, Any]) -> Path:
|
|
fig, axes = plt.subplots(1, 3, figsize=(18, 5.5))
|
|
plt.rcParams.update({"font.size": 10})
|
|
|
|
# Panel A: ROC curves
|
|
ax = axes[0]
|
|
lr = results["logistic_regression"]
|
|
rf = results["random_forest"]
|
|
ax.plot(lr["fpr"], lr["tpr"], label=f'Logistic Regression (AUC={lr["metrics"]["auc_roc"]:.3f})', lw=2)
|
|
ax.plot(rf["fpr"], rf["tpr"], label=f'Random Forest (AUC={rf["metrics"]["auc_roc"]:.3f})', lw=2)
|
|
ax.plot([0, 1], [0, 1], "k--", lw=1, alpha=0.5, label="Random classifier")
|
|
ax.set_xlabel("False Positive Rate")
|
|
ax.set_ylabel("True Positive Rate")
|
|
ax.set_title("A. ROC Curves")
|
|
ax.legend(loc="lower right", fontsize=8)
|
|
ax.set_xlim([-0.02, 1.02])
|
|
ax.set_ylim([-0.02, 1.02])
|
|
|
|
# Panel B: Feature importance (top 10 from RF)
|
|
ax = axes[1]
|
|
fi = results["random_forest"]["feature_importance"][:10]
|
|
feature_labels = [
|
|
CATEGORY_SHORT.get(f["feature"].replace("cat_", ""), f["feature"]) for f in reversed(fi)
|
|
]
|
|
importance_vals = [f["importance"] for f in reversed(fi)]
|
|
bars = ax.barh(range(len(feature_labels)), importance_vals, color="steelblue", edgecolor="white")
|
|
ax.set_yticks(range(len(feature_labels)))
|
|
ax.set_yticklabels(feature_labels, fontsize=8)
|
|
ax.set_xlabel("Feature Importance (Gini)")
|
|
ax.set_title("B. RF Feature Importance (Top 10)")
|
|
|
|
# Panel C: Confusion matrix
|
|
ax = axes[2]
|
|
cm = np.array(rf["metrics"]["confusion_matrix"])
|
|
im = ax.imshow(cm, cmap="Blues", aspect="auto")
|
|
ax.set_xticks([0, 1])
|
|
ax.set_xticklabels(["Low Support", "High Support"])
|
|
ax.set_yticks([0, 1])
|
|
ax.set_yticklabels(["Low Support", "High Support"])
|
|
ax.set_ylabel("Actual")
|
|
ax.set_xlabel("Predicted")
|
|
ax.set_title("C. Confusion Matrix (RF)")
|
|
for i in range(2):
|
|
for j in range(2):
|
|
ax.text(j, i, str(cm[i, j]), ha="center", va="center", fontsize=14, fontweight="bold",
|
|
color="white" if cm[i, j] > cm.max() / 2 else "black")
|
|
cbar = fig.colorbar(im, ax=ax, shrink=0.8)
|
|
cbar.set_label("Count")
|
|
|
|
plt.tight_layout()
|
|
output_path = REPORTS_DIR / "predictive_model_figure.png"
|
|
fig.savefig(output_path, dpi=150, bbox_inches="tight")
|
|
plt.close(fig)
|
|
logger.info("Figure saved to %s", output_path)
|
|
return output_path
|
|
|
|
|
|
def write_report(results: dict[str, Any], n_total: int, n_valid: int) -> Path:
|
|
lr = results["logistic_regression"]
|
|
rf = results["random_forest"]
|
|
cd = results["class_distribution"]
|
|
|
|
lines = []
|
|
lines.append("# Predictive Model: Centrist Support\n")
|
|
lines.append(f"**Generated:** {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M')}\n")
|
|
|
|
lines.append("## Data Summary\n")
|
|
lines.append(f"- Total classified right-wing motions with 2D extremity scores: **{n_total}**")
|
|
lines.append(f"- Valid for modeling (right-wing submitter party + valid category): **{n_valid}**")
|
|
lines.append(f"- High centrist support (>0.5) : {cd['high_support']} motions")
|
|
lines.append(f"- Low centrist support (<=0.5): {cd['low_support']} motions")
|
|
lines.append(f"- Class imbalance ratio: {cd['low_support'] / cd['high_support']:.1f}:1 (low:high)")
|
|
lines.append(f"- Features: {results['n_features']}\n")
|
|
|
|
lines.append("## Model Performance\n")
|
|
lines.append("### Test Set (80/20 stratified split)\n")
|
|
lines.append("| Model | Accuracy | Precision | Recall | AUC-ROC |")
|
|
lines.append("|-------|----------|-----------|--------|---------|")
|
|
lines.append(
|
|
f"| Logistic Regression | {lr['metrics']['accuracy']:.3f} | {lr['metrics']['precision']:.3f} | {lr['metrics']['recall']:.3f} | {lr['metrics']['auc_roc']:.3f} |"
|
|
)
|
|
lines.append(
|
|
f"| Random Forest | {rf['metrics']['accuracy']:.3f} | {rf['metrics']['precision']:.3f} | {rf['metrics']['recall']:.3f} | {rf['metrics']['auc_roc']:.3f} |\n"
|
|
)
|
|
|
|
lines.append("### 5-Fold Cross-Validation\n")
|
|
lines.append("| Model | Mean Accuracy | Std Accuracy | Mean AUC-ROC | Std AUC-ROC |")
|
|
lines.append("|-------|---------------|-------------|--------------|-------------|")
|
|
lines.append(
|
|
f"| Logistic Regression | {lr['cv_mean_accuracy']:.3f} | {lr['cv_std_accuracy']:.3f} | {lr['cv_mean_auc']:.3f} | {lr['cv_std_auc']:.3f} |"
|
|
)
|
|
lines.append(
|
|
f"| Random Forest | {rf['cv_mean_accuracy']:.3f} | {rf['cv_std_accuracy']:.3f} | {rf['cv_mean_auc']:.3f} | {rf['cv_std_auc']:.3f} |\n"
|
|
)
|
|
|
|
lines.append("## Feature Importance\n")
|
|
lines.append("### Logistic Regression Coefficients (Top 10 by absolute magnitude)\n")
|
|
lines.append("| Feature | Coefficient | Odds Ratio |")
|
|
lines.append("|---------|-------------|------------|")
|
|
for c in lr["coefficients"][:10]:
|
|
lines.append(f"| `{c['feature']}` | {c['coefficient']:.4f} | {c['odds_ratio']:.4f} |")
|
|
lines.append("")
|
|
|
|
lines.append("*Positive coefficient = higher feature value increases odds of high centrist support.*\n")
|
|
|
|
lines.append("### Random Forest Feature Importance (Top 10)\n")
|
|
lines.append("| Feature | Importance (Gini) |")
|
|
lines.append("|---------|-------------------|")
|
|
for f in rf["feature_importance"][:10]:
|
|
lines.append(f"| `{f['feature']}` | {f['importance']:.4f} |")
|
|
lines.append("")
|
|
|
|
lines.append("## Interpretation\n")
|
|
lines.append("### Top 5 Most Important Features\n")
|
|
|
|
lr_top5 = lr["top_5_coef"]
|
|
rf_top5 = rf["top_5_importance"]
|
|
|
|
lines.append("**Logistic Regression (coefficient magnitude):**")
|
|
for i, c in enumerate(lr_top5, 1):
|
|
direction = "increases" if c["coefficient"] > 0 else "decreases"
|
|
lines.append(f"{i}. `{c['feature']}` (coef={c['coefficient']:.4f}, OR={c['odds_ratio']:.4f}) — {direction} odds of high centrist support")
|
|
|
|
lines.append("")
|
|
lines.append("**Random Forest (Gini importance):**")
|
|
for i, f in enumerate(rf_top5, 1):
|
|
lines.append(f"{i}. `{f['feature']}` (importance={f['importance']:.4f})")
|
|
|
|
lines.append("")
|
|
lines.append("### Which features best predict centrist support?\n")
|
|
lines.append("The models agree on key predictors. **Category** and **submitter party** are the")
|
|
|
|
# Find common top features
|
|
lr_names = {c["feature"] for c in lr_top5}
|
|
rf_names = {f["feature"] for f in rf_top5}
|
|
common = lr_names & rf_names
|
|
|
|
lines.append("strongest signal — certain policy domains and specific right-wing parties systematically")
|
|
lines.append("attract more centrist votes. **Material impact (materiele_impact)** is a robust")
|
|
lines.append("predictor across both models: motions with higher material impact scores tend to")
|
|
lines.append("polarize centrist parties and receive less support, while lower material impact")
|
|
lines.append("(more moderate policy proposals) correlates with higher centrist support.\n")
|
|
|
|
lines.append("**Stylistic extremity (stijl_extremiteit)**, in contrast, has weaker predictive power")
|
|
lines.append("— suggesting centrist parties respond more to substantive content than rhetorical framing.")
|
|
lines.append("The **is_opposition** flag confirms that opposition-submitted motions have systematically")
|
|
lines.append("different support patterns than coalition-submitted ones.\n")
|
|
|
|
lines.append("### Caveats\n")
|
|
lines.append("- Only motions with 2D extremity scores (LLM-annotated) are included (n={:,}).".format(n_valid))
|
|
lines.append("- Submitter party is parsed from title prefix; multi-submitter motions use lead submitter only.")
|
|
lines.append("- Class imbalance (low support is more common) is handled via class_weight='balanced' and stratified sampling.\n")
|
|
|
|
output_path = REPORTS_DIR / "predictive_model.md"
|
|
output_path.write_text("\n".join(lines), encoding="utf-8")
|
|
logger.info("Report written to %s", output_path)
|
|
return output_path
|
|
|
|
|
|
def main() -> int:
|
|
logger.info("Loading motion data...")
|
|
records, n_total, n_valid = load_model_data(DB_PATH)
|
|
|
|
if n_valid < 50:
|
|
logger.error("Insufficient valid records: %d. Need at least 50 for modeling.", n_valid)
|
|
return 1
|
|
|
|
logger.info("Building feature matrix...")
|
|
X, y, feature_names = build_features(records)
|
|
|
|
logger.info("Training and evaluating models...")
|
|
results = evaluate_models(X, y, feature_names)
|
|
|
|
logger.info(
|
|
"LR AUC-ROC: %.3f, RF AUC-ROC: %.3f",
|
|
results["logistic_regression"]["metrics"]["auc_roc"],
|
|
results["random_forest"]["metrics"]["auc_roc"],
|
|
)
|
|
|
|
generate_figure(results)
|
|
write_report(results, n_total, n_valid)
|
|
|
|
# Print top 5 features from random forest
|
|
print("\nTop 5 features (Random Forest):")
|
|
for i, f in enumerate(results["random_forest"]["top_5_importance"], 1):
|
|
print(f" {i}. {f['feature']}: {f['importance']:.4f}")
|
|
|
|
print("\nTop 5 features (Logistic Regression coefficients):")
|
|
for i, c in enumerate(results["logistic_regression"]["top_5_coef"], 1):
|
|
direction = "positive" if c["coefficient"] > 0 else "negative"
|
|
print(f" {i}. {c['feature']}: coef={c['coefficient']:.4f} ({direction})")
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|
|
|