feat: add pipeline health checks module and CLI runner

- Create health/ package with HealthStatus, HealthCheck, HealthReport
- Add check_motion_freshness, check_embedding_coverage, check_llm_coverage
- Add scripts/health_check.py CLI with text/JSON output and exit codes
- Add comprehensive tests for core, checks, and CLI

P4-005: Pipeline health checks
main
Sven Geboers 4 weeks ago
parent 04cc62ea06
commit e352d7c7bc
  1. 42
      health/__init__.py
  2. 140
      health/checks.py
  3. 98
      scripts/health_check.py
  4. 69
      tests/scripts/test_health_check.py
  5. 92
      tests/test_health_checks.py
  6. 54
      tests/test_health_core.py

@ -0,0 +1,42 @@
from dataclasses import dataclass
from enum import Enum
from typing import List
class HealthStatus(Enum):
    """Severity level of a single health check or of a whole report.

    Each member's value is the lowercase string form of its name.
    """

    # Members are declared from least to most severe.
    OK = "ok"
    WARNING = "warning"
    CRITICAL = "critical"
@dataclass
class HealthCheck:
    """Result of one named health check."""

    # Identifier of the check that produced this result.
    name: str
    # Severity the check resolved to.
    status: HealthStatus
    # One-line, human-readable summary of the outcome.
    message: str
    # Supporting values for the outcome; may be an empty dict.
    details: dict
@dataclass
class HealthReport:
    """A list of check results together with their aggregated severity."""

    checks: List[HealthCheck]

    @property
    def status(self) -> HealthStatus:
        """Return the most severe status found in ``checks``.

        CRITICAL wins over WARNING, which wins over OK. A report without
        any checks is OK.
        """
        # Scan once per level, most severe first, and stop at the first hit.
        for level in (HealthStatus.CRITICAL, HealthStatus.WARNING):
            if any(check.status == level for check in self.checks):
                return level
        return HealthStatus.OK

    @property
    def exit_code(self) -> int:
        """Return the process exit code: 2 for CRITICAL, 1 for WARNING, else 0."""
        overall = self.status
        if overall == HealthStatus.CRITICAL:
            return 2
        return 1 if overall == HealthStatus.WARNING else 0
def run_checks(checks: List[HealthCheck]) -> HealthReport:
    """Bundle already-evaluated check results into a ``HealthReport``.

    Args:
        checks: Results to include, in the order they should be reported.

    Returns:
        A report holding exactly the given results.
    """
    report = HealthReport(checks=checks)
    return report

@ -0,0 +1,140 @@
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from health import HealthCheck, HealthStatus
def check_motion_freshness(
    conn: Any,
    max_age_days: int = 7,
    min_motions: int = 100,
) -> HealthCheck:
    """Check that enough motions are dated within the last ``max_age_days`` days.

    Args:
        conn: Connection-like object; only ``execute(sql, params).fetchone()``
            is used.
        max_age_days: Size of the look-back window in days.
        min_motions: Count below which a non-empty window is a WARNING.

    Returns:
        A ``HealthCheck`` named ``"motion_freshness"``: CRITICAL when the
        query fails or no motion falls inside the window, WARNING when fewer
        than ``min_motions`` do, OK otherwise.
    """
    check_name = "motion_freshness"
    query = "SELECT COUNT(*) FROM motions WHERE date >= ?"

    try:
        row = conn.execute(
            query,
            [datetime.now() - timedelta(days=max_age_days)],
        ).fetchone()
        count = row[0] if row else 0
    except Exception as e:
        # A failing query is reported as a result instead of being raised.
        return HealthCheck(
            name=check_name,
            status=HealthStatus.CRITICAL,
            message=f"Could not query motion freshness: {e}",
            details={},
        )

    details = {"count": count, "threshold": max_age_days}
    if count == 0:
        severity = HealthStatus.CRITICAL
        summary = f"No motions in last {max_age_days} days"
        details["count"] = 0
    elif count < min_motions:
        severity = HealthStatus.WARNING
        summary = f"Only {count} motions in last {max_age_days} days (expected >= {min_motions})"
        details["min_expected"] = min_motions
    else:
        severity = HealthStatus.OK
        summary = f"{count} motions in last {max_age_days} days"

    return HealthCheck(
        name=check_name,
        status=severity,
        message=summary,
        details=details,
    )
