308 lines
6.7 KiB
Go
308 lines
6.7 KiB
Go
package tsgrain_rpc
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"io"
|
|
"time"
|
|
|
|
"code.thetadev.de/TSGRain/WebUI/src/model"
|
|
tsgrain_grpc "code.thetadev.de/TSGRain/WebUI/src/tsgrain_rpc/proto"
|
|
"code.thetadev.de/TSGRain/WebUI/src/util"
|
|
"github.com/rs/zerolog/log"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/protobuf/types/known/emptypb"
|
|
"google.golang.org/protobuf/types/known/wrapperspb"
|
|
)
|
|
|
|
type RPCClient struct {
|
|
address string
|
|
conn *grpc.ClientConn
|
|
tsgrain tsgrain_grpc.TSGRainClient
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
func NewClient(address string) *RPCClient {
|
|
return &RPCClient{
|
|
address: address,
|
|
}
|
|
}
|
|
|
|
func (c *RPCClient) Start() error {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
c.ctx = ctx
|
|
c.cancel = cancel
|
|
|
|
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithTransportCredentials(
|
|
insecure.NewCredentials()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.conn = conn
|
|
c.tsgrain = tsgrain_grpc.NewTSGRainClient(c.conn)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *RPCClient) Stop() error {
|
|
c.cancel()
|
|
return c.conn.Close()
|
|
}
|
|
|
|
func mapTask(pbTask *tsgrain_grpc.Task) model.Task {
|
|
task := model.Task{
|
|
Source: int32(pbTask.Source),
|
|
ZoneId: pbTask.ZoneId,
|
|
Duration: pbTask.Duration,
|
|
}
|
|
|
|
if pbTask.DatetimeStarted != nil {
|
|
task.DatetimeStarted = &pbTask.DatetimeStarted.Seconds
|
|
}
|
|
if pbTask.DatetimeFinished != nil {
|
|
task.DatetimeFinished = &pbTask.DatetimeFinished.Seconds
|
|
}
|
|
return task
|
|
}
|
|
|
|
func mapTaskList(pbTaskList *tsgrain_grpc.TaskList) model.TaskList {
|
|
taskList := model.TaskList{
|
|
Now: pbTaskList.Now.Seconds,
|
|
Tasks: []model.Task{},
|
|
AutoMode: pbTaskList.AutoMode,
|
|
}
|
|
|
|
for _, pbTask := range pbTaskList.Tasks {
|
|
if pbTask != nil {
|
|
taskList.Tasks = append(taskList.Tasks, mapTask(pbTask))
|
|
}
|
|
}
|
|
return taskList
|
|
}
|
|
|
|
func (c *RPCClient) StartManualTask(zone_id int32, duration int32) (bool, error) {
|
|
res, err := c.tsgrain.StartTask(c.ctx, &tsgrain_grpc.TaskStart{
|
|
Source: tsgrain_grpc.TaskSource_MANUAL,
|
|
ZoneId: zone_id,
|
|
Duration: duration,
|
|
Queuing: true,
|
|
})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return res.Value, nil
|
|
}
|
|
|
|
func (c *RPCClient) StopManualTask(zone_id int32) (bool, error) {
|
|
res, err := c.tsgrain.StopTask(c.ctx, &tsgrain_grpc.TaskStop{
|
|
Source: tsgrain_grpc.TaskSource_MANUAL,
|
|
ZoneId: zone_id,
|
|
})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return res.Value, nil
|
|
}
|
|
|
|
func (c *RPCClient) GetTasks() (model.TaskList, error) {
|
|
pbTaskList, err := c.tsgrain.GetTasks(c.ctx, &emptypb.Empty{})
|
|
if err != nil {
|
|
return model.TaskList{}, err
|
|
}
|
|
return mapTaskList(pbTaskList), nil
|
|
}
|
|
|
|
func broadcastTaskList(bc util.Broadcaster, pbTaskList *tsgrain_grpc.TaskList) error {
|
|
taskList := mapTaskList(pbTaskList)
|
|
|
|
taskListJson, err := json.Marshal(taskList)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
bc.Broadcast(taskListJson)
|
|
log.Debug().RawJSON("task_list", taskListJson).Msg("TaskList received")
|
|
return nil
|
|
}
|
|
|
|
func (c *RPCClient) streamTasks(bc util.Broadcaster) error {
|
|
stream, err := c.tsgrain.StreamTasks(c.ctx, &emptypb.Empty{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
pbTaskList, err := stream.Recv()
|
|
if errors.Is(err, io.EOF) {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
} else {
|
|
_ = broadcastTaskList(bc, pbTaskList)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *RPCClient) StreamTasks(bc util.Broadcaster) {
|
|
// Get initial state
|
|
pbTaskList, err := c.tsgrain.GetTasks(c.ctx, &emptypb.Empty{})
|
|
if err == nil {
|
|
_ = broadcastTaskList(bc, pbTaskList)
|
|
}
|
|
|
|
// Keep stream running if it errors
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
default:
|
|
err := c.streamTasks(bc)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("StreamTasks error")
|
|
time.Sleep(3 * time.Second)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *RPCClient) CreateJob(job model.NewJob) (int32, error) {
|
|
pbJob := &tsgrain_grpc.Job{
|
|
Date: &tsgrain_grpc.Timestamp{Seconds: job.Date},
|
|
Duration: job.Duration,
|
|
Zones: job.Zones,
|
|
Enable: job.Enable,
|
|
Repeat: job.Repeat,
|
|
}
|
|
|
|
res, err := c.tsgrain.CreateJob(c.ctx, pbJob)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return res.Id, nil
|
|
}
|
|
|
|
func (c *RPCClient) GetJobs() (model.JobList, error) {
|
|
jobs := []model.Job{}
|
|
|
|
res, err := c.tsgrain.GetJobs(c.ctx, &emptypb.Empty{})
|
|
if err != nil {
|
|
return jobs, err
|
|
}
|
|
|
|
for _, pbJob := range res.Jobs {
|
|
job := model.Job{
|
|
Id: pbJob.Id,
|
|
Date: pbJob.Date.Seconds,
|
|
Duration: pbJob.Duration,
|
|
Zones: pbJob.Zones,
|
|
Enable: pbJob.Enable,
|
|
Repeat: pbJob.Repeat,
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|
|
|
|
func (c *RPCClient) GetJob(id int32) (model.Job, error) {
|
|
res, err := c.tsgrain.GetJob(c.ctx, &tsgrain_grpc.JobID{Id: id})
|
|
if err != nil {
|
|
return model.Job{}, err
|
|
}
|
|
|
|
job := model.Job{
|
|
Id: res.Id,
|
|
Date: res.Date.Seconds,
|
|
Duration: res.Duration,
|
|
Zones: res.Zones,
|
|
Enable: res.Enable,
|
|
Repeat: res.Repeat,
|
|
}
|
|
return job, nil
|
|
}
|
|
|
|
func (c *RPCClient) UpdateJob(job model.Job) error {
|
|
pbJob := tsgrain_grpc.Job{
|
|
Id: job.Id,
|
|
Date: &tsgrain_grpc.Timestamp{Seconds: job.Date},
|
|
Duration: job.Duration,
|
|
Zones: job.Zones,
|
|
Enable: job.Enable,
|
|
Repeat: job.Repeat,
|
|
}
|
|
|
|
_, err := c.tsgrain.UpdateJob(c.ctx, &pbJob)
|
|
return err
|
|
}
|
|
|
|
func (c *RPCClient) DeleteJob(job_id int32) error {
|
|
_, err := c.tsgrain.DeleteJob(c.ctx, &tsgrain_grpc.JobID{Id: job_id})
|
|
return err
|
|
}
|
|
|
|
func (c *RPCClient) GetAutoMode() (bool, error) {
|
|
res, err := c.tsgrain.GetAutoMode(c.ctx, &emptypb.Empty{})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return res.Value, nil
|
|
}
|
|
|
|
func (c *RPCClient) SetAutoMode(state bool) error {
|
|
_, err := c.tsgrain.SetAutoMode(c.ctx, wrapperspb.Bool(state))
|
|
return err
|
|
}
|
|
|
|
func (c *RPCClient) GetSystemTime() (model.SystemTime, error) {
|
|
configTime, err := c.tsgrain.GetSystemTime(c.ctx, &emptypb.Empty{})
|
|
if err != nil {
|
|
return model.SystemTime{}, err
|
|
}
|
|
|
|
return model.SystemTime{
|
|
Time: configTime.Datetime.Seconds,
|
|
Timezone: configTime.Timezone,
|
|
}, nil
|
|
}
|
|
|
|
func (c *RPCClient) SetSystemTime(timestamp int64) error {
|
|
_, err := c.tsgrain.SetSystemTime(c.ctx,
|
|
&tsgrain_grpc.Timestamp{Seconds: timestamp})
|
|
return err
|
|
}
|
|
|
|
func (c *RPCClient) SetSystemTimezone(timezone string) error {
|
|
_, err := c.tsgrain.SetSystemTimezone(c.ctx,
|
|
&wrapperspb.StringValue{Value: timezone})
|
|
return err
|
|
}
|
|
|
|
func (c *RPCClient) GetDefaultIrrigationTime() (int32, error) {
|
|
res, err := c.tsgrain.GetDefaultIrrigationTime(c.ctx, &emptypb.Empty{})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return res.Value, nil
|
|
}
|
|
|
|
func (c *RPCClient) SetDefaultIrrigationTime(defaultTime int32) error {
|
|
_, err := c.tsgrain.SetDefaultIrrigationTime(c.ctx, wrapperspb.Int32(defaultTime))
|
|
return err
|
|
}
|
|
|
|
func (c *RPCClient) GetNZones() (int32, error) {
|
|
val, err := c.tsgrain.GetNZones(c.ctx, &emptypb.Empty{})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return val.Value, nil
|
|
}
|