#!/usr/bin/env python3
"""Policy extremity scorer: LLM-based radicalism scoring for right-wing motions.

Scores BOTH the original motion text and the layman explanation separately.

Usage:
    uv run python analysis/right_wing/extremity_scorer.py --sample 50
    uv run python analysis/right_wing/extremity_scorer.py --sample -1  # all motions
"""

from __future__ import annotations

import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Any

import duckdb

# Make the repository root importable so the project-local imports below
# resolve when this file is executed directly as a script.
ROOT = Path(__file__).parent.parent.parent.resolve()
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from ai_provider import ProviderError, chat_completion_json_parallel
from analysis.config import config

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

# JSON schema passed to the provider as ``json_schema`` for every scoring call.
EXTREMITY_SCHEMA = {
    "name": "extremity_score",
    "strict": True,
    "schema": {
        "type": "object",
        "properties": {
            "text_score": {
                "type": "integer",
                "description": "Radicalism of the original motion text (1=mild to 5=extreme)",
                "minimum": 1,
                "maximum": 5,
            },
            "text_explanation": {
                "type": "string",
                "description": "Why the motion text got this score (Dutch)",
            },
            "layman_score": {
                "type": "integer",
                "description": "Radicalism of the layman explanation (1=mild to 5=extreme)",
                "minimum": 1,
                "maximum": 5,
            },
            "layman_explanation": {
                "type": "string",
                "description": "Why the layman explanation got this score (Dutch)",
            },
        },
        "required": ["text_score", "text_explanation", "layman_score", "layman_explanation"],
        "additionalProperties": False,
    },
}

PROMPT_TEMPLATE = """Beoordeel de radicalisme van de volgende motie op twee manieren:

1) Het ORIGINELE motietekst:
Titel: {title}
Tekst: {text}

2) De VEREENVOUDIGDE uitleg:
{layman}

Geef voor ELKE versie een score van 1 (mild/technisch) tot 5 (extreem/fundamenteel) plus een korte verklaring in het Nederlands."""

# Character budgets applied to the prompt inputs before formatting.
_MAX_TEXT_CHARS = 500
_MAX_LAYMAN_CHARS = 400

# Inclusive bounds of a valid score; mirrors minimum/maximum in EXTREMITY_SCHEMA.
_MIN_SCORE = 1
_MAX_SCORE = 5


def _truncate(value: str, limit: int) -> str:
    """Return *value* cut to *limit* characters, marking any cut with "..."."""
    return value[:limit] + "..." if len(value) > limit else value


def _build_prompt(title: str, body_text: str | None, layman: str | None) -> str:
    """Render the scoring prompt for a single motion.

    Args:
        title: Motion title; also used as the motion text when *body_text*
            is empty or ``None``.
        body_text: Full motion text, truncated to 500 characters.
        layman: Simplified explanation, truncated to 400 characters; a Dutch
            placeholder is substituted when it is empty or ``None``.

    Returns:
        ``PROMPT_TEMPLATE`` filled in with the (possibly truncated) inputs.
    """
    text = _truncate(body_text or title or "", _MAX_TEXT_CHARS)
    layman = _truncate(layman or "(geen vereenvoudigde uitleg beschikbaar)", _MAX_LAYMAN_CHARS)
    return PROMPT_TEMPLATE.format(title=title or "", text=text, layman=layman)


def _error_result(message: str) -> dict[str, Any]:
    """Return a fresh failure record with all score fields set to ``None``."""
    return {
        "text_score": None,
        "text_explanation": None,
        "layman_score": None,
        "layman_explanation": None,
        "error": message,
    }


def _is_valid_score(value: Any) -> bool:
    """Return whether *value* is an integer score within the allowed range."""
    # bool is a subclass of int, so True would otherwise pass as a score of 1.
    if isinstance(value, bool) or not isinstance(value, int):
        return False
    return _MIN_SCORE <= value <= _MAX_SCORE


