---
title: Embeddings Similarity Pipeline
category: patterns
---

# Embeddings Similarity Pipeline

## Rules

- Keep embedding calls batched where possible; fall back to per-item attempts on persistent batch failure.
- Store raw embeddings, SVD vectors, and fused embeddings separately; `fused_embeddings` are typically the concatenation `[svd + text]`.
- Compute similarity as normalized cosine on padded vectors; record top-k neighbors in `similarity_cache`.
- Use read-only DuckDB connections (`read_only=True`) in compute workers to allow parallel runs.

## Examples

### pipeline/ai_provider_wrapper.py - Batched embed + fallback

```python
for start in range(0, len(texts), batch_size):
    chunk = texts[start : start + batch_size]
    end = start + len(chunk)
    resp = _post_with_retries("/embeddings", json={"model": model, "input": chunk})
    ...
    # On persistent batch failure, retry each item in the chunk individually.
    for j in range(start, end):
        t = texts[j]
        single, single_exc = _attempt_batch([t], j)
        if single:
            results[j] = single[0]
```

### pipeline/fusion.py - Concatenation and storage

```python
try:
    svd_vec = json.loads(svd_json)
except Exception:
    _logger.exception("Invalid SVD vector JSON for entity %s", entity_id)
    skipped_missing_svd += 1
    continue
...
fused = list(svd_vec) + list(text_vec)
res = db.store_fused_embedding(
    int(entity_id),
    window_id,
    fused,
    svd_dims=len(svd_vec),
    text_dims=len(text_vec),
)
```

### similarity/compute.py - Normalized cosine similarity

```python
# Normalize rows
norms = np.linalg.norm(matrix, axis=1, keepdims=True)
norms[norms == 0] = 1.0  # avoid division by zero for all-zero rows
normalized = matrix / norms
sim = normalized @ normalized.T
...
# pick top-k neighbors and write to similarity_cache
```

## Anti-Patterns

### Bad: Assuming consistent vector length

**Problem**: Assuming a consistent vector length without checks leads to shape errors when embedding dimensions drift between runs.

**Remediation**: Detect inconsistent lengths, pad with zeros, and log a warning (as seen in compute.py).

### Bad: Inline heavy computation in UI

**Problem**: Recomputing heavy pipelines inline in UI requests blocks the request and duplicates work.
**Remediation**: Schedule heavy work in background scripts or subprocesses and have the UI read precomputed results.
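The padding-plus-cosine remediation above can be sketched as follows. This is a minimal NumPy sketch, not the actual compute.py implementation; the helper names `pad_to_max` and `cosine_topk` are illustrative:

```python
import logging
import numpy as np

_logger = logging.getLogger(__name__)

def pad_to_max(vectors: list[list[float]]) -> np.ndarray:
    """Zero-pad ragged vectors to a common length, warning when lengths differ."""
    lengths = {len(v) for v in vectors}
    max_len = max(lengths)
    if len(lengths) > 1:
        _logger.warning(
            "Inconsistent vector lengths %s; padding to %d", sorted(lengths), max_len
        )
    matrix = np.zeros((len(vectors), max_len))
    for i, v in enumerate(vectors):
        matrix[i, : len(v)] = v
    return matrix

def cosine_topk(matrix: np.ndarray, k: int) -> np.ndarray:
    """Row-normalize, then return the indices of the top-k neighbors per row."""
    norms = np.linalg.norm(matrix, axis=1, keepdims=True)
    norms[norms == 0] = 1.0  # avoid division by zero for all-zero rows
    normalized = matrix / norms
    sim = normalized @ normalized.T
    np.fill_diagonal(sim, -np.inf)  # exclude self-matches
    return np.argsort(sim, axis=1)[:, ::-1][:, :k]
```

Run as a batch job and persist the neighbor indices to `similarity_cache` rather than recomputing per request.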