Controller/tsgrain_controller/application.py

194 lines
6.1 KiB
Python

# coding=utf-8
import logging
import os.path
from datetime import datetime
from typing import List, Optional
from tsgrain_controller import config, database, jobschedule, output, \
task_queue, \
models, grpc_server, systimecfg
from tsgrain_controller.io import io_factory
class Application(models.AppInterface):
def __init__(self,
io_type: io_factory.IoType,
workdir: str = '',
cfg_path: Optional[str] = None):
if cfg_path is None:
cfg_path = os.path.join(workdir, 'tsgrain.toml')
self.logger = logging.getLogger()
self.logger.setLevel(logging.INFO)
self.cfg = config.Config(cfg_path)
self.cfg.load_file()
if self.cfg.log_debug:
self.logger.setLevel(logging.DEBUG)
if self.cfg.db_path:
db_path = self.cfg.db_path
else:
db_path = os.path.join(workdir, 'raindb.json')
self.db = database.RainDB(db_path)
self.queue = task_queue.TaskQueue(self)
self.db.load_queue(self.queue)
self.io = io_factory.new_io(self, io_type)
self.io.set_callbacks(self._cb_manual, self._cb_modekey)
self.outputs = output.Outputs(self.io, self.queue, self)
self.scheduler = jobschedule.Scheduler(self)
self.grpc_tsgrain_servicer = grpc_server.TSGRainServicer(self)
self.grpc_server = grpc_server.new_server(self.grpc_tsgrain_servicer,
self.cfg.grpc_port)
self._auto_en = self.db.get_auto_mode()
self._running = False
def get_auto_mode(self) -> bool:
return self._auto_en
def set_auto_mode(self, state: bool):
if state != self._auto_en:
self._auto_en = state
self.grpc_tsgrain_servicer.tasklist_update.set()
if self._auto_en:
logging.info('Auto mode ON')
else:
logging.info('Auto mode OFF')
def get_cfg(self) -> config.Config:
return self.cfg
def get_logger(self) -> logging.Logger:
return self.logger
def _cb_manual(self, zone_id: int):
self.request_task(
models.TaskRequest(source=models.Source.MANUAL,
zone_id=zone_id,
duration=self.cfg.manual_time,
queuing=False,
cancelling=True))
def _cb_modekey(self):
self.set_auto_mode(not self.get_auto_mode())
def request_task(self,
request: models.TaskRequest) -> models.TaskRequestResult:
if request.queuing:
if request.cancelling:
# If a task from this zone is in queue, cancel it
for task in self.queue.tasks:
if task.zone_id == request.zone_id and task.source == request.source:
self.queue.cancel_task(task)
return models.TaskRequestResult(False, True)
elif request.cancelling:
current_task = self.queue.get_current_task()
# Cancel manually started tasks
if current_task is not None \
and current_task.zone_id == request.zone_id \
and current_task.source == request.source:
self.queue.cancel_current_task()
return models.TaskRequestResult(False, True)
duration = request.duration
if duration < 1:
duration = self.cfg.manual_time
task = models.Task(request.source, request.zone_id, duration)
task.validate(self)
started = self.queue.enqueue(task, not request.queuing)
return models.TaskRequestResult(started, False)
def start_task(self, source: models.Source, zone_id: int, duration: int,
queuing: bool) -> bool:
if duration < 1:
duration = self.cfg.manual_time
task = models.Task(source, zone_id, duration)
task.validate(self)
return self.queue.enqueue(task, not queuing)
def stop_task(self, source: models.Source, zone_id: int) -> bool:
for task in self.queue.tasks:
if task.zone_id == zone_id and task.source == source:
self.queue.cancel_task(task)
return True
return False
def get_tasks(self) -> List[models.Task]:
return self.queue.tasks
def create_job(self, job: models.Job) -> int:
job.validate(self)
return self.db.insert_job(job)
def get_job(self, job_id: int) -> models.Job:
return self.db.get_job(job_id)
def get_jobs(self) -> List[models.Job]:
return self.db.get_jobs()
def update_job(self, job: models.Job):
job.validate(self)
self.db.update_job(job)
def delete_job(self, job_id):
self.db.delete_job(job_id)
def enable_job(self, job_id: int):
job = self.db.get_job(job_id)
job.enable = True
self.db.update_job(job)
def disable_job(self, job_id: int):
job = self.db.get_job(job_id)
job.enable = False
self.db.update_job(job)
def notify_queue_update(self):
self.grpc_tsgrain_servicer.tasklist_update.set()
def is_running(self) -> bool:
return self._running
def get_system_timezone(self) -> str:
return systimecfg.get_system_timezone(self.cfg.cmd_get_timezone)
def set_system_datetime(self, date_time: datetime):
systimecfg.set_system_datetime(date_time, self.cfg.cmd_set_datetime)
def set_system_timezone(self, tz: str):
systimecfg.set_system_timezone(tz, self.cfg.cmd_set_timezone)
def start(self):
self._running = True
self.io.start()
self.outputs.start()
self.queue.start()
self.scheduler.start()
self.grpc_server.start()
def stop(self):
self._running = False
self.grpc_server.stop(None)
self.scheduler.stop()
self.queue.stop()
self.outputs.stop()
self.io.stop()
# Store application state
self.db.store_queue(self.queue)
self.db.set_auto_mode(self._auto_en)
# Save config
self.cfg.save_file()