"""Derive topic-based labels for SVD axes from motion content.

Extracts keywords from motion titles (Dutch stopwords removed) to identify
the key policy topics defining each axis. Generates a review report with
suggested labels per component.

Usage:
    uv run python3 scripts/derive_svd_labels.py --db data/motions.db --window current_parliament
    uv run python3 scripts/derive_svd_labels.py --db data/motions.db --window current_parliament --pool-size 50
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import re
import sys
from collections import Counter
from datetime import date, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Only needed for annotations; the runtime import happens in main() so the
    # pure text helpers stay importable without the database driver.
    import duckdb

ROOT = Path(__file__).parent.parent.resolve()
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

logger = logging.getLogger("derive_svd_labels")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# Dutch stopwords — words too common to be informative for axis labeling
DUTCH_STOPWORDS = frozenset(
    {
        "de", "het", "een", "van", "en", "in", "is", "dat", "op", "te",
        "voor", "met", "zijn", "aan", "niet", "om", "ook", "als", "maar",
        "bij", "door", "over", "naar", "uit", "dan", "was", "worden", "dit",
        "die", "zou", "kunnen", "moet", "heeft", "hun", "nog", "wel", "meer",
        "of", "tegen", "onder", "geen", "alle", "zal", "er", "zich", "na",
        "tot", "omdat", "hoe", "wat", "wie", "waar", "waarom", "kan",
        "motie", "lid", "leden", "c.s.", "verzoekt", "regering", "kamer",
        "vaststelling", "begrotingsstaten", "ministerie", "jaar", "voorstel",
        "wijziging", "amendement", "gewijzigde", "nader", "gewest",
        "artikel", "eerste", "tweede", "derde", "vierde", "nummer", "nr",
        "ontvangen", "datum", "voorgesteld", "beraadslaging", "overwegende",
        "constaterende",
    }
)

# Boilerplate title prefixes. Order matters: the longer variants must run
# before the shorter ones they contain, otherwise the short pattern consumes
# the tail and leaves the leading qualifier (e.g. "gewijzigd") as a keyword.
_TITLE_PREFIX_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r"nader gewijzigde motie van het lid \w+\s+",
        r"gewijzigde motie van het lid \w+\s+",
        r"motie van het lid \w+(\s+c\.s\.)?\s+",
        r"gewijzigd amendement van het lid \w+\s+",
        r"amendement van het lid \w+\s+",
        r"voorstel tot wijziging van\s+",
    )
)
_PARENTHETICAL_RE = re.compile(r"\(.*?\)")
_PUNCTUATION_RE = re.compile(r"[^\w\s]")
_DIGITS_RE = re.compile(r"\d+")

# Keys of the motion dicts, in the column order of the SELECT below.
_MOTION_FIELDS = ("motion_id", "scores", "title", "body_text", "policy_area", "date")

# Current labels from config, shown in the report for reference
CURRENT_LABELS = {
    1: "Economische sectorbelangen versus sociale welvaart",
    2: "Nationalistische versus multilateralistische oriëntatie",
    3: "Verzorgingsstaat versus defensie en nationale veiligheid",
    4: "Internationale solidariteit versus nationale financiële belangen",
    5: "Ecologische transitie versus economische conservatie",
    6: "Klimaatbeleid en milieu versus economische belangen",
    7: "Praktisch-bestuurlijke vs. idealistisch-procedurele oriëntatie",
    8: "Pro-Europese en cosmopolitische oriëntatie versus binnenlandse focus",
    9: "Institutionele hervorming versus pragmatisch bestuur",
    10: "Kritiek op overheidsbemoeienis versus bestuurlijke effectiviteit",
}


def load_svd_vectors(conn: duckdb.DuckDBPyConnection, window: str) -> list[dict]:
    """Load SVD vectors + motion metadata for the given window.

    Returns one dict per motion with the keys ``motion_id``, ``scores``,
    ``title``, ``body_text``, ``policy_area`` and ``date``, newest first.
    """
    query = """
        SELECT v.entity_id AS motion_id, v.vector, m.title, m.body_text,
               m.policy_area, m.date
        FROM svd_vectors v
        JOIN motions m ON v.entity_id = m.id::text
        WHERE v.entity_type = 'motion' AND v.window_id = ?
        ORDER BY m.date DESC
    """
    rows = conn.execute(query, [window]).fetchall()
    return [dict(zip(_MOTION_FIELDS, row)) for row in rows]


def parse_vector(scores_json: str | list | None) -> list[float]:
    """Parse SVD scores from a JSON string or an already-decoded list.

    ``None`` entries become ``0.0``. Anything that cannot be interpreted as a
    sequence of numbers (``None``, invalid JSON, a JSON object or scalar,
    non-numeric entries) yields an empty list rather than raising.
    """
    if isinstance(scores_json, str):
        try:
            scores_json = json.loads(scores_json)
        except ValueError:  # json.JSONDecodeError is a ValueError
            return []
    if not isinstance(scores_json, (list, tuple)):
        return []
    try:
        return [0.0 if value is None else float(value) for value in scores_json]
    except (TypeError, ValueError):
        return []


def extract_keywords(title: str, n: int = 5) -> list[str]:
    """Extract candidate keywords from a motion title.

    Lowercases the title, strips boilerplate prefixes ("Motie van het lid X
    c.s." and friends), parenthetical references, punctuation and digits, and
    returns the remaining words in title order, dropping stopwords and tokens
    of two characters or fewer.

    Args:
        title: Motion title; ``None`` or an empty string yields ``[]``.
        n: Accepted for backward compatibility. It is not applied: every
            surviving token is returned, which is what compute_tfidf relies on.
    """
    if not title:
        return []

    cleaned = title.lower()
    for prefix in _TITLE_PREFIX_PATTERNS:
        cleaned = prefix.sub("", cleaned)
    # Remove parenthetical references, then remaining noise
    cleaned = _PARENTHETICAL_RE.sub(" ", cleaned)
    cleaned = _PUNCTUATION_RE.sub(" ", cleaned)
    cleaned = _DIGITS_RE.sub(" ", cleaned)

    return [
        word
        for word in cleaned.split()
        if len(word) > 2 and word not in DUTCH_STOPWORDS
    ]


def compute_tfidf(
    motions: list[dict], component_idx: int
) -> tuple[list[str], list[str]]:
    """Compute the top keywords for the positive and negative pole of a component.

    Despite the name, words are ranked by raw frequency within each pole (ties
    keep first-seen order); no inverse-document-frequency weighting is applied.
    Motions whose loading on the component is exactly zero, or whose vector is
    too short, belong to neither pole and are skipped.

    Returns:
        (pos_keywords, neg_keywords): up to 10 most frequent words per pole.
    """
    pos_counts: Counter = Counter()
    neg_counts: Counter = Counter()

    for motion in motions:
        vec = parse_vector(motion["scores"])
        if len(vec) <= component_idx:
            continue
        score = vec[component_idx]
        if score > 0:
            pos_counts.update(extract_keywords(motion["title"]))
        elif score < 0:
            neg_counts.update(extract_keywords(motion["title"]))

    def top_words(counts: Counter) -> list[str]:
        return [word for word, _ in counts.most_common(10)]

    return top_words(pos_counts), top_words(neg_counts)


def get_component_motions(
    motions: list[dict],
    component_idx: int,
    pool_size: int = 50,
) -> tuple[list[dict], list[dict]]:
    """Get the strongest positive and negative motions for a component.

    The ``pool_size`` motions with the largest absolute loading form the pool;
    from that pool up to 5 positive and up to 5 negative motions are returned,
    each list ordered from strongest to weakest loading.
    """
    loadings = []
    for motion in motions:
        vec = parse_vector(motion["scores"])
        if len(vec) > component_idx:
            loadings.append((vec[component_idx], motion))

    # Stable sort: motions with equal |loading| keep their input order.
    loadings.sort(key=lambda pair: abs(pair[0]), reverse=True)
    pool = loadings[:pool_size]

    # The pool is ordered by |loading|, so each sign group is already ordered
    # from strongest to weakest and needs no further sorting.
    pos = [motion for score, motion in pool if score > 0][:5]
    neg = [motion for score, motion in pool if score < 0][:5]
    return pos, neg


def _motion_list_lines(motions: list[dict]) -> list[str]:
    """Render up to five motions as numbered report lines, truncating long titles."""
    rendered = []
    for rank, motion in enumerate(motions[:5], 1):
        title = motion["title"] or ""
        if len(title) > 120:
            title = title[:120] + "..."
        rendered.append(f" {rank}. [{motion['motion_id']}] {title}")
    return rendered


def build_report(
    motions: list[dict],
    n_components: int,
    pool_size: int,
    current_labels: dict[int, str],
    window: str = "current_parliament",
) -> str:
    """Build the markdown review report.

    Args:
        motions: Motion dicts as returned by load_svd_vectors.
        n_components: Number of components to report on (numbered from 1).
        pool_size: Pool size passed to get_component_motions.
        current_labels: Existing label per 1-based component number.
        window: Window identifier shown in the report header.
    """
    lines = [
        "# SVD Axis Label Review Report",
        "",
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
        f"Window: {window} | Pool size: {pool_size}",
        "",
        "---",
        "",
        "## Methodology",
        "",
        "- TF-IDF keyword extraction on motion titles",
        "- Dutch stopwords removed before scoring",
        f"- Top {pool_size} motions by absolute loading per component",
        "- Top 5 positive + top 5 negative pole motions shown",
        "- Suggested label derived from TF-IDF keywords + motion review",
        "",
        "---",
        "",
    ]

    for comp in range(1, n_components + 1):
        idx = comp - 1  # components are 1-based in the report, 0-based in vectors
        pos_motions, neg_motions = get_component_motions(motions, idx, pool_size)
        pos_keywords, neg_keywords = compute_tfidf(motions, idx)

        lines += [
            f"## Component {comp}",
            "",
            f"**Current label:** {current_labels.get(comp, '(none)')}",
            "",
            f"**Positive pole keywords:** {', '.join(pos_keywords[:10])}",
            "",
            f"**Negative pole keywords:** {', '.join(neg_keywords[:10])}",
            "",
            "**Top 5 positive-pole motions:**",
        ]
        lines += _motion_list_lines(pos_motions)
        lines += ["", "**Top 5 negative-pole motions:**"]
        lines += _motion_list_lines(neg_motions)
        lines += [
            "",
            "**Suggested label:** _[TBD after review]_",
            "",
            "---",
            "",
        ]

    return "\n".join(lines)


def _json_default(value: Any) -> str:
    """Serialize date/datetime values (e.g. a DATE column) as ISO-8601 strings."""
    if isinstance(value, date):  # datetime is a subclass of date
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def main():
    """Parse CLI arguments, then write the review report and the motion JSON."""
    parser = argparse.ArgumentParser(
        description="Derive SVD axis labels from motion content"
    )
    parser.add_argument("--db", default="data/motions.db", help="Path to motions.db")
    parser.add_argument(
        "--window", default="current_parliament", help="Parliamentary window"
    )
    parser.add_argument(
        "--pool-size", type=int, default=50, help="Motions per pole (default: 50)"
    )
    parser.add_argument(
        "--output",
        default="thoughts/explorer/svd_label_review.md",
        help="Output report path",
    )
    parser.add_argument(
        "--n-components",
        type=int,
        default=10,
        help="Number of SVD components (default: 10)",
    )
    args = parser.parse_args()

    db_path = ROOT / args.db
    if not db_path.exists():
        logger.error(f"Database not found: {db_path}")
        sys.exit(1)

    # Imported here rather than at module level so the helpers above can be
    # imported (and tested) without the database driver installed.
    import duckdb

    conn = duckdb.connect(str(db_path), read_only=True)
    try:
        motions = load_svd_vectors(conn, args.window)
        logger.info(f"Loaded {len(motions)} motions for window '{args.window}'")

        report = build_report(
            motions,
            args.n_components,
            args.pool_size,
            CURRENT_LABELS,
            window=args.window,
        )
        output_path = ROOT / args.output
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8: the labels contain non-ASCII characters and the
        # platform default encoding is not guaranteed to handle them.
        output_path.write_text(report, encoding="utf-8")
        logger.info(f"Report written to: {output_path}")

        # Also write motion JSON for reference
        motions_out = []
        for comp in range(1, args.n_components + 1):
            pos_m, neg_m = get_component_motions(motions, comp - 1, args.pool_size)
            for pole, members in (("positive", pos_m), ("negative", neg_m)):
                motions_out.extend(
                    {**motion, "component": comp, "pole": pole} for motion in members
                )

        motions_path = ROOT / "thoughts/explorer/svd_label_motions.json"
        # --output may point elsewhere, so this directory is not guaranteed to exist.
        motions_path.parent.mkdir(parents=True, exist_ok=True)
        motions_path.write_text(
            json.dumps(
                {
                    "window": args.window,
                    "pool_size": args.pool_size,
                    "rows": motions_out,
                },
                indent=2,
                ensure_ascii=False,
                default=_json_default,
            ),
            encoding="utf-8",
        )
        logger.info(f"Motion data written to: {motions_path}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()