Controller/tsgrain_controller/output.py
2022-01-30 22:09:56 +01:00

151 lines
4.5 KiB
Python

from dataclasses import dataclass
from typing import Dict
from tsgrain_controller import io, util, task_queue, models
@dataclass
class OutputState:
on: bool
# Blinks/second
blink_freq: float = 0
def is_on(self, ms_ticks: int) -> bool:
if self.on and self.blink_freq > 0:
period = 1000 / self.blink_freq
period_progress = ms_ticks % period
return period_progress < (period / 2)
return self.on
def __str__(self):
if not self.on:
return 'OFF'
if self.blink_freq == 0:
return 'ON'
return f'(({self.blink_freq}))'
class OutputDevice:
"""Ein einzelnes digitales Ausgabegerät"""
def __init__(self, name: str, o_io: io.Io):
self.name = name
self._io = o_io
self.set_state = OutputState(False)
self._state = False
self._io.write_output(self.name, False)
def _write_state(self, state: bool):
if state != self._state:
self._state = state
self._io.write_output(self.name, self._state)
def update_state(self, ms_ticks: int):
self._write_state(self.set_state.is_on(ms_ticks))
class Outputs(util.StoppableThread):
"""
Outputs ist für die Ausgabegeräte der Bewässerungssteuerung zuständig
(Ventilausgänge und LEDs)
"""
def __init__(self, o_io: io.Io, task_holder: task_queue.TaskHolder,
app: models.AppInterface):
super().__init__(0.01)
self.task_holder = task_holder
self.app = app
self.n_zones = self.app.get_cfg().n_zones
self.valve_outputs: Dict[int, OutputDevice] = {
i: OutputDevice(f'VALVE_{i}', o_io)
for i in range(1, self.n_zones + 1)
}
self.zone_leds: Dict[int, OutputDevice] = {
i: OutputDevice(f'LED_Z_{i}', o_io)
for i in range(1, self.n_zones + 1)
}
self.mode_led_auto = OutputDevice('LED_M_AUTO', o_io)
self.mode_led_man = OutputDevice('LED_M_MAN', o_io)
self._output_devices = [
*self.valve_outputs.values(),
*self.zone_leds.values(),
self.mode_led_auto,
self.mode_led_man,
]
def _get_valve_op(self, valve_id: int) -> OutputDevice:
if valve_id not in self.valve_outputs:
raise Exception('Valve does not exist')
return self.valve_outputs[valve_id]
def _get_zoneled_op(self, valve_id: int) -> OutputDevice:
if valve_id not in self.zone_leds:
raise Exception('Zone LED does not exist')
return self.zone_leds[valve_id]
def _set_zone(self, zone_id: int, state: bool):
self._get_valve_op(zone_id).set_state.on = state
self._get_zoneled_op(zone_id).set_state.on = state
def _set_zone_time(self, zone_id: int, remaining_seconds: int):
is_on = remaining_seconds > 0
blink_freq = 0
if remaining_seconds < 60:
blink_freq = round((60 - remaining_seconds) / 15)
self._get_valve_op(zone_id).set_state.on = is_on
zone_led = self._get_zoneled_op(zone_id)
zone_led.set_state.on = is_on
zone_led.set_state.blink_freq = blink_freq
def _set_mode_led_auto(self, enabled: bool, current: bool):
blink_freq = 0
if current:
blink_freq = 2
self.mode_led_auto.set_state.on = enabled
self.mode_led_auto.set_state.blink_freq = blink_freq
def _set_mode_led_manual(self, current: bool):
self.mode_led_man.set_state.on = current
self.mode_led_man.set_state.blink_freq = 2
def _reset_states(self):
for output in self._output_devices:
output.set_state = OutputState(False)
def _update_states(self):
ms_ticks = util.time_ms()
for output in self._output_devices:
output.update_state(ms_ticks)
def reset(self):
self._reset_states()
self._update_states()
def run_cycle(self):
self._reset_states()
task = self.task_holder.get_current_task()
if task is not None:
self._set_zone_time(task.zone_id, task.remaining)
self._set_mode_led_manual(task.source == models.Source.MANUAL)
self._set_mode_led_auto(self.app.get_auto_mode(),
task.source == models.Source.SCHEDULE)
else:
self._set_mode_led_auto(self.app.get_auto_mode(), False)
self._update_states()
def setup(self):
self.reset()
def cleanup(self):
self.reset()