# coding=utf-8
import sys
from datetime import date, datetime
import json
import threading
import time
import math
from enum import Enum
from typing import Any, Union, Optional

from tsgrain_controller.grpc_generated import tsgrain_pb2


def _get_np_attrs(o) -> dict:
    """
    Return all non-protected attributes of the given object.

    Only the instance ``__dict__`` is inspected; attributes whose name starts
    with an underscore are left out.

    :param o: Object
    :return: Dict of attributes
    """
    return {k: v for k, v in o.__dict__.items() if not k.startswith('_')}


def serializer(o: Any) -> Union[str, dict, int, float, bool]:
    """
    Serialize object to json-storable format.

    The checks are tried in order and the first match wins: a ``serialize()``
    method on the object, date/datetime (ISO format), Enum (its value),
    plain bool/float/int (unchanged), any object with a ``__dict__`` (its
    non-protected attributes) and finally the ``str()`` representation.

    :param o: Object to serialize
    :return: Serialized output data
    """
    if hasattr(o, 'serialize'):
        return o.serialize()
    if isinstance(o, (datetime, date)):
        return o.isoformat()
    if isinstance(o, Enum):
        return o.value
    if isinstance(o, (bool, float, int)):
        return o
    if hasattr(o, '__dict__'):
        return _get_np_attrs(o)
    return str(o)


def to_json(o, pretty=False) -> str:
    """
    Convert object to json.

    Uses the ``serialize()`` method of the target object if available.

    :param o: Object to serialize
    :param pretty: Prettify with indents
    :return: JSON string
    """
    return json.dumps(o,
                      default=serializer,
                      indent=2 if pretty else None,
                      ensure_ascii=False)


def to_json_file(o, path):
    """
    Convert object to json and write the result to a file.

    Uses the ``serialize()`` method of the target object if available.
    The file is written UTF-8 encoded with an indent of 2.

    :param o: Object to serialize
    :param path: File path
    """
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(o, f, default=serializer, indent=2, ensure_ascii=False)


def time_ms() -> int:
    """
    Return the current time as a rounded number of milliseconds.

    :return: ``time.time()`` multiplied by 1000 and rounded
    """
    return round(time.time() * 1000)


def datetime_now() -> datetime:
    """
    Return the current datetime as given by ``datetime.now()``.

    :return: Current datetime
    """
    return datetime.now()


# pylint: disable-next=too-many-arguments
def datetime_new(year,
                 month=None,
                 day=None,
                 hour=0,
                 minute=0,
                 second=0,
                 microsecond=0) -> datetime:
    """
    Create a datetime object from its components.

    The arguments are passed unchanged to the ``datetime`` constructor, so
    ``month`` and ``day`` have to be supplied by the caller even though they
    default to ``None`` in this signature.

    :return: New datetime
    """
    return datetime(year, month, day, hour, minute, second, microsecond)


def datetime_to_proto(
        date_time: Optional[datetime]) -> Optional[tsgrain_pb2.Timestamp]:
    """
    Convert a Python datetime object into a protobuf timestamp.

    The timestamp is rounded down to whole seconds.

    :param date_time: Datetime to convert (may be None)
    :return: Protobuf timestamp, None if the input was None
    """
    if date_time is None:
        return None
    return tsgrain_pb2.Timestamp(seconds=math.floor(date_time.timestamp()))


def datetime_from_proto(
        timestamp: Optional[tsgrain_pb2.Timestamp]) -> Optional[datetime]:
    """
    Convert a protobuf timestamp into a Python datetime object.

    The conversion uses ``datetime.fromtimestamp()`` without a timezone
    argument.

    :param timestamp: Protobuf timestamp (may be None)
    :return: Datetime, None if the input was None
    """
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp.seconds)


def datetime_deserialize(date_time: Union[datetime, str]) -> datetime:
    """
    Deserialize a datetime object if it is given in string format
    (YYYY-MM-DDThh:mm:ss).

    :param date_time: Datetime object or ISO formatted string
    :return: Parsed datetime for string input, otherwise the input itself
    """
    if isinstance(date_time, str):
        return datetime.fromisoformat(date_time)
    return date_time


class StoppableThread(threading.Thread):
    """
    Thread that calls ``run_cycle()`` repeatedly until ``stop()`` is called.

    Subclasses override ``setup()``, ``run_cycle()`` and ``cleanup()``;
    the base implementations do nothing.
    """

    def __init__(self, interval: float = 1):
        """
        :param interval: Pause between two cycles in seconds
        """
        super().__init__()
        self._interval = interval
        self._stop_signal = threading.Event()

    def setup(self):
        """Hook that is called once before the first cycle."""

    def cleanup(self):
        """Hook that is called once after the loop has ended regularly."""

    def run_cycle(self):
        """Hook that is called once per cycle."""

    def run(self):
        """
        Thread main function: setup, cycle loop, cleanup.

        Any exception raised by one of the hooks ends the loop and is handed
        to ``sys.excepthook``; ``cleanup()`` is not called in that case.
        """
        try:
            self.setup()

            while not self._stop_signal.is_set():
                self.run_cycle()
                # Pause by waiting on the stop event instead of sleeping:
                # the pause has the same length, but a stop() request ends it
                # immediately instead of blocking the caller for the rest of
                # the interval.
                self._stop_signal.wait(self._interval)

            self.cleanup()
        # Application should stop if there is an
        # unrecoverable error in a thread
        except:  # pylint: disable=bare-except
            sys.excepthook(*sys.exc_info())

    def stop(self):
        """
        Signal the thread to stop and wait until it has ended.

        The RuntimeError raised by ``join()`` (e.g. if the thread has not been
        started or tries to join itself) is ignored.
        """
        self._stop_signal.set()
        try:
            self.join()
        except RuntimeError:
            pass


class ZoneUnavailableException(Exception):
    pass


class TaskRunException(Exception):
    pass


class InvalidInputException(Exception):
    pass