You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
motief/scripts/health_check.py

98 lines
2.6 KiB

#!/usr/bin/env python3
"""Pipeline health check CLI runner.
Exit codes:
0 = all healthy
1 = any warning
2 = any critical
"""
import argparse
import json
import sys
import duckdb
from config import config
from health import run_checks
from health.checks import check_embedding_coverage, check_llm_coverage, check_motion_freshness
def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the health check CLI."""
    parser = argparse.ArgumentParser(description="Check pipeline health")
    parser.add_argument(
        "--format",
        choices=["text", "json"],
        default="text",
        help="Output format",
    )
    parser.add_argument(
        "--threshold-days",
        type=int,
        default=7,
        help="Max age in days for motion freshness check",
    )
    parser.add_argument(
        "--min-embed-coverage",
        type=float,
        default=0.95,
        help="Minimum embedding coverage ratio",
    )
    parser.add_argument(
        "--max-missing-llm",
        type=float,
        default=0.15,
        help="Maximum missing layman explanation ratio",
    )
    return parser


def _print_report(report, output_format: str) -> None:
    """Write *report* to stdout, as JSON when *output_format* is "json", else as text."""
    if output_format == "json":
        output = {
            "status": report.status.value,
            "exit_code": report.exit_code,
            "checks": [
                {
                    "name": c.name,
                    "status": c.status.value,
                    "message": c.message,
                    "details": c.details,
                }
                for c in report.checks
            ],
        }
        print(json.dumps(output, indent=2))
        return

    # NOTE(review): every marker below is an empty string; these were presumably
    # emoji that got lost somewhere along the way -- confirm and restore if so.
    status_emoji = {"ok": "", "warning": "", "critical": ""}
    print(f"Health Report: {status_emoji.get(report.status.value, '?')} {report.status.value.upper()}")
    print("-" * 50)
    for check in report.checks:
        emoji = status_emoji.get(check.status.value, "?")
        print(f"{emoji} {check.name}: {check.message}")
        if check.details:
            for key, value in check.details.items():
                print(f" {key}: {value}")


def main(argv=None) -> int:
    """Run the pipeline health checks and print a report.

    Args:
        argv: Command-line arguments to parse (without the program name).
            ``None`` (the default) makes argparse read ``sys.argv[1:]``, so
            existing ``main()`` callers are unaffected.

    Returns:
        ``report.exit_code`` from the health report, or ``2`` when the
        database cannot be opened or a check raises while running.
    """
    args = _build_parser().parse_args(argv)

    try:
        conn = duckdb.connect(config.DATABASE_PATH)
    except Exception as e:
        print(f"CRITICAL: Could not connect to database: {e}", file=sys.stderr)
        return 2

    try:
        try:
            checks = [
                check_motion_freshness(conn, max_age_days=args.threshold_days),
                check_embedding_coverage(conn, min_coverage=args.min_embed_coverage),
                check_llm_coverage(conn, max_missing_ratio=args.max_missing_llm),
            ]
            report = run_checks(checks)
        except Exception as e:
            # An uncaught exception would exit the interpreter with status 1,
            # which this script documents as "warning"; report it as critical.
            print(f"CRITICAL: Health checks failed to run: {e}", file=sys.stderr)
            return 2

        _print_report(report, args.format)
        return report.exit_code
    finally:
        # Always release the connection, including when a check raised.
        try:
            conn.close()
        except Exception as e:
            # Closing is best-effort: mention the failure on stderr but never
            # let it change the exit code.
            print(f"WARNING: Could not close database connection: {e}", file=sys.stderr)
# Script entry point: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())