You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
motief/analysis/right_wing/extremity_scorer.py

266 lines
8.8 KiB

#!/usr/bin/env python3
"""Policy extremity scorer: LLM-based radicalism scoring for right-wing motions.
Scores BOTH the original motion text and the layman explanation separately.
Usage:
uv run python analysis/right_wing/extremity_scorer.py --sample 50
uv run python analysis/right_wing/extremity_scorer.py --sample -1 # all motions
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Any
import duckdb
ROOT = Path(__file__).parent.parent.parent.resolve()
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from ai_provider import ProviderError, chat_completion_json_parallel
from analysis.config import config
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
EXTREMITY_SCHEMA = {
"name": "extremity_score",
"strict": True,
"schema": {
"type": "object",
"properties": {
"text_score": {
"type": "integer",
"description": "Radicalism of the original motion text (1=mild to 5=extreme)",
"minimum": 1,
"maximum": 5,
},
"text_explanation": {
"type": "string",
"description": "Why the motion text got this score (Dutch)",
},
"layman_score": {
"type": "integer",
"description": "Radicalism of the layman explanation (1=mild to 5=extreme)",
"minimum": 1,
"maximum": 5,
},
"layman_explanation": {
"type": "string",
"description": "Why the layman explanation got this score (Dutch)",
},
},
"required": ["text_score", "text_explanation", "layman_score", "layman_explanation"],
"additionalProperties": False,
},
}
PROMPT_TEMPLATE = """Beoordeel de radicalisme van de volgende motie op twee manieren:
1) Het ORIGINELE motietekst:
Titel: {title}
Tekst: {text}
2) De VEREENVOUDIGDE uitleg:
{layman}
Geef voor ELKE versie een score van 1 (mild/technisch) tot 5 (extreem/fundamenteel) plus een korte verklaring in het Nederlands."""
def _build_prompt(title: str, body_text: str | None, layman: str | None) -> str:
text = body_text or title or ""
if len(text) > 500:
text = text[:500] + "..."
layman = layman or "(geen vereenvoudigde uitleg beschikbaar)"
if len(layman) > 400:
layman = layman[:400] + "..."
return PROMPT_TEMPLATE.format(title=title or "", text=text, layman=layman)
def _score_batch(
motion_ids: list[int],
titles: list[str],
texts: list[str | None],
laymen: list[str | None],
) -> list[dict[str, Any]]:
"""Score a batch of motions in parallel via LLM."""
message_batches = []
for title, text, layman in zip(titles, texts, laymen):
prompt = _build_prompt(title, text, layman)
message_batches.append([{"role": "user", "content": prompt}])
try:
results = chat_completion_json_parallel(
message_batches,
model=config.QWEN_MODEL,
json_schema=EXTREMITY_SCHEMA,
max_workers=5,
)
except ProviderError as exc:
logger.error("Batch API call failed: %s", exc)
return [{
"text_score": None, "text_explanation": None,
"layman_score": None, "layman_explanation": None,
"error": str(exc),
}] * len(motion_ids)
validated = []
for res in results:
if not isinstance(res, dict):
validated.append({
"text_score": None, "text_explanation": None,
"layman_score": None, "layman_explanation": None,
"error": "non-dict response",
})
continue
ts = res.get("text_score")
te = res.get("text_explanation")
ls = res.get("layman_score")
le = res.get("layman_explanation")
if not isinstance(ts, int) or ts < 1 or ts > 5:
validated.append({
"text_score": None, "text_explanation": None,
"layman_score": None, "layman_explanation": None,
"error": f"invalid text_score: {ts}",
})
continue
if not isinstance(ls, int) or ls < 1 or ls > 5:
validated.append({
"text_score": None, "text_explanation": None,
"layman_score": None, "layman_explanation": None,
"error": f"invalid layman_score: {ls}",
})
continue
validated.append({
"text_score": ts, "text_explanation": te,
"layman_score": ls, "layman_explanation": le,
"error": None,
})
return validated
def score_motions(
db_path: str = "data/motions.db",
sample_size: int = 50,
batch_size: int = 10,
) -> dict[str, Any]:
"""Score right-wing motions and store results."""
db = Path(db_path)
if not db.exists():
raise FileNotFoundError(f"Database not found: {db}")
con = duckdb.connect(str(db))
try:
tables = {t[0] for t in con.execute("SHOW TABLES").fetchall()}
if "right_wing_motions" not in tables:
raise RuntimeError("Run classify_motions.py first.")
limit_clause = "" if sample_size < 0 else f"LIMIT {sample_size}"
rows = con.execute(
f"""
SELECT r.motion_id, m.title, m.body_text, m.layman_explanation
FROM right_wing_motions r
JOIN motions m ON r.motion_id = m.id
WHERE r.classified = TRUE
ORDER BY RANDOM()
{limit_clause}
"""
).fetchall()
if not rows:
logger.warning("No classified right-wing motions found.")
return {"scored": 0, "failed": 0}
logger.info("Scoring %d motions in batches of %d...", len(rows), batch_size)
con.execute("DROP TABLE IF EXISTS extremity_scores")
con.execute(
"""
CREATE TABLE extremity_scores (
motion_id INTEGER PRIMARY KEY,
text_score INTEGER,
text_explanation VARCHAR,
layman_score INTEGER,
layman_explanation VARCHAR,
error VARCHAR
)
"""
)
scored = 0
failed = 0
for i in range(0, len(rows), batch_size):
batch = rows[i : i + batch_size]
motion_ids = [r[0] for r in batch]
titles = [r[1] for r in batch]
texts = [r[2] for r in batch]
laymen = [r[3] for r in batch]
logger.info("Batch %d/%d (%d motions)", i // batch_size + 1, (len(rows) - 1) // batch_size + 1, len(batch))
results = _score_batch(motion_ids, titles, texts, laymen)
for mid, res in zip(motion_ids, results):
con.execute(
"""
INSERT INTO extremity_scores
(motion_id, text_score, text_explanation, layman_score, layman_explanation, error)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
mid,
res.get("text_score"),
res.get("text_explanation"),
res.get("layman_score"),
res.get("layman_explanation"),
res.get("error"),
),
)
if res.get("error") is None:
scored += 1
else:
failed += 1
con.commit()
# Update yearly summary with average extremity (using text_score as primary)
con.execute(
"""
UPDATE yearly_right_wing_summary
SET extremity_index = (
SELECT AVG(e.text_score)
FROM extremity_scores e
JOIN right_wing_motions r ON e.motion_id = r.motion_id
WHERE r.year = yearly_right_wing_summary.year
AND e.text_score IS NOT NULL
)
"""
)
con.commit()
logger.info("Scored %d motions, %d failures", scored, failed)
return {"scored": scored, "failed": failed, "sample_size": len(rows)}
finally:
con.close()
def main() -> int:
parser = argparse.ArgumentParser(description="Score policy extremity of right-wing motions")
parser.add_argument("--db", default="data/motions.db")
parser.add_argument("--sample", type=int, default=50, help="Number of motions to score (-1 for all)")
parser.add_argument("--batch-size", type=int, default=10)
args = parser.parse_args()
result = score_motions(db_path=args.db, sample_size=args.sample, batch_size=args.batch_size)
print(json.dumps(result, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())