Controller/tsgrain_controller/grpc_server.py

198 lines
6.9 KiB
Python

# coding=utf-8
import threading
from concurrent import futures
from typing import Generator
import logging
import grpc
from google.protobuf import empty_pb2, wrappers_pb2
from tsgrain_controller import models, util
from tsgrain_controller.grpc_generated import tsgrain_pb2, tsgrain_pb2_grpc
task_request = tsgrain_pb2.TaskRequest()
task_request.zone_id = 2
task_request.duration = 30
class TSGRainServicer(tsgrain_pb2_grpc.TSGRainServicer):
def __init__(self, app: models.AppInterface):
self.app = app
self.tasklist_update = threading.Event()
def RequestTask(self, request: tsgrain_pb2.TaskRequest,
context) -> tsgrain_pb2.Task:
logging.info('GRPC RequestTask: %s', request)
res = self.app.request_task(
models.TaskRequest.deserialize_proto(request))
return res.serialize_proto()
def StartTask(self, request: tsgrain_pb2.TaskStart, context):
logging.info('GRPC StartTask: %s', request)
success = self.app.start_task(models.Source(request.source),
request.zone_id, request.duration,
request.queuing)
return wrappers_pb2.BoolValue(value=success)
def StopTask(self, request: tsgrain_pb2.TaskStop, context):
logging.info('GRPC StopTask: %s', request)
success = self.app.stop_task(models.Source(request.source),
request.zone_id)
return wrappers_pb2.BoolValue(value=success)
def _get_tasks(self) -> tsgrain_pb2.TaskList:
app_tasks = self.app.get_tasks()
tasks = [task.serialize_proto() for task in app_tasks]
return tsgrain_pb2.TaskList(now=util.datetime_to_proto(
util.datetime_now()),
tasks=tasks,
auto_mode=self.app.get_auto_mode())
def GetTasks(self, request: empty_pb2.Empty,
context) -> tsgrain_pb2.TaskList:
logging.info('GRPC GetTasks: %s', request)
return self._get_tasks()
def StreamTasks(self, request: empty_pb2.Empty,
context) -> Generator[tsgrain_pb2.TaskList, None, None]:
logging.debug('GRPC StreamTasks: %s', request)
if self.tasklist_update.wait(5):
yield self._get_tasks()
self.tasklist_update.clear()
def CreateJob(self, request: tsgrain_pb2.Job,
context) -> tsgrain_pb2.JobID:
logging.info('GRPC CreateJob: %s', request)
job = models.Job.deserialize_proto(request)
job_id = self.app.create_job(job)
return tsgrain_pb2.JobID(id=job_id)
def GetJob(self, request: tsgrain_pb2.JobID, context) -> tsgrain_pb2.Job:
logging.info('GRPC GetJob: %s', request)
try:
job = self.app.get_job(request.id)
except KeyError:
context.set_code(grpc.StatusCode.NOT_FOUND)
raise
return job.serialize_proto()
def GetJobs(self, request: empty_pb2.Empty,
context) -> tsgrain_pb2.JobList:
logging.info('GRPC GetJobs: %s', request)
jobs = [job.serialize_proto() for job in self.app.get_jobs()]
return tsgrain_pb2.JobList(jobs=jobs)
def UpdateJob(self, request: tsgrain_pb2.Job, context) -> empty_pb2.Empty:
logging.info('GRPC UpdateJob: %s', request)
job = models.Job.deserialize_proto(request)
self.app.update_job(job)
return empty_pb2.Empty()
def DeleteJob(self, request: tsgrain_pb2.JobID,
context) -> empty_pb2.Empty:
logging.info('GRPC DeleteJob: %s', request)
try:
self.app.delete_job(request.id)
except KeyError:
context.set_code(grpc.StatusCode.NOT_FOUND)
raise
return empty_pb2.Empty()
def EnableJob(self, request: tsgrain_pb2.JobID,
context) -> empty_pb2.Empty:
logging.info('GRPC EnableJob: %s', request)
try:
self.app.enable_job(request.id)
except KeyError:
context.set_code(grpc.StatusCode.NOT_FOUND)
raise
return empty_pb2.Empty()
def DisableJob(self, request: tsgrain_pb2.JobID,
context) -> empty_pb2.Empty:
logging.info('GRPC DisableJob: %s', request)
try:
self.app.disable_job(request.id)
except KeyError:
context.set_code(grpc.StatusCode.NOT_FOUND)
raise
return empty_pb2.Empty()
def GetAutoMode(self, request: empty_pb2.Empty,
context) -> wrappers_pb2.BoolValue:
logging.info('GRPC GetAutoMode: %s', request)
return wrappers_pb2.BoolValue(value=self.app.get_auto_mode())
def SetAutoMode(self, request: wrappers_pb2.BoolValue,
context) -> empty_pb2.Empty:
logging.info('GRPC SetAutoMode: %s', request)
self.app.set_auto_mode(request.value)
return empty_pb2.Empty()
def GetSystemTime(self, request: empty_pb2.Empty,
context) -> tsgrain_pb2.SystemTime:
logging.info('GRPC GetConfigTime: %s', request)
return tsgrain_pb2.SystemTime(datetime=util.datetime_to_proto(
util.datetime_now()),
timezone=self.app.get_system_timezone())
def SetSystemTime(self, request: tsgrain_pb2.Timestamp,
context) -> empty_pb2.Empty:
logging.info('GRPC SetSystemTime: %s', request)
if request.seconds > 0:
self.app.set_system_datetime(util.datetime_from_proto(request))
else:
raise Exception('timestamp invalid')
return empty_pb2.Empty()
def SetSystemTimezone(self, request: wrappers_pb2.StringValue,
context) -> empty_pb2.Empty:
logging.info('GRPC SetSystemTimezone: %s', request)
self.app.set_system_timezone(request.value)
return empty_pb2.Empty()
def GetDefaultIrrigationTime(self, request: empty_pb2.Empty,
context) -> wrappers_pb2.Int32Value:
logging.info('GRPC GetDefaultIrrigationTime: %s', request)
return wrappers_pb2.Int32Value(value=self.app.get_cfg().manual_time)
def SetDefaultIrrigationTime(self, request: wrappers_pb2.Int32Value,
context) -> empty_pb2.Empty:
logging.info('GRPC SetDefaultIrrigationTime: %s', request)
self.app.get_cfg().manual_time = request.value
return empty_pb2.Empty()
def GetNZones(self, request: empty_pb2.Empty,
context) -> wrappers_pb2.Int32Value:
logging.info('GRPC GetNZones: %s', request)
return wrappers_pb2.Int32Value(value=self.app.get_cfg().n_zones)
def new_server(servicer: TSGRainServicer, port: str) -> grpc.Server:
server = grpc.server(futures.ThreadPoolExecutor())
tsgrain_pb2_grpc.add_TSGRainServicer_to_server(servicer, server)
server.add_insecure_port(port)
return server