# coding=utf-8 import threading from concurrent import futures from typing import Generator import logging import grpc from google.protobuf import empty_pb2, wrappers_pb2 from tsgrain_controller import models, util from tsgrain_controller.grpc_generated import tsgrain_pb2, tsgrain_pb2_grpc task_request = tsgrain_pb2.TaskRequest() task_request.zone_id = 2 task_request.duration = 30 class TSGRainServicer(tsgrain_pb2_grpc.TSGRainServicer): def __init__(self, app: models.AppInterface): self.app = app self.tasklist_update = threading.Event() def RequestTask(self, request: tsgrain_pb2.TaskRequest, context) -> tsgrain_pb2.Task: logging.info('GRPC RequestTask: %s', request) res = self.app.request_task( models.TaskRequest.deserialize_proto(request)) return res.serialize_proto() def StartTask(self, request: tsgrain_pb2.TaskStart, context): logging.info('GRPC StartTask: %s', request) success = self.app.start_task(models.Source(request.source), request.zone_id, request.duration, request.queuing) return wrappers_pb2.BoolValue(value=success) def StopTask(self, request: tsgrain_pb2.TaskStop, context): logging.info('GRPC StopTask: %s', request) success = self.app.stop_task(models.Source(request.source), request.zone_id) return wrappers_pb2.BoolValue(value=success) def _get_tasks(self) -> tsgrain_pb2.TaskList: app_tasks = self.app.get_tasks() tasks = [task.serialize_proto() for task in app_tasks] return tsgrain_pb2.TaskList(now=util.datetime_to_proto( util.datetime_now()), tasks=tasks, auto_mode=self.app.get_auto_mode()) def GetTasks(self, request: empty_pb2.Empty, context) -> tsgrain_pb2.TaskList: logging.info('GRPC GetTasks: %s', request) return self._get_tasks() def StreamTasks(self, request: empty_pb2.Empty, context) -> Generator[tsgrain_pb2.TaskList, None, None]: logging.debug('GRPC StreamTasks: %s', request) if self.tasklist_update.wait(5): yield self._get_tasks() self.tasklist_update.clear() def CreateJob(self, request: tsgrain_pb2.Job, context) -> tsgrain_pb2.JobID: logging.info('GRPC CreateJob: %s', request) job = models.Job.deserialize_proto(request) job_id = self.app.create_job(job) return tsgrain_pb2.JobID(id=job_id) def GetJob(self, request: tsgrain_pb2.JobID, context) -> tsgrain_pb2.Job: logging.info('GRPC GetJob: %s', request) try: job = self.app.get_job(request.id) except KeyError: context.set_code(grpc.StatusCode.NOT_FOUND) raise return job.serialize_proto() def GetJobs(self, request: empty_pb2.Empty, context) -> tsgrain_pb2.JobList: logging.info('GRPC GetJobs: %s', request) jobs = [job.serialize_proto() for job in self.app.get_jobs()] return tsgrain_pb2.JobList(jobs=jobs) def UpdateJob(self, request: tsgrain_pb2.Job, context) -> empty_pb2.Empty: logging.info('GRPC UpdateJob: %s', request) job = models.Job.deserialize_proto(request) self.app.update_job(job) return empty_pb2.Empty() def DeleteJob(self, request: tsgrain_pb2.JobID, context) -> empty_pb2.Empty: logging.info('GRPC DeleteJob: %s', request) try: self.app.delete_job(request.id) except KeyError: context.set_code(grpc.StatusCode.NOT_FOUND) raise return empty_pb2.Empty() def EnableJob(self, request: tsgrain_pb2.JobID, context) -> empty_pb2.Empty: logging.info('GRPC EnableJob: %s', request) try: self.app.enable_job(request.id) except KeyError: context.set_code(grpc.StatusCode.NOT_FOUND) raise return empty_pb2.Empty() def DisableJob(self, request: tsgrain_pb2.JobID, context) -> empty_pb2.Empty: logging.info('GRPC DisableJob: %s', request) try: self.app.disable_job(request.id) except KeyError: context.set_code(grpc.StatusCode.NOT_FOUND) raise return empty_pb2.Empty() def GetAutoMode(self, request: empty_pb2.Empty, context) -> wrappers_pb2.BoolValue: logging.info('GRPC GetAutoMode: %s', request) return wrappers_pb2.BoolValue(value=self.app.get_auto_mode()) def SetAutoMode(self, request: wrappers_pb2.BoolValue, context) -> empty_pb2.Empty: logging.info('GRPC SetAutoMode: %s', request) self.app.set_auto_mode(request.value) return empty_pb2.Empty() def GetSystemTime(self, request: empty_pb2.Empty, context) -> tsgrain_pb2.SystemTime: logging.info('GRPC GetConfigTime: %s', request) return tsgrain_pb2.SystemTime(datetime=util.datetime_to_proto( util.datetime_now()), timezone=self.app.get_system_timezone()) def SetSystemTime(self, request: tsgrain_pb2.Timestamp, context) -> empty_pb2.Empty: logging.info('GRPC SetSystemTime: %s', request) if request.seconds > 0: self.app.set_system_datetime(util.datetime_from_proto(request)) else: raise Exception('timestamp invalid') return empty_pb2.Empty() def SetSystemTimezone(self, request: wrappers_pb2.StringValue, context) -> empty_pb2.Empty: logging.info('GRPC SetSystemTimezone: %s', request) self.app.set_system_timezone(request.value) return empty_pb2.Empty() def GetDefaultIrrigationTime(self, request: empty_pb2.Empty, context) -> wrappers_pb2.Int32Value: logging.info('GRPC GetDefaultIrrigationTime: %s', request) return wrappers_pb2.Int32Value(value=self.app.get_cfg().manual_time) def SetDefaultIrrigationTime(self, request: wrappers_pb2.Int32Value, context) -> empty_pb2.Empty: logging.info('GRPC SetDefaultIrrigationTime: %s', request) self.app.get_cfg().manual_time = request.value return empty_pb2.Empty() def GetNZones(self, request: empty_pb2.Empty, context) -> wrappers_pb2.Int32Value: logging.info('GRPC GetNZones: %s', request) return wrappers_pb2.Int32Value(value=self.app.get_cfg().n_zones) def new_server(servicer: TSGRainServicer, port: str) -> grpc.Server: server = grpc.server(futures.ThreadPoolExecutor()) tsgrain_pb2_grpc.add_TSGRainServicer_to_server(servicer, server) server.add_insecure_port(port) return server