# coding=utf-8 import logging import uuid from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum from typing import List, Optional, Dict, Any from tsgrain_controller import util, config from tsgrain_controller.grpc_generated import tsgrain_pb2 @dataclass class Job: """Bewässerungsjob (Zeitsteuerung)""" date: datetime """Startdatum/Uhrzeit""" duration: int """Bewässerungsdauer in Sekunden""" zones: List[int] """Liste der Zonen, die bewässert werden sollen""" enable: bool """Bewässerungsjob aktiviert""" repeat: bool """Bewässerungsjob täglich wiederholen""" id: int = 0 """ ID, mit der der Bewässerungsjob in der Datenbank gespeichert wird. Ist Null bei neu erstellten Objekten, die sich noch nicht in der Datenbank befinden. """ @property def is_active(self) -> bool: """ Gibt True zurück, wenn der Bewässerungsjob in Zukunft ausgeführt werden wird. """ if not self.enable: return False if self.repeat: return True return self.date > util.datetime_now() def check(self, date_now: datetime) -> bool: """ Gibt True zurück, wenn der Bewässerungsjob in dieser Minute gestartet werden soll. :param date_now: Aktuelles Datum/Uhrzeit """ if not self.enable: return False if self.repeat: return date_now.hour == self.date.hour \ and date_now.minute == self.date.minute return date_now.year == self.date.year \ and date_now.month == self.date.month \ and date_now.day == self.date.day \ and date_now.hour == self.date.hour \ and date_now.minute == self.date.minute def serialize(self) -> Dict[str, Any]: return { 'date': self.date, 'duration': self.duration, 'zones': self.zones, 'enable': self.enable, 'repeat': self.repeat, } @classmethod def deserialize(cls, data: dict, job_id: int = 0) -> 'Job': return cls( date=util.datetime_deserialize(data['date']), duration=data['duration'], zones=data['zones'], enable=data['enable'], repeat=data['repeat'], id=job_id, ) def serialize_proto(self) -> tsgrain_pb2.Job: return tsgrain_pb2.Job(id=self.id, date=util.datetime_to_proto(self.date), duration=self.duration, zones=self.zones, enable=self.enable, repeat=self.repeat) @classmethod def deserialize_proto(cls, data: tsgrain_pb2.Job) -> 'Job': return cls( date=util.datetime_from_proto(data.date), duration=data.duration, zones=list(data.zones), enable=data.enable, repeat=data.repeat, id=data.id, ) def validate(self, app: 'AppInterface'): if not self.zones: raise util.InvalidInputException('No zones set') if self.duration < 1: raise util.InvalidInputException('Irrigation time not set') for zone_id in self.zones: _validate_zone_id(zone_id, app) class Source(Enum): """Quelle einer Bewässerungsaufgabe""" MANUAL = 0 SCHEDULE = 1 @dataclass class Task: """Bewässerungsaufgabe""" source: Source """Quelle des Tasks (Manuell/Zeitplan)""" zone_id: int """Nummer der Zone""" duration: int """Beregnungsdauer in Sekunden""" _remaining: int = 0 """Interne Variable, um die verbleibende Zeit eines gestoppten Tasks zu speichern""" _id: int = 0 datetime_started: Optional[datetime] = None """Zeitpunkt, wann der Task gestartet wurde""" def __post_init__(self): self._remaining = self.duration self._id = uuid.uuid1().int @property def is_running(self) -> bool: """ :return: True falls der Task momentan läuft """ return self.datetime_started is not None @property def remaining(self) -> int: """ :return: Verbleibende Zeit in Sekunden """ if not self.is_running: return self._remaining d = self.datetime_finished - util.datetime_now() return d.seconds @property def is_done(self) -> bool: """ :return: True wenn der Task bereits abgeschlossen ist. """ return self.remaining <= 0 @property def datetime_finished(self) -> Optional[datetime]: """ :return: Zeitpunkt, zu dem der Task abgeschlossen sein wird. None falls der Task momentan nicht läuft. """ if self.datetime_started is None: return None return self.datetime_started + timedelta(seconds=self._remaining) def start(self): """Startet den Task zur aktuellen Zeit.""" if self.is_running: raise util.TaskRunException('already running') self.datetime_started = util.datetime_now() def stop(self): """Stoppt den Task und speichert die verbleibende Zeit""" if not self.is_running: raise util.TaskRunException('not running') self._remaining = self.remaining self.datetime_started = None def serialize(self) -> Dict[str, Any]: """Task zur Speicherung in der Datenbank in dict umwandeln.""" return { 'source': self.source.name, 'zone_id': self.zone_id, 'duration': self.duration, 'remaining': self.remaining } def serialize_rpc(self) -> Dict[str, Any]: """Task zur aktuellen Statusübertragung in dict umwandeln.""" return { 'source': self.source.name, 'zone_id': self.zone_id, 'duration': self.duration, 'datetime_started': self.datetime_started, 'datetime_finished': self.datetime_finished } def serialize_proto(self) -> tsgrain_pb2.Task: return tsgrain_pb2.Task( source=self.source.value, zone_id=self.zone_id, duration=self.duration, datetime_started=util.datetime_to_proto(self.datetime_started), datetime_finished=util.datetime_to_proto(self.datetime_finished)) def validate(self, app: 'AppInterface'): if not isinstance(self.source, Source): raise util.InvalidInputException('Source invalid') if self.duration < 1: raise util.InvalidInputException('Irrigation time not set') _validate_zone_id(self.zone_id, app) @classmethod def deserialize(cls, data: dict) -> 'Task': task = cls(source=Source[data['source']], zone_id=data['zone_id'], duration=data['duration']) task._remaining = data['remaining'] return task def __eq__(self, other: 'Task') -> bool: return self._id == other._id def __hash__(self) -> int: return hash(self._id) def __str__(self): return f'ZONE {self.zone_id}: {self.duration} {self.source}' @dataclass class TaskRequest: """Enthält alle Parameter einer zu erstellenden Bewässerungsaufgabe""" source: Source """Quelle des Tasks (Manuell/Zeitplan)""" zone_id: int """Nummer der Zone""" duration: int = 0 """Beregnungsdauer in Sekunden""" queuing: bool = False """ Task in die Warteschlange einreihen, wenn er nicht sofort gestartet werden kann. """ cancelling: bool = False """Task aus der Warteschlange entfernen/abbrechen, wenn er bereits exisitiert.""" @classmethod def deserialize_proto(cls, data: tsgrain_pb2.TaskRequest) -> 'TaskRequest': return cls(source=Source(data.source), zone_id=data.zone_id, duration=data.duration, queuing=data.queuing, cancelling=data.cancelling) @dataclass class TaskRequestResult: """ Wird beim Versuch, eine neue Bewässerungsaufgabe zu starten, zurückgegeben. Enthält die Information, ob eine Aufgabe gestartet oder gestoppt wurde. """ started: bool """True wenn eine neue Aufgabe gestartet wurde""" stopped: bool """True wenn eine laufende Aufgabe gestoppt wurde""" def serialize_proto(self) -> tsgrain_pb2.TaskRequestResult: return tsgrain_pb2.TaskRequestResult(started=self.started, stopped=self.stopped) class AppInterface: """Beinhaltet sämtliche Methoden der TSGRain-Anwendung""" def get_auto_mode(self) -> bool: """Gibt den Status des Automatikmodus zurück.""" def set_auto_mode(self, state: bool): """Aktiviert/deaktiviert den Automatikmodus.""" def get_cfg(self) -> config.Config: """Gibt das Konfigurationsobjekt der Anwendung zurück.""" def get_logger(self) -> logging.Logger: """Gibt den Logger der Anwendung zurück.""" def request_task(self, request: TaskRequest) -> TaskRequestResult: """ Starte eine neue Bewässerungsaufgabe (oder stoppe eine laufende, wenn diese bereits läuft). Wird für den manuellen Start mittels Taster verwendet. :param request: Objekt, dass die Parameter der neuen Aufgabe enthält. :return: Statusobjekt (Information, ob eine Aufgabe gestartet oder gestoppt wurde). """ def start_task(self, source: Source, zone_id: int, duration: int, queuing: bool) -> bool: """ Starte eine neue Bewässerungsaufgabe :param source: Quelle der Bewässeungsaufgabe (Manuell/Automatik) :param zone_id: ID der Bewässerungszone :param duration: Bewässerungsdauer in Sekunden (0 für Standarddauer) :param queuing: Neue Aufgabe in die Warteschlange einreihen, wenn momentan eine andere Zone bewässert wird. :return: True wenn die Aufgabe erfolgreich gestartet wurde """ def stop_task(self, source: Source, zone_id: int) -> bool: """ Stoppe eine laufende Bewässerungsaufgabe :param source: Quelle der Bewässeungsaufgabe (Manuell/Automatik) :param zone_id: ID der Bewässerungszone :return: True wenn die Aufgabe erfolgreich gestoppt wurde """ def get_tasks(self) -> List[Task]: """ Gibt sämtliche in der Warteschlange befindlichen Bewässerungsaufgaben zurück. :return: Liste von Bewässerungsaufgaben """ def create_job(self, job: Job) -> int: """ Erstelle einen neuen Bewässerungsjob. :param job: Bewässerungsjob :return: ID des neuen Bewässerungsjobs """ def get_job(self, job_id: int) -> Job: """ Gibt den Bewässerungsjob mit der gegebenen ID zurück. :param job_id: ID des Bewässerungsjobs :return: Bewässerungsjob :raise KeyError: wenn Job nicht gefunden """ def get_jobs(self) -> List[Job]: """ Gibt alle gespeicherten Bewässerungsjobs zurück. :return: Bewässerungsjobs: dict(id -> models.Job) """ def update_job(self, job: Job): """ Aktualisiert einen Bewässerungsjob. :param job: Bewässerungsjob """ def delete_job(self, job_id: int): """ Lösche den Bewässerungsjob mit der gegebenen ID. :param job_id: ID des Bewässerungsjobs :raise KeyError: wenn Job nicht gefunden """ def enable_job(self, job_id: int): """ Aktiviere den Bewässerungsjob mit der gegebenen ID. :param job_id: ID des Bewässerungsjobs """ def disable_job(self, job_id: int): """ Deaktiviere den Bewässerungsjob mit der gegebenen ID. :param job_id: ID des Bewässerungsjobs """ def notify_queue_update(self): """ Wird aufgerufen, wenn die Warteschlange aktualisiert wird. Dient dazu, andere Komponenten der Anwendung zu benachrichtigen (momentan den GRPC-Server). """ def is_running(self) -> bool: """Gibt den Status der Anwendung zurück""" def get_system_timezone(self) -> str: """ Rufe die Systemzeitzone mit dem entsprechenden Befehl ab. :return: Systemzeitzone im Unix-Format (z.B. Europe/Berlin) :raise ErrorTimeConfig: wenn der Befehl einen Fehler zurückgibt :raise ErrorInvalidTimezone: wenn die ermittelte Zeitzone ein ungültiges Format hat """ def set_system_datetime(self, date_time: datetime): """ Ändere die Systemzeit. Die Anwendung muss hierfür als Root laufen. :param date_time: Neue Systemzeit :raise ErrorTimeConfig: wenn Befehl nicht erfolgreich """ def set_system_timezone(self, tz: str): """ Ändere die Systemzeitzone. Die Anwendung muss hierfür als Root laufen. :param tz: Neue Zeitzone im Unix-Format (z.B. Europe/Berlin) :raise ErrorInvalidCmdTemplate: wenn die Befehlsvorlage keinen Platzhalter enthält :raise ErrorInvalidTimezone: wenn die eingegebene Zeitzone ein ungültiges Format hat :raise ErrorTimeConfig: wenn der Konfigurationsbefehl einen Fehler zurückgibt """ def _validate_zone_id(zone_id: int, app: AppInterface): if zone_id < 0 or zone_id > app.get_cfg().n_zones: raise util.InvalidInputException('ZoneID invalid')