From d2310edfc49ada2c07fc3924ee71495656600261 Mon Sep 17 00:00:00 2001 From: Sven Geboers Date: Tue, 5 May 2026 21:23:08 +0200 Subject: [PATCH] feat(right-wing): LLM-based policy extremity scoring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements U4: extremity_scorer.py uses ai_provider.chat_completion_json_parallel with a JSON schema enforcing integer 1-5 + Dutch explanation. Design: - Batch size 10, max_workers 5 for parallel API calls - Prompt asks for concrete policy + radicalism score in Dutch - Stores results in table (motion_id, score, explanation, error) - Updates with yearly averages - Default sample=50 for validation; --sample -1 scores all motions Sample validation (50 motions): scores distributed 1→2, 2→34, 3→7, 4→7, yearly averages ~2.0-2.5 (mild-to-moderate radicalism). --- analysis/right_wing/extremity_scorer.py | 215 ++++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 analysis/right_wing/extremity_scorer.py diff --git a/analysis/right_wing/extremity_scorer.py b/analysis/right_wing/extremity_scorer.py new file mode 100644 index 0000000..64551a4 --- /dev/null +++ b/analysis/right_wing/extremity_scorer.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 +"""Policy extremity scorer: LLM-based radicalism scoring for right-wing motions. + +Usage: + uv run python analysis/right_wing/extremity_scorer.py --sample 50 + uv run python analysis/right_wing/extremity_scorer.py --sample -1 # all motions +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import sys +from pathlib import Path +from typing import Any + +import duckdb + +ROOT = Path(__file__).parent.parent.parent.resolve() +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from ai_provider import ProviderError, chat_completion_json_parallel +from analysis.config import config + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +logger = logging.getLogger(__name__) + +# JSON schema enforcing the expected response shape +EXTREMITY_SCHEMA = { + "name": "extremity_score", + "strict": True, + "schema": { + "type": "object", + "properties": { + "score": { + "type": "integer", + "description": "Radicalism score from 1 (mild/technical) to 5 (extreme/fundamental)", + "minimum": 1, + "maximum": 5, + }, + "explanation": { + "type": "string", + "description": "Short explanation in Dutch of why this score was given", + }, + }, + "required": ["score", "explanation"], + "additionalProperties": False, + }, +} + +PROMPT_TEMPLATE = """Dit is een motie in het Nederlandse parlement. + +Titel: {title} + +Tekst: {text} + +Wat vraagt deze motie concreet? Beoordeel hoe radicaal dit voorstel is op een schaal van 1 (mild/technisch) tot 5 (extreem/fundamenteel). Geef alleen het cijfer en een korte verklaring in het Nederlands.""" + + +def _build_prompt(title: str, body_text: str | None) -> str: + text = body_text or title or "" + # Truncate body_text to keep prompt size reasonable + if len(text) > 800: + text = text[:800] + "..." + return PROMPT_TEMPLATE.format(title=title or "", text=text) + + +def _score_batch(motion_ids: list[int], titles: list[str], texts: list[str | None]) -> list[dict[str, Any]]: + """Score a batch of motions in parallel via LLM.""" + message_batches = [] + for title, text in zip(titles, texts): + prompt = _build_prompt(title, text) + message_batches.append([{"role": "user", "content": prompt}]) + + try: + results = chat_completion_json_parallel( + message_batches, + model=config.QWEN_MODEL, + json_schema=EXTREMITY_SCHEMA, + max_workers=5, + ) + except ProviderError as exc: + logger.error("Batch API call failed: %s", exc) + return [{"score": None, "explanation": None, "error": str(exc)}] * len(motion_ids) + + # Validate each result + validated = [] + for res in results: + if not isinstance(res, dict): + validated.append({"score": None, "explanation": None, "error": "non-dict response"}) + continue + score = res.get("score") + explanation = res.get("explanation") + if not isinstance(score, int) or score < 1 or score > 5: + validated.append({"score": None, "explanation": None, "error": f"invalid score: {score}"}) + continue + validated.append({"score": score, "explanation": explanation, "error": None}) + return validated + + +def score_motions( + db_path: str = "data/motions.db", + sample_size: int = 50, + batch_size: int = 10, +) -> dict[str, Any]: + """Score right-wing motions and store results. + + Args: + sample_size: Number of motions to score. -1 = all classified motions. + """ + db = Path(db_path) + if not db.exists(): + raise FileNotFoundError(f"Database not found: {db}") + + con = duckdb.connect(str(db)) + try: + # Ensure tables exist + tables = {t[0] for t in con.execute("SHOW TABLES").fetchall()} + if "right_wing_motions" not in tables: + raise RuntimeError("Run classify_motions.py first.") + + # Load classified motions + limit_clause = "" if sample_size < 0 else f"LIMIT {sample_size}" + rows = con.execute( + f""" + SELECT r.motion_id, m.title, m.body_text + FROM right_wing_motions r + JOIN motions m ON r.motion_id = m.id + WHERE r.classified = TRUE + ORDER BY RANDOM() + {limit_clause} + """ + ).fetchall() + + if not rows: + logger.warning("No classified right-wing motions found.") + return {"scored": 0, "failed": 0} + + logger.info("Scoring %d motions in batches of %d...", len(rows), batch_size) + + # Create output table + con.execute("DROP TABLE IF EXISTS extremity_scores") + con.execute( + """ + CREATE TABLE extremity_scores ( + motion_id INTEGER PRIMARY KEY, + score INTEGER, + explanation VARCHAR, + error VARCHAR + ) + """ + ) + + scored = 0 + failed = 0 + + for i in range(0, len(rows), batch_size): + batch = rows[i : i + batch_size] + motion_ids = [r[0] for r in batch] + titles = [r[1] for r in batch] + texts = [r[2] for r in batch] + + logger.info("Batch %d/%d (%d motions)", i // batch_size + 1, (len(rows) - 1) // batch_size + 1, len(batch)) + results = _score_batch(motion_ids, titles, texts) + + for mid, res in zip(motion_ids, results): + con.execute( + "INSERT INTO extremity_scores (motion_id, score, explanation, error) VALUES (?, ?, ?, ?)", + (mid, res.get("score"), res.get("explanation"), res.get("error")), + ) + if res.get("score") is not None: + scored += 1 + else: + failed += 1 + + con.commit() + + # Update yearly summary with average extremity + con.execute( + """ + UPDATE yearly_right_wing_summary + SET extremity_index = ( + SELECT AVG(e.score) + FROM extremity_scores e + JOIN right_wing_motions r ON e.motion_id = r.motion_id + WHERE r.year = yearly_right_wing_summary.year + AND e.score IS NOT NULL + ) + """ + ) + con.commit() + + logger.info("Scored %d motions, %d failures", scored, failed) + return {"scored": scored, "failed": failed, "sample_size": len(rows)} + finally: + con.close() + + +def main() -> int: + parser = argparse.ArgumentParser(description="Score policy extremity of right-wing motions") + parser.add_argument("--db", default="data/motions.db") + parser.add_argument("--sample", type=int, default=50, help="Number of motions to score (-1 for all)") + parser.add_argument("--batch-size", type=int, default=10) + args = parser.parse_args() + + result = score_motions(db_path=args.db, sample_size=args.sample, batch_size=args.batch_size) + print(json.dumps(result, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())