"""Automated pipeline scheduling. Runs the parliamentary embedding pipeline and motion summarization on a configurable schedule using the `schedule` library. Usage: uv run python scheduler.py # start scheduler loop uv run python scheduler.py --once # run once and exit uv run python scheduler.py --pipeline-time 03:00 --summarizer-every 6 """ from __future__ import annotations import argparse import logging import signal import sys import time from typing import Callable import schedule from config import config from pipeline.run_pipeline import run as run_pipeline from summarizer import summarizer _logger = logging.getLogger(__name__) class PipelineScheduler: """Schedules and runs pipeline jobs.""" def __init__(self, db_path: str = "data/motions.db"): self.db_path = db_path self._running = False def run_pipeline(self) -> int: """Run the full embedding pipeline. Returns the exit code from the pipeline run. """ _logger.info("Starting scheduled pipeline run") try: args = argparse.Namespace( db_path=self.db_path, window_size="annual", start_date=None, end_date=None, svd_k=50, svd_workers=None, text_model=None, text_batch_size=200, skip_metadata=False, skip_extract=False, skip_svd=False, skip_text=False, skip_fusion=False, dry_run=False, ) result = run_pipeline(args) _logger.info("Pipeline run completed with code %s", result) return result if isinstance(result, int) else 0 except Exception: _logger.exception("Pipeline run failed") return 1 def run_summarizer(self) -> None: """Run motion summarization for missing explanations.""" _logger.info("Starting scheduled summarizer run") try: summarizer.update_motion_summaries() _logger.info("Summarizer run completed") except Exception: _logger.exception("Summarizer run failed") def schedule_daily(self, time_str: str = "02:00") -> None: """Schedule the pipeline to run daily at *time_str*.""" _logger.info("Scheduling daily pipeline run at %s", time_str) schedule.every().day.at(time_str).do(self.run_pipeline) def schedule_summarizer(self, every_n_hours: int = 6) -> None: """Schedule the summarizer to run every *every_n_hours* hours.""" _logger.info("Scheduling summarizer every %s hours", every_n_hours) schedule.every(every_n_hours).hours.do(self.run_summarizer) def _signal_handler(self, signum, frame) -> None: """Handle shutdown signals gracefully.""" _logger.info("Received signal %s, shutting down", signum) self.stop() def start(self) -> None: """Start the scheduler loop. Blocks until :meth:`stop` is called or a signal is received. """ self._running = True # Register signal handlers for graceful shutdown signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGINT, self._signal_handler) _logger.info("Scheduler started") while self._running: schedule.run_pending() time.sleep(1) _logger.info("Scheduler stopped") def stop(self) -> None: """Stop the scheduler loop.""" self._running = False def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Automated pipeline scheduler", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( "--db-path", default="data/motions.db", help="Path to the DuckDB file", ) parser.add_argument( "--pipeline-time", default="02:00", help="Daily pipeline run time (HH:MM)", ) parser.add_argument( "--summarizer-every", type=int, default=6, help="Run summarizer every N hours", ) parser.add_argument( "--once", action="store_true", help="Run pipeline + summarizer once and exit (no scheduling loop)", ) return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) sched = PipelineScheduler(db_path=args.db_path) if args.once: _logger.info("Running in single-shot mode") pipeline_rc = sched.run_pipeline() sched.run_summarizer() return pipeline_rc sched.schedule_daily(args.pipeline_time) if args.summarizer_every > 0: sched.schedule_summarizer(args.summarizer_every) sched.start() return 0 if __name__ == "__main__": sys.exit(main())