def check_embedding_coverage(
    conn: Any,
    min_coverage: float = 0.95,
) -> HealthCheck:
    """Check which share of motions has at least one fused embedding.

    Args:
        conn: Connection-like object; only ``execute(sql).fetchone()`` is used.
        min_coverage: Coverage ratio below which the result is a WARNING.

    Returns:
        A ``HealthCheck`` named ``"embedding_coverage"``: CRITICAL when a
        query fails or the motions table is empty, WARNING when coverage is
        below ``min_coverage``, OK otherwise.
    """
    check_name = "embedding_coverage"

    try:
        motions_row = conn.execute("SELECT COUNT(*) FROM motions").fetchone()
        total = motions_row[0] if motions_row else 0
        if total == 0:
            # Bail out before dividing by zero.
            return HealthCheck(
                name=check_name,
                status=HealthStatus.CRITICAL,
                message="No motions in database",
                details={"total": 0, "with_embeddings": 0, "coverage": 0.0},
            )
        # NOTE(review): motion_ids in fused_embeddings are not matched against
        # motions, so orphaned rows could presumably push coverage above
        # 100% -- confirm against the schema.
        embedded_row = conn.execute(
            "SELECT COUNT(DISTINCT motion_id) FROM fused_embeddings"
        ).fetchone()
        with_embeddings = embedded_row[0] if embedded_row else 0
        coverage = with_embeddings / total
    except Exception as e:
        # A failing query is reported as a result instead of being raised.
        return HealthCheck(
            name=check_name,
            status=HealthStatus.CRITICAL,
            message=f"Could not query embedding coverage: {e}",
            details={},
        )

    if coverage < min_coverage:
        severity = HealthStatus.WARNING
        summary = f"Embedding coverage {coverage:.1%} (expected >= {min_coverage:.0%})"
    else:
        severity = HealthStatus.OK
        summary = f"Embedding coverage {coverage:.1%}"

    return HealthCheck(
        name=check_name,
        status=severity,
        message=summary,
        details={"total": total, "with_embeddings": with_embeddings, "coverage": coverage},
    )
def check_llm_coverage(
    conn: Any,
    max_missing_ratio: float = 0.15,
    warn_missing_ratio: float = 0.05,
) -> HealthCheck:
    """Check which share of motions lacks a layman explanation.

    A motion counts as missing when ``layman_explanation`` is NULL or an
    empty string.

    Args:
        conn: Connection-like object; only ``execute(sql).fetchone()`` is used.
        max_missing_ratio: Missing ratio above which the result is CRITICAL.
        warn_missing_ratio: Missing ratio above which the result is a WARNING.
            Defaults to 0.05, the value that used to be hard-coded.

    Returns:
        A ``HealthCheck`` named ``"llm_coverage"``: CRITICAL when a query
        fails, the motions table is empty, or the missing ratio exceeds
        ``max_missing_ratio``; WARNING when it exceeds ``warn_missing_ratio``;
        OK otherwise.
    """
    check_name = "llm_coverage"

    try:
        total_result = conn.execute("SELECT COUNT(*) FROM motions").fetchone()
        total = total_result[0] if total_result else 0
        if total == 0:
            # Bail out before dividing by zero.
            return HealthCheck(
                name=check_name,
                status=HealthStatus.CRITICAL,
                message="No motions in database",
                details={"total": 0, "missing": 0, "missing_ratio": 0.0},
            )
        missing_result = conn.execute(
            "SELECT COUNT(*) FROM motions WHERE layman_explanation IS NULL OR layman_explanation = ''"
        ).fetchone()
        missing = missing_result[0] if missing_result else 0
        missing_ratio = missing / total
    except Exception as e:
        # A failing query is reported as a result instead of being raised.
        return HealthCheck(
            name=check_name,
            status=HealthStatus.CRITICAL,
            message=f"Could not query LLM coverage: {e}",
            details={},
        )

    # The CRITICAL threshold is tested first, so it wins even if it is set
    # lower than the warning threshold.
    if missing_ratio > max_missing_ratio:
        status = HealthStatus.CRITICAL
    elif missing_ratio > warn_missing_ratio:
        status = HealthStatus.WARNING
    else:
        status = HealthStatus.OK

    # Message and details are the same for every severity.
    return HealthCheck(
        name=check_name,
        status=status,
        message=f"{missing_ratio:.1%} missing layman explanations ({missing}/{total})",
        details={"total": total, "missing": missing, "missing_ratio": missing_ratio},
    )

