name: embeddings_similarity_pipeline rules: - Keep embedding calls batched where possible; fallback to per-item attempts on persistent batch failure. - Store raw embeddings, SVD vectors, and fused_embeddings separately; fused_embeddings are typically concatenation [svd + text]. - Compute similarity as normalized cosine on padded vectors; record top-k neighbors in similarity_cache. - Use read_only DuckDB connections in compute workers to allow parallel runs. examples: - path: pipeline/ai_provider_wrapper.py excerpt: | ```python for start in range(0, len(texts), batch_size): chunk = texts[start : start + batch_size] resp = _post_with_retries("/embeddings", json={"model": model, "input": chunk}) ... for j in range(i, end): t = texts[j] single, single_exc = _attempt_batch([t], j) if single: results[j] = single[0] ``` note: batched embed + fallback per-item retry - path: pipeline/fusion.py excerpt: | ```python try: svd_vec = json.loads(svd_json) except Exception: _logger.exception("Invalid SVD vector JSON for entity %s", entity_id) skipped_missing_svd += 1 continue ... fused = list(svd_vec) + list(text_vec) res = db.store_fused_embedding( int(entity_id), window_id, fused, svd_dims=len(svd_vec), text_dims=len(text_vec), ) ``` note: concatenation of vectors and storage via MotionDatabase - path: similarity/compute.py excerpt: | ```python # Normalize rows norms = np.linalg.norm(matrix, axis=1, keepdims=True) norms[norms == 0] = 1.0 normalized = matrix / norms sim = normalized @ normalized.T ... # pick top-k neighbors and write to similarity_cache ``` note: numeric pipeline and padding to consistent dimensionality anti_patterns: - Bad: Assuming consistent vector length without checks (leads to shape errors). remediation: Detect inconsistent lengths, pad with zeros, and log a warning (as seen in compute.py). - Bad: Recomputing heavy pipelines inline in UI requests. remediation: schedule heavy work in scripts/subprocesses and read precomputed results in UI.