You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
motief/analysis/axis_classifier.py

659 lines
23 KiB

"""Axis classifier: correlate per-party PCA positions against ideology reference data
to assign honest, dynamic labels to political compass axes.
Public API: classify_axes(positions_by_window, axes, db_path) -> dict
"""
import logging
from collections import Counter
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import numpy as np
import re
import json
from analysis.svd_labels import get_svd_label, get_fallback_labels
# Logger for this module, named after the module itself.
_logger = logging.getLogger(__name__)
# Module-level caches — loaded once per process lifetime.
# Both start as None ("not loaded yet"); _load_ideology and _load_coalition
# fill them after their first successful read.
_ideology_cache: Optional[Dict[str, Dict[str, float]]] = None
_coalition_cache: Optional[Dict[str, set]] = None
# Correlation threshold above which we consider an axis "explained" by a dimension.
# _assign_label compares it against the absolute Pearson r of each dimension.
_THRESHOLD = 0.65
# User-facing (Dutch) axis labels, keyed by a short dimension code.
# NOTE(review): the "pc" label here is spelled "Conservatief–Progressief",
# while the keyword/motion path uses "Progressief–Conservatief" (see
# _MOTION_LABEL_TEMPLATE_KEY). The two strings are distinct labels when they
# are tallied — confirm the differing order is intentional.
_LABELS = {
    "lr": "Verzorgingsstaat–Marktwerking",
    "eu": "EU-integratie–Nationalisme",
    "pi": "Populistisch–Institutioneel",
    "co": "Coalitie–Oppositie",
    "pc": "Conservatief–Progressief",
    # When we have no interpretable classifier signal, fall back to the known
    # SVD component meanings rather than generic "As N" labels.
    # These two entries are resolved once, at import time.
    "fallback_x": get_svd_label(1),
    "fallback_y": get_svd_label(2),
}
# Module-level helper: translate internal/modal axis labels into the labels
# that are shown to users.
def display_label_for_modal(modal_label: Optional[str], axis: str) -> str:
    """Convert a modal/internal axis label into its user-facing form.

    The generic component names ('As 1' / 'Stempatroon As 1' on the x axis,
    'As 2' / 'Stempatroon As 2' on the y axis) are replaced by the label that
    get_svd_label returns for that component. A missing label (None) is
    replaced the same way: component 1 for axis "x", component 2 for any
    other axis value. Every other label is passed through untouched.
    """
    if modal_label is None:
        # No label at all: use the semantic label of the axis' own component.
        return get_svd_label(2 if axis != "x" else 1)

    # (axis, SVD component, generic names that stand for that component)
    generic_names = (
        ("x", 1, ("As 1", "Stempatroon As 1")),
        ("y", 2, ("As 2", "Stempatroon As 2")),
    )
    for axis_name, component, aliases in generic_names:
        if axis == axis_name and modal_label in aliases:
            return get_svd_label(component)

    return modal_label
