You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
270 lines
9.1 KiB
270 lines
9.1 KiB
#!/usr/bin/env python3
|
|
"""Policy extremity scorer: LLM-based radicalism scoring for right-wing motions.
|
|
|
|
Scores BOTH the original motion text and the layman explanation separately.

Usage:
    uv run python analysis/right_wing/extremity_scorer.py --sample 50
    uv run python analysis/right_wing/extremity_scorer.py --sample -1  # all motions
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import duckdb
|
|
|
|
# Directory two levels above the one containing this file.  The path is
# resolved *before* walking upwards: with a relative ``__file__`` the
# ``.parent`` chain bottoms out at "." and resolving afterwards would silently
# turn ROOT into the current working directory.
ROOT = Path(__file__).resolve().parent.parent.parent

# Put ROOT at the front of the import path (once) so modules located there can
# be imported when this file is executed directly as a script.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
|
|
|
|
from ai_provider import ProviderError, chat_completion_json_parallel
|
|
from analysis.config import config
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root logging configuration: INFO and above, timestamp + level + message.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
|
|
|
|
# JSON schema describing the structured answer requested from the model:
# one 1..5 integer score plus a free-text explanation for the original motion
# text, and the same pair for the layman explanation.  All four fields are
# required and no additional properties are allowed.
EXTREMITY_SCHEMA = {
    "name": "extremity_score",
    "strict": True,
    "schema": {
        "type": "object",
        "properties": {
            # --- original motion text ---
            "text_score": {
                "type": "integer",
                "description": "Radicalism of the original motion text (1=mild to 5=extreme)",
                "minimum": 1,
                "maximum": 5,
            },
            "text_explanation": {
                "type": "string",
                "description": "Why the motion text got this score (Dutch)",
            },
            # --- simplified (layman) explanation ---
            "layman_score": {
                "type": "integer",
                "description": "Radicalism of the layman explanation (1=mild to 5=extreme)",
                "minimum": 1,
                "maximum": 5,
            },
            "layman_explanation": {
                "type": "string",
                "description": "Why the layman explanation got this score (Dutch)",
            },
        },
        "required": [
            "text_score",
            "text_explanation",
            "layman_score",
            "layman_explanation",
        ],
        "additionalProperties": False,
    },
}
|
|
|
|
PROMPT_TEMPLATE = """Beoordeel de radicalisme van de volgende motie op twee manieren:
|
|
|
|
1) Het ORIGINELE motietekst:
|
|
Titel: {title}
|
|
Tekst: {text}
|
|
|
|
2) De VEREENVOUDIGDE uitleg:
|
|
{layman}
|
|
|
|
Geef voor ELKE versie een score van 1 (mild/technisch) tot 5 (extreem/fundamenteel) plus een korte verklaring in het Nederlands."""
|
|
|
|
|
|
def _build_prompt(title: str, body_text: str | None, layman: str | None) -> str:
    """Render ``PROMPT_TEMPLATE`` for a single motion.

    Args:
        title: Motion title; also used as the motion text when ``body_text``
            is empty or ``None``.
        body_text: Full motion text, if available.
        layman: Simplified explanation, if available; a Dutch placeholder is
            substituted when it is empty or ``None``.

    Returns:
        The formatted prompt.  The motion text is cut after 500 characters and
        the layman text after 400 characters; a cut value gets "..." appended.
    """

    def _clip(value: str, limit: int) -> str:
        # Leave short values untouched; mark truncated ones with an ellipsis.
        return value[:limit] + "..." if len(value) > limit else value

    motion_text = _clip(body_text or title or "", 500)
    simplified = _clip(layman or "(geen vereenvoudigde uitleg beschikbaar)", 400)
    return PROMPT_TEMPLATE.format(
        title=title or "",
        text=motion_text,
        layman=simplified,
    )
|
|
|
|
|
|
def _failed_result(error: str) -> dict[str, Any]:
    """Return a fresh result record with all score fields empty and ``error`` set."""
    return {
        "text_score": None, "text_explanation": None,
        "layman_score": None, "layman_explanation": None,
        "error": error,
    }


def _is_valid_score(value: Any) -> bool:
    """Return True when ``value`` is a genuine integer between 1 and 5 inclusive."""
    # bool is a subclass of int, so without the explicit exclusion ``True``
    # would be accepted as a score of 1.
    return isinstance(value, int) and not isinstance(value, bool) and 1 <= value <= 5


def _score_batch(
    motion_ids: list[int],
    titles: list[str],
    texts: list[str | None],
    laymen: list[str | None],
) -> list[dict[str, Any]]:
    """Score a batch of motions in parallel via LLM.

    Args:
        motion_ids: Ids of the motions in this batch; only their count is used
            here, to size the returned list.
        titles: Motion titles, parallel to ``motion_ids``.
        texts: Motion body texts (may contain ``None``), parallel to ``motion_ids``.
        laymen: Layman explanations (may contain ``None``), parallel to ``motion_ids``.

    Returns:
        Exactly one record per motion id, each with the keys ``text_score``,
        ``text_explanation``, ``layman_score``, ``layman_explanation`` and
        ``error``.  On success ``error`` is ``None``; otherwise the four
        payload fields are ``None`` and ``error`` describes the problem.  A
        ``ProviderError`` from the API call is logged and turned into one
        failure record per motion instead of being raised.
    """
    message_batches = [
        [{"role": "user", "content": _build_prompt(title, text, layman)}]
        for title, text, layman in zip(titles, texts, laymen)
    ]

    try:
        results = chat_completion_json_parallel(
            message_batches,
            model=config.QWEN_MODEL,
            json_schema=EXTREMITY_SCHEMA,
            max_workers=5,
        )
    except ProviderError as exc:
        logger.error("Batch API call failed: %s", exc)
        # One independent dict per motion; ``[d] * n`` would alias a single
        # mutable dict n times.
        return [_failed_result(str(exc)) for _ in motion_ids]

    # NOTE(review): assumes responses come back in the same order as
    # ``message_batches`` — confirm against chat_completion_json_parallel.
    results = list(results)
    expected = len(motion_ids)
    if len(results) != expected:
        logger.warning("Expected %d responses, got %d", expected, len(results))

    validated: list[dict[str, Any]] = []
    for position in range(expected):
        if position >= len(results):
            # Keep the output aligned with motion_ids so that a caller zipping
            # the two lists cannot silently drop trailing motions.
            validated.append(_failed_result("missing response"))
            continue

        res = results[position]
        if not isinstance(res, dict):
            validated.append(_failed_result("non-dict response"))
            continue

        ts = res.get("text_score")
        ls = res.get("layman_score")
        if not _is_valid_score(ts):
            validated.append(_failed_result(f"invalid text_score: {ts}"))
            continue
        if not _is_valid_score(ls):
            validated.append(_failed_result(f"invalid layman_score: {ls}"))
            continue

        validated.append({
            "text_score": ts, "text_explanation": res.get("text_explanation"),
            "layman_score": ls, "layman_explanation": res.get("layman_explanation"),
            "error": None,
        })
    return validated
|
|
|
|
|
|
def score_motions(
    db_path: str = "data/motions.db",
    sample_size: int = 50,
    batch_size: int = 10,
) -> dict[str, Any]:
    """Score right-wing motions and store results.

    Selects classified right-wing motions that do not yet have a successful
    row in ``extremity_scores``, scores them in batches through
    ``_score_batch`` and upserts one row per motion (committing after every
    batch, so an interrupted run can be resumed).  Afterwards
    ``yearly_right_wing_summary.extremity_index`` is recomputed as the
    per-year average ``text_score``, when that table exists.

    Args:
        db_path: Path to the DuckDB database file.
        sample_size: Maximum number of not-yet-scored motions to process,
            drawn in random order; a negative value means "all".
        batch_size: Number of motions sent to the LLM per batch; must be >= 1.

    Returns:
        ``{"scored": int, "failed": int, "sample_size": int}`` where
        ``sample_size`` is the number of motions selected in this run.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        FileNotFoundError: If ``db_path`` does not exist.
        RuntimeError: If the ``right_wing_motions`` table is missing.
    """
    if batch_size < 1:
        # range() would raise on 0 and silently iterate nothing on negatives.
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    db = Path(db_path)
    if not db.exists():
        raise FileNotFoundError(f"Database not found: {db}")

    con = duckdb.connect(str(db))
    try:
        tables = {t[0] for t in con.execute("SHOW TABLES").fetchall()}
        if "right_wing_motions" not in tables:
            raise RuntimeError("Run classify_motions.py first.")

        # Created before the selection query so that query can exclude
        # motions that were already scored successfully.
        con.execute(
            """
            CREATE TABLE IF NOT EXISTS extremity_scores (
                motion_id INTEGER PRIMARY KEY,
                text_score INTEGER,
                text_explanation VARCHAR,
                layman_score INTEGER,
                layman_explanation VARCHAR,
                error VARCHAR
            )
            """
        )

        # Resume support: the "already scored" filter is applied in SQL,
        # *before* LIMIT, so a resumed run still gets up to ``sample_size``
        # fresh motions instead of a random sample that is then mostly
        # discarded.  Rows with a recorded error are retried.
        # Only the int-coerced limit is interpolated into the statement.
        limit_clause = "" if sample_size < 0 else f"LIMIT {int(sample_size)}"
        rows = con.execute(
            f"""
            SELECT r.motion_id, m.title, m.body_text, m.layman_explanation
            FROM right_wing_motions r
            JOIN motions m ON r.motion_id = m.id
            WHERE r.classified = TRUE
              AND NOT EXISTS (
                  SELECT 1
                  FROM extremity_scores e
                  WHERE e.motion_id = r.motion_id
                    AND e.error IS NULL
              )
            ORDER BY RANDOM()
            {limit_clause}
            """
        ).fetchall()

        total = len(rows)
        if total:
            logger.info("Scoring %d motions in batches of %d...", total, batch_size)
        else:
            logger.warning("No unscored classified right-wing motions found.")

        total_batches = (total + batch_size - 1) // batch_size
        scored = 0
        failed = 0

        for batch_no, start in enumerate(range(0, total, batch_size), start=1):
            batch = rows[start : start + batch_size]
            # Transpose the row tuples into four parallel column lists.
            motion_ids, titles, texts, laymen = (list(col) for col in zip(*batch))

            logger.info("Batch %d/%d (%d motions)", batch_no, total_batches, len(batch))
            results = _score_batch(motion_ids, titles, texts, laymen)

            if len(results) < len(motion_ids):
                # zip() below stops at the shorter list; count the motions
                # that got no result instead of dropping them unnoticed.
                missing = len(motion_ids) - len(results)
                logger.warning("%d motions in this batch received no result", missing)
                failed += missing

            for mid, res in zip(motion_ids, results):
                con.execute(
                    """
                    INSERT OR REPLACE INTO extremity_scores
                    (motion_id, text_score, text_explanation, layman_score, layman_explanation, error)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (
                        mid,
                        res.get("text_score"),
                        res.get("text_explanation"),
                        res.get("layman_score"),
                        res.get("layman_explanation"),
                        res.get("error"),
                    ),
                )
                if res.get("error") is None:
                    scored += 1
                else:
                    failed += 1

            # Commit per batch so finished work survives an interruption.
            con.commit()

        # Update yearly summary with average extremity (using text_score as primary).
        if "yearly_right_wing_summary" in tables:
            con.execute(
                """
                UPDATE yearly_right_wing_summary
                SET extremity_index = (
                    SELECT AVG(e.text_score)
                    FROM extremity_scores e
                    JOIN right_wing_motions r ON e.motion_id = r.motion_id
                    WHERE r.year = yearly_right_wing_summary.year
                      AND e.text_score IS NOT NULL
                )
                """
            )
            con.commit()
        else:
            # Scores are already stored; a missing summary table should not
            # turn a completed scoring run into a crash.
            logger.warning("Table yearly_right_wing_summary not found; skipping extremity_index update.")

        logger.info("Scored %d motions, %d failures", scored, failed)
        return {"scored": scored, "failed": failed, "sample_size": total}
    finally:
        con.close()
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point.

    Parses ``--db``, ``--sample`` and ``--batch-size``, runs ``score_motions``
    with those values and prints its result dict as indented JSON.

    Returns:
        Always 0.
    """
    cli = argparse.ArgumentParser(description="Score policy extremity of right-wing motions")
    cli.add_argument("--db", default="data/motions.db")
    cli.add_argument("--sample", type=int, default=50, help="Number of motions to score (-1 for all)")
    cli.add_argument("--batch-size", type=int, default=10)
    options = cli.parse_args()

    summary = score_motions(
        db_path=options.db,
        sample_size=options.sample,
        batch_size=options.batch_size,
    )
    print(json.dumps(summary, indent=2))
    return 0
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|
|