You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
motief/explorer.py

543 lines
18 KiB

"""Parlement Explorer — Streamlit data analysis app.
Five tabs:
1. Politiek Kompas — 2D scatter of MPs/parties, window slider
2. Partij Trajectories — party centroid lines over time
3. Motie Zoeken — text search + similarity lookup
4. Motie Browser — sortable table + detail panel
5. SVD Components — SVD component view
Run with: streamlit run explorer.py
Import-safe: heavy computation is behind @st.cache_data and only runs at UI time.
All DuckDB connections are read_only=True so the app can run alongside the pipeline.
"""
from __future__ import annotations
import json
import logging
import os
import re
import traceback
from datetime import datetime
from typing import Dict, List, Optional, Tuple
try:
import duckdb
_DUCKDB_AVAILABLE = True
except Exception:
duckdb = None
_DUCKDB_AVAILABLE = False
import numpy as np
import pandas as pd
from analysis import config
from analysis import explorer_data
from analysis import projections
from analysis import trajectory
# Backwards-compatible re-export used by tests: alias of
# ``trajectory.choose_trajectory_title`` so the name stays importable from
# this module.
choose_trajectory_title = trajectory.choose_trajectory_title
try:
    import plotly.express as px
    import plotly.graph_objects as go
except Exception:
    # Plotly could not be imported: expose a tiny in-memory substitute for the
    # parts of ``plotly.graph_objects`` this module touches, so the module
    # stays importable.
    px = None
    import types

    class _DummyTrace:
        """Plain record holding the trace fields the fallback keeps track of."""

        def __init__(self, **kwargs):
            self.name = kwargs.get("name")
            self.x = kwargs.get("x")
            self.y = kwargs.get("y")
            self.text = kwargs.get("text")
            self.customdata = kwargs.get("customdata")

    class _DummyFigure:
        """Figure substitute that only records the traces added to it."""

        # Fields copied from a foreign trace object into a ``_DummyTrace``.
        _TRACE_FIELDS = ("name", "x", "y", "text", "customdata")

        def __init__(self):
            self.data = []

        @staticmethod
        def _read_field(trace, key):
            """Read ``key`` from ``trace`` as an attribute, else via ``.get``.

            ``getattr`` with a default does not raise for a missing
            attribute, so a mapping-style trace (e.g. a plain dict) must be
            consulted explicitly through ``.get`` whenever the attribute
            lookup yields nothing.
            """
            try:
                value = getattr(trace, key, None)
            except Exception:
                # A misbehaving property should not abort the whole trace.
                value = None
            if value is None and hasattr(trace, "get"):
                value = trace.get(key)
            return value

        def add_trace(self, trace):
            """Append ``trace`` to ``self.data``, normalised to a ``_DummyTrace``.

            Accepts a ``_DummyTrace`` (stored as-is), any object exposing the
            trace fields as attributes, or a mapping keyed by field name.
            Returns ``None``.
            """
            if isinstance(trace, _DummyTrace):
                self.data.append(trace)
                return
            fields = {
                key: self._read_field(trace, key) for key in self._TRACE_FIELDS
            }
            self.data.append(_DummyTrace(**fields))

        def add_annotation(self, *args, **kwargs):
            """Accept and ignore annotation requests."""
            return None

    go = types.SimpleNamespace(
        Figure=_DummyFigure, Scatter=lambda **kwargs: _DummyTrace(**kwargs)
    )
try:
    import streamlit as st
