You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1366 lines
49 KiB
1366 lines
49 KiB
# database.py (final working version)
|
|
try:
|
|
import duckdb
|
|
except Exception: # pragma: no cover - environment may not have duckdb installed
|
|
duckdb = None
|
|
import json
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Tuple
|
|
from config import config
|
|
import logging
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MotionDatabase:
|
|
def __init__(self, db_path: str = config.DATABASE_PATH):
    """Bind this instance to *db_path* and ensure the schema exists.

    When the duckdb package could not be imported, the instance runs in a
    degraded, JSON-file-backed mode instead of a real database.
    """
    self.db_path = db_path
    # duckdb is None when the module-level import failed.
    self._file_mode = duckdb is None
    self._init_database()
|
|
|
|
def _init_database(self):
    """Initialize database with required tables and id sequences.

    When duckdb is unavailable in this environment, creates lightweight
    JSON files standing in for the embeddings and similarity-cache stores
    (so tests can run without the duckdb dependency) and returns early.
    """
    # Create directory if it doesn't exist
    import os

    os.makedirs(os.path.dirname(self.db_path), exist_ok=True)

    # If duckdb isn't available in this environment, create lightweight
    # JSON-backed files to allow tests to run without the duckdb dependency.
    if duckdb is None:
        # create simple JSON files representing embeddings and similarity cache
        emb_file = f"{self.db_path}.embeddings.json"
        sim_file = f"{self.db_path}.similarity_cache.json"
        for p in (emb_file, sim_file):
            if not os.path.exists(p):
                with open(p, "w", encoding="utf-8") as fh:
                    fh.write("[]")
        return

    conn = duckdb.connect(self.db_path)

    # Create sequence for auto-incrementing IDs
    try:
        conn.execute("CREATE SEQUENCE IF NOT EXISTS motions_id_seq START 1")
    except Exception:
        # fix: was a bare `except:` which would also swallow SystemExit and
        # KeyboardInterrupt; best-effort behavior is kept for DB errors only.
        pass

    # Create tables with proper ID handling
    conn.execute("""
        CREATE TABLE IF NOT EXISTS motions (
            id INTEGER DEFAULT nextval('motions_id_seq'),
            title TEXT NOT NULL,
            description TEXT,
            date DATE,
            policy_area TEXT,
            voting_results JSON,
            winning_margin FLOAT,
            controversy_score FLOAT,
            layman_explanation TEXT,
            externe_identifier TEXT,
            body_text TEXT,
            url TEXT UNIQUE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id)
        )
    """)
    # Ensure older databases get new columns added without recreating table
    try:
        conn.execute(
            "ALTER TABLE motions ADD COLUMN IF NOT EXISTS externe_identifier TEXT"
        )
        conn.execute("ALTER TABLE motions ADD COLUMN IF NOT EXISTS body_text TEXT")
    except Exception:
        # Best-effort: if ALTER fails for any reason, continue without stopping app startup
        _logger.debug(
            "Could not ALTER motions table to add new columns (may already exist or unsupported)."
        )

    conn.execute("""
        CREATE TABLE IF NOT EXISTS user_sessions (
            session_id TEXT PRIMARY KEY,
            user_votes JSON,
            completed_motions INTEGER DEFAULT 0,
            total_motions INTEGER DEFAULT 10,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    conn.execute("""
        CREATE TABLE IF NOT EXISTS party_results (
            session_id TEXT,
            party_name TEXT,
            agreement_percentage FLOAT,
            agreed_motions JSON,
            disagreed_motions JSON,
            PRIMARY KEY (session_id, party_name)
        )
    """)

    # New pipeline tables
    conn.execute("""
        CREATE SEQUENCE IF NOT EXISTS mp_votes_id_seq START 1
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS mp_votes (
            id INTEGER DEFAULT nextval('mp_votes_id_seq'),
            motion_id INTEGER NOT NULL,
            mp_name TEXT NOT NULL,
            party TEXT,
            vote TEXT NOT NULL,
            date DATE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id)
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS mp_metadata (
            mp_name TEXT PRIMARY KEY,
            party TEXT,
            van DATE,
            tot_en_met DATE,
            persoon_id TEXT
        )
    """)
    conn.execute("""
        CREATE SEQUENCE IF NOT EXISTS svd_vectors_id_seq START 1
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS svd_vectors (
            id INTEGER DEFAULT nextval('svd_vectors_id_seq'),
            window_id TEXT NOT NULL,
            entity_type TEXT NOT NULL,
            entity_id TEXT NOT NULL,
            vector JSON NOT NULL,
            model TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id)
        )
    """)
    conn.execute("""
        CREATE SEQUENCE IF NOT EXISTS fused_embeddings_id_seq START 1
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS fused_embeddings (
            id INTEGER DEFAULT nextval('fused_embeddings_id_seq'),
            motion_id INTEGER NOT NULL,
            window_id TEXT NOT NULL,
            vector JSON NOT NULL,
            svd_dims INTEGER NOT NULL,
            text_dims INTEGER NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id)
        )
    """)

    # Embeddings table for raw text embeddings
    conn.execute("""
        CREATE SEQUENCE IF NOT EXISTS embeddings_id_seq START 1
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS embeddings (
            id INTEGER DEFAULT nextval('embeddings_id_seq'),
            motion_id INTEGER NOT NULL,
            model TEXT,
            vector JSON NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id)
        )
    """)

    # Similarity cache table for precomputed neighbors
    conn.execute("""
        CREATE SEQUENCE IF NOT EXISTS similarity_cache_id_seq START 1
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS similarity_cache (
            id INTEGER DEFAULT nextval('similarity_cache_id_seq'),
            source_motion_id INTEGER NOT NULL,
            target_motion_id INTEGER NOT NULL,
            score FLOAT NOT NULL,
            vector_type TEXT NOT NULL,
            window_id TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id)
        )
    """)

    conn.close()
|
|
|
|
def reset_database(self):
    """Development helper: drop known tables and re-run initialization.

    WARNING: intended for dev/test only. This will remove tables and recreate schema.
    """
    if duckdb is None:
        # fix: previously called duckdb.connect unconditionally, which raised
        # AttributeError in the file-backed fallback mode. Just re-run init,
        # which recreates the JSON stand-in files.
        self._init_database()
        return

    conn = duckdb.connect(self.db_path)
    try:
        # Drop known tables if they exist; best-effort per table.
        for t in ("party_results", "user_sessions", "motions"):
            try:
                conn.execute(f"DROP TABLE IF EXISTS {t}")
            except Exception:
                pass
    finally:
        # fix: the old code closed the connection inside the try body AND in
        # the finally clause; close exactly once here.
        try:
            conn.close()
        except Exception:
            pass
    # Recreate schema on a fresh connection.
    self._init_database()
|
|
|
|
def append_audit_event(
    self,
    actor_id: Optional[str],
    action: str,
    target_type: Optional[str] = None,
    target_id: Optional[str] = None,
    metadata: Optional[Dict] = None,
) -> bool:
    """Record an audit event.

    Tries to write to an audit_events table in DuckDB. If that fails (no DB,
    missing table, or any error) falls back to appending the event to
    thoughts/ledgers/audit_events.json for durable inspection.

    Args:
        actor_id: id of the actor performing the action (may be None).
        action: short action name describing what happened.
        target_type: optional kind of the affected entity.
        target_id: optional id of the affected entity.
        metadata: optional extra detail; serialized to JSON for storage.

    Returns True when the event was recorded somewhere, False otherwise.
    """
    event_id = str(uuid.uuid4())
    # Naive UTC timestamp; the trailing "Z" marks it as UTC for readers.
    now = datetime.utcnow().isoformat() + "Z"
    payload = {
        "id": event_id,
        "actor_id": actor_id,
        "action": action,
        "target_type": target_type,
        "target_id": target_id,
        "metadata": metadata or {},
        "created_at": now,
    }

    # If duckdb is available try to write to DB first.
    try:
        if duckdb is not None:
            conn = duckdb.connect(self.db_path)
            try:
                # NOTE(review): audit_events is not created by _init_database;
                # presumably it is created elsewhere, otherwise this insert
                # fails and the ledger fallback below handles it — confirm.
                conn.execute(
                    "INSERT INTO audit_events (id, actor_id, action, target_type, target_id, metadata, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (
                        payload["id"],
                        payload["actor_id"],
                        payload["action"],
                        payload["target_type"],
                        payload["target_id"],
                        json.dumps(payload["metadata"]),
                        payload["created_at"],
                    ),
                )
                conn.close()
                return True
            except Exception as e:
                # DB write failed; close the connection and fall through to
                # the ledger-file fallback below (no return here).
                _logger.debug("Could not write audit event to DB: %s", e)
                try:
                    conn.close()
                except Exception:
                    pass
            # fall back to ledger file
    except Exception:
        _logger.debug("DuckDB unavailable when appending audit event")

    # Ledger fallback: append to thoughts/ledgers/audit_events.json
    try:
        from pathlib import Path

        ledger_dir = Path("thoughts") / "ledgers"
        ledger_dir.mkdir(parents=True, exist_ok=True)
        ledger_path = ledger_dir / "audit_events.json"
        if ledger_path.exists():
            try:
                data = json.loads(ledger_path.read_text(encoding="utf-8"))
                # Guard against a corrupted/foreign file: restart the list.
                if not isinstance(data, list):
                    data = []
            except Exception:
                data = []
        else:
            data = []

        data.append(payload)
        # Rewrite the whole ledger each time (read-modify-write, not append).
        ledger_path.write_text(
            json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8"
        )
        return True
    except Exception as e:
        _logger.error("Failed to record audit event to ledger: %s", e)
        return False
|
|
|
|
def insert_motion(self, motion_data: Dict) -> bool:
    """Insert a new motion into database.

    Args:
        motion_data: dict with required keys title, description, date,
            policy_area, voting_results, winning_margin, url; optional keys
            externe_identifier, body_text, mp_vote_parties.

    Returns:
        True when a new motion row was inserted, False when a row with the
        same url already exists or on any error (errors are logged, not
        raised).
    """
    try:
        conn = duckdb.connect(self.db_path)

        # Check if motion already exists by URL to avoid duplicates
        existing = conn.execute(
            """
            SELECT COUNT(*) FROM motions WHERE url = ?
            """,
            (motion_data["url"],),
        ).fetchone()

        if existing and existing[0] > 0:
            conn.close()
            return False  # Motion already exists

        # Insert motion - id will be auto-generated by sequence
        conn.execute(
            """
            INSERT INTO motions
            (title, description, date, policy_area, voting_results,
             winning_margin, controversy_score, url, externe_identifier, body_text, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            """,
            (
                motion_data["title"],
                motion_data["description"] or "",
                motion_data["date"],
                motion_data["policy_area"],
                json.dumps(motion_data["voting_results"]),
                motion_data["winning_margin"],
                1 - motion_data["winning_margin"],  # controversy score
                motion_data["url"],
                motion_data.get("externe_identifier"),
                motion_data.get("body_text"),
            ),
        )

        conn.close()

        # Also insert mp_vote rows for individual MPs if party data is available.
        # This only runs for brand-new motions (existing motions are rejected above),
        # so there is no risk of duplicates — no existence check needed here.
        mp_vote_parties = motion_data.get("mp_vote_parties", {})
        voting_results_raw = motion_data.get("voting_results", {})
        if mp_vote_parties:
            # Short-lived second connection just to look up the new row's id.
            conn2 = duckdb.connect(self.db_path)
            row = conn2.execute(
                "SELECT id FROM motions WHERE url = ? LIMIT 1",
                (motion_data["url"],),
            ).fetchone()
            conn2.close()
            motion_id = row[0] if row else None

            if motion_id is not None:
                motion_date = motion_data.get("date", "")
                for mp_name, party in mp_vote_parties.items():
                    # Default to 'afwezig' (absent) when the MP has no entry
                    # in the voting results.
                    vote = voting_results_raw.get(mp_name, "afwezig")
                    self.insert_mp_vote(
                        motion_id=motion_id,
                        mp_name=mp_name,
                        party=party,
                        vote=vote,
                        date=motion_date,
                    )

        return True

    except Exception:
        _logger.exception("Error inserting motion")
        try:
            conn.close()
        except Exception:
            pass
        return False
|
|
|
|
def batch_insert_motions(self, motions_data: List[Dict]) -> Tuple[int, int]:
    """Batch-insert motions and their mp_votes using a single DuckDB connection.

    Args:
        motions_data: list of motion dicts in the same shape that
            insert_motion accepts.

    Returns (inserted_count, duplicate_count).

    Raises:
        Exception: any DB error is logged and re-raised after closing the
            connection.
    """
    if not motions_data:
        return 0, 0

    try:
        conn = duckdb.connect(self.db_path)

        # 1. Find which URLs already exist — single query
        urls = [m["url"] for m in motions_data]
        placeholders = ", ".join("?" * len(urls))
        existing_urls = set(
            row[0]
            for row in conn.execute(
                f"SELECT url FROM motions WHERE url IN ({placeholders})", urls
            ).fetchall()
        )

        new_motions = [m for m in motions_data if m["url"] not in existing_urls]
        duplicates = len(motions_data) - len(new_motions)

        if not new_motions:
            conn.close()
            return 0, duplicates

        # 2. Bulk-insert motions
        motion_rows = [
            (
                m["title"],
                m["description"] or "",
                m["date"],
                m["policy_area"],
                json.dumps(m["voting_results"]),
                m["winning_margin"],
                1 - m["winning_margin"],  # controversy score = 1 - margin
                m["url"],
                m.get("externe_identifier"),
                m.get("body_text"),
            )
            for m in new_motions
        ]
        conn.executemany(
            """
            INSERT INTO motions
            (title, description, date, policy_area, voting_results,
             winning_margin, controversy_score, url, externe_identifier,
             body_text, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            """,
            motion_rows,
        )

        # 3. Fetch the newly-assigned IDs in one query
        new_urls = [m["url"] for m in new_motions]
        np = ", ".join("?" * len(new_urls))
        url_to_id = {
            row[1]: row[0]
            for row in conn.execute(
                f"SELECT id, url FROM motions WHERE url IN ({np})", new_urls
            ).fetchall()
        }

        # 4. Bulk-insert mp_votes
        vote_rows = []
        for m in new_motions:
            motion_id = url_to_id.get(m["url"])
            if motion_id is None:
                continue
            mp_vote_parties = m.get("mp_vote_parties", {})
            voting_results_raw = m.get("voting_results", {})
            motion_date = m.get("date", "")
            for mp_name, party in mp_vote_parties.items():
                # Default to 'afwezig' (absent) when no vote is recorded.
                vote = voting_results_raw.get(mp_name, "afwezig")
                vote_rows.append((motion_id, mp_name, party, vote, motion_date))

        if vote_rows:
            conn.executemany(
                """
                INSERT INTO mp_votes (motion_id, mp_name, party, vote, date, created_at)
                VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                """,
                vote_rows,
            )

        conn.close()
        return len(new_motions), duplicates

    except Exception as e:
        _logger.error(f"Error in batch_insert_motions: {e}")
        try:
            conn.close()
        except Exception:
            pass
        raise
|
|
|
|
def get_filtered_motions(
    self,
    policy_area: str = "Alle",
    min_margin: float = 0.2,
    max_margin: float = 0.8,
    limit: int = 100,
) -> List[Dict]:
    """Get motions filtered by criteria.

    Only motions with a non-empty layman_explanation and a winning margin
    inside [min_margin, max_margin] are returned, most controversial first.
    "Alle" means no policy-area filter. Returns [] on any query error.
    """
    conn = duckdb.connect(self.db_path)

    query = """
        SELECT * FROM motions
        WHERE winning_margin BETWEEN ? AND ?
        AND layman_explanation IS NOT NULL
        AND layman_explanation != ''
    """
    bind_values = [min_margin, max_margin]

    if policy_area != "Alle":
        query += " AND policy_area = ?"
        bind_values.append(policy_area)

    query += " ORDER BY controversy_score DESC LIMIT ?"
    bind_values.append(limit)

    try:
        fetched = conn.execute(query, bind_values).fetchall()
        col_names = [desc[0] for desc in conn.description]
        conn.close()
        return [dict(zip(col_names, record)) for record in fetched]
    except Exception:
        _logger.exception("Error querying motions")
        try:
            conn.close()
        except Exception:
            pass
        return []
|
|
|
|
def get_titles_for_ids(self, ids: List[int]) -> Dict[int, Optional[str]]:
    """Return a mapping of motion id -> title for the given ids.

    Returns an empty mapping when ids is empty or DuckDB is not available.
    Errors are logged and swallowed; a partial/empty mapping is returned.
    """
    out: Dict[int, Optional[str]] = {}
    # fix: an empty ids list previously produced an invalid "IN ()" clause,
    # opening a connection and logging a spurious exception on every call.
    if duckdb is None or not ids:
        return out
    conn = None  # fix: guarantees `conn` is bound in the except handler
    try:
        conn = duckdb.connect(self.db_path)
        placeholders = ",".join("?" for _ in ids)
        rows = conn.execute(
            f"SELECT id, title FROM motions WHERE id IN ({placeholders})", ids
        ).fetchall()
        conn.close()
        for r in rows:
            try:
                out[int(r[0])] = r[1]
            except Exception:
                # Skip rows whose id cannot be coerced to int.
                continue
        return out
    except Exception:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
        _logger.exception("Error fetching titles for ids")
        return out
|
|
|
|
def create_session(self, total_motions: int = 10) -> str:
    """Create a new user session row and return its generated session id."""
    new_session = str(uuid.uuid4())
    conn = duckdb.connect(self.db_path)
    # user_votes starts as an empty JSON object.
    conn.execute(
        """
        INSERT INTO user_sessions (session_id, user_votes, total_motions)
        VALUES (?, '{}', ?)
        """,
        (new_session, total_motions),
    )
    conn.close()
    return new_session
|
|
|
|
def update_user_vote(self, session_id: str, motion_id: int, vote: str):
    """Record or overwrite the user's vote for one motion in a session.

    Reads the session's user_votes JSON blob, updates the entry for
    motion_id, and writes it back together with the new completed count.
    Does nothing when the session does not exist.
    """
    conn = duckdb.connect(self.db_path)

    # Fetch the current vote blob for this session.
    row = conn.execute(
        """
        SELECT user_votes FROM user_sessions WHERE session_id = ?
        """,
        (session_id,),
    ).fetchone()

    if row:
        votes = json.loads(row[0])
        votes[str(motion_id)] = vote

        conn.execute(
            """
            UPDATE user_sessions
            SET user_votes = ?,
                completed_motions = ?,
                last_updated = CURRENT_TIMESTAMP
            WHERE session_id = ?
            """,
            (json.dumps(votes), len(votes), session_id),
        )

    conn.close()
|
|
|
|
def calculate_party_matches(self, session_id: str) -> List[Dict]:
    """Calculate party agreement percentages for a session.

    Compares the session's recorded user votes against each party's vote on
    the same motions. Individual MP entries (names containing a comma) and
    'Geen stem' user answers are skipped.

    Returns:
        List of dicts sorted by agreement_percentage descending, each with
        keys party, agreement_percentage, agreed_motions, total_motions.
        Empty list when the session or its votes are missing.
    """
    conn = duckdb.connect(self.db_path)

    # Get user votes and motion data
    user_data = conn.execute(
        """
        SELECT user_votes FROM user_sessions WHERE session_id = ?
        """,
        (session_id,),
    ).fetchone()

    if not user_data:
        conn.close()  # fix: connection was previously leaked on this path
        return []

    user_votes = json.loads(user_data[0])
    motion_ids = list(user_votes.keys())

    if not motion_ids:
        conn.close()  # fix: connection was previously leaked on this path
        return []

    # Get motion voting results
    placeholders = ",".join(["?" for _ in motion_ids])
    motions = conn.execute(
        f"""
        SELECT id, voting_results FROM motions
        WHERE id IN ({placeholders})
        """,
        motion_ids,
    ).fetchall()

    conn.close()

    # Calculate agreements
    party_scores = {}

    for motion_id, voting_results_json in motions:
        voting_results = json.loads(voting_results_json)
        user_vote = user_votes[str(motion_id)]

        if user_vote == "Geen stem":  # Skip abstentions
            continue

        for party, party_vote in voting_results.items():
            # Skip individual MP names (contain comma, e.g. "Yesilgöz-Zegerius, D.")
            # Party/fractie names never contain a comma.
            if "," in party:
                continue

            if party not in party_scores:
                party_scores[party] = {"agreed": 0, "total": 0}

            party_scores[party]["total"] += 1

            # Check agreement (user tokens are capitalized, DB tokens lowercase)
            if (user_vote == "Voor" and party_vote == "voor") or (
                user_vote == "Tegen" and party_vote == "tegen"
            ):
                party_scores[party]["agreed"] += 1

    # Convert to percentages and sort
    results = []
    for party, scores in party_scores.items():
        if scores["total"] > 0:
            agreement_pct = (scores["agreed"] / scores["total"]) * 100
            results.append(
                {
                    "party": party,
                    "agreement_percentage": round(agreement_pct, 1),
                    "agreed_motions": scores["agreed"],
                    "total_motions": scores["total"],
                }
            )

    return sorted(results, key=lambda x: x["agreement_percentage"], reverse=True)
|
|
|
|
def get_motions_with_individual_votes(self, k: int = 20) -> List[int]:
    """Return up to k motion IDs that have individual MP vote records.

    Selects motions where at least one mp_name contains a comma (i.e.
    individual MPs in 'Lastname, F.' format), ordered by controversy_score
    descending so the most discriminating motions come first.

    Args:
        k: maximum number of motion IDs to return.

    Returns:
        List of motion IDs (ints), sorted by controversy_score DESC.
        Empty list when DuckDB is unavailable or the query fails.
    """
    if duckdb is None:
        return []
    try:
        conn = duckdb.connect(self.db_path, read_only=True)
        fetched = conn.execute(
            """
            SELECT DISTINCT mv.motion_id, m.controversy_score
            FROM mp_votes mv
            JOIN motions m ON mv.motion_id = m.id
            WHERE mv.mp_name LIKE '%,%'
            ORDER BY m.controversy_score DESC, mv.motion_id ASC
            LIMIT ?
            """,
            (int(k),),
        ).fetchall()
        conn.close()
        return [int(record[0]) for record in fetched]
    except Exception:
        _logger.exception("Error in get_motions_with_individual_votes")
        return []
|
|
|
|
def match_mps_for_votes(
    self, user_votes: Dict[int, str], limit: int = 50
) -> List[Dict]:
    """Return per-MP agreement against provided user_votes.

    Args:
        user_votes: mapping motion_id -> vote token (UI or canonical).
        limit: max number of MPs to return.

    Returns:
        List of dicts: {mp_name, party, matched, overlap, agreement_pct}

    Raises:
        ValueError: when user_votes is empty, contains a non-integer motion
            id, or no valid votes remain after normalization.
        RuntimeError: when DuckDB is not installed.

    Notes:
        - Normalizes common UI tokens to canonical DB tokens: 'voor','tegen','onthouden','afwezig'.
        - Excludes MPs with overlap == 0.
        - Requires DuckDB to be available (raises RuntimeError otherwise).
    """
    if not user_votes:
        raise ValueError("user_votes must be a non-empty dict")

    # Normalization mapping (UI variants -> canonical)
    def _norm(v: str) -> Optional[str]:
        # Returns the canonical lowercase token, or None to drop this vote.
        if v is None:
            return None
        s = str(v).strip()
        if not s:
            return None
        s_low = s.lower()
        if s_low in ("voor", "v", "yes"):
            return "voor"
        if s_low in ("tegen", "t", "no"):
            return "tegen"
        if s_low in ("onthouden", "abstain", "abstained"):
            return "onthouden"
        if s_low in ("geen stem", "no vote"):
            return None  # user chose not to answer — skip entirely
        if s_low in ("afwezig", "absent"):
            return "afwezig"
        # already canonical?
        if s_low in ("voor", "tegen", "onthouden", "afwezig"):
            return s_low
        # fallback: try Dutch keywords (substring match)
        if "voor" in s_low:
            return "voor"
        if "tegen" in s_low:
            return "tegen"
        if "onthouden" in s_low:
            return "onthouden"
        if "afwezig" in s_low:
            return "afwezig"
        return None

    # Build normalized mapping and DataFrame for DuckDB
    import pandas as pd

    rows = []
    for mid, v in user_votes.items():
        try:
            mid_i = int(mid)
        except Exception:
            raise ValueError(f"motion id must be integer-like: {mid}")
        nv = _norm(v)
        if nv is None:
            # treat as abstain/skip (do not include in user votes)
            continue
        rows.append({"motion_id": mid_i, "user_vote": nv})

    if not rows:
        raise ValueError("After normalization no valid user votes remain")

    if duckdb is None:
        raise RuntimeError("DuckDB is required for match_mps_for_votes")

    conn = duckdb.connect(self.db_path)
    try:
        uv = pd.DataFrame(rows)
        # register as temporary relation so SQL can join against it
        conn.register("_user_votes", uv)

        # matched = MP votes equal to the user's; overlap = motions both the
        # user and the MP voted on. Only individual MPs (names containing a
        # comma) are considered; limit is safe to inline after int() cast.
        q = (
            "SELECT mp.mp_name, mp.party, "
            "SUM(CASE WHEN lower(mp.vote)=_user_votes.user_vote THEN 1 ELSE 0 END) AS matched, "
            "COUNT(*) AS overlap "
            "FROM mp_votes mp JOIN _user_votes ON mp.motion_id = _user_votes.motion_id "
            "WHERE mp.mp_name LIKE '%,%' "
            "GROUP BY mp.mp_name, mp.party "
            "HAVING COUNT(*) > 0 "
            "ORDER BY (matched*1.0/overlap) DESC, matched DESC, mp.mp_name ASC "
            f"LIMIT {int(limit)}"
        )

        rows_out = conn.execute(q).fetchall()
        # columns: mp_name, party, matched, overlap
        results = []
        for r in rows_out:
            try:
                matched = int(r[2])
                overlap = int(r[3])
                pct = round((matched / overlap) * 100.0, 1) if overlap else 0.0
            except Exception:
                # Defensive: NULL aggregates fall back to zeros.
                matched = int(r[2]) if r[2] is not None else 0
                overlap = int(r[3]) if r[3] is not None else 0
                pct = 0.0
            results.append(
                {
                    "mp_name": r[0],
                    "party": r[1],
                    "matched": matched,
                    "overlap": overlap,
                    "agreement_pct": pct,
                }
            )
        conn.unregister("_user_votes")
        conn.close()
        return results
    except Exception:
        try:
            conn.close()
        except Exception:
            pass
        _logger.exception("Error in match_mps_for_votes")
        return []
|
|
|
|
def choose_discriminating_motions(
    self, candidates: List[str], excluded_motion_ids: List[int], k: int = 1
) -> List[int]:
    """Return top-k motion ids that best split the candidate MPs.

    Scoring: Shannon entropy over vote distribution among candidate MPs for each motion.
    Ties broken by higher controversy_score then lower motion id.

    Args:
        candidates: MP names to discriminate between.
        excluded_motion_ids: motion ids to leave out of consideration.
        k: number of motion ids to return.

    Raises:
        ValueError: when candidates is empty.
        RuntimeError: when DuckDB is not installed.

    Returns [] on query errors (logged, not raised).
    """
    if not candidates:
        raise ValueError("candidates must be non-empty")

    if duckdb is None:
        raise RuntimeError("DuckDB is required for choose_discriminating_motions")

    conn = duckdb.connect(self.db_path)
    try:
        # Prepare candidate names as a temp table
        import pandas as pd

        cand_df = pd.DataFrame({"mp_name": candidates})
        conn.register("_candidates", cand_df)

        # Build excluded list SQL fragment
        excl_clause = ""
        params = []  # NOTE(review): unused — exclusions are inlined below
        if excluded_motion_ids:
            # int() cast guards against SQL injection via inlined ids.
            excl_clause = (
                "AND mp.motion_id NOT IN ("
                + ",".join(str(int(x)) for x in excluded_motion_ids)
                + ")"
            )

        # Aggregate counts per motion by vote token
        q = f"""
            SELECT
                m.id as motion_id,
                m.controversy_score,
                SUM(CASE WHEN lower(mp.vote) = 'voor' THEN 1 ELSE 0 END) as cnt_voor,
                SUM(CASE WHEN lower(mp.vote) = 'tegen' THEN 1 ELSE 0 END) as cnt_tegen,
                SUM(CASE WHEN lower(mp.vote) = 'onthouden' THEN 1 ELSE 0 END) as cnt_onthouden,
                SUM(CASE WHEN lower(mp.vote) = 'afwezig' THEN 1 ELSE 0 END) as cnt_afwezig,
                COUNT(*) as total_votes
            FROM mp_votes mp
            JOIN _candidates c ON mp.mp_name = c.mp_name
            JOIN motions m ON m.id = mp.motion_id
            WHERE 1=1
            {excl_clause}
            GROUP BY m.id, m.controversy_score
            HAVING COUNT(*) > 0
        """

        rows = conn.execute(q).fetchall()
        conn.unregister("_candidates")
        conn.close()

        if not rows:
            return []

        import math

        # Score each motion by the Shannon entropy of its vote distribution
        # over the four canonical vote tokens.
        scored = []
        for r in rows:
            motion_id = int(r[0])
            controversy = r[1] or 0.0
            counts = [int(r[2]), int(r[3]), int(r[4]), int(r[5])]
            total = int(r[6])
            if total <= 0:
                entropy = 0.0
            else:
                entropy = 0.0
                for c in counts:
                    if c <= 0:
                        continue  # 0 * log(0) contributes nothing
                    p = c / total
                    entropy -= p * math.log2(p)
            scored.append((motion_id, entropy, controversy))

        # sort by entropy desc, controversy desc, motion_id asc
        scored.sort(key=lambda x: (-x[1], -x[2], x[0]))
        return [m[0] for m in scored[: int(k)]]
    except Exception:
        try:
            conn.close()
        except Exception:
            pass
        _logger.exception("Error in choose_discriminating_motions")
        return []
|
|
|
|
def store_embedding(self, motion_id: int, model: str, vector: List[float]) -> int:
    """Store an embedding for a motion. Returns inserted row id or -1 on failure."""
    try:
        conn = duckdb.connect(self.db_path)
        # Explicit nextval keeps compatibility with older tables whose id
        # column lacks a DEFAULT; currval then reports the assigned id.
        conn.execute(
            "INSERT INTO embeddings (id, motion_id, model, vector, created_at) VALUES (nextval('embeddings_id_seq'), ?, ?, ?, CURRENT_TIMESTAMP)",
            (motion_id, model, json.dumps(vector)),
        )
        assigned = conn.execute("SELECT currval('embeddings_id_seq')").fetchone()
        conn.close()
        return int(assigned[0]) if assigned and assigned[0] is not None else -1
    except Exception as e:
        _logger.error("Error storing embedding: %s", e)
        try:
            conn.close()
        except Exception:
            pass
        return -1
|
|
|
|
def search_similar(
    self, query_vector: List[float], top_k: int = 5, model: Optional[str] = None
) -> List[Dict]:
    """Naive in-Python cosine similarity search over stored embeddings.

    Args:
        query_vector: the query embedding.
        top_k: number of best-scoring rows to return.
        model: optional filter on the embedding model column.

    Returns list of dicts with keys: id, motion_id, model, score, created_at
    (best score first); [] on any error.
    """
    try:
        conn = duckdb.connect(self.db_path)
        if model:
            rows = conn.execute(
                "SELECT id, motion_id, model, vector, created_at FROM embeddings WHERE model = ?",
                (model,),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT id, motion_id, model, vector, created_at FROM embeddings"
            ).fetchall()
        conn.close()

        results = []
        import math

        # fix: the query-vector norm is loop-invariant — hoist it out of the
        # per-row loop. A malformed query vector yields na == 0.0, so every
        # score falls back to 0.0, matching the previous per-row exception
        # handling.
        try:
            na = math.sqrt(sum(float(a) * float(a) for a in query_vector))
        except Exception:
            na = 0.0

        for r in rows:
            id_, motion_id, mdl, vector_json, created_at = r
            try:
                vec = json.loads(vector_json)
            except Exception:
                # Skip rows whose stored vector is not valid JSON.
                continue

            # cosine similarity
            try:
                dot = sum(float(a) * float(b) for a, b in zip(query_vector, vec))
                nb = math.sqrt(sum(float(b) * float(b) for b in vec))
                score = dot / (na * nb) if na and nb else 0.0
            except Exception:
                score = 0.0

            results.append(
                {
                    "id": id_,
                    "motion_id": motion_id,
                    "model": mdl,
                    "score": score,
                    "created_at": created_at,
                }
            )

        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:top_k]
    except Exception:
        _logger.exception("Error searching embeddings")
        try:
            conn.close()
        except Exception:
            pass
        return []
|
|
|
|
def mp_votes_exists_for_motion(self, motion_id: int) -> bool:
    """Return True when at least one mp_votes row exists for the motion."""
    try:
        conn = duckdb.connect(self.db_path)
        count_row = conn.execute(
            "SELECT COUNT(*) FROM mp_votes WHERE motion_id = ?",
            (motion_id,),
        ).fetchone()
        conn.close()
        return bool(count_row and count_row[0] > 0)
    except Exception as e:
        # Missing table / unreadable DB is reported as "no votes".
        _logger.error(f"Error checking mp_votes existence: {e}")
        try:
            conn.close()
        except Exception:
            pass
        return False
|
|
|
|
def insert_mp_vote(
    self,
    motion_id: int,
    mp_name: str,
    vote: str,
    date: Optional[str] = None,
    party: Optional[str] = None,
) -> int:
    """Insert one individual MP vote row.

    Returns the new row's id (read back via max(id)), or -1 on failure.
    """
    try:
        conn = duckdb.connect(self.db_path)
        conn.execute(
            """
            INSERT INTO mp_votes (motion_id, mp_name, party, vote, date, created_at)
            VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            """,
            (motion_id, mp_name, party, vote, date),
        )
        last = conn.execute("SELECT max(id) FROM mp_votes").fetchone()
        conn.close()
        return int(last[0]) if last and last[0] is not None else -1
    except Exception as e:
        _logger.error(f"Error inserting mp_vote: {e}")
        try:
            conn.close()
        except Exception:
            pass
        return -1
|
|
|
|
def upsert_mp_metadata(
    self,
    mp_name: str,
    party: Optional[str],
    van: Optional[str],
    tot_en_met: Optional[str],
    persoon_id: Optional[str],
) -> None:
    """Insert or conditionally update one MP's metadata row.

    Update policy when the row already exists: only overwrite when the
    incoming record is "better" — an active membership (tot_en_met IS NULL)
    beats an ended one; within the same active/ended status a newer start
    date (van) wins. Errors are logged, never raised.

    Args:
        mp_name: MP display name (primary key).
        party: fractie/party name, or None.
        van: membership start date (ISO string), or None.
        tot_en_met: membership end date, or None for an active membership.
        persoon_id: external person identifier, or None.
    """
    try:
        conn = duckdb.connect(self.db_path)
        exists = conn.execute(
            "SELECT COUNT(*) FROM mp_metadata WHERE mp_name = ?", (mp_name,)
        ).fetchone()
        if exists and exists[0] > 0:
            # Only update if this record is newer (higher Van date) than the stored one,
            # preferring active memberships (TotEnMet IS NULL) over ended ones.
            # The incoming tot_en_met / van values are bound several times so
            # each branch of the condition can test them positionally.
            conn.execute(
                """
                UPDATE mp_metadata SET party = ?, van = ?, tot_en_met = ?, persoon_id = ?
                WHERE mp_name = ?
                AND (
                    -- prefer active over ended
                    (? IS NULL AND tot_en_met IS NOT NULL)
                    -- or same active status but newer start date
                    OR (? IS NULL AND tot_en_met IS NULL AND CAST(? AS DATE) > CAST(van AS DATE))
                    OR (? IS NOT NULL AND tot_en_met IS NOT NULL AND CAST(? AS DATE) > CAST(van AS DATE))
                )
                """,
                (
                    party,
                    van,
                    tot_en_met,
                    persoon_id,
                    mp_name,
                    tot_en_met,  # prefer active
                    tot_en_met,
                    van,  # both active, newer
                    tot_en_met,
                    van,
                ),  # both ended, newer
            )
        else:
            conn.execute(
                """
                INSERT INTO mp_metadata (mp_name, party, van, tot_en_met, persoon_id)
                VALUES (?, ?, ?, ?, ?)
                """,
                (mp_name, party, van, tot_en_met, persoon_id),
            )
        conn.close()
    except Exception as e:
        _logger.error(f"Error upserting mp_metadata: {e}")
        try:
            conn.close()
        except Exception:
            pass
|
|
|
def store_svd_vector(
    self,
    window_id: str,
    entity_type: str,
    entity_id: str,
    vector: List[float],
    model: Optional[str] = None,
) -> int:
    """Persist a single SVD vector row.

    Returns the new row's id (read back via max(id)), or -1 on failure.
    """
    try:
        conn = duckdb.connect(self.db_path)
        conn.execute(
            """
            INSERT INTO svd_vectors (window_id, entity_type, entity_id, vector, model, created_at)
            VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            """,
            (window_id, entity_type, entity_id, json.dumps(vector), model),
        )
        last = conn.execute("SELECT max(id) FROM svd_vectors").fetchone()
        conn.close()
        return int(last[0]) if last and last[0] is not None else -1
    except Exception as e:
        _logger.error(f"Error storing svd_vector: {e}")
        try:
            conn.close()
        except Exception:
            pass
        return -1
|
|
|
|
def batch_store_svd_vectors(
|
|
self,
|
|
window_id: str,
|
|
rows: List[Tuple], # each: (entity_type, entity_id, vector_list, model_or_None)
|
|
) -> int:
|
|
"""Batch-upsert SVD vectors for a window using a single connection.
|
|
|
|
Deletes all existing rows for the window first, then inserts the new batch.
|
|
Returns number of rows inserted.
|
|
"""
|
|
if not rows:
|
|
return 0
|
|
try:
|
|
conn = duckdb.connect(self.db_path)
|
|
conn.execute("DELETE FROM svd_vectors WHERE window_id = ?", (window_id,))
|
|
insert_rows = [
|
|
(window_id, entity_type, entity_id, json.dumps(vector), model)
|
|
for entity_type, entity_id, vector, model in rows
|
|
]
|
|
conn.executemany(
|
|
"""
|
|
INSERT INTO svd_vectors
|
|
(window_id, entity_type, entity_id, vector, model, created_at)
|
|
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
|
|
""",
|
|
insert_rows,
|
|
)
|
|
conn.close()
|
|
return len(insert_rows)
|
|
except Exception as e:
|
|
_logger.error(f"Error in batch_store_svd_vectors: {e}")
|
|
try:
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
raise
|
|
|
|
def store_fused_embedding(
|
|
self,
|
|
motion_id: int,
|
|
window_id: str,
|
|
vector: List[float],
|
|
svd_dims: int,
|
|
text_dims: int,
|
|
) -> int:
|
|
try:
|
|
conn = duckdb.connect(self.db_path)
|
|
# Delete any existing row for this (motion_id, window_id) to prevent duplicates
|
|
conn.execute(
|
|
"DELETE FROM fused_embeddings WHERE motion_id = ? AND window_id = ?",
|
|
(motion_id, window_id),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO fused_embeddings (motion_id, window_id, vector, svd_dims, text_dims, created_at)
|
|
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
|
|
""",
|
|
(motion_id, window_id, json.dumps(vector), svd_dims, text_dims),
|
|
)
|
|
row = conn.execute("SELECT max(id) FROM fused_embeddings").fetchone()
|
|
conn.close()
|
|
if row and row[0] is not None:
|
|
return int(row[0])
|
|
return -1
|
|
except Exception as e:
|
|
_logger.error(f"Error storing fused_embedding: {e}")
|
|
try:
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
return -1
|
|
|
|
def store_similarity_batch(self, rows: List[Dict]) -> int:
|
|
"""Insert multiple similarity_cache rows. Returns number inserted."""
|
|
if not rows:
|
|
return 0
|
|
inserted = 0
|
|
# File-backed fallback when duckdb is not available
|
|
if duckdb is None:
|
|
sim_file = f"{self.db_path}.similarity_cache.json"
|
|
try:
|
|
with open(sim_file, "r+", encoding="utf-8") as fh:
|
|
data = json.load(fh)
|
|
# assign incremental ids
|
|
max_id = max((item.get("id", 0) for item in data), default=0)
|
|
for r in rows:
|
|
max_id += 1
|
|
entry = {
|
|
"id": max_id,
|
|
"source_motion_id": int(r["source_motion_id"]),
|
|
"target_motion_id": int(r["target_motion_id"]),
|
|
"score": float(r["score"]),
|
|
"vector_type": r["vector_type"],
|
|
"window_id": r.get("window_id"),
|
|
}
|
|
data.append(entry)
|
|
inserted += 1
|
|
fh.seek(0)
|
|
json.dump(data, fh)
|
|
fh.truncate()
|
|
return inserted
|
|
except Exception as e:
|
|
_logger.error(f"Error writing similarity cache file: {e}")
|
|
return inserted
|
|
|
|
try:
|
|
conn = duckdb.connect(self.db_path)
|
|
for r in rows:
|
|
try:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO similarity_cache (source_motion_id, target_motion_id, score, vector_type, window_id, created_at)
|
|
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
|
|
""",
|
|
(
|
|
r["source_motion_id"],
|
|
r["target_motion_id"],
|
|
float(r["score"]),
|
|
r["vector_type"],
|
|
r.get("window_id"),
|
|
),
|
|
)
|
|
inserted += 1
|
|
except Exception as e:
|
|
_logger.error(f"Error inserting similarity row {r}: {e}")
|
|
conn.close()
|
|
return inserted
|
|
except Exception as e:
|
|
_logger.error(f"Error in store_similarity_batch: {e}")
|
|
try:
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
return inserted
|
|
|
|
def get_cached_similarities(
|
|
self,
|
|
source_motion_id: int,
|
|
vector_type: str,
|
|
window_id: Optional[str] = None,
|
|
top_k: int = 10,
|
|
) -> List[Dict]:
|
|
"""Retrieve cached similarities for a source motion.
|
|
|
|
Returns list of dicts with keys: target_motion_id, score, created_at, id
|
|
"""
|
|
# File-backed fallback
|
|
if duckdb is None:
|
|
sim_file = f"{self.db_path}.similarity_cache.json"
|
|
try:
|
|
with open(sim_file, "r", encoding="utf-8") as fh:
|
|
data = json.load(fh)
|
|
rows = [
|
|
r
|
|
for r in data
|
|
if int(r.get("source_motion_id")) == int(source_motion_id)
|
|
and r.get("vector_type") == vector_type
|
|
and (window_id is None or r.get("window_id") == window_id)
|
|
]
|
|
# sort by score desc
|
|
rows.sort(key=lambda x: float(x.get("score", 0)), reverse=True)
|
|
return rows[:top_k]
|
|
except Exception as e:
|
|
_logger.error(f"Error reading similarity cache file: {e}")
|
|
return []
|
|
|
|
try:
|
|
conn = duckdb.connect(self.db_path)
|
|
params = [source_motion_id, vector_type]
|
|
query = (
|
|
"SELECT id, target_motion_id, score, created_at FROM similarity_cache"
|
|
" WHERE source_motion_id = ? AND vector_type = ?"
|
|
)
|
|
if window_id is not None:
|
|
query += " AND window_id = ?"
|
|
params.append(window_id)
|
|
query += " ORDER BY score DESC LIMIT ?"
|
|
params.append(top_k)
|
|
|
|
rows = conn.execute(query, params).fetchall()
|
|
columns = [desc[0] for desc in conn.description]
|
|
conn.close()
|
|
return [dict(zip(columns, row)) for row in rows]
|
|
except Exception as e:
|
|
_logger.error(f"Error fetching cached similarities: {e}")
|
|
try:
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
return []
|
|
|
|
def clear_similarity_cache(
|
|
self, vector_type: str, window_id: Optional[str] = None
|
|
) -> int:
|
|
"""Delete cached similarity rows matching vector_type and optional window_id. Returns count deleted."""
|
|
try:
|
|
# File-backed fallback
|
|
if duckdb is None:
|
|
sim_file = f"{self.db_path}.similarity_cache.json"
|
|
try:
|
|
with open(sim_file, "r+", encoding="utf-8") as fh:
|
|
data = json.load(fh)
|
|
before = len(data)
|
|
data = [
|
|
r
|
|
for r in data
|
|
if not (
|
|
r.get("vector_type") == vector_type
|
|
and (
|
|
window_id is None or r.get("window_id") == window_id
|
|
)
|
|
)
|
|
]
|
|
deleted = before - len(data)
|
|
fh.seek(0)
|
|
json.dump(data, fh)
|
|
fh.truncate()
|
|
return deleted
|
|
except Exception as e:
|
|
_logger.error(f"Error clearing similarity cache file: {e}")
|
|
return 0
|
|
|
|
conn = duckdb.connect(self.db_path)
|
|
params = [vector_type]
|
|
count_q = "SELECT COUNT(*) FROM similarity_cache WHERE vector_type = ?"
|
|
del_q = "DELETE FROM similarity_cache WHERE vector_type = ?"
|
|
if window_id is not None:
|
|
count_q += " AND window_id = ?"
|
|
del_q += " AND window_id = ?"
|
|
params.append(window_id)
|
|
|
|
row = conn.execute(count_q, params).fetchone()
|
|
to_delete = int(row[0]) if row and row[0] is not None else 0
|
|
if to_delete > 0:
|
|
conn.execute(del_q, params)
|
|
conn.close()
|
|
return to_delete
|
|
except Exception as e:
|
|
_logger.error(f"Error clearing similarity_cache: {e}")
|
|
try:
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
return 0
|
|
|
|
|
|
# Module-level singleton: importing this module initializes the database
# (creating tables, or the JSON fallback files when duckdb is unavailable)
# as an import-time side effect.
db = MotionDatabase()
|
|
|