Compare commits

..

5 commits

14 changed files with 246 additions and 72 deletions

View file

@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.1.3
current_version = 0.1.4
commit = True
tag = True

View file

@ -2,4 +2,3 @@ grpcio-tools~=1.43.0
mypy-protobuf~=3.2.0
types-protobuf~=3.19.6
grpc-stubs~=1.24.7
bump2version

View file

@ -6,7 +6,7 @@ with open('README.rst') as f:
setuptools.setup(
name='TSGRain Controller',
version='0.1.3',
version='0.1.4',
author='ThetaDev',
description='TSGRain irrigation controller',
long_description=README,

View file

@ -120,6 +120,20 @@ def test_start_task_queue(app):
assert tasks[0].zone_id == 2
assert tasks[1].zone_id == 3
# Queue processing time
time.sleep(0.2)
# Try to enqueue the same task again -> should cancel
res = app.request_task(
models.TaskRequest(source=models.Source.MANUAL,
zone_id=3,
duration=30,
queuing=True,
cancelling=True))
assert not res.started
assert res.stopped
def test_crud_job(app):
# Insert jobs

22
tests/test_jobschedule.py Normal file
View file

@ -0,0 +1,22 @@
# coding=utf-8
from datetime import datetime
import pytest
from tsgrain_controller import models
@pytest.mark.parametrize('date, repeat, expect', [
(datetime(2022, 1, 18, 10, 30, 0), False, True),
(datetime(2022, 1, 18, 10, 30, 13), False, True),
(datetime(2022, 1, 18, 10, 29, 59), False, False),
(datetime(2022, 1, 19, 10, 30, 0), False, False),
(datetime(2022, 1, 19, 10, 30, 0), True, True),
])
def test_job_check(date, repeat, expect):
date_now = datetime(2022, 1, 18, 10, 30, 0)
job = models.Job(date, 30, [1], True, repeat)
assert job.check(date_now) is expect
job.enable = False
assert job.check(date_now) is False

View file

@ -80,3 +80,30 @@ def test_queue_runner():
assert util.to_json(q) == \
'[{"source": "MANUAL", "zone_id": 2, "duration": 5, "remaining": 4}, \
{"source": "SCHEDULE", "zone_id": 2, "duration": 10, "remaining": 10}]'
def test_interrupt_auto():
"""
Wenn der Automatikmodus unterbrochen und ein Scheduled-Task unterbrochen in der
Warteschlange liegt, sollte ein manueller Task gestartet werden können.
"""
app = fixtures.TestingApp()
q = task_queue.TaskQueue(app)
app.auto = True
taskAuto = models.Task(models.Source.SCHEDULE, 1, 10)
taskMan = models.Task(models.Source.MANUAL, 2, 5)
q.start()
assert q.enqueue(taskAuto)
time.sleep(0.2)
assert not q.enqueue(taskMan, False)
app.auto = False
time.sleep(0.2)
assert q.enqueue(taskMan, False)
q.stop()

View file

@ -20,7 +20,7 @@ def test_run_cmd(cmd_str: str, raise_err: bool):
systimecfg._run_cmd(cmd_str)
def get_system_timezone(mocker):
def test_get_system_timezone(mocker):
mock_res = mock.Mock()
mock_res.stdout = 'Europe/Berlin'
@ -38,6 +38,16 @@ def get_system_timezone(mocker):
stderr=subprocess.PIPE)
def test_get_system_timezone_err(mocker):
mock_res = mock.Mock()
mock_res.stdout = 'Europe Berlin'
mocker.patch('subprocess.run', return_value=mock_res)
with pytest.raises(systimecfg.ErrorInvalidTimezone):
systimecfg.get_system_timezone()
def test_set_system_datetime(mocker):
cmd_run_mock: mock.MagicMock = mocker.patch('subprocess.run')

View file

@ -1 +1 @@
__version__ = '0.1.3'
__version__ = '0.1.4'

View file

