180 lines
4.5 KiB
Python
180 lines
4.5 KiB
Python
# coding=utf-8
|
|
import sys
|
|
from datetime import date, datetime
|
|
import json
|
|
import threading
|
|
import time
|
|
import math
|
|
from enum import Enum
|
|
from typing import Any, Union, Optional
|
|
|
|
from tsgrain_controller.grpc_generated import tsgrain_pb2
|
|
|
|
|
|
def _get_np_attrs(o) -> dict:
|
|
"""
|
|
Return all non-protected attributes of the given object.
|
|
|
|
:param o: Object
|
|
:return: Dict of attributes
|
|
"""
|
|
return {k: v for k, v in o.__dict__.items() if not k.startswith('_')}
|
|
|
|
|
|
def serializer(o: Any) -> Union[str, dict, int, float, bool]:
    """
    Convert an object into a value that can be stored as JSON.

    The first matching rule wins:

    1. objects with a ``serialize`` attribute: result of ``o.serialize()``
    2. ``date`` / ``datetime``: ISO 8601 string
    3. ``Enum`` members: their ``value``
    4. ``bool`` / ``int`` / ``float``: returned unchanged
    5. objects with a ``__dict__``: dict of their non-protected attributes
    6. anything else: ``str(o)``

    :param o: Object to serialize
    :return: Serialized output data
    """
    if hasattr(o, 'serialize'):
        converted = o.serialize()
    elif isinstance(o, date):
        # datetime is a subclass of date, so this covers both types
        converted = o.isoformat()
    elif isinstance(o, Enum):
        converted = o.value
    elif isinstance(o, (int, float)):
        # bool is a subclass of int, so it is covered here as well
        converted = o
    elif hasattr(o, '__dict__'):
        converted = _get_np_attrs(o)
    else:
        converted = str(o)
    return converted
|
|
|
|
|
|
def to_json(o, pretty=False) -> str:
    """
    Render an object as a JSON string.

    Values that the ``json`` module cannot encode natively are converted
    by ``serializer``. Non-ASCII characters are emitted unescaped.

    :param o: Object to serialize
    :param pretty: Indent the output by two spaces per level if truthy
    :return: JSON string
    """
    if pretty:
        indent_width = 2
    else:
        indent_width = None
    return json.dumps(o,
                      ensure_ascii=False,
                      indent=indent_width,
                      default=serializer)
|
|
|
|
|
|
def to_json_file(o, path):
    """
    Serialize an object to JSON and write the result to a file.

    The file is opened in write mode as UTF-8 text. The output is
    indented by two spaces per level and non-ASCII characters are
    emitted unescaped. Values that the ``json`` module cannot encode
    natively are converted by ``serializer``.

    :param o: Object to serialize
    :param path: File path
    """
    with open(path, 'w', encoding='utf-8') as out_file:
        json.dump(o,
                  out_file,
                  ensure_ascii=False,
                  indent=2,
                  default=serializer)
|
|
|
|
|
|
def time_ms() -> int:
    """
    Return the current time in milliseconds since the epoch.

    :return: ``time.time()`` scaled to milliseconds, rounded to an integer
    """
    seconds_since_epoch = time.time()
    return round(seconds_since_epoch * 1000)
|
|
|
|
|
|
def datetime_now() -> datetime:
    """
    Return the current local date and time.

    :return: Result of ``datetime.now()`` (no timezone attached)
    """
    current = datetime.now()
    return current
|
|
|
|
|
|
# pylint: disable-next=too-many-arguments
def datetime_new(year,
                 month=None,
                 day=None,
                 hour=0,
                 minute=0,
                 second=0,
                 microsecond=0) -> datetime:
    """
    Build a datetime from its individual components.

    Thin wrapper around the ``datetime`` constructor; all arguments are
    forwarded unchanged. ``month`` and ``day`` default to ``None`` in the
    signature, but the constructor rejects ``None``, so both have to be
    supplied by the caller.

    :return: New datetime without timezone information
    """
    return datetime(year=year,
                    month=month,
                    day=day,
                    hour=hour,
                    minute=minute,
                    second=second,
                    microsecond=microsecond)