@ -0,0 +1,98 @@
#!/usr/bin/env python3
"""Pipeline health check CLI runner.
Exit codes:
0 = all healthy
1 = any warning
2 = any critical
"""
import argparse
import json
import sys
import duckdb
from config import config
from health import run_checks
from health.checks import check_embedding_coverage, check_llm_coverage, check_motion_freshness
def main(argv=None) -> int:
    """Run every pipeline health check and print the resulting report.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) keeps the normal command-line behaviour.

    Returns:
        The process exit code: 0 when all checks are OK, 1 when any check is
        a WARNING, 2 when any check is CRITICAL or the database connection
        cannot be opened.
    """
    parser = argparse.ArgumentParser(description="Check pipeline health")
    parser.add_argument(
        "--format",
        choices=["text", "json"],
        default="text",
        help="Output format",
    )
    parser.add_argument(
        "--threshold-days",
        type=int,
        default=7,
        help="Max age in days for motion freshness check",
    )
    parser.add_argument(
        "--min-motions",
        type=int,
        default=100,
        help="Minimum number of motions expected within the freshness window",
    )
    parser.add_argument(
        "--min-embed-coverage",
        type=float,
        default=0.95,
        help="Minimum embedding coverage ratio",
    )
    parser.add_argument(
        "--max-missing-llm",
        type=float,
        default=0.15,
        help="Maximum missing layman explanation ratio",
    )
    args = parser.parse_args(argv)

    try:
        conn = duckdb.connect(config.DATABASE_PATH)
    except Exception as e:
        print(f"CRITICAL: Could not connect to database: {e}", file=sys.stderr)
        return 2

    try:
        checks = [
            check_motion_freshness(
                conn,
                max_age_days=args.threshold_days,
                min_motions=args.min_motions,
            ),
            check_embedding_coverage(conn, min_coverage=args.min_embed_coverage),
            check_llm_coverage(conn, max_missing_ratio=args.max_missing_llm),
        ]
        report = run_checks(checks)

        if args.format == "json":
            output = {
                "status": report.status.value,
                "exit_code": report.exit_code,
                "checks": [
                    {
                        "name": c.name,
                        "status": c.status.value,
                        "message": c.message,
                        "details": c.details,
                    }
                    for c in report.checks
                ],
            }
            print(json.dumps(output, indent=2))
        else:
            # Visual marker per status; "?" is the fallback for an unmapped one.
            status_emoji = {"ok": "✅", "warning": "⚠️", "critical": "❌"}
            print(f"Health Report: {status_emoji.get(report.status.value, '?')} {report.status.value.upper()}")
            print("-" * 50)
            for check in report.checks:
                emoji = status_emoji.get(check.status.value, "?")
                print(f"{emoji} {check.name}: {check.message}")
                if check.details:
                    for key, value in check.details.items():
                        print(f" {key}: {value}")
        return report.exit_code
    finally:
        # Always release the connection, even if reporting fails. Closing is
        # best-effort: a failure here must not mask the health result.
        try:
            conn.close()
        except Exception:
            pass
# Script entry point: the value returned by main() becomes the exit code.
if __name__ == "__main__":
    raise SystemExit(main())

