From 8af27bbf04350e122303658154a00a3d1849d257 Mon Sep 17 00:00:00 2001 From: Sven Geboers Date: Mon, 4 May 2026 19:37:52 +0200 Subject: [PATCH] feat: implement agent-native architecture (U1-U6) Implements the agent-native architecture plan (docs/plans/2026-05-01-002-agent-native-architecture-plan.md): - U1: Database query primitives (agent_tools/database.py) - query_motions, query_votes, query_svd_vectors, query_party_positions, query_pipeline_status - U2: Pipeline control primitives (agent_tools/pipeline.py) - pipeline_run_stage, pipeline_run_full, pipeline_check_health, pipeline_get_logs, pipeline_validate_output - U3: Analysis & report generation (agent_tools/analysis.py, reports.py) - analyze_party_shift, analyze_axis_stability, validate_svd_labels, generate_report - U4: Content validation primitives (agent_tools/content.py) - validate_motion_coverage, validate_layman_explanations, suggest_svd_label, check_embedding_quality - U5: System prompt & context injection (SYSTEM_PROMPT.md, context.py, context.md) - U6: Parity verification tests (tests/agent_tools/test_parity.py) Tests: 238 passed, 2 skipped AGENTS.md updated to surface agent_tools/ --- .gitignore | 1 + AGENTS.md | 4 + agent_tools/SYSTEM_PROMPT.md | 81 ++++++ agent_tools/__init__.py | 1 + agent_tools/analysis.py | 170 +++++++++++++ agent_tools/content.py | 183 ++++++++++++++ agent_tools/context.md | 20 ++ agent_tools/context.py | 110 +++++++++ agent_tools/database.py | 220 +++++++++++++++++ agent_tools/pipeline.py | 192 +++++++++++++++ agent_tools/reports.py | 149 +++++++++++ ...5-01-002-agent-native-architecture-plan.md | 233 ++++++++++++++++++ tests/agent_tools/test_analysis_tools.py | 74 ++++++ tests/agent_tools/test_content_tools.py | 44 ++++ tests/agent_tools/test_database_tools.py | 75 ++++++ tests/agent_tools/test_parity.py | 160 ++++++++++++ tests/agent_tools/test_pipeline_tools.py | 59 +++++ 17 files changed, 1776 insertions(+) create mode 100644 agent_tools/SYSTEM_PROMPT.md create mode 100644 
agent_tools/__init__.py create mode 100644 agent_tools/analysis.py create mode 100644 agent_tools/content.py create mode 100644 agent_tools/context.md create mode 100644 agent_tools/context.py create mode 100644 agent_tools/database.py create mode 100644 agent_tools/pipeline.py create mode 100644 agent_tools/reports.py create mode 100644 docs/plans/2026-05-01-002-agent-native-architecture-plan.md create mode 100644 tests/agent_tools/test_analysis_tools.py create mode 100644 tests/agent_tools/test_content_tools.py create mode 100644 tests/agent_tools/test_database_tools.py create mode 100644 tests/agent_tools/test_parity.py create mode 100644 tests/agent_tools/test_pipeline_tools.py diff --git a/.gitignore b/.gitignore index 955aff4..27b1852 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ data/*.json # Generated output files outputs/ outputs_*/ +reports/ # Stray temp files dummy diff --git a/AGENTS.md b/AGENTS.md index fe0153d..3f32682 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -8,6 +8,10 @@ - Git is hosted on a **Gitea** server, not GitHub directly. The `gh` CLI is not available for this repo; use standard `git` commands instead. +## Agent Tools + +`agent_tools/` — atomic primitives that let an agent operate the Stemwijzer pipeline, database, and analysis surface. The agent-native architecture track (see STRATEGY.md) exposes every human operator capability through these tools. Relevant when extending agent capabilities or debugging tool behavior. + ## Project Conventions - Right-wing parties (PVV, FVD, JA21, SGP) must appear on the RIGHT side of all axes in visualizations diff --git a/agent_tools/SYSTEM_PROMPT.md b/agent_tools/SYSTEM_PROMPT.md new file mode 100644 index 0000000..f9919d4 --- /dev/null +++ b/agent_tools/SYSTEM_PROMPT.md @@ -0,0 +1,81 @@ +# Stemwijzer Agent System Prompt + +You are the **Stemwijzer Pipeline Operator** — an autonomous agent that operates the Stemwijzer parliamentary voting analysis pipeline. 
+ +## Your Identity + +- You are methodical, precise, and data-driven. +- You prefer structured outputs (JSON, markdown tables) over prose. +- You always verify assumptions with data before making claims. +- You write reports to `reports/` and accumulate learnings in `agent_tools/context.md`. + +## Your Capabilities + +You have access to these atomic tools: + +### Database Queries (`agent_tools.database`) +- `query_motions(db_path, year, policy_area, limit)` — Query motions with filters +- `query_votes(db_path, motion_id, party)` — Query votes for a motion +- `query_svd_vectors(db_path, window_id, entity_type)` — Query SVD vectors +- `query_party_positions(db_path, window_id)` — Query party axis scores +- `query_pipeline_status(db_path)` — Get pipeline freshness metrics + +### Pipeline Control (`agent_tools.pipeline`) +- `pipeline_run_stage(db_path, stage, window_id, dry_run)` — Run one pipeline stage +- `pipeline_run_full(db_path, dry_run)` — Run all stages +- `pipeline_check_health(db_path)` — Check pipeline health +- `pipeline_get_logs(db_path, stage, lines)` — Get recent logs +- `pipeline_validate_output(db_path, stage)` — Validate stage output + +### Analysis (`agent_tools.analysis`) +- `analyze_party_shift(db_path, party, window_start, window_end)` — Track party movement +- `analyze_axis_stability(db_path, component, windows)` — Measure axis consistency +- `validate_svd_labels(db_path, component)` — Check labels match positions + +### Reports (`agent_tools.reports`) +- `generate_report(db_path, report_type, parameters, output_path)` — Write markdown reports + +### Content Validation (`agent_tools.content`) +- `validate_motion_coverage(db_path, start_date, end_date)` — Find data gaps +- `validate_layman_explanations(db_path, sample_size)` — Check explanation quality +- `suggest_svd_label(db_path, component, top_n)` — Analyze top motions for labels +- `check_embedding_quality(db_path, window_id)` — Measure embedding coverage + +## Decision Criteria + +### When 
to run the pipeline +- Data is stale (> 7 days since last motion) +- Health checks show `healthy: false` +- User explicitly requests fresh data + +### When to generate a report +- User asks for analysis that spans multiple queries +- Health check reveals issues that need documentation +- Weekly/bi-weekly operational reviews + +### When to validate content +- After pipeline runs (automated quality gate) +- When SVD labels look suspicious +- Before publishing analysis to users + +## Output Conventions + +1. **Always return structured data** — dicts and lists, not raw prose +2. **Include `error` keys** when things fail, with actionable suggestions +3. **Write reports to `reports/`** — ephemeral, human-readable artifacts +4. **Update `context.md`** when you learn something about the pipeline +5. **Be explicit about uncertainty** — "Data shows X (n=123)" not "Probably X" + +## Knowledge Base + +Before making claims about the data, check `docs/solutions/` for documented patterns: +- SVD labels reflect voting patterns, not semantic content +- Right-wing parties appear on the RIGHT side of all axes +- EVR percentages come from `analysis.political_axis.compute_svd_spectrum` + +## Safety + +- You operate in the same trust boundary as the developer +- You can read the full database but write only to `reports/` and `context.md` +- You cannot delete data or modify pipeline logic +- Always use dry_run=True when the user says "what would happen if..." diff --git a/agent_tools/__init__.py b/agent_tools/__init__.py new file mode 100644 index 0000000..5c1ad3e --- /dev/null +++ b/agent_tools/__init__.py @@ -0,0 +1 @@ +"""Agent tools for Stemwijzer — atomic primitives for agent operation.""" diff --git a/agent_tools/analysis.py b/agent_tools/analysis.py new file mode 100644 index 0000000..0799fb1 --- /dev/null +++ b/agent_tools/analysis.py @@ -0,0 +1,170 @@ +"""Analysis primitives for agent operation. 
def analyze_axis_stability(
    db_path: str,
    component: int,
    windows: List[str],
    entity_type: str = "motion",
) -> Dict[str, Any]:
    """Measure how consistent one SVD component is across neighbouring windows.

    Windows that have SVD rows are compared pairwise in sorted order. For each
    pair, the component scores of the entities present in *both* windows are
    matched by ``entity_id`` and their Pearson correlation is computed.
    ``stability`` is the mean of those pairwise correlations.

    Args:
        db_path: Path to the DuckDB database.
        component: 1-indexed SVD component to inspect.
        windows: Window identifiers to compare.
        entity_type: ``entity_type`` of the SVD rows to compare. Defaults to
            ``"motion"``, the value that used to be hard-coded.

    Returns:
        On success a dict with ``component``, ``windows``, ``stability`` and
        ``pairwise`` (one entry per comparable pair with ``from_window``,
        ``to_window``, ``correlation`` and ``shared_entities``).
        ``stability`` is ``0.0`` when no pair could be compared.  On failure
        a dict carrying an ``error`` message.
    """
    # A component below 1 would turn into a negative index and silently read
    # from the *end* of every vector.
    if component < 1:
        return {
            "component": component,
            "windows": windows,
            "error": f"component must be >= 1 (1-indexed), got {component}",
        }

    # Fewer than two distinct windows can never yield a comparison, so skip
    # the database round-trips entirely.
    if len(set(windows)) < 2:
        return {
            "component": component,
            "windows": windows,
            "error": "Need at least 2 windows with SVD vectors",
        }

    try:
        import numpy as np

        idx = component - 1  # user-facing components are 1-indexed
        scores_by_window: Dict[str, Dict[Any, float]] = {}
        for window in windows:
            rows = query_svd_vectors(db_path, window, entity_type=entity_type)
            if not rows:
                continue
            scores: Dict[Any, float] = {}
            for row in rows:
                vec = row.get("vector")
                if isinstance(vec, str):
                    vec = json.loads(vec)
                try:
                    scores[row.get("entity_id")] = float(vec[idx])
                except (IndexError, KeyError, TypeError, ValueError):
                    # Missing, too-short or malformed vector: skip this row.
                    continue
            scores_by_window[window] = scores

        if len(scores_by_window) < 2:
            return {
                "component": component,
                "windows": windows,
                "error": "Need at least 2 windows with SVD vectors",
            }

        ordered = sorted(scores_by_window)
        pairwise = []
        for w1, w2 in zip(ordered, ordered[1:]):
            first, second = scores_by_window[w1], scores_by_window[w2]
            # Pair scores by entity, never by row position: the query has no
            # ORDER BY, so positional pairing would correlate unrelated rows.
            shared = [entity for entity in first if entity in second]
            if len(shared) < 2:
                continue
            with np.errstate(divide="ignore", invalid="ignore"):
                corr = float(
                    np.corrcoef(
                        [first[entity] for entity in shared],
                        [second[entity] for entity in shared],
                    )[0, 1]
                )
            if np.isnan(corr):
                # A constant series has no defined correlation.
                continue
            pairwise.append({
                "from_window": w1,
                "to_window": w2,
                "correlation": round(corr, 4),
                "shared_entities": len(shared),
            })

        avg_stability = (
            sum(pair["correlation"] for pair in pairwise) / len(pairwise)
            if pairwise else 0.0
        )

        return {
            "component": component,
            "windows": windows,
            "stability": round(avg_stability, 4),
            "pairwise": pairwise,
        }
    except Exception as e:
        logger.exception("analyze_axis_stability failed")
        return {"component": component, "error": str(e)}
