229 lines
7.5 KiB
Python
229 lines
7.5 KiB
Python
# coding=utf-8
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
from datetime import datetime
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
import grpc
|
|
from google.protobuf import empty_pb2, wrappers_pb2
|
|
|
|
from tsgrain_controller import application, models, util
|
|
from tsgrain_controller.grpc_generated import tsgrain_pb2, tsgrain_pb2_grpc
|
|
from tsgrain_controller.io import io_factory
|
|
from tests import fixtures
|
|
|
|
JOB1_DATA = {
|
|
'date': datetime(2022, 1, 10, 12, 30),
|
|
'duration': 60,
|
|
'enable': True,
|
|
'repeat': False,
|
|
'zones': [1, 3]
|
|
}
|
|
|
|
JOB2_DATA = {
|
|
'date': datetime(2022, 1, 11, 18, 0),
|
|
'duration': 300,
|
|
'enable': True,
|
|
'repeat': True,
|
|
'zones': [5]
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def app(mocker):
|
|
mocker.patch('tsgrain_controller.io.io_factory.new_io',
|
|
return_value=fixtures.TestingIo())
|
|
|
|
with tempfile.TemporaryDirectory() as td:
|
|
app = application.Application(io_factory.IoType.NONE, td,
|
|
fixtures.FILE_CFG)
|
|
assert not app.is_running()
|
|
app.start()
|
|
assert app.is_running()
|
|
yield app
|
|
app.stop()
|
|
assert not app.is_running()
|
|
|
|
|
|
@pytest.fixture
|
|
def grpc_client(app):
|
|
with grpc.insecure_channel('localhost:50052') as channel:
|
|
client = tsgrain_pb2_grpc.TSGRainStub(channel)
|
|
yield client
|
|
|
|
|
|
def test_request_task(grpc_client):
|
|
# Manually start a task (like via button press)
|
|
res = grpc_client.RequestTask(
|
|
tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL,
|
|
zone_id=2,
|
|
duration=30,
|
|
queuing=False,
|
|
cancelling=True))
|
|
|
|
assert res.started
|
|
assert not res.stopped
|
|
|
|
# Queue processing time
|
|
time.sleep(0.1)
|
|
|
|
# Try to start the same task again -> nothing happens
|
|
res = grpc_client.RequestTask(
|
|
tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL,
|
|
zone_id=2,
|
|
duration=30,
|
|
queuing=False,
|
|
cancelling=False))
|
|
assert not res.started
|
|
assert not res.stopped
|
|
|
|
# Try to start the same task again -> task should cancel
|
|
res = grpc_client.RequestTask(
|
|
tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL,
|
|
zone_id=2,
|
|
duration=30,
|
|
queuing=False,
|
|
cancelling=True))
|
|
assert not res.started
|
|
assert res.stopped
|
|
|
|
|
|
def test_request_task_queue(grpc_client):
|
|
# Manually start a task (like via button press)
|
|
res = grpc_client.RequestTask(
|
|
tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL,
|
|
zone_id=2,
|
|
duration=30,
|
|
queuing=False,
|
|
cancelling=True))
|
|
assert res.started
|
|
assert not res.stopped
|
|
|
|
task_list = grpc_client.GetTasks(empty_pb2.Empty())
|
|
assert task_list.now.seconds == pytest.approx(util.datetime_to_proto(
|
|
datetime.now()).seconds,
|
|
abs=1)
|
|
|
|
assert len(task_list.tasks) == 1
|
|
assert task_list.tasks[0].zone_id == 2
|
|
|
|
# Duplicate task should not be enqueued
|
|
res = grpc_client.RequestTask(
|
|
tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL,
|
|
zone_id=2,
|
|
duration=30,
|
|
queuing=True,
|
|
cancelling=False))
|
|
assert not res.started
|
|
assert not res.stopped
|
|
task_list = grpc_client.GetTasks(empty_pb2.Empty())
|
|
assert len(task_list.tasks) == 1
|
|
|
|
# Enqueue a new task
|
|
res = grpc_client.RequestTask(
|
|
tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL,
|
|
zone_id=3,
|
|
duration=30,
|
|
queuing=True,
|
|
cancelling=False))
|
|
assert res.started
|
|
assert not res.stopped
|
|
|
|
task_list = grpc_client.GetTasks(empty_pb2.Empty())
|
|
assert len(task_list.tasks) == 2
|
|
|
|
assert task_list.tasks[0].zone_id == 2
|
|
assert task_list.tasks[1].zone_id == 3
|
|
|
|
|
|
def test_crud_job(grpc_client):
|
|
# Insert jobs
|
|
job1 = models.Job.deserialize(JOB1_DATA)
|
|
job2 = models.Job.deserialize(JOB2_DATA)
|
|
|
|
assert grpc_client.CreateJob(job1.serialize_proto()).id == 1
|
|
assert grpc_client.CreateJob(job2.serialize_proto()).id == 2
|
|
|
|
# Get a job
|
|
got_job = grpc_client.GetJob(tsgrain_pb2.JobID(id=1))
|
|
assert got_job.id == 1
|
|
assert models.Job.deserialize_proto(got_job).serialize() == JOB1_DATA
|
|
|
|
# Get all jobs
|
|
job_list = grpc_client.GetJobs(empty_pb2.Empty())
|
|
assert len(job_list.jobs) == 2
|
|
assert job_list.jobs[0].zones == [1, 3]
|
|
assert job_list.jobs[1].zones == [5]
|
|
|
|
# Update job
|
|
job2.id = 2
|
|
job2.zones = [4, 5]
|
|
grpc_client.UpdateJob(job2.serialize_proto())
|
|
assert grpc_client.GetJob(tsgrain_pb2.JobID(id=2)).zones == [4, 5]
|
|
|
|
# Delete job
|
|
grpc_client.DeleteJob(tsgrain_pb2.JobID(id=1))
|
|
assert len(grpc_client.GetJobs(empty_pb2.Empty()).jobs) == 1
|
|
|
|
# Get job that does not exist
|
|
with pytest.raises(grpc.RpcError):
|
|
grpc_client.GetJob(tsgrain_pb2.JobID(id=1))
|
|
|
|
# Delete job that does not exist
|
|
with pytest.raises(grpc.RpcError):
|
|
grpc_client.DeleteJob(tsgrain_pb2.JobID(id=1))
|
|
|
|
|
|
def test_get_system_datetime(grpc_client, mocker):
|
|
mock_res = mock.Mock()
|
|
mock_res.stdout = 'Europe/Berlin'
|
|
|
|
cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run',
|
|
return_value=mock_res)
|
|
|
|
system_time = grpc_client.GetSystemTime(empty_pb2.Empty())
|
|
assert system_time.datetime.seconds == pytest.approx(
|
|
datetime.now().timestamp(), abs=1)
|
|
assert system_time.timezone == 'Europe/Berlin'
|
|
|
|
cmd_parts = ['cat', '/etc/timezone']
|
|
cmd_run_mock.assert_called_once_with(cmd_parts,
|
|
check=True,
|
|
universal_newlines=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
|
|
|
|
def test_set_system_datetime(grpc_client, mocker):
|
|
cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run')
|
|
|
|
date_time = datetime(2021, 12, 25, 16, 30, 14)
|
|
grpc_client.SetSystemTime(util.datetime_to_proto(date_time))
|
|
|
|
cmd_parts = ['date', '-s', '2021-12-25 16:30:14']
|
|
cmd_run_mock.assert_called_once_with(cmd_parts,
|
|
check=True,
|
|
universal_newlines=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
|
|
|
|
def test_set_system_timezone(grpc_client, mocker):
|
|
cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run')
|
|
|
|
grpc_client.SetSystemTimezone(
|
|
wrappers_pb2.StringValue(value='Europe/Berlin'))
|
|
|
|
cmd_parts = ['timedatectl', 'set-timezone', 'Europe/Berlin']
|
|
cmd_run_mock.assert_called_once_with(cmd_parts,
|
|
check=True,
|
|
universal_newlines=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
|
|
|
|
def test_get_n_zones(grpc_client):
|
|
n_zones = grpc_client.GetNZones(empty_pb2.Empty()).value
|
|
assert n_zones == 5
|