def _validate_result(res: Any) -> dict[str, Any]:
    """Turn one raw provider response into a success or failure record."""
    if not isinstance(res, dict):
        return _error_result("non-dict response")

    text_score = res.get("text_score")
    if not _is_valid_score(text_score):
        return _error_result(f"invalid text_score: {text_score}")

    layman_score = res.get("layman_score")
    if not _is_valid_score(layman_score):
        return _error_result(f"invalid layman_score: {layman_score}")

    return {
        "text_score": text_score,
        "text_explanation": res.get("text_explanation"),
        "layman_score": layman_score,
        "layman_explanation": res.get("layman_explanation"),
        "error": None,
    }


def _score_batch(
    motion_ids: list[int],
    titles: list[str],
    texts: list[str | None],
    laymen: list[str | None],
) -> list[dict[str, Any]]:
    """Score a batch of motions in parallel via LLM.

    Args:
        motion_ids: Ids of the motions in the batch; determines the length of
            the returned list.
        titles: Motion titles, parallel to *motion_ids*.
        texts: Motion body texts, parallel to *motion_ids*.
        laymen: Layman explanations, parallel to *motion_ids*.

    Returns:
        One record per motion id with the keys ``text_score``,
        ``text_explanation``, ``layman_score``, ``layman_explanation`` and
        ``error``. ``error`` is ``None`` on success; on failure the other
        fields are ``None``. A ``ProviderError`` from the API call is logged
        and reported as a failure for every motion in the batch.
    """
    message_batches = [
        [{"role": "user", "content": _build_prompt(title, text, layman)}]
        for title, text, layman in zip(titles, texts, laymen)
    ]

    try:
        # NOTE(review): presumably returns one result per message batch, in
        # input order - confirm against ai_provider.
        results = chat_completion_json_parallel(
            message_batches,
            model=config.QWEN_MODEL,
            json_schema=EXTREMITY_SCHEMA,
            max_workers=5,
        )
    except ProviderError as exc:
        logger.error("Batch API call failed: %s", exc)
        # A separate dict per motion, so records never alias one another.
        return [_error_result(str(exc)) for _ in motion_ids]

    validated = [_validate_result(res) for res in results]

    # Keep the output aligned with motion_ids: a short result list would
    # otherwise make the caller's zip() silently drop the trailing motions.
    expected = len(motion_ids)
    if len(validated) != expected:
        logger.warning("Expected %d results from provider, got %d", expected, len(validated))
        validated = validated[:expected]
        validated.extend(_error_result("missing response") for _ in range(expected - len(validated)))
    return validated