except Exception:

    class _DummySt:
        """Inert stand-in for the ``streamlit`` module.

        Used when Streamlit cannot be imported so that this module can still
        be imported. Display calls do nothing and return ``None``; input
        widgets return a default derived from their arguments.
        """

        def _noop(self, *args, **kwargs):
            """Accept any arguments and do nothing."""
            return None

        # Output-only widgets: nothing to render without Streamlit.
        markdown = _noop
        subheader = _noop
        plotly_chart = _noop
        caption = _noop
        text_area = _noop
        json = _noop
        warning = _noop
        info = _noop

        @staticmethod
        def _options(args, kwargs):
            """Return the options passed by keyword, else second positional."""
            options = kwargs.get("options")
            if options is not None:
                return options
            return args[1] if len(args) > 1 else []

        def cache_data(self, *args, **kwargs):
            """Pass-through replacement for ``st.cache_data``.

            Works both as ``@st.cache_data`` (bare) and as
            ``@st.cache_data(...)``; in either form the decorated function is
            returned unchanged and nothing is cached.
            """
            # Bare usage: the function itself arrives as the only argument.
            if len(args) == 1 and not kwargs and callable(args[0]):
                return args[0]

            def _decorator(func):
                return func

            return _decorator

        def checkbox(self, *args, **kwargs):
            """Return the ``value`` keyword, defaulting to ``False``."""
            return kwargs.get("value", False)

        def selectbox(self, *args, **kwargs):
            """Return the first option, or ``None`` when there are none."""
            opts = self._options(args, kwargs)
            return opts[0] if opts else None

        def multiselect(self, *args, **kwargs):
            """Return ``default`` when given, otherwise the first six options."""
            opts = self._options(args, kwargs)
            default = kwargs.get("default")
            if default is not None:
                return default
            return opts[:6] if opts else []

        def number_input(self, *args, **kwargs):
            """Return the ``value`` keyword, defaulting to ``1``."""
            return kwargs.get("value", 1)

        def slider(self, *args, **kwargs):
            """Return the ``value`` keyword, defaulting to ``0.35``."""
            return kwargs.get("value", 0.35)

        def expander(self, *args, **kwargs):
            """Return a do-nothing context manager."""

            class _Ctx:
                def __enter__(self_inner):
                    return self_inner

                def __exit__(self_inner, exc_type, exc, tb):
                    # Never suppress exceptions raised inside the block.
                    return False

            return _Ctx()

        def columns(self, *args, **kwargs):
            """Return a tuple of inert column objects.

            The column spec may be an integer count or a sequence of widths,
            passed positionally or as ``spec=``; with no spec a single column
            is returned.
            """

            class _Col:
                def _noop(self, *a, **k):
                    return None

                markdown = _noop
                metric = _noop
                dataframe = _noop

            spec = args[0] if args else kwargs.get("spec", 1)
            # An integer spec is a column count; anything else is sized by len().
            n = spec if isinstance(spec, int) else len(spec)
            return tuple(_Col() for _ in range(n))

    st = _DummySt()
# Re-export trajectories diagnostics for backwards compatibility
from analysis.tabs.trajectories import (
_last_diagnostics,
_last_trajectories_diagnostics,
get_debug_trajectories_enabled,
)
def select_trajectory_plot_data(*args, **kwargs):
    """Forward to ``analysis.tabs.trajectories.select_trajectory_plot_data``.

    The target is imported at call time rather than at module import, and all
    positional and keyword arguments are passed through untouched.
    """
    from analysis.tabs.trajectories import (
        select_trajectory_plot_data as _select_plot_data,
    )

    plot_data = _select_plot_data(*args, **kwargs)
    return plot_data
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Module-level aliases for values defined in ``analysis.config``.
PARTY_COLOURS: Dict[str, str] = config.PARTY_COLOURS
SVD_THEMES: dict[int, dict[str, str]] = config.SVD_THEMES
KNOWN_MAJOR_PARTIES = config.KNOWN_MAJOR_PARTIES
CURRENT_PARLIAMENT_PARTIES = config.CURRENT_PARLIAMENT_PARTIES
_PARTY_NORMALIZE = config._PARTY_NORMALIZE
# ---------------------------------------------------------------------------
# Cached loaders
# ---------------------------------------------------------------------------
@st.cache_data(show_spinner="Beschikbare tijdsvensters laden…")
def get_available_windows(db_path: str) -> List[str]:
    """List the distinct ``window_id`` values of ``svd_vectors``, sorted.

    Cached wrapper; the lookup is delegated to
    ``explorer_data.get_available_windows``.
    """
    window_ids = explorer_data.get_available_windows(db_path)
    return window_ids
