#!/usr/bin/env python3
"""Sentiment analysis pipeline: Dutch sentiment scoring for right-wing motions.

Uses LLM batch calls (fallback when no local Dutch sentiment model is available).
Maps outputs to [-1, 1] scale where negative = hostile/aggressive, positive = constructive.

Design:
- The prompt asks for a sentiment from -1 (hostile/aggressive) to 1 (constructive).
- A JSON schema enforces a numeric score plus a Dutch explanation.
- Motions are scored in batches (default 10) with ``max_workers=5`` parallel API calls.
- Per-motion results are stored in the ``sentiment_scores`` table.
- ``yearly_right_wing_summary`` is updated with ``avg_sentiment``,
  ``sentiment_std`` and ``pct_strongly_negative`` per year.

Usage:
    uv run python analysis/right_wing/sentiment_analysis.py --sample 50
    uv run python analysis/right_wing/sentiment_analysis.py --sample -1

Provenance: added as analysis/right_wing/sentiment_analysis.py (U5) by commit
f94edc3d04aa67b9f3f16eaa104ed2572976d107, "feat(right-wing): sentiment analysis
pipeline for right-wing motions" (Sven Geboers, 2026-05-05). Sample validation
reported in that commit (50 motions): good variance across the [-0.9, 1.0] range.
"""

from __future__ import annotations

import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Any

import duckdb

# Make the repository root importable when this file is executed as a script,
# so the project-level imports below resolve.
ROOT = Path(__file__).parent.parent.parent.resolve()
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from ai_provider import ProviderError, chat_completion_json_parallel
from analysis.config import config

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

# Maximum number of characters of motion text included in a prompt.
MAX_PROMPT_CHARS = 400
# Number of parallel workers passed to the provider for each batch.
MAX_WORKERS = 5
# Table that receives the per-year aggregates.
SUMMARY_TABLE = "yearly_right_wing_summary"
# Aggregate columns added to SUMMARY_TABLE when they are missing.
SUMMARY_COLUMNS = ("avg_sentiment", "sentiment_std", "pct_strongly_negative")

SENTIMENT_SCHEMA = {
    "name": "sentiment_score",
    "strict": True,
    "schema": {
        "type": "object",
        "properties": {
            "score": {
                "type": "number",
                "description": "Sentiment score from -1 (very negative/hostile) to 1 (very positive/constructive)",
                "minimum": -1,
                "maximum": 1,
            },
            "explanation": {
                "type": "string",
                "description": "Short explanation in Dutch of why this sentiment was given",
            },
        },
        "required": ["score", "explanation"],
        "additionalProperties": False,
    },
}

PROMPT_TEMPLATE = """Beoordeel de sentiment van de volgende motie uit het Nederlandse parlement.

Titel: {title}

Tekst: {text}

Geef een sentiment score van -1 (zeer negatief, agressief, vijandig) tot 1 (zeer positief, constructief, coöperatief). Geef ook een korte verklaring in het Nederlands."""


def _build_prompt(
    title: str | None,
    body_text: str | None,
    max_chars: int = MAX_PROMPT_CHARS,
) -> str:
    """Render the scoring prompt for one motion.

    Args:
        title: Motion title; ``None`` is rendered as an empty string.
        body_text: Motion body; when empty or ``None`` the title is used as
            the text instead.
        max_chars: Maximum number of text characters kept; longer text is cut
            and suffixed with ``"..."``.

    Returns:
        The filled-in ``PROMPT_TEMPLATE``.
    """
    text = body_text or title or ""
    if len(text) > max_chars:
        text = text[:max_chars] + "..."
    return PROMPT_TEMPLATE.format(title=title or "", text=text)


def _failure(error: str) -> dict[str, Any]:
    """Return a fresh result dict describing a motion that could not be scored."""
    return {"score": None, "explanation": None, "error": error}