# User-facing (Dutch) explanation sentences, keyed by dimension code.
# Every template has an {orientation} placeholder; the "co" template
# additionally needs {r} (formatted with two decimals).
_INTERPRETATION_TEMPLATES = {
    "lr": "De {orientation} as weerspiegelt de economische tegenstelling tussen verzorgingsstaat en marktwerking.",
    "eu": "De {orientation} as weerspiegelt de tegenstelling tussen EU-integratie/internationalisme en nationalisme/soevereiniteit.",
    "pi": "De {orientation} as scheidt populistisch-nationalistische partijen van het institutioneel-parlementaire establishment.",
    "co": (
        "De {orientation} as weerspiegelt stemgedrag van coalitie- versus "
        "oppositiepartijen (r={r:.2f}). Ideologische tegenstellingen zijn minder dominant dit jaar."
    ),
    "pc": "De {orientation} as weerspiegelt de progressief-conservatieve tegenstelling.",
}
# Maps motion-path keyword labels to _INTERPRETATION_TEMPLATES keys.
# Labels not present here fall back to "fallback".
# NOTE(review): _INTERPRETATION_TEMPLATES defines no "fallback" entry, so a
# lookup with that key fails — callers must tolerate a missing template.
_MOTION_LABEL_TEMPLATE_KEY: Dict[str, str] = {
    "Verzorgingsstaat–Marktwerking": "lr",
    "EU-integratie–Nationalisme": "eu",
    "Populistisch–Institutioneel": "pi",
    "Progressief–Conservatief": "pc",
}
# Simple keyword-based classifier for motion titles (fallback signal)
_KEYWORD_THRESHOLD = 0.4
_KEYWORDS: Dict[str, List[str]] = {
"Verzorgingsstaat–Marktwerking": [
# economic / welfare state
"belasting",
"uitkering",
"bijstand",
"minimumloon",
"cao",
"vakbond",
"bezuiniging",
"privatisering",
"subsidie",
"pensioen",
"aow",
"zorg",
"huur",
"woning",
"sociaal",
"werkloos",
"ww",
"arbeidsongeschik",
"wao",
"gemeentefonds",
],
"EU-integratie–Nationalisme": [
# EU and international cooperation
"europees",
"europese",
" eu ",
"eu-",
"verdrag",
"intergouvernementeel",
"samenwerking",
"internationaal",
"navo",
"nato",
" vn ",
"vn-",
"sancties",
"israël",
"vluchteling",
"asiel",
"soevereiniteit",
"nationaal",
],
"Populistisch–Institutioneel": [
# Populist/nationalist themes
"terugsturen",
"syrië",
"syrier",
"grenzen dicht",
"remigratie",
"eigen volk",
"nederland eerst",
"corona",
"vaccin",
"ivermectine",
"hydroxychloroquine",
"complot",
"deep state",
"establishment",
"elite",
"herstelbetaling",
"excuses",
],
"Progressief–Conservatief": [
# environment
"klimaat",
"stikstof",
"duurzaam",
"duurzaamheid",
"co2",
"energietransitie",
"biodiversiteit",
# social
"euthanasie",
"abortus",
"lgbtq",
"transgender",
"diversiteit",
"traditi",
"gezin",
"religie",
"geloof",
],
}
# Pre-compiled regexes for keyword matching. We escape keywords but do NOT add
# word-boundaries because some keywords intentionally match substrings
# (e.g. 'traditi' matching 'tradities'). re.IGNORECASE makes lowercasing
# unnecessary during matching.
_KEYWORD_REGEXES: Dict[str, "re.Pattern[str]"] = {
cat: re.compile(
"|".join(re.escape(kw.strip()) for kw in kws),
re.IGNORECASE,
)
for cat, kws in _KEYWORDS.items()
}
def _classify_from_titles(titles: List[str]) -> Tuple[Optional[str], float]:
"""Classify a list of motion titles into an axis category using keyword matching.
Returns (category_label, confidence) where confidence = fraction of titles
containing at least one keyword from the winning category.
Returns (None, confidence) if confidence is below _KEYWORD_THRESHOLD.
"""
if not titles:
return None, 0.0
counts: Dict[str, int] = {cat: 0 for cat in _KEYWORDS}
for title in titles:
for cat, rx in _KEYWORD_REGEXES.items():
if rx.search(title):
counts[cat] += 1
# Determine the best category, but be deterministic on ties: if more than
# one category has the top count, return None to indicate ambiguity.
best_count = max(counts.values())
best_cats = [cat for cat, cnt in counts.items() if cnt == best_count]
confidence = best_count / len(titles)
if len(best_cats) != 1 or confidence < _KEYWORD_THRESHOLD:
return None, confidence
return best_cats[0], confidence
def _load_motion_vectors(db_path: str, window_id: str) -> Dict[int, np.ndarray]:
    """Read the SVD vectors of all motions in one window from DuckDB.

    The database is opened read-only and closed again before the rows are
    decoded. Each stored vector is parsed as JSON into a float array; rows
    whose id or vector cannot be decoded are skipped.

    Returns {motion_id: vector_array}, or {} if anything else goes wrong
    (the failure is logged at debug level).
    """
    query = (
        "SELECT entity_id, vector FROM svd_vectors "
        "WHERE entity_type = 'motion' AND window_id = ?"
    )
    try:
        import duckdb

        connection = duckdb.connect(db_path, read_only=True)
        try:
            records = connection.execute(query, [window_id]).fetchall()
        finally:
            connection.close()

        vectors: Dict[int, np.ndarray] = {}
        for raw_id, raw_vector in records:
            try:
                motion_id = int(raw_id)
                decoded = np.array(json.loads(raw_vector), dtype=float)
            except Exception:
                # Best effort: one undecodable row must not lose the window.
                continue
            vectors[motion_id] = decoded
        return vectors
    except Exception as exc:
        _logger.debug("Failed to load motion vectors for window %s: %s", window_id, exc)
        return {}