|
|
|
|
|
|
def datetime_to_proto(
        date_time: Optional[datetime]) -> Optional[tsgrain_pb2.Timestamp]:
    """
    Convert a Python datetime object into a protobuf timestamp.

    The POSIX timestamp of ``date_time`` is rounded down to whole
    seconds, so sub-second precision is dropped.

    :param date_time: Datetime to convert, or ``None``
    :return: Protobuf timestamp, or ``None`` if ``date_time`` is ``None``
    """
    if date_time is not None:
        whole_seconds = math.floor(date_time.timestamp())
        return tsgrain_pb2.Timestamp(seconds=whole_seconds)
    return None
|
|
|
|
|
|
def datetime_from_proto(
        timestamp: Optional[tsgrain_pb2.Timestamp]) -> Optional[datetime]:
    """
    Convert a protobuf timestamp into a Python datetime object.

    Only the ``seconds`` field of the timestamp is read.

    :param timestamp: Protobuf timestamp to convert, or ``None``
    :return: Datetime built with ``datetime.fromtimestamp``, or ``None``
        if ``timestamp`` is ``None``
    """
    if timestamp is not None:
        epoch_seconds = timestamp.seconds
        return datetime.fromtimestamp(epoch_seconds)
    return None
|
|
|
|
|
|
def datetime_deserialize(date_time: Union[datetime, str]) -> datetime:
    """
    Deserialize a datetime object if it is given in string format
    (YYYY-MM-DDThh:mm:ss).

    :param date_time: Datetime object or ISO formatted string
    :return: Parsed datetime for string input; any other input is
        returned unchanged
    """
    if not isinstance(date_time, str):
        return date_time
    return datetime.fromisoformat(date_time)
|
|
|
|
|
|
class StoppableThread(threading.Thread):
    """
    Thread that calls ``run_cycle()`` in a loop until ``stop()`` is called.

    Subclasses override ``setup()``, ``run_cycle()`` and ``cleanup()`` to
    provide the actual behaviour.
    """

    def __init__(self, interval: float = 1):
        """
        :param interval: Pause in seconds between two ``run_cycle()`` calls
        """
        super().__init__()
        self._interval = interval
        self._stop_signal = threading.Event()

    def setup(self):
        """Hook called once inside the thread before the first cycle."""

    def cleanup(self):
        """
        Hook called once inside the thread after the loop was left because
        of a stop request. It is not called if an exception ended the loop.
        """

    def run_cycle(self):
        """
        Execute one cycle of the thread.

        This function must not block, otherwise the thread
        cannot be stopped.
        """

    def run(self):
        """
        Run the ``run_cycle()`` function in an endless loop, with a
        delay between the cycles determined by the ``interval``
        attribute.
        """
        try:
            self.setup()

            while not self._stop_signal.is_set():
                self.run_cycle()
                # Pause by waiting on the stop event instead of sleeping:
                # the wait ends as soon as stop() sets the event, so a
                # stop request does not have to sit out the whole interval.
                self._stop_signal.wait(self._interval)

            self.cleanup()
        # Application should stop if there is an
        # unrecoverable error in a thread
        except:  # pylint: disable=bare-except
            sys.excepthook(*sys.exc_info())

    def stop(self):
        """
        Stop the thread and wait until it has finished.

        The pause between cycles is interrupted by the stop request, so
        this waits for a ``run_cycle()`` call that is currently executing
        (if any) and for ``cleanup()``.
        """
        self._stop_signal.set()
        try:
            self.join()
        except RuntimeError:
            # join() raises RuntimeError when the thread was never started
            # or when stop() is called from within the thread itself; in
            # both cases there is nothing to wait for.
            pass
|
|
|
|
|
|
class ZoneUnavailableException(Exception):
    """
    Exception type for an unavailable zone.

    NOTE(review): meaning inferred from the class name; the raise sites
    are not part of this module.
    """
|
|
|
|
|
|
class TaskRunException(Exception):
    """
    Exception type for a failure while running a task.

    NOTE(review): meaning inferred from the class name; the raise sites
    are not part of this module.
    """
|
|
|
|
|
|
class InvalidInputException(Exception):
    """
    Exception type for invalid input.

    NOTE(review): meaning inferred from the class name; the raise sites
    are not part of this module.
    """
|