"""Generate thoughts/explorer/top_svd_top_motions.json from svd_vectors. For each SVD component, finds the top N motions by absolute score (split equally between positive and negative pole), joins with the motions table, and writes the result to the output JSON file. Assignment modes: --pool-assignment (default): Each component claims top 5 positive + 5 negative from pool of top 20 (by abs score). Ensures all components have motions. --no-exclusive: Each component selects independently (may overlap). (exclusive is deprecated, replaced by pool-assignment). Usage: uv run python3 scripts/generate_svd_json.py --db data/motions.db --window current_parliament uv run python3 scripts/generate_svd_json.py --db data/motions.db --window 2025 uv run python3 scripts/generate_svd_json.py --db data/motions.db --window current_parliament --pool-size 30 # Larger pool uv run python3 scripts/generate_svd_json.py --db data/motions.db --window current_parliament --report-top-n 20 # Detailed report """ from __future__ import annotations import argparse import json import logging import os import sys from datetime import datetime from typing import Any, Dict, List, Optional, Tuple ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT not in sys.path: sys.path.insert(0, ROOT) logger = logging.getLogger("generate_svd_json") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") def find_best_component(vec: List[float], max_components: int) -> Tuple[int, float]: """Find component with highest absolute score within valid range. Args: vec: SVD vector for the motion max_components: Maximum component index to consider Returns: (component_index, score) """ if not vec: return 0, 0.0 best_idx = 0 best_abs = abs(vec[0]) if len(vec) > 0 else 0.0 best_score = vec[0] if len(vec) > 0 else 0.0 # Only consider components within range for i in range(min(len(vec), max_components)): v = vec[i] if abs(v) > best_abs: best_abs = abs(v) best_idx = i best_score = v return best_idx, best_score def generate_markdown_report( per_component: List[List[Tuple[int, float]]], details_map: Dict[int, tuple], window: str, exclusive: bool, report_top_n: int, theme_labels: Optional[Dict[int, str]] = None, ) -> str: """Generate markdown report for label review.""" lines = [ "# SVD Motion Report", f"", f"**Window**: {window}", f"**Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", f"**Exclusive Assignment**: {'Yes' if exclusive else 'No'}", f"**Motions per component**: {report_top_n} ({(report_top_n // 2)} per pole)", f"", f"---", f"", ] for comp_idx, top_motions in enumerate(per_component): comp_num = comp_idx + 1 theme = ( theme_labels.get(comp_num, "TBD") if theme_labels else f"Component {comp_num}" ) lines.append(f"## Component {comp_num}: {theme}") lines.append(f"") # Separate positive and negative positive = [(mid, score) for mid, score in top_motions if score >= 0] negative = [(mid, score) for mid, score in top_motions if score < 0] # Sort: positive by score descending, negative by score ascending (most negative first) positive.sort(key=lambda x: x[1], reverse=True) negative.sort(key=lambda x: x[1]) lines.append(f"### Positive Pole ({len(positive)} motions)") lines.append(f"") lines.append(f"| Score | Motion ID | Title |") lines.append(f"|-------|-----------|-------|") for mid, score in positive: detail = details_map.get(mid) title = detail[1] if detail else f"Motion #{mid}" # Truncate long titles if title and len(title) > 80: title = title[:77] + "..." lines.append(f"| {score:+.3f} | {mid} | {title} |") lines.append(f"") lines.append(f"### Negative Pole ({len(negative)} motions)") lines.append(f"") lines.append(f"| Score | Motion ID | Title |") lines.append(f"|-------|-----------|-------|") for mid, score in negative: detail = details_map.get(mid) title = detail[1] if detail else f"Motion #{mid}" # Truncate long titles if title and len(title) > 80: title = title[:77] + "..." lines.append(f"| {score:+.3f} | {mid} | {title} |") lines.append(f"") lines.append(f"### Motion Details") lines.append(f"") # Show top 3 from each pole with full details for pole_name, motions in [ ("Positive", positive[:3]), ("Negative", negative[:3]), ]: lines.append(f"#### {pole_name} Pole (top 3)") lines.append(f"") for mid, score in motions: detail = details_map.get(mid) if detail: lines.append(f"**Motion #{mid}** (score: {score:+.3f})") lines.append(f"- **Title**: {detail[1]}") lines.append(f"- **Date**: {detail[3]}") lines.append(f"- **Policy Area**: {detail[4] or 'N/A'}") body = detail[2] if body: # Truncate body text if len(body) > 500: body = body[:497] + "..." lines.append(f"- **Body**: {body}") lines.append(f"") lines.append(f"---") lines.append(f"") return "\n".join(lines) def main(argv: Optional[List[str]] = None) -> int: p = argparse.ArgumentParser( description="Generate SVD top-motions JSON and report for a window." ) p.add_argument("--db", default="data/motions.db", help="Path to motions.db") p.add_argument( "--window", default="current_parliament", help="SVD window_id to use" ) p.add_argument( "--top-n", type=int, default=10, help="Top N motions per component for JSON output (split pos/neg)", ) p.add_argument( "--components", type=int, default=10, help="Number of SVD components to include" ) p.add_argument( "--out", default="thoughts/explorer/top_svd_top_motions.json", help="Output JSON file path", ) p.add_argument( "--no-exclusive", action="store_true", help="Use non-exclusive assignment (each motion can appear on multiple components). " "Default is pool-based assignment.", ) p.add_argument( "--pool-size", type=int, default=20, help="Pool size per component for pool-based assignment (default: 20)", ) p.add_argument( "--report", action="store_true", default=True, help="Generate markdown report (default: True)", ) p.add_argument( "--no-report", action="store_true", help="Disable markdown report generation", ) p.add_argument( "--report-top-n", type=int, default=20, help="Number of motions per component to show in report (default: 20)", ) p.add_argument( "--report-out", default=None, help="Output path for markdown report (default: same dir as JSON, .md extension)", ) args = p.parse_args(argv) # Pool-based assignment is the default; --no-exclusive switches to non-exclusive mode pool_assignment = not args.no_exclusive pool_size = args.pool_size if pool_assignment else 0 generate_report = args.report and not args.no_report try: import duckdb except ImportError: logger.error("duckdb not available") return 2 con = duckdb.connect(database=args.db, read_only=True) # Load all motion SVD vectors for the window logger.info("Loading motion SVD vectors for window='%s' ...", args.window) rows = con.execute( "SELECT entity_id, vector FROM svd_vectors " "WHERE entity_type='motion' AND window_id=?", [args.window], ).fetchall() if not rows: logger.error( "No motion vectors found for window='%s' in %s", args.window, args.db ) con.close() return 3 logger.info("Loaded %d motion vectors", len(rows)) # Parse vectors into {motion_id: list[float]} motion_scores: Dict[int, List[float]] = {} for entity_id, raw_vec in rows: try: if isinstance(raw_vec, str): vec = json.loads(raw_vec) elif isinstance(raw_vec, (bytes, bytearray)): vec = json.loads(raw_vec.decode()) elif isinstance(raw_vec, list): vec = raw_vec else: vec = list(raw_vec) motion_scores[int(entity_id)] = [ float(v) if v is not None else 0.0 for v in vec ] except Exception: logger.warning("Failed to parse vector for motion_id=%s", entity_id) logger.info("Parsed %d motion vectors", len(motion_scores)) n_positive = args.top_n // 2 n_negative = args.top_n - n_positive report_n_positive = args.report_top_n // 2 report_n_negative = args.report_top_n - report_n_positive output_rows: List[Dict[str, Any]] = [] all_motion_ids: List[int] = [] per_component: List[List[Tuple[int, float]]] = [] if pool_assignment: # POOL ASSIGNMENT: greedy exclusive assignment from pools logger.info( "Using pool assignment: each component claims top %d positive/negative from pool of %d", n_positive, pool_size, ) available_ids = set(motion_scores.keys()) motion_map = motion_scores # motion_id -> vec for comp_idx in range(args.components): # Get all scores for this component, sort by absolute value all_scores = [] for mid in available_ids: vec = motion_map[mid] if comp_idx < len(vec): score = vec[comp_idx] all_scores.append((mid, score)) # Sort by absolute score descending all_scores.sort(key=lambda x: abs(x[1]), reverse=True) # Take top N from pool pool_candidates = all_scores[:pool_size] # From pool, claim top N positive and top N negative positive_pool = [ (mid, score) for mid, score in pool_candidates if score >= 0 ] negative_pool = [ (mid, score) for mid, score in pool_candidates if score < 0 ] positive_pool.sort(key=lambda x: x[1], reverse=True) # highest first negative_pool.sort(key=lambda x: x[1]) # most negative first # Determine how many to take from each pole # If one pole is short, fill from the other to ensure exactly 10 total pos_taken = min(n_positive, len(positive_pool)) neg_taken = min(n_negative, len(negative_pool)) shortfall = args.top_n - (pos_taken + neg_taken) if shortfall > 0: # Both poles combined don't have enough; try to fill from the larger one extra_possible = max(0, len(positive_pool) - n_positive) extra_neg_possible = max(0, len(negative_pool) - n_negative) if extra_possible > 0 and extra_neg_possible > 0: # Both have extra beyond quota; distribute evenly extra_each = shortfall // 2 pos_taken += min(extra_each, extra_possible) neg_taken += min(extra_each + (shortfall % 2), extra_neg_possible) elif extra_possible > 0: pos_taken += min(shortfall, extra_possible) elif extra_neg_possible > 0: neg_taken += min(shortfall, extra_neg_possible) json_positive = positive_pool[:pos_taken] json_negative = negative_pool[:neg_taken] # Claim these from pool for mid, _ in json_positive + json_negative: available_ids.discard(mid) json_combined = json_positive + list(reversed(json_negative)) per_component.append(json_combined) all_motion_ids.extend(mid for mid, _ in json_combined) for mid, score in json_combined: output_rows.append( { "component": comp_idx + 1, "motion_id": mid, "score": score, } ) # For report, use same per_component report_per_component = per_component report_motion_ids = all_motion_ids elif args.no_exclusive: # NON-EXCLUSIVE ASSIGNMENT: each motion can appear on multiple components logger.info("Using exclusive assignment (each motion to its best component)") # Step 1: For each motion, find its best component motion_best: Dict[ int, Tuple[int, float] ] = {} # motion_id -> (component, score) for mid, vec in motion_scores.items(): best_comp, best_score = find_best_component(vec, args.components) motion_best[mid] = (best_comp, best_score) # Step 2: Collect top motions per component comp_scores: Dict[int, List[Tuple[int, float]]] = { i: [] for i in range(args.components) } for mid, (best_comp, best_score) in motion_best.items(): comp_scores[best_comp].append((mid, best_score)) # Step 3: Sort and take top N per component for JSON output for comp_idx in range(args.components): scored = comp_scores[comp_idx] scored.sort(key=lambda x: x[1], reverse=True) # Get unique motions for positive and negative poles # Positive: top N by score # Negative: bottom N by score (excluding already used) json_positive = [] json_negative = [] used_ids = set() # Sort by score descending for positive for mid, score in scored: if len(json_positive) < n_positive and mid not in used_ids: json_positive.append((mid, score)) used_ids.add(mid) # Sort by score ascending for negative (most negative first) for mid, score in sorted(scored, key=lambda x: x[1]): if len(json_negative) < n_negative and mid not in used_ids: json_negative.append((mid, score)) used_ids.add(mid) json_combined = json_positive + list(reversed(json_negative)) per_component.append(json_combined) # Track all IDs for fetching all_motion_ids.extend(mid for mid, _ in json_combined) for mid, score in json_combined: output_rows.append( { "component": comp_idx + 1, "motion_id": mid, "score": score, } ) # Also track IDs for report (may need more motions) report_all_ids: Dict[int, List[int]] = {i: [] for i in range(args.components)} for comp_idx in range(args.components): scored = comp_scores[comp_idx] scored.sort(key=lambda x: x[1], reverse=True) # Get more for report report_ids = [mid for mid, _ in scored[: args.report_top_n]] report_all_ids[comp_idx] = report_ids report_motion_ids = [] for comp_idx in range(args.components): report_motion_ids.extend(report_all_ids[comp_idx]) # Build per_component for report (with more motions) # Report needs the same positive/negative separation as JSON output report_per_component: List[List[Tuple[int, float]]] = [] for comp_idx in range(args.components): scored = comp_scores[comp_idx] scored.sort(key=lambda x: x[1], reverse=True) # Separate positive and negative positive = [(mid, score) for mid, score in scored if score >= 0] negative = [(mid, score) for mid, score in scored if score < 0] # Get top N per pole for report (same logic as JSON but more motions) report_n_pos = args.report_top_n // 2 report_n_neg = args.report_top_n - report_n_pos report_pos = positive[:report_n_pos] report_neg = negative[:report_n_neg] # Combine: positive first, then negative (reversed so most negative at end) report_combined = report_pos + list(reversed(report_neg)) report_per_component.append(report_combined) else: # NON-EXCLUSIVE: each component selects its own top motions (original behavior) logger.info( "Using non-exclusive assignment (motions can appear on multiple components)" ) for comp_idx in range(args.components): scored: List[Tuple[int, float]] = [] for mid, vec in motion_scores.items(): if comp_idx < len(vec): scored.append((mid, vec[comp_idx])) scored.sort(key=lambda x: x[1], reverse=True) top_positive = scored[:n_positive] top_negative = scored[-n_negative:] combined = top_positive + list(reversed(top_negative)) per_component.append(combined) all_motion_ids.extend(mid for mid, _ in combined) # For non-exclusive, each motion in per_component goes to JSON for comp_idx, top_motions in enumerate(per_component): for mid, score in top_motions: output_rows.append( { "component": comp_idx + 1, "motion_id": mid, "score": score, } ) # For report, use same per_component report_per_component = per_component report_motion_ids = all_motion_ids # Batch-fetch motion details unique_ids = list(set(all_motion_ids)) if not unique_ids: logger.error("No motion IDs to fetch") con.close() return 4 logger.info("Fetching details for %d unique motions ...", len(unique_ids)) placeholders = ", ".join("?" for _ in unique_ids) detail_rows = con.execute( f"SELECT id, title, body_text, date, policy_area FROM motions WHERE id IN ({placeholders})", unique_ids, ).fetchall() con.close() details_map: Dict[int, tuple] = {row[0]: row for row in detail_rows} logger.info("Fetched details for %d motions", len(details_map)) # Enrich output_rows with details for row in output_rows: mid = row["motion_id"] detail = details_map.get(mid) if detail: row["title"] = detail[1] row["body_text"] = detail[2] row["date"] = str(detail[3])[:10] if detail[3] else None row["policy_area"] = detail[4] # Write JSON output output: Dict[str, Any] = { "window": args.window, "assignment_mode": "pool" if pool_assignment else "non-exclusive", "pool_size": pool_size if pool_assignment else None, "rows": output_rows, } out_dir = os.path.dirname(args.out) if out_dir: os.makedirs(out_dir, exist_ok=True) with open(args.out, "w", encoding="utf-8") as f: json.dump(output, f, ensure_ascii=False, indent=2) logger.info( "Written %d rows (%d components) to %s", len(output_rows), args.components, args.out, ) # Generate markdown report if generate_report: report_path = args.report_out if report_path is None: # Default: same directory, .md extension base = args.out.rsplit(".", 1)[0] report_path = base + "_report.md" report_dir = os.path.dirname(report_path) if report_dir: os.makedirs(report_dir, exist_ok=True) # Get theme labels from SVD_THEMES if available theme_labels = None try: # Try to import theme labels sys.path.insert(0, ROOT) from explorer import SVD_THEMES theme_labels = { k: v.get("label", f"Component {k}") for k, v in SVD_THEMES.items() } except Exception: logger.debug("Could not load theme labels, using defaults") # For report, fetch details for all report motions report_unique_ids = list(set(report_motion_ids)) if report_unique_ids and report_unique_ids != unique_ids: con = duckdb.connect(database=args.db, read_only=True) placeholders = ", ".join("?" for _ in report_unique_ids) report_detail_rows = con.execute( f"SELECT id, title, body_text, date, policy_area FROM motions WHERE id IN ({placeholders})", report_unique_ids, ).fetchall() con.close() # Merge with existing details for row in report_detail_rows: details_map[row[0]] = row markdown = generate_markdown_report( report_per_component, details_map, args.window, exclusive, args.report_top_n, theme_labels, ) with open(report_path, "w", encoding="utf-8") as f: f.write(markdown) logger.info("Written markdown report to %s", report_path) return 0 if __name__ == "__main__": raise SystemExit(main())