def _project_motions(
motion_vecs: Dict[int, np.ndarray],
x_axis: np.ndarray,
y_axis: np.ndarray,
global_mean: np.ndarray,
) -> Dict[int, Tuple[float, float]]:
"""Project motion vectors onto the PCA axes after centering by global_mean.
Returns {motion_id: (x_score, y_score)}.
"""
try:
projections: Dict[int, Tuple[float, float]] = {}
for mid, vec in motion_vecs.items():
try:
centered = vec - global_mean
x_score = float(np.dot(centered, x_axis))
y_score = float(np.dot(centered, y_axis))
projections[mid] = (x_score, y_score)
except Exception:
continue
return projections
except Exception as exc:
_logger.debug("Failed to project motions: %s", exc)
return {}
def _top_motion_ids(
projections: Dict[int, Tuple[float, float]],
axis: str,
n: int = 5,
) -> Dict[str, List[int]]:
"""Return the top-n motion IDs at each pole of the given axis.
axis: 'x' or 'y'
Returns {'+': [motion_ids], '-': [motion_ids]} (highest positive first,
most negative first in the '-' list).
"""
try:
if axis not in ("x", "y"):
raise ValueError("axis must be 'x' or 'y'")
idx = 0 if axis == "x" else 1
sorted_ids = sorted(projections, key=lambda mid: projections[mid][idx])
neg_ids = sorted_ids[:n]
pos_ids = sorted_ids[-n:][::-1]
return {"+": pos_ids, "-": neg_ids}
except Exception as exc:
_logger.debug("Failed to compute top_motion_ids: %s", exc)
return {"+": [], "-": []}
def _fetch_motion_titles(
db_path: str,
motion_ids: List[int],
) -> Dict[int, Tuple[str, str]]:
"""Fetch (title, date) for a list of motion IDs from DuckDB.
Returns {motion_id: (title, date_str)}. Missing IDs are omitted.
Returns {} on any DB error.
"""
if not motion_ids:
return {}
try:
import duckdb
placeholders = ", ".join("?" for _ in motion_ids)
conn = duckdb.connect(db_path, read_only=True)
try:
rows = conn.execute(
f"SELECT id, title, date FROM motions WHERE id IN ({placeholders})",
motion_ids,
).fetchall()
finally:
conn.close()
return {int(row[0]): (str(row[1]), str(row[2])) for row in rows}
except Exception as exc:
_logger.debug("Failed to fetch motion titles: %s", exc)
return {}
def _load_ideology(csv_path: Path) -> Dict[str, Dict[str, float]]:
    """Read per-party ideology scores from a comma-separated file.

    The first line is the header and must contain the columns "left_right"
    and "progressive"; the first column of each data row is the party name.
    Blank rows and rows too short to hold both score columns are skipped.

    The parsed table is stored in the module-level cache. Once a load has
    succeeded, later calls return that cached table without looking at
    csv_path again. Failed loads are not cached.

    Returns {party_name: {"left_right": float, "progressive": float}}, or {}
    on any error (the caller should then skip classification).
    """
    global _ideology_cache
    if _ideology_cache is not None:
        return _ideology_cache

    try:
        with open(csv_path, encoding="utf-8") as handle:
            rows = handle.read().splitlines()

        columns = [name.strip() for name in rows[0].split(",")]
        lr_col = columns.index("left_right")
        pc_col = columns.index("progressive")
        last_needed = max(lr_col, pc_col)

        scores: Dict[str, Dict[str, float]] = {}
        for row in rows[1:]:
            if not row.strip():
                continue
            cells = [cell.strip() for cell in row.split(",")]
            if len(cells) > last_needed:
                scores[cells[0]] = {
                    "left_right": float(cells[lr_col]),
                    "progressive": float(cells[pc_col]),
                }
    except FileNotFoundError:
        _logger.warning(
            "party_ideologies.csv not found at %s — axis labels will be generic",
            csv_path,
        )
        return {}
    except Exception as exc:
        # Covers an empty file, a missing column and non-numeric scores.
        _logger.warning("Failed to load party_ideologies.csv: %s", exc)
        return {}

    _ideology_cache = scores
    return scores