# Upper bound on the motions fetched for a single calendar year.
_COVERAGE_FETCH_LIMIT = 100_000


def _next_month_start(moment: datetime) -> datetime:
    """Return midnight on the first day of the month following *moment*."""
    if moment.month == 12:
        return datetime(moment.year + 1, 1, 1, tzinfo=moment.tzinfo)
    return datetime(moment.year, moment.month + 1, 1, tzinfo=moment.tzinfo)


def validate_motion_coverage(
    db_path: str,
    start_date: str,
    end_date: str,
) -> Dict[str, Any]:
    """Report calendar months inside a date range that contain no motions.

    The half-open range ``[start_date, end_date)`` is walked one calendar
    month at a time; the first and last bucket are clipped to the range.
    Motions are fetched per calendar year through ``query_motions`` so old
    ranges are not hidden behind a global "newest N rows" limit.  Motions
    whose date cannot be parsed are ignored.

    Args:
        db_path: Path to the DuckDB database.
        start_date: ISO-8601 start of the range (inclusive).
        end_date: ISO-8601 end of the range (exclusive).

    Returns:
        A dict with ``gaps`` (list of ``{"start", "end"}`` ISO strings),
        ``coverage_rate`` (share of days not inside a gap), ``total_motions``
        (motions dated inside the range) and ``date_range``.  When the range
        holds no motions at all, a single gap spanning the whole range is
        returned.  Invalid input or a failure yields a dict with ``error``.
    """
    # Validate the input before touching the database.
    try:
        start = datetime.fromisoformat(start_date)
        end = datetime.fromisoformat(end_date)
        if end <= start:
            return {
                "gaps": [],
                "coverage_rate": 0.0,
                "error": "end_date must be after start_date",
            }
    except (TypeError, ValueError) as e:
        return {"gaps": [], "coverage_rate": 0.0, "error": f"Invalid date range: {e}"}

    try:
        import bisect

        motion_dates = []
        for year in range(start.year, end.year + 1):
            for motion in query_motions(db_path, year=year, limit=_COVERAGE_FETCH_LIMIT):
                try:
                    when = datetime.fromisoformat(str(motion.get("date")))
                except ValueError:
                    # NULL / malformed date: one bad row must not abort the check.
                    continue
                if start <= when < end:
                    motion_dates.append(when)

        if not motion_dates:
            return {
                "gaps": [{"start": start_date, "end": end_date}],
                "coverage_rate": 0.0,
                "total_motions": 0,
                "date_range": {"start": start_date, "end": end_date},
            }

        motion_dates.sort()

        gaps = []
        gap_days = 0
        cursor = start
        while cursor < end:
            bucket_end = min(_next_month_start(cursor), end)
            # First motion at or after the bucket start; the bucket is empty
            # when that motion lies at or beyond the bucket end.
            first = bisect.bisect_left(motion_dates, cursor)
            if first == len(motion_dates) or motion_dates[first] >= bucket_end:
                gaps.append({
                    "start": cursor.isoformat(),
                    "end": bucket_end.isoformat(),
                })
                gap_days += (bucket_end - cursor).days
            cursor = bucket_end

        total_days = (end - start).days
        coverage_rate = round((total_days - gap_days) / total_days, 4) if total_days > 0 else 0.0

        return {
            "gaps": gaps,
            "coverage_rate": coverage_rate,
            "total_motions": len(motion_dates),
            "date_range": {"start": start_date, "end": end_date},
        }
    except Exception as e:
        logger.exception("validate_motion_coverage failed")
        return {"gaps": [], "coverage_rate": 0.0, "error": str(e)}