@ -0,0 +1,69 @@
import sys
from unittest.mock import MagicMock, patch
import pytest
from scripts.health_check import main
class TestHealthCheckCLI:
    """Tests of the CLI ``main`` function with the database mocked out."""

    @staticmethod
    def _conn(rows):
        """Return a mock connection whose successive ``fetchone`` calls yield ``rows``.

        ``main`` issues five queries in a fixed order: recent motion count,
        total motions, embedding count, total motions, missing explanations.
        """
        conn = MagicMock()
        conn.execute.return_value.fetchone.side_effect = list(rows)
        return conn

    @patch("scripts.health_check.duckdb.connect")
    @patch("scripts.health_check.config")
    def test_all_ok_exits_0(self, mock_config, mock_connect):
        mock_config.DATABASE_PATH = "/fake/db"
        mock_connect.return_value = self._conn([[150], [100], [100], [100], [0]])
        with patch.object(sys, "argv", ["health_check"]):
            exit_code = main()
        assert exit_code == 0

    @patch("scripts.health_check.duckdb.connect")
    @patch("scripts.health_check.config")
    def test_warning_exits_1(self, mock_config, mock_connect):
        mock_config.DATABASE_PATH = "/fake/db"
        # 50 recent motions is below the default minimum of 100 -> WARNING.
        mock_connect.return_value = self._conn([[50], [100], [100], [100], [0]])
        with patch.object(sys, "argv", ["health_check"]):
            exit_code = main()
        assert exit_code == 1

    @patch("scripts.health_check.duckdb.connect")
    @patch("scripts.health_check.config")
    def test_critical_exits_2(self, mock_config, mock_connect):
        mock_config.DATABASE_PATH = "/fake/db"
        # No recent motions at all -> CRITICAL.
        mock_connect.return_value = self._conn([[0], [100], [100], [100], [0]])
        with patch.object(sys, "argv", ["health_check"]):
            exit_code = main()
        assert exit_code == 2

    @patch("scripts.health_check.duckdb.connect")
    @patch("scripts.health_check.config")
    def test_json_format(self, mock_config, mock_connect, capsys):
        import json

        mock_config.DATABASE_PATH = "/fake/db"
        mock_connect.return_value = self._conn([[150], [100], [100], [100], [0]])
        with patch.object(sys, "argv", ["health_check", "--format", "json"]):
            main()
        # Parse the output so the test does not depend on JSON indentation.
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "ok"
        assert payload["exit_code"] == 0
        assert [c["name"] for c in payload["checks"]] == [
            "motion_freshness",
            "embedding_coverage",
            "llm_coverage",
        ]

    @patch("scripts.health_check.duckdb.connect")
    @patch("scripts.health_check.config")
    def test_db_connect_failure_exits_2(self, mock_config, mock_connect, capsys):
        # Patch config as well, so the failure can only come from connect().
        mock_config.DATABASE_PATH = "/fake/db"
        mock_connect.side_effect = Exception("cannot open")
        with patch.object(sys, "argv", ["health_check"]):
            exit_code = main()
        assert exit_code == 2
        assert "cannot open" in capsys.readouterr().err

@ -0,0 +1,92 @@
from datetime import datetime, timedelta
from unittest.mock import MagicMock
import pytest
from health import HealthStatus
from health.checks import check_embedding_coverage, check_llm_coverage, check_motion_freshness
class TestCheckMotionFreshness:
    """Status mapping of ``check_motion_freshness`` for mocked query results."""

    @staticmethod
    def _conn_with_count(count):
        """Return a mock connection whose query yields a single-count row."""
        db = MagicMock()
        db.execute.return_value.fetchone.return_value = [count]
        return db

    def test_recent_motions_ok(self):
        outcome = check_motion_freshness(
            self._conn_with_count(150), max_age_days=7, min_motions=100
        )
        assert outcome.status == HealthStatus.OK
        assert outcome.details["count"] == 150

    def test_no_motions_critical(self):
        outcome = check_motion_freshness(
            self._conn_with_count(0), max_age_days=7, min_motions=100
        )
        assert outcome.status == HealthStatus.CRITICAL

    def test_low_count_warning(self):
        outcome = check_motion_freshness(
            self._conn_with_count(50), max_age_days=7, min_motions=100
        )
        assert outcome.status == HealthStatus.WARNING

    def test_query_error_critical(self):
        db = MagicMock()
        db.execute.side_effect = Exception("db down")
        outcome = check_motion_freshness(db)
        assert outcome.status == HealthStatus.CRITICAL
        # The underlying error text must surface in the message.
        assert "db down" in outcome.message
