# coding=utf-8
import logging
import os.path
from datetime import datetime
from typing import List, Optional

from tsgrain_controller import (
    config,
    database,
    grpc_server,
    jobschedule,
    models,
    output,
    systimecfg,
    task_queue,
)
from tsgrain_controller.io import io_factory


class Application(models.AppInterface):
    """Top-level application object.

    Owns the configuration, database, task queue, I/O backend, outputs,
    job scheduler and gRPC server, and implements the operations those
    components call back into (task requests, job CRUD, auto mode, system
    time settings).
    """

    def __init__(self,
                 io_type: io_factory.IoType,
                 workdir: str = '',
                 cfg_path: Optional[str] = None):
        """Load configuration and construct all components.

        Nothing is started here; call :meth:`start` afterwards.

        Args:
            io_type: I/O backend selector handed to ``io_factory.new_io``.
            workdir: Directory in which the default config file
                (``tsgrain.toml``) and default database (``raindb.json``)
                are looked up.
            cfg_path: Explicit config file path. When ``None``,
                ``<workdir>/tsgrain.toml`` is used.
        """
        if cfg_path is None:
            cfg_path = os.path.join(workdir, 'tsgrain.toml')

        # Root logger: INFO by default, raised to DEBUG by the config below.
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)

        self.cfg = config.Config(cfg_path)
        self.cfg.load_file()

        if self.cfg.log_debug:
            self.logger.setLevel(logging.DEBUG)

        # A configured database path wins over the workdir default.
        db_path = self.cfg.db_path or os.path.join(workdir, 'raindb.json')
        self.db = database.RainDB(db_path)

        # Restore the queue content persisted by a previous stop().
        self.queue = task_queue.TaskQueue(self)
        self.db.load_queue(self.queue)

        self.io = io_factory.new_io(self, io_type)
        self.io.set_callbacks(self._cb_manual, self._cb_modekey)

        self.outputs = output.Outputs(self.io, self.queue, self)
        self.scheduler = jobschedule.Scheduler(self)

        self.grpc_tsgrain_servicer = grpc_server.TSGRainServicer(self)
        self.grpc_server = grpc_server.new_server(
            self.grpc_tsgrain_servicer, self.cfg.grpc_port)

        self._auto_en = self.db.get_auto_mode()
        self._running = False

    def get_auto_mode(self) -> bool:
        """Return whether auto mode is currently enabled."""
        return self._auto_en

    def set_auto_mode(self, state: bool):
        """Switch auto mode on or off.

        Does nothing when ``state`` equals the current mode. On an actual
        change the gRPC servicer's ``tasklist_update`` event is set and the
        new mode is logged.
        """
        if state == self._auto_en:
            return

        self._auto_en = state
        self.grpc_tsgrain_servicer.tasklist_update.set()
        logging.info('Auto mode ON' if self._auto_en else 'Auto mode OFF')

    def get_cfg(self) -> config.Config:
        """Return the loaded configuration object."""
        return self.cfg

    def get_logger(self) -> logging.Logger:
        """Return the logger configured in ``__init__``."""
        return self.logger

    def _cb_manual(self, zone_id: int):
        """I/O callback: request a manual, non-queuing, cancelling task.

        The task runs for ``cfg.manual_time`` on the given zone.
        """
        self.request_task(
            models.TaskRequest(source=models.Source.MANUAL,
                               zone_id=zone_id,
                               duration=self.cfg.manual_time,
                               queuing=False,
                               cancelling=True))

    def _cb_modekey(self):
        """I/O callback: toggle auto mode."""
        self.set_auto_mode(not self.get_auto_mode())

    def _enqueue_new_task(self, source: models.Source, zone_id: int,
                          duration: int, queuing: bool) -> bool:
        """Build, validate and enqueue a task; shared by the task entry points.

        A ``duration`` below 1 is replaced by ``cfg.manual_time``.

        Returns:
            The result of ``queue.enqueue(task, not queuing)``.
        """
        if duration < 1:
            duration = self.cfg.manual_time

        task = models.Task(source, zone_id, duration)
        task.validate(self)
        return self.queue.enqueue(task, not queuing)

    def request_task(
            self, request: models.TaskRequest) -> models.TaskRequestResult:
        """Handle a task request, which may cancel instead of start a task.

        With ``request.cancelling`` set, a matching task (same zone and
        source) is cancelled instead of a new one being created: for queuing
        requests any task in the queue is considered, otherwise only the
        current task. If nothing matched, or cancelling is off, a new task
        is enqueued.

        Returns:
            ``TaskRequestResult(False, True)`` when a task was cancelled,
            otherwise ``TaskRequestResult(started, False)`` where ``started``
            is the value returned by the queue.
        """
        if request.queuing:
            if request.cancelling:
                # If a task from this zone is in queue, cancel it
                for task in self.queue.tasks:
                    if (task.zone_id == request.zone_id
                            and task.source == request.source):
                        self.queue.cancel_task(task)
                        return models.TaskRequestResult(False, True)
        elif request.cancelling:
            current_task = self.queue.get_current_task()
            # Cancel manually started tasks
            if (current_task is not None
                    and current_task.zone_id == request.zone_id
                    and current_task.source == request.source):
                self.queue.cancel_current_task()
                return models.TaskRequestResult(False, True)

        started = self._enqueue_new_task(request.source, request.zone_id,
                                         request.duration, request.queuing)
        return models.TaskRequestResult(started, False)

    def start_task(self, source: models.Source, zone_id: int, duration: int,
                   queuing: bool) -> bool:
        """Create and enqueue a task without any cancelling logic.

        A ``duration`` below 1 falls back to ``cfg.manual_time``.

        Returns:
            The value returned by ``queue.enqueue``.
        """
        return self._enqueue_new_task(source, zone_id, duration, queuing)

    def stop_task(self, source: models.Source, zone_id: int) -> bool:
        """Cancel the first queued task matching ``source`` and ``zone_id``.

        Returns:
            ``True`` if a task was cancelled, ``False`` if none matched.
        """
        for task in self.queue.tasks:
            if task.zone_id == zone_id and task.source == source:
                # Return right away: the queue is not iterated any further
                # after it has been modified.
                self.queue.cancel_task(task)
                return True
        return False

    def get_tasks(self) -> List[models.Task]:
        """Return the queue's task list."""
        return self.queue.tasks

    def create_job(self, job: models.Job) -> int:
        """Validate ``job`` and insert it; return the database's result."""
        job.validate(self)
        return self.db.insert_job(job)

    def get_job(self, job_id: int) -> models.Job:
        """Fetch a single job from the database."""
        return self.db.get_job(job_id)

    def get_jobs(self) -> List[models.Job]:
        """Fetch all jobs from the database."""
        return self.db.get_jobs()

    def update_job(self, job: models.Job):
        """Validate ``job`` and write it back to the database."""
        job.validate(self)
        self.db.update_job(job)

    def delete_job(self, job_id: int):
        """Delete the job with the given id from the database."""
        self.db.delete_job(job_id)

    def enable_job(self, job_id: int):
        """Set the job's ``enable`` flag and store the job."""
        job = self.db.get_job(job_id)
        job.enable = True
        self.db.update_job(job)

    def disable_job(self, job_id: int):
        """Clear the job's ``enable`` flag and store the job."""
        job = self.db.get_job(job_id)
        job.enable = False
        self.db.update_job(job)

    def notify_queue_update(self):
        """Set the gRPC servicer's ``tasklist_update`` event."""
        self.grpc_tsgrain_servicer.tasklist_update.set()

    def is_running(self) -> bool:
        """Return ``True`` between :meth:`start` and :meth:`stop`."""
        return self._running

    def get_system_timezone(self) -> str:
        """Return the system timezone, using ``cfg.cmd_get_timezone``."""
        return systimecfg.get_system_timezone(self.cfg.cmd_get_timezone)

    def set_system_datetime(self, date_time: datetime):
        """Set the system date/time, using ``cfg.cmd_set_datetime``."""
        systimecfg.set_system_datetime(date_time, self.cfg.cmd_set_datetime)

    def set_system_timezone(self, tz: str):
        """Set the system timezone, using ``cfg.cmd_set_timezone``."""
        systimecfg.set_system_timezone(tz, self.cfg.cmd_set_timezone)

    def start(self):
        """Mark the application as running and start every component."""
        self._running = True

        self.io.start()
        self.outputs.start()
        self.queue.start()
        self.scheduler.start()
        self.grpc_server.start()

    def stop(self):
        """Stop every component and persist state.

        Components are stopped in the reverse of the order used by
        :meth:`start`; afterwards the queue, the auto mode flag and the
        configuration are saved.
        """
        self._running = False

        # NOTE(review): None is passed as the stop argument -- presumably the
        # gRPC "grace" period, i.e. no grace time; confirm against new_server.
        self.grpc_server.stop(None)
        self.scheduler.stop()
        self.queue.stop()
        self.outputs.stop()
        self.io.stop()

        # Store application state
        self.db.store_queue(self.queue)
        self.db.set_auto_mode(self._auto_en)

        # Save config
        self.cfg.save_file()