@st.cache_data(show_spinner=False)
def get_uniform_dim_windows(db_path: str) -> List[str]:
    """List the windows whose dominant MP-vector dimension is at least 25.

    Cached wrapper; the filtering is delegated to
    ``explorer_data.get_uniform_dim_windows``.
    """
    uniform_windows = explorer_data.get_uniform_dim_windows(db_path)
    return uniform_windows
def _should_swap_axes(axis_def: dict) -> bool:
    """Tell whether the axes should be swapped.

    True when the Y axis is the economic left-right axis and the X axis is
    not; the decision is delegated to ``projections.should_swap_axes``.
    """
    swap_needed = projections.should_swap_axes(axis_def)
    return swap_needed
def _swap_axes(positions_by_window: dict, axis_def: dict) -> tuple:
    """Exchange x and y in every position and in the axis metadata.

    Delegates to ``projections.swap_axes`` and returns its result unchanged.
    """
    swapped = projections.swap_axes(positions_by_window, axis_def)
    return swapped
@st.cache_data(show_spinner="2D posities berekenen (kan even duren)…")
def load_positions(
    db_path: str,
    window_size: str = "annual",
) -> Tuple[Dict[str, Dict[str, Tuple[float, float]]], Dict]:
    """Compute per-window 2D positions via PCA on aligned SVD vectors.

    Cached wrapper; the computation is delegated to
    ``explorer_data.load_positions``.
    """
    positions_and_axes = explorer_data.load_positions(db_path, window_size)
    return positions_and_axes
@st.cache_data(show_spinner="Partijkaart laden…")
def load_party_map(db_path: str) -> Dict[str, str]:
    """Map each MP name to its party, normalised to the party abbreviation.

    Cached wrapper; delegates to ``explorer_data.load_party_map``.
    """
    mp_to_party = explorer_data.load_party_map(db_path)
    return mp_to_party
@st.cache_data(show_spinner="Actieve Kamerleden laden…")
def load_active_mps(db_path: str) -> set:
    """Collect the ``mp_name`` values of MPs currently seated in parliament.

    Cached wrapper; delegates to ``explorer_data.load_active_mps``.
    """
    seated_mps = explorer_data.load_active_mps(db_path)
    return seated_mps
def get_aligned_party_scores(
    db_path: str,
    window: str,
    active_mps: set | None = None,
) -> Dict[str, np.ndarray]:
    """Fetch party scores on all N components from aligned PCA positions.

    Delegates to ``explorer_data.get_aligned_party_scores``, forwarding
    ``active_mps`` as given.
    """
    party_scores = explorer_data.get_aligned_party_scores(
        db_path, window, active_mps
    )
    return party_scores
def compute_party_discipline(
    db_path: str, start_date: str, end_date: str
) -> pd.DataFrame:
    """Per-party voting discipline (Rice index) over roll-call votes in a date range.

    Delegates to ``explorer_data.compute_party_discipline``.
    """
    discipline = explorer_data.compute_party_discipline(
        db_path, start_date, end_date
    )
    return discipline
def _load_mp_vectors_by_party(db_path: str) -> Dict[str, List[np.ndarray]]:
    """Individual MP SVD vectors for ``current_parliament``, grouped by party.

    Delegates to ``explorer_data.load_mp_vectors_by_party``.
    """
    vectors_by_party = explorer_data.load_mp_vectors_by_party(db_path)
    return vectors_by_party
def _load_mp_vectors_by_party_for_window(
    db_path: str,
    window: str,
) -> Dict[str, List[np.ndarray]]:
    """Individual MP SVD vectors for one window, grouped by party.

    Delegates to ``explorer_data.load_mp_vectors_by_party_for_window``.
    """
    vectors_by_party = explorer_data.load_mp_vectors_by_party_for_window(
        db_path, window
    )
    return vectors_by_party
