You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
95 lines
3.3 KiB
95 lines
3.3 KiB
import json
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import duckdb
|
|
import pandas as pd
|
|
|
|
from database import MotionDatabase
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
def extract_mp_votes(db_path: Optional[str] = None, limit: Optional[int] = None) -> dict:
    """Extract individual MP votes from motions.voting_results and store them
    in the mp_votes table.

    Handles both individual MP votes (ActorNaam contains comma) and party-level
    votes (ActorNaam has no comma) by treating party names as actors.

    Uses a single DuckDB connection with DataFrame bulk insert for performance.

    Args:
        db_path: Optional path to the motions database; falls back to the
            MotionDatabase default when omitted.
        limit: Optional cap on the number of motions scanned (for testing or
            incremental runs).

    Returns a dict with summary counts:
    - motions_scanned: number of motions inspected
    - mp_rows_inserted: number of mp_votes rows inserted
    - motions_skipped: number of motions skipped because mp_votes already existed
    """
    db = MotionDatabase(db_path=db_path) if db_path else MotionDatabase()

    conn = duckdb.connect(db.db_path)
    # try/finally guarantees the connection is closed even if JSON parsing or
    # any per-motion query raises (the original only protected the first SELECT,
    # leaking the connection on later failures).
    try:
        # Support optional limit to only scan a subset of motions.
        base_query = "SELECT id, voting_results, date FROM motions"
        if limit is not None:
            rows = conn.execute(base_query + " LIMIT ?", (limit,)).fetchall()
        else:
            rows = conn.execute(base_query).fetchall()

        motions_scanned = 0
        motions_skipped = 0
        batch = []

        for motion_id, voting_results_json, date in rows:
            motions_scanned += 1

            # Check if mp_votes already exist for this motion (idempotent re-runs).
            existing = conn.execute(
                "SELECT COUNT(*) FROM mp_votes WHERE motion_id = ?", (motion_id,)
            ).fetchone()
            if existing and existing[0] > 0:
                _logger.debug("Skipping motion %s, mp_votes already exist", motion_id)
                motions_skipped += 1
                continue

            # voting_results may be stored as JSON text or as native JSON; ensure it's a dict
            if isinstance(voting_results_json, str):
                voting_results = json.loads(voting_results_json)
            else:
                voting_results = voting_results_json

            for actor, vote in (voting_results or {}).items():
                # Individual MP names contain a comma (e.g. "Last, F.")
                # Party names have no comma (e.g. "VVD", "GroenLinks-PvdA")
                party = None if "," in actor else actor
                batch.append((motion_id, actor, party, vote, str(date) if date else None))

        # Bulk insert via DataFrame for performance (avoids per-row connection overhead)
        mp_rows_inserted = 0
        if batch:
            try:
                df = pd.DataFrame(
                    batch, columns=["motion_id", "mp_name", "party", "vote", "date"]
                )
                # DuckDB resolves "df" via its DataFrame replacement scan on the
                # local variable above.
                conn.execute(
                    "INSERT INTO mp_votes (motion_id, mp_name, party, vote, date) SELECT * FROM df"
                )
                mp_rows_inserted = len(batch)
                _logger.info("Bulk inserted %d mp_votes rows", mp_rows_inserted)
            except Exception:
                # Best-effort insert: log with traceback but keep returning counts.
                _logger.exception("Bulk insert failed")
        else:
            _logger.info("No new mp_votes rows to insert")
    finally:
        conn.close()

    return {
        "motions_scanned": motions_scanned,
        "mp_rows_inserted": mp_rows_inserted,
        "motions_skipped": motions_skipped,
    }
|
|
|