package tsgrain_rpc import ( "context" "encoding/json" "errors" "io" "time" "code.thetadev.de/TSGRain/WebUI/src/model" tsgrain_grpc "code.thetadev.de/TSGRain/WebUI/src/tsgrain_rpc/proto" "code.thetadev.de/TSGRain/WebUI/src/util" "github.com/rs/zerolog/log" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/wrapperspb" ) type RPCClient struct { address string conn *grpc.ClientConn tsgrain tsgrain_grpc.TSGRainClient ctx context.Context cancel context.CancelFunc } func NewClient(address string) *RPCClient { return &RPCClient{ address: address, } } func (c *RPCClient) Start() error { ctx, cancel := context.WithCancel(context.Background()) c.ctx = ctx c.cancel = cancel conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithTransportCredentials( insecure.NewCredentials())) if err != nil { return err } c.conn = conn c.tsgrain = tsgrain_grpc.NewTSGRainClient(c.conn) return nil } func (c *RPCClient) Stop() error { c.cancel() return c.conn.Close() } func mapTask(pbTask *tsgrain_grpc.Task) model.Task { task := model.Task{ Source: int32(pbTask.Source), ZoneId: pbTask.ZoneId, Duration: pbTask.Duration, } if pbTask.DatetimeStarted != nil { task.DatetimeStarted = &pbTask.DatetimeStarted.Seconds } if pbTask.DatetimeFinished != nil { task.DatetimeFinished = &pbTask.DatetimeFinished.Seconds } return task } func mapTaskList(pbTaskList *tsgrain_grpc.TaskList) model.TaskList { taskList := model.TaskList{ Now: pbTaskList.Now.Seconds, Tasks: []model.Task{}, AutoMode: pbTaskList.AutoMode, } for _, pbTask := range pbTaskList.Tasks { if pbTask != nil { taskList.Tasks = append(taskList.Tasks, mapTask(pbTask)) } } return taskList } func (c *RPCClient) StartManualTask(zone_id int32, duration int32) (bool, error) { res, err := c.tsgrain.StartTask(c.ctx, &tsgrain_grpc.TaskStart{ Source: tsgrain_grpc.TaskSource_MANUAL, ZoneId: zone_id, Duration: duration, Queuing: true, }) if err != nil { return false, err } return res.Value, nil } func (c *RPCClient) StopManualTask(zone_id int32) (bool, error) { res, err := c.tsgrain.StopTask(c.ctx, &tsgrain_grpc.TaskStop{ Source: tsgrain_grpc.TaskSource_MANUAL, ZoneId: zone_id, }) if err != nil { return false, err } return res.Value, nil } func (c *RPCClient) GetTasks() (model.TaskList, error) { pbTaskList, err := c.tsgrain.GetTasks(c.ctx, &emptypb.Empty{}) if err != nil { return model.TaskList{}, err } return mapTaskList(pbTaskList), nil } func broadcastTaskList(bc util.Broadcaster, pbTaskList *tsgrain_grpc.TaskList) error { taskList := mapTaskList(pbTaskList) taskListJson, err := json.Marshal(taskList) if err != nil { return err } bc.Broadcast(taskListJson) log.Debug().RawJSON("task_list", taskListJson).Msg("TaskList received") return nil } func (c *RPCClient) streamTasks(bc util.Broadcaster) error { stream, err := c.tsgrain.StreamTasks(c.ctx, &emptypb.Empty{}) if err != nil { return err } for { pbTaskList, err := stream.Recv() if errors.Is(err, io.EOF) { return nil } if err != nil { return err } else { _ = broadcastTaskList(bc, pbTaskList) } } } func (c *RPCClient) StreamTasks(bc util.Broadcaster) { // Get initial state pbTaskList, err := c.tsgrain.GetTasks(c.ctx, &emptypb.Empty{}) if err == nil { _ = broadcastTaskList(bc, pbTaskList) } // Keep stream running if it errors for { select { case <-c.ctx.Done(): return default: err := c.streamTasks(bc) if err != nil { log.Error().Err(err).Msg("StreamTasks error") time.Sleep(3 * time.Second) } } } } func (c *RPCClient) CreateJob(job model.NewJob) (int32, error) { pbJob := &tsgrain_grpc.Job{ Date: &tsgrain_grpc.Timestamp{Seconds: job.Date}, Duration: job.Duration, Zones: job.Zones, Enable: job.Enable, Repeat: job.Repeat, } res, err := c.tsgrain.CreateJob(c.ctx, pbJob) if err != nil { return 0, err } return res.Id, nil } func (c *RPCClient) GetJobs() (model.JobList, error) { jobs := []model.Job{} res, err := c.tsgrain.GetJobs(c.ctx, &emptypb.Empty{}) if err != nil { return jobs, err } for _, pbJob := range res.Jobs { job := model.Job{ Id: pbJob.Id, Date: pbJob.Date.Seconds, Duration: pbJob.Duration, Zones: pbJob.Zones, Enable: pbJob.Enable, Repeat: pbJob.Repeat, } jobs = append(jobs, job) } return jobs, nil } func (c *RPCClient) GetJob(id int32) (model.Job, error) { res, err := c.tsgrain.GetJob(c.ctx, &tsgrain_grpc.JobID{Id: id}) if err != nil { return model.Job{}, err } job := model.Job{ Id: res.Id, Date: res.Date.Seconds, Duration: res.Duration, Zones: res.Zones, Enable: res.Enable, Repeat: res.Repeat, } return job, nil } func (c *RPCClient) UpdateJob(job model.Job) error { pbJob := tsgrain_grpc.Job{ Id: job.Id, Date: &tsgrain_grpc.Timestamp{Seconds: job.Date}, Duration: job.Duration, Zones: job.Zones, Enable: job.Enable, Repeat: job.Repeat, } _, err := c.tsgrain.UpdateJob(c.ctx, &pbJob) return err } func (c *RPCClient) DeleteJob(job_id int32) error { _, err := c.tsgrain.DeleteJob(c.ctx, &tsgrain_grpc.JobID{Id: job_id}) return err } func (c *RPCClient) GetAutoMode() (bool, error) { res, err := c.tsgrain.GetAutoMode(c.ctx, &emptypb.Empty{}) if err != nil { return false, err } return res.Value, nil } func (c *RPCClient) SetAutoMode(state bool) error { _, err := c.tsgrain.SetAutoMode(c.ctx, wrapperspb.Bool(state)) return err } func (c *RPCClient) GetSystemTime() (model.SystemTime, error) { configTime, err := c.tsgrain.GetSystemTime(c.ctx, &emptypb.Empty{}) if err != nil { return model.SystemTime{}, err } return model.SystemTime{ Time: configTime.Datetime.Seconds, Timezone: configTime.Timezone, }, nil } func (c *RPCClient) SetSystemTime(timestamp int64) error { _, err := c.tsgrain.SetSystemTime(c.ctx, &tsgrain_grpc.Timestamp{Seconds: timestamp}) return err } func (c *RPCClient) SetSystemTimezone(timezone string) error { _, err := c.tsgrain.SetSystemTimezone(c.ctx, &wrapperspb.StringValue{Value: timezone}) return err } func (c *RPCClient) GetDefaultIrrigationTime() (int32, error) { res, err := c.tsgrain.GetDefaultIrrigationTime(c.ctx, &emptypb.Empty{}) if err != nil { return 0, err } return res.Value, nil } func (c *RPCClient) SetDefaultIrrigationTime(defaultTime int32) error { _, err := c.tsgrain.SetDefaultIrrigationTime(c.ctx, wrapperspb.Int32(defaultTime)) return err } func (c *RPCClient) GetNZones() (int32, error) { val, err := c.tsgrain.GetNZones(c.ctx, &emptypb.Empty{}) if err != nil { return 0, err } return val.Value, nil }