@st.cache_data(show_spinner="Partijposities op SVD-assen laden…")
def load_party_axis_scores(db_path: str) -> Dict[str, List[float]]:
    """Per-party SVD vectors, each the mean of that party's MP vectors.

    Best-effort: any exception is logged with its traceback and an empty
    dict is returned instead.
    """
    try:
        vectors_by_party = explorer_data.load_mp_vectors_by_party(db_path)
        return explorer_data.compute_party_axis_scores(vectors_by_party)
    except Exception:
        logger.exception("Failed to load party axis scores")
        return {}
@st.cache_data(show_spinner="Partijposities voor jaar laden…")
def load_party_axis_scores_for_window(
    db_path: str, window: str
) -> Dict[str, List[float]]:
    """Return per-party SVD vectors for a specific window.

    The MP vectors for ``window`` are loaded grouped by party and reduced
    with ``explorer_data.compute_party_axis_scores``.

    Best-effort: any exception is logged with its traceback and an empty
    dict is returned instead of propagating.
    """
    try:
        return explorer_data.compute_party_axis_scores(
            explorer_data.load_mp_vectors_by_party_for_window(db_path, window)
        )
    except Exception:
        # Lazy %-style arguments: formatting is left to the logging framework.
        logger.exception("Failed to load party axis scores for window %s", window)
        return {}
@st.cache_data(show_spinner="SVD scores voor alle vensters laden…")
def load_party_scores_all_windows(
    db_path: str, windows: List[str]
) -> Dict[str, Dict[str, List[float]]]:
    """Collect party SVD scores for every requested window.

    The special window ``"current_parliament"`` is served by
    ``load_party_axis_scores``; every other window by
    ``load_party_axis_scores_for_window``.
    """

    def _scores_for(window_id: str) -> Dict[str, List[float]]:
        if window_id != "current_parliament":
            return load_party_axis_scores_for_window(db_path, window_id)
        return load_party_axis_scores(db_path)

    return {window_id: _scores_for(window_id) for window_id in windows}
def _load_mp_vectors_by_window(
    db_path: str,
    window: str,
) -> Dict[str, np.ndarray]:
    """Individual MP SVD vectors for one window.

    Delegates to ``explorer_data.load_mp_vectors_by_window``.
    """
    mp_vectors = explorer_data.load_mp_vectors_by_window(db_path, window)
    return mp_vectors
def _get_aligned_trajectory_scores(
    db_path: str,
    windows: List[str],
    n_components: int = 10,
) -> Dict[str, Dict[str, List[float]]]:
    """Aligned PCA scores shaped ``{window: {party: [score per component]}}``.

    Delegates to ``explorer_data._get_aligned_trajectory_scores``.
    """
    aligned_scores = explorer_data._get_aligned_trajectory_scores(
        db_path, windows, n_components
    )
    return aligned_scores
@st.cache_data(show_spinner="SVD scores met Procrustes-uitlijning laden…")
def load_party_scores_all_windows_aligned(
    db_path: str, windows: List[str]
) -> Dict[str, Dict[str, List[float]]]:
    """Party SVD scores per window after Procrustes alignment of the windows.

    Steps:
      1. load the MP vectors of every requested window, dropping empty ones;
      2. align the windows with ``_procrustes_align_windows``;
      3. per aligned window, group MP vectors by party (MPs missing from the
         party map are skipped) and average each group component-wise.

    Windows absent from the aligned result are left out of the returned dict.
    """
    from analysis.trajectory import _procrustes_align_windows

    mp_to_party = load_party_map(db_path)

    # Step 1: only windows that actually have vectors take part in alignment.
    loaded = (
        (window_id, _load_mp_vectors_by_window(db_path, window_id))
        for window_id in windows
    )
    raw_by_window: Dict[str, Dict[str, np.ndarray]] = {
        window_id: vectors for window_id, vectors in loaded if vectors
    }

    # Step 2.
    aligned_by_window = _procrustes_align_windows(raw_by_window)

    # Step 3.
    scores_by_window: Dict[str, Dict[str, List[float]]] = {}
    for window_id in windows:
        if window_id in aligned_by_window:
            grouped: Dict[str, List[np.ndarray]] = {}
            for mp_name, vector in aligned_by_window[window_id].items():
                party = mp_to_party.get(mp_name)
                if party:
                    grouped.setdefault(party, []).append(vector)
            scores_by_window[window_id] = {
                party: np.mean(vectors, axis=0).tolist()
                for party, vectors in grouped.items()
            }
    return scores_by_window