def score_motions(
    db_path: str = "data/motions.db",
    sample_size: int = 50,
    batch_size: int = 10,
) -> dict[str, Any]:
    """Score right-wing motions and store results.

    A random sample of classified motions is drawn, motions that already have
    an error-free row in ``extremity_scores`` are skipped, and the rest are
    scored in batches. Results are upserted per batch, after which
    ``yearly_right_wing_summary.extremity_index`` is refreshed with the
    average ``text_score`` per year.

    Note that sampling happens before already-scored motions are filtered
    out, so a resumed run may score fewer than *sample_size* motions.

    Args:
        db_path: Path to the DuckDB database file.
        sample_size: Number of motions to sample; a negative value selects
            all classified motions.
        batch_size: Number of motions sent to the LLM per batch; must be >= 1.

    Returns:
        A dict with ``scored`` and ``failed`` counts for this run and
        ``sample_size``, the number of motions actually submitted.

    Raises:
        ValueError: If *batch_size* is smaller than 1.
        FileNotFoundError: If the database file does not exist.
        RuntimeError: If the ``right_wing_motions`` table is missing.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    db = Path(db_path)
    if not db.exists():
        raise FileNotFoundError(f"Database not found: {db}")

    con = duckdb.connect(str(db))
    try:
        tables = {t[0] for t in con.execute("SHOW TABLES").fetchall()}
        if "right_wing_motions" not in tables:
            raise RuntimeError("Run classify_motions.py first.")

        # int() guarantees that only a number is ever interpolated into the SQL.
        limit_clause = "" if sample_size < 0 else f"LIMIT {int(sample_size)}"
        rows = con.execute(
            f"""
            SELECT r.motion_id, m.title, m.body_text, m.layman_explanation
            FROM right_wing_motions r
            JOIN motions m ON r.motion_id = m.id
            WHERE r.classified = TRUE
            ORDER BY RANDOM()
            {limit_clause}
            """
        ).fetchall()

        if not rows:
            logger.warning("No classified right-wing motions found.")
            return {"scored": 0, "failed": 0, "sample_size": 0}

        # Resume support: only create table if missing, skip already-scored motions
        con.execute(
            """
            CREATE TABLE IF NOT EXISTS extremity_scores (
                motion_id INTEGER PRIMARY KEY,
                text_score INTEGER,
                text_explanation VARCHAR,
                layman_score INTEGER,
                layman_explanation VARCHAR,
                error VARCHAR
            )
            """
        )
        # Rows with an error are not treated as scored, so they are retried.
        already_scored = {
            r[0]
            for r in con.execute(
                "SELECT motion_id FROM extremity_scores WHERE error IS NULL"
            ).fetchall()
        }
        rows = [r for r in rows if r[0] not in already_scored]

        logger.info("Scoring %d motions in batches of %d...", len(rows), batch_size)

        scored = 0
        failed = 0
        total_batches = (len(rows) - 1) // batch_size + 1

        for batch_no, start in enumerate(range(0, len(rows), batch_size), start=1):
            batch = rows[start : start + batch_size]
            motion_ids = [r[0] for r in batch]
            titles = [r[1] for r in batch]
            texts = [r[2] for r in batch]
            laymen = [r[3] for r in batch]

            logger.info("Batch %d/%d (%d motions)", batch_no, total_batches, len(batch))
            results = _score_batch(motion_ids, titles, texts, laymen)

            for mid, res in zip(motion_ids, results):
                con.execute(
                    """
                    INSERT OR REPLACE INTO extremity_scores
                    (motion_id, text_score, text_explanation, layman_score, layman_explanation, error)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (
                        mid,
                        res.get("text_score"),
                        res.get("text_explanation"),
                        res.get("layman_score"),
                        res.get("layman_explanation"),
                        res.get("error"),
                    ),
                )
                if res.get("error") is None:
                    scored += 1
                else:
                    failed += 1

            # Commit per batch so finished work survives a later failure.
            con.commit()

        # Update yearly summary with average extremity (using text_score as primary)
        # NOTE(review): assumes yearly_right_wing_summary already exists; it is
        # not created anywhere in this file.
        con.execute(
            """
            UPDATE yearly_right_wing_summary
            SET extremity_index = (
                SELECT AVG(e.text_score)
                FROM extremity_scores e
                JOIN right_wing_motions r ON e.motion_id = r.motion_id
                WHERE r.year = yearly_right_wing_summary.year
                  AND e.text_score IS NOT NULL
            )
            """
        )
        con.commit()

        logger.info("Scored %d motions, %d failures", scored, failed)
        return {"scored": scored, "failed": failed, "sample_size": len(rows)}
    finally:
        con.close()


def main() -> int:
    """Parse the command-line arguments, run the scorer and print its result.

    Returns:
        Process exit code; always 0 when scoring finishes without raising.
    """
    parser = argparse.ArgumentParser(description="Score policy extremity of right-wing motions")
    parser.add_argument("--db", default="data/motions.db")
    parser.add_argument("--sample", type=int, default=50, help="Number of motions to score (-1 for all)")
    parser.add_argument("--batch-size", type=int, default=10)
    args = parser.parse_args()

    result = score_motions(db_path=args.db, sample_size=args.sample, batch_size=args.batch_size)
    print(json.dumps(result, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())