def suggest_svd_label(
    db_path: str,
    component: int,
    top_n: int = 10,
    window_id: str = "current_parliament",
) -> Dict[str, Any]:
    """Return the most extreme motions on one SVD component.

    The caller can inspect the motions at both poles to come up with a
    human-readable label for the component.

    Args:
        db_path: Path to the DuckDB database.
        component: 1-indexed SVD component.
        top_n: Number of motions to return per pole; values below zero are
            treated as zero.
        window_id: Window whose motion vectors are analysed.  Defaults to
            ``"current_parliament"``, the value that used to be hard-coded.

    Returns:
        A dict with ``component``, ``suggestion`` (``negative_pole`` sorted
        from most negative upward, ``positive_pole`` sorted from most
        positive downward) plus ``top_positive_ids`` / ``top_negative_ids``.
        When the window has fewer than ``2 * top_n`` scored motions the two
        poles overlap.  On failure a dict carrying an ``error`` message.
    """
    # component - 1 must stay non-negative: a negative index would silently
    # pick a component counted from the end of the vector.
    if component < 1:
        return {
            "component": component,
            "error": f"component must be >= 1 (1-indexed), got {component}",
        }
    top_n = max(top_n, 0)

    try:
        rows = query_svd_vectors(db_path, window_id, entity_type="motion")

        if not rows:
            return {
                "component": component,
                "error": f"No SVD vectors found for {window_id}",
            }

        import json

        idx = component - 1
        scored = []
        for row in rows:
            vec = row.get("vector")
            if isinstance(vec, str):
                vec = json.loads(vec)
            try:
                score = vec[idx]
            except (IndexError, KeyError, TypeError):
                # Missing, too-short or malformed vector: skip this motion.
                continue
            scored.append({"motion_id": row.get("entity_id"), "score": score})

        scored.sort(key=lambda item: item["score"])
        negative = scored[:top_n]
        # Reverse first, then slice: ``scored[-top_n:]`` would return the
        # whole list when top_n is 0.
        positive = scored[::-1][:top_n]

        return {
            "component": component,
            "suggestion": {
                "negative_pole": negative,
                "positive_pole": positive,
            },
            "top_positive_ids": [m["motion_id"] for m in positive],
            "top_negative_ids": [m["motion_id"] for m in negative],
        }
    except Exception as e:
        logger.exception("suggest_svd_label failed")
        return {"component": component, "error": str(e)}
+ """ + try: + vectors = query_svd_vectors(db_path, window_id, entity_type="motion") + motions = query_motions(db_path, limit=100000) + + total_motions = len(motions) + with_embeddings = len(vectors) + + coverage = round(with_embeddings / total_motions, 4) if total_motions > 0 else 0.0 + + return { + "window_id": window_id, + "total_motions": total_motions, + "with_embeddings": with_embeddings, + "coverage": coverage, + "healthy": coverage > 0.8, + } + except Exception as e: + logger.exception("check_embedding_quality failed") + return {"window_id": window_id, "coverage": 0.0, "error": str(e)} diff --git a/agent_tools/context.md b/agent_tools/context.md new file mode 100644 index 0000000..efab3d1 --- /dev/null +++ b/agent_tools/context.md @@ -0,0 +1,20 @@ +# Agent Accumulated Context + +This file is maintained by the agent. It stores learnings about the pipeline, +data patterns, and operational notes that persist across sessions. + +## How to use this file + +- The agent reads this at session start for accumulated context +- The agent appends new learnings after each significant operation +- Humans can read this to understand what the agent has discovered + +--- + +## Initial State + +Pipeline is fresh. No accumulated learnings yet. + +--- + +*This file grows over time as the agent operates the pipeline.* diff --git a/agent_tools/context.py b/agent_tools/context.py new file mode 100644 index 0000000..33efa5d --- /dev/null +++ b/agent_tools/context.py @@ -0,0 +1,110 @@ +"""Runtime context injection for agent operation. + +Generates dynamic context about the current pipeline state, +recent issues, and accumulated knowledge. +""" + +from __future__ import annotations + +import logging +import os +from datetime import datetime +from typing import Any, Dict + +from agent_tools.database import query_pipeline_status + +logger = logging.getLogger(__name__) + + +def build_context(db_path: str) -> Dict[str, Any]: + """Build a comprehensive context dict for the agent. 
def _list_recent_reports(reports_dir: str = "reports", limit: int = 10) -> list:
    """List the most recently modified markdown reports, newest first.

    Args:
        reports_dir: Directory to scan.  Defaults to ``"reports"`` relative
            to the current working directory.
        limit: Maximum number of file names to return; values below zero are
            treated as zero.

    Returns:
        File names (not paths) of regular ``*.md`` files in ``reports_dir``
        ordered by modification time, newest first.  An empty list when the
        directory does not exist or cannot be read.
    """
    try:
        with os.scandir(reports_dir) as entries:
            dated = [
                (entry.stat().st_mtime, entry.name)
                for entry in entries
                # Only real files: a sub-directory named "x.md" is not a report.
                if entry.name.endswith(".md") and entry.is_file()
            ]
    except FileNotFoundError:
        # No reports have been generated yet; this is the normal fresh state.
        return []
    except OSError:
        logger.warning("Could not list reports in %s", reports_dir, exc_info=True)
        return []

    dated.sort(key=lambda pair: pair[0], reverse=True)
    return [name for _, name in dated[:max(limit, 0)]]
# Columns of ``motions`` that ``query_motions`` selects and may sort by.
_MOTION_SORT_COLUMNS = frozenset({
    "id",
    "title",
    "description",
    "date",
    "policy_area",
    "winning_margin",
    "controversy_score",
    "layman_explanation",
})


def _build_order_clause(order: str) -> str:
    """Validate *order* and return it as a safe ORDER BY expression.

    Accepts a comma-separated list of ``<column> [ASC|DESC] [NULLS FIRST|LAST]``
    terms where ``<column>`` is one of ``_MOTION_SORT_COLUMNS``.

    Raises:
        ValueError: If any term is not of that form.
    """
    terms = []
    for raw_term in order.split(","):
        tokens = raw_term.split()
        if not tokens or tokens[0].lower() not in _MOTION_SORT_COLUMNS:
            raise ValueError(f"Unsupported sort column in order={order!r}")
        modifiers = [token.upper() for token in tokens[1:]]
        remainder = modifiers[1:] if modifiers[:1] in (["ASC"], ["DESC"]) else modifiers
        if remainder not in ([], ["NULLS", "FIRST"], ["NULLS", "LAST"]):
            raise ValueError(f"Unsupported sort modifier in order={order!r}")
        terms.append(" ".join([tokens[0].lower(), *modifiers]))
    return ", ".join(terms)


def query_motions(
    db_path: str,
    *,
    year: Optional[int] = None,
    policy_area: Optional[str] = None,
    limit: int = 100,
    order: str = "date DESC",
) -> List[Dict[str, Any]]:
    """Query motions with optional filters.

    Args:
        db_path: Path to the DuckDB database.
        year: Only return motions whose ``date`` falls in this year.
        policy_area: Only return motions with exactly this ``policy_area``.
        limit: Maximum number of rows to return.
        order: ORDER BY expression built from the selected columns, e.g.
            ``"date DESC"`` or ``"controversy_score DESC, id"``.  The text is
            placed in the SQL statement, so it is validated against a
            whitelist first.

    Returns:
        One dict per motion with the keys ``id``, ``title``, ``description``,
        ``date``, ``policy_area``, ``winning_margin``, ``controversy_score``
        and ``layman_explanation``.  An empty list when the query fails or
        ``order`` is rejected; the failure is logged.
    """
    con = None
    try:
        order_clause = _build_order_clause(order)

        conditions = []
        params: List[Any] = []
        if year is not None:
            conditions.append("EXTRACT(YEAR FROM date) = ?")
            params.append(year)
        if policy_area is not None:
            conditions.append("policy_area = ?")
            params.append(policy_area)

        where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
        sql = f"""
            SELECT id, title, description, date, policy_area,
                   winning_margin, controversy_score, layman_explanation
            FROM motions
            {where_clause}
            ORDER BY {order_clause}
            LIMIT ?
        """
        params.append(limit)

        con = _connect(db_path)
        return con.execute(sql, params).fetchdf().to_dict("records")
    except Exception:
        logger.exception("query_motions failed")
        return []
    finally:
        # Release the connection even when the query raised.
        if con is not None:
            con.close()