@st.cache_data(show_spinner="Partij-MP vectoren laden…")
def load_party_mp_vectors(db_path: str) -> Dict[str, List[np.ndarray]]:
    """Per-party lists of individual MP SVD vectors.

    Best-effort: any exception is logged with its traceback and an empty
    dict is returned instead.
    """
    try:
        vectors_by_party = explorer_data.load_mp_vectors_by_party(db_path)
    except Exception:
        logger.exception("Failed to load party MP vectors")
        return {}
    return vectors_by_party
@st.cache_data(show_spinner="Bootstrap CI berekenen…")
def _cached_bootstrap_cis(
    party_mp_vectors: Dict[str, List[np.ndarray]],
) -> Dict[str, Dict]:
    """Cached pass-through to ``compute_party_bootstrap_cis``.

    The implementation is imported from ``analysis.political_axis`` at call
    time.
    """
    from analysis.political_axis import (
        compute_party_bootstrap_cis as _bootstrap_cis,
    )

    intervals = _bootstrap_cis(party_mp_vectors)
    return intervals
@st.cache_data(show_spinner="Scree-plot laden…")
def load_scree_data(db_path: str) -> List[float]:
    """Explained-variance ratios (%) of all SVD components, sorted descending.

    Best-effort: a failure while importing or running
    ``compute_svd_spectrum`` is logged with its traceback and an empty list
    is returned.
    """
    try:
        from analysis.political_axis import compute_svd_spectrum as _spectrum

        variance_ratios = _spectrum(db_path)
    except Exception:
        logger.exception("Failed to load scree data")
        return []
    return variance_ratios
@st.cache_data(show_spinner="Moties laden…")
def load_motions_df(db_path: str) -> pd.DataFrame:
    """The full motions table as a pandas DataFrame (read-only access).

    Cached wrapper; delegates to ``explorer_data.load_motions_df``.
    """
    motions = explorer_data.load_motions_df(db_path)
    return motions
def query_similar(
    db_path: str,
    source_motion_id: int,
    vector_type: str = "fused",
    top_k: int = 10,
) -> pd.DataFrame:
    """Top-k motions most similar to one motion, read from ``similarity_cache``.

    Delegates to ``explorer_data.query_similar`` (read-only access).
    """
    neighbours = explorer_data.query_similar(
        db_path, source_motion_id, vector_type, top_k
    )
    return neighbours
def _window_to_dates(window_id: str) -> tuple[str, str]:
    """ISO ``(start_date, end_date)`` strings for the given ``window_id``.

    Delegates to ``trajectory.window_to_dates``.
    """
    date_range = trajectory.window_to_dates(window_id)
    return date_range
def build_compass_tab(*args, **kwargs):
    """Render the Politiek Kompas tab.

    Imports ``analysis.tabs.compass.build_compass_tab`` at call time and
    forwards every argument to it.
    """
    from analysis.tabs.compass import build_compass_tab as _render_compass

    outcome = _render_compass(*args, **kwargs)
    return outcome
def build_trajectories_tab(*args, **kwargs):
    """Render the Partij Trajectories tab.

    Imports ``analysis.tabs.trajectories.build_trajectories_tab`` at call
    time and forwards every argument to it.
    """
    from analysis.tabs.trajectories import (
        build_trajectories_tab as _render_trajectories,
    )

    outcome = _render_trajectories(*args, **kwargs)
    return outcome
def build_search_tab(*args, **kwargs):
    """Render the Motie Zoeken tab.

    Imports ``analysis.tabs.search.build_search_tab`` at call time and
    forwards every argument to it.
    """
    from analysis.tabs.search import build_search_tab as _render_search

    outcome = _render_search(*args, **kwargs)
    return outcome