def _load_coalition(csv_path: Path) -> Dict[str, set]:
    """Read coalition membership from a comma-separated file.

    The first line is treated as a header and ignored. In every other
    non-blank row the first field is the window id and the second the party
    name; rows with fewer than two fields are skipped.

    The parsed mapping is stored in the module-level cache. Once a load has
    succeeded, later calls return that cached mapping without looking at
    csv_path again. Failed loads are not cached.

    Returns {window_id: set_of_party_names}, or {} on any error (the
    coalition dimension is then skipped).
    """
    global _coalition_cache
    if _coalition_cache is not None:
        return _coalition_cache

    try:
        with open(csv_path, encoding="utf-8") as handle:
            rows = handle.read().splitlines()

        members: Dict[str, set] = {}
        for row in rows[1:]:
            if not row.strip():
                continue
            cells = [cell.strip() for cell in row.split(",")]
            if len(cells) >= 2:
                window_id, party_name = cells[0], cells[1]
                members.setdefault(window_id, set()).add(party_name)
    except FileNotFoundError:
        _logger.warning(
            "coalition_membership.csv not found at %s — coalition axis detection disabled",
            csv_path,
        )
        return {}
    except Exception as exc:
        _logger.warning("Failed to load coalition_membership.csv: %s", exc)
        return {}

    _coalition_cache = members
    return members
def _window_year(window_id: str) -> Optional[str]:
"""Extract year string from window_id.
Returns None for 'current_parliament'.
'2016''2016', '2016-Q3''2016'.
"""
if window_id == "current_parliament":
return None
return window_id.split("-")[0]
def _pearsonr(x: List[float], y: List[float]) -> float:
"""Pearson r; returns 0.0 for degenerate input (< 3 points or zero variance)."""
if len(x) < 3:
return 0.0
xa = np.array(x, dtype=float)
ya = np.array(y, dtype=float)
if xa.std() < 1e-12 or ya.std() < 1e-12:
return 0.0
return float(np.corrcoef(xa, ya)[0, 1])
def _assign_label(
    r_lr: float,
    r_co: float,
    r_pc: float,
    axis: str,
) -> Tuple[str, str, float]:
    """Choose label, interpretation text and quality score for one axis.

    The three correlations are checked in priority order
    left-right > coalition > progressive; the first one whose absolute value
    reaches _THRESHOLD decides the label. If none does, the axis gets its
    fallback label and an empty interpretation.

    Returns (label, interpretation_string, quality_score), where the quality
    score is the largest absolute correlation of the three, regardless of
    which dimension (if any) was chosen.
    """
    orientation = "horizontale" if axis == "x" else "verticale"
    x_default, y_default = get_fallback_labels()
    quality = max(abs(r_lr), abs(r_co), abs(r_pc))

    # Ordered by priority. Only the "co" template uses the r value; the
    # other templates ignore the extra format argument.
    candidates = (("lr", r_lr), ("co", r_co), ("pc", r_pc))
    for code, r_value in candidates:
        if abs(r_value) >= _THRESHOLD:
            text = _INTERPRETATION_TEMPLATES[code].format(
                orientation=orientation, r=r_value
            )
            return _LABELS[code], text, quality

    # No dimension explains the axis: no interpretation for unclassified axes.
    return (x_default if axis == "x" else y_default), "", quality
def _motion_interpretation(label: str, orientation: str) -> str:
    """Render the interpretation sentence for a keyword-derived axis label.

    Returns "" when the label has no template mapping, when the mapped
    template does not exist, or when the template needs fields other than
    {orientation} (such as the coalition template's {r}).
    """
    template_key = _MOTION_LABEL_TEMPLATE_KEY.get(label)
    template = (
        _INTERPRETATION_TEMPLATES.get(template_key) if template_key is not None else None
    )
    if template is None:
        return ""
    try:
        return template.format(orientation=orientation)
    except (KeyError, IndexError):
        return ""


