You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
motief/scripts/generate_svd_json.py

594 lines
22 KiB

"""Generate thoughts/explorer/top_svd_top_motions.json from svd_vectors.
For each SVD component, finds the top N motions by absolute score (split
equally between positive and negative pole), joins with the motions table,
and writes the result to the output JSON file.
Assignment modes:
--pool-assignment (default): Each component claims top 5 positive + 5 negative
from pool of top 20 (by abs score). Ensures all components have motions.
--no-exclusive: Each component selects independently (may overlap).
(exclusive is deprecated, replaced by pool-assignment).
Usage:
uv run python3 scripts/generate_svd_json.py --db data/motions.db --window current_parliament
uv run python3 scripts/generate_svd_json.py --db data/motions.db --window 2025
uv run python3 scripts/generate_svd_json.py --db data/motions.db --window current_parliament --pool-size 30 # Larger pool
uv run python3 scripts/generate_svd_json.py --db data/motions.db --window current_parliament --report-top-n 20 # Detailed report
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
# Make the project root importable when this file is run as a script
# (scripts/ is one level below the repo root).
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)

logger = logging.getLogger("generate_svd_json")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
def find_best_component(vec: List[float], max_components: int) -> Tuple[int, float]:
    """Find the component with the highest absolute score within the valid range.

    Args:
        vec: SVD vector for the motion.
        max_components: Number of leading components to consider
            (indices 0..max_components-1).

    Returns:
        Tuple of (component_index, score). For an empty vector, (0, 0.0).
    """
    if not vec:
        return 0, 0.0
    # Seed with component 0; the empty case was handled above, so vec[0] exists
    # (the original re-checked len(vec) > 0 here redundantly).
    best_idx = 0
    best_score = vec[0]
    best_abs = abs(best_score)
    # Only consider components within the allowed range.
    for i in range(min(len(vec), max_components)):
        v = vec[i]
        if abs(v) > best_abs:
            best_abs = abs(v)
            best_idx = i
            best_score = v
    return best_idx, best_score
def generate_markdown_report(
    per_component: List[List[Tuple[int, float]]],
    details_map: Dict[int, tuple],
    window: str,
    exclusive: bool,
    report_top_n: int,
    theme_labels: Optional[Dict[int, str]] = None,
) -> str:
    """Generate a markdown report for label review.

    Args:
        per_component: For each component, a list of (motion_id, score).
        details_map: motion_id -> row tuple (id, title, body_text, date,
            policy_area), as fetched from the motions table.
        window: SVD window identifier (header metadata only).
        exclusive: Whether exclusive assignment was used (header metadata only).
        report_top_n: Motions per component, shown in the header.
        theme_labels: Optional {component_number: label}; defaults to
            "Component N" when absent.

    Returns:
        The full report as a single markdown string.
    """
    lines = [
        "# SVD Motion Report",
        "",
        f"**Window**: {window}",
        f"**Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"**Exclusive Assignment**: {'Yes' if exclusive else 'No'}",
        f"**Motions per component**: {report_top_n} ({(report_top_n // 2)} per pole)",
        "",
        "---",
        "",
    ]
    for comp_idx, top_motions in enumerate(per_component):
        comp_num = comp_idx + 1
        theme = (
            theme_labels.get(comp_num, "TBD")
            if theme_labels
            else f"Component {comp_num}"
        )
        lines.append(f"## Component {comp_num}: {theme}")
        lines.append("")
        # Split by pole: positive sorted high-to-low, negative most-negative first.
        positive = [(mid, score) for mid, score in top_motions if score >= 0]
        negative = [(mid, score) for mid, score in top_motions if score < 0]
        positive.sort(key=lambda x: x[1], reverse=True)
        negative.sort(key=lambda x: x[1])
        lines.extend(_pole_section("Positive", positive, details_map))
        lines.extend(_pole_section("Negative", negative, details_map))
        lines.append("### Motion Details")
        lines.append("")
        # Show top 3 from each pole with full details.
        for pole_name, motions in [
            ("Positive", positive[:3]),
            ("Negative", negative[:3]),
        ]:
            lines.append(f"#### {pole_name} Pole (top 3)")
            lines.append("")
            for mid, score in motions:
                detail = details_map.get(mid)
                if detail:
                    lines.append(f"**Motion #{mid}** (score: {score:+.3f})")
                    lines.append(f"- **Title**: {detail[1]}")
                    lines.append(f"- **Date**: {detail[3]}")
                    lines.append(f"- **Policy Area**: {detail[4] or 'N/A'}")
                    body = detail[2]
                    if body:
                        # Truncate long body text for readability.
                        if len(body) > 500:
                            body = body[:497] + "..."
                        lines.append(f"- **Body**: {body}")
                lines.append("")
        lines.append("---")
        lines.append("")
    return "\n".join(lines)


def _pole_section(
    pole_name: str,
    motions: List[Tuple[int, float]],
    details_map: Dict[int, tuple],
) -> List[str]:
    """Render the markdown score table for one pole of a component."""
    out = [
        f"### {pole_name} Pole ({len(motions)} motions)",
        "",
        "| Score | Motion ID | Title |",
        "|-------|-----------|-------|",
    ]
    for mid, score in motions:
        detail = details_map.get(mid)
        title = detail[1] if detail else f"Motion #{mid}"
        # Truncate long titles so table rows stay readable.
        if title and len(title) > 80:
            title = title[:77] + "..."
        out.append(f"| {score:+.3f} | {mid} | {title} |")
    out.append("")
    return out
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point: compute top motions per SVD component, write JSON + report.

    Exit codes: 0 success, 2 duckdb missing, 3 no vectors for window,
    4 no motion IDs selected.
    """
    p = argparse.ArgumentParser(
        description="Generate SVD top-motions JSON and report for a window."
    )
    p.add_argument("--db", default="data/motions.db", help="Path to motions.db")
    p.add_argument(
        "--window", default="current_parliament", help="SVD window_id to use"
    )
    p.add_argument(
        "--top-n",
        type=int,
        default=10,
        help="Top N motions per component for JSON output (split pos/neg)",
    )
    p.add_argument(
        "--components", type=int, default=10, help="Number of SVD components to include"
    )
    p.add_argument(
        "--out",
        default="thoughts/explorer/top_svd_top_motions.json",
        help="Output JSON file path",
    )
    p.add_argument(
        "--no-exclusive",
        action="store_true",
        help="Use non-exclusive assignment (each motion can appear on multiple components). "
        "Default is pool-based assignment.",
    )
    p.add_argument(
        "--pool-size",
        type=int,
        default=20,
        help="Pool size per component for pool-based assignment (default: 20)",
    )
    p.add_argument(
        "--report",
        action="store_true",
        default=True,
        help="Generate markdown report (default: True)",
    )
    p.add_argument(
        "--no-report",
        action="store_true",
        help="Disable markdown report generation",
    )
    p.add_argument(
        "--report-top-n",
        type=int,
        default=20,
        help="Number of motions per component to show in report (default: 20)",
    )
    p.add_argument(
        "--report-out",
        default=None,
        help="Output path for markdown report (default: same dir as JSON, .md extension)",
    )
    args = p.parse_args(argv)

    # Pool-based assignment is the default; --no-exclusive switches to
    # non-exclusive mode. The two flags are complementary, so there are exactly
    # two assignment modes (the old "best-component exclusive" mode was
    # deprecated, unreachable dead code, and has been removed).
    pool_assignment = not args.no_exclusive
    pool_size = args.pool_size if pool_assignment else 0
    generate_report = args.report and not args.no_report

    try:
        import duckdb
    except ImportError:
        logger.error("duckdb not available")
        return 2

    con = duckdb.connect(database=args.db, read_only=True)

    # Load all motion SVD vectors for the window.
    logger.info("Loading motion SVD vectors for window='%s' ...", args.window)
    rows = con.execute(
        "SELECT entity_id, vector FROM svd_vectors "
        "WHERE entity_type='motion' AND window_id=?",
        [args.window],
    ).fetchall()
    if not rows:
        logger.error(
            "No motion vectors found for window='%s' in %s", args.window, args.db
        )
        con.close()
        return 3
    logger.info("Loaded %d motion vectors", len(rows))

    # Parse vectors into {motion_id: list[float]}. Vectors may be stored as
    # JSON text, bytes, or a native list depending on how they were written.
    motion_scores: Dict[int, List[float]] = {}
    for entity_id, raw_vec in rows:
        try:
            if isinstance(raw_vec, str):
                vec = json.loads(raw_vec)
            elif isinstance(raw_vec, (bytes, bytearray)):
                vec = json.loads(raw_vec.decode())
            elif isinstance(raw_vec, list):
                vec = raw_vec
            else:
                vec = list(raw_vec)
            motion_scores[int(entity_id)] = [
                float(v) if v is not None else 0.0 for v in vec
            ]
        except Exception:
            logger.warning("Failed to parse vector for motion_id=%s", entity_id)
    logger.info("Parsed %d motion vectors", len(motion_scores))

    n_positive = args.top_n // 2
    n_negative = args.top_n - n_positive

    output_rows: List[Dict[str, Any]] = []
    all_motion_ids: List[int] = []
    per_component: List[List[Tuple[int, float]]] = []

    if pool_assignment:
        # POOL ASSIGNMENT: greedy exclusive assignment from per-component pools.
        logger.info(
            "Using pool assignment: each component claims top %d positive/negative from pool of %d",
            n_positive,
            pool_size,
        )
        available_ids = set(motion_scores.keys())
        for comp_idx in range(args.components):
            # Rank remaining motions for this component by absolute score.
            all_scores = [
                (mid, motion_scores[mid][comp_idx])
                for mid in available_ids
                if comp_idx < len(motion_scores[mid])
            ]
            all_scores.sort(key=lambda x: abs(x[1]), reverse=True)
            pool_candidates = all_scores[:pool_size]
            # From the pool, claim the top positive and top negative motions.
            positive_pool = [
                (mid, score) for mid, score in pool_candidates if score >= 0
            ]
            negative_pool = [
                (mid, score) for mid, score in pool_candidates if score < 0
            ]
            positive_pool.sort(key=lambda x: x[1], reverse=True)  # highest first
            negative_pool.sort(key=lambda x: x[1])  # most negative first
            # If one pole is short, fill from the other so the component still
            # gets as close to top_n motions as the pool allows.
            pos_taken = min(n_positive, len(positive_pool))
            neg_taken = min(n_negative, len(negative_pool))
            shortfall = args.top_n - (pos_taken + neg_taken)
            if shortfall > 0:
                extra_pos_possible = max(0, len(positive_pool) - n_positive)
                extra_neg_possible = max(0, len(negative_pool) - n_negative)
                if extra_pos_possible > 0 and extra_neg_possible > 0:
                    # Both poles have extra beyond quota; distribute evenly.
                    extra_each = shortfall // 2
                    pos_taken += min(extra_each, extra_pos_possible)
                    neg_taken += min(extra_each + (shortfall % 2), extra_neg_possible)
                elif extra_pos_possible > 0:
                    pos_taken += min(shortfall, extra_pos_possible)
                elif extra_neg_possible > 0:
                    neg_taken += min(shortfall, extra_neg_possible)
            json_positive = positive_pool[:pos_taken]
            json_negative = negative_pool[:neg_taken]
            # Claim the selected motions so later components cannot reuse them.
            for mid, _ in json_positive + json_negative:
                available_ids.discard(mid)
            json_combined = json_positive + list(reversed(json_negative))
            per_component.append(json_combined)
            all_motion_ids.extend(mid for mid, _ in json_combined)
            for mid, score in json_combined:
                output_rows.append(
                    {
                        "component": comp_idx + 1,
                        "motion_id": mid,
                        "score": score,
                    }
                )
        # Report uses the same selection as the JSON output.
        report_per_component = per_component
        report_motion_ids = all_motion_ids
    else:
        # NON-EXCLUSIVE: each component selects its own top motions
        # independently; a motion may appear on multiple components.
        logger.info(
            "Using non-exclusive assignment (motions can appear on multiple components)"
        )
        for comp_idx in range(args.components):
            scored: List[Tuple[int, float]] = []
            for mid, vec in motion_scores.items():
                if comp_idx < len(vec):
                    scored.append((mid, vec[comp_idx]))
            scored.sort(key=lambda x: x[1], reverse=True)
            top_positive = scored[:n_positive]
            top_negative = scored[-n_negative:]
            combined = top_positive + list(reversed(top_negative))
            per_component.append(combined)
            all_motion_ids.extend(mid for mid, _ in combined)
        for comp_idx, top_motions in enumerate(per_component):
            for mid, score in top_motions:
                output_rows.append(
                    {
                        "component": comp_idx + 1,
                        "motion_id": mid,
                        "score": score,
                    }
                )
        # Report uses the same selection as the JSON output.
        report_per_component = per_component
        report_motion_ids = all_motion_ids

    # Batch-fetch motion details for everything the JSON output references.
    unique_ids = list(set(all_motion_ids))
    if not unique_ids:
        logger.error("No motion IDs to fetch")
        con.close()
        return 4
    logger.info("Fetching details for %d unique motions ...", len(unique_ids))
    placeholders = ", ".join("?" for _ in unique_ids)
    detail_rows = con.execute(
        f"SELECT id, title, body_text, date, policy_area FROM motions WHERE id IN ({placeholders})",
        unique_ids,
    ).fetchall()
    con.close()
    details_map: Dict[int, tuple] = {row[0]: row for row in detail_rows}
    logger.info("Fetched details for %d motions", len(details_map))

    # Enrich output_rows with details.
    for row in output_rows:
        detail = details_map.get(row["motion_id"])
        if detail:
            row["title"] = detail[1]
            row["body_text"] = detail[2]
            row["date"] = str(detail[3])[:10] if detail[3] else None
            row["policy_area"] = detail[4]

    # Write JSON output.
    output: Dict[str, Any] = {
        "window": args.window,
        "assignment_mode": "pool" if pool_assignment else "non-exclusive",
        "pool_size": pool_size if pool_assignment else None,
        "rows": output_rows,
    }
    out_dir = os.path.dirname(args.out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(args.out, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)
    logger.info(
        "Written %d rows (%d components) to %s",
        len(output_rows),
        args.components,
        args.out,
    )

    # Generate markdown report.
    if generate_report:
        report_path = args.report_out
        if report_path is None:
            # Default: same directory as the JSON, with a _report.md suffix.
            base = args.out.rsplit(".", 1)[0]
            report_path = base + "_report.md"
        report_dir = os.path.dirname(report_path)
        if report_dir:
            os.makedirs(report_dir, exist_ok=True)
        # Get theme labels from SVD_THEMES if available (best-effort).
        theme_labels = None
        try:
            sys.path.insert(0, ROOT)
            from explorer import SVD_THEMES

            theme_labels = {
                k: v.get("label", f"Component {k}") for k, v in SVD_THEMES.items()
            }
        except Exception:
            logger.debug("Could not load theme labels, using defaults")
        # Fetch details for any report-only motions not already in details_map.
        report_unique_ids = list(set(report_motion_ids))
        if report_unique_ids and report_unique_ids != unique_ids:
            con = duckdb.connect(database=args.db, read_only=True)
            placeholders = ", ".join("?" for _ in report_unique_ids)
            report_detail_rows = con.execute(
                f"SELECT id, title, body_text, date, policy_area FROM motions WHERE id IN ({placeholders})",
                report_unique_ids,
            ).fetchall()
            con.close()
            for row in report_detail_rows:
                details_map[row[0]] = row
        # BUG FIX: the original passed an undefined name `exclusive` here,
        # raising NameError on every run with reports enabled. Pool assignment
        # is the exclusive mode, so report that flag instead.
        markdown = generate_markdown_report(
            report_per_component,
            details_map,
            args.window,
            pool_assignment,
            args.report_top_n,
            theme_labels,
        )
        with open(report_path, "w", encoding="utf-8") as f:
            f.write(markdown)
        logger.info("Written markdown report to %s", report_path)
    return 0
if __name__ == "__main__":
    # SystemExit propagates main()'s integer exit code to the shell.
    raise SystemExit(main())