You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
169 lines
5.0 KiB
169 lines
5.0 KiB
"""Automated pipeline scheduling.
|
|
|
|
Runs the parliamentary embedding pipeline and motion summarization
|
|
on a configurable schedule using the `schedule` library.
|
|
|
|
Usage:
|
|
uv run python scheduler.py # start scheduler loop
|
|
uv run python scheduler.py --once # run once and exit
|
|
uv run python scheduler.py --pipeline-time 03:00 --summarizer-every 6
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import signal
|
|
import sys
|
|
import time
|
|
from typing import Callable
|
|
|
|
import schedule
|
|
|
|
from config import config
|
|
|
|
from pipeline.run_pipeline import run as run_pipeline
|
|
from summarizer import summarizer
|
|
|
|
# Module-level logger shared by the scheduler class and the CLI entry point.
_logger: logging.Logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
class PipelineScheduler:
    """Schedule and run the embedding pipeline and the motion summarizer.

    Jobs are registered on a ``schedule.Scheduler``. Unless one is injected,
    that is the ``schedule`` library's module-level default scheduler (the
    one behind ``schedule.every()`` / ``schedule.run_pending()``), so the
    behaviour matches code that uses the global API directly. Inject a
    dedicated ``schedule.Scheduler()`` to keep this instance's jobs isolated,
    e.g. in tests or when several schedulers share one process.
    """

    def __init__(
        self,
        db_path: str = "data/motions.db",
        scheduler: schedule.Scheduler | None = None,
    ):
        """Create a scheduler.

        Args:
            db_path: Database path handed to each pipeline run.
            scheduler: Optional ``schedule.Scheduler`` to register jobs on and
                poll; defaults to ``schedule.default_scheduler``.
        """
        self.db_path = db_path
        # Only touch the library default when nothing was injected.
        self._scheduler = (
            scheduler if scheduler is not None else schedule.default_scheduler
        )
        self._running = False

    def run_pipeline(self) -> int:
        """Run the full embedding pipeline once.

        All stages are enabled (every ``skip_*`` flag is False, no dry run)
        and the tuning options are fixed: annual windows, ``svd_k=50``,
        ``text_batch_size=200``. Exceptions are logged rather than propagated
        so that a failing run cannot tear down the scheduler loop.

        Returns:
            The pipeline's integer exit code, ``0`` if it returned a
            non-integer (e.g. ``None``), or ``1`` if it raised.
        """
        _logger.info("Starting scheduled pipeline run")
        # ``run`` is called with an argparse-style namespace; presumably these
        # attributes mirror the pipeline's own CLI flags (verify against
        # pipeline.run_pipeline when changing them).
        args = argparse.Namespace(
            db_path=self.db_path,
            window_size="annual",
            start_date=None,
            end_date=None,
            svd_k=50,
            svd_workers=None,
            text_model=None,
            text_batch_size=200,
            skip_metadata=False,
            skip_extract=False,
            skip_svd=False,
            skip_text=False,
            skip_fusion=False,
            dry_run=False,
        )
        try:
            result = run_pipeline(args)
        except Exception:
            _logger.exception("Pipeline run failed")
            return 1

        rc = result if isinstance(result, int) else 0
        # A non-zero code is a failure even though nothing raised; surface it
        # at WARNING so it is not lost among routine INFO messages.
        log = _logger.info if rc == 0 else _logger.warning
        log("Pipeline run completed with code %s", result)
        return rc

    def run_summarizer(self) -> None:
        """Run motion summarization for missing explanations.

        Delegates to ``summarizer.update_motion_summaries()``. Any exception
        is logged and swallowed so the scheduler loop keeps running.
        """
        _logger.info("Starting scheduled summarizer run")
        try:
            summarizer.update_motion_summaries()
        except Exception:
            _logger.exception("Summarizer run failed")
            return
        _logger.info("Summarizer run completed")

    def schedule_daily(self, time_str: str = "02:00") -> None:
        """Register a daily pipeline run at *time_str*.

        Args:
            time_str: Time of day in the form accepted by ``schedule``'s
                ``Job.at`` for daily jobs (``"HH:MM"`` or ``"HH:MM:SS"``).
                Invalid values are rejected by the ``schedule`` library.
        """
        _logger.info("Scheduling daily pipeline run at %s", time_str)
        self._scheduler.every().day.at(time_str).do(self.run_pipeline)

    def schedule_summarizer(self, every_n_hours: int = 6) -> None:
        """Register the summarizer to run every *every_n_hours* hours.

        Raises:
            ValueError: if *every_n_hours* is not positive. A zero or negative
                interval would make the job fire on every poll.
        """
        if every_n_hours <= 0:
            raise ValueError(
                f"every_n_hours must be positive, got {every_n_hours!r}"
            )
        _logger.info("Scheduling summarizer every %s hours", every_n_hours)
        self._scheduler.every(every_n_hours).hours.do(self.run_summarizer)

    def _signal_handler(self, signum: int, frame) -> None:
        """Handle SIGTERM/SIGINT by asking the loop in :meth:`start` to exit."""
        _logger.info("Received signal %s, shutting down", signum)
        self.stop()

    def start(self) -> None:
        """Start the scheduler loop.

        The loop polls for due jobs about once per second. It blocks until
        :meth:`stop` is called or SIGTERM/SIGINT is received. The signal
        handlers that were active before the call are restored when the loop
        exits.
        """
        self._running = True

        # Install shutdown handlers, remembering the previous ones. Without
        # restoring them, Ctrl+C would stay neutralised for the rest of the
        # process after the loop ends.
        previous_handlers = {
            sig: signal.signal(sig, self._signal_handler)
            for sig in (signal.SIGTERM, signal.SIGINT)
        }

        _logger.info("Scheduler started")
        try:
            while self._running:
                self._scheduler.run_pending()
                time.sleep(1)
        finally:
            for sig, handler in previous_handlers.items():
                # ``signal.signal`` reports ``None`` for handlers not installed
                # from Python; those cannot be reinstated, so fall back to the
                # default disposition.
                signal.signal(
                    sig, handler if handler is not None else signal.SIG_DFL
                )
            _logger.info("Scheduler stopped")

    def stop(self) -> None:
        """Ask the loop in :meth:`start` to exit after its current iteration."""
        self._running = False
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Return the command-line parser for the scheduler.

    Options are ``--db-path``, ``--pipeline-time``, ``--summarizer-every``
    and ``--once``. Their defaults are shown in ``--help`` output.
    """
    # (flag, add_argument keyword arguments), registered in this order.
    option_table = (
        (
            "--db-path",
            {"default": "data/motions.db", "help": "Path to the DuckDB file"},
        ),
        (
            "--pipeline-time",
            {"default": "02:00", "help": "Daily pipeline run time (HH:MM)"},
        ),
        (
            "--summarizer-every",
            {"type": int, "default": 6, "help": "Run summarizer every N hours"},
        ),
        (
            "--once",
            {
                "action": "store_true",
                "help": "Run pipeline + summarizer once and exit (no scheduling loop)",
            },
        ),
    )

    cli = argparse.ArgumentParser(
        description="Automated pipeline scheduler",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point for the scheduler.

    Args:
        argv: Arguments to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        In ``--once`` mode, the pipeline's exit code. Otherwise ``0`` once the
        scheduler loop has stopped (e.g. after SIGINT/SIGTERM).
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    sched = PipelineScheduler(db_path=args.db_path)

    if args.once:
        _logger.info("Running in single-shot mode")
        pipeline_rc = sched.run_pipeline()
        # The summarizer runs regardless of the pipeline outcome. Its own
        # failures are logged inside run_summarizer and do not change the
        # returned exit code.
        sched.run_summarizer()
        return pipeline_rc

    try:
        sched.schedule_daily(args.pipeline_time)
    except schedule.ScheduleError as exc:
        # Report a malformed --pipeline-time as a normal CLI usage error
        # (exit status 2) instead of an uncaught traceback.
        parser.error(f"invalid --pipeline-time {args.pipeline_time!r}: {exc}")

    if args.summarizer_every > 0:
        sched.schedule_summarizer(args.summarizer_every)
    else:
        _logger.info(
            "Summarizer scheduling disabled (--summarizer-every=%s)",
            args.summarizer_every,
        )

    sched.start()
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    sys.exit(exit_status)
|
|
|