Controller/tsgrain_controller/task_queue.py
2022-02-22 00:15:16 +01:00

110 lines
3.7 KiB
Python

# coding=utf-8
import logging
from typing import List, Optional
from tsgrain_controller import models, util
class TaskHolder:
def get_current_task(self) -> Optional[models.Task]:
pass
class TaskQueue(util.StoppableThread, TaskHolder):
def __init__(self, app: models.AppInterface):
super().__init__(0.1)
self.app = app
self.tasks: List[models.Task] = []
self.running_task: Optional[models.Task] = None
def enqueue(self, task: models.Task, queuing: bool = True) -> bool:
"""
Fügt der Warteschlange einen neuen Task hinzu. Die Warteschlange
kann nicht mehrere Tasks der selben Quelle und Zone aufnehmen.
Kann ein Task nicht aufgenommen werden, wird False zurückgegeben.
:param task: Neuer Task
:param queuing: Füge Task der Warteschlange hinzu, wenn bereits
ein anderer Task läuft
:return: True wenn Task erfolgreich hinzugefügt
"""
if not queuing and self.running_task is not None:
return False
# Abbrechen, wenn bereits ein Task mit gleicher Quelle und Zone existiert
for t in self.tasks:
if t.source == task.source and t.zone_id == task.zone_id:
return False
self.tasks.append(task)
logging.info('Task added to queue (%s)', task)
self.app.notify_queue_update()
return True
def get_current_task(self) -> Optional[models.Task]:
"""
Gib den aktuell laufenden Task zurück.
:return: aktuell laufender Task
"""
return self.running_task
def cancel_task(self, task: models.Task):
self.tasks.remove(task)
logging.info('Task cancelled (%s)', task)
if self.running_task == task:
self.running_task = None
self.app.notify_queue_update()
def cancel_current_task(self):
"""
Bricht den aktuell laufenden Task ab
(z.B. bei manuellem Stopp mittels Taster)
"""
if self.running_task is not None:
self.tasks.remove(self.running_task)
logging.info('Running task cancelled (%s)', self.running_task)
self.running_task = None
self.app.notify_queue_update()
def serialize(self) -> List[models.Task]:
"""Task zur Speicherung in der Datenbank in dict umwandeln."""
return self.tasks
def run_cycle(self):
# Get a new task if none is running
if self.running_task is None:
for task in self.tasks:
# Only start scheduled tasks if automatic mode is enabled
if task.source == models.Source.SCHEDULE and not self.app.get_auto_mode(
):
continue
self.running_task = task
self.running_task.start()
logging.info('Queued task started (%s)', self.running_task)
self.app.notify_queue_update()
break
# Check currently running task
if self.running_task is not None:
# Stop scheduled tasks if auto mode is disabled
if self.running_task.source == models.Source.SCHEDULE and not self.app.get_auto_mode(
):
self.running_task.stop()
logging.info('Running task stopped (%s)', self.running_task)
self.running_task = None
self.app.notify_queue_update()
elif self.running_task.is_done:
self.tasks.remove(self.running_task)
logging.info('Running task done (%s)', self.running_task)
self.running_task = None
self.app.notify_queue_update()
def cleanup(self):
if self.running_task:
self.running_task.stop()
self.running_task = None