Controller/tsgrain_controller/models.py
2022-02-22 00:12:38 +01:00

441 lines
13 KiB
Python

# coding=utf-8
import logging
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional, Dict, Any
from tsgrain_controller import util, config
from tsgrain_controller.grpc_generated import tsgrain_pb2
@dataclass
class Job:
"""Bewässerungsjob (Zeitsteuerung)"""
date: datetime
"""Startdatum/Uhrzeit"""
duration: int
"""Bewässerungsdauer in Sekunden"""
zones: List[int]
"""Liste der Zonen, die bewässert werden sollen"""
enable: bool
"""Bewässerungsjob aktiviert"""
repeat: bool
"""Bewässerungsjob täglich wiederholen"""
id: int = 0
"""
ID, mit der der Bewässerungsjob in der Datenbank gespeichert wird.
Ist Null bei neu erstellten Objekten, die sich noch nicht in der Datenbank
befinden.
"""
def check(self, date_now: datetime) -> bool:
"""
Gibt True zurück, wenn der Bewässerungsjob in dieser Minute
gestartet werden soll.
:param date_now: Aktuelles Datum/Uhrzeit
"""
if not self.enable:
return False
if self.repeat:
return date_now.hour == self.date.hour \
and date_now.minute == self.date.minute
return date_now.year == self.date.year \
and date_now.month == self.date.month \
and date_now.day == self.date.day \
and date_now.hour == self.date.hour \
and date_now.minute == self.date.minute
def serialize(self) -> Dict[str, Any]:
return {
'date': self.date,
'duration': self.duration,
'zones': self.zones,
'enable': self.enable,
'repeat': self.repeat,
}
@classmethod
def deserialize(cls, data: dict, job_id: int = 0) -> 'Job':
return cls(
date=util.datetime_deserialize(data['date']),
duration=data['duration'],
zones=data['zones'],
enable=data['enable'],
repeat=data['repeat'],
id=job_id,
)
def serialize_proto(self) -> tsgrain_pb2.Job:
return tsgrain_pb2.Job(id=self.id,
date=util.datetime_to_proto(self.date),
duration=self.duration,
zones=self.zones,
enable=self.enable,
repeat=self.repeat)
@classmethod
def deserialize_proto(cls, data: tsgrain_pb2.Job) -> 'Job':
return cls(
date=util.datetime_from_proto(data.date),
duration=data.duration,
zones=list(data.zones),
enable=data.enable,
repeat=data.repeat,
id=data.id,
)
def validate(self, app: 'AppInterface'):
"""
Prüfe, ob der Task gültige Daten enthält
(Zonen existieren, Bewässerungszeit > 0)
"""
if not self.zones:
raise util.InvalidInputException('No zones set')
if self.duration < 1:
raise util.InvalidInputException('Irrigation time not set')
for zone_id in self.zones:
_validate_zone_id(zone_id, app)
class Source(Enum):
"""Quelle einer Bewässerungsaufgabe"""
MANUAL = 0
SCHEDULE = 1
@dataclass
class Task:
"""Bewässerungsaufgabe"""
source: Source
"""Quelle des Tasks (Manuell/Zeitplan)"""
zone_id: int
"""Nummer der Zone"""
duration: int
"""Beregnungsdauer in Sekunden"""
_remaining: int = 0
"""Interne Variable, um die verbleibende Zeit eines gestoppten Tasks zu speichern"""
_id: int = 0
datetime_started: Optional[datetime] = None
"""Zeitpunkt, wann der Task gestartet wurde"""
def __post_init__(self):
self._remaining = self.duration
self._id = uuid.uuid1().int
@property
def is_running(self) -> bool:
"""
:return: True falls der Task momentan läuft
"""
return self.datetime_started is not None
@property
def remaining(self) -> int:
"""
:return: Verbleibende Zeit in Sekunden
"""
if not self.is_running:
return self._remaining
d = self.datetime_finished - util.datetime_now()
return d.seconds
@property
def is_done(self) -> bool:
"""
:return: True wenn der Task bereits abgeschlossen ist.
"""
return self.remaining <= 0
@property
def datetime_finished(self) -> Optional[datetime]:
"""
:return: Zeitpunkt, zu dem der Task abgeschlossen sein wird.
None falls der Task momentan nicht läuft.
"""
if self.datetime_started is None:
return None
return self.datetime_started + timedelta(seconds=self._remaining)
def start(self):
"""Startet den Task zur aktuellen Zeit."""
if self.is_running:
raise util.TaskRunException('already running')
self.datetime_started = util.datetime_now()
def stop(self):
"""Stoppt den Task und speichert die verbleibende Zeit"""
if not self.is_running:
raise util.TaskRunException('not running')
self._remaining = self.remaining
self.datetime_started = None
def serialize(self) -> Dict[str, Any]:
"""Task zur Speicherung in der Datenbank in dict umwandeln."""
return {
'source': self.source.name,
'zone_id': self.zone_id,
'duration': self.duration,
'remaining': self.remaining
}
def serialize_rpc(self) -> Dict[str, Any]:
"""Task zur aktuellen Statusübertragung in dict umwandeln."""
return {
'source': self.source.name,
'zone_id': self.zone_id,
'duration': self.duration,
'datetime_started': self.datetime_started,
'datetime_finished': self.datetime_finished
}
def serialize_proto(self) -> tsgrain_pb2.Task:
return tsgrain_pb2.Task(
source=self.source.value,
zone_id=self.zone_id,
duration=self.duration,
datetime_started=util.datetime_to_proto(self.datetime_started),
datetime_finished=util.datetime_to_proto(self.datetime_finished))
def validate(self, app: 'AppInterface'):
if not isinstance(self.source, Source):
raise util.InvalidInputException('Source invalid')
if self.duration < 1:
raise util.InvalidInputException('Irrigation time not set')
_validate_zone_id(self.zone_id, app)
@classmethod
def deserialize(cls, data: dict) -> 'Task':
task = cls(source=Source[data['source']],
zone_id=data['zone_id'],
duration=data['duration'])
task._remaining = data['remaining']
return task
def __eq__(self, other: 'Task') -> bool:
return self._id == other._id
def __hash__(self) -> int:
return hash(self._id)
def __str__(self):
return f'ZONE {self.zone_id}: {self.duration} {self.source}'
@dataclass
class TaskRequest:
"""Enthält alle Parameter einer zu erstellenden Bewässerungsaufgabe"""
source: Source
"""Quelle des Tasks (Manuell/Zeitplan)"""
zone_id: int
"""Nummer der Zone"""
duration: int = 0
"""Beregnungsdauer in Sekunden"""
queuing: bool = False
"""
Task in die Warteschlange einreihen, wenn er nicht sofort gestartet werden kann.
"""
cancelling: bool = False
"""Task aus der Warteschlange entfernen/abbrechen, wenn er bereits exisitiert."""
@classmethod
def deserialize_proto(cls, data: tsgrain_pb2.TaskRequest) -> 'TaskRequest':
return cls(source=Source(data.source),
zone_id=data.zone_id,
duration=data.duration,
queuing=data.queuing,
cancelling=data.cancelling)
@dataclass
class TaskRequestResult:
"""
Wird beim Versuch, eine neue Bewässerungsaufgabe zu starten, zurückgegeben.
Enthält die Information, ob eine Aufgabe gestartet oder gestoppt wurde.
"""
started: bool
"""True wenn eine neue Aufgabe gestartet wurde"""
stopped: bool
"""True wenn eine laufende Aufgabe gestoppt wurde"""
def serialize_proto(self) -> tsgrain_pb2.TaskRequestResult:
return tsgrain_pb2.TaskRequestResult(started=self.started,
stopped=self.stopped)
class AppInterface:
"""Beinhaltet sämtliche Methoden der TSGRain-Anwendung"""
def get_auto_mode(self) -> bool:
"""Gibt den Status des Automatikmodus zurück."""
def set_auto_mode(self, state: bool):
"""Aktiviert/deaktiviert den Automatikmodus."""
def get_cfg(self) -> config.Config:
"""Gibt das Konfigurationsobjekt der Anwendung zurück."""
def get_logger(self) -> logging.Logger:
"""Gibt den Logger der Anwendung zurück."""
def request_task(self, request: TaskRequest) -> TaskRequestResult:
"""
Starte eine neue Bewässerungsaufgabe (oder stoppe eine laufende, wenn
diese bereits läuft). Wird für den manuellen Start mittels Taster verwendet.
:param request: Objekt, dass die Parameter der neuen Aufgabe enthält.
:return: Statusobjekt (Information, ob eine Aufgabe gestartet oder gestoppt
wurde).
"""
def start_task(self, source: Source, zone_id: int, duration: int,
queuing: bool) -> bool:
"""
Starte eine neue Bewässerungsaufgabe
:param source: Quelle der Bewässeungsaufgabe (Manuell/Automatik)
:param zone_id: ID der Bewässerungszone
:param duration: Bewässerungsdauer in Sekunden (0 für Standarddauer)
:param queuing: Neue Aufgabe in die Warteschlange einreihen, wenn momentan
eine andere Zone bewässert wird.
:return: True wenn die Aufgabe erfolgreich gestartet wurde
"""
def stop_task(self, source: Source, zone_id: int) -> bool:
"""
Stoppe eine laufende Bewässerungsaufgabe
:param source: Quelle der Bewässeungsaufgabe (Manuell/Automatik)
:param zone_id: ID der Bewässerungszone
:return: True wenn die Aufgabe erfolgreich gestoppt wurde
"""
def get_tasks(self) -> List[Task]:
"""
Gibt sämtliche in der Warteschlange befindlichen Bewässerungsaufgaben zurück.
:return: Liste von Bewässerungsaufgaben
"""
def create_job(self, job: Job) -> int:
"""
Erstelle einen neuen Bewässerungsjob.
:param job: Bewässerungsjob
:return: ID des neuen Bewässerungsjobs
"""
def get_job(self, job_id: int) -> Job:
"""
Gibt den Bewässerungsjob mit der gegebenen ID zurück.
:param job_id: ID des Bewässerungsjobs
:return: Bewässerungsjob
:raise KeyError: wenn Job nicht gefunden
"""
def get_jobs(self) -> List[Job]:
"""
Gibt alle gespeicherten Bewässerungsjobs zurück.
:return: Bewässerungsjobs: dict(id -> models.Job)
"""
def update_job(self, job: Job):
"""
Aktualisiert einen Bewässerungsjob.
:param job: Bewässerungsjob
"""
def delete_job(self, job_id: int):
"""
Lösche den Bewässerungsjob mit der gegebenen ID.
:param job_id: ID des Bewässerungsjobs
:raise KeyError: wenn Job nicht gefunden
"""
def enable_job(self, job_id: int):
"""
Aktiviere den Bewässerungsjob mit der gegebenen ID.
:param job_id: ID des Bewässerungsjobs
"""
def disable_job(self, job_id: int):
"""
Deaktiviere den Bewässerungsjob mit der gegebenen ID.
:param job_id: ID des Bewässerungsjobs
"""
def notify_queue_update(self):
"""
Wird aufgerufen, wenn die Warteschlange aktualisiert wird.
Dient dazu, andere Komponenten der Anwendung zu benachrichtigen
(momentan den GRPC-Server).
"""
def is_running(self) -> bool:
"""Gibt den Status der Anwendung zurück"""
def get_system_timezone(self) -> str:
"""
Rufe die Systemzeitzone mit dem entsprechenden Befehl ab.
:return: Systemzeitzone im Unix-Format (z.B. Europe/Berlin)
:raise ErrorTimeConfig: wenn der Befehl einen Fehler zurückgibt
:raise ErrorInvalidTimezone: wenn die ermittelte Zeitzone ein ungültiges Format hat
"""
def set_system_datetime(self, date_time: datetime):
"""
Ändere die Systemzeit. Die Anwendung muss hierfür als Root laufen.
:param date_time: Neue Systemzeit
:raise ErrorTimeConfig: wenn Befehl nicht erfolgreich
"""
def set_system_timezone(self, tz: str):
"""
Ändere die Systemzeitzone. Die Anwendung muss hierfür als Root laufen.
:param tz: Neue Zeitzone im Unix-Format (z.B. Europe/Berlin)
:raise ErrorInvalidCmdTemplate: wenn die Befehlsvorlage keinen Platzhalter enthält
:raise ErrorInvalidTimezone: wenn die eingegebene Zeitzone ein ungültiges Format hat
:raise ErrorTimeConfig: wenn der Konfigurationsbefehl einen Fehler zurückgibt
"""
def _validate_zone_id(zone_id: int, app: AppInterface):
if zone_id < 0 or zone_id > app.get_cfg().n_zones:
raise util.InvalidInputException('ZoneID invalid')