151 lines
4.5 KiB
Python
151 lines
4.5 KiB
Python
from dataclasses import dataclass
|
|
from typing import Dict
|
|
from tsgrain_controller import io, util, task_queue, models
|
|
|
|
|
|
@dataclass
class OutputState:
    """
    Requested state of a single output: off, steadily on, or blinking.

    ``str()`` renders the state as ``'OFF'``, ``'ON'`` or ``'((<freq>))'``.
    """

    # Whether the output is switched on at all
    on: bool
    # Blinks/second; 0 means no blinking
    blink_freq: float = 0

    def is_on(self, ms_ticks: int) -> bool:
        """
        Tell whether the output should be active at the given time.

        :param ms_ticks: timestamp in milliseconds
        :return: ``self.on`` when the output is off or not blinking;
            otherwise True during the first half of each blink period
            (a period lasts ``1000 / blink_freq`` milliseconds)
        """
        blinking = self.on and self.blink_freq > 0
        if not blinking:
            return self.on

        cycle_ms = 1000 / self.blink_freq
        elapsed_in_cycle = ms_ticks % cycle_ms
        return elapsed_in_cycle < (cycle_ms / 2)

    def __str__(self):
        """Return a short textual form of the state."""
        if self.on:
            return 'ON' if self.blink_freq == 0 else f'(({self.blink_freq}))'
        return 'OFF'
|
|
|
|
|
|
class OutputDevice:
    """A single digital output device."""

    def __init__(self, name: str, o_io: io.Io):
        """
        Set up the device in the off state and write that state out once.

        :param name: output name passed to ``o_io.write_output``
        :param o_io: I/O backend that performs the actual write
        """
        self.name = name
        self._io = o_io
        # Requested state; callers modify or replace this object.
        self.set_state = OutputState(False)
        # Last level that was actually written to the I/O backend.
        self._state = False

        self._io.write_output(self.name, False)

    def _write_state(self, state: bool):
        """Write ``state`` to the output, but only if it differs from the last written level."""
        if state == self._state:
            return

        self._state = state
        self._io.write_output(self.name, state)

    def update_state(self, ms_ticks: int):
        """
        Evaluate the requested state at ``ms_ticks`` and apply it.

        :param ms_ticks: timestamp in milliseconds, forwarded to
            ``OutputState.is_on``
        """
        level = self.set_state.is_on(ms_ticks)
        self._write_state(level)
|
|
|
|
|
|
class Outputs(util.StoppableThread):
    """
    Outputs is responsible for the output devices of the irrigation
    controller (valve outputs and LEDs).

    Each cycle rebuilds the requested state of every device from the
    current task and the auto-mode flag, then applies it.
    """

    def __init__(self, o_io: io.Io, task_holder: task_queue.TaskHolder,
                 app: models.AppInterface):
        """
        Create one valve output and one LED per zone plus the two mode LEDs.

        :param o_io: I/O backend handed to every OutputDevice
        :param task_holder: queried for the current task in ``run_cycle``
        :param app: supplies the configuration (``n_zones``) and auto mode
        """
        # NOTE(review): 0.01 is presumably the cycle interval in seconds --
        # confirm against util.StoppableThread.
        super().__init__(0.01)

        self.task_holder = task_holder
        self.app = app
        self.n_zones = app.get_cfg().n_zones

        # Zone ids are 1-based.
        zone_ids = range(1, self.n_zones + 1)

        # Construction order matters: every OutputDevice writes its initial
        # level when created (valves first, then zone LEDs, then mode LEDs).
        self.valve_outputs: Dict[int, OutputDevice] = {
            zone: OutputDevice(f'VALVE_{zone}', o_io) for zone in zone_ids
        }
        self.zone_leds: Dict[int, OutputDevice] = {
            zone: OutputDevice(f'LED_Z_{zone}', o_io) for zone in zone_ids
        }

        self.mode_led_auto = OutputDevice('LED_M_AUTO', o_io)
        self.mode_led_man = OutputDevice('LED_M_MAN', o_io)

        # Flat list of all devices, used for bulk reset and update.
        self._output_devices = (
            list(self.valve_outputs.values())
            + list(self.zone_leds.values())
            + [self.mode_led_auto, self.mode_led_man]
        )

    def _get_valve_op(self, valve_id: int) -> OutputDevice:
        """
        Look up the valve output of a zone.

        :raises Exception: if no valve with this id exists
        """
        device = self.valve_outputs.get(valve_id)
        if device is None:
            raise Exception('Valve does not exist')
        return device

    def _get_zoneled_op(self, valve_id: int) -> OutputDevice:
        """
        Look up the LED of a zone.

        :raises Exception: if no zone LED with this id exists
        """
        device = self.zone_leds.get(valve_id)
        if device is None:
            raise Exception('Zone LED does not exist')
        return device

    def _set_zone(self, zone_id: int, state: bool):
        """Switch both the valve and the LED of a zone on or off."""
        valve = self._get_valve_op(zone_id)
        valve.set_state.on = state
        led = self._get_zoneled_op(zone_id)
        led.set_state.on = state

    def _set_zone_time(self, zone_id: int, remaining_seconds: int):
        """
        Request the zone state that matches the remaining run time.

        Valve and LED are on while time remains. With less than 60 seconds
        left the LED blink frequency is ``round((60 - remaining) / 15)``,
        so it rises as the remaining time shrinks.
        """
        active = remaining_seconds > 0
        led_blink = (
            round((60 - remaining_seconds) / 15)
            if remaining_seconds < 60 else 0
        )

        self._get_valve_op(zone_id).set_state.on = active

        led_state = self._get_zoneled_op(zone_id).set_state
        led_state.on = active
        led_state.blink_freq = led_blink

    def _set_mode_led_auto(self, enabled: bool, current: bool):
        """
        Request the auto-mode LED state.

        :param enabled: LED is on when True
        :param current: LED blinks at 2 blinks/second when True,
            otherwise it is steady
        """
        led_state = self.mode_led_auto.set_state
        led_state.on = enabled
        led_state.blink_freq = 2 if current else 0

    def _set_mode_led_manual(self, current: bool):
        """Request the manual-mode LED: blinking at 2 blinks/second when ``current``, else off."""
        led_state = self.mode_led_man.set_state
        led_state.on = current
        led_state.blink_freq = 2

    def _reset_states(self):
        """Give every device a fresh 'off' requested state (nothing is written yet)."""
        for device in self._output_devices:
            device.set_state = OutputState(False)

    def _update_states(self):
        """Apply the requested states of all devices using one common timestamp."""
        now_ms = util.time_ms()

        for device in self._output_devices:
            device.update_state(now_ms)

    def reset(self):
        """Request 'off' for all devices and apply it immediately."""
        self._reset_states()
        self._update_states()

    def run_cycle(self):
        """
        Recompute and apply all output states.

        With a current task, its zone is driven according to the remaining
        time and the mode LEDs reflect the task source. Without one, only
        the auto-mode LED reflects the auto-mode flag.
        """
        self._reset_states()

        current_task = self.task_holder.get_current_task()
        if current_task is None:
            self._set_mode_led_auto(self.app.get_auto_mode(), False)
        else:
            self._set_zone_time(current_task.zone_id, current_task.remaining)
            self._set_mode_led_manual(
                current_task.source == models.Source.MANUAL)
            self._set_mode_led_auto(
                self.app.get_auto_mode(),
                current_task.source == models.Source.SCHEDULE)

        self._update_states()

    def setup(self):
        """Switch all outputs off."""
        self.reset()

    def cleanup(self):
        """Switch all outputs off."""
        self.reset()
|