# coding=utf-8 import subprocess import tempfile import time from datetime import datetime from unittest import mock import pytest import grpc from google.protobuf import empty_pb2, wrappers_pb2 from tsgrain_controller import application, models, util from tsgrain_controller.grpc_generated import tsgrain_pb2, tsgrain_pb2_grpc from tsgrain_controller.io import io_factory from tests import fixtures JOB1_DATA = { 'date': datetime(2022, 1, 10, 12, 30), 'duration': 60, 'enable': True, 'repeat': False, 'zones': [1, 3] } JOB2_DATA = { 'date': datetime(2022, 1, 11, 18, 0), 'duration': 300, 'enable': True, 'repeat': True, 'zones': [5] } @pytest.fixture def app(mocker): mocker.patch('tsgrain_controller.io.io_factory.new_io', return_value=fixtures.TestingIo()) with tempfile.TemporaryDirectory() as td: app = application.Application(io_factory.IoType.NONE, td, fixtures.FILE_CFG) assert not app.is_running() app.start() assert app.is_running() yield app app.stop() assert not app.is_running() @pytest.fixture def grpc_client(app): with grpc.insecure_channel('localhost:50052') as channel: client = tsgrain_pb2_grpc.TSGRainStub(channel) yield client def test_request_task(grpc_client): # Manually start a task (like via button press) res = grpc_client.RequestTask( tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL, zone_id=2, duration=30, queuing=False, cancelling=True)) assert res.started assert not res.stopped # Queue processing time time.sleep(0.1) # Try to start the same task again -> nothing happens res = grpc_client.RequestTask( tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL, zone_id=2, duration=30, queuing=False, cancelling=False)) assert not res.started assert not res.stopped # Try to start the same task again -> task should cancel res = grpc_client.RequestTask( tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL, zone_id=2, duration=30, queuing=False, cancelling=True)) assert not res.started assert res.stopped def test_request_task_queue(grpc_client): # Manually start a task (like via button press) res = grpc_client.RequestTask( tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL, zone_id=2, duration=30, queuing=False, cancelling=True)) assert res.started assert not res.stopped task_list = grpc_client.GetTasks(empty_pb2.Empty()) assert task_list.now.seconds == pytest.approx(util.datetime_to_proto( datetime.now()).seconds, abs=1) assert len(task_list.tasks) == 1 assert task_list.tasks[0].zone_id == 2 # Duplicate task should not be enqueued res = grpc_client.RequestTask( tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL, zone_id=2, duration=30, queuing=True, cancelling=False)) assert not res.started assert not res.stopped task_list = grpc_client.GetTasks(empty_pb2.Empty()) assert len(task_list.tasks) == 1 # Enqueue a new task res = grpc_client.RequestTask( tsgrain_pb2.TaskRequest(source=tsgrain_pb2.TaskSource.MANUAL, zone_id=3, duration=30, queuing=True, cancelling=False)) assert res.started assert not res.stopped task_list = grpc_client.GetTasks(empty_pb2.Empty()) assert len(task_list.tasks) == 2 assert task_list.tasks[0].zone_id == 2 assert task_list.tasks[1].zone_id == 3 def test_crud_job(grpc_client): # Insert jobs job1 = models.Job.deserialize(JOB1_DATA) job2 = models.Job.deserialize(JOB2_DATA) assert grpc_client.CreateJob(job1.serialize_proto()).id == 1 assert grpc_client.CreateJob(job2.serialize_proto()).id == 2 # Get a job got_job = grpc_client.GetJob(tsgrain_pb2.JobID(id=1)) assert got_job.id == 1 assert models.Job.deserialize_proto(got_job).serialize() == JOB1_DATA # Get all jobs job_list = grpc_client.GetJobs(empty_pb2.Empty()) assert len(job_list.jobs) == 2 assert job_list.jobs[0].zones == [1, 3] assert job_list.jobs[1].zones == [5] # Update job job2.id = 2 job2.zones = [4, 5] grpc_client.UpdateJob(job2.serialize_proto()) assert grpc_client.GetJob(tsgrain_pb2.JobID(id=2)).zones == [4, 5] # Delete job grpc_client.DeleteJob(tsgrain_pb2.JobID(id=1)) assert len(grpc_client.GetJobs(empty_pb2.Empty()).jobs) == 1 # Get job that does not exist with pytest.raises(grpc.RpcError): grpc_client.GetJob(tsgrain_pb2.JobID(id=1)) # Delete job that does not exist with pytest.raises(grpc.RpcError): grpc_client.DeleteJob(tsgrain_pb2.JobID(id=1)) def test_get_system_datetime(grpc_client, mocker): mock_res = mock.Mock() mock_res.stdout = 'Europe/Berlin' cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run', return_value=mock_res) system_time = grpc_client.GetSystemTime(empty_pb2.Empty()) assert system_time.datetime.seconds == pytest.approx( datetime.now().timestamp(), abs=1) assert system_time.timezone == 'Europe/Berlin' cmd_parts = ['cat', '/etc/timezone'] cmd_run_mock.assert_called_once_with(cmd_parts, check=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) def test_set_system_datetime(grpc_client, mocker): cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run') date_time = datetime(2021, 12, 25, 16, 30, 14) grpc_client.SetSystemTime(util.datetime_to_proto(date_time)) cmd_parts = ['date', '-s', '2021-12-25 16:30:14'] cmd_run_mock.assert_called_once_with(cmd_parts, check=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) def test_set_system_timezone(grpc_client, mocker): cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run') grpc_client.SetSystemTimezone( wrappers_pb2.StringValue(value='Europe/Berlin')) cmd_parts = ['timedatectl', 'set-timezone', 'Europe/Berlin'] cmd_run_mock.assert_called_once_with(cmd_parts, check=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) def test_get_n_zones(grpc_client): n_zones = grpc_client.GetNZones(empty_pb2.Empty()).value assert n_zones == 5