#!/usr/bin/env python3 """Policy extremity scorer: LLM-based radicalism scoring for right-wing motions. Usage: uv run python analysis/right_wing/extremity_scorer.py --sample 50 uv run python analysis/right_wing/extremity_scorer.py --sample -1 # all motions """ from __future__ import annotations import argparse import json import logging import os import sys from pathlib import Path from typing import Any import duckdb ROOT = Path(__file__).parent.parent.parent.resolve() if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from ai_provider import ProviderError, chat_completion_json_parallel from analysis.config import config logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) # JSON schema enforcing the expected response shape EXTREMITY_SCHEMA = { "name": "extremity_score", "strict": True, "schema": { "type": "object", "properties": { "score": { "type": "integer", "description": "Radicalism score from 1 (mild/technical) to 5 (extreme/fundamental)", "minimum": 1, "maximum": 5, }, "explanation": { "type": "string", "description": "Short explanation in Dutch of why this score was given", }, }, "required": ["score", "explanation"], "additionalProperties": False, }, } PROMPT_TEMPLATE = """Dit is een motie in het Nederlandse parlement. Titel: {title} Tekst: {text} Wat vraagt deze motie concreet? Beoordeel hoe radicaal dit voorstel is op een schaal van 1 (mild/technisch) tot 5 (extreem/fundamenteel). Geef alleen het cijfer en een korte verklaring in het Nederlands.""" def _build_prompt(title: str, body_text: str | None) -> str: text = body_text or title or "" # Truncate body_text to keep prompt size reasonable if len(text) > 800: text = text[:800] + "..." return PROMPT_TEMPLATE.format(title=title or "", text=text) def _score_batch(motion_ids: list[int], titles: list[str], texts: list[str | None]) -> list[dict[str, Any]]: """Score a batch of motions in parallel via LLM.""" message_batches = [] for title, text in zip(titles, texts): prompt = _build_prompt(title, text) message_batches.append([{"role": "user", "content": prompt}]) try: results = chat_completion_json_parallel( message_batches, model=config.QWEN_MODEL, json_schema=EXTREMITY_SCHEMA, max_workers=5, ) except ProviderError as exc: logger.error("Batch API call failed: %s", exc) return [{"score": None, "explanation": None, "error": str(exc)}] * len(motion_ids) # Validate each result validated = [] for res in results: if not isinstance(res, dict): validated.append({"score": None, "explanation": None, "error": "non-dict response"}) continue score = res.get("score") explanation = res.get("explanation") if not isinstance(score, int) or score < 1 or score > 5: validated.append({"score": None, "explanation": None, "error": f"invalid score: {score}"}) continue validated.append({"score": score, "explanation": explanation, "error": None}) return validated def score_motions( db_path: str = "data/motions.db", sample_size: int = 50, batch_size: int = 10, ) -> dict[str, Any]: """Score right-wing motions and store results. Args: sample_size: Number of motions to score. -1 = all classified motions. """ db = Path(db_path) if not db.exists(): raise FileNotFoundError(f"Database not found: {db}") con = duckdb.connect(str(db)) try: # Ensure tables exist tables = {t[0] for t in con.execute("SHOW TABLES").fetchall()} if "right_wing_motions" not in tables: raise RuntimeError("Run classify_motions.py first.") # Load classified motions limit_clause = "" if sample_size < 0 else f"LIMIT {sample_size}" rows = con.execute( f""" SELECT r.motion_id, m.title, m.body_text FROM right_wing_motions r JOIN motions m ON r.motion_id = m.id WHERE r.classified = TRUE ORDER BY RANDOM() {limit_clause} """ ).fetchall() if not rows: logger.warning("No classified right-wing motions found.") return {"scored": 0, "failed": 0} logger.info("Scoring %d motions in batches of %d...", len(rows), batch_size) # Create output table con.execute("DROP TABLE IF EXISTS extremity_scores") con.execute( """ CREATE TABLE extremity_scores ( motion_id INTEGER PRIMARY KEY, score INTEGER, explanation VARCHAR, error VARCHAR ) """ ) scored = 0 failed = 0 for i in range(0, len(rows), batch_size): batch = rows[i : i + batch_size] motion_ids = [r[0] for r in batch] titles = [r[1] for r in batch] texts = [r[2] for r in batch] logger.info("Batch %d/%d (%d motions)", i // batch_size + 1, (len(rows) - 1) // batch_size + 1, len(batch)) results = _score_batch(motion_ids, titles, texts) for mid, res in zip(motion_ids, results): con.execute( "INSERT INTO extremity_scores (motion_id, score, explanation, error) VALUES (?, ?, ?, ?)", (mid, res.get("score"), res.get("explanation"), res.get("error")), ) if res.get("score") is not None: scored += 1 else: failed += 1 con.commit() # Update yearly summary with average extremity con.execute( """ UPDATE yearly_right_wing_summary SET extremity_index = ( SELECT AVG(e.score) FROM extremity_scores e JOIN right_wing_motions r ON e.motion_id = r.motion_id WHERE r.year = yearly_right_wing_summary.year AND e.score IS NOT NULL ) """ ) con.commit() logger.info("Scored %d motions, %d failures", scored, failed) return {"scored": scored, "failed": failed, "sample_size": len(rows)} finally: con.close() def main() -> int: parser = argparse.ArgumentParser(description="Score policy extremity of right-wing motions") parser.add_argument("--db", default="data/motions.db") parser.add_argument("--sample", type=int, default=50, help="Number of motions to score (-1 for all)") parser.add_argument("--batch-size", type=int, default=10) args = parser.parse_args() result = score_motions(db_path=args.db, sample_size=args.sample, batch_size=args.batch_size) print(json.dumps(result, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())