Controller/tests/test_application.py

270 lines
7.7 KiB
Python

# coding=utf-8
import subprocess
import tempfile
from unittest import mock
import pytest
import time
from datetime import datetime
from tsgrain_controller import application, models
from tsgrain_controller.io import io_factory
from tests import fixtures
JOB1_DATA = {
'date': datetime(2022, 1, 10, 12, 30),
'duration': 60,
'enable': True,
'repeat': False,
'zones': [1, 3]
}
JOB2_DATA = {
'date': datetime(2022, 1, 11, 18, 0),
'duration': 300,
'enable': True,
'repeat': True,
'zones': [5]
}
@pytest.fixture
def app(mocker):
mocker.patch('tsgrain_controller.io.io_factory.new_io',
return_value=fixtures.TestingIo())
with tempfile.TemporaryDirectory() as td:
app = application.Application(io_factory.IoType.NONE, td,
fixtures.FILE_CFG)
assert not app.is_running()
app.start()
assert app.is_running()
yield app
app.stop()
assert not app.is_running()
def test_request_task(app):
# Manually start a task (like via button press)
res = app.request_task(
models.TaskRequest(source=models.Source.MANUAL,
zone_id=2,
duration=30,
queuing=False,
cancelling=True))
assert res.started
assert not res.stopped
# Queue processing time
time.sleep(0.1)
# Try to start the same task again -> nothing happens
res = app.request_task(
models.TaskRequest(source=models.Source.MANUAL,
zone_id=2,
duration=30,
queuing=False,
cancelling=False))
assert not res.started
assert not res.stopped
# Try to start the same task again -> task should cancel
res = app.request_task(
models.TaskRequest(source=models.Source.MANUAL,
zone_id=2,
duration=30,
queuing=False,
cancelling=True))
assert not res.started
assert res.stopped
def test_request_task_queue(app):
# Enqueue and start a new task
res = app.request_task(
models.TaskRequest(source=models.Source.MANUAL,
zone_id=2,
duration=30,
queuing=True,
cancelling=True))
assert res.started
assert not res.stopped
assert len(app.queue.tasks) == 1
assert app.queue.tasks[0].zone_id == 2
# Duplicate task should not be enqueued
res = app.request_task(
models.TaskRequest(source=models.Source.MANUAL,
zone_id=2,
duration=30,
queuing=True,
cancelling=False))
assert not res.started
assert not res.stopped
assert len(app.queue.tasks) == 1
# Enqueue a new task
res = app.request_task(
models.TaskRequest(source=models.Source.MANUAL,
zone_id=3,
duration=30,
queuing=True,
cancelling=False))
assert res.started
assert not res.stopped
tasks = app.get_tasks()
assert len(tasks) == 2
assert tasks[0].zone_id == 2
assert tasks[1].zone_id == 3
# Queue processing time
time.sleep(0.2)
# Try to enqueue the same task again -> should cancel
res = app.request_task(
models.TaskRequest(source=models.Source.MANUAL,
zone_id=3,
duration=30,
queuing=True,
cancelling=True))
assert not res.started
assert res.stopped
def test_start_task(app):
# Manually start a task (like via button press)
assert app.start_task(models.Source.MANUAL, 2, 30, False)
# Queue processing time
time.sleep(0.1)
# Try to start the same task again -> nothing happens
assert not app.start_task(models.Source.MANUAL, 2, 30, False)
def test_start_task_queue(app):
# Enqueue and start a new task
assert app.start_task(models.Source.MANUAL, 2, 30, True)
# Queue processing time
time.sleep(0.1)
assert app.queue.running_task.zone_id == 2
# Duplicate task should not be enqueued
assert not app.start_task(models.Source.MANUAL, 2, 30, True)
# Enqueue new tasks
assert app.start_task(models.Source.MANUAL, 3, 30, True)
assert app.start_task(models.Source.SCHEDULE, 2, 30, True)
def test_stop_task(app):
# Enqueue and start a new task
assert app.start_task(models.Source.MANUAL, 2, 30, True)
# Queue processing time
time.sleep(0.1)
assert app.queue.running_task.zone_id == 2
# Stop task
assert not app.stop_task(models.Source.MANUAL, 3)
assert app.stop_task(models.Source.MANUAL, 2)
# Queue processing time
time.sleep(0.1)
assert app.queue.running_task is None
def test_crud_job(app):
# Insert jobs
job1 = models.Job.deserialize(JOB1_DATA)
job2 = models.Job.deserialize(JOB2_DATA)
assert app.create_job(job1) == job1.id == 1
assert app.create_job(job2) == job2.id == 2
# Get a job
got_job = app.get_job(1)
assert got_job.id == 1
assert got_job.serialize() == JOB1_DATA
# Get all jobs
got_jobs = app.get_jobs()
assert len(got_jobs) == 2
assert got_jobs[0].zones == [1, 3]
assert got_jobs[1].zones == [5]
# Update job
job2.zones = [4, 5]
app.update_job(job2)
assert app.get_job(2).zones == [4, 5]
# Disable job
app.disable_job(1)
assert app.get_job(1).enable is False
# Enable job
app.enable_job(1)
assert app.get_job(1).enable is True
# Delete job
app.delete_job(1)
assert len(app.get_jobs()) == 1
# Get job that does not exist
with pytest.raises(KeyError):
app.get_job(1)
# Delete job that does not exist
with pytest.raises(KeyError):
app.delete_job(1)
def test_get_system_datetime(app, mocker):
mock_res = mock.Mock()
mock_res.stdout = 'Europe/Berlin'
cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run',
return_value=mock_res)
tz = app.get_system_timezone()
assert tz == 'Europe/Berlin'
cmd_parts = ['cat', '/etc/timezone']
cmd_run_mock.assert_called_once_with(cmd_parts,
check=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def test_set_system_datetime(app, mocker):
cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run')
date_time = datetime(2021, 12, 25, 16, 30, 14)
app.set_system_datetime(date_time)
cmd_parts = ['date', '-s', '2021-12-25 16:30:14']
cmd_run_mock.assert_called_once_with(cmd_parts,
check=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def test_set_system_timezone(app, mocker):
cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run')
app.set_system_timezone('Europe/Berlin')
cmd_parts = ['timedatectl', 'set-timezone', 'Europe/Berlin']
cmd_run_mock.assert_called_once_with(cmd_parts,
check=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)