Controller/tests/test_queue.py

109 lines
2.6 KiB
Python

# coding=utf-8
import time
from datetime import datetime
from tests import fixtures
from tsgrain_controller import task_queue, util, models
def test_task():
    """Check both serializations of a freshly created, unstarted task.

    The task is built as ``Task(Source.MANUAL, 1, 10)``. ``serialize()`` is
    expected to report the full duration as ``remaining``, and
    ``serialize_rpc()`` is expected to carry ``None`` for both timestamps.
    """
    fresh_task = models.Task(models.Source.MANUAL, 1, 10)

    # Expected queue-style serialization of the unstarted task.
    expected_state = {
        'source': 'MANUAL',
        'zone_id': 1,
        'duration': 10,
        'remaining': 10,
    }
    state = fresh_task.serialize()
    assert state == expected_state

    # Expected RPC serialization: no start/finish timestamps yet.
    expected_rpc = {
        'source': 'MANUAL',
        'zone_id': 1,
        'duration': 10,
        'datetime_started': None,
        'datetime_finished': None,
    }
    rpc_state = fresh_task.serialize_rpc()
    assert rpc_state == expected_rpc
def test_task_started(mocker):
    """Check both serializations of a task right after ``start()``.

    ``tsgrain_controller.util.datetime_now`` is patched to return a fixed
    timestamp, so the expected start time is that timestamp and the expected
    finish time lies 10 seconds later (the task's duration value is 10).

    :param mocker: patching fixture providing ``mocker.patch``
    """
    started_at = datetime(2022, 1, 10, 6, 0)
    finished_at = datetime(2022, 1, 10, 6, 0, 10)

    # Freeze the clock helper so the timestamps are deterministic.
    mocker.patch('tsgrain_controller.util.datetime_now',
                 return_value=started_at)

    running_task = models.Task(models.Source.MANUAL, 1, 10)
    running_task.start()

    # Immediately after starting, the remaining time still equals the duration.
    state = running_task.serialize()
    assert state == {
        'source': 'MANUAL',
        'zone_id': 1,
        'duration': 10,
        'remaining': 10,
    }

    rpc_state = running_task.serialize_rpc()
    assert rpc_state == {
        'source': 'MANUAL',
        'zone_id': 1,
        'duration': 10,
        'datetime_started': started_at,
        'datetime_finished': finished_at,
    }
def test_add_tasks():
    """Check which tasks the queue accepts for one and the same zone.

    Three tasks for zone 1 are offered in order: a MANUAL task, a SCHEDULE
    task and a second MANUAL task. The first two are expected to be accepted,
    the third one is expected to be rejected.
    """
    testing_app = fixtures.TestingApp()
    queue = task_queue.TaskQueue(testing_app)

    manual_first = models.Task(models.Source.MANUAL, 1, 10)
    scheduled = models.Task(models.Source.SCHEDULE, 1, 5)
    manual_second = models.Task(models.Source.MANUAL, 1, 5)

    first_accepted = queue.enqueue(manual_first)
    assert first_accepted

    scheduled_accepted = queue.enqueue(scheduled)
    assert scheduled_accepted

    # A second MANUAL task for zone 1 must not be accepted.
    second_accepted = queue.enqueue(manual_second)
    assert not second_accepted
def test_queue_runner():
    """Run the queue briefly and compare its JSON serialization afterwards.

    Three tasks are enqueued, the queue is started, left running for 0.9
    seconds and stopped again. The serialized queue is then expected to hold
    only the second and third task, with the second task's ``remaining``
    value reduced from 5 to 4.
    """
    testing_app = fixtures.TestingApp()
    queue = task_queue.TaskQueue(testing_app)

    short_manual = models.Task(models.Source.MANUAL, 1, 1)
    long_manual = models.Task(models.Source.MANUAL, 2, 5)
    scheduled = models.Task(models.Source.SCHEDULE, 2, 10)

    for pending in (short_manual, long_manual, scheduled):
        assert queue.enqueue(pending)

    queue.start()
    time.sleep(0.9)
    queue.stop()

    # Adjacent literals are concatenated into one string; the trailing space
    # of the first literal is part of the expected JSON list separator.
    expected_json = (
        '[{"source": "MANUAL", "zone_id": 2, "duration": 5, "remaining": 4}, '
        '{"source": "SCHEDULE", "zone_id": 2, "duration": 10, "remaining": 10}]'
    )
    actual_json = util.to_json(queue)
    assert actual_json == expected_json
def test_interrupt_auto():
    """
    When automatic mode is interrupted and a scheduled task is left
    interrupted in the queue, it should be possible to start a manual task.

    Steps:

    1. With ``app.auto`` enabled, a SCHEDULE task is enqueued on the started
       queue.
    2. A MANUAL task offered via ``enqueue(task, False)`` is expected to be
       rejected while ``app.auto`` is still enabled.
    3. After ``app.auto`` is switched off, the same MANUAL task is expected
       to be accepted.
    """
    app = fixtures.TestingApp()
    q = task_queue.TaskQueue(app)
    app.auto = True

    scheduled_task = models.Task(models.Source.SCHEDULE, 1, 10)
    manual_task = models.Task(models.Source.MANUAL, 2, 5)

    q.start()
    # Always stop the started queue, even if one of the assertions fails;
    # otherwise it would be left running after a failed test.
    try:
        assert q.enqueue(scheduled_task)
        # Short pause between enqueueing and the next check
        # (presumably lets the queue pick up the task -- TODO confirm).
        time.sleep(0.2)

        # NOTE(review): the meaning of the second positional argument of
        # enqueue() is not visible here -- confirm against TaskQueue.
        assert not q.enqueue(manual_task, False)

        app.auto = False
        time.sleep(0.2)
        assert q.enqueue(manual_task, False)
    finally:
        q.stop()