class TestCheckEmbeddingCoverage:
    """Status mapping of ``check_embedding_coverage`` for mocked query results."""

    @staticmethod
    def _conn_with_rows(*rows):
        """Return a mock connection yielding ``rows`` from successive ``fetchone`` calls."""
        db = MagicMock()
        db.execute.return_value.fetchone.side_effect = list(rows)
        return db

    def test_full_coverage_ok(self):
        # Rows are: total motions, then motions with embeddings.
        outcome = check_embedding_coverage(
            self._conn_with_rows([100], [100]), min_coverage=0.95
        )
        assert outcome.status == HealthStatus.OK
        assert outcome.details["coverage"] == 1.0

    def test_low_coverage_warning(self):
        outcome = check_embedding_coverage(
            self._conn_with_rows([100], [80]), min_coverage=0.95
        )
        assert outcome.status == HealthStatus.WARNING
        assert outcome.details["coverage"] == 0.8

    def test_empty_db_critical(self):
        outcome = check_embedding_coverage(self._conn_with_rows([0], [0]))
        assert outcome.status == HealthStatus.CRITICAL

    def test_query_error_critical(self):
        db = MagicMock()
        db.execute.side_effect = Exception("db down")
        outcome = check_embedding_coverage(db)
        assert outcome.status == HealthStatus.CRITICAL
class TestCheckLLMCoverage:
    """Status mapping of ``check_llm_coverage`` for mocked query results."""

    @staticmethod
    def _conn_with_rows(*rows):
        """Return a mock connection yielding ``rows`` from successive ``fetchone`` calls."""
        db = MagicMock()
        db.execute.return_value.fetchone.side_effect = list(rows)
        return db

    def test_full_coverage_ok(self):
        # Rows are: total motions, then motions missing an explanation.
        outcome = check_llm_coverage(self._conn_with_rows([100], [0]))
        assert outcome.status == HealthStatus.OK
        assert outcome.details["missing_ratio"] == 0.0

    def test_some_missing_warning(self):
        outcome = check_llm_coverage(self._conn_with_rows([100], [10]))
        assert outcome.status == HealthStatus.WARNING
        assert outcome.details["missing_ratio"] == 0.1

    def test_too_many_missing_critical(self):
        outcome = check_llm_coverage(
            self._conn_with_rows([100], [20]), max_missing_ratio=0.15
        )
        assert outcome.status == HealthStatus.CRITICAL
        assert outcome.details["missing_ratio"] == 0.2

    def test_query_error_critical(self):
        db = MagicMock()
        db.execute.side_effect = Exception("db down")
        outcome = check_llm_coverage(db)
        assert outcome.status == HealthStatus.CRITICAL

@ -0,0 +1,54 @@
import pytest
from health import HealthCheck, HealthReport, HealthStatus, run_checks
class TestHealthStatus:
    """Pin the string value of every ``HealthStatus`` member.

    The tests are named after what they assert: the members' values, not
    any ordering between members.
    """

    def test_ok_value(self):
        assert HealthStatus.OK.value == "ok"

    def test_warning_value(self):
        assert HealthStatus.WARNING.value == "warning"

    def test_critical_value(self):
        assert HealthStatus.CRITICAL.value == "critical"
class TestHealthReport:
    """Aggregated status and exit code of reports built by ``run_checks``."""

    def test_all_ok_returns_ok(self):
        summary = run_checks(
            [
                HealthCheck("a", HealthStatus.OK, "fine", {}),
                HealthCheck("b", HealthStatus.OK, "fine", {}),
            ]
        )
        assert summary.status == HealthStatus.OK
        assert summary.exit_code == 0

    def test_one_warning_returns_warning(self):
        summary = run_checks(
            [
                HealthCheck("a", HealthStatus.OK, "fine", {}),
                HealthCheck("b", HealthStatus.WARNING, "hmm", {}),
            ]
        )
        assert summary.status == HealthStatus.WARNING
        assert summary.exit_code == 1

    def test_one_critical_returns_critical(self):
        summary = run_checks(
            [
                HealthCheck("a", HealthStatus.OK, "fine", {}),
                HealthCheck("b", HealthStatus.CRITICAL, "bad", {}),
            ]
        )
        assert summary.status == HealthStatus.CRITICAL
        assert summary.exit_code == 2

    def test_critical_trumps_warning(self):
        summary = run_checks(
            [
                HealthCheck("a", HealthStatus.WARNING, "hmm", {}),
                HealthCheck("b", HealthStatus.CRITICAL, "bad", {}),
            ]
        )
        assert summary.status == HealthStatus.CRITICAL
        assert summary.exit_code == 2

    def test_empty_checks_returns_ok(self):
        # A report without any checks counts as healthy.
        summary = run_checks([])
        assert summary.status == HealthStatus.OK
        assert summary.exit_code == 0
Loading…
Cancel
Save