"""Generate thoughts/explorer/top_svd_top_motions.json from svd_vectors.

For each SVD component, finds the top N motions by absolute score (split
equally between positive and negative pole), joins with the motions table,
and writes the result to the output JSON file.

Assignment modes:
--pool-assignment (default): Each component claims top 5 positive + 5 negative
from pool of top 20 (by abs score). Ensures all components have motions.
--no-exclusive: Each component selects independently (may overlap).
(exclusive is deprecated, replaced by pool-assignment).

Usage:
    uv run python3 scripts/generate_svd_json.py --db data/motions.db --window current_parliament
    uv run python3 scripts/generate_svd_json.py --db data/motions.db --window 2025
    uv run python3 scripts/generate_svd_json.py --db data/motions.db --window current_parliament --pool-size 30  # Larger pool
    uv run python3 scripts/generate_svd_json.py --db data/motions.db --window current_parliament --report-top-n 20  # Detailed report
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)

logger = logging.getLogger("generate_svd_json")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
|
|
|
|
|
def find_best_component(vec: List[float], max_components: int) -> Tuple[int, float]:
    """Find component with highest absolute score within valid range.

    Args:
        vec: SVD vector for the motion
        max_components: Maximum component index to consider

    Returns: (component_index, score)
    """
    if not vec:
        return 0, 0.0

    # Seed with component 0, then scan the allowed prefix.  Ties keep the
    # earliest component because the comparison is strictly greater-than.
    best_idx, best_score = 0, vec[0]
    for idx, value in enumerate(vec[: max(0, max_components)]):
        if abs(value) > abs(best_score):
            best_idx, best_score = idx, value

    return best_idx, best_score
|
|
|
|
|
|
def generate_markdown_report(
|
|
per_component: List[List[Tuple[int, float]]],
|
|
details_map: Dict[int, tuple],
|
|
window: str,
|
|
exclusive: bool,
|
|
report_top_n: int,
|
|
theme_labels: Optional[Dict[int, str]] = None,
|
|
) -> str:
|
|
"""Generate markdown report for label review."""
|
|
lines = [
|
|
"# SVD Motion Report",
|
|
f"",
|
|
f"**Window**: {window}",
|
|
f"**Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
|
|
f"**Exclusive Assignment**: {'Yes' if exclusive else 'No'}",
|
|
f"**Motions per component**: {report_top_n} ({(report_top_n // 2)} per pole)",
|
|
f"",
|
|
f"---",
|
|
f"",
|
|
]
|
|
|
|
for comp_idx, top_motions in enumerate(per_component):
|
|
comp_num = comp_idx + 1
|
|
theme = (
|
|
theme_labels.get(comp_num, "TBD")
|
|
if theme_labels
|
|
else f"Component {comp_num}"
|
|
)
|
|
|
|
lines.append(f"## Component {comp_num}: {theme}")
|
|
lines.append(f"")
|
|
|
|
# Separate positive and negative
|
|
positive = [(mid, score) for mid, score in top_motions if score >= 0]
|
|
negative = [(mid, score) for mid, score in top_motions if score < 0]
|
|
|
|
# Sort: positive by score descending, negative by score ascending (most negative first)
|
|
positive.sort(key=lambda x: x[1], reverse=True)
|
|
negative.sort(key=lambda x: x[1])
|
|
|
|
lines.append(f"### Positive Pole ({len(positive)} motions)")
|
|
lines.append(f"")
|
|
lines.append(f"| Score | Motion ID | Title |")
|
|
lines.append(f"|-------|-----------|-------|")
|
|
for mid, score in positive:
|
|
detail = details_map.get(mid)
|
|
title = detail[1] if detail else f"Motion #{mid}"
|
|
# Truncate long titles
|
|
if title and len(title) > 80:
|
|
title = title[:77] + "..."
|
|
lines.append(f"| {score:+.3f} | {mid} | {title} |")
|
|
|
|
lines.append(f"")
|
|
lines.append(f"### Negative Pole ({len(negative)} motions)")
|
|
lines.append(f"")
|
|
lines.append(f"| Score | Motion ID | Title |")
|
|
lines.append(f"|-------|-----------|-------|")
|
|
for mid, score in negative:
|
|
detail = details_map.get(mid)
|
|
title = detail[1] if detail else f"Motion #{mid}"
|
|
# Truncate long titles
|
|
if title and len(title) > 80:
|
|
title = title[:77] + "..."
|
|
lines.append(f"| {score:+.3f} | {mid} | {title} |")
|
|
|
|
lines.append(f"")
|
|
lines.append(f"### Motion Details")
|
|
lines.append(f"")
|
|
|
|
# Show top 3 from each pole with full details
|
|
for pole_name, motions in [
|
|
("Positive", positive[:3]),
|
|
("Negative", negative[:3]),
|
|
]:
|
|
lines.append(f"#### {pole_name} Pole (top 3)")
|
|
lines.append(f"")
|
|
for mid, score in motions:
|
|
detail = details_map.get(mid)
|
|
if detail:
|
|
lines.append(f"**Motion #{mid}** (score: {score:+.3f})")
|
|
lines.append(f"- **Title**: {detail[1]}")
|
|
lines.append(f"- **Date**: {detail[3]}")
|
|
lines.append(f"- **Policy Area**: {detail[4] or 'N/A'}")
|
|
body = detail[2]
|
|
if body:
|
|
# Truncate body text
|
|
if len(body) > 500:
|
|
body = body[:497] + "..."
|
|
lines.append(f"- **Body**: {body}")
|
|
lines.append(f"")
|
|
|
|
lines.append(f"---")
|
|
lines.append(f"")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point: select top motions per SVD component, write JSON/report.

    Exit codes:
        0 - success
        2 - duckdb is not importable
        3 - no motion vectors found for the requested window
        4 - no motion IDs were selected

    BUGFIXES vs previous revision:
    - The report call passed the undefined name ``exclusive`` (NameError on
      every default run); it now passes ``pool_assignment``.
    - ``--no-exclusive`` previously fell into a mislabeled branch that ran
      best-component (exclusive) assignment, leaving the documented
      non-exclusive selection as unreachable dead code. The flag now performs
      the overlap-allowing selection its help text and the module docstring
      describe.
    """
    p = argparse.ArgumentParser(
        description="Generate SVD top-motions JSON and report for a window."
    )
    p.add_argument("--db", default="data/motions.db", help="Path to motions.db")
    p.add_argument(
        "--window", default="current_parliament", help="SVD window_id to use"
    )
    p.add_argument(
        "--top-n",
        type=int,
        default=10,
        help="Top N motions per component for JSON output (split pos/neg)",
    )
    p.add_argument(
        "--components", type=int, default=10, help="Number of SVD components to include"
    )
    p.add_argument(
        "--out",
        default="thoughts/explorer/top_svd_top_motions.json",
        help="Output JSON file path",
    )
    p.add_argument(
        "--no-exclusive",
        action="store_true",
        help="Use non-exclusive assignment (each motion can appear on multiple components). "
        "Default is pool-based assignment.",
    )
    p.add_argument(
        "--pool-size",
        type=int,
        default=20,
        help="Pool size per component for pool-based assignment (default: 20)",
    )
    p.add_argument(
        "--report",
        action="store_true",
        default=True,
        help="Generate markdown report (default: True)",
    )
    p.add_argument(
        "--no-report",
        action="store_true",
        help="Disable markdown report generation",
    )
    p.add_argument(
        "--report-top-n",
        type=int,
        default=20,
        help="Number of motions per component to show in report (default: 20)",
    )
    p.add_argument(
        "--report-out",
        default=None,
        help="Output path for markdown report (default: same dir as JSON, .md extension)",
    )
    args = p.parse_args(argv)

    # Pool-based assignment is the default; --no-exclusive switches to non-exclusive mode
    pool_assignment = not args.no_exclusive
    pool_size = args.pool_size if pool_assignment else 0
    generate_report = args.report and not args.no_report

    try:
        import duckdb
    except ImportError:
        logger.error("duckdb not available")
        return 2

    con = duckdb.connect(database=args.db, read_only=True)

    # Load all motion SVD vectors for the window
    logger.info("Loading motion SVD vectors for window='%s' ...", args.window)
    rows = con.execute(
        "SELECT entity_id, vector FROM svd_vectors "
        "WHERE entity_type='motion' AND window_id=?",
        [args.window],
    ).fetchall()

    if not rows:
        logger.error(
            "No motion vectors found for window='%s' in %s", args.window, args.db
        )
        con.close()
        return 3

    logger.info("Loaded %d motion vectors", len(rows))

    # Parse vectors into {motion_id: list[float]}.  Vectors may be stored as
    # JSON text, raw bytes, or a native list depending on how they were written.
    motion_scores: Dict[int, List[float]] = {}
    for entity_id, raw_vec in rows:
        try:
            if isinstance(raw_vec, str):
                vec = json.loads(raw_vec)
            elif isinstance(raw_vec, (bytes, bytearray)):
                vec = json.loads(raw_vec.decode())
            elif isinstance(raw_vec, list):
                vec = raw_vec
            else:
                vec = list(raw_vec)
            motion_scores[int(entity_id)] = [
                float(v) if v is not None else 0.0 for v in vec
            ]
        except Exception:
            # Best-effort: skip unparseable vectors rather than abort the run
            logger.warning("Failed to parse vector for motion_id=%s", entity_id)

    logger.info("Parsed %d motion vectors", len(motion_scores))

    # JSON output quota, split across the two poles (extra motion goes negative
    # when top_n is odd, matching args.top_n - top_n // 2).
    n_positive = args.top_n // 2
    n_negative = args.top_n - n_positive

    output_rows: List[Dict[str, Any]] = []
    all_motion_ids: List[int] = []
    per_component: List[List[Tuple[int, float]]] = []

    if pool_assignment:
        # POOL ASSIGNMENT: greedy exclusive assignment from pools.  Components
        # are processed in order; each claims its motions so later components
        # cannot reuse them.
        logger.info(
            "Using pool assignment: each component claims top %d positive/negative from pool of %d",
            n_positive,
            pool_size,
        )

        available_ids = set(motion_scores.keys())
        motion_map = motion_scores  # motion_id -> vec

        for comp_idx in range(args.components):
            # Get all scores for this component among still-available motions
            all_scores = []
            for mid in available_ids:
                vec = motion_map[mid]
                if comp_idx < len(vec):
                    all_scores.append((mid, vec[comp_idx]))

            # Sort by absolute score descending and keep the pool
            all_scores.sort(key=lambda x: abs(x[1]), reverse=True)
            pool_candidates = all_scores[:pool_size]

            # From pool, claim top N positive and top N negative
            positive_pool = [
                (mid, score) for mid, score in pool_candidates if score >= 0
            ]
            negative_pool = [
                (mid, score) for mid, score in pool_candidates if score < 0
            ]
            positive_pool.sort(key=lambda x: x[1], reverse=True)  # highest first
            negative_pool.sort(key=lambda x: x[1])  # most negative first

            # Determine how many to take from each pole; if one pole is short,
            # fill from the other to keep the total at args.top_n when possible
            pos_taken = min(n_positive, len(positive_pool))
            neg_taken = min(n_negative, len(negative_pool))
            shortfall = args.top_n - (pos_taken + neg_taken)

            if shortfall > 0:
                extra_possible = max(0, len(positive_pool) - n_positive)
                extra_neg_possible = max(0, len(negative_pool) - n_negative)

                if extra_possible > 0 and extra_neg_possible > 0:
                    # Both poles have spare beyond quota; distribute evenly
                    extra_each = shortfall // 2
                    pos_taken += min(extra_each, extra_possible)
                    neg_taken += min(extra_each + (shortfall % 2), extra_neg_possible)
                elif extra_possible > 0:
                    pos_taken += min(shortfall, extra_possible)
                elif extra_neg_possible > 0:
                    neg_taken += min(shortfall, extra_neg_possible)

            json_positive = positive_pool[:pos_taken]
            json_negative = negative_pool[:neg_taken]

            # Claim these from the pool (exclusive across components)
            for mid, _ in json_positive + json_negative:
                available_ids.discard(mid)

            # Positive first, then negative reversed so most-negative is last
            json_combined = json_positive + list(reversed(json_negative))
            per_component.append(json_combined)
            all_motion_ids.extend(mid for mid, _ in json_combined)

            for mid, score in json_combined:
                output_rows.append(
                    {
                        "component": comp_idx + 1,
                        "motion_id": mid,
                        "score": score,
                    }
                )

        # For report, use same per_component
        report_per_component = per_component
        report_motion_ids = all_motion_ids

    else:
        # NON-EXCLUSIVE: each component selects its own top motions
        # independently, so a motion may appear on several components.
        # BUGFIX: this branch was previously dead — an `elif args.no_exclusive:`
        # branch (mislabeled, implementing best-component exclusive assignment)
        # always intercepted the flag first.
        logger.info(
            "Using non-exclusive assignment (motions can appear on multiple components)"
        )

        for comp_idx in range(args.components):
            scored: List[Tuple[int, float]] = []
            for mid, vec in motion_scores.items():
                if comp_idx < len(vec):
                    scored.append((mid, vec[comp_idx]))

            scored.sort(key=lambda x: x[1], reverse=True)
            top_positive = scored[:n_positive]
            top_negative = scored[-n_negative:]
            combined = top_positive + list(reversed(top_negative))
            per_component.append(combined)
            all_motion_ids.extend(mid for mid, _ in combined)

        for comp_idx, top_motions in enumerate(per_component):
            for mid, score in top_motions:
                output_rows.append(
                    {
                        "component": comp_idx + 1,
                        "motion_id": mid,
                        "score": score,
                    }
                )

        # For report, use same per_component
        report_per_component = per_component
        report_motion_ids = all_motion_ids

    # Batch-fetch motion details
    unique_ids = list(set(all_motion_ids))
    if not unique_ids:
        logger.error("No motion IDs to fetch")
        con.close()
        return 4

    logger.info("Fetching details for %d unique motions ...", len(unique_ids))
    placeholders = ", ".join("?" for _ in unique_ids)
    detail_rows = con.execute(
        f"SELECT id, title, body_text, date, policy_area FROM motions WHERE id IN ({placeholders})",
        unique_ids,
    ).fetchall()
    con.close()

    details_map: Dict[int, tuple] = {row[0]: row for row in detail_rows}
    logger.info("Fetched details for %d motions", len(details_map))

    # Enrich output_rows with details
    for row in output_rows:
        mid = row["motion_id"]
        detail = details_map.get(mid)
        if detail:
            row["title"] = detail[1]
            row["body_text"] = detail[2]
            row["date"] = str(detail[3])[:10] if detail[3] else None
            row["policy_area"] = detail[4]

    # Write JSON output
    output: Dict[str, Any] = {
        "window": args.window,
        "assignment_mode": "pool" if pool_assignment else "non-exclusive",
        "pool_size": pool_size if pool_assignment else None,
        "rows": output_rows,
    }

    out_dir = os.path.dirname(args.out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with open(args.out, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)

    logger.info(
        "Written %d rows (%d components) to %s",
        len(output_rows),
        args.components,
        args.out,
    )

    # Generate markdown report
    if generate_report:
        report_path = args.report_out
        if report_path is None:
            # Default: same directory, _report.md suffix
            base = args.out.rsplit(".", 1)[0]
            report_path = base + "_report.md"

        report_dir = os.path.dirname(report_path)
        if report_dir:
            os.makedirs(report_dir, exist_ok=True)

        # Get theme labels from SVD_THEMES if available (best-effort)
        theme_labels = None
        try:
            sys.path.insert(0, ROOT)
            from explorer import SVD_THEMES

            theme_labels = {
                k: v.get("label", f"Component {k}") for k, v in SVD_THEMES.items()
            }
        except Exception:
            logger.debug("Could not load theme labels, using defaults")

        # For report, fetch details for any report motions not already fetched
        report_unique_ids = list(set(report_motion_ids))
        if report_unique_ids and report_unique_ids != unique_ids:
            con = duckdb.connect(database=args.db, read_only=True)
            placeholders = ", ".join("?" for _ in report_unique_ids)
            report_detail_rows = con.execute(
                f"SELECT id, title, body_text, date, policy_area FROM motions WHERE id IN ({placeholders})",
                report_unique_ids,
            ).fetchall()
            con.close()
            # Merge with existing details
            for row in report_detail_rows:
                details_map[row[0]] = row

        # BUGFIX: previously passed the undefined name `exclusive` here,
        # raising NameError whenever the report was generated.
        markdown = generate_markdown_report(
            report_per_component,
            details_map,
            args.window,
            pool_assignment,
            args.report_top_n,
            theme_labels,
        )

        with open(report_path, "w", encoding="utf-8") as f:
            f.write(markdown)

        logger.info("Written markdown report to %s", report_path)

    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return code, same as
    # `raise SystemExit(main())`.
    sys.exit(main())
|
|
|