diff --git a/analysis/axis_classifier.py b/analysis/axis_classifier.py index 325faca..f633ce9 100644 --- a/analysis/axis_classifier.py +++ b/analysis/axis_classifier.py @@ -392,74 +392,157 @@ def classify_axes( axes: dict, db_path: str, ) -> dict: - """Classify compass axes by correlating per-party positions against ideology reference data. - - Enriches ``axes`` with: - x_label, y_label — global label (modal across annual windows) - x_quality, y_quality — {window_id: float} max |r| for each window - x_interpretation — {window_id: str} Dutch explanation per window - y_interpretation — {window_id: str} Dutch explanation per window - - Returns the original ``axes`` dict unchanged if reference data is unavailable. + """Classify compass axes using motion projection (primary) and ideology CSV (fallback). + + Motion projection path: + - Requires axes["global_mean"], axes["x_axis"], axes["y_axis"]. + - Loads motion SVD vectors per window, projects onto PCA axes, + ranks top 5+5 motions, applies keyword classifier -> label. + + Fallback path (unchanged): + - Pearson-r against party_ideologies.csv (left_right, progressive). + - Pearson-r against coalition_membership.csv dummy. + + Enriches axes with: + x_label, y_label — global modal label across annual windows + x_quality, y_quality — {window_id: float} max |r| + x_interpretation — {window_id: str} + y_interpretation — {window_id: str} + x_top_motions, y_top_motions — {window_id: {'+': [(title, date), ...], '-': [...]}} + x_label_confidence — {window_id: float} + y_label_confidence — {window_id: float} """ data_dir = Path(db_path).parent ideology = _load_ideology(data_dir / "party_ideologies.csv") - if not ideology: - return axes # no reference data — preserve existing behaviour - coalition = _load_coalition(data_dir / "coalition_membership.csv") + # Determine whether motion projection is possible. + global_mean = axes.get("global_mean") + x_axis_arr = np.array(axes.get("x_axis", [])) + y_axis_arr = np.array(axes.get("y_axis", [])) + motion_path_available = ( + global_mean is not None + and x_axis_arr.ndim == 1 + and x_axis_arr.size > 0 + and y_axis_arr.size > 0 + ) + + if not ideology and not motion_path_available: + return axes # nothing to classify with + x_quality: Dict[str, float] = {} y_quality: Dict[str, float] = {} x_interpretation: Dict[str, str] = {} y_interpretation: Dict[str, str] = {} + x_top_motions: Dict[str, Dict] = {} + y_top_motions: Dict[str, Dict] = {} + x_label_confidence: Dict[str, float] = {} + y_label_confidence: Dict[str, float] = {} annual_x_labels: List[str] = [] annual_y_labels: List[str] = [] for wid, pos_dict in positions_by_window.items(): year = _window_year(wid) - is_current = wid == "current_parliament" - is_annual = not is_current and "-" not in wid # e.g. "2016" not "2016-Q3" - - # Only use parties present in both the positions and the ideology reference. - parties = [p for p in pos_dict if p in ideology] - if len(parties) < 5: - _logger.debug( - "Skipping axis classification for %s: only %d reference parties (need 5)", - wid, - len(parties), + is_annual = wid != "current_parliament" and "-" not in wid + + # ── Ideology / coalition Pearson-r (unchanged logic) ────────────────── + x_lbl_fallback: Optional[str] = None + y_lbl_fallback: Optional[str] = None + x_q = 0.0 + y_q = 0.0 + x_int = "" + y_int = "" + + if ideology: + parties = [p for p in pos_dict if p in ideology] + if len(parties) >= 5: + party_x = [pos_dict[p][0] for p in parties] + party_y = [pos_dict[p][1] for p in parties] + ref_lr = [ideology[p]["left_right"] for p in parties] + ref_pc = [ideology[p]["progressive"] for p in parties] + + if year and coalition and year in coalition: + gov_set = coalition[year] + ref_co = [1.0 if p in gov_set else -1.0 for p in parties] + else: + ref_co = [0.0] * len(parties) + + r_lr_x = _pearsonr(party_x, ref_lr) + r_co_x = _pearsonr(party_x, ref_co) + r_pc_x = _pearsonr(party_x, ref_pc) + x_lbl_fallback, x_int, x_q = _assign_label(r_lr_x, r_co_x, r_pc_x, "x") + + r_lr_y = _pearsonr(party_y, ref_lr) + r_co_y = _pearsonr(party_y, ref_co) + r_pc_y = _pearsonr(party_y, ref_pc) + y_lbl_fallback, y_int, y_q = _assign_label(r_lr_y, r_co_y, r_pc_y, "y") + + # ── Motion projection (primary) ──────────────────────────────────────── + x_lbl = x_lbl_fallback + y_lbl = y_lbl_fallback + x_conf = 0.0 + y_conf = 0.0 + x_tops: Dict[str, List] = {"+": [], "-": []} + y_tops: Dict[str, List] = {"+": [], "-": []} + + if motion_path_available: + motion_vecs = _load_motion_vectors(db_path, wid) + if motion_vecs: + projections = _project_motions( + motion_vecs, x_axis_arr, y_axis_arr, global_mean + ) + x_ids = _top_motion_ids(projections, "x", n=5) + y_ids = _top_motion_ids(projections, "y", n=5) + + all_x_ids = x_ids["+"] + x_ids["-"] + all_y_ids = y_ids["+"] + y_ids["-"] + titles_map = _fetch_motion_titles( + db_path, list(set(all_x_ids + all_y_ids)) + ) + + x_title_list = [ + titles_map[mid][0] for mid in all_x_ids if mid in titles_map + ] + y_title_list = [ + titles_map[mid][0] for mid in all_y_ids if mid in titles_map + ] + + x_kw_lbl, x_conf = _classify_from_titles(x_title_list) + y_kw_lbl, y_conf = _classify_from_titles(y_title_list) + + if x_kw_lbl is not None: + x_lbl = x_kw_lbl + if y_kw_lbl is not None: + y_lbl = y_kw_lbl + + # Build display lists: [(title, date), ...] + for pole, ids in x_ids.items(): + x_tops[pole] = [titles_map[mid] for mid in ids if mid in titles_map] + for pole, ids in y_ids.items(): + y_tops[pole] = [titles_map[mid] for mid in ids if mid in titles_map] + + # ── Final label resolution ──────────────────────────────────────────── + # If both motion and ideology paths produced nothing, use generic fallback. + if x_lbl is None: + x_lbl = _LABELS["fallback_x"] + x_int = _INTERPRETATION_TEMPLATES["fallback"].format( + orientation="horizontale" + ) + if y_lbl is None: + y_lbl = _LABELS["fallback_y"] + y_int = _INTERPRETATION_TEMPLATES["fallback"].format( + orientation="verticale" ) - continue - - party_x = [pos_dict[p][0] for p in parties] - party_y = [pos_dict[p][1] for p in parties] - ref_lr = [ideology[p]["left_right"] for p in parties] - ref_pc = [ideology[p]["progressive"] for p in parties] - - # Coalition dummy: +1 if in government that year, -1 otherwise. - # current_parliament and windows with no coalition data use a neutral vector. - if year and coalition and year in coalition: - gov_set = coalition[year] - ref_co = [1.0 if p in gov_set else -1.0 for p in parties] - else: - ref_co = [0.0] * len(parties) # neutral — will never exceed threshold - - r_lr_x = _pearsonr(party_x, ref_lr) - r_co_x = _pearsonr(party_x, ref_co) - r_pc_x = _pearsonr(party_x, ref_pc) - x_lbl, x_int, x_q = _assign_label(r_lr_x, r_co_x, r_pc_x, "x") - - r_lr_y = _pearsonr(party_y, ref_lr) - r_co_y = _pearsonr(party_y, ref_co) - r_pc_y = _pearsonr(party_y, ref_pc) - y_lbl, y_int, y_q = _assign_label(r_lr_y, r_co_y, r_pc_y, "y") x_quality[wid] = x_q y_quality[wid] = y_q x_interpretation[wid] = x_int y_interpretation[wid] = y_int + x_top_motions[wid] = x_tops + y_top_motions[wid] = y_tops + x_label_confidence[wid] = x_conf + y_label_confidence[wid] = y_conf - # Only annual windows vote on the global label (not quarterly, not current_parliament). if is_annual: annual_x_labels.append(x_lbl) annual_y_labels.append(y_lbl) @@ -476,4 +559,8 @@ def classify_axes( enriched["y_quality"] = y_quality enriched["x_interpretation"] = x_interpretation enriched["y_interpretation"] = y_interpretation + enriched["x_top_motions"] = x_top_motions + enriched["y_top_motions"] = y_top_motions + enriched["x_label_confidence"] = x_label_confidence + enriched["y_label_confidence"] = y_label_confidence return enriched