chore(explorer): add get_debug_trajectories_enabled helper feat(explorer): instrument trajectories with debug diagnostics and un-silence helper exceptionsmain
parent
0f2db0a9be
commit
baee50f3a5
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,227 @@ |
|||||||
|
"""Helper utilities used by explorer.py. |
||||||
|
|
||||||
|
Primary export: |
||||||
|
- compute_party_coords: compute per-party (x_mean, y_mean) from positions_by_window. |
||||||
|
|
||||||
|
This module is intentionally free of Streamlit side-effects to be easy to unit test. |
||||||
|
""" |
||||||
|
|
||||||
|
from __future__ import annotations |
||||||
|
|
||||||
|
import logging |
||||||
|
import math |
||||||
|
import re |
||||||
|
from typing import Any, Dict, List, Optional, Set, Tuple |
||||||
|
|
||||||
|
import numpy as np |
||||||
|
|
||||||
|
logger = logging.getLogger(__name__) |
||||||
|
|
||||||
|
|
||||||
|
def _strip_paren(s: str) -> str: |
||||||
|
# helper used in plan to try to strip parenthetical variants |
||||||
|
return s.split("(")[0].strip() |
||||||
|
|
||||||
|
|
||||||
|
def inspect_positions_for_issues(
    positions_by_window: Dict[str, Dict[str, Tuple[float, float]]],
    party_map: Dict[str, str],
) -> Dict[str, Any]:
    """Summarise ``positions_by_window`` and flag simple data issues.

    The function is pure (no I/O, inputs are not mutated) so unit tests can
    exercise it directly.

    Args:
        positions_by_window: mapping window -> {entity: (x, y)}. ``None`` is
            treated as an empty mapping. Coordinates are not inspected, only
            the entity keys.
        party_map: mapping entity -> party. ``None`` is treated as empty.

    Returns:
        A dict with these keys:
        - windows_count: number of windows.
        - window_labels: sorted window keys, at most 10.
        - mp_id_set: set of every non-empty entity key seen in any window.
        - party_map_count: number of entries in ``party_map``.
        - parties_with_centroid_counts: party -> number of windows in which
          at least one entity mapped to that party.
        - mismatched_mp_ids_sample: sorted sample (at most 10) of entities
          with no (truthy) party, even after stripping a parenthetical.
        - mp_positions_count: number of unique entities seen.
        - mp_positions_sample: sorted sample (at most 10) of entities seen.
        - windows_with_no_positions: windows whose position mapping is empty
          or ``None``, in input order.
    """
    # Be as forgiving as compute_party_centroids, which also accepts a missing
    # positions mapping; a diagnostics helper should not crash on None input.
    positions_by_window = positions_by_window or {}
    party_map = party_map or {}

    mp_id_set: Set[str] = set()
    parties_with_centroid_counts: Dict[str, int] = {}
    mismatched: Set[str] = set()
    windows_with_no_positions: List[str] = []

    for win, pos in positions_by_window.items():
        if not pos:
            windows_with_no_positions.append(win)
            continue

        present_parties: Set[str] = set()
        for ent in pos:
            if not ent:
                continue
            mp_id_set.add(ent)
            party = party_map.get(ent)
            if party is None:
                # Retry without a parenthetical variant: "Name (x)" -> "Name".
                party = party_map.get(ent.split("(")[0].strip())
            if party:
                present_parties.add(party)
            else:
                mismatched.add(ent)

        # Each party counts at most once per window, however many of its
        # entities are present.
        for party in present_parties:
            parties_with_centroid_counts[party] = (
                parties_with_centroid_counts.get(party, 0) + 1
            )

    return {
        "windows_count": len(positions_by_window),
        "window_labels": sorted(positions_by_window)[:10],
        "mp_id_set": mp_id_set,
        "party_map_count": len(party_map),
        "parties_with_centroid_counts": parties_with_centroid_counts,
        "mismatched_mp_ids_sample": sorted(mismatched)[:10],
        "mp_positions_sample": sorted(mp_id_set)[:10],
        "mp_positions_count": len(mp_id_set),
        "windows_with_no_positions": windows_with_no_positions,
    }