def _validate_result(res: Any) -> dict[str, Any]:
    """Normalise one provider response into a ``score/explanation/error`` dict."""
    if not isinstance(res, dict):
        return _failure("non-dict response")

    score = res.get("score")
    # bool is a subclass of int, so exclude it explicitly. The chained
    # comparison is False for NaN, which rejects it together with out-of-range
    # values.
    is_number = isinstance(score, (int, float)) and not isinstance(score, bool)
    if not is_number or not -1 <= score <= 1:
        return _failure(f"invalid score: {score}")

    explanation = res.get("explanation")
    if explanation is not None and not isinstance(explanation, str):
        # The value is stored in a VARCHAR column; keep unexpected payloads as text.
        explanation = str(explanation)
    return {"score": float(score), "explanation": explanation, "error": None}


def _score_batch(
    motion_ids: list[int],
    titles: list[str | None],
    texts: list[str | None],
) -> list[dict[str, Any]]:
    """Score sentiment for a batch of motions in parallel via LLM.

    Args:
        motion_ids: Ids of the motions in the batch; determines the length of
            the returned list.
        titles: Motion titles, aligned with ``motion_ids``.
        texts: Motion body texts, aligned with ``motion_ids``.

    Returns:
        Exactly ``len(motion_ids)`` dicts with keys ``score`` (float or
        ``None``), ``explanation`` and ``error``. When the provider call
        raises ``ProviderError`` every entry is a failure carrying the error
        message.
    """
    expected = len(motion_ids)
    message_batches = [
        [{"role": "user", "content": _build_prompt(title, text)}]
        for title, text in zip(titles, texts)
    ]

    try:
        results = chat_completion_json_parallel(
            message_batches,
            model=config.QWEN_MODEL,
            json_schema=SENTIMENT_SCHEMA,
            max_workers=MAX_WORKERS,
        )
    except ProviderError as exc:
        logger.error("Batch API call failed: %s", exc)
        # One independent dict per motion; list multiplication would alias a
        # single mutable dict across every position.
        return [_failure(str(exc)) for _ in range(expected)]

    validated = [_validate_result(res) for res in (results or [])]
    if len(validated) != expected:
        # The caller zips results against motion ids; without padding, missing
        # responses would drop motions without counting them as failures.
        logger.warning("Expected %d results but received %d", expected, len(validated))
        validated = validated[:expected]
        validated.extend(_failure("missing response") for _ in range(expected - len(validated)))
    return validated


def _fetch_motions(con: Any, sample_size: int) -> list[tuple]:
    """Return ``(motion_id, year, title, body_text)`` rows in random order.

    A negative ``sample_size`` selects every classified motion.
    """
    # int() guarantees that only a number is ever interpolated into the SQL text.
    limit_clause = "" if sample_size < 0 else f"LIMIT {int(sample_size)}"
    return con.execute(
        f"""
        SELECT r.motion_id, r.year, m.title, m.body_text
        FROM right_wing_motions r
        JOIN motions m ON r.motion_id = m.id
        WHERE r.classified = TRUE
        ORDER BY RANDOM()
        {limit_clause}
        """
    ).fetchall()


def _recreate_scores_table(con: Any) -> None:
    """Drop and recreate ``sentiment_scores`` so each run starts from an empty table."""
    con.execute("DROP TABLE IF EXISTS sentiment_scores")
    con.execute(
        """
        CREATE TABLE sentiment_scores (
            motion_id INTEGER PRIMARY KEY,
            year INTEGER,
            score DOUBLE,
            explanation VARCHAR,
            error VARCHAR
        )
        """
    )


def _update_yearly_summary(con: Any) -> None:
    """Add missing aggregate columns to the summary table and refresh them."""
    # table_info rows are (cid, name, type, notnull, dflt_value, pk); the
    # column name is at index 1. Index 0 is the numeric cid, which would make
    # the membership test below never match and break re-runs.
    existing = {row[1] for row in con.execute(f"PRAGMA table_info('{SUMMARY_TABLE}')").fetchall()}
    for column in SUMMARY_COLUMNS:
        if column not in existing:
            con.execute(f"ALTER TABLE {SUMMARY_TABLE} ADD COLUMN {column} DOUBLE")

    con.execute(
        """
        UPDATE yearly_right_wing_summary
        SET avg_sentiment = (
                SELECT AVG(s.score)
                FROM sentiment_scores s
                WHERE s.year = yearly_right_wing_summary.year
                  AND s.score IS NOT NULL
            ),
            sentiment_std = (
                SELECT STDDEV(s.score)
                FROM sentiment_scores s
                WHERE s.year = yearly_right_wing_summary.year
                  AND s.score IS NOT NULL
            ),
            pct_strongly_negative = (
                SELECT COUNT(CASE WHEN s.score < -0.5 THEN 1 END) * 100.0 / NULLIF(COUNT(*), 0)
                FROM sentiment_scores s
                WHERE s.year = yearly_right_wing_summary.year
                  AND s.score IS NOT NULL
            )
        """
    )