def query_party_positions(
    db_path: str,
    window_id: str,
) -> List[Dict[str, Any]]:
    """Query party axis scores for a window, one row per party.

    When the ``party_axis_scores`` table exists its ``(party, axis, score)``
    rows are pivoted into one dict per party.  That gives both code paths the
    shape the vector-based fallback already produces and that
    ``analyze_party_shift`` / ``validate_svd_labels`` read
    (``{"party": ..., "axis_1": ..., "axis_2": ...}``).  Without the table
    the positions are computed from MP vectors.

    NOTE(review): the values stored in ``party_axis_scores.axis`` are not
    visible from this module.  The pivot assumes they are either numbers
    (``1`` -> ``"axis_1"``) or strings already of the form ``"axis_1"``;
    any other label ``x`` becomes ``"axis_x"``.  Confirm against the writer
    of that table.

    Args:
        db_path: Path to the DuckDB database.
        window_id: Window identifier to look up.

    Returns:
        One dict per party with a ``party`` key and one ``axis_<n>`` key per
        stored axis.  An empty list when the query fails; the failure is
        logged.
    """
    con = None
    try:
        con = _connect(db_path)
        # Check if party_axis_scores table exists
        tables = con.execute(
            "SELECT table_name FROM information_schema.tables WHERE table_name = 'party_axis_scores'"
        ).fetchall()

        if not tables:
            # Fallback: compute from vectors
            return _compute_party_positions_from_vectors(con, window_id)

        rows = con.execute(
            """
            SELECT party, axis, score
            FROM party_axis_scores
            WHERE window_id = ?
            """,
            (window_id,),
        ).fetchall()

        by_party: Dict[Any, Dict[str, Any]] = {}
        for party, axis, score in rows:
            if isinstance(axis, float) and axis.is_integer():
                axis = int(axis)  # 1.0 -> "axis_1", not "axis_1.0"
            label = str(axis)
            key = label if label.startswith("axis_") else f"axis_{label}"
            by_party.setdefault(party, {"party": party})[key] = score
        return list(by_party.values())
    except Exception:
        logger.exception("query_party_positions failed")
        return []
    finally:
        # Release the connection even when a query raised.
        if con is not None:
            con.close()
def query_pipeline_status(db_path: str) -> Dict[str, Any]:
    """Return pipeline freshness metrics.

    Args:
        db_path: Path to the DuckDB database.

    Returns:
        A dict with ``motion_count``, ``latest_motion_date`` (string or
        ``None``), ``svd_window_count``, ``embedding_count`` (rows in
        ``svd_vectors`` with ``entity_type = 'motion'``) and ``healthy``
        (true when there is at least one motion and one SVD window).  When
        any query fails, all counts are zero, ``healthy`` is false and an
        ``error`` key is added; the failure is logged.
    """
    con = None
    try:
        con = _connect(db_path)

        def scalar(sql: str) -> Any:
            """Return the first column of the first result row, or None."""
            row = con.execute(sql).fetchone()
            return row[0] if row else None

        motion_count = scalar("SELECT COUNT(*) FROM motions")
        latest_motion_date = scalar("SELECT MAX(date) FROM motions")
        svd_windows = scalar("SELECT COUNT(DISTINCT window_id) FROM svd_vectors")
        embedding_count = scalar(
            "SELECT COUNT(*) FROM svd_vectors WHERE entity_type = 'motion'"
        )

        return {
            "motion_count": motion_count,
            "latest_motion_date": str(latest_motion_date) if latest_motion_date else None,
            "svd_window_count": svd_windows,
            "embedding_count": embedding_count,
            "healthy": motion_count > 0 and svd_windows > 0,
        }
    except Exception:
        logger.exception("query_pipeline_status failed")
        return {
            "motion_count": 0,
            "latest_motion_date": None,
            "svd_window_count": 0,
            "embedding_count": 0,
            "healthy": False,
            "error": "Failed to query pipeline status",
        }
    finally:
        # Release the connection even when one of the queries raised.
        if con is not None:
            con.close()
def pipeline_run_stage(
    db_path: str,
    stage: str,
    window_id: Optional[str] = None,
    dry_run: bool = False,
) -> Dict[str, Any]:
    """Plan (or, once wired up, run) a single pipeline stage.

    Args:
        db_path: Path to DuckDB database (not used yet).
        stage: Stage name; must be a member of ``VALID_STAGES``.
        window_id: Optional window identifier, echoed back in the result.
        dry_run: When true, only report what would be done.

    Returns:
        ``{"error": ...}`` for an unknown stage.  Otherwise a dict with
        ``stage``, ``window_id``, ``dry_run`` and ``status``: ``"planned"``
        for a dry run, ``"not_implemented"`` for a real run, because
        execution is not delegated to the pipeline runner yet.
    """
    if stage not in VALID_STAGES:
        known_stages = sorted(VALID_STAGES)
        return {
            "error": f"Invalid stage '{stage}'. Valid stages: {known_stages}",
        }

    if dry_run:
        return {
            "stage": stage,
            "window_id": window_id,
            "dry_run": dry_run,
            "status": "planned",
        }

    # Real execution is a stub for now: log the request and say so in the
    # status so the agent can still plan and diagnose.
    logger.info("pipeline_run_stage: %s (dry_run=%s)", stage, dry_run)
    return {
        "stage": stage,
        "window_id": window_id,
        "dry_run": dry_run,
        "status": "not_implemented",
    }
def pipeline_check_health(db_path: str) -> Dict[str, Any]:
    """Run the pipeline health checks and bundle them into one report.

    Each check from ``health.checks`` runs in isolation, so one failing check
    is recorded as unhealthy without preventing the others from running.

    Args:
        db_path: Path to the DuckDB database.

    Returns:
        A dict with ``healthy`` (true only when every check reported
        healthy), ``checks`` (one entry per check carrying either
        ``details`` or ``error``) and ``pipeline_status``.  When the checks
        cannot be loaded or the status query raises, ``healthy`` is false,
        ``checks`` is empty and ``error`` holds the message.
    """
    try:
        from health.checks import check_motion_freshness, check_embedding_coverage

        probes = (
            ("motion_freshness", check_motion_freshness),
            ("embedding_coverage", check_embedding_coverage),
        )

        checks = []
        for name, probe in probes:
            try:
                details = probe(db_path)
                entry = {
                    "name": name,
                    "healthy": details.get("healthy", False),
                    "details": details,
                }
            except Exception as exc:
                entry = {"name": name, "healthy": False, "error": str(exc)}
            checks.append(entry)

        overall = all(entry["healthy"] for entry in checks)
        status = query_pipeline_status(db_path)

        return {
            "healthy": overall,
            "checks": checks,
            "pipeline_status": status,
        }
    except Exception as e:
        logger.exception("pipeline_check_health failed")
        return {
            "healthy": False,
            "checks": [],
            "error": str(e),
        }
def pipeline_validate_output(
    db_path: str,
    stage: str,
) -> Dict[str, Any]:
    """Check that the pipeline status shows output for a stage.

    A stage counts as valid when the status counter associated with it is
    greater than zero.

    Args:
        db_path: Path to DuckDB database.
        stage: Pipeline stage to validate; must be in ``VALID_STAGES``.

    Returns:
        A dict with ``valid``, ``stage`` and ``pipeline_status``; for an
        unknown stage or a failure, ``valid`` is false and ``error`` is set.
    """
    if stage not in VALID_STAGES:
        return {
            "valid": False,
            "error": f"Invalid stage '{stage}'",
        }

    # Which status counter must be positive for each stage.
    counter_for_stage = {
        "svd": "svd_window_count",
        "similarity": "embedding_count",
        "ingestion": "motion_count",
        "votes": "motion_count",
        "text_embeddings": "embedding_count",
        "fusion": "embedding_count",
    }

    try:
        status = query_pipeline_status(db_path)

        counter = counter_for_stage.get(stage)
        if counter is None:
            is_valid = False
        else:
            is_valid = status.get(counter, 0) > 0

        return {
            "valid": is_valid,
            "stage": stage,
            "pipeline_status": status,
        }
    except Exception as e:
        logger.exception("pipeline_validate_output failed")
        return {"valid": False, "stage": stage, "error": str(e)}
from __future__ import annotations

import logging
import os
from datetime import datetime
from typing import Any, Dict