||||||
|
|
||||||
|
|
||||||
|
def compute_party_coords(
    positions_by_window: Dict[str, Dict[str, Tuple[float, float]]],
    party_map: Dict[str, str],
    window_id: str,
    fallback_party_scores: Optional[Dict[str, List[float]]] = None,
) -> Tuple[Dict[str, Tuple[float, float]], Set[str]]:
    """Compute per-party centroids (x_mean, y_mean) for a specific window.

    Args:
        positions_by_window: mapping window_id -> {entity_name: (x, y)}.
            ``None`` or a missing/empty window yields no computed centroids.
        party_map: mapping entity name -> party. Entities are looked up by
            exact name first, then by the name with any "(...)" suffix removed.
            Entities with no party, or with party "Unknown", are ignored.
        window_id: which window to compute centroids for.
        fallback_party_scores: optional mapping party -> numeric vector
            (list, tuple, numpy array, ...) of length >= 2. When a party has
            no computed centroid in the window, the first two elements of its
            vector are used as (x, y), provided both are finite.

    Returns:
        (party_coords, fallback_used) where:
        - party_coords: {party: (x_mean, y_mean)} for parties with a computed
          centroid or an accepted fallback.
        - fallback_used: set of parties whose coordinate came from
          ``fallback_party_scores``. A warning is logged when it is non-empty.
    """
    # Errors that float(xy[0]) / float(vec[1]) / len(vec) can raise for
    # malformed input; such entries are skipped rather than aborting.
    conversion_errors = (TypeError, ValueError, IndexError, KeyError, OverflowError)

    pos = (positions_by_window or {}).get(window_id) or {}

    per_party: Dict[str, List[Tuple[float, float]]] = {}
    for ent, xy in pos.items():
        if not ent or xy is None:
            continue
        try:
            x, y = float(xy[0]), float(xy[1])
        except conversion_errors:
            # skip malformed coords
            continue
        # Drop the whole point when either coordinate is NaN/inf. Filtering x
        # and y separately would average them over different sets of MPs.
        if not (math.isfinite(x) and math.isfinite(y)):
            continue
        party = party_map.get(ent)
        if party is None:
            # Retry without a parenthetical variant: "Name (x)" -> "Name".
            party = party_map.get(ent.split("(")[0].strip())
        if not party or party == "Unknown":
            continue
        per_party.setdefault(party, []).append((x, y))

    # Every list in per_party is non-empty and holds only finite points.
    party_coords: Dict[str, Tuple[float, float]] = {}
    for party, coords in per_party.items():
        xs = [c[0] for c in coords]
        ys = [c[1] for c in coords]
        party_coords[party] = (float(np.mean(xs)), float(np.mean(ys)))

    # Fallback: use supplied party vectors for parties without a centroid.
    fallback_used: Set[str] = set()
    for party, vec in (fallback_party_scores or {}).items():
        if party in party_coords or vec is None:
            continue
        try:
            # Use len() rather than truthiness: ``not vec`` raises for numpy
            # arrays with more than one element.
            if len(vec) < 2:
                continue
            x_f, y_f = float(vec[0]), float(vec[1])
        except conversion_errors:
            continue
        if not (math.isfinite(x_f) and math.isfinite(y_f)):
            continue
        party_coords[party] = (x_f, y_f)
        fallback_used.add(party)

    if fallback_used:
        # getLogger(__name__) returns this module's logger object.
        logging.getLogger(__name__).warning(
            "compute_party_coords used fallback for parties: %s",
            sorted(fallback_used),
        )

    return party_coords, fallback_used
||||||
|
|
||||||
|
|
||||||
|
def compute_party_centroids(
    positions_by_window: Dict[str, Dict[str, Tuple[float, float]]],
    party_map: Dict[str, str],
    windows: List[str],
) -> Tuple[Dict[str, List[Tuple[float, float]]], Dict[str, Any]]:
    """Build a centroid series per party over the given windows.

    The set of parties is taken from the values of ``party_map``. For each
    window, ``compute_party_coords`` is called (without fallback scores) and
    every party receives one entry.

    Returns (party_centroids, metadata)
    - party_centroids: party -> list of (x, y) tuples, one per entry of
      ``windows`` and in the same order. A window in which the party has no
      centroid contributes (nan, nan). Empty dict when no parties are known.
    - metadata: dict with keys
      'per_party_counts' (party -> number of non-NaN entries),
      'total_windows' (len(windows)) and 'parties' (sorted party list).
    """
    parties = sorted(set(party_map.values()))
    if not parties:
        # No known parties: nothing to compute, but still report the metadata.
        empty_metadata = {
            "per_party_counts": {},
            "total_windows": len(windows),
            "parties": [],
        }
        return {}, empty_metadata

    party_centroids: Dict[str, List[Tuple[float, float]]] = {
        party: [] for party in parties
    }

    for window_id in windows:
        window_coords, _fallback = compute_party_coords(
            positions_by_window or {}, party_map, window_id
        )
        for party in parties:
            if party not in window_coords:
                # Keep the series aligned with ``windows`` via a NaN placeholder.
                party_centroids[party].append((float(np.nan), float(np.nan)))
                continue
            cx, cy = window_coords[party][0], window_coords[party][1]
            party_centroids[party].append((float(cx), float(cy)))

    # Count, per party, the windows that produced a real (non-NaN) centroid.
    per_party_counts: Dict[str, int] = {
        party: sum(1 for x, y in series if not (np.isnan(x) or np.isnan(y)))
        for party, series in party_centroids.items()
    }

    metadata = {
        "per_party_counts": per_party_counts,
        "total_windows": len(windows),
        "parties": parties,
    }
    return party_centroids, metadata
