270 lines
7.7 KiB
Python
270 lines
7.7 KiB
Python
# coding=utf-8
|
|
import subprocess
|
|
import tempfile
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
import time
|
|
from datetime import datetime
|
|
|
|
from tsgrain_controller import application, models
|
|
from tsgrain_controller.io import io_factory
|
|
from tests import fixtures
|
|
|
|
JOB1_DATA = {
|
|
'date': datetime(2022, 1, 10, 12, 30),
|
|
'duration': 60,
|
|
'enable': True,
|
|
'repeat': False,
|
|
'zones': [1, 3]
|
|
}
|
|
|
|
JOB2_DATA = {
|
|
'date': datetime(2022, 1, 11, 18, 0),
|
|
'duration': 300,
|
|
'enable': True,
|
|
'repeat': True,
|
|
'zones': [5]
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def app(mocker):
|
|
mocker.patch('tsgrain_controller.io.io_factory.new_io',
|
|
return_value=fixtures.TestingIo())
|
|
|
|
with tempfile.TemporaryDirectory() as td:
|
|
app = application.Application(io_factory.IoType.NONE, td,
|
|
fixtures.FILE_CFG)
|
|
|
|
assert not app.is_running()
|
|
app.start()
|
|
assert app.is_running()
|
|
yield app
|
|
app.stop()
|
|
assert not app.is_running()
|
|
|
|
|
|
def test_request_task(app):
|
|
# Manually start a task (like via button press)
|
|
res = app.request_task(
|
|
models.TaskRequest(source=models.Source.MANUAL,
|
|
zone_id=2,
|
|
duration=30,
|
|
queuing=False,
|
|
cancelling=True))
|
|
assert res.started
|
|
assert not res.stopped
|
|
|
|
# Queue processing time
|
|
time.sleep(0.1)
|
|
|
|
# Try to start the same task again -> nothing happens
|
|
res = app.request_task(
|
|
models.TaskRequest(source=models.Source.MANUAL,
|
|
zone_id=2,
|
|
duration=30,
|
|
queuing=False,
|
|
cancelling=False))
|
|
assert not res.started
|
|
assert not res.stopped
|
|
|
|
# Try to start the same task again -> task should cancel
|
|
res = app.request_task(
|
|
models.TaskRequest(source=models.Source.MANUAL,
|
|
zone_id=2,
|
|
duration=30,
|
|
queuing=False,
|
|
cancelling=True))
|
|
assert not res.started
|
|
assert res.stopped
|
|
|
|
|
|
def test_request_task_queue(app):
|
|
# Enqueue and start a new task
|
|
res = app.request_task(
|
|
models.TaskRequest(source=models.Source.MANUAL,
|
|
zone_id=2,
|
|
duration=30,
|
|
queuing=True,
|
|
cancelling=True))
|
|
assert res.started
|
|
assert not res.stopped
|
|
assert len(app.queue.tasks) == 1
|
|
assert app.queue.tasks[0].zone_id == 2
|
|
|
|
# Duplicate task should not be enqueued
|
|
res = app.request_task(
|
|
models.TaskRequest(source=models.Source.MANUAL,
|
|
zone_id=2,
|
|
duration=30,
|
|
queuing=True,
|
|
cancelling=False))
|
|
assert not res.started
|
|
assert not res.stopped
|
|
assert len(app.queue.tasks) == 1
|
|
|
|
# Enqueue a new task
|
|
res = app.request_task(
|
|
models.TaskRequest(source=models.Source.MANUAL,
|
|
zone_id=3,
|
|
duration=30,
|
|
queuing=True,
|
|
cancelling=False))
|
|
assert res.started
|
|
assert not res.stopped
|
|
|
|
tasks = app.get_tasks()
|
|
assert len(tasks) == 2
|
|
|
|
assert tasks[0].zone_id == 2
|
|
assert tasks[1].zone_id == 3
|
|
|
|
# Queue processing time
|
|
time.sleep(0.2)
|
|
|
|
# Try to enqueue the same task again -> should cancel
|
|
res = app.request_task(
|
|
models.TaskRequest(source=models.Source.MANUAL,
|
|
zone_id=3,
|
|
duration=30,
|
|
queuing=True,
|
|
cancelling=True))
|
|
|
|
assert not res.started
|
|
assert res.stopped
|
|
|
|
|
|
def test_start_task(app):
|
|
# Manually start a task (like via button press)
|
|
assert app.start_task(models.Source.MANUAL, 2, 30, False)
|
|
|
|
# Queue processing time
|
|
time.sleep(0.1)
|
|
|
|
# Try to start the same task again -> nothing happens
|
|
assert not app.start_task(models.Source.MANUAL, 2, 30, False)
|
|
|
|
|
|
def test_start_task_queue(app):
|
|
# Enqueue and start a new task
|
|
assert app.start_task(models.Source.MANUAL, 2, 30, True)
|
|
|
|
# Queue processing time
|
|
time.sleep(0.1)
|
|
assert app.queue.running_task.zone_id == 2
|
|
|
|
# Duplicate task should not be enqueued
|
|
assert not app.start_task(models.Source.MANUAL, 2, 30, True)
|
|
|
|
# Enqueue new tasks
|
|
assert app.start_task(models.Source.MANUAL, 3, 30, True)
|
|
assert app.start_task(models.Source.SCHEDULE, 2, 30, True)
|
|
|
|
|
|
def test_stop_task(app):
|
|
# Enqueue and start a new task
|
|
assert app.start_task(models.Source.MANUAL, 2, 30, True)
|
|
|
|
# Queue processing time
|
|
time.sleep(0.1)
|
|
assert app.queue.running_task.zone_id == 2
|
|
|
|
# Stop task
|
|
assert not app.stop_task(models.Source.MANUAL, 3)
|
|
assert app.stop_task(models.Source.MANUAL, 2)
|
|
|
|
# Queue processing time
|
|
time.sleep(0.1)
|
|
|
|
assert app.queue.running_task is None
|
|
|
|
|
|
def test_crud_job(app):
|
|
# Insert jobs
|
|
job1 = models.Job.deserialize(JOB1_DATA)
|
|
job2 = models.Job.deserialize(JOB2_DATA)
|
|
|
|
assert app.create_job(job1) == job1.id == 1
|
|
assert app.create_job(job2) == job2.id == 2
|
|
|
|
# Get a job
|
|
got_job = app.get_job(1)
|
|
assert got_job.id == 1
|
|
assert got_job.serialize() == JOB1_DATA
|
|
|
|
# Get all jobs
|
|
got_jobs = app.get_jobs()
|
|
assert len(got_jobs) == 2
|
|
assert got_jobs[0].zones == [1, 3]
|
|
assert got_jobs[1].zones == [5]
|
|
|
|
# Update job
|
|
job2.zones = [4, 5]
|
|
app.update_job(job2)
|
|
assert app.get_job(2).zones == [4, 5]
|
|
|
|
# Disable job
|
|
app.disable_job(1)
|
|
assert app.get_job(1).enable is False
|
|
|
|
# Enable job
|
|
app.enable_job(1)
|
|
assert app.get_job(1).enable is True
|
|
|
|
# Delete job
|
|
app.delete_job(1)
|
|
assert len(app.get_jobs()) == 1
|
|
|
|
# Get job that does not exist
|
|
with pytest.raises(KeyError):
|
|
app.get_job(1)
|
|
|
|
# Delete job that does not exist
|
|
with pytest.raises(KeyError):
|
|
app.delete_job(1)
|
|
|
|
|
|
def test_get_system_datetime(app, mocker):
|
|
mock_res = mock.Mock()
|
|
mock_res.stdout = 'Europe/Berlin'
|
|
|
|
cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run',
|
|
return_value=mock_res)
|
|
|
|
tz = app.get_system_timezone()
|
|
assert tz == 'Europe/Berlin'
|
|
|
|
cmd_parts = ['cat', '/etc/timezone']
|
|
cmd_run_mock.assert_called_once_with(cmd_parts,
|
|
check=True,
|
|
universal_newlines=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
|
|
|
|
def test_set_system_datetime(app, mocker):
|
|
cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run')
|
|
|
|
date_time = datetime(2021, 12, 25, 16, 30, 14)
|
|
app.set_system_datetime(date_time)
|
|
|
|
cmd_parts = ['date', '-s', '2021-12-25 16:30:14']
|
|
cmd_run_mock.assert_called_once_with(cmd_parts,
|
|
check=True,
|
|
universal_newlines=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
|
|
|
|
def test_set_system_timezone(app, mocker):
|
|
cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run')
|
|
|
|
app.set_system_timezone('Europe/Berlin')
|
|
|
|
cmd_parts = ['timedatectl', 'set-timezone', 'Europe/Berlin']
|
|
cmd_run_mock.assert_called_once_with(cmd_parts,
|
|
check=True,
|
|
universal_newlines=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|