Controller/tsgrain_controller/util.py

165 lines
4 KiB
Python

# coding=utf-8
import sys
from datetime import date, datetime
import json
import threading
import time
import math
from enum import Enum
from typing import Any, Union, Optional
from tsgrain_controller.grpc_generated import tsgrain_pb2
def _get_np_attrs(o) -> dict:
    """
    Collect the public instance attributes of an object.

    Only the entries of the object's own ``__dict__`` are considered; every
    attribute whose name starts with an underscore is left out.

    :param o: Object to inspect (must provide a ``__dict__``)
    :return: Dict mapping attribute names to attribute values
    """
    public_attrs = {}
    for name, value in o.__dict__.items():
        if name.startswith('_'):
            continue
        public_attrs[name] = value
    return public_attrs
def serializer(o: Any) -> Union[str, dict, int, float, bool]:
    """
    Turn an arbitrary object into a json-storable value.

    The first matching rule wins:

    1. objects providing ``serialize()`` -> result of that call
    2. ``datetime`` / ``date`` -> ISO format string
    3. ``Enum`` members -> their ``value``
    4. ``bool`` / ``float`` / ``int`` -> returned unchanged
    5. objects with a ``__dict__`` -> dict of their non-protected attributes
    6. anything else -> ``str(o)``

    :param o: Object to serialize
    :return: Serialized output data
    """
    if hasattr(o, 'serialize'):
        result = o.serialize()
    elif isinstance(o, (date, datetime)):
        result = o.isoformat()
    elif isinstance(o, Enum):
        result = o.value
    elif isinstance(o, (int, float, bool)):
        result = o
    elif hasattr(o, '__dict__'):
        result = _get_np_attrs(o)
    else:
        result = str(o)
    return result
def to_json(o, pretty=False) -> str:
    """
    Render an object as a JSON string.

    Values the ``json`` module cannot encode natively are handed to
    ``serializer``, which uses the ``serialize()`` method of the target
    object if available. Non-ASCII characters are written as-is instead of
    being escaped.

    :param o: Object to serialize
    :param pretty: Prettify with indents (two spaces per level)
    :return: JSON string
    """
    dump_options = {
        'default': serializer,
        'indent': None,
        'ensure_ascii': False,
    }
    if pretty:
        dump_options['indent'] = 2
    return json.dumps(o, **dump_options)
def to_json_file(o, path):
    """
    Convert an object to JSON and write the result to a file.

    Uses the ``serialize()`` method of the target object if available.
    The output is indented with two spaces and encoded as UTF-8; non-ASCII
    characters are written unescaped.

    :param o: Object to serialize
    :param path: File path
    """
    # Serialize completely before touching the file: opening in 'w' mode
    # truncates it, so an error raised during serialization would otherwise
    # leave an empty or partially written file behind.
    payload = json.dumps(o, default=serializer, indent=2, ensure_ascii=False)

    with open(path, 'w', encoding='utf-8') as f:
        f.write(payload)
def time_ms() -> int:
    """
    Return the current time in milliseconds.

    :return: ``time.time()`` scaled to milliseconds and rounded to an integer
    """
    seconds_now = time.time()
    return round(seconds_now * 1000)
def datetime_now() -> datetime:
    """
    Return the current local date and time.

    :return: Naive ``datetime`` as produced by ``datetime.now()``
    """
    current = datetime.now()
    return current
# pylint: disable-next=too-many-arguments
def datetime_new(year,
                 month=None,
                 day=None,
                 hour=0,
                 minute=0,
                 second=0,
                 microsecond=0) -> datetime:
    """
    Build a naive ``datetime`` from its individual components.

    ``month`` and ``day`` default to ``None`` in the signature, but the
    ``datetime`` constructor does not accept ``None``. Omitted values
    therefore fall back to 1 (first month / first day of the month) instead
    of failing with a ``TypeError``.

    :param year: Year
    :param month: Month (1-12), defaults to 1 when omitted
    :param day: Day of the month, defaults to 1 when omitted
    :param hour: Hour (0-23)
    :param minute: Minute (0-59)
    :param second: Second (0-59)
    :param microsecond: Microsecond (0-999999)
    :return: The assembled ``datetime`` object
    :raises ValueError: If a component is outside the range ``datetime`` accepts
    """
    if month is None:
        month = 1
    if day is None:
        day = 1
    return datetime(year, month, day, hour, minute, second, microsecond)