||||||
@ -0,0 +1,49 @@ |
|||||||
|
import os |
||||||
|
import types |
||||||
|
|
||||||
|
import explorer |
||||||
|
|
||||||
|
|
||||||
|
def test_load_positions_empty_sets_diagnostics(monkeypatch):
    """Empty positions must be reported as stage "load_positions_empty".

    ``build_trajectories_tab`` is expected to return without raising and to
    leave that stage in ``explorer._last_trajectories_diagnostics``.
    """

    def empty_load_positions(db_path, window_size):
        # Stand-in for explorer.load_positions that yields no data at all.
        return {}, {}

    monkeypatch.setenv("EXPLORER_DEBUG_TRAJECTORIES", "1")
    monkeypatch.setattr(explorer, "load_positions", empty_load_positions)

    explorer.build_trajectories_tab(db_path="unused", window_size="annual")

    recorded_stage = explorer._last_trajectories_diagnostics.get("stage")
    assert recorded_stage == "load_positions_empty"
||||||
|
|
||||||
|
|
||||||
|
def test_select_helper_exception_is_captured(monkeypatch):
    """A raising select_trajectory_plot_data must be captured, not propagated.

    After ``build_trajectories_tab`` returns, the patched helper is expected to
    carry a ``_last_diagnostics`` attribute containing "exception", and the
    module-level diagnostics should record stage "select_helper_exception"
    together with the exception type name.
    """
    # Minimal non-empty input: one window holding a single MP.
    window_positions = {"W1": {"mp1": (0.1, 0.2)}}

    def raising_helper(*args, **kwargs):
        raise ValueError("boom")

    monkeypatch.setattr(
        explorer, "load_positions", lambda db_path, window_size: (window_positions, {})
    )
    # Map the MP to a party so the code path that calls
    # select_trajectory_plot_data is reached.
    monkeypatch.setattr(explorer, "load_party_map", lambda db_path: {"mp1": "P1"})
    monkeypatch.setattr(explorer, "select_trajectory_plot_data", raising_helper)
    monkeypatch.setenv("EXPLORER_DEBUG_TRAJECTORIES", "1")

    explorer.build_trajectories_tab(db_path="unused", window_size="annual")

    # Diagnostics attached to the helper function itself.
    assert getattr(explorer.select_trajectory_plot_data, "_last_diagnostics", None)
    assert "exception" in explorer.select_trajectory_plot_data._last_diagnostics

    # Diagnostics recorded at module level.
    module_diagnostics = explorer._last_trajectories_diagnostics
    assert module_diagnostics.get("stage") == "select_helper_exception"
    assert "ValueError" in module_diagnostics.get("exception", "")
||||||
@ -0,0 +1,22 @@ |
|||||||
|
import numpy as np |
||||||
|
from explorer_helpers import inspect_positions_for_issues |
||||||
|
|
||||||
|
|
||||||
|
def test_inspect_positions_for_issues_basic():
    """Smoke-test the summary keys returned by inspect_positions_for_issues.

    Input: one populated window (one mapped MP, one unmapped MP with NaN
    coordinates) and one empty window.
    """
    nan = float("nan")
    summary = inspect_positions_for_issues(
        {
            "w1": {"mp1": (1.0, 2.0), "mp2": (nan, nan)},
            "w2": {},
        },
        {"mp1": "P1"},
    )

    # Keys that existed before the diagnostics were extended.
    assert summary["windows_count"] == 2
    assert isinstance(summary["mp_id_set"], set)

    # Newer diagnostics: presence and container type.
    assert "mp_positions_count" in summary
    assert summary["mp_positions_count"] >= 1
    for list_key in ("mp_positions_sample", "windows_with_no_positions"):
        assert list_key in summary
        assert isinstance(summary[list_key], list)
||||||
Loading…
Reference in new issue