201 lines
6.4 KiB
Python
201 lines
6.4 KiB
Python
# coding=utf-8
|
|
import logging
|
|
import os.path
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
from tsgrain_controller import config, database, jobschedule, output, \
|
|
task_queue, \
|
|
models, grpc_server, systimecfg
|
|
from tsgrain_controller.io import io_factory
|
|
|
|
|
|
class Application(models.AppInterface):
    """Central application object that wires the controller components.

    The constructor loads the configuration and the database and creates the
    task queue, the IO backend, the outputs, the scheduler and the gRPC
    server. ``start()`` and ``stop()`` start and stop these components;
    ``stop()`` additionally stores the queue, the auto mode flag and the
    configuration.
    """

    # Input keys handled by ``_input_cb``.
    _KEY_MODE = 'BT_MODE'
    _KEY_ZONE_PREFIX = 'BT_Z_'

    def __init__(self,
                 io_type: io_factory.IoType,
                 workdir: str = '',
                 cfg_path: Optional[str] = None):
        """Load configuration and stored state, then build all components.

        :param io_type: IO backend type handed to ``io_factory.new_io``
        :param workdir: directory used for the default config/database paths
        :param cfg_path: config file path; defaults to
            ``<workdir>/tsgrain.toml`` when ``None``
        """
        if cfg_path is None:
            cfg_path = os.path.join(workdir, 'tsgrain.toml')

        # This is the root logger, so the level applies process-wide.
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)

        self.cfg = config.Config(cfg_path)
        self.cfg.load_file()

        if self.cfg.log_debug:
            self.logger.setLevel(logging.DEBUG)

        # A database path from the config wins over the workdir default.
        db_path = self.cfg.db_path or os.path.join(workdir, 'raindb.json')
        self.db = database.RainDB(db_path)

        self.queue = task_queue.TaskQueue(self)
        self.db.load_queue(self.queue)

        self.io = io_factory.new_io(self, io_type)
        self.io.set_callback(self._input_cb)

        self.outputs = output.Outputs(self.io, self.queue, self)

        self.scheduler = jobschedule.Scheduler(self)

        self.grpc_tsgrain_servicer = grpc_server.TSGRainServicer(self)
        self.grpc_server = grpc_server.new_server(self.grpc_tsgrain_servicer,
                                                  self.cfg.grpc_port)

        self._auto_en = self.db.get_auto_mode()
        self._running = False

    def get_auto_mode(self) -> bool:
        """Return the current auto mode flag."""
        return self._auto_en

    def set_auto_mode(self, state: bool):
        """Set the auto mode flag; a call with the current value is a no-op.

        On an actual change the servicer's ``tasklist_update`` event is set
        and the new state is logged. The flag is written to the database in
        ``stop()``, not here.
        """
        if state == self._auto_en:
            return

        self._auto_en = state
        self.grpc_tsgrain_servicer.tasklist_update.set()

        if self._auto_en:
            logging.info('Auto mode ON')
        else:
            logging.info('Auto mode OFF')

    def get_cfg(self) -> config.Config:
        """Return the loaded configuration object."""
        return self.cfg

    def get_logger(self) -> logging.Logger:
        """Return the logger configured in the constructor."""
        return self.logger

    def _input_cb(self, key: str):
        """Handle an input key reported by the IO backend.

        ``BT_MODE`` toggles the auto mode. ``BT_Z_<n>`` issues a manual,
        non-queuing, cancelling task request for zone ``<n>`` with the
        configured manual time. Any other key is ignored.
        """
        if key == self._KEY_MODE:
            self.set_auto_mode(not self.get_auto_mode())
            return

        if not key.startswith(self._KEY_ZONE_PREFIX):
            return

        try:
            zone_id = int(key[len(self._KEY_ZONE_PREFIX):])
        except ValueError:
            # Still ignored, but no longer silently: a malformed key most
            # likely points at a broken input mapping.
            self.logger.warning('Ignoring input key with invalid zone id: %s',
                                key)
            return

        self.request_task(
            models.TaskRequest(source=models.Source.MANUAL,
                               zone_id=zone_id,
                               duration=self.cfg.manual_time,
                               queuing=False,
                               cancelling=True))

    def request_task(self,
                     request: models.TaskRequest) -> models.TaskRequestResult:
        """Start a task for the request, or cancel a matching one.

        With ``request.cancelling`` set, a task that has the same zone and
        source is cancelled instead of starting a new one: the first match
        among the queue's tasks for a queuing request, the current task for
        a non-queuing request. In that case ``TaskRequestResult(False, True)``
        is returned.

        Otherwise the task is started through ``start_task`` and
        ``TaskRequestResult(started, False)`` is returned.
        NOTE(review): the second result field presumably flags a
        cancellation - confirm against ``models.TaskRequestResult``.
        """
        if request.queuing:
            if request.cancelling:
                # If a task from this zone is in queue, cancel it
                for task in self.queue.tasks:
                    if task.zone_id == request.zone_id \
                            and task.source == request.source:
                        self.queue.cancel_task(task)
                        return models.TaskRequestResult(False, True)
        elif request.cancelling:
            current_task = self.queue.get_current_task()
            # Cancel manually started tasks
            if current_task is not None \
                    and current_task.zone_id == request.zone_id \
                    and current_task.source == request.source:
                self.queue.cancel_current_task()
                return models.TaskRequestResult(False, True)

        # Same fallback duration, validation and enqueueing as start_task.
        started = self.start_task(request.source, request.zone_id,
                                  request.duration, request.queuing)
        return models.TaskRequestResult(started, False)

    def start_task(self, source: models.Source, zone_id: int, duration: int,
                   queuing: bool) -> bool:
        """Create, validate and enqueue a task.

        :param source: source recorded on the task
        :param zone_id: zone the task is for
        :param duration: task duration; values below 1 are replaced by the
            configured ``manual_time``
        :param queuing: passed through to ``TaskQueue.enqueue``
        :return: the result of ``TaskQueue.enqueue``
        """
        if duration < 1:
            duration = self.cfg.manual_time

        task = models.Task(source, zone_id, duration)
        # Presumably raises for an invalid task - confirm in models.Task.
        task.validate(self)
        return self.queue.enqueue(task, queuing)

    def stop_task(self, source: models.Source, zone_id: int) -> bool:
        """Cancel the first task in the queue matching zone and source.

        :return: ``True`` if a task was cancelled, ``False`` if none matched
        """
        for task in self.queue.tasks:
            if task.zone_id == zone_id and task.source == source:
                self.queue.cancel_task(task)
                return True
        return False

    def get_tasks(self) -> List[models.Task]:
        """Return the queue's task list (the queue's own object, not a copy)."""
        return self.queue.tasks

    def create_job(self, job: models.Job) -> int:
        """Validate the job and insert it into the database.

        :return: the value returned by ``RainDB.insert_job``
        """
        job.validate(self)
        return self.db.insert_job(job)

    def get_job(self, job_id: int) -> models.Job:
        """Fetch the job with the given id from the database."""
        return self.db.get_job(job_id)

    def get_jobs(self) -> List[models.Job]:
        """Fetch all jobs from the database."""
        return self.db.get_jobs()

    def update_job(self, job: models.Job):
        """Validate the job and write it back to the database."""
        job.validate(self)
        self.db.update_job(job)

    def delete_job(self, job_id: int):
        """Delete the job with the given id from the database."""
        self.db.delete_job(job_id)

    def _set_job_enabled(self, job_id: int, enabled: bool):
        """Load a job, set its ``enable`` flag and store it again."""
        job = self.db.get_job(job_id)
        job.enable = enabled
        self.db.update_job(job)

    def enable_job(self, job_id: int):
        """Set the ``enable`` flag of the stored job to ``True``."""
        self._set_job_enabled(job_id, True)

    def disable_job(self, job_id: int):
        """Set the ``enable`` flag of the stored job to ``False``."""
        self._set_job_enabled(job_id, False)

    def notify_queue_update(self):
        """Set the gRPC servicer's ``tasklist_update`` event."""
        self.grpc_tsgrain_servicer.tasklist_update.set()

    def is_running(self) -> bool:
        """Return ``True`` between ``start()`` and ``stop()``."""
        return self._running

    def get_system_timezone(self) -> str:
        """Query the timezone via ``systimecfg`` with ``cfg.cmd_get_timezone``."""
        return systimecfg.get_system_timezone(self.cfg.cmd_get_timezone)

    def set_system_datetime(self, date_time: datetime):
        """Set date/time via ``systimecfg`` with ``cfg.cmd_set_datetime``."""
        systimecfg.set_system_datetime(date_time, self.cfg.cmd_set_datetime)

    def set_system_timezone(self, tz: str):
        """Set the timezone via ``systimecfg`` with ``cfg.cmd_set_timezone``."""
        systimecfg.set_system_timezone(tz, self.cfg.cmd_set_timezone)

    def start(self):
        """Mark the application as running and start all components."""
        logging.info('Starting application')
        self._running = True
        self.io.start()
        self.outputs.start()
        self.queue.start()
        self.scheduler.start()
        self.grpc_server.start()

    def stop(self):
        """Stop all components in reverse start order and persist state."""
        logging.info('Stopping application')
        self._running = False
        # None is the only argument passed (presumably the gRPC grace
        # period - confirm against grpc_server.new_server).
        self.grpc_server.stop(None)
        self.scheduler.stop()
        self.queue.stop()
        self.outputs.stop()
        self.io.stop()

        # Store application state
        self.db.store_queue(self.queue)
        self.db.set_auto_mode(self._auto_en)

        # Save config
        self.cfg.save_file()
|