Implements U2: classify_motions.py loads keywords from U1 and classifies motions as right-wing when all of the following hold:
- right_support >= 60% (CANONICAL_RIGHT parties voting 'voor')
- left_opposition >= 40% (CANONICAL_LEFT parties voting 'tegen')
- at least 1 right-wing keyword match in title/body_text

Outputs a DuckDB table with:
- motion_id, year, title, right_support, left_opposition, centrist_support
- right_keyword_matches, left_keyword_matches, classified flag

Classified 2986 of 28331 motions (10.5%) as right-wing.

main
parent
c6f8540671
commit
d3dfb0ce2f
@ -0,0 +1,262 @@ |
||||
#!/usr/bin/env python3 |
||||
"""Hybrid motion classifier: identify right-wing motions via keywords + voting patterns. |
||||
|
||||
Usage: |
||||
uv run python analysis/right_wing/classify_motions.py |
||||
""" |
||||
|
||||
from __future__ import annotations |
||||
|
||||
import argparse |
||||
import json |
||||
import logging |
||||
import re |
||||
import sys |
||||
from pathlib import Path |
||||
from typing import Any |
||||
|
||||
import duckdb |
||||
|
||||
# Directory three levels above this file (presumably the repository root,
# given the documented path analysis/right_wing/classify_motions.py).
ROOT = Path(__file__).parent.parent.parent.resolve()
# Put that directory on sys.path so the `analysis` package import that follows
# resolves when this file is executed directly as a script.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
||||
|
||||
from analysis.config import CANONICAL_LEFT, CANONICAL_RIGHT |
||||
|
||||
# Configure logging at import time: INFO level, "<time> <level> <message>" lines.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

# Centrist parties for cross-ideological metrics; this is the bloc used for
# the centrist_support value computed in _compute_party_metrics.
# NOTE(review): CANONICAL_RIGHT / CANONICAL_LEFT come from analysis.config and
# are not visible here — confirm whether overlap with this set is intended.
CANONICAL_CENTRIST = frozenset({"VVD", "D66", "CDA", "NSC", "BBB", "CU"})
||||
|
||||
|
||||
def _load_keywords(keywords_path: str) -> tuple[list[str], list[str]]: |
||||
"""Load right-wing and left-wing keywords from JSON.""" |
||||
with open(keywords_path, "r", encoding="utf-8") as f: |
||||
data = json.load(f) |
||||
right = [item["term"] for item in data.get("right_keywords", [])] |
||||
left = [item["term"] for item in data.get("left_keywords", [])] |
||||
return right, left |
||||
|
||||
|
||||
def _build_keyword_pattern(keywords: list[str]) -> re.Pattern | None: |
||||
"""Build case-insensitive whole-word regex from keyword list.""" |
||||
if not keywords: |
||||
return None |
||||
escaped = [re.escape(kw) for kw in keywords] |
||||
pattern = r"\b(?:" + "|".join(escaped) + r")\b" |
||||
return re.compile(pattern, re.IGNORECASE) |
||||
|
||||
|
||||
def _compute_party_metrics(
    motion_votes: dict[str, dict[str, int]],
) -> tuple[float | None, float | None, float | None]:
    """Compute right_support, left_opposition, centrist_support for a motion.

    Args:
        motion_votes: Mapping of party name to that party's vote counts for
            one motion, keyed by vote value ("voor", "tegen", "afwezig").

    Returns:
        ``(right_support, left_opposition, centrist_support)``. Each value is
        the fraction (0.0-1.0) of parties in the relevant bloc that backed
        ("voor", for CANONICAL_RIGHT and CANONICAL_CENTRIST) or opposed
        ("tegen", for CANONICAL_LEFT) the motion, or ``None`` if no party of
        that bloc has any recorded vote.
    """

    def _bloc_ratio(parties: frozenset[str], vote_key: str) -> float | None:
        # Share of the bloc's voting parties where `vote_key` makes up at
        # least half of the party's voor + tegen + afwezig votes.
        counted = 0
        hits = 0
        for party, tally in motion_votes.items():
            if party not in parties:
                continue
            cast = tally.get("voor", 0) + tally.get("tegen", 0) + tally.get("afwezig", 0)
            if cast == 0:
                # A party without any recorded vote does not enter the denominator.
                continue
            counted += 1
            if tally.get(vote_key, 0) / cast >= 0.5:
                hits += 1
        if counted == 0:
            return None
        return hits / counted

    right_support = _bloc_ratio(CANONICAL_RIGHT, "voor")
    left_opposition = _bloc_ratio(CANONICAL_LEFT, "tegen")
    centrist_support = _bloc_ratio(CANONICAL_CENTRIST, "voor")
    return right_support, left_opposition, centrist_support
