Controller/tsgrain_controller/jobschedule.py

40 lines
1.2 KiB
Python

# coding=utf-8
from datetime import datetime
from typing import Optional
import schedule
from tsgrain_controller import models, util
class Scheduler(util.StoppableThread):
    """Background thread that turns due jobs into scheduled task requests.

    A handler is registered with the ``schedule`` library's default scheduler
    to fire at second ``:00`` of every minute. Each time it fires, every job
    returned by ``app.get_jobs()`` is checked against the current local time
    and one ``models.TaskRequest`` per zone is submitted for each job whose
    ``check()`` returns a truthy value.
    """

    def __init__(self, app: models.AppInterface):
        """Create the scheduler thread.

        Args:
            app: Application interface used to fetch the jobs and to submit
                task requests.
        """
        # NOTE(review): 1 is presumably the cycle interval in seconds --
        # confirm against util.StoppableThread.
        super().__init__(1)
        self.app = app
        # Handle of the registered minute job; None while nothing is
        # registered with the ``schedule`` library.
        self._job: Optional[schedule.Job] = None

    def _minute_handler(self):
        """Request a task for every zone of every job that is currently due."""
        jobs = self.app.get_jobs()
        # Take a single timestamp per tick so that all jobs are evaluated
        # against the same instant, even if iterating takes a while.
        now = datetime.now()
        for job in jobs:
            if not job.check(now):
                continue
            for zone in job.zones:
                self.app.request_task(
                    models.TaskRequest(source=models.Source.SCHEDULE,
                                       zone_id=zone,
                                       duration=job.duration,
                                       queuing=True,
                                       cancelling=False))

    def start(self):
        """Register the minute handler (at most once) and start the thread."""
        # Registering again would overwrite the stored handle and leave the
        # previous job on the global scheduler, where stop() could no longer
        # cancel it and the handler would fire twice per minute.
        if self._job is None:
            self._job = schedule.every().minute.at(':00').do(
                self._minute_handler)
        super().start()

    def run_cycle(self):
        """Run every job of the default ``schedule`` scheduler that is due.

        NOTE(review): presumably invoked repeatedly by util.StoppableThread
        while the thread is running -- confirm against the base class.
        """
        schedule.run_pending()

    def stop(self):
        """Stop the thread and unregister the minute handler, if registered."""
        super().stop()
        if self._job is not None:
            schedule.cancel_job(self._job)
            self._job = None