def classify_axes(
    positions_by_window: Dict[str, Dict[str, Tuple[float, float]]],
    axes: dict,
    db_path: str,
) -> dict:
    """Classify compass axes using motion projection (primary) and ideology CSV (fallback).

    Motion projection path:
      - Requires axes["global_mean"], axes["x_axis"], axes["y_axis"].
      - Loads motion SVD vectors per window, projects onto PCA axes,
        ranks top 5+5 motions, applies keyword classifier -> label.

    Fallback path:
      - Pearson-r against party_ideologies.csv (left_right, progressive).
      - Pearson-r against coalition_membership.csv dummy (+1 coalition,
        -1 opposition), only for windows with at least 5 known parties.

    Both CSV files are looked up in the directory that contains db_path.

    Args:
        positions_by_window: {window_id: {party_name: (x, y)}}.
        axes: axis definition dict; it is not modified.
        db_path: path to the DuckDB database file.

    Returns:
        A shallow copy of axes enriched with:
          x_label, y_label             — global modal label across annual windows
          x_quality, y_quality         — {window_id: float} max |r|
          x_interpretation             — {window_id: str}
          y_interpretation             — {window_id: str}
          x_top_motions, y_top_motions — {window_id: {'+': [(title, date), ...], '-': [...]}}
          x_label_confidence           — {window_id: float}
          y_label_confidence           — {window_id: float}
        If neither ideology data nor motion projection is available, the
        original axes dict is returned unchanged.
    """
    data_dir = Path(db_path).parent
    ideology = _load_ideology(data_dir / "party_ideologies.csv")
    coalition = _load_coalition(data_dir / "coalition_membership.csv")

    # Determine whether motion projection is possible.
    global_mean = axes.get("global_mean")
    x_axis_arr = np.array(axes.get("x_axis", []))
    y_axis_arr = np.array(axes.get("y_axis", []))
    motion_path_available = (
        global_mean is not None
        and x_axis_arr.ndim == 1
        and x_axis_arr.size > 0
        and y_axis_arr.size > 0
    )

    # With neither ideology reference data nor motion vectors there is
    # nothing to classify; hand back the original axes dict (never None).
    if not ideology and not motion_path_available:
        return axes

    # Generic labels for axes that no path could classify. They do not
    # depend on the window, so they are resolved once.
    x_generic_label, y_generic_label = get_fallback_labels()

    x_quality: Dict[str, float] = {}
    y_quality: Dict[str, float] = {}
    x_interpretation: Dict[str, str] = {}
    y_interpretation: Dict[str, str] = {}
    x_top_motions: Dict[str, Dict] = {}
    y_top_motions: Dict[str, Dict] = {}
    x_label_confidence: Dict[str, float] = {}
    y_label_confidence: Dict[str, float] = {}
    annual_x_labels: List[str] = []
    annual_y_labels: List[str] = []

    for wid, pos_dict in positions_by_window.items():
        year = _window_year(wid)
        # Only plain-year windows take part in the global modal vote.
        is_annual = wid != "current_parliament" and "-" not in wid

        # ── Ideology / coalition Pearson-r ────────────────────────────────
        x_lbl_fallback: Optional[str] = None
        y_lbl_fallback: Optional[str] = None
        x_q = 0.0
        y_q = 0.0
        x_int = ""
        y_int = ""
        if ideology:
            parties = [p for p in pos_dict if p in ideology]
            if len(parties) >= 5:
                party_x = [pos_dict[p][0] for p in parties]
                party_y = [pos_dict[p][1] for p in parties]
                ref_lr = [ideology[p]["left_right"] for p in parties]
                ref_pc = [ideology[p]["progressive"] for p in parties]
                if year and coalition and year in coalition:
                    gov_set = coalition[year]
                    ref_co = [1.0 if p in gov_set else -1.0 for p in parties]
                else:
                    # Constant reference -> _pearsonr returns 0.0, which
                    # effectively disables the coalition dimension.
                    ref_co = [0.0] * len(parties)

                x_lbl_fallback, x_int, x_q = _assign_label(
                    _pearsonr(party_x, ref_lr),
                    _pearsonr(party_x, ref_co),
                    _pearsonr(party_x, ref_pc),
                    "x",
                )
                y_lbl_fallback, y_int, y_q = _assign_label(
                    _pearsonr(party_y, ref_lr),
                    _pearsonr(party_y, ref_co),
                    _pearsonr(party_y, ref_pc),
                    "y",
                )

        # ── Motion projection (primary) ───────────────────────────────────
        x_lbl = x_lbl_fallback
        y_lbl = y_lbl_fallback
        x_conf = 0.0
        y_conf = 0.0
        x_tops: Dict[str, List] = {"+": [], "-": []}
        y_tops: Dict[str, List] = {"+": [], "-": []}
        if motion_path_available:
            motion_vecs = _load_motion_vectors(db_path, wid)
            if motion_vecs:
                projections = _project_motions(
                    motion_vecs, x_axis_arr, y_axis_arr, global_mean
                )
                x_ids = _top_motion_ids(projections, "x", n=5)
                y_ids = _top_motion_ids(projections, "y", n=5)
                all_x_ids = x_ids["+"] + x_ids["-"]
                all_y_ids = y_ids["+"] + y_ids["-"]
                # One query for both axes; duplicates are removed first.
                titles_map = _fetch_motion_titles(
                    db_path, list(set(all_x_ids + all_y_ids))
                )
                x_title_list = [
                    titles_map[mid][0] for mid in all_x_ids if mid in titles_map
                ]
                y_title_list = [
                    titles_map[mid][0] for mid in all_y_ids if mid in titles_map
                ]
                x_kw_lbl, x_conf = _classify_from_titles(x_title_list)
                y_kw_lbl, y_conf = _classify_from_titles(y_title_list)
                # A keyword label overrides the ideology label. An existing
                # ideology interpretation is kept; otherwise the keyword
                # label's template is used, or "" if it has none.
                # NOTE(review): a kept ideology interpretation may describe
                # a different dimension than the overriding keyword label —
                # confirm this is intended.
                if x_kw_lbl is not None:
                    x_lbl = x_kw_lbl
                    if not x_int:
                        x_int = _motion_interpretation(x_kw_lbl, "horizontale")
                if y_kw_lbl is not None:
                    y_lbl = y_kw_lbl
                    if not y_int:
                        y_int = _motion_interpretation(y_kw_lbl, "verticale")
                # Build display lists: [(title, date), ...]
                for pole, ids in x_ids.items():
                    x_tops[pole] = [titles_map[mid] for mid in ids if mid in titles_map]
                for pole, ids in y_ids.items():
                    y_tops[pole] = [titles_map[mid] for mid in ids if mid in titles_map]

        # ── Final label resolution ────────────────────────────────────────
        # If both motion and ideology paths produced nothing, use the
        # generic fallback and no interpretation.
        if x_lbl is None:
            x_lbl = x_generic_label
            x_int = ""
        if y_lbl is None:
            y_lbl = y_generic_label
            y_int = ""

        x_quality[wid] = x_q
        y_quality[wid] = y_q
        x_interpretation[wid] = x_int
        y_interpretation[wid] = y_int
        x_top_motions[wid] = x_tops
        y_top_motions[wid] = y_tops
        x_label_confidence[wid] = x_conf
        y_label_confidence[wid] = y_conf
        if is_annual:
            annual_x_labels.append(x_lbl)
            annual_y_labels.append(y_lbl)

    def _modal(labels: List[str], fallback: str) -> str:
        """Most frequent label, or fallback when there are no labels."""
        if not labels:
            return fallback
        return Counter(labels).most_common(1)[0][0]

    # Resolve the modal label across annual windows. If it is an internal
    # generic component name ("As 1"/"As 2" or legacy "Stempatroon As N"),
    # display_label_for_modal swaps in a semantic label so the UI doesn't
    # show unhelpful "As N" strings to end users.
    modal_x = _modal(annual_x_labels, "Links\u2013Rechts")
    modal_y = _modal(annual_y_labels, "Progressief\u2013Conservatief")

    enriched = dict(axes)
    enriched["x_label"] = display_label_for_modal(modal_x, "x")
    enriched["y_label"] = display_label_for_modal(modal_y, "y")
    enriched["x_quality"] = x_quality
    enriched["y_quality"] = y_quality
    enriched["x_interpretation"] = x_interpretation
    enriched["y_interpretation"] = y_interpretation
    enriched["x_top_motions"] = x_top_motions
    enriched["y_top_motions"] = y_top_motions
    enriched["x_label_confidence"] = x_label_confidence
    enriched["y_label_confidence"] = y_label_confidence
    return enriched