from dataclasses import dataclass from typing import Dict from tsgrain_controller import io, util, task_queue, models @dataclass class OutputState: on: bool # Blinks/second blink_freq: float = 0 def is_on(self, ms_ticks: int) -> bool: if self.on and self.blink_freq > 0: period = 1000 / self.blink_freq period_progress = ms_ticks % period return period_progress < (period / 2) return self.on def __str__(self): if not self.on: return 'OFF' if self.blink_freq == 0: return 'ON' return f'(({self.blink_freq}))' class OutputDevice: """Ein einzelnes digitales Ausgabegerät""" def __init__(self, name: str, o_io: io.Io): self.name = name self._io = o_io self.set_state = OutputState(False) self._state = False self._io.write_output(self.name, False) def _write_state(self, state: bool): if state != self._state: self._state = state self._io.write_output(self.name, self._state) def update_state(self, ms_ticks: int): self._write_state(self.set_state.is_on(ms_ticks)) class Outputs(util.StoppableThread): """ Outputs ist für die Ausgabegeräte der Bewässerungssteuerung zuständig (Ventilausgänge und LEDs) """ def __init__(self, o_io: io.Io, task_holder: task_queue.TaskHolder, app: models.AppInterface): super().__init__(0.01) self.task_holder = task_holder self.app = app self.n_zones = self.app.get_cfg().n_zones self.valve_outputs: Dict[int, OutputDevice] = { i: OutputDevice(f'VALVE_{i}', o_io) for i in range(1, self.n_zones + 1) } self.zone_leds: Dict[int, OutputDevice] = { i: OutputDevice(f'LED_Z_{i}', o_io) for i in range(1, self.n_zones + 1) } self.mode_led_auto = OutputDevice('LED_M_AUTO', o_io) self.mode_led_man = OutputDevice('LED_M_MAN', o_io) self._output_devices = [ *self.valve_outputs.values(), *self.zone_leds.values(), self.mode_led_auto, self.mode_led_man, ] def _get_valve_op(self, valve_id: int) -> OutputDevice: if valve_id not in self.valve_outputs: raise Exception('Valve does not exist') return self.valve_outputs[valve_id] def _get_zoneled_op(self, valve_id: int) -> OutputDevice: if valve_id not in self.zone_leds: raise Exception('Zone LED does not exist') return self.zone_leds[valve_id] def _set_zone(self, zone_id: int, state: bool): self._get_valve_op(zone_id).set_state.on = state self._get_zoneled_op(zone_id).set_state.on = state def _set_zone_time(self, zone_id: int, remaining_seconds: int): is_on = remaining_seconds > 0 blink_freq = 0 if remaining_seconds < 60: blink_freq = round((60 - remaining_seconds) / 15) self._get_valve_op(zone_id).set_state.on = is_on zone_led = self._get_zoneled_op(zone_id) zone_led.set_state.on = is_on zone_led.set_state.blink_freq = blink_freq def _set_mode_led_auto(self, enabled: bool, current: bool): blink_freq = 0 if current: blink_freq = 2 self.mode_led_auto.set_state.on = enabled self.mode_led_auto.set_state.blink_freq = blink_freq def _set_mode_led_manual(self, current: bool): self.mode_led_man.set_state.on = current self.mode_led_man.set_state.blink_freq = 2 def _reset_states(self): for output in self._output_devices: output.set_state = OutputState(False) def _update_states(self): ms_ticks = util.time_ms() for output in self._output_devices: output.update_state(ms_ticks) def reset(self): self._reset_states() self._update_states() def run_cycle(self): self._reset_states() task = self.task_holder.get_current_task() if task is not None: self._set_zone_time(task.zone_id, task.remaining) self._set_mode_led_manual(task.source == models.Source.MANUAL) self._set_mode_led_auto(self.app.get_auto_mode(), task.source == models.Source.SCHEDULE) else: self._set_mode_led_auto(self.app.get_auto_mode(), False) self._update_states() def setup(self): self.reset() def cleanup(self): self.reset()