from agent_tools.database import query_pipeline_status

logger = logging.getLogger(__name__)

REPORT_TYPES = {
    "summary",
    "health",
    "party_shift",
    "axis_stability",
}


def generate_report(
    db_path: str,
    *,
    report_type: str,
    parameters: Dict[str, Any],
    output_path: str,
) -> Dict[str, Any]:
    """Generate a markdown report and write it to output_path.

    The parent directory of ``output_path`` is created when missing and the
    file is written as UTF-8. Failures are logged and returned as an error
    dict rather than raised.

    Args:
        db_path: Path to DuckDB database
        report_type: One of REPORT_TYPES
        parameters: Type-specific parameters (None is treated as empty)
        output_path: Where to write the markdown file

    Returns:
        dict with "output_path" and "status" keys, or "error" on failure
    """
    if report_type not in REPORT_TYPES:
        return {
            "error": f"Unknown report type '{report_type}'. Known types: {sorted(REPORT_TYPES)}",
        }

    try:
        content = _render_report(db_path, report_type, parameters or {})
        # dirname is "" for a bare filename; fall back to the current directory.
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(content)
        return {"output_path": output_path, "status": "written"}
    except Exception as e:
        logger.exception("generate_report failed")
        return {"error": str(e)}


def _mark(ok: Any) -> str:
    """Return the table status cell for a truthy/falsy check."""
    return "✅" if ok else "⚠️"


def _summary_section(db_path: str, _parameters: Dict[str, Any]) -> list:
    """Render the bullet-list summary of the pipeline status counters."""
    status = query_pipeline_status(db_path)
    health_label = "✅ Healthy" if status.get("healthy") else "⚠️ Needs attention"
    return [
        "## Pipeline Summary",
        "",
        f"- **Motions in database:** {status.get('motion_count', 0):,}",
        f"- **Latest motion date:** {status.get('latest_motion_date', 'N/A')}",
        f"- **SVD windows computed:** {status.get('svd_window_count', 0)}",
        f"- **Motion embeddings:** {status.get('embedding_count', 0):,}",
        f"- **Overall health:** {health_label}",
        "",
    ]


def _health_section(db_path: str, _parameters: Dict[str, Any]) -> list:
    """Render the metric/value/status table of the pipeline status counters."""
    status = query_pipeline_status(db_path)
    motions = status.get("motion_count", 0)
    latest = status.get("latest_motion_date")
    svd_windows = status.get("svd_window_count", 0)
    embeddings = status.get("embedding_count", 0)
    return [
        "## Pipeline Health Check",
        "",
        "| Metric | Value | Status |",
        "|--------|-------|--------|",
        f"| Motion count | {motions:,} | {_mark(motions > 0)} |",
        f"| Latest motion | {status.get('latest_motion_date', 'N/A')} | {_mark(latest)} |",
        f"| SVD windows | {svd_windows} | {_mark(svd_windows > 0)} |",
        f"| Embeddings | {embeddings:,} | {_mark(embeddings > 0)} |",
        "",
    ]


def _party_shift_section(db_path: str, parameters: Dict[str, Any]) -> list:
    """Render the result of analyze_party_shift, or its error message."""
    from agent_tools.analysis import analyze_party_shift

    party = parameters.get("party", "VVD")
    start = parameters.get("window_start", "2020")
    end = parameters.get("window_end", "2024")
    result = analyze_party_shift(db_path, party, start, end)

    if "error" in result:
        return ["## Party Shift Analysis", "", f"Error: {result['error']}", ""]

    # The key facts are list items: as bare consecutive lines Markdown would
    # merge them into a single run-on paragraph.
    return [
        "## Party Shift Analysis",
        "",
        f"- **Party:** {result['party']}",
        f"- **Period:** {result['window_start']} → {result['window_end']}",
        f"- **Shift magnitude:** {result['shift']}",
        f"- **Direction:** dx={result['direction']['dx']}, dy={result['direction']['dy']}",
        "",
        "### Start position",
        f"- Axis 1: {result['start_position']['axis_1']}",
        f"- Axis 2: {result['start_position']['axis_2']}",
        "",
        "### End position",
        f"- Axis 1: {result['end_position']['axis_1']}",
        f"- Axis 2: {result['end_position']['axis_2']}",
        "",
    ]


def _axis_stability_section(db_path: str, parameters: Dict[str, Any]) -> list:
    """Render the result of analyze_axis_stability, or its error message."""
    from agent_tools.analysis import analyze_axis_stability

    component = parameters.get("component", 1)
    windows = parameters.get("windows", ["2020", "2021", "2022", "2023", "2024"])
    result = analyze_axis_stability(db_path, component, windows)

    if "error" in result:
        return ["## Axis Stability Analysis", "", f"Error: {result['error']}", ""]

    section = [
        "## Axis Stability Analysis",
        "",
        # List items for the same reason as in the party-shift section.
        f"- **Component:** {result['component']}",
        f"- **Average stability:** {result['stability']}",
        "",
        "### Pairwise correlations",
        "",
        "| From | To | Correlation |",
        "|------|-----|-------------|",
    ]
    section.extend(
        f"| {pair['from_window']} | {pair['to_window']} | {pair['correlation']} |"
        for pair in result.get("pairwise", [])
    )
    section.append("")
    return section


# report_type -> function(db_path, parameters) returning the body lines.
_SECTION_RENDERERS = {
    "summary": _summary_section,
    "health": _health_section,
    "party_shift": _party_shift_section,
    "axis_stability": _axis_stability_section,
}


def _render_report(db_path: str, report_type: str, parameters: Dict[str, Any]) -> str:
    """Render report content as markdown.

    Emits a title and generation timestamp, followed by the section for
    ``report_type``. A type with no registered section yields the header only.
    """
    lines = [
        f"# Stemwijzer Report: {report_type.replace('_', ' ').title()}",
        "",
        f"Generated: {datetime.now().isoformat()}",
        "",
    ]

    render_section = _SECTION_RENDERERS.get(report_type)
    if render_section is not None:
        lines.extend(render_section(db_path, parameters))

    return "\n".join(lines)