@ -39,7 +39,7 @@ class Application(models.AppInterface):
self.db.load_queue(self.queue)
self.io = io_factory.new_io(self, io_type)
self.io.set_callbacks(self._cb_manual, self._cb_modekey)
self.io.set_callback(self._input_cb)
self.outputs = output.Outputs(self.io, self.queue, self)
@ -71,7 +71,15 @@ class Application(models.AppInterface):
def get_logger(self) -> logging.Logger:
return self.logger
def _cb_manual(self, zone_id: int):
def _input_cb(self, key: str):
if key == 'BT_MODE':
self.set_auto_mode(not self.get_auto_mode())
elif key.startswith('BT_Z_'):
zoneid_str = key[5:]
try:
zone_id = int(zoneid_str)
except ValueError:
return
self.request_task(
models.TaskRequest(source=models.Source.MANUAL,
zone_id=zone_id,
@ -79,9 +87,6 @@ class Application(models.AppInterface):
queuing=False,
cancelling=True))
def _cb_modekey(self):
self.set_auto_mode(not self.get_auto_mode())
def request_task(self,
request: models.TaskRequest) -> models.TaskRequestResult:
if request.queuing:
@ -106,7 +111,7 @@ class Application(models.AppInterface):
task = models.Task(request.source, request.zone_id, duration)
task.validate(self)
started = self.queue.enqueue(task, not request.queuing)
started = self.queue.enqueue(task, request.queuing)
return models.TaskRequestResult(started, False)
def start_task(self, source: models.Source, zone_id: int, duration: int,
@ -171,6 +176,7 @@ class Application(models.AppInterface):
systimecfg.set_system_timezone(tz, self.cfg.cmd_set_timezone)
def start(self):
logging.info('Starting application')
self._running = True
self.io.start()
self.outputs.start()
@ -179,6 +185,7 @@ class Application(models.AppInterface):
self.grpc_server.start()
def stop(self):
logging.info('Stopping application')
self._running = False
self.grpc_server.stop(None)
self.scheduler.stop()

View file

@ -5,27 +5,36 @@ from typing import Callable, Optional
class Io:
def __init__(self, *args): # pylint: disable=unused-argument
self.cb_manual: Optional[Callable[[int], None]] = None
self.cb_mode: Optional[Callable[[], None]] = None
self.cb_input: Optional[Callable[[str], None]] = None
def set_callbacks(self, cb_manual: Optional[Callable[[int], None]],
cb_mode: Optional[Callable[[], None]]):
self.cb_manual = cb_manual
self.cb_mode = cb_mode
def set_callback(self, cb: Optional[Callable[[str], None]]):
"""
Setze die Callback-Funktion, die bei einer Eingabe aufgerufen wird.
Als Parameter wird der Name des Eingabegeräts mit übergeben.
def _trigger_cb_manual(self, zone_id: int):
if self.cb_manual is not None:
self.cb_manual(zone_id)
:param cb: Input-Callback-Funktion
"""
self.cb_input = cb
def _trigger_cb_mode(self):
if self.cb_mode is not None:
self.cb_mode()
def _trigger_cb(self, key: str):
"""
Löse die Input-Callback-Funktion aus
:param key: Gerätename
"""
if self.cb_input is not None:
self.cb_input(key)
def start(self):
pass
"""Initialisiere die IO"""
def stop(self):
pass
"""Beende die IO und deaktiviere alle Ausgabegeräte"""
def write_output(self, key: str, val: bool):
pass
"""
Setze den Zustand eines Ausgabegeräts
:param key: Name des Ausgabegeräts (z.B. ``VALVE_1``)
:param val: Status des Ausgabegeräts
"""

View file

@ -77,10 +77,10 @@ class Io(util.StoppableThread, io.Io):
# Mode key (0)
if c == 48:
self._trigger_cb_mode()
self._trigger_cb('BT_MODE')
# Zone keys (1-7)
elif 49 <= c <= 55:
self._trigger_cb_manual(c - 48)
self._trigger_cb(f'BT_Z_{c - 48}')
self._screen.erase()
self._screen.addstr(0, 0,

View file

@ -3,7 +3,7 @@ import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, Optional, Tuple
from typing import Dict, Optional, Tuple
from RPi import GPIO # pylint: disable=import-error
import smbus
@ -38,7 +38,8 @@ Polarität pro GPIOA-Pin.
- ``0`` High wenn aktiv
Die Einstellung hatte im Test nur bei als Eingängen konfigurierten Pins
einen Effekt. Bei Ausgängen wird stattdessen der gesetzte Wert invertiert.
einen Effekt. Invertierte Ausgänge setzt die Library deswegen auf den
entgegengesetzten Wert.
"""
MCP_IPOLB = 0x03
@ -197,6 +198,8 @@ class PinConfigInvalid(Exception):
class _MCP23017Port(Enum):
"""IO-Port des MCP23017 (A/B)"""
A = 0
B = 1
@ -206,13 +209,41 @@ class _MCP23017Port(Enum):
@dataclass
class _MCP23017Device:
"""
Ein einzelnes mit einen MCP23017 I2C-Portexpander verbundenes
Eingabe/Ausgabegerät.
"""
i2c_address: int
"""I2C-Adresse des MCP23017"""
port: _MCP23017Port
"""IO-Port des MCP23017 (A/B)"""
pin: int
"""IO-Pin des MCP23017 (0-7)"""
invert: bool
"""Zustand des Pins invertieren"""
@classmethod
def from_config(cls, cfg_str: str):
def from_config(cls, cfg_str: str) -> '_MCP23017Device':
"""
Parst einen Konfigurationsstring und erstellt daraus
ein neues ``_MCP23017Device``-Objekt.
Der Konfigurationsstring hat folgendes Format:
``'I2C_ADDR/PIN'``
Beispiel: ``0x27/B0`` (MCP23017 mit I2C-Adresse 0x27, Pin B0)
Um den Zustand eines Geräts zu invertieren, einfach ``/!``
an den Konfiguraionsstring anfügen: ``0x27/B0/!``
:param cfg_str: Konfigurationsstring
:return: Neues ``_MCP23017Device``-Objekt
"""
cfg_parts = cfg_str.split('/')
if len(cfg_parts) < 2:
raise PinConfigInvalid(cfg_str)
@ -251,8 +282,6 @@ class Io(io.Io):
def __init__(self, app: models.AppInterface):
super().__init__()
self.cb_manual: Optional[Callable[[int], None]] = None
self.cb_mode: Optional[Callable[[], None]] = None
self.cfg = app.get_cfg()
self.bus = smbus.SMBus(self.cfg.i2c_bus_id)
@ -263,7 +292,7 @@ class Io(io.Io):
self.output_devices: Dict[str, _MCP23017Device] = {}
self.input_devices: Dict[str, _MCP23017Device] = {}
# Parse config
# Parse config and initialize pins
for key, cfg_str in self.cfg.input_devices.items():
device = _MCP23017Device.from_config(cfg_str)
self.mcp_addresses.add(device.i2c_address)
@ -278,6 +307,15 @@ class Io(io.Io):
i2c_address: int,
register: int,
use_cache=False) -> int:
"""
Lese ein Byte Daten von einem I2C-Gerät
:param i2c_address: I2C-Adresse
:param register: I2C-Register
:param use_cache: Daten zwischenspeichern und bei erneutem
Aufruf nicht erneut abfragen.
:return: Datenbyte
"""
key = (i2c_address, register)
if use_cache and key in self.i2c_cache:
@ -291,19 +329,44 @@ class Io(io.Io):
return data
def _i2c_write_byte(self, i2c_address: int, register: int, value: int):
"""
Schreibe ein Byte Daten an ein I2C-Gerät
:param i2c_address: I2C-Adresse
:param register: I2C-Register
:param value: Datenbyte
"""
self.bus.write_byte_data(i2c_address, register, value)
def _i2c_read_bit(self,
i2c_address: int,
register: int,
bit: int,
use_cache=False):
use_cache=False) -> bool:
"""
Lese ein Bit Daten von einem I2C-Gerät
:param i2c_address: I2C-Adresse
:param register: I2C-Register
:param bit: Nummer des Bits (0-7)
:param use_cache: Daten zwischenspeichern und bei erneutem
Aufruf nicht erneut abfragen.
:return Datenbit
"""
data = self._i2c_read_byte(i2c_address, register, use_cache)
bitmask = 1 << bit
return bool(data & bitmask)
def _i2c_write_bit(self, i2c_address: int, register: int, bit: int,
value: bool):
"""
Schreibe ein Bit Daten an ein I2C-Gerät
:param i2c_address: I2C-Adresse
:param register: I2C-Register
:param bit: Nummer des Bits (0-7)
:param value: Datenbit
"""
data = self._i2c_read_byte(i2c_address, register)
bitmask = 1 << bit
@ -315,26 +378,48 @@ class Io(io.Io):
self._i2c_write_byte(i2c_address, register, data)
def _i2c_clear_cache(self):
"""
Leere den I2C-Cache
(verwendet von ``_i2c_read_bit()`` und ``_i2c_read_byte()``).
"""
self.i2c_cache = {}
def _configure_mcp(self, i2c_address: int):
"""Schreibe die initiale Konfiguration an ein MCP23017-Gerät"""
# I/O-Modus: Lesend
self._i2c_write_byte(i2c_address, MCP_IODIRA, 0xff)
self._i2c_write_byte(i2c_address, MCP_IODIRB, 0xff)
# I/O-Polarität: Active-high
self._i2c_write_byte(i2c_address, MCP_IPOLA, 0)
self._i2c_write_byte(i2c_address, MCP_IPOLB, 0)
# Interrupt aus
self._i2c_write_byte(i2c_address, MCP_GPINTENA, 0)
self._i2c_write_byte(i2c_address, MCP_GPINTENB, 0)
self._i2c_write_byte(i2c_address, MCP_DEFVALA, 0)
self._i2c_write_byte(i2c_address, MCP_DEFVALB, 0)
self._i2c_write_byte(i2c_address, MCP_INTCONA, 0)
self._i2c_write_byte(i2c_address, MCP_INTCONB, 0)
# Interrupt-Polarität: Active-high
# Interrupt-Ports spiegeln
self._i2c_write_byte(i2c_address, MCP_IOCON, 0b01000010)
# Pullup aus
self._i2c_write_byte(i2c_address, MCP_GPPUA, 0)
self._i2c_write_byte(i2c_address, MCP_GPPUB, 0)
# Outputs aus
self._i2c_write_byte(i2c_address, MCP_OLATA, 0)
self._i2c_write_byte(i2c_address, MCP_OLATB, 0)
def _configure_input_device(self, device: _MCP23017Device):
"""
Konfiguriere einen MCP-Pin als Eingabegerät
:param device: Gerätedefinition
"""
if device.invert:
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA),
device.pin, True)
@ -343,6 +428,11 @@ class Io(io.Io):
device.pin, True)
def _configure_output_device(self, device: _MCP23017Device):
"""
Konfiguriere einen MCP-Pin als Ausgabegerät
:param device: Gerätedefinition
"""
if device.invert:
# self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA),
# device.pin, True)
@ -353,6 +443,11 @@ class Io(io.Io):
device.pin, False)
def _read_interrupt(self) -> Optional[str]:
"""
Rufe ab, welches Eingabegerät den Interrupt ausgelöst hat
:return: Name des Eingabegeräts (oder None)
"""
self._i2c_clear_cache()
for key, device in self.input_devices.items():
@ -364,6 +459,11 @@ class Io(io.Io):
return None
def _read_inputs(self) -> Dict[str, bool]:
"""
Lese die Zustände aller Eingabegeräte aus
:return: Dict(Gerätename => Zustand)
"""
res = {}
self._i2c_clear_cache()
@ -375,6 +475,13 @@ class Io(io.Io):
return res
def _interrupt_handler(self, int_pin: int): # pylint: disable=unused-argument
"""
Diese Funktion wird von der RPi-GPIO-Library bei einem Interrupt des MCP23017
(Zustandswechsel eines Eingabegeräts) aufgerufen.
Es wird abgefragt, welcher Input den Interrupt ausgelöst hat und der
entsprechende Input-Callback ausgelöst.
"""
key = self._read_interrupt()
if key is None:
return
@ -389,16 +496,7 @@ class Io(io.Io):
logging.debug('%s pressed', key)
else:
logging.debug('%s released', key)
if key == 'BT_MODE':
self._trigger_cb_mode()
elif key.startswith('BT_Z_'):
zoneid_str = key[5:]
try:
zoneid = int(zoneid_str)
except ValueError:
return
self._trigger_cb_manual(zoneid)
self._trigger_cb(key)
def write_output(self, key: str, val: bool):
device = self.output_devices[key]

View file

@ -36,18 +36,6 @@ class Job:
befinden.
"""
@property
def is_active(self) -> bool:
"""
Gibt True zurück, wenn der Bewässerungsjob in Zukunft ausgeführt werden wird.
"""
if not self.enable:
return False
if self.repeat:
return True
return self.date > util.datetime_now()
def check(self, date_now: datetime) -> bool:
"""
Gibt True zurück, wenn der Bewässerungsjob in dieser Minute

View file

@ -1,6 +1,6 @@
# coding=utf-8
import logging
from typing import Any, Dict, List, Optional
from typing import List, Optional
from tsgrain_controller import models, util
@ -20,19 +20,23 @@ class TaskQueue(util.StoppableThread, TaskHolder):
self.tasks: List[models.Task] = []
self.running_task: Optional[models.Task] = None
def enqueue(self, task: models.Task, exclusive: bool = False) -> bool:
def enqueue(self, task: models.Task, queuing: bool = True) -> bool:
"""
Fügt der Warteschlange einen neuen Task hinzu. Die Warteschlange
kann nicht mehrere Tasks der selben Quelle und Zone aufnehmen.
Kann ein Task nicht aufgenommen werden, wird False zurückgegeben.
:param task: Neuer Task
:param exclusive: Wenn True, verbiete mehrere Tasks von der selben Quelle
:param queuing: Füge Task der Warteschlange hinzu, wenn bereits
ein anderer Task läuft
:return: True wenn Task erfolgreich hinzugefügt
"""
if not queuing and self.running_task is not None:
return False
# Abbrechen, wenn bereits ein Task mit gleicher Quelle und Zone existiert
for t in self.tasks:
if t.source == task.source and (exclusive
or t.zone_id == task.zone_id):
if t.source == task.source and t.zone_id == task.zone_id:
return False
self.tasks.append(task)
@ -70,10 +74,6 @@ class TaskQueue(util.StoppableThread, TaskHolder):
"""Task zur Speicherung in der Datenbank in dict umwandeln."""
return self.tasks
def serialize_rpc(self) -> Dict[str, Any]:
"""Task zur aktuellen Statusübertragung in dict umwandeln."""
return {'current_time': util.datetime_now(), 'tasks': self.serialize()}
def run_cycle(self):
# Get a new task if none is running
if self.running_task is None: