Controller/tests/test_grpc_server.py

229 lines
7.5 KiB
Python

# coding=utf-8
import subprocess
import tempfile
import time
from datetime import datetime
from unittest import mock
import pytest
import grpc
from google.protobuf import empty_pb2, wrappers_pb2
from tsgrain_controller import application, models, util
from tsgrain_controller.grpc_generated import tsgrain_pb2, tsgrain_pb2_grpc
from tsgrain_controller.io import io_factory
from tests import fixtures
# Serialized form of the first sample job (non-repeating, zones 1 and 3).
# The CRUD test feeds this to ``models.Job.deserialize`` and compares the
# round-tripped job against it.
JOB1_DATA = dict(
    date=datetime(2022, 1, 10, 12, 30),
    duration=60,
    enable=True,
    repeat=False,
    zones=[1, 3],
)
# Serialized form of the second sample job (repeating, zone 5 only).
# The CRUD test creates it as job #2 and later updates its zones.
JOB2_DATA = dict(
    date=datetime(2022, 1, 11, 18, 0),
    duration=300,
    enable=True,
    repeat=True,
    zones=[5],
)
@pytest.fixture
def app(mocker):
    """Yield a started ``Application`` and stop it again on teardown.

    ``io_factory.new_io`` is patched to hand out a ``fixtures.TestingIo``
    instance, and the application is constructed with a fresh temporary
    directory that lives for the duration of the test.
    """
    new_io_target = 'tsgrain_controller.io.io_factory.new_io'
    testing_io = fixtures.TestingIo()
    mocker.patch(new_io_target, return_value=testing_io)

    with tempfile.TemporaryDirectory() as tmp_dir:
        controller = application.Application(io_factory.IoType.NONE,
                                             tmp_dir, fixtures.FILE_CFG)

        # A freshly constructed application must be idle until started.
        assert not controller.is_running()
        controller.start()
        assert controller.is_running()

        yield controller

        # Teardown: shut down and verify it actually stopped.
        controller.stop()
        assert not controller.is_running()
@pytest.fixture
def grpc_client(app):
    """Yield a ``TSGRainStub`` bound to an insecure channel on localhost:50052.

    Requesting the ``app`` fixture ensures the application has been started
    before the channel is used; the channel is closed when the test ends.
    """
    # NOTE(review): port 50052 is presumably the one configured in
    # fixtures.FILE_CFG -- confirm if the test config changes.
    with grpc.insecure_channel('localhost:50052') as open_channel:
        yield tsgrain_pb2_grpc.TSGRainStub(open_channel)
def test_request_task(grpc_client):
    """Start a manual task, re-request it, then cancel it by re-requesting."""

    def request_zone2(cancelling):
        # Every request in this test targets zone 2 for 30 with queuing
        # disabled; only the ``cancelling`` flag varies.
        task_request = tsgrain_pb2.TaskRequest(
            source=tsgrain_pb2.TaskSource.MANUAL,
            zone_id=2,
            duration=30,
            queuing=False,
            cancelling=cancelling)
        return grpc_client.RequestTask(task_request)

    # Manually start a task (like via button press)
    started_res = request_zone2(cancelling=True)
    assert started_res.started
    assert not started_res.stopped

    # Queue processing time
    time.sleep(0.1)

    # Try to start the same task again -> nothing happens
    noop_res = request_zone2(cancelling=False)
    assert not noop_res.started
    assert not noop_res.stopped

    # Try to start the same task again -> task should cancel
    cancelled_res = request_zone2(cancelling=True)
    assert not cancelled_res.started
    assert cancelled_res.stopped
def test_request_task_queue(grpc_client):
    """Check the task list after starting, duplicating and queuing tasks."""

    def submit(zone_id, queuing, cancelling):
        # All requests here are manual with a duration of 30.
        return grpc_client.RequestTask(
            tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL,
                                    zone_id=zone_id,
                                    duration=30,
                                    queuing=queuing,
                                    cancelling=cancelling))

    def fetch_tasks():
        return grpc_client.GetTasks(empty_pb2.Empty())

    # Manually start a task (like via button press)
    outcome = submit(zone_id=2, queuing=False, cancelling=True)
    assert outcome.started
    assert not outcome.stopped

    listing = fetch_tasks()
    # The reported server time should match local "now" to within a second.
    assert listing.now.seconds == pytest.approx(
        util.datetime_to_proto(datetime.now()).seconds, abs=1)
    assert [task.zone_id for task in listing.tasks] == [2]

    # Duplicate task should not be enqueued
    outcome = submit(zone_id=2, queuing=True, cancelling=False)
    assert not outcome.started
    assert not outcome.stopped

    listing = fetch_tasks()
    assert len(listing.tasks) == 1

    # Enqueue a new task
    outcome = submit(zone_id=3, queuing=True, cancelling=False)
    assert outcome.started
    assert not outcome.stopped

    # The new task is listed after the one that is already present.
    listing = fetch_tasks()
    assert [task.zone_id for task in listing.tasks] == [2, 3]