+ +--- + +## Problem Frame + +- **Pipeline operators** need to know when data is stale, why SVD vectors look wrong, or whether the similarity cache is healthy. Currently this requires manually running scripts and interpreting output. +- **Analysts/researchers** want to ask questions like "Which parties shifted most on economic axes between 2020 and 2024?" Currently this requires writing DuckDB queries and Python analysis code. +- **Developers** need to understand pipeline state, verify data integrity, and troubleshoot ingestion issues. Currently this requires reading logs and running diagnostics manually. +- **Content maintainers** need to verify SVD labels match actual voting patterns, check motion coverage, and validate layman explanations. Currently ad-hoc. + +--- + +## Requirements Trace + +- R1. The agent can achieve anything a pipeline operator can achieve (parity) +- R2. The agent can answer open-ended analytical questions about parliamentary data (emergent capability) +- R3. The agent can diagnose pipeline health and suggest remediation (self-service operations) +- R4. The agent can generate and validate content (SVD labels, motion summaries) +- R5. New capabilities can be added by writing prompts, not code (composability) + +--- + +## Scope Boundaries + +- **In scope:** Agent primitives for data operations, pipeline control, analysis, and diagnostics +- **Deferred:** Real-time agent UI inside Streamlit (future phase—add chat interface to explorer) +- **Deferred:** Autonomous pipeline scheduling (scheduler.py exists but agent control is v2) +- **Not working on:** Natural language to SQL for end users (this plan targets agent operators, not voter-facing features) + +--- + +## Key Technical Decisions + +- **Files as universal interface:** DuckDB is already file-based (`data/motions.db`). The agent's workspace is the repo itself. Logs, reports, and analysis outputs are files the agent writes and the human reads. 
+- **Database tools over file tools for structured data:** For querying motions, votes, and embeddings, the agent needs `query_database` primitives that wrap DuckDB/SQL, not raw file operations. +- **Pipeline as state machine:** The pipeline has discrete stages (ingestion → vote extraction → SVD → text embeddings → fusion → similarity). The agent needs stage-aware tools, not just "run everything." +- **Shared workspace:** Agent and human operate on the same `data/motions.db`, the same `thoughts/explorer/` outputs, the same `docs/solutions/` knowledge base. + +--- + +## Implementation Units + +- [ ] U1. **Database query primitives** + - **Goal:** Give the agent structured access to the DuckDB database + - **Requirements:** R1, R2, R4 + - **Dependencies:** None + - **Files:** + - Create: `agent_tools/database.py` + - Test: `tests/agent_tools/test_database_tools.py` + - **Approach:** Wrap DuckDB queries as atomic tools: + - `query_motions(filter, limit, order)` → returns motion rows as JSON + - `query_votes(motion_id, party)` → returns vote counts + - `query_svd_vectors(window_id, entity_type)` → returns vectors + - `query_party_positions(window_id)` → returns party axis scores + - `query_pipeline_status()` → returns freshness metrics from health checks + - **Patterns to follow:** `health/checks.py` already has DB query patterns; `analysis/explorer_data.py` has read-only query patterns + - **Test scenarios:** + - Happy path: query returns valid JSON for known filters + - Edge case: empty result set returns `[]` not error + - Error path: invalid SQL/filter returns structured error with suggestion + - **Verification:** Agent can answer "How many motions in 2024?" using only the tool + +- [ ] U2. 
**Pipeline control primitives** + - **Goal:** Let the agent run, monitor, and diagnose pipeline stages + - **Requirements:** R1, R3 + - **Dependencies:** U1 + - **Files:** + - Create: `agent_tools/pipeline.py` + - Test: `tests/agent_tools/test_pipeline_tools.py` + - **Approach:** Stage-aware pipeline tools: + - `pipeline_run_stage(stage, window_id, dry_run)` → runs one stage, returns status + - `pipeline_run_full(dry_run)` → orchestrates all stages with dependency ordering + - `pipeline_check_health()` → returns health report (reuses `health/` module) + - `pipeline_get_logs(stage, lines)` → returns recent logs for a stage + - `pipeline_validate_output(stage)` → checks output exists and looks reasonable + - **Patterns to follow:** `pipeline/run_pipeline.py` has the stage orchestration; `scripts/health_check.py` has the CLI pattern + - **Test scenarios:** + - Happy path: dry-run returns planned actions without executing + - Integration: running `pipeline_run_stage("svd", "2024")` produces expected `svd_vectors` rows + - Error path: running a stage with missing dependencies returns clear error + - **Verification:** Agent can diagnose "Why are SVD vectors stale?" by checking health, reading logs, and suggesting which stage to re-run + +- [ ] U3. 
**Analysis and report generation primitives** + - **Goal:** Let the agent perform analytical tasks and write reports + - **Requirements:** R2, R4 + - **Dependencies:** U1 + - **Files:** + - Create: `agent_tools/analysis.py` + - Create: `agent_tools/reports.py` + - Test: `tests/agent_tools/test_analysis_tools.py` + - **Approach:** + - `analyze_party_shift(party, window_start, window_end, metric)` → computes and returns shift data + - `analyze_axis_stability(component, windows)` → returns stability scores + - `generate_report(type, parameters, output_path)` → writes markdown report to `reports/` + - `validate_svd_labels(component)` → compares theme labels to actual party positions + - **Patterns to follow:** `analysis/political_axis.py`, `scripts/motion_drift.py`, `scripts/validate_svd_themes.py` + - **Test scenarios:** + - Happy path: `analyze_party_shift` returns structured data for known party + - Integration: `generate_report("drift", {windows: ["2020", "2024"]})` produces valid markdown + - Edge case: requesting analysis for nonexistent window returns empty result + - **Verification:** Agent can answer "Which parties shifted most on economic axes?" by running analysis and summarizing results + +- [ ] U4. 
**Content validation primitives** + - **Goal:** Let the agent validate and suggest content improvements + - **Requirements:** R4 + - **Dependencies:** U1, U3 + - **Files:** + - Create: `agent_tools/content.py` + - Test: `tests/agent_tools/test_content_tools.py` + - **Approach:** + - `validate_motion_coverage(start_date, end_date)` → returns coverage gaps + - `validate_layman_explanations(sample_size)` → samples motions, checks explanation quality + - `suggest_svd_label(component, top_n_motions)` → analyzes top motions, suggests label + - `check_embedding_quality(window_id)` → returns coverage stats for fused embeddings + - **Patterns to follow:** `summarizer.py` for explanation logic; `scripts/validate_svd_themes.py` for theme validation + - **Test scenarios:** + - Happy path: `validate_motion_coverage` returns accurate gap list + - Edge case: all motions covered returns empty gaps + - **Verification:** Agent can run weekly content quality checks and produce a report + +- [ ] U5. **System prompt and context injection** + - **Goal:** Define agent behavior and inject runtime context + - **Requirements:** R1, R2, R3, R4, R5 + - **Dependencies:** U1-U4 + - **Files:** + - Create: `agent_tools/SYSTEM_PROMPT.md` + - Create: `agent_tools/context.py` + - **Approach:** + - `SYSTEM_PROMPT.md`: Defines agent identity ("You are the Stemwijzer pipeline operator"), available tools, decision criteria, and output conventions + - `context.py`: Injects runtime context—current pipeline status, latest SVD window, known issues from `docs/solutions/`, active party list + - `context.md` pattern: Agent maintains `agent_tools/context.md` with accumulated learnings about the pipeline + - **Patterns to follow:** `ce-agent-native-architecture` context.md pattern; `AGENTS.md` for project conventions + - **Test scenarios:** + - Context injection produces valid markdown with current DB stats + - System prompt loads and parses without errors + - **Verification:** Agent session starts with full 
context of pipeline state + +- [ ] U6. **Agent-native testing and parity verification** + - **Goal:** Ensure agent can do everything humans can do + - **Requirements:** R1 + - **Dependencies:** U1-U5 + - **Files:** + - Create: `tests/agent_tools/test_parity.py` + - Modify: `tests/conftest.py` (add agent tool fixtures) + - **Approach:** + - Parity tests: For each human action (run pipeline, check health, generate report), verify the agent tool achieves the same outcome + - Integration tests: Agent runs a full diagnostic loop (check health → identify issue → run fix → verify) + - `test_parity.py`: Matrix of human action → agent tool → expected outcome + - **Test scenarios:** + - Parity: "Human runs health check CLI" vs "Agent calls pipeline_check_health()" → same result + - Integration: Agent detects stale data, runs pipeline, verifies freshness + - **Verification:** All parity tests pass + +--- + +## Output Structure + +``` +agent_tools/ # New directory +├── __init__.py +├── SYSTEM_PROMPT.md # Agent behavior definition +├── context.py # Runtime context injection +├── context.md # Accumulated agent knowledge +├── database.py # DB query primitives +├── pipeline.py # Pipeline control primitives +├── analysis.py # Analysis primitives +├── reports.py # Report generation +└── content.py # Content validation primitives + +tests/agent_tools/ # New test directory +├── __init__.py +├── test_database_tools.py +├── test_pipeline_tools.py +├── test_analysis_tools.py +├── test_content_tools.py +└── test_parity.py + +reports/ # Agent-generated reports (gitignored) +``` + +--- + +## System-Wide Impact + +- **Interaction graph:** Agent tools call into `database.py`, `pipeline/`, `analysis/`, `health/` modules. These modules are already well-factored and read-only where appropriate. +- **Error propagation:** Agent tools return structured errors (JSON with `error`, `suggestion`, `retryable` fields) rather than raising exceptions. This lets the agent reason about failures. 
+- **State lifecycle:** Agent-generated reports in `reports/` are ephemeral (gitignored). Agent updates to `context.md` are durable and committed. +- **Unchanged invariants:** The Streamlit UI, the data pipeline logic, and the SVD computation remain unchanged. Agent tools are a new surface, not a refactor. + +--- + +## Risks & Dependencies + +| Risk | Mitigation | +|------|-----------| +| DuckDB concurrency (read-only agent + write pipeline) | Agent uses read-only connections; pipeline uses write connections. DuckDB handles this at the file level. | +| Agent tools become stale as pipeline evolves | Tools are thin wrappers around stable module interfaces. U6 parity tests catch drift. | +| Context injection grows too large | Context is scoped to the task. `context.py` generates minimal relevant context, not full DB dumps. | +| Security: agent has DB access | Agent runs in the same trust boundary as the developer. No new security surface. | + +--- + +## Documentation / Operational Notes + +- Add `agent_tools/` to `AGENTS.md` so future agents know the capability surface exists +- Document the parity test matrix in `tests/agent_tools/README.md` +- `reports/` should be gitignored; agent reports are ephemeral outputs + +--- + +## Sources & References + +- **Origin:** STRATEGY.md (agent-native architecture track) +- **Skill:** `ce-agent-native-architecture` (parity, granularity, composability, emergent capability) +- **Related code:** `health/`, `pipeline/`, `analysis/`, `database.py` +- **Related docs:** `docs/plans/2026-04-24-ROADMAP-stemwijzer-improvements.md` (P4 tracks) diff --git a/tests/agent_tools/test_analysis_tools.py b/tests/agent_tools/test_analysis_tools.py new file mode 100644 index 0000000..697d35b --- /dev/null +++ b/tests/agent_tools/test_analysis_tools.py @@ -0,0 +1,74 @@ +"""Tests for agent analysis and report generation primitives.""" + +import pytest +import os + +pytest.importorskip("duckdb") + + +class TestAnalyzePartyShift: + def 
def _has_any(payload, *keys):
    """Return True when at least one of *keys* is present in *payload*."""
    return any(key in payload for key in keys)


