# coding=utf-8 import logging from typing import List, Optional from tsgrain_controller import models, util class TaskHolder: def get_current_task(self) -> Optional[models.Task]: pass class TaskQueue(util.StoppableThread, TaskHolder): def __init__(self, app: models.AppInterface): super().__init__(0.1) self.app = app self.tasks: List[models.Task] = [] self.running_task: Optional[models.Task] = None def enqueue(self, task: models.Task, queuing: bool = True) -> bool: """ Fügt der Warteschlange einen neuen Task hinzu. Die Warteschlange kann nicht mehrere Tasks der selben Quelle und Zone aufnehmen. Kann ein Task nicht aufgenommen werden, wird False zurückgegeben. :param task: Neuer Task :param queuing: Füge Task der Warteschlange hinzu, wenn bereits ein anderer Task läuft :return: True wenn Task erfolgreich hinzugefügt """ if not queuing and self.running_task is not None: return False # Abbrechen, wenn bereits ein Task mit gleicher Quelle und Zone existiert for t in self.tasks: if t.source == task.source and t.zone_id == task.zone_id: return False self.tasks.append(task) logging.info('Task added to queue (%s)', task) self.app.notify_queue_update() return True def get_current_task(self) -> Optional[models.Task]: """ Gib den aktuell laufenden Task zurück. :return: aktuell laufender Task """ return self.running_task def cancel_task(self, task: models.Task): self.tasks.remove(task) logging.info('Task cancelled (%s)', task) if self.running_task == task: self.running_task = None self.app.notify_queue_update() def cancel_current_task(self): """ Bricht den aktuell laufenden Task ab (z.B. bei manuellem Stopp mittels Taster) """ if self.running_task is not None: self.tasks.remove(self.running_task) logging.info('Running task cancelled (%s)', self.running_task) self.running_task = None self.app.notify_queue_update() def serialize(self) -> List[models.Task]: """Task zur Speicherung in der Datenbank in dict umwandeln.""" return self.tasks def run_cycle(self): # Get a new task if none is running if self.running_task is None: for task in self.tasks: # Only start scheduled tasks if automatic mode is enabled if task.source == models.Source.SCHEDULE and not self.app.get_auto_mode( ): continue self.running_task = task self.running_task.start() logging.info('Queued task started (%s)', self.running_task) self.app.notify_queue_update() break # Check currently running task if self.running_task is not None: # Stop scheduled tasks if auto mode is disabled if self.running_task.source == models.Source.SCHEDULE and not self.app.get_auto_mode( ): self.running_task.stop() logging.info('Running task stopped (%s)', self.running_task) self.running_task = None self.app.notify_queue_update() elif self.running_task.is_done: self.tasks.remove(self.running_task) logging.info('Running task done (%s)', self.running_task) self.running_task = None self.app.notify_queue_update() def cleanup(self): if self.running_task: self.running_task.stop() self.running_task = None