def datetime_to_proto(
        date_time: Optional[datetime]) -> Optional[tsgrain_pb2.Timestamp]:
    """
    Convert a Python datetime object into a protobuf timestamp.

    The timestamp is built from whole seconds only; a fractional part is
    rounded down. ``None`` is passed through unchanged.

    :param date_time: Datetime to convert, or ``None``
    :return: Protobuf timestamp, or ``None`` if no datetime was given
    """
    if date_time is not None:
        whole_seconds = math.floor(date_time.timestamp())
        return tsgrain_pb2.Timestamp(seconds=whole_seconds)
    return None
def datetime_from_proto(
        timestamp: Optional[tsgrain_pb2.Timestamp]) -> Optional[datetime]:
    """
    Convert a protobuf timestamp into a Python datetime object.

    Only the ``seconds`` field of the timestamp is read; it is converted with
    ``datetime.fromtimestamp()``, which yields a naive datetime in local time.
    ``None`` is passed through unchanged.

    :param timestamp: Protobuf timestamp to convert, or ``None``
    :return: Datetime object, or ``None`` if no timestamp was given
    """
    if timestamp is not None:
        epoch_seconds = timestamp.seconds
        return datetime.fromtimestamp(epoch_seconds)
    return None
def datetime_deserialize(date_time: Union[datetime, str]) -> datetime:
    """
    Deserialize a datetime object if it is given in string format
    (YYYY-MM-DDThh:mm:ss).

    Values that are not strings are returned unchanged.

    :param date_time: Datetime object or its ISO format string
    :return: Datetime object
    """
    if not isinstance(date_time, str):
        return date_time
    parsed = datetime.fromisoformat(date_time)
    return parsed
class StoppableThread(threading.Thread):
    """
    Thread that calls ``run_cycle()`` repeatedly until ``stop()`` is requested.

    Subclasses override the ``setup()``, ``run_cycle()`` and ``cleanup()``
    hooks; the base implementations do nothing.
    """

    def __init__(self, interval: float = 1):
        """
        :param interval: Pause between two cycles in seconds
        """
        super().__init__()
        self._interval = interval
        self._stop_signal = threading.Event()

    def setup(self):
        """Hook run once inside the thread before the first cycle."""

    def cleanup(self):
        """Hook run once inside the thread after the loop ended normally."""

    def run_cycle(self):
        """Hook run once per loop iteration."""

    def run(self):
        """
        Thread body: ``setup()``, then the cycle loop, then ``cleanup()``.

        Any exception escaping the hooks is forwarded to ``sys.excepthook``;
        in that case ``cleanup()`` is not called.
        """
        try:
            self.setup()

            while not self._stop_signal.is_set():
                self.run_cycle()
                # Wait on the stop event instead of sleeping so that stop()
                # wakes the thread immediately rather than blocking for the
                # remainder of the interval.
                self._stop_signal.wait(self._interval)

            self.cleanup()
        # Application should stop if there is an
        # unrecoverable error in a thread
        except:  # pylint: disable=bare-except
            sys.excepthook(*sys.exc_info())

    def stop(self):
        """Request the loop to end and wait until the thread has finished."""
        self._stop_signal.set()
        try:
            self.join()
        except RuntimeError:
            # join() raises RuntimeError when the thread was never started or
            # when stop() is called from the thread itself; in both cases
            # there is nothing to wait for.
            pass
class ZoneUnavailableException(Exception):
    """Error signalling that a zone is unavailable."""
    # NOTE(review): meaning inferred from the class name only - confirm
    # against the places where this exception is raised.
class TaskRunException(Exception):
    """Error signalling a problem while running a task."""
    # NOTE(review): meaning inferred from the class name only - confirm
    # against the places where this exception is raised.
class InvalidInputException(Exception):
    """Error signalling invalid input."""
    # NOTE(review): meaning inferred from the class name only - confirm
    # against the places where this exception is raised.