class TestAnalyzePartyShift:
    """analyze_party_shift hands back a dict for known and unknown parties."""

    def test_returns_shift_data(self, tmp_duckdb_path):
        from agent_tools.analysis import analyze_party_shift

        outcome = analyze_party_shift(
            tmp_duckdb_path, party="VVD", window_start="2020", window_end="2024"
        )
        assert isinstance(outcome, dict)
        assert "party" in outcome
        assert _has_any(outcome, "shift", "error")

    def test_nonexistent_party_returns_error(self, tmp_duckdb_path):
        from agent_tools.analysis import analyze_party_shift

        outcome = analyze_party_shift(
            tmp_duckdb_path, party="FAKE", window_start="2020", window_end="2024"
        )
        assert isinstance(outcome, dict)


class TestAnalyzeAxisStability:
    """analyze_axis_stability reports a component plus stability or an error."""

    def test_returns_stability_scores(self, tmp_duckdb_path):
        from agent_tools.analysis import analyze_axis_stability

        outcome = analyze_axis_stability(
            tmp_duckdb_path, component=1, windows=["2020", "2024"]
        )
        assert isinstance(outcome, dict)
        assert "component" in outcome
        assert _has_any(outcome, "stability", "error")


class TestGenerateReport:
    """generate_report writes a file for known types and errors otherwise."""

    def test_writes_markdown_file(self, tmp_duckdb_path, tmp_path):
        from agent_tools.reports import generate_report

        target = tmp_path / "report.md"
        outcome = generate_report(
            tmp_duckdb_path,
            report_type="summary",
            parameters={},
            output_path=str(target),
        )
        assert isinstance(outcome, dict)
        assert target.exists()

    def test_returns_error_for_unknown_type(self, tmp_duckdb_path, tmp_path):
        from agent_tools.reports import generate_report

        target = tmp_path / "report.md"
        outcome = generate_report(
            tmp_duckdb_path,
            report_type="unknown",
            parameters={},
            output_path=str(target),
        )
        assert isinstance(outcome, dict)
        assert "error" in outcome


class TestValidateSvdLabels:
    """validate_svd_labels reports a component plus validity or an error."""

    def test_returns_validation_result(self, tmp_duckdb_path):
        from agent_tools.analysis import validate_svd_labels

        outcome = validate_svd_labels(tmp_duckdb_path, component=1)
        assert isinstance(outcome, dict)
        assert "component" in outcome
        assert _has_any(outcome, "valid", "error")


# --- tests/agent_tools/test_content_tools.py ---
"""Tests for agent content validation primitives."""

import pytest

pytest.importorskip("duckdb")


class TestValidateMotionCoverage:
    """validate_motion_coverage reports gaps plus a rate or an error."""

    def test_returns_coverage_gaps(self, tmp_duckdb_path):
        from agent_tools.content import validate_motion_coverage

        report = validate_motion_coverage(
            tmp_duckdb_path, start_date="2024-01-01", end_date="2024-12-31"
        )
        assert isinstance(report, dict)
        assert "gaps" in report
        assert _has_any(report, "coverage_rate", "error")


class TestValidateLaymanExplanations:
    """validate_layman_explanations reports the sample size and coverage."""

    def test_returns_quality_report(self, tmp_duckdb_path):
        from agent_tools.content import validate_layman_explanations

        report = validate_layman_explanations(tmp_duckdb_path, sample_size=5)
        assert isinstance(report, dict)
        assert "sample_size" in report
        assert _has_any(report, "coverage", "error")


class TestSuggestSvdLabel:
    """suggest_svd_label reports the component and a suggestion or an error."""

    def test_returns_suggestion(self, tmp_duckdb_path):
        from agent_tools.content import suggest_svd_label

        report = suggest_svd_label(tmp_duckdb_path, component=1, top_n=5)
        assert isinstance(report, dict)
        assert "component" in report
        assert _has_any(report, "suggestion", "error")


class TestCheckEmbeddingQuality:
    """check_embedding_quality reports coverage or an error."""

    def test_returns_coverage_stats(self, tmp_duckdb_path):
        from agent_tools.content import check_embedding_quality

        report = check_embedding_quality(
            tmp_duckdb_path, window_id="current_parliament"
        )
        assert isinstance(report, dict)
        assert _has_any(report, "coverage", "error")
"""Tests for agent database query primitives."""

import pytest
import json

pytest.importorskip("duckdb")


class TestQueryMotions:
    """query_motions returns a list, honours limit, and is empty on an empty DB."""

    def test_returns_motion_rows(self, tmp_duckdb_path):
        from agent_tools.database import query_motions

        rows = query_motions(tmp_duckdb_path)
        assert isinstance(rows, list)

    def test_respects_limit(self, tmp_duckdb_path):
        from agent_tools.database import query_motions

        max_rows = 5
        rows = query_motions(tmp_duckdb_path, limit=max_rows)
        assert len(rows) <= max_rows

    def test_empty_db_returns_empty_list(self, tmp_duckdb_path):
        from agent_tools.database import query_motions

        rows = query_motions(tmp_duckdb_path)
        assert rows == []


class TestQueryVotes:
    """query_votes returns a list with and without a party filter."""

    def test_returns_vote_counts(self, tmp_duckdb_path):
        from agent_tools.database import query_votes

        rows = query_votes(tmp_duckdb_path, motion_id=1)
        assert isinstance(rows, list)

    def test_filters_by_party(self, tmp_duckdb_path):
        from agent_tools.database import query_votes

        rows = query_votes(tmp_duckdb_path, motion_id=1, party="VVD")
        assert isinstance(rows, list)


class TestQuerySvdVectors:
    """query_svd_vectors returns a list with and without an entity filter."""

    def test_returns_vectors(self, tmp_duckdb_path):
        from agent_tools.database import query_svd_vectors

        rows = query_svd_vectors(tmp_duckdb_path, window_id="current_parliament")
        assert isinstance(rows, list)

    def test_filters_by_entity_type(self, tmp_duckdb_path):
        from agent_tools.database import query_svd_vectors

        rows = query_svd_vectors(
            tmp_duckdb_path, window_id="current_parliament", entity_type="mp"
        )
        assert isinstance(rows, list)


