You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
motief/database.py

1366 lines
49 KiB

# database.py (final working version)
try:
import duckdb
except Exception: # pragma: no cover - environment may not have duckdb installed
duckdb = None
import json
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from config import config
import logging
_logger = logging.getLogger(__name__)
class MotionDatabase:
def __init__(self, db_path: str = config.DATABASE_PATH):
self.db_path = db_path
# If duckdb is not available, operate in lightweight file-backed mode
self._file_mode = duckdb is None
self._init_database()
def _init_database(self):
"""Initialize database with required tables"""
# Create directory if it doesn't exist
import os
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
# If duckdb isn't available in this environment, create lightweight
# JSON-backed files to allow tests to run without the duckdb dependency.
if duckdb is None:
# create simple JSON files representing embeddings and similarity cache
emb_file = f"{self.db_path}.embeddings.json"
sim_file = f"{self.db_path}.similarity_cache.json"
for p in (emb_file, sim_file):
if not os.path.exists(p):
with open(p, "w", encoding="utf-8") as fh:
fh.write("[]")
return
conn = duckdb.connect(self.db_path)
# Create sequence for auto-incrementing IDs
try:
conn.execute("CREATE SEQUENCE IF NOT EXISTS motions_id_seq START 1")
except:
pass
# Create tables with proper ID handling
conn.execute("""
CREATE TABLE IF NOT EXISTS motions (
id INTEGER DEFAULT nextval('motions_id_seq'),
title TEXT NOT NULL,
description TEXT,
date DATE,
policy_area TEXT,
voting_results JSON,
winning_margin FLOAT,
controversy_score FLOAT,
layman_explanation TEXT,
externe_identifier TEXT,
body_text TEXT,
url TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
)
""")
# Ensure older databases get new columns added without recreating table
try:
conn.execute(
"ALTER TABLE motions ADD COLUMN IF NOT EXISTS externe_identifier TEXT"
)
conn.execute("ALTER TABLE motions ADD COLUMN IF NOT EXISTS body_text TEXT")
except Exception:
# Best-effort: if ALTER fails for any reason, continue without stopping app startup
_logger.debug(
"Could not ALTER motions table to add new columns (may already exist or unsupported)."
)
conn.execute("""
CREATE TABLE IF NOT EXISTS user_sessions (
session_id TEXT PRIMARY KEY,
user_votes JSON,
completed_motions INTEGER DEFAULT 0,
total_motions INTEGER DEFAULT 10,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS party_results (
session_id TEXT,
party_name TEXT,
agreement_percentage FLOAT,
agreed_motions JSON,
disagreed_motions JSON,
PRIMARY KEY (session_id, party_name)
)
""")
# New pipeline tables
conn.execute("""
CREATE SEQUENCE IF NOT EXISTS mp_votes_id_seq START 1
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS mp_votes (
id INTEGER DEFAULT nextval('mp_votes_id_seq'),
motion_id INTEGER NOT NULL,
mp_name TEXT NOT NULL,
party TEXT,
vote TEXT NOT NULL,
date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS mp_metadata (
mp_name TEXT PRIMARY KEY,
party TEXT,
van DATE,
tot_en_met DATE,
persoon_id TEXT
)
""")
conn.execute("""
CREATE SEQUENCE IF NOT EXISTS svd_vectors_id_seq START 1
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS svd_vectors (
id INTEGER DEFAULT nextval('svd_vectors_id_seq'),
window_id TEXT NOT NULL,
entity_type TEXT NOT NULL,
entity_id TEXT NOT NULL,
vector JSON NOT NULL,
model TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
)
""")
conn.execute("""
CREATE SEQUENCE IF NOT EXISTS fused_embeddings_id_seq START 1
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS fused_embeddings (
id INTEGER DEFAULT nextval('fused_embeddings_id_seq'),
motion_id INTEGER NOT NULL,
window_id TEXT NOT NULL,
vector JSON NOT NULL,
svd_dims INTEGER NOT NULL,
text_dims INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
)
""")
# Embeddings table for raw text embeddings
conn.execute("""
CREATE SEQUENCE IF NOT EXISTS embeddings_id_seq START 1
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS embeddings (
id INTEGER DEFAULT nextval('embeddings_id_seq'),
motion_id INTEGER NOT NULL,
model TEXT,
vector JSON NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
)
""")
# Similarity cache table for precomputed neighbors
conn.execute("""
CREATE SEQUENCE IF NOT EXISTS similarity_cache_id_seq START 1
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS similarity_cache (
id INTEGER DEFAULT nextval('similarity_cache_id_seq'),
source_motion_id INTEGER NOT NULL,
target_motion_id INTEGER NOT NULL,
score FLOAT NOT NULL,
vector_type TEXT NOT NULL,
window_id TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
)
""")
conn.close()
def reset_database(self):
"""Development helper: drop known tables and re-run initialization.
WARNING: intended for dev/test only. This will remove tables and recreate schema.
"""
conn = duckdb.connect(self.db_path)
try:
# Drop known tables if they exist
for t in ("party_results", "user_sessions", "motions"):
try:
conn.execute(f"DROP TABLE IF EXISTS {t}")
except Exception:
pass
# Recreate schema
conn.close()
self._init_database()
finally:
try:
conn.close()
except Exception:
pass
def append_audit_event(
self,
actor_id: Optional[str],
action: str,
target_type: Optional[str] = None,
target_id: Optional[str] = None,
metadata: Optional[Dict] = None,
) -> bool:
"""Record an audit event.
Tries to write to an audit_events table in DuckDB. If that fails (no DB,
missing table, or any error) falls back to appending the event to
thoughts/ledgers/audit_events.json for durable inspection.
Returns True when the event was recorded somewhere, False otherwise.
"""
event_id = str(uuid.uuid4())
now = datetime.utcnow().isoformat() + "Z"
payload = {
"id": event_id,
"actor_id": actor_id,
"action": action,
"target_type": target_type,
"target_id": target_id,
"metadata": metadata or {},
"created_at": now,
}
# If duckdb is available try to write to DB first.
try:
if duckdb is not None:
conn = duckdb.connect(self.db_path)
try:
conn.execute(
"INSERT INTO audit_events (id, actor_id, action, target_type, target_id, metadata, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
(
payload["id"],
payload["actor_id"],
payload["action"],
payload["target_type"],
payload["target_id"],
json.dumps(payload["metadata"]),
payload["created_at"],
),
)
conn.close()
return True
except Exception as e:
_logger.debug("Could not write audit event to DB: %s", e)
try:
conn.close()
except Exception:
pass
# fall back to ledger file
except Exception:
_logger.debug("DuckDB unavailable when appending audit event")
# Ledger fallback: append to thoughts/ledgers/audit_events.json
try:
from pathlib import Path
ledger_dir = Path("thoughts") / "ledgers"
ledger_dir.mkdir(parents=True, exist_ok=True)
ledger_path = ledger_dir / "audit_events.json"
if ledger_path.exists():
try:
data = json.loads(ledger_path.read_text(encoding="utf-8"))
if not isinstance(data, list):
data = []
except Exception:
data = []
else:
data = []
data.append(payload)
ledger_path.write_text(
json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8"
)
return True
except Exception as e:
_logger.error("Failed to record audit event to ledger: %s", e)
return False
def insert_motion(self, motion_data: Dict) -> bool:
"""Insert a new motion into database"""
try:
conn = duckdb.connect(self.db_path)
# Check if motion already exists by URL to avoid duplicates
existing = conn.execute(
"""
SELECT COUNT(*) FROM motions WHERE url = ?
""",
(motion_data["url"],),
).fetchone()
if existing and existing[0] > 0:
conn.close()
return False # Motion already exists
# Insert motion - id will be auto-generated by sequence
conn.execute(
"""
INSERT INTO motions
(title, description, date, policy_area, voting_results,
winning_margin, controversy_score, url, externe_identifier, body_text, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
""",
(
motion_data["title"],
motion_data["description"] or "",
motion_data["date"],
motion_data["policy_area"],
json.dumps(motion_data["voting_results"]),
motion_data["winning_margin"],
1 - motion_data["winning_margin"], # controversy score
motion_data["url"],
motion_data.get("externe_identifier"),
motion_data.get("body_text"),
),
)
conn.close()
# Also insert mp_vote rows for individual MPs if party data is available.
# This only runs for brand-new motions (existing motions are rejected above),
# so there is no risk of duplicates — no existence check needed here.
mp_vote_parties = motion_data.get("mp_vote_parties", {})
voting_results_raw = motion_data.get("voting_results", {})
if mp_vote_parties:
conn2 = duckdb.connect(self.db_path)
row = conn2.execute(
"SELECT id FROM motions WHERE url = ? LIMIT 1",
(motion_data["url"],),
).fetchone()
conn2.close()
motion_id = row[0] if row else None
if motion_id is not None:
motion_date = motion_data.get("date", "")
for mp_name, party in mp_vote_parties.items():
vote = voting_results_raw.get(mp_name, "afwezig")
self.insert_mp_vote(
motion_id=motion_id,
mp_name=mp_name,
party=party,
vote=vote,
date=motion_date,
)
return True
except Exception:
_logger.exception("Error inserting motion")
try:
conn.close()
except Exception:
pass
return False
def batch_insert_motions(self, motions_data: List[Dict]) -> Tuple[int, int]:
"""Batch-insert motions and their mp_votes using a single DuckDB connection.
Returns (inserted_count, duplicate_count).
"""
if not motions_data:
return 0, 0
try:
conn = duckdb.connect(self.db_path)
# 1. Find which URLs already exist — single query
urls = [m["url"] for m in motions_data]
placeholders = ", ".join("?" * len(urls))
existing_urls = set(
row[0]
for row in conn.execute(
f"SELECT url FROM motions WHERE url IN ({placeholders})", urls
).fetchall()
)
new_motions = [m for m in motions_data if m["url"] not in existing_urls]
duplicates = len(motions_data) - len(new_motions)
if not new_motions:
conn.close()
return 0, duplicates
# 2. Bulk-insert motions
motion_rows = [
(
m["title"],
m["description"] or "",
m["date"],
m["policy_area"],
json.dumps(m["voting_results"]),
m["winning_margin"],
1 - m["winning_margin"],
m["url"],
m.get("externe_identifier"),
m.get("body_text"),
)
for m in new_motions
]
conn.executemany(
"""
INSERT INTO motions
(title, description, date, policy_area, voting_results,
winning_margin, controversy_score, url, externe_identifier,
body_text, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
""",
motion_rows,
)
# 3. Fetch the newly-assigned IDs in one query
new_urls = [m["url"] for m in new_motions]
np = ", ".join("?" * len(new_urls))
url_to_id = {
row[1]: row[0]
for row in conn.execute(
f"SELECT id, url FROM motions WHERE url IN ({np})", new_urls
).fetchall()
}
# 4. Bulk-insert mp_votes
vote_rows = []
for m in new_motions:
motion_id = url_to_id.get(m["url"])
if motion_id is None:
continue
mp_vote_parties = m.get("mp_vote_parties", {})
voting_results_raw = m.get("voting_results", {})
motion_date = m.get("date", "")
for mp_name, party in mp_vote_parties.items():
vote = voting_results_raw.get(mp_name, "afwezig")
vote_rows.append((motion_id, mp_name, party, vote, motion_date))
if vote_rows:
conn.executemany(
"""
INSERT INTO mp_votes (motion_id, mp_name, party, vote, date, created_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
""",
vote_rows,
)
conn.close()
return len(new_motions), duplicates
except Exception as e:
_logger.error(f"Error in batch_insert_motions: {e}")
try:
conn.close()
except Exception:
pass
raise
def get_filtered_motions(
self,
policy_area: str = "Alle",
min_margin: float = 0.2,
max_margin: float = 0.8,
limit: int = 100,
) -> List[Dict]:
"""Get motions filtered by criteria"""
conn = duckdb.connect(self.db_path)
query = """
SELECT * FROM motions
WHERE winning_margin BETWEEN ? AND ?
AND layman_explanation IS NOT NULL
AND layman_explanation != ''
"""
params = [min_margin, max_margin]
if policy_area != "Alle":
query += " AND policy_area = ?"
params.append(policy_area)
query += " ORDER BY controversy_score DESC LIMIT ?"
params.append(limit)
try:
result = conn.execute(query, params).fetchall()
columns = [desc[0] for desc in conn.description]
conn.close()
return [dict(zip(columns, row)) for row in result]
except Exception:
_logger.exception("Error querying motions")
try:
conn.close()
except Exception:
pass
return []
def get_titles_for_ids(self, ids: List[int]) -> Dict[int, Optional[str]]:
"""Return a mapping of motion id -> title for the given ids.
If DuckDB is not available, fall back to an empty mapping.
"""
out: Dict[int, Optional[str]] = {}
try:
if duckdb is None:
return out
conn = duckdb.connect(self.db_path)
placeholders = ",".join("?" for _ in ids)
rows = conn.execute(
f"SELECT id, title FROM motions WHERE id IN ({placeholders})", ids
).fetchall()
conn.close()
for r in rows:
try:
out[int(r[0])] = r[1]
except Exception:
continue
return out
except Exception:
try:
conn.close()
except Exception:
pass
_logger.exception("Error fetching titles for ids")
return out
def create_session(self, total_motions: int = 10) -> str:
"""Create new user session"""
session_id = str(uuid.uuid4())
conn = duckdb.connect(self.db_path)
conn.execute(
"""
INSERT INTO user_sessions (session_id, user_votes, total_motions)
VALUES (?, '{}', ?)
""",
(session_id, total_motions),
)
conn.close()
return session_id
def update_user_vote(self, session_id: str, motion_id: int, vote: str):
"""Update user vote for a motion"""
conn = duckdb.connect(self.db_path)
# Get current votes
current_votes = conn.execute(
"""
SELECT user_votes FROM user_sessions WHERE session_id = ?
""",
(session_id,),
).fetchone()
if current_votes:
votes_dict = json.loads(current_votes[0])
votes_dict[str(motion_id)] = vote
conn.execute(
"""
UPDATE user_sessions
SET user_votes = ?,
completed_motions = ?,
last_updated = CURRENT_TIMESTAMP
WHERE session_id = ?
""",
(json.dumps(votes_dict), len(votes_dict), session_id),
)
conn.close()
def calculate_party_matches(self, session_id: str) -> List[Dict]:
"""Calculate party agreement percentages"""
conn = duckdb.connect(self.db_path)
# Get user votes and motion data
user_data = conn.execute(
"""
SELECT user_votes FROM user_sessions WHERE session_id = ?
""",
(session_id,),
).fetchone()
if not user_data:
return []
user_votes = json.loads(user_data[0])
motion_ids = list(user_votes.keys())
if not motion_ids:
return []
# Get motion voting results
placeholders = ",".join(["?" for _ in motion_ids])
motions = conn.execute(
f"""
SELECT id, voting_results FROM motions
WHERE id IN ({placeholders})
""",
motion_ids,
).fetchall()
conn.close()
# Calculate agreements
party_scores = {}
for motion_id, voting_results_json in motions:
voting_results = json.loads(voting_results_json)
user_vote = user_votes[str(motion_id)]
if user_vote == "Geen stem": # Skip abstentions
continue
for party, party_vote in voting_results.items():
# Skip individual MP names (contain comma, e.g. "Yesilgöz-Zegerius, D.")
# Party/fractie names never contain a comma.
if "," in party:
continue
if party not in party_scores:
party_scores[party] = {"agreed": 0, "total": 0}
party_scores[party]["total"] += 1
# Check agreement
if (user_vote == "Voor" and party_vote == "voor") or (
user_vote == "Tegen" and party_vote == "tegen"
):
party_scores[party]["agreed"] += 1
# Convert to percentages and sort
results = []
for party, scores in party_scores.items():
if scores["total"] > 0:
agreement_pct = (scores["agreed"] / scores["total"]) * 100
results.append(
{
"party": party,
"agreement_percentage": round(agreement_pct, 1),
"agreed_motions": scores["agreed"],
"total_motions": scores["total"],
}
)
return sorted(results, key=lambda x: x["agreement_percentage"], reverse=True)
def get_motions_with_individual_votes(self, k: int = 20) -> List[int]:
"""Return up to k motion IDs that have individual MP vote records.
Selects motions where at least one mp_name contains a comma (i.e.
individual MPs in 'Lastname, F.' format), ordered by controversy_score
descending so the most discriminating motions come first.
Args:
k: maximum number of motion IDs to return.
Returns:
List of motion IDs (ints), sorted by controversy_score DESC.
"""
if duckdb is None:
return []
try:
conn = duckdb.connect(self.db_path, read_only=True)
rows = conn.execute(
"""
SELECT DISTINCT mv.motion_id, m.controversy_score
FROM mp_votes mv
JOIN motions m ON mv.motion_id = m.id
WHERE mv.mp_name LIKE '%,%'
ORDER BY m.controversy_score DESC, mv.motion_id ASC
LIMIT ?
""",
(int(k),),
).fetchall()
conn.close()
return [int(r[0]) for r in rows]
except Exception:
_logger.exception("Error in get_motions_with_individual_votes")
return []
def match_mps_for_votes(
self, user_votes: Dict[int, str], limit: int = 50
) -> List[Dict]:
"""Return per-MP agreement against provided user_votes.
Args:
user_votes: mapping motion_id -> vote token (UI or canonical).
limit: max number of MPs to return.
Returns:
List of dicts: {mp_name, party, matched, overlap, agreement_pct}
Notes:
- Normalizes common UI tokens to canonical DB tokens: 'voor','tegen','onthouden','afwezig'.
- Excludes MPs with overlap == 0.
- Requires DuckDB to be available (raises RuntimeError otherwise).
"""
if not user_votes:
raise ValueError("user_votes must be a non-empty dict")
# Normalization mapping (UI variants -> canonical)
def _norm(v: str) -> Optional[str]:
if v is None:
return None
s = str(v).strip()
if not s:
return None
s_low = s.lower()
if s_low in ("voor", "v", "yes"):
return "voor"
if s_low in ("tegen", "t", "no"):
return "tegen"
if s_low in ("onthouden", "abstain", "abstained"):
return "onthouden"
if s_low in ("geen stem", "no vote"):
return None # user chose not to answer — skip entirely
if s_low in ("afwezig", "absent"):
return "afwezig"
# already canonical?
if s_low in ("voor", "tegen", "onthouden", "afwezig"):
return s_low
# fallback: try Dutch keywords
if "voor" in s_low:
return "voor"
if "tegen" in s_low:
return "tegen"
if "onthouden" in s_low:
return "onthouden"
if "afwezig" in s_low:
return "afwezig"
return None
# Build normalized mapping and DataFrame for DuckDB
import pandas as pd
rows = []
for mid, v in user_votes.items():
try:
mid_i = int(mid)
except Exception:
raise ValueError(f"motion id must be integer-like: {mid}")
nv = _norm(v)
if nv is None:
# treat as abstain/skip (do not include in user votes)
continue
rows.append({"motion_id": mid_i, "user_vote": nv})
if not rows:
raise ValueError("After normalization no valid user votes remain")
if duckdb is None:
raise RuntimeError("DuckDB is required for match_mps_for_votes")
conn = duckdb.connect(self.db_path)
try:
uv = pd.DataFrame(rows)
# register as temporary relation
conn.register("_user_votes", uv)
q = (
"SELECT mp.mp_name, mp.party, "
"SUM(CASE WHEN lower(mp.vote)=_user_votes.user_vote THEN 1 ELSE 0 END) AS matched, "
"COUNT(*) AS overlap "
"FROM mp_votes mp JOIN _user_votes ON mp.motion_id = _user_votes.motion_id "
"WHERE mp.mp_name LIKE '%,%' "
"GROUP BY mp.mp_name, mp.party "
"HAVING COUNT(*) > 0 "
"ORDER BY (matched*1.0/overlap) DESC, matched DESC, mp.mp_name ASC "
f"LIMIT {int(limit)}"
)
rows_out = conn.execute(q).fetchall()
# columns: mp_name, party, matched, overlap
results = []
for r in rows_out:
try:
matched = int(r[2])
overlap = int(r[3])
pct = round((matched / overlap) * 100.0, 1) if overlap else 0.0
except Exception:
matched = int(r[2]) if r[2] is not None else 0
overlap = int(r[3]) if r[3] is not None else 0
pct = 0.0
results.append(
{
"mp_name": r[0],
"party": r[1],
"matched": matched,
"overlap": overlap,
"agreement_pct": pct,
}
)
conn.unregister("_user_votes")
conn.close()
return results
except Exception:
try:
conn.close()
except Exception:
pass
_logger.exception("Error in match_mps_for_votes")
return []
def choose_discriminating_motions(
self, candidates: List[str], excluded_motion_ids: List[int], k: int = 1
) -> List[int]:
"""Return top-k motion ids that best split the candidate MPs.
Scoring: Shannon entropy over vote distribution among candidate MPs for each motion.
Ties broken by higher controversy_score then lower motion id.
"""
if not candidates:
raise ValueError("candidates must be non-empty")
if duckdb is None:
raise RuntimeError("DuckDB is required for choose_discriminating_motions")
conn = duckdb.connect(self.db_path)
try:
# Prepare candidate names as a temp table
import pandas as pd
cand_df = pd.DataFrame({"mp_name": candidates})
conn.register("_candidates", cand_df)
# Build excluded list SQL fragment
excl_clause = ""
params = []
if excluded_motion_ids:
excl_clause = (
"AND mp.motion_id NOT IN ("
+ ",".join(str(int(x)) for x in excluded_motion_ids)
+ ")"
)
# Aggregate counts per motion by vote token
q = f"""
SELECT
m.id as motion_id,
m.controversy_score,
SUM(CASE WHEN lower(mp.vote) = 'voor' THEN 1 ELSE 0 END) as cnt_voor,
SUM(CASE WHEN lower(mp.vote) = 'tegen' THEN 1 ELSE 0 END) as cnt_tegen,
SUM(CASE WHEN lower(mp.vote) = 'onthouden' THEN 1 ELSE 0 END) as cnt_onthouden,
SUM(CASE WHEN lower(mp.vote) = 'afwezig' THEN 1 ELSE 0 END) as cnt_afwezig,
COUNT(*) as total_votes
FROM mp_votes mp
JOIN _candidates c ON mp.mp_name = c.mp_name
JOIN motions m ON m.id = mp.motion_id
WHERE 1=1
{excl_clause}
GROUP BY m.id, m.controversy_score
HAVING COUNT(*) > 0
"""
rows = conn.execute(q).fetchall()
conn.unregister("_candidates")
conn.close()
if not rows:
return []
import math
scored = []
for r in rows:
motion_id = int(r[0])
controversy = r[1] or 0.0
counts = [int(r[2]), int(r[3]), int(r[4]), int(r[5])]
total = int(r[6])
if total <= 0:
entropy = 0.0
else:
entropy = 0.0
for c in counts:
if c <= 0:
continue
p = c / total
entropy -= p * math.log2(p)
scored.append((motion_id, entropy, controversy))
# sort by entropy desc, controversy desc, motion_id asc
scored.sort(key=lambda x: (-x[1], -x[2], x[0]))
return [m[0] for m in scored[: int(k)]]
except Exception:
try:
conn.close()
except Exception:
pass
_logger.exception("Error in choose_discriminating_motions")
return []
def store_embedding(self, motion_id: int, model: str, vector: List[float]) -> int:
"""Store an embedding for a motion. Returns inserted row id or -1 on failure."""
try:
conn = duckdb.connect(self.db_path)
# Use explicit nextval for id since older tables may lack DEFAULT
conn.execute(
"INSERT INTO embeddings (id, motion_id, model, vector, created_at) VALUES (nextval('embeddings_id_seq'), ?, ?, ?, CURRENT_TIMESTAMP)",
(motion_id, model, json.dumps(vector)),
)
row = conn.execute("SELECT currval('embeddings_id_seq')").fetchone()
conn.close()
if row and row[0] is not None:
return int(row[0])
return -1
except Exception as e:
_logger.error("Error storing embedding: %s", e)
try:
conn.close()
except Exception:
pass
return -1
def search_similar(
self, query_vector: List[float], top_k: int = 5, model: Optional[str] = None
) -> List[Dict]:
"""Naive in-Python cosine similarity search over stored embeddings.
Returns list of dicts with keys: id, motion_id, model, score, created_at
"""
try:
conn = duckdb.connect(self.db_path)
if model:
rows = conn.execute(
"SELECT id, motion_id, model, vector, created_at FROM embeddings WHERE model = ?",
(model,),
).fetchall()
else:
rows = conn.execute(
"SELECT id, motion_id, model, vector, created_at FROM embeddings"
).fetchall()
conn.close()
results = []
import math
for r in rows:
id_, motion_id, mdl, vector_json, created_at = r
try:
vec = json.loads(vector_json)
except Exception:
continue
# cosine similarity
try:
dot = sum(float(a) * float(b) for a, b in zip(query_vector, vec))
na = math.sqrt(sum(float(a) * float(a) for a in query_vector))
nb = math.sqrt(sum(float(b) * float(b) for b in vec))
score = dot / (na * nb) if na and nb else 0.0
except Exception:
score = 0.0
results.append(
{
"id": id_,
"motion_id": motion_id,
"model": mdl,
"score": score,
"created_at": created_at,
}
)
results.sort(key=lambda x: x["score"], reverse=True)
return results[:top_k]
except Exception:
_logger.exception("Error searching embeddings")
try:
conn.close()
except Exception:
pass
return []
def mp_votes_exists_for_motion(self, motion_id: int) -> bool:
try:
conn = duckdb.connect(self.db_path)
row = conn.execute(
"SELECT COUNT(*) FROM mp_votes WHERE motion_id = ?",
(motion_id,),
).fetchone()
conn.close()
return bool(row and row[0] > 0)
except Exception as e:
_logger.error(f"Error checking mp_votes existence: {e}")
try:
conn.close()
except Exception:
pass
return False
def insert_mp_vote(
self,
motion_id: int,
mp_name: str,
vote: str,
date: Optional[str] = None,
party: Optional[str] = None,
) -> int:
try:
conn = duckdb.connect(self.db_path)
conn.execute(
"""
INSERT INTO mp_votes (motion_id, mp_name, party, vote, date, created_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
""",
(motion_id, mp_name, party, vote, date),
)
row = conn.execute("SELECT max(id) FROM mp_votes").fetchone()
conn.close()
if row and row[0] is not None:
return int(row[0])
return -1
except Exception as e:
_logger.error(f"Error inserting mp_vote: {e}")
try:
conn.close()
except Exception:
pass
return -1
def upsert_mp_metadata(
self,
mp_name: str,
party: Optional[str],
van: Optional[str],
tot_en_met: Optional[str],
persoon_id: Optional[str],
) -> None:
try:
conn = duckdb.connect(self.db_path)
exists = conn.execute(
"SELECT COUNT(*) FROM mp_metadata WHERE mp_name = ?", (mp_name,)
).fetchone()
if exists and exists[0] > 0:
# Only update if this record is newer (higher Van date) than the stored one,
# preferring active memberships (TotEnMet IS NULL) over ended ones.
conn.execute(
"""
UPDATE mp_metadata SET party = ?, van = ?, tot_en_met = ?, persoon_id = ?
WHERE mp_name = ?
AND (
-- prefer active over ended
(? IS NULL AND tot_en_met IS NOT NULL)
-- or same active status but newer start date
OR (? IS NULL AND tot_en_met IS NULL AND CAST(? AS DATE) > CAST(van AS DATE))
OR (? IS NOT NULL AND tot_en_met IS NOT NULL AND CAST(? AS DATE) > CAST(van AS DATE))
)
""",
(
party,
van,
tot_en_met,
persoon_id,
mp_name,
tot_en_met, # prefer active
tot_en_met,
van, # both active, newer
tot_en_met,
van,
), # both ended, newer
)
else:
conn.execute(
"""
INSERT INTO mp_metadata (mp_name, party, van, tot_en_met, persoon_id)
VALUES (?, ?, ?, ?, ?)
""",
(mp_name, party, van, tot_en_met, persoon_id),
)
conn.close()
except Exception as e:
_logger.error(f"Error upserting mp_metadata: {e}")
try:
conn.close()
except Exception:
pass
def store_svd_vector(
self,
window_id: str,
entity_type: str,
entity_id: str,
vector: List[float],
model: Optional[str] = None,
) -> int:
try:
conn = duckdb.connect(self.db_path)
conn.execute(
"""
INSERT INTO svd_vectors (window_id, entity_type, entity_id, vector, model, created_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
""",
(window_id, entity_type, entity_id, json.dumps(vector), model),
)
row = conn.execute("SELECT max(id) FROM svd_vectors").fetchone()
conn.close()
if row and row[0] is not None:
return int(row[0])
return -1
except Exception as e:
_logger.error(f"Error storing svd_vector: {e}")
try:
conn.close()
except Exception:
pass
return -1
def batch_store_svd_vectors(
self,
window_id: str,
rows: List[Tuple], # each: (entity_type, entity_id, vector_list, model_or_None)
) -> int:
"""Batch-upsert SVD vectors for a window using a single connection.
Deletes all existing rows for the window first, then inserts the new batch.
Returns number of rows inserted.
"""
if not rows:
return 0
try:
conn = duckdb.connect(self.db_path)
conn.execute("DELETE FROM svd_vectors WHERE window_id = ?", (window_id,))
insert_rows = [
(window_id, entity_type, entity_id, json.dumps(vector), model)
for entity_type, entity_id, vector, model in rows
]
conn.executemany(
"""
INSERT INTO svd_vectors
(window_id, entity_type, entity_id, vector, model, created_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
""",
insert_rows,
)
conn.close()
return len(insert_rows)
except Exception as e:
_logger.error(f"Error in batch_store_svd_vectors: {e}")
try:
conn.close()
except Exception:
pass
raise
def store_fused_embedding(
self,
motion_id: int,
window_id: str,
vector: List[float],
svd_dims: int,
text_dims: int,
) -> int:
try:
conn = duckdb.connect(self.db_path)
# Delete any existing row for this (motion_id, window_id) to prevent duplicates
conn.execute(
"DELETE FROM fused_embeddings WHERE motion_id = ? AND window_id = ?",
(motion_id, window_id),
)
conn.execute(
"""
INSERT INTO fused_embeddings (motion_id, window_id, vector, svd_dims, text_dims, created_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
""",
(motion_id, window_id, json.dumps(vector), svd_dims, text_dims),
)
row = conn.execute("SELECT max(id) FROM fused_embeddings").fetchone()
conn.close()
if row and row[0] is not None:
return int(row[0])
return -1
except Exception as e:
_logger.error(f"Error storing fused_embedding: {e}")
try:
conn.close()
except Exception:
pass
return -1
def store_similarity_batch(self, rows: List[Dict]) -> int:
"""Insert multiple similarity_cache rows. Returns number inserted."""
if not rows:
return 0
inserted = 0
# File-backed fallback when duckdb is not available
if duckdb is None:
sim_file = f"{self.db_path}.similarity_cache.json"
try:
with open(sim_file, "r+", encoding="utf-8") as fh:
data = json.load(fh)
# assign incremental ids
max_id = max((item.get("id", 0) for item in data), default=0)
for r in rows:
max_id += 1
entry = {
"id": max_id,
"source_motion_id": int(r["source_motion_id"]),
"target_motion_id": int(r["target_motion_id"]),
"score": float(r["score"]),
"vector_type": r["vector_type"],
"window_id": r.get("window_id"),
}
data.append(entry)
inserted += 1
fh.seek(0)
json.dump(data, fh)
fh.truncate()
return inserted
except Exception as e:
_logger.error(f"Error writing similarity cache file: {e}")
return inserted
try:
conn = duckdb.connect(self.db_path)
for r in rows:
try:
conn.execute(
"""
INSERT INTO similarity_cache (source_motion_id, target_motion_id, score, vector_type, window_id, created_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
""",
(
r["source_motion_id"],
r["target_motion_id"],
float(r["score"]),
r["vector_type"],
r.get("window_id"),
),
)
inserted += 1
except Exception as e:
_logger.error(f"Error inserting similarity row {r}: {e}")
conn.close()
return inserted
except Exception as e:
_logger.error(f"Error in store_similarity_batch: {e}")
try:
conn.close()
except Exception:
pass
return inserted
def get_cached_similarities(
self,
source_motion_id: int,
vector_type: str,
window_id: Optional[str] = None,
top_k: int = 10,
) -> List[Dict]:
"""Retrieve cached similarities for a source motion.
Returns list of dicts with keys: target_motion_id, score, created_at, id
"""
# File-backed fallback
if duckdb is None:
sim_file = f"{self.db_path}.similarity_cache.json"
try:
with open(sim_file, "r", encoding="utf-8") as fh:
data = json.load(fh)
rows = [
r
for r in data
if int(r.get("source_motion_id")) == int(source_motion_id)
and r.get("vector_type") == vector_type
and (window_id is None or r.get("window_id") == window_id)
]
# sort by score desc
rows.sort(key=lambda x: float(x.get("score", 0)), reverse=True)
return rows[:top_k]
except Exception as e:
_logger.error(f"Error reading similarity cache file: {e}")
return []
try:
conn = duckdb.connect(self.db_path)
params = [source_motion_id, vector_type]
query = (
"SELECT id, target_motion_id, score, created_at FROM similarity_cache"
" WHERE source_motion_id = ? AND vector_type = ?"
)
if window_id is not None:
query += " AND window_id = ?"
params.append(window_id)
query += " ORDER BY score DESC LIMIT ?"
params.append(top_k)
rows = conn.execute(query, params).fetchall()
columns = [desc[0] for desc in conn.description]
conn.close()
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
_logger.error(f"Error fetching cached similarities: {e}")
try:
conn.close()
except Exception:
pass
return []
def clear_similarity_cache(
self, vector_type: str, window_id: Optional[str] = None
) -> int:
"""Delete cached similarity rows matching vector_type and optional window_id. Returns count deleted."""
try:
# File-backed fallback
if duckdb is None:
sim_file = f"{self.db_path}.similarity_cache.json"
try:
with open(sim_file, "r+", encoding="utf-8") as fh:
data = json.load(fh)
before = len(data)
data = [
r
for r in data
if not (
r.get("vector_type") == vector_type
and (
window_id is None or r.get("window_id") == window_id
)
)
]
deleted = before - len(data)
fh.seek(0)
json.dump(data, fh)
fh.truncate()
return deleted
except Exception as e:
_logger.error(f"Error clearing similarity cache file: {e}")
return 0
conn = duckdb.connect(self.db_path)
params = [vector_type]
count_q = "SELECT COUNT(*) FROM similarity_cache WHERE vector_type = ?"
del_q = "DELETE FROM similarity_cache WHERE vector_type = ?"
if window_id is not None:
count_q += " AND window_id = ?"
del_q += " AND window_id = ?"
params.append(window_id)
row = conn.execute(count_q, params).fetchone()
to_delete = int(row[0]) if row and row[0] is not None else 0
if to_delete > 0:
conn.execute(del_q, params)
conn.close()
return to_delete
except Exception as e:
_logger.error(f"Error clearing similarity_cache: {e}")
try:
conn.close()
except Exception:
pass
return 0
db = MotionDatabase()