449 lines
14 KiB
Python
449 lines
14 KiB
Python
# coding=utf-8
|
|
import logging
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
from tsgrain_controller import util, config
|
|
from tsgrain_controller.grpc_generated import tsgrain_pb2
|
|
|
|
|
|
@dataclass
|
|
class Job:
|
|
"""Bewässerungsjob (Zeitsteuerung)"""
|
|
|
|
date: datetime
|
|
"""Startdatum/Uhrzeit"""
|
|
|
|
duration: int
|
|
"""Bewässerungsdauer in Sekunden"""
|
|
|
|
zones: List[int]
|
|
"""Liste der Zonen, die bewässert werden sollen"""
|
|
|
|
enable: bool
|
|
"""Bewässerungsjob aktiviert"""
|
|
|
|
repeat: bool
|
|
"""Bewässerungsjob täglich wiederholen"""
|
|
|
|
id: int = 0
|
|
"""
|
|
ID, mit der der Bewässerungsjob in der Datenbank gespeichert wird.
|
|
Ist Null bei neu erstellten Objekten, die sich noch nicht in der Datenbank
|
|
befinden.
|
|
"""
|
|
|
|
@property
|
|
def is_active(self) -> bool:
|
|
"""
|
|
Gibt True zurück, wenn der Bewässerungsjob in Zukunft ausgeführt werden wird.
|
|
"""
|
|
if not self.enable:
|
|
return False
|
|
if self.repeat:
|
|
return True
|
|
|
|
return self.date > util.datetime_now()
|
|
|
|
def check(self, date_now: datetime) -> bool:
|
|
"""
|
|
Gibt True zurück, wenn der Bewässerungsjob in dieser Minute
|
|
gestartet werden soll.
|
|
|
|
:param date_now: Aktuelles Datum/Uhrzeit
|
|
"""
|
|
if not self.enable:
|
|
return False
|
|
|
|
if self.repeat:
|
|
return date_now.hour == self.date.hour \
|
|
and date_now.minute == self.date.minute
|
|
|
|
return date_now.year == self.date.year \
|
|
and date_now.month == self.date.month \
|
|
and date_now.day == self.date.day \
|
|
and date_now.hour == self.date.hour \
|
|
and date_now.minute == self.date.minute
|
|
|
|
def serialize(self) -> Dict[str, Any]:
|
|
return {
|
|
'date': self.date,
|
|
'duration': self.duration,
|
|
'zones': self.zones,
|
|
'enable': self.enable,
|
|
'repeat': self.repeat,
|
|
}
|
|
|
|
@classmethod
|
|
def deserialize(cls, data: dict, job_id: int = 0) -> 'Job':
|
|
return cls(
|
|
date=util.datetime_deserialize(data['date']),
|
|
duration=data['duration'],
|
|
zones=data['zones'],
|
|
enable=data['enable'],
|
|
repeat=data['repeat'],
|
|
id=job_id,
|
|
)
|
|
|
|
def serialize_proto(self) -> tsgrain_pb2.Job:
|
|
return tsgrain_pb2.Job(id=self.id,
|
|
date=util.datetime_to_proto(self.date),
|
|
duration=self.duration,
|
|
zones=self.zones,
|
|
enable=self.enable,
|
|
repeat=self.repeat)
|
|
|
|
@classmethod
|
|
def deserialize_proto(cls, data: tsgrain_pb2.Job) -> 'Job':
|
|
return cls(
|
|
date=util.datetime_from_proto(data.date),
|
|
duration=data.duration,
|
|
zones=list(data.zones),
|
|
enable=data.enable,
|
|
repeat=data.repeat,
|
|
id=data.id,
|
|
)
|
|
|
|
def validate(self, app: 'AppInterface'):
|
|
if not self.zones:
|
|
raise util.InvalidInputException('No zones set')
|
|
if self.duration < 1:
|
|
raise util.InvalidInputException('Irrigation time not set')
|
|
|
|
for zone_id in self.zones:
|
|
_validate_zone_id(zone_id, app)
|
|
|
|
|
|
class Source(Enum):
|
|
"""Quelle einer Bewässerungsaufgabe"""
|
|
|
|
MANUAL = 0
|
|
SCHEDULE = 1
|
|
|
|
|
|
@dataclass
|
|
class Task:
|
|
"""Bewässerungsaufgabe"""
|
|
|
|
source: Source
|
|
"""Quelle des Tasks (Manuell/Zeitplan)"""
|
|
|
|
zone_id: int
|
|
"""Nummer der Zone"""
|
|
|
|
duration: int
|
|
"""Beregnungsdauer in Sekunden"""
|
|
|
|
_remaining: int = 0
|
|
"""Interne Variable, um die verbleibende Zeit eines gestoppten Tasks zu speichern"""
|
|
|
|
_id: int = 0
|
|
|
|
datetime_started: Optional[datetime] = None
|
|
"""Zeitpunkt, wann der Task gestartet wurde"""
|
|
|
|
def __post_init__(self):
|
|
self._remaining = self.duration
|
|
self._id = uuid.uuid1().int
|
|
|
|
@property
|
|
def is_running(self) -> bool:
|
|
"""
|
|
:return: True falls der Task momentan läuft
|
|
"""
|
|
return self.datetime_started is not None
|
|
|
|
@property
|
|
def remaining(self) -> int:
|
|
"""
|
|
:return: Verbleibende Zeit in Sekunden
|
|
"""
|
|
if not self.is_running:
|
|
return self._remaining
|
|
|
|
d = self.datetime_finished - util.datetime_now()
|
|
return d.seconds
|
|
|
|
@property
|
|
def is_done(self) -> bool:
|
|
"""
|
|
:return: True wenn der Task bereits abgeschlossen ist.
|
|
"""
|
|
return self.remaining <= 0
|
|
|
|
@property
|
|
def datetime_finished(self) -> Optional[datetime]:
|
|
"""
|
|
:return: Zeitpunkt, zu dem der Task abgeschlossen sein wird.
|
|
None falls der Task momentan nicht läuft.
|
|
"""
|
|
if self.datetime_started is None:
|
|
return None
|
|
|
|
return self.datetime_started + timedelta(seconds=self._remaining)
|
|
|
|
def start(self):
|
|
"""Startet den Task zur aktuellen Zeit."""
|
|
if self.is_running:
|
|
raise util.TaskRunException('already running')
|
|
self.datetime_started = util.datetime_now()
|
|
|
|
def stop(self):
|
|
"""Stoppt den Task und speichert die verbleibende Zeit"""
|
|
if not self.is_running:
|
|
raise util.TaskRunException('not running')
|
|
|
|
self._remaining = self.remaining
|
|
self.datetime_started = None
|
|
|
|
def serialize(self) -> Dict[str, Any]:
|
|
"""Task zur Speicherung in der Datenbank in dict umwandeln."""
|
|
return {
|
|
'source': self.source.name,
|
|
'zone_id': self.zone_id,
|
|
'duration': self.duration,
|
|
'remaining': self.remaining
|
|
}
|
|
|
|
def serialize_rpc(self) -> Dict[str, Any]:
|
|
"""Task zur aktuellen Statusübertragung in dict umwandeln."""
|
|
return {
|
|
'source': self.source.name,
|
|
'zone_id': self.zone_id,
|
|
'duration': self.duration,
|
|
'datetime_started': self.datetime_started,
|
|
'datetime_finished': self.datetime_finished
|
|
}
|
|
|
|
def serialize_proto(self) -> tsgrain_pb2.Task:
|
|
return tsgrain_pb2.Task(
|
|
source=self.source.value,
|
|
zone_id=self.zone_id,
|
|
duration=self.duration,
|
|
datetime_started=util.datetime_to_proto(self.datetime_started),
|
|
datetime_finished=util.datetime_to_proto(self.datetime_finished))
|
|
|
|
def validate(self, app: 'AppInterface'):
|
|
if not isinstance(self.source, Source):
|
|
raise util.InvalidInputException('Source invalid')
|
|
if self.duration < 1:
|
|
raise util.InvalidInputException('Irrigation time not set')
|
|
_validate_zone_id(self.zone_id, app)
|
|
|
|
@classmethod
|
|
def deserialize(cls, data: dict) -> 'Task':
|
|
task = cls(source=Source[data['source']],
|
|
zone_id=data['zone_id'],
|
|
duration=data['duration'])
|
|
task._remaining = data['remaining']
|
|
return task
|
|
|
|
def __eq__(self, other: 'Task') -> bool:
|
|
return self._id == other._id
|
|
|
|
def __hash__(self) -> int:
|
|
return hash(self._id)
|
|
|
|
def __str__(self):
|
|
return f'ZONE {self.zone_id}: {self.duration} {self.source}'
|
|
|
|
|
|
@dataclass
|
|
class TaskRequest:
|
|
"""Enthält alle Parameter einer zu erstellenden Bewässerungsaufgabe"""
|
|
|
|
source: Source
|
|
"""Quelle des Tasks (Manuell/Zeitplan)"""
|
|
|
|
zone_id: int
|
|
"""Nummer der Zone"""
|
|
|
|
duration: int = 0
|
|
"""Beregnungsdauer in Sekunden"""
|
|
|
|
queuing: bool = False
|
|
"""
|
|
Task in die Warteschlange einreihen, wenn er nicht sofort gestartet werden kann.
|
|
"""
|
|
|
|
cancelling: bool = False
|
|
"""Task aus der Warteschlange entfernen/abbrechen, wenn er bereits exisitiert."""
|
|
|
|
@classmethod
|
|
def deserialize_proto(cls, data: tsgrain_pb2.TaskRequest) -> 'TaskRequest':
|
|
return cls(source=Source(data.source),
|
|
zone_id=data.zone_id,
|
|
duration=data.duration,
|
|
queuing=data.queuing,
|
|
cancelling=data.cancelling)
|
|
|
|
|
|
@dataclass
|
|
class TaskRequestResult:
|
|
"""
|
|
Wird beim Versuch, eine neue Bewässerungsaufgabe zu starten, zurückgegeben.
|
|
Enthält die Information, ob eine Aufgabe gestartet oder gestoppt wurde.
|
|
"""
|
|
|
|
started: bool
|
|
"""True wenn eine neue Aufgabe gestartet wurde"""
|
|
|
|
stopped: bool
|
|
"""True wenn eine laufende Aufgabe gestoppt wurde"""
|
|
|
|
def serialize_proto(self) -> tsgrain_pb2.TaskRequestResult:
|
|
return tsgrain_pb2.TaskRequestResult(started=self.started,
|
|
stopped=self.stopped)
|
|
|
|
|
|
class AppInterface:
|
|
"""Beinhaltet sämtliche Methoden der TSGRain-Anwendung"""
|
|
|
|
def get_auto_mode(self) -> bool:
|
|
"""Gibt den Status des Automatikmodus zurück."""
|
|
|
|
def set_auto_mode(self, state: bool):
|
|
"""Aktiviert/deaktiviert den Automatikmodus."""
|
|
|
|
def get_cfg(self) -> config.Config:
|
|
"""Gibt das Konfigurationsobjekt der Anwendung zurück."""
|
|
|
|
def get_logger(self) -> logging.Logger:
|
|
"""Gibt den Logger der Anwendung zurück."""
|
|
|
|
def request_task(self, request: TaskRequest) -> TaskRequestResult:
|
|
"""
|
|
Starte eine neue Bewässerungsaufgabe (oder stoppe eine laufende, wenn
|
|
diese bereits läuft). Wird für den manuellen Start mittels Taster verwendet.
|
|
|
|
:param request: Objekt, dass die Parameter der neuen Aufgabe enthält.
|
|
:return: Statusobjekt (Information, ob eine Aufgabe gestartet oder gestoppt
|
|
wurde).
|
|
"""
|
|
|
|
def start_task(self, source: Source, zone_id: int, duration: int,
|
|
queuing: bool) -> bool:
|
|
"""
|
|
Starte eine neue Bewässerungsaufgabe
|
|
|
|
:param source: Quelle der Bewässeungsaufgabe (Manuell/Automatik)
|
|
:param zone_id: ID der Bewässerungszone
|
|
:param duration: Bewässerungsdauer in Sekunden (0 für Standarddauer)
|
|
:param queuing: Neue Aufgabe in die Warteschlange einreihen, wenn momentan
|
|
eine andere Zone bewässert wird.
|
|
:return: True wenn die Aufgabe erfolgreich gestartet wurde
|
|
"""
|
|
|
|
def stop_task(self, source: Source, zone_id: int) -> bool:
|
|
"""
|
|
Stoppe eine laufende Bewässerungsaufgabe
|
|
|
|
:param source: Quelle der Bewässeungsaufgabe (Manuell/Automatik)
|
|
:param zone_id: ID der Bewässerungszone
|
|
:return: True wenn die Aufgabe erfolgreich gestoppt wurde
|
|
"""
|
|
|
|
def get_tasks(self) -> List[Task]:
|
|
"""
|
|
Gibt sämtliche in der Warteschlange befindlichen Bewässerungsaufgaben zurück.
|
|
|
|
:return: Liste von Bewässerungsaufgaben
|
|
"""
|
|
|
|
def create_job(self, job: Job) -> int:
|
|
"""
|
|
Erstelle einen neuen Bewässerungsjob.
|
|
|
|
:param job: Bewässerungsjob
|
|
:return: ID des neuen Bewässerungsjobs
|
|
"""
|
|
|
|
def get_job(self, job_id: int) -> Job:
|
|
"""
|
|
Gibt den Bewässerungsjob mit der gegebenen ID zurück.
|
|
|
|
:param job_id: ID des Bewässerungsjobs
|
|
:return: Bewässerungsjob
|
|
:raise KeyError: wenn Job nicht gefunden
|
|
"""
|
|
|
|
def get_jobs(self) -> List[Job]:
|
|
"""
|
|
Gibt alle gespeicherten Bewässerungsjobs zurück.
|
|
|
|
:return: Bewässerungsjobs: dict(id -> models.Job)
|
|
"""
|
|
|
|
def update_job(self, job: Job):
|
|
"""
|
|
Aktualisiert einen Bewässerungsjob.
|
|
|
|
:param job: Bewässerungsjob
|
|
"""
|
|
|
|
def delete_job(self, job_id: int):
|
|
"""
|
|
Lösche den Bewässerungsjob mit der gegebenen ID.
|
|
|
|
:param job_id: ID des Bewässerungsjobs
|
|
:raise KeyError: wenn Job nicht gefunden
|
|
"""
|
|
|
|
def enable_job(self, job_id: int):
|
|
"""
|
|
Aktiviere den Bewässerungsjob mit der gegebenen ID.
|
|
|
|
:param job_id: ID des Bewässerungsjobs
|
|
"""
|
|
|
|
def disable_job(self, job_id: int):
|
|
"""
|
|
Deaktiviere den Bewässerungsjob mit der gegebenen ID.
|
|
|
|
:param job_id: ID des Bewässerungsjobs
|
|
"""
|
|
|
|
def notify_queue_update(self):
|
|
"""
|
|
Wird aufgerufen, wenn die Warteschlange aktualisiert wird.
|
|
Dient dazu, andere Komponenten der Anwendung zu benachrichtigen
|
|
(momentan den GRPC-Server).
|
|
"""
|
|
|
|
def is_running(self) -> bool:
|
|
"""Gibt den Status der Anwendung zurück"""
|
|
|
|
def get_system_timezone(self) -> str:
|
|
"""
|
|
Rufe die Systemzeitzone mit dem entsprechenden Befehl ab.
|
|
|
|
:return: Systemzeitzone im Unix-Format (z.B. Europe/Berlin)
|
|
:raise ErrorTimeConfig: wenn der Befehl einen Fehler zurückgibt
|
|
:raise ErrorInvalidTimezone: wenn die ermittelte Zeitzone ein ungültiges Format hat
|
|
"""
|
|
|
|
def set_system_datetime(self, date_time: datetime):
|
|
"""
|
|
Ändere die Systemzeit. Die Anwendung muss hierfür als Root laufen.
|
|
|
|
:param date_time: Neue Systemzeit
|
|
:raise ErrorTimeConfig: wenn Befehl nicht erfolgreich
|
|
"""
|
|
|
|
def set_system_timezone(self, tz: str):
|
|
"""
|
|
Ändere die Systemzeitzone. Die Anwendung muss hierfür als Root laufen.
|
|
|
|
:param tz: Neue Zeitzone im Unix-Format (z.B. Europe/Berlin)
|
|
:raise ErrorInvalidCmdTemplate: wenn die Befehlsvorlage keinen Platzhalter enthält
|
|
:raise ErrorInvalidTimezone: wenn die eingegebene Zeitzone ein ungültiges Format hat
|
|
:raise ErrorTimeConfig: wenn der Konfigurationsbefehl einen Fehler zurückgibt
|
|
"""
|
|
|
|
|
|
def _validate_zone_id(zone_id: int, app: AppInterface):
|
|
if zone_id < 0 or zone_id > app.get_cfg().n_zones:
|
|
raise util.InvalidInputException('ZoneID invalid')
|