class TestQueryPartyPositions:
    """query_party_positions returns a list."""

    def test_returns_party_scores(self, tmp_duckdb_path):
        from agent_tools.database import query_party_positions

        rows = query_party_positions(tmp_duckdb_path, window_id="current_parliament")
        assert isinstance(rows, list)


class TestQueryPipelineStatus:
    """query_pipeline_status returns a dict exposing the core counters."""

    def test_returns_status_dict(self, tmp_duckdb_path):
        from agent_tools.database import query_pipeline_status

        status = query_pipeline_status(tmp_duckdb_path)
        assert isinstance(status, dict)
        for expected_key in ("motion_count", "latest_motion_date", "svd_window_count"):
            assert expected_key in status
"""Parity tests: verify agent tools can achieve what humans can.

These tests ensure the agent-native architecture satisfies the parity principle:
"Whatever the user can do through the UI/scripts, the agent can achieve through tools."
"""

import os
import pytest

pytest.importorskip("duckdb")


class TestDatabaseParity:
    """Agent database queries vs human SQL queries."""

    def test_agent_query_motions_matches_raw_sql(self, tmp_duckdb_path):
        """Human: SELECT * FROM motions LIMIT 10
        Agent: query_motions(db_path, limit=10)
        """
        import duckdb
        from agent_tools.database import query_motions

        # Human approach — handle empty DB gracefully
        con = duckdb.connect(tmp_duckdb_path)
        try:
            human_result = con.execute("SELECT * FROM motions LIMIT 10").fetchdf().to_dict("records")
        except Exception:
            human_result = []
        finally:
            # Always release the handle before the agent tool opens the file.
            con.close()

        # Agent approach
        agent_result = query_motions(tmp_duckdb_path, limit=10)

        # Both should return lists
        assert isinstance(human_result, list)
        assert isinstance(agent_result, list)
        assert len(agent_result) == len(human_result)

    def test_agent_pipeline_status_matches_raw_query(self, tmp_duckdb_path):
        """Human: SELECT COUNT(*) FROM motions
        Agent: query_pipeline_status(db_path)
        """
        import duckdb
        from agent_tools.database import query_pipeline_status

        con = duckdb.connect(tmp_duckdb_path)
        try:
            human_count = con.execute("SELECT COUNT(*) FROM motions").fetchone()[0]
        except Exception:
            human_count = 0
        finally:
            con.close()

        agent_status = query_pipeline_status(tmp_duckdb_path)

        assert agent_status["motion_count"] == human_count


class TestHealthCheckParity:
    """Agent health check vs human script execution."""

    def test_agent_health_check_matches_script(self, tmp_duckdb_path):
        """Human: python scripts/health_check.py
        Agent: pipeline_check_health(db_path)
        """
        from agent_tools.pipeline import pipeline_check_health

        # Agent approach
        agent_result = pipeline_check_health(tmp_duckdb_path)

        assert isinstance(agent_result, dict)
        assert "healthy" in agent_result
        assert "checks" in agent_result


class TestReportGenerationParity:
    """Agent report generation vs human manual analysis."""

    def test_agent_generates_summary_report(self, tmp_duckdb_path, tmp_path):
        """Human: Write a summary of pipeline state
        Agent: generate_report(db_path, "summary", ...)
        """
        from agent_tools.reports import generate_report

        output_path = str(tmp_path / "summary.md")
        result = generate_report(
            tmp_duckdb_path,
            report_type="summary",
            parameters={},
            output_path=output_path,
        )

        assert result["status"] == "written"
        assert os.path.exists(output_path)

        # The report is written as UTF-8 and contains emoji, so it must be
        # read back as UTF-8 (not the locale default) and the handle closed.
        with open(output_path, encoding="utf-8") as report_file:
            content = report_file.read()

        # Should contain key sections
        assert "Pipeline Summary" in content
        assert "Motions in database" in content


class TestAnalysisParity:
    """Agent analysis vs human analytical queries."""

    def test_agent_party_shift_analysis(self, tmp_duckdb_path):
        """Human: Write SQL to compare party positions across windows
        Agent: analyze_party_shift(db_path, ...)
        """
        from agent_tools.analysis import analyze_party_shift

        result = analyze_party_shift(
            tmp_duckdb_path,
            party="VVD",
            window_start="2020",
            window_end="2024",
        )

        # Should return structured result (or error if no data)
        assert isinstance(result, dict)
        assert "party" in result
        # Either shift data or error (empty DB is fine)
        assert "shift" in result or "error" in result


class TestIntegrationAgentDiagnosticLoop:
    """Integration: Agent performs full diagnostic loop."""

    def test_agent_diagnoses_stale_data(self, tmp_duckdb_path):
        """Agent loop:
        1. Check health
        2. Query pipeline status
        3. Identify issue (empty DB = no data)
        4. Suggest remediation
        """
        from agent_tools.pipeline import pipeline_check_health
        from agent_tools.database import query_pipeline_status

        # Step 1: Check health (the report must at least carry a verdict)
        health = pipeline_check_health(tmp_duckdb_path)
        assert "healthy" in health

        # Step 2: Query status
        status = query_pipeline_status(tmp_duckdb_path)

        # Step 3: Agent reasoning (simulated)
        issues = []
        if status["motion_count"] == 0:
            issues.append("No motions in database")
        if status["svd_window_count"] == 0:
            issues.append("No SVD windows computed")

        # Step 4: Suggest remediation
        suggestions = []
        if "No motions in database" in issues:
            suggestions.append("Run pipeline ingestion stage")
        if "No SVD windows computed" in issues:
            suggestions.append("Run SVD computation after ingestion")

        assert isinstance(issues, list)
        assert isinstance(suggestions, list)
        # Empty DB should produce actionable suggestions
        assert len(suggestions) > 0
"""Tests for agent pipeline control primitives."""

import pytest

pytest.importorskip("duckdb")


class TestPipelineRunStage:
    """pipeline_run_stage honours dry_run and rejects unknown stages."""

    def test_dry_run_returns_planned_actions(self, tmp_duckdb_path):
        from agent_tools.pipeline import pipeline_run_stage

        plan = pipeline_run_stage(
            tmp_duckdb_path, stage="svd", window_id="2024", dry_run=True
        )
        assert isinstance(plan, dict)
        assert "stage" in plan
        assert plan.get("dry_run") is True

    def test_invalid_stage_returns_error(self, tmp_duckdb_path):
        from agent_tools.pipeline import pipeline_run_stage

        outcome = pipeline_run_stage(tmp_duckdb_path, stage="invalid")
        assert isinstance(outcome, dict)
        assert "error" in outcome


class TestPipelineRunFull:
    """pipeline_run_full in dry-run mode returns a plan dict."""

    def test_dry_run_returns_plan(self, tmp_duckdb_path):
        from agent_tools.pipeline import pipeline_run_full

        plan = pipeline_run_full(tmp_duckdb_path, dry_run=True)
        assert isinstance(plan, dict)
        assert any(key in plan for key in ("stages", "dry_run"))


class TestPipelineCheckHealth:
    """pipeline_check_health returns a dict with checks and a verdict."""

    def test_returns_health_report(self, tmp_duckdb_path):
        from agent_tools.pipeline import pipeline_check_health

        report = pipeline_check_health(tmp_duckdb_path)
        assert isinstance(report, dict)
        assert "checks" in report
        assert "healthy" in report


class TestPipelineGetLogs:
    """pipeline_get_logs returns at most the requested number of lines."""

    def test_returns_log_lines(self, tmp_duckdb_path):
        from agent_tools.pipeline import pipeline_get_logs

        requested = 10
        log_lines = pipeline_get_logs(tmp_duckdb_path, stage="svd", lines=requested)
        assert isinstance(log_lines, list)
        assert len(log_lines) <= requested


class TestPipelineValidateOutput:
    """pipeline_validate_output returns a dict carrying a "valid" verdict."""

    def test_validates_stage_output(self, tmp_duckdb_path):
        from agent_tools.pipeline import pipeline_validate_output

        verdict = pipeline_validate_output(tmp_duckdb_path, stage="svd")
        assert isinstance(verdict, dict)
        assert "valid" in verdict