chore: confirm deletion of stale files

Branch: main
Author: Sven Geboers, 4 weeks ago
parent eb71328967
commit 5ddf2cd85a
10 changed files, 697 deletions:

  43  .mindmodel/anti-patterns.yaml
  35  .mindmodel/architecture.yaml
  32  .mindmodel/conventions.yaml
  55  .mindmodel/dependencies.yaml
  37  .mindmodel/domain-glossary.yaml
  33  .mindmodel/stack.yaml
   6  main.py
 264  scheduler.py
 183  scraper.py
   9  verify.py

.mindmodel/anti-patterns.yaml
@@ -1,43 +0,0 @@
# Known anti-patterns and recommended remediation (Phase 1 findings)
anti_patterns:
  - id: broad_except_swallows_errors
    description: "Wide except: clauses that swallow exceptions without logging or re-raising."
    examples:
      - path: multiple
        note: "Observed in various pipeline and ingestion spots where except Exception: returns a default without context."
    remediation:
      - "Replace broad except with specific exceptions."
      - "When a broad except is absolutely needed, call logger.exception(...) and re-raise or convert to a typed domain error."
      - "Add unit tests to ensure critical errors are visible in CI logs."
  - id: mixed_print_and_logging
    description: "Mixing print() and the logging module for errors and info messages."
    examples:
      - path: api_client.py
        excerpt: |
          ```python
          print(f"Fetched {len(voting_records)} voting records from API")
          ...
          except Exception as e:
              print(f"Error fetching motions from API: {e}")
          ```
    remediation:
      - "Use logging.getLogger(__name__) and logger.info/warning/exception consistently."
      - "Add a top-level logging configuration for Streamlit and scripts."
  - id: no_lockfile
    description: "No lockfile present -> unreproducible installs and CI unpredictability."
    remediation:
      - "Add a lockfile (poetry.lock, or a requirements.txt produced by pip-tools) and pin versions in CI."
      - "Make CI use the lockfile for reproducible builds."
  - id: declared_but_unused_dependency
    description: "Dependency declared but unused (openai in pyproject)."
    remediation:
      - "Either remove the dependency or add clear adapter code/tests that exercise it. Keep pyproject tidy."
  - id: brittle_identity_heuristics
    description: "Heuristics for MP identity (comma-based parsing) are brittle."
    remediation:
      - "Add robust parsing rules and unit tests; prefer canonical identifiers (persoon_id) where available."

.mindmodel/architecture.yaml
@@ -1,35 +0,0 @@
# Architecture overview and confidence levels
layers:
  - name: ui
    description: "Streamlit pages and app entrypoints (Home.py, pages/*)."
    confidence: high
  - name: ingestion
    description: "API client and scrapers (api_client.py, scraper.py)."
    confidence: high
  - name: processing
    description: "Pipelines for embeddings, SVD, fusion (pipeline/*, similarity/*)."
    confidence: high
  - name: storage
    description: "DuckDB primary store; JSON fallback used in tests when duckdb is missing."
    confidence: high
  - name: ai_provider
    description: "Lightweight HTTP wrapper around OpenRouter/OpenAI-style backends in ai_provider.py."
    confidence: medium
  - name: orchestration
    description: "Script-based orchestration (scripts/*.py), rerun_embeddings, scheduler."
    confidence: medium
organization:
  - "Keep UI code separated from heavy compute; Streamlit runs should avoid heavy compute inline (use a subprocess or a scheduled job)."
  - "Pipelines are implemented as re-entrant functions returning summary dicts to facilitate testing and subprocess usage (seen in svd_pipeline.compute_svd_for_window)."
  - "DB access is centralised via the MotionDatabase helper (database.py) with convenience methods (store_fused_embedding, append_audit_event)."
design_decisions:
  - "Use DuckDB for local, fast analytics storage; read_only connections are used in compute stages to allow parallel workers."
  - "Embeddings and the similarity cache are stored as JSON in DuckDB tables (vector columns)."
  - "The ai_provider uses requests with retry/backoff rather than a heavy SDK to keep testing simple."
confidence_summary:
  overall_confidence: high
  notes: "Phase 1 input inspected files across the repo; the design mapping is consistent with the code samples."

.mindmodel/conventions.yaml
@@ -1,32 +0,0 @@
# Coding conventions cheat-sheet (extracted from Phase 1)
naming:
  module_files: snake_case (e.g., text_pipeline.py, ai_provider.py)
  functions: snake_case
  classes: PascalCase
  constants: UPPER_SNAKE_CASE
  module_singletons: module-level instances, named lower_snake (e.g., db = MotionDatabase())
imports:
  order:
    - stdlib
    - third-party
    - local application imports
  style:
    - group imports with a blank line between groups
    - prefer "from x import y" only when needed to avoid circular imports
types_and_dataclasses:
  - Use type hints broadly (functions, public APIs)
  - config should be a dataclass in config.py
  - Module-level singletons are allowed (but follow the lifecycle rules in the db_connection constraints)
tests:
  - pytest
  - tests/ directory, files named test_*.py
  - Use fixtures in tests/fixtures and conftest.py
  - Tests expect raises(...) for invalid input or ProviderError
error_handling:
  - Prefer explicit exceptions (ValueError, ProviderError)
  - "Avoid overly-broad except: clauses (see anti-patterns)"
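A small, hypothetical module illustrating the conventions above (grouped imports, typed functions, a dataclass config, a module-level singleton); apart from the general pattern, none of the names below come from the repo:

```python
# Illustrative only: names other than the conventions themselves are made up.

# stdlib
import logging
from dataclasses import dataclass

# third-party
import requests

# local application imports would follow here, e.g. `from config import config`

logger = logging.getLogger(__name__)

MAX_RETRIES = 3  # constants: UPPER_SNAKE_CASE


@dataclass
class ExampleConfig:  # classes: PascalCase; config lives in a dataclass
    database_path: str = "motions.duckdb"
    api_timeout: int = 30


def fetch_title(url: str, timeout: int = 30) -> str:  # functions: snake_case, typed
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    logger.info("Fetched %s", url)
    return response.text[:80]


example_config = ExampleConfig()  # module-level singleton, lower_snake
```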

.mindmodel/dependencies.yaml
@@ -1,55 +0,0 @@
# Dependencies map and recommended extras (Phase 1 authoritative)
declared:
  - streamlit
  - duckdb
  - ibis-framework[duckdb]
  - plotly
  - scikit-learn
  - scipy
  - umap-learn
  - openai  # note: declared but not observed imported; review usage
  - requests
observed:
  - requests
  - duckdb (used, but the import is sometimes guarded)
  - numpy
  - pytest
grouped:
  core:
    - python >=3.13
    - streamlit
    - duckdb
    - ibis-framework[duckdb]
    - requests
  ml:
    - scikit-learn
    - scipy
    - umap-learn
    - numpy
  viz:
    - plotly
  testing:
    - pytest
recommended_extras:
  reproducibility:
    - poetry (poetry.lock) or pip-tools (requirements.txt + requirements.in)
    - pipx or virtualenv usage documented
  linting_and_formatting:
    - black
    - ruff
    - isort
    - mypy
  logging_and_monitoring:
    - structlog (optional)
  containerization:
    - docker (already used)
  heavy_analytics (optional):
    - pandas
    - altair
    - dash (if more interactive dashboards are needed)
notes:
  - Because no lockfile was present during Phase 1, adding one is a high priority for reproducible CI builds.
  - openai is declared but not imported anywhere in the Phase 1 files; either remove it or add explicit adapter usage with tests.
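A minimal sketch of the guarded duckdb import noted under "observed" (and the JSON fallback mentioned in architecture.yaml); the fallback path and helper names are assumptions for illustration:

```python
# Hypothetical sketch of an import-guarded duckdb dependency with a JSON fallback.
import json
from pathlib import Path

try:
    import duckdb  # preferred: fast local analytics store
    HAVE_DUCKDB = True
except ImportError:  # e.g. lightweight test environments without duckdb
    duckdb = None
    HAVE_DUCKDB = False

FALLBACK_PATH = Path("motions_fallback.json")  # assumed location


def count_motions(database_path: str) -> int:
    """Count motions via DuckDB when available, else via the JSON fallback."""
    if HAVE_DUCKDB:
        conn = duckdb.connect(database_path, read_only=True)
        try:
            row = conn.execute("SELECT COUNT(*) FROM motions").fetchone()
            return row[0] if row else 0
        finally:
            conn.close()
    if FALLBACK_PATH.exists():
        return len(json.loads(FALLBACK_PATH.read_text()))
    return 0
```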

.mindmodel/domain-glossary.yaml
@@ -1,37 +0,0 @@
# Domain glossary (core concepts from Phase 1)
terms:
  Motion:
    short: "A parliamentary motion/decision"
    keys: [id, title, description, date, body_text, url]
  motie:
    short: "Dutch: motion (motie). Equivalent to Motion in code comments and UI."
  MP:
    short: "Member of Parliament (kamerlid)"
    keys: [mp_name, party, van, tot_en_met, persoon_id]
  mp_votes:
    short: "Raw voting rows: motion_id, mp_name, vote, date"
  mp_metadata:
    short: "Per-MP metadata table and fields"
  user_sessions:
    short: "Streamlit user quiz session state (session_id, user_votes, completed_motions, ...)"
  embeddings:
    short: "Raw text embeddings stored per motion (embeddings table)"
  svd_vectors:
    short: "SVD-derived vectors from the vote matrix (svd_vectors table)"
  fused_embeddings:
    short: "Concatenation of SVD and text embeddings (fused_embeddings table)"
  similarity_cache:
    short: "Precomputed nearest neighbors for each motion"
  window_id:
    short: "Processing window identifier used for SVD/fusion runs"
  controversy_score:
    short: "Numeric measure stored in the motions table"
  winning_margin:
    short: "Numeric field indicating the margin of win in a vote"
  Politiek_Kompas:
    short: "Political compass; also appears in UI features"
  MP_quiz:
    short: "Interactive quiz derived from motions and mp_votes"
notes:
  - Use these canonical terms in docs, tests, variable names and DB schemas.
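A tiny illustration of the fused_embeddings concept defined above (an SVD vote vector concatenated with a text embedding); the dimensions and random placeholder data are assumptions, not repo behaviour:

```python
# Hypothetical illustration of fused_embeddings = concat(svd_vector, text_embedding).
import numpy as np

svd_vector = np.random.default_rng(0).normal(size=16)       # stand-in for svd_vectors row
text_embedding = np.random.default_rng(1).normal(size=384)  # stand-in for embeddings row

fused = np.concatenate([svd_vector, text_embedding])
assert fused.shape == (16 + 384,)

# Per architecture.yaml, vectors are stored as JSON in DuckDB, e.g.:
# fused_json = json.dumps(fused.tolist())
```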

.mindmodel/stack.yaml
@@ -1,33 +0,0 @@
# Tech stack (Phase 1 authoritative)
language:
  name: python
  version: ">=3.13"
frameworks:
  - streamlit: ">=1.48.0"  # UI: Home.py, pages/..., app.py
database:
  primary: duckdb
  orm_or_adapter: ibis-framework[duckdb]  # used for some parts
visualization:
  - plotly
ml:
  - scikit-learn
  - scipy
  - umap-learn
ai:
  declared_dependency: openai  # declared in pyproject but not observed imported; ai_provider uses requests
  runtime_adapter: custom requests-based wrapper (ai_provider.py)
container:
  - docker: "Dockerfile FROM python:3.13-slim, EXPOSE 8501, CMD streamlit run Home.py"
testing:
  - pytest
ci:
  - drone: .drone.yml present
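A minimal, hypothetical sketch of the requests-based adapter style described for ai_provider.py (retry with backoff rather than a heavy SDK); the endpoint, payload shape, and function name are assumptions, and only the ProviderError name comes from the conventions file:

```python
# Sketch of a requests-based provider call with retry/backoff; details assumed.
import time

import requests


class ProviderError(Exception):
    """Raised when the AI backend cannot be reached or returns an error."""


def complete(prompt: str, api_url: str, api_key: str, retries: int = 3) -> str:
    payload = {"prompt": prompt}
    headers = {"Authorization": f"Bearer {api_key}"}
    for attempt in range(retries):
        try:
            response = requests.post(api_url, json=payload, headers=headers, timeout=30)
            response.raise_for_status()
            return response.json().get("text", "")
        except requests.RequestException as exc:
            if attempt == retries - 1:
                raise ProviderError(f"Provider request failed: {exc}") from exc
            time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
    raise ProviderError("unreachable")
```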

main.py
@@ -1,6 +0,0 @@
def main():
    print("Hello from stemwijzer!")


if __name__ == "__main__":
    main()

scheduler.py
@@ -1,264 +0,0 @@
# scheduler.py (fixed infinite loop issue)
import schedule
import time
import duckdb
from datetime import datetime, timedelta

from api_client import TweedeKamerAPI
from summarizer import summarizer
from database import db
from config import config


class DataUpdateScheduler:
    def __init__(self):
        self.api_client = TweedeKamerAPI()

    def test_api_connection(self) -> bool:
        """Test API connection before proceeding"""
        print("Testing API connection...")
        if self.api_client.test_api_connection():
            print("✅ API connection successful")
            return True
        else:
            print("❌ API connection failed")
            return False

    def check_database_has_data(self) -> bool:
        """Check if database has any motion data"""
        try:
            conn = duckdb.connect(config.DATABASE_PATH)
            result = conn.execute("SELECT COUNT(*) FROM motions").fetchone()
            conn.close()
            return result[0] > 0 if result else False
        except Exception as e:
            print(f"Error checking database: {e}")
            return False

    def update_motions_data(self, days_back: int = 30, max_records: int = 1000):
        """Fetch new motions from API and update database"""
        print(f"Starting motion data update at {datetime.now()}")
        if not self.test_api_connection():
            return False
        try:
            # Fetch recent motions from API (respecting API limits)
            start_date = datetime.now() - timedelta(days=days_back)
            motions = self.api_client.get_motions(
                start_date=start_date,
                limit=max_records
            )
            print(f"Fetched {len(motions)} motions from API")
            if not motions:
                print("No motions received from API")
                return False
            # Insert new motions into database
            successful_inserts = 0
            duplicate_count = 0
            for motion in motions:
                if db.insert_motion(motion):
                    successful_inserts += 1
                else:
                    duplicate_count += 1
            print(f"Successfully inserted {successful_inserts} new motions")
            if duplicate_count > 0:
                print(f"Skipped {duplicate_count} duplicate motions")
            # Generate AI summaries for new motions (only if we have new data)
            if successful_inserts > 0:
                print("Generating AI summaries for new motions...")
                summarizer.update_motion_summaries()
            print("Motion data update completed successfully")
            return True
        except Exception as e:
            print(f"Error during motion data update: {e}")
            return False

    def initial_data_load(self):
        """Perform initial data load with comprehensive data"""
        print("Performing initial comprehensive data load...")
        if not self.test_api_connection():
            return False
        try:
            # Start from 2 years ago but make sure we don't go into the future
            start_date = datetime.now() - timedelta(days=730)
            end_date = datetime.now()
            print(f"Loading data from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
            # Use a single request for recent data first, then expand if needed
            chunk_days = 90  # 3-month chunks
            current_date = start_date
            all_motions = []
            chunks_processed = 0
            max_chunks = 10  # Safety limit to prevent infinite loops
            while current_date < end_date and chunks_processed < max_chunks:
                chunk_end_date = min(current_date + timedelta(days=chunk_days), end_date)
                print(f"Fetching chunk {chunks_processed + 1}/{max_chunks}: {current_date.strftime('%Y-%m-%d')} to {chunk_end_date.strftime('%Y-%m-%d')}")
                try:
                    # Fetch data for this time chunk
                    chunk_motions = self.api_client.get_motions(
                        start_date=current_date,
                        end_date=chunk_end_date,
                        limit=250  # Reasonable limit per chunk
                    )
                    if chunk_motions:
                        all_motions.extend(chunk_motions)
                        print(f"✅ Found {len(chunk_motions)} motions in this chunk (Total: {len(all_motions)})")
                    else:
                        print(f"No motions found in chunk {current_date.strftime('%Y-%m-%d')} to {chunk_end_date.strftime('%Y-%m-%d')}")
                except Exception as e:
                    print(f"❌ Error fetching chunk {current_date.strftime('%Y-%m-%d')} to {chunk_end_date.strftime('%Y-%m-%d')}: {e}")
                # IMPORTANT: Always increment the date to avoid infinite loop
                current_date = chunk_end_date
                chunks_processed += 1
                # Add delay between chunks
                if chunks_processed < max_chunks and current_date < end_date:
                    time.sleep(2)
            print(f"Data collection completed. Total motions fetched: {len(all_motions)}")
            if not all_motions:
                print("❌ No motions retrieved from API. This might be normal if the API doesn't have recent data.")
                print("💡 Try adjusting the date range or check if the API has data for the selected period.")
                # Try a broader date range as fallback
                print("🔄 Trying broader date range (last 30 days)...")
                fallback_start = datetime.now() - timedelta(days=30)
                fallback_motions = self.api_client.get_motions(
                    start_date=fallback_start,
                    limit=250
                )
                if fallback_motions:
                    all_motions = fallback_motions
                    print(f"✅ Fallback successful: Found {len(fallback_motions)} motions")
                else:
                    print("❌ No data found even with broader date range")
                    return False
            # Insert all motions with progress tracking
            successful_inserts = 0
            duplicate_count = 0
            print(f"Inserting {len(all_motions)} motions into database...")
            for i, motion in enumerate(all_motions):
                if i % 25 == 0:  # Progress indicator every 25 motions
                    print(f"Processing motion {i+1}/{len(all_motions)} ({((i+1)/len(all_motions)*100):.1f}%)")
                if db.insert_motion(motion):
                    successful_inserts += 1
                else:
                    duplicate_count += 1
            print(f"✅ Successfully inserted {successful_inserts} motions")
            if duplicate_count > 0:
                print(f"Skipped {duplicate_count} duplicate motions")
            # Generate summaries if we have data
            if successful_inserts > 0:
                print("🤖 Generating AI summaries...")
                summarizer.update_motion_summaries()
            print("🎉 Initial data load completed!")
            return successful_inserts > 0
        except Exception as e:
            print(f"❌ Error during initial data load: {e}")
            return False

    def weekly_update_job(self):
        """Weekly job to update with new motions"""
        print(f"Starting weekly update job at {datetime.now()}")
        # Use smaller limits for regular updates
        self.update_motions_data(days_back=14, max_records=250)
        print("Weekly update job completed")

    def run_scheduler(self):
        """Main scheduler function"""
        print("=" * 50)
        print("Dutch Political Compass Data Scheduler")
        print("=" * 50)
        # Check if database has data
        has_data = self.check_database_has_data()
        print(f"Database has existing data: {has_data}")
        if not has_data:
            print("\n🔄 No data found in database. Running initial data load...")
            success = self.initial_data_load()
            if success:
                print("✅ Initial data load completed successfully!")
            else:
                print("❌ Initial data load failed or no data available.")
                print("💡 You may need to check the API or adjust the date range.")
                return
        else:
            print("✅ Database already contains motion data.")
            # Ask if user wants to update anyway
            try:
                response = input("\nDo you want to fetch recent motions anyway? (y/n): ").lower().strip()
                if response in ['y', 'yes']:
                    print("🔄 Updating with recent motions...")
                    self.update_motions_data(days_back=7, max_records=250)
            except KeyboardInterrupt:
                print("\nSkipping manual update.")
        # Schedule regular updates
        print("\n📅 Scheduling regular updates...")
        schedule.every().monday.at("02:00").do(self.weekly_update_job)
        schedule.every().thursday.at("14:00").do(lambda: self.update_motions_data(days_back=7, max_records=250))
        print("Jobs scheduled:")
        print("- Weekly motion update: Every Monday at 02:00")
        print("- Mid-week update: Every Thursday at 14:00")
        print(f"- API limit per request: {config.API_MAX_LIMIT} records")
        print("\n🔄 Scheduler is now running. Press Ctrl+C to stop.")
        try:
            while True:
                schedule.run_pending()
                time.sleep(3600)  # Check every hour
        except KeyboardInterrupt:
            print("\n👋 Scheduler stopped by user.")


def run_once():
    """Run data update once and exit"""
    scheduler = DataUpdateScheduler()
    print("Running one-time data update...")
    has_data = scheduler.check_database_has_data()
    if not has_data:
        print("No existing data found. Running initial data load...")
        scheduler.initial_data_load()
    else:
        print("Updating existing data with recent motions...")
        scheduler.update_motions_data(days_back=14, max_records=250)
    print("One-time update completed!")


if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == "--once":
        run_once()
    else:
        scheduler = DataUpdateScheduler()
        scheduler.run_scheduler()

scraper.py
@@ -1,183 +0,0 @@
# scraper.py
import requests
from bs4 import BeautifulSoup
import time
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from database import db
from config import config


class MotionScraper:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    def scrape_motion_list(self, start_date: datetime = None, end_date: datetime = None) -> List[str]:
        """Scrape motion URLs from the main page"""
        if not start_date:
            start_date = datetime.now() - timedelta(days=730)  # 2 years ago
        if not end_date:
            end_date = datetime.now()
        motion_urls = []
        page = 1
        while True:
            try:
                url = f"{config.BASE_URL}?page={page}"
                response = self.session.get(url, timeout=30)
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'html.parser')
                # Find motion links (adjust selectors based on actual HTML structure)
                motion_links = soup.find_all('a', href=re.compile(r'/stemmingsuitslagen/'))
                if not motion_links:
                    break
                for link in motion_links:
                    href = link.get('href')
                    if href and href not in motion_urls:
                        motion_urls.append(href)
                page += 1
                time.sleep(config.SCRAPING_DELAY)
            except Exception as e:
                print(f"Error scraping page {page}: {e}")
                break
        return motion_urls

    def parse_motion_detail(self, motion_url: str) -> Optional[Dict]:
        """Parse individual motion details"""
        try:
            full_url = f"https://www.tweedekamer.nl{motion_url}" if motion_url.startswith('/') else motion_url
            response = self.session.get(full_url, timeout=30)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            # Extract motion data (adjust selectors based on actual HTML structure)
            title = self._extract_title(soup)
            description = self._extract_description(soup)
            date = self._extract_date(soup)
            policy_area = self._extract_policy_area(soup)
            voting_results = self._extract_voting_results(soup)
            if not all([title, voting_results]):
                return None
            # Calculate winning margin
            total_votes = sum(1 for vote in voting_results.values() if vote in ['voor', 'tegen'])
            if total_votes == 0:
                return None
            votes_for = sum(1 for vote in voting_results.values() if vote == 'voor')
            winning_margin = abs(votes_for - (total_votes - votes_for)) / total_votes
            return {
                'title': title,
                'description': description or '',
                'date': date,
                'policy_area': policy_area or 'Onbekend',
                'voting_results': voting_results,
                'winning_margin': winning_margin,
                'url': full_url
            }
        except Exception as e:
            print(f"Error parsing motion {motion_url}: {e}")
            return None

    def _extract_title(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract motion title"""
        # Look for common title selectors
        selectors = ['h1', '.motion-title', '.title', 'h2']
        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                return element.get_text(strip=True)
        return None

    def _extract_description(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract motion description"""
        # Look for description elements
        selectors = ['.motion-description', '.description', '.content', 'p']
        for selector in selectors:
            elements = soup.select(selector)
            if elements:
                return ' '.join(el.get_text(strip=True) for el in elements[:3])
        return None

    def _extract_date(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract motion date"""
        # Look for date patterns
        date_pattern = re.compile(r'\d{1,2}-\d{1,2}-\d{4}|\d{4}-\d{1,2}-\d{1,2}')
        text = soup.get_text()
        match = date_pattern.search(text)
        if match:
            return match.group()
        return datetime.now().strftime('%Y-%m-%d')

    def _extract_policy_area(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract policy area/category"""
        # Look for category indicators
        text = soup.get_text().lower()
        for area in config.POLICY_AREAS[1:]:  # Skip "Alle"
            if area.lower() in text:
                return area
        return "Algemeen"

    def _extract_voting_results(self, soup: BeautifulSoup) -> Dict[str, str]:
        """Extract party voting results"""
        # This is a simplified extraction - you'll need to adjust based on actual HTML
        voting_results = {}
        # Look for voting tables or lists
        tables = soup.find_all('table')
        for table in tables:
            rows = table.find_all('tr')
            for row in rows:
                cells = row.find_all(['td', 'th'])
                if len(cells) >= 2:
                    party = cells[0].get_text(strip=True)
                    vote = cells[1].get_text(strip=True).lower()
                    if vote in ['voor', 'tegen', 'afwezig']:
                        voting_results[party] = vote
        # Fallback: simulate some voting data for testing
        if not voting_results:
            parties = ['VVD', 'PVV', 'CDA', 'D66', 'GL', 'SP', 'PvdA', 'CU', 'PvdD', 'FVD', '50PLUS', 'SGP']
            import random
            for party in parties:
                voting_results[party] = random.choice(['voor', 'tegen', 'afwezig'])
        return voting_results

    def run_scraping_job(self):
        """Main scraping job"""
        print("Starting motion scraping...")
        motion_urls = self.scrape_motion_list()
        print(f"Found {len(motion_urls)} motion URLs")
        successful_scrapes = 0
        for i, url in enumerate(motion_urls):
            print(f"Processing motion {i+1}/{len(motion_urls)}: {url}")
            motion_data = self.parse_motion_detail(url)
            if motion_data:
                if db.insert_motion(motion_data):
                    successful_scrapes += 1
            time.sleep(config.SCRAPING_DELAY)
        print(f"Scraping completed. Successfully scraped {successful_scrapes} motions.")


scraper = MotionScraper()

verify.py
@@ -1,9 +0,0 @@
import duckdb

from config import config

conn = duckdb.connect(config.DATABASE_PATH)
result = conn.execute("PRAGMA table_info('motions')").fetchall()
for row in result:
    print(row)
conn.close()