def test_crud_job(grpc_client):
    """Exercise create, read, list, update and delete of jobs over gRPC."""
    everything = empty_pb2.Empty()

    # Insert jobs
    first_job = models.Job.deserialize(JOB1_DATA)
    second_job = models.Job.deserialize(JOB2_DATA)
    for expected_id, new_job in enumerate((first_job, second_job), start=1):
        created = grpc_client.CreateJob(new_job.serialize_proto())
        assert created.id == expected_id

    # Get a job
    fetched = grpc_client.GetJob(tsgrain_pb2.JobID(id=1))
    assert fetched.id == 1
    round_tripped = models.Job.deserialize_proto(fetched).serialize()
    assert round_tripped == JOB1_DATA

    # Get all jobs
    all_jobs = grpc_client.GetJobs(everything).jobs
    assert len(all_jobs) == 2
    assert all_jobs[0].zones == [1, 3]
    assert all_jobs[1].zones == [5]

    # Update job
    second_job.id = 2
    second_job.zones = [4, 5]
    grpc_client.UpdateJob(second_job.serialize_proto())
    updated = grpc_client.GetJob(tsgrain_pb2.JobID(id=2))
    assert updated.zones == [4, 5]

    # Delete job
    grpc_client.DeleteJob(tsgrain_pb2.JobID(id=1))
    remaining = grpc_client.GetJobs(everything).jobs
    assert len(remaining) == 1

    # Get job that does not exist, then delete job that does not exist:
    # both calls must fail with an RPC error now that job 1 is gone.
    for rpc in (grpc_client.GetJob, grpc_client.DeleteJob):
        with pytest.raises(grpc.RpcError):
            rpc(tsgrain_pb2.JobID(id=1))
def test_get_system_datetime(grpc_client, mocker):
    """GetSystemTime reports the current time and the timezone read via ``cat``."""
    # ``subprocess.run`` is replaced so the timezone lookup returns a
    # canned result whose stdout is the timezone name.
    fake_completed = mock.Mock(stdout='Europe/Berlin')
    run_spy = mocker.patch('subprocess.run', return_value=fake_completed)

    reply = grpc_client.GetSystemTime(empty_pb2.Empty())

    assert reply.datetime.seconds == pytest.approx(
        datetime.now().timestamp(), abs=1)
    assert reply.timezone == 'Europe/Berlin'

    expected_kwargs = {
        'check': True,
        'universal_newlines': True,
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE,
    }
    run_spy.assert_called_once_with(['cat', '/etc/timezone'],
                                    **expected_kwargs)
def test_set_system_datetime(grpc_client, mocker):
    """SetSystemTime runs ``date -s`` with the requested timestamp."""
    run_spy = mocker.patch('subprocess.run')

    requested = datetime(2021, 12, 25, 16, 30, 14)
    grpc_client.SetSystemTime(util.datetime_to_proto(requested))

    expected_kwargs = {
        'check': True,
        'universal_newlines': True,
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE,
    }
    run_spy.assert_called_once_with(['date', '-s', '2021-12-25 16:30:14'],
                                    **expected_kwargs)
def test_set_system_timezone(grpc_client, mocker):
    """SetSystemTimezone runs ``timedatectl set-timezone`` with the given name."""
    run_spy = mocker.patch('subprocess.run')

    zone_name = wrappers_pb2.StringValue(value='Europe/Berlin')
    grpc_client.SetSystemTimezone(zone_name)

    expected_kwargs = {
        'check': True,
        'universal_newlines': True,
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE,
    }
    run_spy.assert_called_once_with(
        ['timedatectl', 'set-timezone', 'Europe/Berlin'], **expected_kwargs)
def test_get_n_zones(grpc_client):
    """GetNZones reports five zones for the test application."""
    # NOTE(review): the expected count of 5 presumably comes from the test
    # configuration in ``fixtures`` -- confirm there if this starts failing.
    reply = grpc_client.GetNZones(empty_pb2.Empty())
    assert reply.value == 5