110 lines
3.7 KiB
Python
110 lines
3.7 KiB
Python
# coding=utf-8
|
|
import logging
|
|
from typing import List, Optional
|
|
|
|
from tsgrain_controller import models, util
|
|
|
|
|
|
class TaskHolder:
|
|
|
|
def get_current_task(self) -> Optional[models.Task]:
|
|
pass
|
|
|
|
|
|
class TaskQueue(util.StoppableThread, TaskHolder):
|
|
|
|
def __init__(self, app: models.AppInterface):
|
|
super().__init__(0.1)
|
|
self.app = app
|
|
|
|
self.tasks: List[models.Task] = []
|
|
self.running_task: Optional[models.Task] = None
|
|
|
|
def enqueue(self, task: models.Task, queuing: bool = True) -> bool:
|
|
"""
|
|
Fügt der Warteschlange einen neuen Task hinzu. Die Warteschlange
|
|
kann nicht mehrere Tasks der selben Quelle und Zone aufnehmen.
|
|
Kann ein Task nicht aufgenommen werden, wird False zurückgegeben.
|
|
|
|
:param task: Neuer Task
|
|
:param queuing: Füge Task der Warteschlange hinzu, wenn bereits
|
|
ein anderer Task läuft
|
|
:return: True wenn Task erfolgreich hinzugefügt
|
|
"""
|
|
if not queuing and self.running_task is not None:
|
|
return False
|
|
|
|
# Abbrechen, wenn bereits ein Task mit gleicher Quelle und Zone existiert
|
|
for t in self.tasks:
|
|
if t.source == task.source and t.zone_id == task.zone_id:
|
|
return False
|
|
|
|
self.tasks.append(task)
|
|
logging.info('Task added to queue (%s)', task)
|
|
self.app.notify_queue_update()
|
|
return True
|
|
|
|
def get_current_task(self) -> Optional[models.Task]:
|
|
"""
|
|
Gib den aktuell laufenden Task zurück.
|
|
:return: aktuell laufender Task
|
|
"""
|
|
return self.running_task
|
|
|
|
def cancel_task(self, task: models.Task):
|
|
self.tasks.remove(task)
|
|
logging.info('Task cancelled (%s)', task)
|
|
if self.running_task == task:
|
|
self.running_task = None
|
|
|
|
self.app.notify_queue_update()
|
|
|
|
def cancel_current_task(self):
|
|
"""
|
|
Bricht den aktuell laufenden Task ab
|
|
(z.B. bei manuellem Stopp mittels Taster)
|
|
"""
|
|
if self.running_task is not None:
|
|
self.tasks.remove(self.running_task)
|
|
logging.info('Running task cancelled (%s)', self.running_task)
|
|
self.running_task = None
|
|
self.app.notify_queue_update()
|
|
|
|
def serialize(self) -> List[models.Task]:
|
|
"""Task zur Speicherung in der Datenbank in dict umwandeln."""
|
|
return self.tasks
|
|
|
|
def run_cycle(self):
|
|
# Get a new task if none is running
|
|
if self.running_task is None:
|
|
for task in self.tasks:
|
|
# Only start scheduled tasks if automatic mode is enabled
|
|
if task.source == models.Source.SCHEDULE and not self.app.get_auto_mode(
|
|
):
|
|
continue
|
|
|
|
self.running_task = task
|
|
self.running_task.start()
|
|
logging.info('Queued task started (%s)', self.running_task)
|
|
self.app.notify_queue_update()
|
|
break
|
|
|
|
# Check currently running task
|
|
if self.running_task is not None:
|
|
# Stop scheduled tasks if auto mode is disabled
|
|
if self.running_task.source == models.Source.SCHEDULE and not self.app.get_auto_mode(
|
|
):
|
|
self.running_task.stop()
|
|
logging.info('Running task stopped (%s)', self.running_task)
|
|
self.running_task = None
|
|
self.app.notify_queue_update()
|
|
elif self.running_task.is_done:
|
|
self.tasks.remove(self.running_task)
|
|
logging.info('Running task done (%s)', self.running_task)
|
|
self.running_task = None
|
|
self.app.notify_queue_update()
|
|
|
|
def cleanup(self):
|
|
if self.running_task:
|
|
self.running_task.stop()
|
|
self.running_task = None
|