||||
|
||||
|
||||
def _match_keywords(text: str, pattern: re.Pattern | None) -> list[str]: |
||||
"""Return list of matched keywords in text.""" |
||||
if pattern is None or not text: |
||||
return [] |
||||
return pattern.findall(text) |
||||
|
||||
|
||||
def classify_motions(
    db_path: str = "data/motions.db",
    keywords_path: str = "analysis/right_wing/right_wing_keywords.json",
    right_support_threshold: float = 0.60,
    left_opposition_threshold: float = 0.40,
    require_keywords: bool = True,
    keyword_min_matches: int = 1,
) -> dict[str, Any]:
    """Classify motions and write results to `right_wing_motions` table.

    One row is written for every motion that has at least one vote with a
    non-NULL party in ``mp_votes``. A motion is flagged as classified when its
    right_support and left_opposition both reach their thresholds and, if
    ``require_keywords`` is set, at least ``keyword_min_matches`` right-wing
    keyword matches occur in its title/body text.

    All inputs are read and all rows computed before the output table is
    touched. The table is then rebuilt inside a single transaction, so a
    failure leaves the previous results in place rather than a dropped or
    half-filled table.

    Args:
        db_path: Path to the DuckDB database file.
        keywords_path: Path to the keyword JSON file.
        right_support_threshold: Minimum right_support (0.0-1.0).
        left_opposition_threshold: Minimum left_opposition (0.0-1.0).
        require_keywords: Whether keyword matches are required in addition
            to the voting pattern.
        keyword_min_matches: Minimum number of right-wing keyword matches.

    Returns:
        Stats dict with the keys ``total_processed``, ``classified``,
        ``right_keywords_loaded`` and ``left_keywords_loaded``.

    Raises:
        FileNotFoundError: If the database or the keywords file is missing.
    """
    db = Path(db_path)
    if not db.exists():
        raise FileNotFoundError(f"Database not found: {db}")

    kw_path = Path(keywords_path)
    if not kw_path.exists():
        raise FileNotFoundError(f"Keywords file not found: {kw_path}")

    right_kws, left_kws = _load_keywords(str(kw_path))
    right_pattern = _build_keyword_pattern(right_kws)
    left_pattern = _build_keyword_pattern(left_kws)

    con = duckdb.connect(str(db))
    try:
        # --- Read phase: nothing is modified until all rows are computed. ---
        motion_rows = con.execute(
            "SELECT id, title, body_text, date FROM motions"
        ).fetchall()
        vote_rows = con.execute(
            """
            SELECT motion_id, party, vote, COUNT(*) as n
            FROM mp_votes
            WHERE party IS NOT NULL
            GROUP BY motion_id, party, vote
            """
        ).fetchall()

        motion_texts: dict[Any, str] = {}
        motion_years: dict[Any, Any] = {}
        for mid, title, body_text, date in motion_rows:
            motion_texts[mid] = (title or "") + " " + (body_text or "")
            motion_years[mid] = date.year if date else None

        result_rows: list[tuple[Any, ...]] = []
        classified_count = 0

        for motion_id, votes in _tally_party_votes(vote_rows).items():
            # Motions present in mp_votes but absent from motions get empty
            # text and a NULL year.
            text = motion_texts.get(motion_id, "")
            year = motion_years.get(motion_id)

            right_support, left_opposition, centrist_support = _compute_party_metrics(votes)

            right_kw_matches = len(_match_keywords(text, right_pattern))
            left_kw_matches = len(_match_keywords(text, left_pattern))

            # Classification logic
            passes_votes = (
                right_support is not None
                and right_support >= right_support_threshold
                and left_opposition is not None
                and left_opposition >= left_opposition_threshold
            )
            passes_keywords = right_kw_matches >= keyword_min_matches
            is_classified = passes_votes and (not require_keywords or passes_keywords)

            result_rows.append(
                (
                    motion_id,
                    year,
                    # NOTE(review): the `title` column receives the first 300
                    # characters of title + " " + body_text, not the bare
                    # title. Kept as-is; confirm whether that is intended.
                    text[:300],
                    right_support,
                    left_opposition,
                    centrist_support,
                    right_kw_matches,
                    left_kw_matches,
                    is_classified,
                )
            )
            if is_classified:
                classified_count += 1

        total_processed = len(result_rows)

        # --- Write phase: rebuild the output table atomically. ---
        con.begin()
        try:
            con.execute("DROP TABLE IF EXISTS right_wing_motions")
            con.execute(
                """
                CREATE TABLE right_wing_motions (
                    motion_id INTEGER PRIMARY KEY,
                    year INTEGER,
                    title VARCHAR,
                    right_support DOUBLE,
                    left_opposition DOUBLE,
                    centrist_support DOUBLE,
                    right_keyword_matches INTEGER,
                    left_keyword_matches INTEGER,
                    classified BOOLEAN
                )
                """
            )
            # Guard the empty case: there is nothing to insert.
            if result_rows:
                con.executemany(
                    """
                    INSERT INTO right_wing_motions
                    (motion_id, year, title, right_support, left_opposition, centrist_support,
                     right_keyword_matches, left_keyword_matches, classified)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    result_rows,
                )
            con.commit()
        except Exception:
            # Undo the partial rebuild so the previous table survives.
            con.rollback()
            raise

        logger.info(
            "Processed %d motions, classified %d as right-wing (%.1f%%)",
            total_processed,
            classified_count,
            100 * classified_count / total_processed if total_processed else 0,
        )

        return {
            "total_processed": total_processed,
            "classified": classified_count,
            "right_keywords_loaded": len(right_kws),
            "left_keywords_loaded": len(left_kws),
        }
    finally:
        con.close()


def _tally_party_votes(
    vote_rows: list[tuple[int, str, str, int]],
) -> dict[int, dict[str, dict[str, int]]]:
    """Fold ``(motion_id, party, vote, count)`` rows into motion -> party -> vote -> count."""
    tallies: dict[int, dict[str, dict[str, int]]] = {}
    for motion_id, party, vote, count in vote_rows:
        party_tally = tallies.setdefault(motion_id, {}).setdefault(
            party, {"voor": 0, "tegen": 0, "afwezig": 0}
        )
        # Vote values other than the three seeded ones get their own counter.
        party_tally[vote] = party_tally.get(vote, 0) + count
    return tallies
||||
|
||||
|
||||
def main() -> int:
    """Parse command-line options, run the classifier and print its stats as JSON.

    Returns:
        Always 0, suitable for passing to ``SystemExit``.
    """
    cli = argparse.ArgumentParser(description="Classify right-wing motions")
    cli.add_argument("--db", default="data/motions.db")
    cli.add_argument("--keywords", default="analysis/right_wing/right_wing_keywords.json")
    cli.add_argument("--right-threshold", type=float, default=0.60)
    cli.add_argument("--left-threshold", type=float, default=0.40)
    # Paired on/off switches writing to the same destination; on by default.
    cli.add_argument("--require-keywords", action="store_true", default=True)
    cli.add_argument("--no-require-keywords", dest="require_keywords", action="store_false")
    cli.add_argument("--keyword-min-matches", type=int, default=1)
    opts = cli.parse_args()

    # Translate CLI option names into the classifier's keyword arguments.
    classifier_kwargs = {
        "db_path": opts.db,
        "keywords_path": opts.keywords,
        "right_support_threshold": opts.right_threshold,
        "left_opposition_threshold": opts.left_threshold,
        "require_keywords": opts.require_keywords,
        "keyword_min_matches": opts.keyword_min_matches,
    }
    stats = classify_motions(**classifier_kwargs)
    print(json.dumps(stats, indent=2))
    return 0
||||
|
||||
|
||||
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
||||
Loading…
Reference in new issue