def analyze_sentiment(
    db_path: str = "data/motions.db",
    sample_size: int = 50,
    batch_size: int = 10,
) -> dict[str, Any]:
    """Analyze sentiment of right-wing motions and aggregate by year.

    Args:
        db_path: Path to the DuckDB database file.
        sample_size: Number of randomly chosen motions to score; a negative
            value scores all classified motions.
        batch_size: Number of motions sent to the provider per batch.

    Returns:
        ``{"scored": int, "failed": int, "sample_size": int}``, where
        ``sample_size`` is the number of motions actually selected. When no
        classified motions exist only ``scored`` and ``failed`` (both 0) are
        returned.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        FileNotFoundError: If ``db_path`` does not exist.
        RuntimeError: If ``right_wing_motions`` or the yearly summary table
            is missing.
    """
    if batch_size < 1:
        # range() with step 0 raises an opaque error and a negative step would
        # silently score nothing.
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    db = Path(db_path)
    if not db.exists():
        raise FileNotFoundError(f"Database not found: {db}")

    con = duckdb.connect(str(db))
    try:
        tables = {t[0] for t in con.execute("SHOW TABLES").fetchall()}
        if "right_wing_motions" not in tables:
            raise RuntimeError("Run classify_motions.py first.")

        rows = _fetch_motions(con, sample_size)
        if not rows:
            logger.warning("No classified right-wing motions found.")
            return {"scored": 0, "failed": 0}

        # Fail before any API call is made rather than after all of them.
        if SUMMARY_TABLE not in tables:
            raise RuntimeError(f"Table {SUMMARY_TABLE} not found; create the yearly summary first.")

        logger.info("Scoring sentiment for %d motions in batches of %d...", len(rows), batch_size)
        _recreate_scores_table(con)

        scored = 0
        failed = 0
        total_batches = (len(rows) - 1) // batch_size + 1

        for start in range(0, len(rows), batch_size):
            batch = rows[start : start + batch_size]
            motion_ids = [r[0] for r in batch]
            years = [r[1] for r in batch]
            titles = [r[2] for r in batch]
            texts = [r[3] for r in batch]

            logger.info("Batch %d/%d (%d motions)", start // batch_size + 1, total_batches, len(batch))
            results = _score_batch(motion_ids, titles, texts)

            for mid, year, res in zip(motion_ids, years, results):
                con.execute(
                    "INSERT INTO sentiment_scores (motion_id, year, score, explanation, error) VALUES (?, ?, ?, ?, ?)",
                    (mid, year, res.get("score"), res.get("explanation"), res.get("error")),
                )
                if res.get("score") is not None:
                    scored += 1
                else:
                    failed += 1

        con.commit()

        _update_yearly_summary(con)
        con.commit()

        logger.info("Scored %d motions, %d failures", scored, failed)
        return {"scored": scored, "failed": failed, "sample_size": len(rows)}
    finally:
        con.close()


def main() -> int:
    """Parse command-line arguments, run the analysis and print the result as JSON."""
    parser = argparse.ArgumentParser(description="Sentiment analysis for right-wing motions")
    parser.add_argument("--db", default="data/motions.db")
    parser.add_argument("--sample", type=int, default=50, help="Number of motions to score (-1 for all)")
    parser.add_argument("--batch-size", type=int, default=10)
    args = parser.parse_args()

    result = analyze_sentiment(db_path=args.db, sample_size=args.sample, batch_size=args.batch_size)
    print(json.dumps(result, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())