def build_browser_tab(*args, **kwargs):
    """Render the Motie Browser tab.

    Imports ``analysis.tabs.browser.build_browser_tab`` at call time and
    forwards every argument to it.
    """
    from analysis.tabs.browser import build_browser_tab as _render_browser

    outcome = _render_browser(*args, **kwargs)
    return outcome
def build_svd_components_tab(*args, **kwargs):
    """Render the SVD Components tab.

    Imports ``analysis.tabs.components.build_svd_components_tab`` at call
    time and forwards every argument to it.
    """
    from analysis.tabs.components import (
        build_svd_components_tab as _render_components,
    )

    outcome = _render_components(*args, **kwargs)
    return outcome
def build_mp_quiz_tab(*args, **kwargs):
    """Render the MP Quiz tab.

    Imports ``analysis.tabs.quiz.build_mp_quiz_tab`` at call time and
    forwards every argument to it.
    """
    from analysis.tabs.quiz import build_mp_quiz_tab as _render_quiz

    outcome = _render_quiz(*args, **kwargs)
    return outcome
def run_app() -> None:
    """Configure the Streamlit page and render the sidebar and all tabs.

    The sidebar holds the "show rejected motions" checkbox and an expander
    with row counts read from the database over a read-only DuckDB
    connection. The five tabs are rendered through ``st.tabs`` when that is
    available; otherwise a radio selector picks the single tab to render.
    """
    st.set_page_config(
        layout="wide",
        page_title="Parlement Explorer",
        page_icon="🏛",
    )
    st.title("🏛 Parlement Explorer")
    st.sidebar.title("Instellingen")
    db_path = "data/motions.db"
    window_size = "annual"
    show_rejected = st.sidebar.checkbox("Toon verworpen moties", value=False)

    with st.sidebar.expander(" Over", expanded=False):
        try:
            if _DUCKDB_AVAILABLE:
                con = duckdb.connect(database=db_path, read_only=True)
                try:
                    n_motions = con.execute(
                        "SELECT COUNT(*) FROM motions"
                    ).fetchone()[0]
                    n_fused = con.execute(
                        "SELECT COUNT(*) FROM fused_embeddings"
                    ).fetchone()[0]
                    n_sim = con.execute(
                        "SELECT COUNT(*) FROM similarity_cache"
                    ).fetchone()[0]
                finally:
                    # Release the connection even when one of the queries
                    # fails (e.g. a table is missing).
                    con.close()
                st.markdown(
                    f"**Moties:** {n_motions:,} \n"
                    f"**Fused embeddings:** {n_fused:,} \n"
                    f"**Similarity cache:** {n_sim:,}"
                )
            else:
                st.warning(
                    "DuckDB niet beschikbaar in deze Python-omgeving; DB diagnostics zijn niet beschikbaar."
                )
        except Exception as e:
            # Diagnostics are optional: report the problem and keep going.
            st.warning(f"DB niet bereikbaar: {e}")

    # One (label, renderer) pair per tab, in display order. The builders are
    # looked up here, at call time, so both UI branches share one dispatch.
    tabs = [
        ("🧭 Politiek Kompas", lambda: build_compass_tab(db_path, window_size)),
        ("📈 Trajectories", lambda: build_trajectories_tab(db_path, window_size)),
        ("🔍 Motie Zoeken", lambda: build_search_tab(db_path, show_rejected)),
        ("📋 Motie Browser", lambda: build_browser_tab(db_path, show_rejected)),
        ("🔬 SVD Components", lambda: build_svd_components_tab(db_path)),
    ]
    tab_labels = [label for label, _ in tabs]

    if callable(getattr(st, "tabs", None)):
        for container, (_, render) in zip(st.tabs(tab_labels), tabs):
            with container:
                render()
    else:
        selection = st.radio("Tab", tab_labels)
        # Any selection that is not one of the first four labels falls
        # through to the last tab.
        fallback = tabs[-1][1]
        dict(tabs).get(selection, fallback)()
if __name__ == "__main__":
    # Script entry point: set up INFO-level logging, then start the app.
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=logging.INFO,
    )
    run_app()