diff --git a/.gitignore b/.gitignore index 5faf2c6..be99abb 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ /.tox __pycache__ *.egg-info +/tsgrain.toml +/raindb.json diff --git a/README.rst b/README.rst index 53d31d4..d4b3b29 100644 --- a/README.rst +++ b/README.rst @@ -26,7 +26,7 @@ wodurch das Bewässerungsprogramm beim Neustart fortgesetzt wird. 5. Ist der Timer abgelaufen, wird der Ventilausgang deaktivert und der Bewässerungsauftrag aus der Warteschlange entfernt. 6. Wird ein Bewässerungsauftrag abgebrochen (entweder durch Tastendruck oder durch - Löschen des Zeitplans, muss ebenfalls der Ventilausgang deaktivert und der + Löschen des Zeitplans, muss ebenfalls der Ventilausgang deaktiviert und der Bewässerungsauftrag aus der Warteschlange entfernt werden. 7. Wird der Controller beendet, werden alle laufenden Aufträge angehalten. Die Ventilausgänge werden deaktiviert und die gesamte Warteschlange inklusive @@ -75,15 +75,21 @@ Der Controller kann über eine GRPC-Schnittstelle mit anderen Anwendungen kommun Datenmodelle ============ +Source +------ + +``name`` + Name der Quelle: manual, schedule + +``priority`` + **Priorität:** Priorität der Quelle + Task ---- ``source`` **Quelle:** Zeitplan (mit Zeitplan-ID), Tastendruck -``priority`` - **Priorität:** Priorität der Bewässerungsaufgabe - ``zone: int`` ID der Bewässerungszone (Platz) @@ -97,7 +103,7 @@ Task Schedule -------- -``datetime: datetime`` +``date: datetime`` Datum/Uhrzeit ``duration: int`` @@ -108,3 +114,13 @@ Schedule ``repeat: bool`` Zeitplan täglich wiederholen + + +Konfiguration +============= + +``MAX_ACTIVE_ZONES`` + +in/output pins + + diff --git a/requirements.txt b/requirements.txt index ed69640..0185ff4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,4 @@ -tinydb +tinydb~=4.6.1 +cyra~=1.0.2 +schedule~=1.1.0 +smbus~=1.1.post2 diff --git a/requirements_test.txt b/requirements_test.txt index 9955dec..48792e3 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -1,2 +1,5 @@ pytest pytest-cov +pytest-mock +py-defer +importlib_resources diff --git a/setup.py b/setup.py index 5498a54..3fcc186 100644 --- a/setup.py +++ b/setup.py @@ -23,6 +23,8 @@ setuptools.setup( py_modules=['tsgrain_controller'], install_requires=[ 'tinydb', + 'cyra', + 'schedule', ], packages=setuptools.find_packages(exclude=['tests*']), entry_points={ diff --git a/tests/fixtures.py b/tests/fixtures.py new file mode 100644 index 0000000..1328759 --- /dev/null +++ b/tests/fixtures.py @@ -0,0 +1,45 @@ +# coding=utf-8 +from typing import Optional, Callable, Dict +from importlib_resources import files +import os + +from tsgrain_controller import io, util, config + +DIR_TESTFILES = str(files('tests.testfiles').joinpath('')) +FILE_CFG = os.path.join(DIR_TESTFILES, 'tsgrain.toml') + + +class TestingIo(io.Io): + + def __init__(self, cb_manual: Optional[Callable[[int], None]], + cb_mode: Optional[Callable[[], None]]): + self.cb_manual = cb_manual + self.cb_mode = cb_mode + + self.outputs: Dict[str, bool] = dict() + + def trigger_cb_manual(self, zone_id: int): + if self.cb_manual is not None: + self.cb_manual(zone_id) + + def trigger_cb_mode(self): + if self.cb_mode is not None: + self.cb_mode() + + def write_output(self, key: str, val: bool): + self.outputs[key] = val + + +class TestingApp(util.AppInterface): + + def __init__(self, db_file=''): + self.auto = False + self.cfg = config.Config(FILE_CFG) + self.cfg.load_file(False) + self.cfg.db_path = db_file + + def is_auto_enabled(self) -> bool: + return self.auto + + def get_cfg(self) -> config.Config: + return self.cfg diff --git a/tests/test_database.py b/tests/test_database.py new file mode 100644 index 0000000..37a593e --- /dev/null +++ b/tests/test_database.py @@ -0,0 +1,93 @@ +# coding=utf-8 +import tempfile +import os + +from tests import fixtures +from tsgrain_controller import database, util, queue, models + + +def test_db_init(): + with tempfile.TemporaryDirectory() as td: + dbfile = os.path.join(td, 'raindb.json') + db = database.RainDB(dbfile) + db.close() + + assert os.path.isfile(dbfile) + + +def test_db_jobs(): + with tempfile.TemporaryDirectory() as td: + dbfile = os.path.join(td, 'raindb.json') + db = database.RainDB(dbfile) + + job1 = models.Job(util.datetime_new(2022, 1, 10, 12, 30), 60, [1, 3], + True, False) + job2 = models.Job(util.datetime_new(2022, 1, 10, 12, 30), 60, [1, 3], + True, True) + + db.insert_job(job1) + db.insert_job(job2) + + # Reopen database + db.close() + db = database.RainDB(dbfile) + + jobs = db.get_jobs() + assert util.to_json(jobs) == '{"1": {"date": "2022-01-10T12:30:00", \ +"duration": 60, "zones": [1, 3], "enable": [1, 3], "repeat": false}, "2": \ +{"date": "2022-01-10T12:30:00", "duration": 60, "zones": [1, 3], \ +"enable": [1, 3], "repeat": true}}' + + job2.enable = False + db.update_job(2, job2) + db.delete_job(1) + + jobs = db.get_jobs() + assert util.to_json(jobs) == '{"2": {"date": "2022-01-10T12:30:00", \ +"duration": 60, "zones": [1, 3], "enable": [1, 3], "repeat": true}}' + + +def test_db_queue(): + app = fixtures.TestingApp() + q = queue.TaskQueue(app) + + task1 = models.Task(models.Source.MANUAL, 1, 10) + task2 = models.Task(models.Source.SCHEDULE, 1, 5) + + assert q.enqueue(task1) + assert q.enqueue(task2) + + with tempfile.TemporaryDirectory() as td: + dbfile = os.path.join(td, 'raindb.json') + app = fixtures.TestingApp(dbfile) + + db = database.RainDB(dbfile) + db.store_queue(q) + + # Reopen database + db.close() + db = database.RainDB(dbfile) + + read_queue = queue.TaskQueue(app) + db.load_queue(read_queue) + + assert util.to_json( + read_queue) == '[{"source": "MANUAL", "zone_id": 1, \ +"duration": 10, "remaining": 10}, {"source": "SCHEDULE", "zone_id": 1, "duration": 5, \ +"remaining": 5}]' + + +def test_db_auto_mode(): + with tempfile.TemporaryDirectory() as td: + dbfile = os.path.join(td, 'raindb.json') + + db = database.RainDB(dbfile) + assert db.get_auto_mode() is False + + db.set_auto_mode(True) + assert db.get_auto_mode() is True + + # Reopen database + db.close() + db = database.RainDB(dbfile) + assert db.get_auto_mode() is True diff --git a/tests/test_output.py b/tests/test_output.py new file mode 100644 index 0000000..1163240 --- /dev/null +++ b/tests/test_output.py @@ -0,0 +1,106 @@ +# coding=utf-8 +from typing import Optional +import time +import defer +from unittest import mock + +from tests import fixtures +from tsgrain_controller import output, queue, models + + +class _TaskHolder(queue.TaskHolder): + + def __init__(self, task: Optional[models.Task]): + self.task = task + + def get_current_task(self) -> Optional[models.Task]: + return self.task + + +@defer.with_defer +def test_zone_outputs(): + io = fixtures.TestingIo(None, None) + task_holder = _TaskHolder(None) + app = fixtures.TestingApp() + op = output.Outputs(io, task_holder, app) + op.start() + defer.defer(op.stop) + + time.sleep(0.01) + + # Expect all devices initialized to OFF + assert io.outputs == { + 'VALVE_1': False, + 'VALVE_2': False, + 'VALVE_3': False, + 'LED_Z_1': False, + 'LED_Z_2': False, + 'LED_Z_3': False, + 'LED_M_AUTO': False, + 'LED_M_MAN': False, + } + + task_holder.task = models.Task(models.Source.MANUAL, 1, 100) + time.sleep(0.01) + assert io.outputs == { + 'VALVE_1': True, + 'VALVE_2': False, + 'VALVE_3': False, + 'LED_Z_1': True, + 'LED_Z_2': False, + 'LED_Z_3': False, + 'LED_M_AUTO': False, + # Manual LED is blinking + 'LED_M_MAN': mock.ANY, + } + + task_holder.task = None + time.sleep(0.01) + assert io.outputs == { + 'VALVE_1': False, + 'VALVE_2': False, + 'VALVE_3': False, + 'LED_Z_1': False, + 'LED_Z_2': False, + 'LED_Z_3': False, + 'LED_M_AUTO': False, + 'LED_M_MAN': False, + } + + +""" +@defer.with_defer +def test_zone_led_blink(): + io = fixtures.TestingIo(None, None) + task_holder = _TaskHolder(None) + app = fixtures.TestingApp() + op = output.Outputs(io, task_holder, app) + + def get_blink_time(key: str) -> Optional[int]: + start_time = time.time_ns() + first_pulse_ns = 0 + old_state = io.outputs[key] + + while (time.time_ns() - start_time) < 1e9: + state = io.outputs[key] + if state != old_state: + old_state = state + + if first_pulse_ns == 0: + first_pulse_ns = time.time_ns() + else: + return int((time.time_ns() - first_pulse_ns) / 1e6) + + return None + + op.start() + defer.defer(op.stop) + + task_holder.task = models.Task(models.Source.MANUAL, 1, 60) + time.sleep(0.005) + assert get_blink_time('LED_Z_1') is None + + task_holder.task = models.Task(models.Source.MANUAL, 1, 30) + time.sleep(0.005) + assert get_blink_time('LED_Z_1') == pytest.approx(250, 0.1) +""" diff --git a/tests/test_queue.py b/tests/test_queue.py new file mode 100644 index 0000000..8cff419 --- /dev/null +++ b/tests/test_queue.py @@ -0,0 +1,82 @@ +# coding=utf-8 +import time +from datetime import datetime + +from tests import fixtures +from tsgrain_controller import queue, util, models + + +def test_task(): + task = models.Task(models.Source.MANUAL, 1, 10) + + assert task.serialize() == { + 'source': 'MANUAL', + 'zone_id': 1, + 'duration': 10, + 'remaining': 10 + } + + assert task.serialize_rpc() == { + 'source': 'MANUAL', + 'zone_id': 1, + 'duration': 10, + 'datetime_started': None, + 'datetime_finished': None + } + + +def test_task_started(mocker): + mocker.patch('tsgrain_controller.util.datetime_now', + return_value=datetime(2022, 1, 10, 6, 0)) + + task = models.Task(models.Source.MANUAL, 1, 10) + task.start() + + assert task.serialize() == { + 'source': 'MANUAL', + 'zone_id': 1, + 'duration': 10, + 'remaining': 10 + } + + assert task.serialize_rpc() == { + 'source': 'MANUAL', + 'zone_id': 1, + 'duration': 10, + 'datetime_started': datetime(2022, 1, 10, 6, 0), + 'datetime_finished': datetime(2022, 1, 10, 6, 0, 10) + } + + +def test_add_tasks(): + app = fixtures.TestingApp() + q = queue.TaskQueue(app) + + task1 = models.Task(models.Source.MANUAL, 1, 10) + task1b = models.Task(models.Source.SCHEDULE, 1, 5) + task1c = models.Task(models.Source.MANUAL, 1, 5) + + assert q.enqueue(task1) + assert q.enqueue(task1b) + assert not q.enqueue(task1c) + + +def test_queue_runner(): + app = fixtures.TestingApp() + q = queue.TaskQueue(app) + + task1 = models.Task(models.Source.MANUAL, 1, 1) + task2 = models.Task(models.Source.MANUAL, 2, 5) + task3 = models.Task(models.Source.SCHEDULE, 2, 10) + + assert q.enqueue(task1) + assert q.enqueue(task2) + assert q.enqueue(task3) + + q.start() + time.sleep(0.9) + q.stop() + + assert util.to_json(q) == \ + '[{"source": "MANUAL", "zone_id": 2, "duration": 5, "remaining": 4}, \ +{"source": "SCHEDULE", "zone_id": 2, "duration": 10, "remaining": 10}]' diff --git a/tests/test_util.py b/tests/test_util.py new file mode 100644 index 0000000..3608edf --- /dev/null +++ b/tests/test_util.py @@ -0,0 +1,42 @@ +# coding=utf-8 +from dataclasses import dataclass +from enum import Enum +import datetime +import pytest +from tsgrain_controller import util + + +@dataclass +class _TestCls: + name: str + money: int + + +class _TestClsSerializable(_TestCls): + + def serialize(self): + return {'name': 'Little' + self.name, 'money': self.money} + + +class _TestEnum(Enum): + OPT1 = 1 + OPT2 = 2 + + +@pytest.mark.parametrize('o,expected', [ + ('hello', '"hello"'), + (42, '42'), + ({ + 'k1': 'v1', + 'k2': 'v2' + }, '{"k1": "v1", "k2": "v2"}'), + (_TestEnum.OPT2, '2'), + (_TestCls("Mary", 121), '{"name": "Mary", "money": 121}'), + (_TestClsSerializable("Mary", + 121), '{"name": "LittleMary", "money": 121}'), + (datetime.datetime(2022, 1, 2, 15, 30, 11), '"2022-01-02T15:30:11"'), + (datetime.date(2022, 1, 2), '"2022-01-02"'), +]) +def test_to_json(o, expected): + res = util.to_json(o) + assert res == expected diff --git a/tests/testfiles/tsgrain.toml b/tests/testfiles/tsgrain.toml new file mode 100644 index 0000000..01f63cc --- /dev/null +++ b/tests/testfiles/tsgrain.toml @@ -0,0 +1 @@ +n_zones = 3 # Anzahl Bewässerungszonen diff --git a/tsgrain_controller/__main__.py b/tsgrain_controller/__main__.py index d478924..1a9e4ae 100644 --- a/tsgrain_controller/__main__.py +++ b/tsgrain_controller/__main__.py @@ -1,5 +1,30 @@ +import signal +import sys + +from tsgrain_controller import application + + def run(): - pass + console_flag = False + + if len(sys.argv) > 1: + x = sys.argv[1] + if x.startswith('c'): + console_flag = True + + app = application.Application(console_flag) + + def _signal_handler(sig, frame): + app.stop() + + print('Exited.') + sys.exit(0) + + signal.signal(signal.SIGINT, _signal_handler) + + app.start() + + signal.pause() if __name__ == '__main__': diff --git a/tsgrain_controller/application.py b/tsgrain_controller/application.py new file mode 100644 index 0000000..e0819ee --- /dev/null +++ b/tsgrain_controller/application.py @@ -0,0 +1,64 @@ +# coding=utf-8 +import logging + +from tsgrain_controller import config, controller, database, io, jobschedule, output, \ + queue, util + + +class Application(util.AppInterface): + + def __init__(self, console_flag: bool): + self.logger = logging.getLogger() + self.logger.setLevel(logging.DEBUG) + + self.cfg = config.Config('tsgrain.toml') + self.cfg.load_file() + + self.db = database.RainDB(self.cfg.db_path) + + self.queue = queue.TaskQueue(self) + self.db.load_queue(self.queue) + + b_ctrl = controller.ButtonController(self.queue, self.cfg) + + self.io = io.new_io(self, console_flag) + self.io.set_callbacks(b_ctrl.cb_manual, self.cb_modekey) + + self.outputs = output.Outputs(self.io, self.queue, self) + + self.scheduler = jobschedule.Scheduler(self.db, self.queue) + + self.auto_en = self.db.get_auto_mode() + + def is_auto_enabled(self) -> bool: + return self.auto_en + + def get_cfg(self) -> config.Config: + return self.cfg + + def get_logger(self) -> logging.Logger: + return self.logger + + def cb_modekey(self): + self.auto_en = not self.auto_en + + if self.auto_en: + logging.info('Auto mode ON') + else: + logging.info('Auto mode OFF') + + def start(self): + self.io.start() + self.outputs.start() + self.queue.start() + self.scheduler.start() + + def stop(self): + self.scheduler.stop() + self.queue.stop() + self.outputs.stop() + self.io.stop() + + # Store application state + self.db.store_queue(self.queue) + self.db.set_auto_mode(self.auto_en) diff --git a/tsgrain_controller/config.py b/tsgrain_controller/config.py new file mode 100644 index 0000000..6975be4 --- /dev/null +++ b/tsgrain_controller/config.py @@ -0,0 +1,66 @@ +# coding=utf-8 +import cyra + + +class Config(cyra.Config): + builder = cyra.ConfigBuilder() + + builder.comment('Anzahl Bewaesserungszonen') + n_zones = builder.define('n_zones', 7) + + builder.comment('Pfad der Datenbankdatei') + db_path = builder.define('db_path', 'raindb.json') + + builder.comment('Manuelle Bewaesserungszeit in Sekunden') + manual_time = builder.define('manual_time', 300) + + builder.comment('Debug-Ausgaben loggen') + log_debug = builder.define('log_debug', False) + + builder.push('io') + + builder.comment('ID des I2C-Bus') + i2c_bus_id = builder.define('i2c_bus_id', 0) + + builder.comment( + 'GPIO-Pin, mit dem der Interrupt-Pin des MCP23017 verbunden ist') + gpio_interrupt = builder.define('gpio_interrupt', 17) + + builder.comment('Entprellzeit in Sekunden') + gpio_delay = builder.define('gpio_delay', 0.05) + + builder.pop() + + builder.comment('Ausgaenge') + output_devices = builder.define( + 'output_devices', { + 'VALVE_1': '0x27/B0/!', + 'VALVE_2': '0x27/B1/!', + 'VALVE_3': '0x27/B2/!', + 'VALVE_4': '0x27/B3/!', + 'VALVE_5': '0x27/B4/!', + 'VALVE_6': '0x27/B5/!', + 'VALVE_7': '0x27/B6/!', + 'LED_Z_1': '0x27/A0', + 'LED_Z_2': '0x27/A1', + 'LED_Z_3': '0x27/A2', + 'LED_Z_4': '0x27/A3', + 'LED_Z_5': '0x27/A4', + 'LED_Z_6': '0x27/A5', + 'LED_Z_7': '0x27/A6', + 'LED_M_AUTO': '0x23/B0', + 'LED_M_MAN': '0x23/B1', + }) + + builder.comment('Eingaenge') + input_devices = builder.define( + 'input_devices', { + 'BT_Z_1': '0x23/A0/!', + 'BT_Z_2': '0x23/A1/!', + 'BT_Z_3': '0x23/A2/!', + 'BT_Z_4': '0x23/A3/!', + 'BT_Z_5': '0x23/A4/!', + 'BT_Z_6': '0x23/A5/!', + 'BT_Z_7': '0x23/A6/!', + 'BT_MODE': '0x23/A7/!', + }) diff --git a/tsgrain_controller/controller.py b/tsgrain_controller/controller.py new file mode 100644 index 0000000..7d686f4 --- /dev/null +++ b/tsgrain_controller/controller.py @@ -0,0 +1,36 @@ +# coding=utf-8 +from typing import Callable, Optional + +from tsgrain_controller import config, models, queue + + +class ButtonController: + + def __init__(self, task_queue: queue.TaskQueue, cfg: config.Config): + self.task_queue = task_queue + self.cfg = cfg + self.cb_error: Optional[Callable[[], None]] = None + + def cb_manual(self, zone_id: int): + current_task = self.task_queue.get_current_task() + # Cancel manually started tasks + if current_task is not None \ + and current_task.zone_id == zone_id and current_task.source == models.Source.MANUAL: + self.task_queue.cancel_current_task() + else: + task = models.Task(models.Source.MANUAL, zone_id, + self.cfg.manual_time) + self.task_queue.enqueue(task, True) + + +class QueuingButtonController(ButtonController): + + def cb_manual(self, zone_id: int): + # If a task from this zone is in queue, cancel it + for task in self.task_queue.tasks: + if task.zone_id == zone_id and task.source == models.Source.MANUAL: + self.task_queue.cancel_task(task) + return + + task = models.Task(models.Source.MANUAL, zone_id, self.cfg.manual_time) + self.task_queue.enqueue(task) diff --git a/tsgrain_controller/database.py b/tsgrain_controller/database.py new file mode 100644 index 0000000..10ae766 --- /dev/null +++ b/tsgrain_controller/database.py @@ -0,0 +1,115 @@ +# coding=utf-8 +import tinydb +from typing import Dict, Optional +from tsgrain_controller import queue, util, models + + +class RainDB: + + def __init__(self, db_path: str): + self._db = tinydb.TinyDB(db_path, default=util.serializer) + + self._jobs = self._db.table('jobs', cache_size=0) + self._tasks = self._db.table('tasks', cache_size=0) + self._options = self._db.table('options', cache_size=0) + + def close(self): + """Die Datenbank schließen""" + self._db.close() + + def get_jobs(self) -> Dict[int, models.Job]: + """ + Gibt alle gespeicherten Bewässerungsjobs zurück + + :return: Bewässerungsjobs: dict(id -> models.Job) + """ + res = dict() + for job_data in self._jobs.all(): + res[job_data.doc_id] = models.Job.deserialize(job_data) + return res + + def get_job(self, job_id: int) -> Optional[models.Job]: + """ + Gibt den Bewässerungsjob mit der gegebenen ID zurück + + :param job_id: ID des Bewässerungsjobs + :return: Bewässerungsjob + """ + job = self._jobs.get(None, job_id) + if job is None: + return None + + return models.Job.deserialize(job) + + def insert_job(self, job: models.Job): + """ + Fügt der Datenbank einen neuen Bewässerungsjob hinzu + + :param job: Bewässerungsjob + """ + self._jobs.insert(util.serializer(job)) + + def update_job(self, job_id: int, job: models.Job): + """ + Aktualisiert einen Bewässerungsjob + + :param job_id: ID des Bewässerungsjobs + :param job: Bewässerungsjob + """ + self._jobs.update(util.serializer(job), None, [job_id]) + + def delete_job(self, job_id: int): + """ + Lösche den Bewässerungsjob mit der gegebenen ID + + :param job_id: ID des Bewässerungsjobs + """ + self._jobs.remove(None, [job_id]) + + def store_queue(self, task_queue: queue.TaskQueue): + """ + Speichere die aktuelle Warteschlange in der Datenbank + + :param task_queue: Warteschlange + """ + self.empty_queue() + for task in task_queue.serialize(): + self._tasks.insert(util.serializer(task)) + + def load_queue(self, task_queue: queue.TaskQueue): + """ + Lade die gespeicherten Tasks aus der Datenbank in die Warteschlange + + :param task_queue: Warteschlange + """ + for task_data in self._tasks.all(): + task = models.Task.deserialize(task_data) + task_queue.tasks.append(task) + self.empty_queue() + + def empty_queue(self): + """Lösche die gespeicherte Warteschlange""" + self._tasks.truncate() + + def set_auto_mode(self, state: bool): + """ + Speichere den Status des Automatikmodus + + :param state: Automatikstatus + """ + self._options.upsert({ + 'key': 'auto_mode', + 'val': state + }, + tinydb.Query().key == 'auto_mode') + + def get_auto_mode(self) -> bool: + """ + Rufe den Status des Automatikmodus ab + + :return: Automatikstatus + """ + option = self._options.get(tinydb.Query().key == 'auto_mode') + if option is None: + return False + return option.get('val', False) diff --git a/tsgrain_controller/io/__init__.py b/tsgrain_controller/io/__init__.py new file mode 100644 index 0000000..1ad3ac0 --- /dev/null +++ b/tsgrain_controller/io/__init__.py @@ -0,0 +1,32 @@ +# coding=utf-8 +from typing import Callable, Optional + +from tsgrain_controller import util + + +class Io: + + def set_callbacks(self, cb_manual: Optional[Callable[[int], None]], + cb_mode: Optional[Callable[[], None]]): + pass + + def start(self): + pass + + def stop(self): + pass + + def write_output(self, key: str, val: bool): + pass + + +def new_io(app: util.AppInterface, console_flag: bool) -> Io: + if not console_flag: + try: + from tsgrain_controller.io import mcp23017 as io_mod + except ImportError: + from tsgrain_controller.io import console as io_mod + else: + from tsgrain_controller.io import console as io_mod + + return io_mod.Io(app) diff --git a/tsgrain_controller/io/console.py b/tsgrain_controller/io/console.py new file mode 100644 index 0000000..5acbdb2 --- /dev/null +++ b/tsgrain_controller/io/console.py @@ -0,0 +1,113 @@ +# coding=utf-8 +import curses +import logging +import math +from typing import Callable, Dict, Optional + +from tsgrain_controller import io, util + + +class CursesHandler(logging.Handler): + + def __init__(self, screen): + logging.Handler.__init__(self) + self.screen = screen + self.screen.scrollok(True) + self.screen.idlok(True) + self.screen.leaveok(True) + self.screen.refresh() + + def emit(self, record): + try: + msg = self.format(record) + screen = self.screen + fs = "\n%s" + screen.addstr(fs % msg) + screen.refresh() + + except (KeyboardInterrupt, SystemExit): + raise + + except: # noqa: E722 + self.handleError(record) + + +class Io(util.StoppableThread, io.Io): + + def __init__(self, app: util.AppInterface): + super().__init__(0.01) + self.app = app + self.cb_manual: Optional[Callable[[int], None]] = None + self.cb_mode: Optional[Callable[[], None]] = None + + self._screen: Optional = None + self._outputs: Dict[str, bool] = dict() + + def set_callbacks(self, cb_manual: Optional[Callable[[int], None]], + cb_mode: Optional[Callable[[], None]]): + self.cb_manual = cb_manual + self.cb_mode = cb_mode + + def setup(self): + screen = curses.initscr() + curses.noecho() + curses.curs_set(0) + screen.nodelay(True) + + max_y, max_x = screen.getmaxyx() + width_half = math.floor(max_x / 2) + self._screen = curses.newwin(max_y, width_half, 0, 0) + win_log = curses.newwin(max_y, width_half, 0, width_half) + + formatter_display = logging.Formatter( + '%(asctime)-8s|%(levelname)-5s| %(message)-s', '%H:%M:%S') + mh = CursesHandler(win_log) + mh.setFormatter(formatter_display) + self.app.get_logger().handlers = [] + self.app.get_logger().addHandler(mh) + + self._screen.nodelay(True) + + logging.info('Window size: max_x=%d, max_y=%d, width_half=%d', max_x, + max_y, width_half) + + def _trigger_cb_manual(self, zone_id: int): + if self.cb_manual is not None: + self.cb_manual(zone_id) + + def _trigger_cb_mode(self): + if self.cb_mode is not None: + self.cb_mode() + + def write_output(self, key: str, val: bool): + self._outputs[key] = val + + def run_cycle(self): + c = self._screen.getch() + + def state_str(state: bool) -> str: + if state: + return '●' + else: + return '○' + + # Mode key (0) + if c == 48: + self._trigger_cb_mode() + # Zone keys (1-7) + elif 49 <= c <= 55: + self._trigger_cb_manual(c - 48) + + self._screen.erase() + self._screen.addstr(0, 0, + 'Buttons: 1-7: Manual control, 0: Auto on/off') + + i = 1 + for key, output in self._outputs.items(): + self._screen.addstr(i, 0, '%s: %s' % (key, state_str(output))) + i += 1 + + def cleanup(self): + curses.echo() + curses.curs_set(1) + curses.endwin() diff --git a/tsgrain_controller/io/mcp23017.py b/tsgrain_controller/io/mcp23017.py new file mode 100644 index 0000000..d52d128 --- /dev/null +++ b/tsgrain_controller/io/mcp23017.py @@ -0,0 +1,448 @@ +# coding=utf-8 +import logging +import time +from dataclasses import dataclass +from enum import Enum +from typing import Callable, Dict, Optional, Tuple + +import RPi.GPIO as GPIO +import smbus + +from tsgrain_controller import io, util + +# MCP-Register +# Quelle: https://ww1.microchip.com/downloads/en/devicedoc/20001952c.pdf +# Seite 16, Table 3-3, 3-5 + +MCP_IODIRA = 0x00 +""" +I/O-Modus pro GPIOA-Pin (standardmäßig ``1`` / Eingang) + +- ``1`` Eingang +- ``0`` Ausgang +""" + +MCP_IODIRB = 0x01 +""" +I/O-Modus pro GPIOB-Pin (standardmäßig ``1`` / Eingang) + +- ``1`` Eingang +- ``0`` Ausgang +""" + +MCP_IPOLA = 0x02 +""" +Polarität pro GPIOA-Pin. + +- ``1`` Low wenn aktiv +- ``0`` High wenn aktiv + +Die Einstellung hatte im Test nur bei als Eingängen konfigurierten Pins +einen Effekt. Bei Ausgängen wird stattdessen der gesetzte Wert invertiert. +""" + +MCP_IPOLB = 0x03 +"""Polarität pro GPIOB-Pin. Invertiert wenn Bit gesetzt.""" + +MCP_GPINTENA = 0x04 +""" +Aktiviert einen Interrupt bei Zustandswechsel eines GPIOA-Pins. + +Das Auslöen von Interrupts +erfordert das Setzen der Register ``DEFVAL`` und ``INTCON``. +""" + +MCP_GPINTENB = 0x05 +""" +Aktiviert einen Interrupt bei Zustandswechsel eines GPIOB-Pins. + +Das Auslöen von Interrupts +erfordert das Setzen der Register ``DEFVAL`` und ``INTCON``. +""" + +MCP_DEFVALA = 0x06 +""" +Vergleichsregister für Interrupt bei Zustandswechsel eines GPIOA-Pins. + +Wenn mittels ``GPINTEN`` und ``INTCON``-Register ein vergleichender +Interrupt konfiguriert wurde, verursacht der Wechsel des Pin-Zustands +auf den gegenteiligen Wert von ``DEFVAL`` einen Interrupt. +""" + +MCP_DEFVALB = 0x07 +""" +Vergleichsregister für Interrupt bei Zustandswechsel eines GPIOB-Pins. + +Wenn mittels ``GPINTEN`` und ``INTCON``-Register ein vergleichender +Interrupt konfiguriert wurde, verursacht der Wechsel des Pin-Zustands +auf den gegenteiligen Wert von ``DEFVAL`` einen Interrupt. +""" + +MCP_INTCONA = 0x08 +""" +Wahl des Interruptmodus für GPIOA. + +- ``1`` Der Wert des Pins wird mit dem entsprechenden Bit im ``DEFVAL``-Register + verglichen. Beim gegenteiligen Wert erfolgt ein Interrupt. + +- ``0`` Ein Interrupt wird bei jeder Zustandsänderung des Pins ausgelöst. +""" + +MCP_INTCONB = 0x09 +""" +Wahl des Interruptmodus für GPIOB. + +- ``1`` Der Wert des Pins wird mit dem entsprechenden Bit im ``DEFVAL``-Register + verglichen. Beim gegenteiligen Wert erfolgt ein Interrupt. + +- ``0`` Ein Interrupt wird bei jeder Zustandsänderung des Pins ausgelöst. +""" + +MCP_IOCON = 0x0a +""" +Konfiguration für den I2C-Portexpander. + +Bit ``7``: **BANK** + Ändert die Adressierung der Register. Dieses Programm verwendet die + Standardkonfiguration 0, also nicht ändern. + +Bit ``6``: **MIRROR** + Wenn gesetzt, sind beide Interrupt-Pins ``INTA`` und ``INTB`` miteinander + verodert, d.h. ein Interrupt auf einem der zwei Ports aktiviert beide Pins. + +Bit ``5``: **SEQOP** + Sequenzieller Modus, inkrementiert den Adresszähler bei jedem Zugriff. + + - ``1`` Sequenzieller Modus deaktiviert + - ``0`` Sequenzieller Modus aktiviert + +Bit ``4``: **DISSLW** + Slew-Rate control + +Bit ``3``: nicht implementiert + +Bit ``2``: **ODR** + Konfiguriert den Interruptpin als Open-Drain-Ausgang, wenn gesetzt. + Wenn nicht gesetzt, ist der Interruptpin ein aktiv treibender Ausgang + mit der in Bit ``1`` festgelegten Polarität. + +Bit ``1``: **INTPOL** + Polarität des Interrupt-Pins + + - ``1`` High wenn aktiv + - ``0`` Low wenn aktiv + +Bit ``0``: nicht implementiert +""" + +MCP_GPPUA = 0x0c +"""GPIOA-Pullup-Widerstandsregister. 100k-Pullup aktiviert, wenn Bit gesetzt.""" + +MCP_GPPUB = 0x0d +"""GPIOB-Pullup-Widerstandsregister. 100k-Pullup aktiviert, wenn Bit gesetzt.""" + +MCP_INTFA = 0x0e +""" +Interrupt-Flag-Register. Wurde ein Interrupt auf GPIOA ausgelöst, ist +das entsprechende Bit in diesem Register gesetzt. Read-only. +""" + +MCP_INTFB = 0x0f +""" +Interrupt-Flag-Register. Wurde ein Interrupt auf GPIOB ausgelöst, ist +das entsprechende Bit in diesem Register gesetzt. Read-only. +""" + +MCP_INTCAPA = 0x10 +""" +Interrupt-Capture-Register. Wurde ein Interrupt auf GPIOA ausgelöst, +speichert dieses Register den entsprechenden Wert. +""" + +MCP_INTCAPB = 0x11 +""" +Interrupt-Capture-Register. Wurde ein Interrupt auf GPIOB ausgelöst, +speichert dieses Register den entsprechenden Wert. +""" + +MCP_GPIOA = 0x12 +""" +GPIOA-Portregister. Wert entspricht dem Zustand der GPIO-Pins. +Wird in dieses Register geschrieben, wird das ``OLAT``-Register verändert. +""" + +MCP_GPIOB = 0x13 +""" +GPIOA-Portregister. Wert entspricht dem Zustand der GPIO-Pins. +Wird in dieses Register geschrieben, wird das ``OLAT``-Register verändert. +""" + +MCP_OLATA = 0x14 +""" +GPIOA-Output-Latch-Register. Wert entspricht dem Zustand der Output-Latches, +die wiederum die als Output konfigurierten Pins ansteuern. +""" + +MCP_OLATB = 0x15 +""" +GPIOB-Output-Latch-Register. Wert entspricht dem Zustand der Output-Latches, +die wiederum die als Output konfigurierten Pins ansteuern. +""" + + +class PinConfigInvalid(Exception): + + def __init__(self, cfg_str: str): + super().__init__('MCP23017 pin config %s invalid' % cfg_str) + + +class _MCP23017Port(Enum): + A = 0 + B = 1 + + def reg(self, reg_a: int) -> int: + return reg_a + self.value + + +@dataclass +class _MCP23017Device: + i2c_address: int + port: _MCP23017Port + pin: int + invert: bool + + @classmethod + def from_config(cls, cfg_str: str): + cfg_parts = cfg_str.split('/') + if len(cfg_parts) < 2: + raise PinConfigInvalid(cfg_str) + + i2c_addr_str = cfg_parts[0] + port_str = cfg_parts[1][0] + pin_str = cfg_parts[1][1:] + + try: + i2c_addr = int(i2c_addr_str, 16) + except ValueError: + raise PinConfigInvalid(cfg_str) + + try: + port = _MCP23017Port[port_str] + except KeyError: + raise PinConfigInvalid(cfg_str) + + try: + pin = int(pin_str) + except ValueError: + raise PinConfigInvalid(cfg_str) + + invert = False + + if len(cfg_parts) >= 3: + attr_str = cfg_parts[2] + + invert = '!' in attr_str + + return cls(i2c_addr, port, pin, invert) + + +class Io(io.Io): + + def __init__(self, app: util.AppInterface): + self.cb_manual: Optional[Callable[[int], None]] = None + self.cb_mode: Optional[Callable[[], None]] = None + self.cfg = app.get_cfg() + + self.bus = smbus.SMBus(self.cfg.i2c_bus_id) + + self.i2c_cache: Dict[Tuple[int, int], int] = dict() + + self.mcp_addresses = set() + self.output_devices: Dict[str, _MCP23017Device] = dict() + self.input_devices: Dict[str, _MCP23017Device] = dict() + + # Parse config + for key, cfg_str in self.cfg.input_devices.items(): + device = _MCP23017Device.from_config(cfg_str) + self.mcp_addresses.add(device.i2c_address) + self.input_devices[key] = device + + for key, cfg_str in self.cfg.output_devices.items(): + device = _MCP23017Device.from_config(cfg_str) + self.mcp_addresses.add(device.i2c_address) + self.output_devices[key] = device + + def _trigger_cb_manual(self, zone_id: int): + if self.cb_manual is not None: + self.cb_manual(zone_id) + + def _trigger_cb_mode(self): + if self.cb_mode is not None: + self.cb_mode() + + def set_callbacks(self, cb_manual: Optional[Callable[[int], None]], + cb_mode: Optional[Callable[[], None]]): + self.cb_manual = cb_manual + self.cb_mode = cb_mode + + def _i2c_read_byte(self, + i2c_address: int, + register: int, + use_cache=False) -> int: + key = (i2c_address, register) + + if use_cache and key in self.i2c_cache: + return self.i2c_cache[key] + + data = self.bus.read_byte_data(i2c_address, register) + + if use_cache: + self.i2c_cache[key] = data + + return data + + def _i2c_write_byte(self, i2c_address: int, register: int, value: int): + self.bus.write_byte_data(i2c_address, register, value) + + def _i2c_read_bit(self, + i2c_address: int, + register: int, + bit: int, + use_cache=False): + data = self._i2c_read_byte(i2c_address, register, use_cache) + bitmask = 1 << bit + return bool(data & bitmask) + + def _i2c_write_bit(self, i2c_address: int, register: int, bit: int, + value: bool): + data = self._i2c_read_byte(i2c_address, register) + bitmask = 1 << bit + + if value: + data |= bitmask # Maskiertes Bit setzen + else: + data &= ~bitmask # Maskiertes Bit löschen + + self._i2c_write_byte(i2c_address, register, data) + + def _i2c_clear_cache(self): + self.i2c_cache = dict() + + def _configure_mcp(self, i2c_address: int): + self._i2c_write_byte(i2c_address, MCP_IODIRA, 0xff) + self._i2c_write_byte(i2c_address, MCP_IODIRB, 0xff) + self._i2c_write_byte(i2c_address, MCP_IPOLA, 0) + self._i2c_write_byte(i2c_address, MCP_IPOLB, 0) + self._i2c_write_byte(i2c_address, MCP_GPINTENA, 0) + self._i2c_write_byte(i2c_address, MCP_GPINTENB, 0) + self._i2c_write_byte(i2c_address, MCP_DEFVALA, 0) + self._i2c_write_byte(i2c_address, MCP_DEFVALB, 0) + self._i2c_write_byte(i2c_address, MCP_INTCONA, 0) + self._i2c_write_byte(i2c_address, MCP_INTCONB, 0) + self._i2c_write_byte(i2c_address, MCP_IOCON, 0b01000010) + self._i2c_write_byte(i2c_address, MCP_GPPUA, 0) + self._i2c_write_byte(i2c_address, MCP_GPPUB, 0) + self._i2c_write_byte(i2c_address, MCP_OLATA, 0) + self._i2c_write_byte(i2c_address, MCP_OLATB, 0) + + def _configure_input_device(self, device: _MCP23017Device): + if device.invert: + self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA), + device.pin, True) + + self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_GPINTENA), + device.pin, True) + + def _configure_output_device(self, device: _MCP23017Device): + if device.invert: + # self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA), + # device.pin, True) + self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_OLATA), + device.pin, True) + + self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IODIRA), + device.pin, False) + + def _read_interrupt(self) -> Optional[str]: + self._i2c_clear_cache() + + for key, device in self.input_devices.items(): + if self._i2c_read_bit(device.i2c_address, + device.port.reg(MCP_INTFA), device.pin, + True): + return key + + return None + + def _read_inputs(self) -> Dict[str, bool]: + res = dict() + self._i2c_clear_cache() + + for key, device in self.input_devices.items(): + res[key] = self._i2c_read_bit(device.i2c_address, + device.port.reg(MCP_GPIOA), + device.pin, True) + + return res + + def _interrupt_handler(self, int_pin: int): + key = self._read_interrupt() + if key is None: + return + + time.sleep(self.cfg.gpio_delay) + + input_states = self._read_inputs() + if key not in input_states: + return + + if input_states[key]: + logging.debug('%s pressed', key) + else: + logging.debug('%s released', key) + + if key == 'BT_MODE': + self._trigger_cb_mode() + elif key.startswith('BT_Z_'): + zoneid_str = key[5:] + try: + zoneid = int(zoneid_str) + except ValueError: + return + self._trigger_cb_manual(zoneid) + + def write_output(self, key: str, val: bool): + device = self.output_devices[key] + if device.invert: + val = not val + + self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_OLATA), + device.pin, val) + + def start(self): + for i2c_address in self.mcp_addresses: + self._configure_mcp(i2c_address) + + for device in self.input_devices.values(): + self._configure_input_device(device) + + for device in self.output_devices.values(): + self._configure_output_device(device) + + # clear interrupts by reading inputs + self._read_inputs() + + GPIO.setmode(GPIO.BCM) + GPIO.setwarnings(False) + + GPIO.setup(self.cfg.gpio_interrupt, + GPIO.IN, + pull_up_down=GPIO.PUD_DOWN) + + GPIO.add_event_detect(self.cfg.gpio_interrupt, + GPIO.RISING, + callback=self._interrupt_handler, + bouncetime=10) + + def stop(self): + GPIO.cleanup() diff --git a/tsgrain_controller/jobschedule.py b/tsgrain_controller/jobschedule.py new file mode 100644 index 0000000..7e0e22d --- /dev/null +++ b/tsgrain_controller/jobschedule.py @@ -0,0 +1,38 @@ +# coding=utf-8 +from datetime import datetime +from typing import Optional + +import schedule + +from tsgrain_controller import database, models, queue, util + + +class Scheduler(util.StoppableThread): + + def __init__(self, db: database.RainDB, task_queue: queue.TaskQueue): + super().__init__(1) + self.db = db + self.queue = task_queue + self._job: Optional[schedule.Job] = None + + def _minute_handler(self): + jobs = self.db.get_jobs() + for job in jobs.values(): + if job.check(datetime.now()): + for zone in job.zones: + task = models.Task(models.Source.SCHEDULE, zone, + job.duration) + self.queue.enqueue(task) + + def start(self): + self._job = schedule.every().minute.at(':00').do(self._minute_handler) + super().start() + + def run_cycle(self): + schedule.run_pending() + + def stop(self): + super().stop() + if self._job: + schedule.cancel_job(self._job) + self._job = None diff --git a/tsgrain_controller/models.py b/tsgrain_controller/models.py new file mode 100644 index 0000000..15d94c3 --- /dev/null +++ b/tsgrain_controller/models.py @@ -0,0 +1,164 @@ +# coding=utf-8 +import uuid +from dataclasses import dataclass +from datetime import datetime, timedelta +from enum import Enum +from typing import List, Optional, Dict, Any + +from tsgrain_controller import util + + +@dataclass +class Job: + date: datetime + duration: int + zones: List[int] + enable: bool + repeat: bool + + @property + def is_active(self) -> bool: + if not self.enable: + return False + if self.repeat: + return True + + return self.date > util.datetime_now() + + def check(self, date_now: datetime) -> bool: + if not self.enable: + return False + + if self.repeat: + check_datetime = datetime.combine(date_now.date(), + self.date.time()) + else: + check_datetime = self.date + + return date_now - check_datetime < timedelta(minutes=1) + + @classmethod + def deserialize(cls, data: dict) -> 'Job': + return cls( + date=datetime.fromisoformat(data['date']), + duration=data['duration'], + zones=data['zones'], + enable=data['zones'], + repeat=data['repeat'], + ) + + +class Source(Enum): + MANUAL = 2 + SCHEDULE = 1 + + +@dataclass +class Task: + source: Source + """Quelle des Tasks (Manuell/Zeitplan)""" + + zone_id: int + """Nummer der Zone""" + + duration: int + """Beregnungsdauer in Sekunden""" + + _remaining: int = 0 + """Interne Variable, um die verbleibende Zeit eines gestoppten Tasks zu speichern""" + + _id: int = 0 + + datetime_started: Optional[datetime] = None + """Zeitpunkt, wann der Task gestartet wurde""" + + def __post_init__(self): + self._remaining = self.duration + self._id = uuid.uuid1().int + + @property + def is_running(self) -> bool: + """ + :return: True falls der Task momentan läuft + """ + return self.datetime_started is not None + + @property + def remaining(self) -> int: + """ + :return: Verbleibende Zeit in Sekunden + """ + if not self.is_running: + return self._remaining + + d = self.datetime_finished - util.datetime_now() + return d.seconds + + @property + def is_done(self) -> bool: + """ + :return: True wenn der Task bereits abgeschlossen ist. + """ + return self.remaining <= 0 + + @property + def datetime_finished(self) -> Optional[datetime]: + """ + :return: Zeitpunkt, zu dem der Task abgeschlossen sein wird. + None falls der Task momentan nicht läuft. + """ + if self.datetime_started is None: + return None + + return self.datetime_started + timedelta(seconds=self._remaining) + + def start(self): + """Startet den Task zur aktuellen Zeit.""" + if self.is_running: + raise util.TaskRunException('already running') + self.datetime_started = util.datetime_now() + + def stop(self): + """Stoppt den Task und speichert die verbleibende Zeit""" + if not self.is_running: + raise util.TaskRunException('not running') + + self._remaining = self.remaining + self.datetime_started = None + + def serialize(self) -> Dict[str, Any]: + """Task zur Speicherung in der Datenbank in dict umwandeln.""" + return { + 'source': self.source.name, + 'zone_id': self.zone_id, + 'duration': self.duration, + 'remaining': self.remaining + } + + def serialize_rpc(self) -> Dict[str, Any]: + """Task zur aktuellen Statusübertragung in dict umwandeln.""" + return { + 'source': self.source.name, + 'zone_id': self.zone_id, + 'duration': self.duration, + 'datetime_started': self.datetime_started, + 'datetime_finished': self.datetime_finished + } + + @classmethod + def deserialize(cls, data: dict) -> 'Task': + task = cls(source=Source[data['source']], + zone_id=data['zone_id'], + duration=data['duration']) + task._remaining = data['remaining'] + return task + + def __eq__(self, other: 'Task') -> bool: + return self._id == other._id + + def __hash__(self) -> int: + return hash(self._id) + + def __str__(self): + return 'ZONE %d: %ds (%s)' % (self.zone_id, self.duration, + self.source.name) diff --git a/tsgrain_controller/output.py b/tsgrain_controller/output.py new file mode 100644 index 0000000..87a694b --- /dev/null +++ b/tsgrain_controller/output.py @@ -0,0 +1,151 @@ +from dataclasses import dataclass +from typing import Dict +from tsgrain_controller import io, util, queue, models + + +@dataclass +class OutputState: + on: bool + # Blinks/second + blink_freq: float = 0 + + def is_on(self, ms_ticks: int) -> bool: + if self.on and self.blink_freq > 0: + period = 1000 / self.blink_freq + period_progress = ms_ticks % period + return period_progress < (period / 2) + + return self.on + + def __str__(self): + if not self.on: + return 'OFF' + if self.blink_freq == 0: + return 'ON' + return '((%d))' % self.blink_freq + + +class OutputDevice: + """Ein einzelnes digitales Ausgabegerät""" + + def __init__(self, name: str, o_io: io.Io): + self.name = name + self._io = o_io + self.set_state = OutputState(False) + self._state = False + + self._io.write_output(self.name, False) + + def _write_state(self, state: bool): + if state != self._state: + self._state = state + self._io.write_output(self.name, self._state) + + def update_state(self, ms_ticks: int): + self._write_state(self.set_state.is_on(ms_ticks)) + + +class Outputs(util.StoppableThread): + """ + Outputs ist für die Ausgabegeräte der Bewässerungssteuerung zuständig + (Ventilausgänge und LEDs) + """ + + def __init__(self, o_io: io.Io, task_holder: queue.TaskHolder, + app: util.AppInterface): + super().__init__(0.01) + + self.task_holder = task_holder + self.app = app + self.n_zones = self.app.get_cfg().n_zones + + self.valve_outputs: Dict[int, OutputDevice] = { + i: OutputDevice('VALVE_%d' % i, o_io) + for i in range(1, self.n_zones + 1) + } + self.zone_leds: Dict[int, OutputDevice] = { + i: OutputDevice('LED_Z_%d' % i, o_io) + for i in range(1, self.n_zones + 1) + } + + self.mode_led_auto = OutputDevice('LED_M_AUTO', o_io) + self.mode_led_man = OutputDevice('LED_M_MAN', o_io) + + self._output_devices = [ + *self.valve_outputs.values(), + *self.zone_leds.values(), + self.mode_led_auto, + self.mode_led_man, + ] + + def _get_valve_op(self, valve_id: int) -> OutputDevice: + if valve_id not in self.valve_outputs: + raise Exception('Valve does not exist') + return self.valve_outputs[valve_id] + + def _get_zoneled_op(self, valve_id: int) -> OutputDevice: + if valve_id not in self.zone_leds: + raise Exception('Zone LED does not exist') + return self.zone_leds[valve_id] + + def _set_zone(self, zone_id: int, state: bool): + self._get_valve_op(zone_id).set_state.on = state + self._get_zoneled_op(zone_id).set_state.on = state + + def _set_zone_time(self, zone_id: int, remaining_seconds: int): + is_on = remaining_seconds > 0 + + blink_freq = 0 + if remaining_seconds < 60: + blink_freq = round((60 - remaining_seconds) / 15) + + self._get_valve_op(zone_id).set_state.on = is_on + zone_led = self._get_zoneled_op(zone_id) + zone_led.set_state.on = is_on + zone_led.set_state.blink_freq = blink_freq + + def _set_mode_led_auto(self, enabled: bool, current: bool): + blink_freq = 0 + if current: + blink_freq = 2 + + self.mode_led_auto.set_state.on = enabled + self.mode_led_auto.set_state.blink_freq = blink_freq + + def _set_mode_led_manual(self, current: bool): + self.mode_led_man.set_state.on = current + self.mode_led_man.set_state.blink_freq = 2 + + def _reset_states(self): + for output in self._output_devices: + output.set_state = OutputState(False) + + def _update_states(self): + ms_ticks = util.time_ms() + + for output in self._output_devices: + output.update_state(ms_ticks) + + def reset(self): + self._reset_states() + self._update_states() + + def run_cycle(self): + self._reset_states() + + task = self.task_holder.get_current_task() + if task is not None: + self._set_zone_time(task.zone_id, task.remaining) + self._set_mode_led_manual(task.source == models.Source.MANUAL) + self._set_mode_led_auto(self.app.is_auto_enabled(), + task.source == models.Source.SCHEDULE) + else: + self._set_mode_led_auto(self.app.is_auto_enabled(), False) + + self._update_states() + + def setup(self): + self.reset() + + def cleanup(self): + self.reset() diff --git a/tsgrain_controller/queue.py b/tsgrain_controller/queue.py new file mode 100644 index 0000000..0b1d98b --- /dev/null +++ b/tsgrain_controller/queue.py @@ -0,0 +1,103 @@ +# coding=utf-8 +import logging +from typing import Any, Dict, List, Optional + +from tsgrain_controller import models, util + + +class TaskHolder: + + def get_current_task(self) -> Optional[models.Task]: + pass + + +class TaskQueue(util.StoppableThread, TaskHolder): + + def __init__(self, app: util.AppInterface): + super().__init__(0.1) + self.app = app + + self.tasks: List[models.Task] = list() + self.running_task: Optional[models.Task] = None + + def enqueue(self, task: models.Task, exclusive: bool = False) -> bool: + """ + Fügt der Warteschlange einen neuen Task hinzu. Die Warteschlange + kann nicht mehrere Tasks der selben Quelle und Zone aufnehmen. + Kann ein Task nicht aufgenommen werden, wird False zurückgegeben. + + :param task: Neuer Task + :param exclusive: Wenn True, verbiete mehrere Tasks von der selben Quelle + :return: True wenn Task erfolgreich hinzugefügt + """ + for t in self.tasks: + if t.source == task.source and (exclusive + or t.zone_id == task.zone_id): + return False + + self.tasks.append(task) + logging.info('Task added to queue (%s)', task) + return True + + def get_current_task(self) -> Optional[models.Task]: + """ + Gib den aktuell laufenden Task zurück. + :return: aktuell laufender Task + """ + return self.running_task + + def cancel_task(self, task: models.Task): + self.tasks.remove(task) + logging.info('Task cancelled (%s)', task) + if self.running_task == task: + self.running_task = None + + def cancel_current_task(self): + """ + Bricht den aktuell laufenden Task ab + (z.B. bei manuellem Stopp mittels Taster) + """ + if self.running_task is not None: + self.tasks.remove(self.running_task) + logging.info('Running task cancelled (%s)', self.running_task) + self.running_task = None + + def serialize(self) -> List[models.Task]: + """Task zur Speicherung in der Datenbank in dict umwandeln.""" + return self.tasks + + def serialize_rpc(self) -> Dict[str, Any]: + """Task zur aktuellen Statusübertragung in dict umwandeln.""" + return {'current_time': util.datetime_now(), 'tasks': self.serialize()} + + def run_cycle(self): + # Get a new task if none is running + if self.running_task is None: + for task in self.tasks: + # Only start scheduled tasks if automatic mode is enabled + if task.source == models.Source.SCHEDULE and not self.app.is_auto_enabled( + ): + continue + + self.running_task = task + self.running_task.start() + logging.info('Queued task started (%s)', self.running_task) + break + + # Check currently running task + if self.running_task is not None: + # Stop scheduled tasks if auto mode is disabled + if self.running_task.source == models.Source.SCHEDULE and not self.app.is_auto_enabled( + ): + self.running_task.stop() + logging.info('Running task stopped (%s)', self.running_task) + self.running_task = None + elif self.running_task.is_done: + self.tasks.remove(self.running_task) + logging.info('Running task done (%s)', self.running_task) + self.running_task = None + + def cleanup(self): + if self.running_task: + self.running_task.stop() + self.running_task = None diff --git a/tsgrain_controller/util.py b/tsgrain_controller/util.py new file mode 100644 index 0000000..e7c7c11 --- /dev/null +++ b/tsgrain_controller/util.py @@ -0,0 +1,136 @@ +# coding=utf-8 +import datetime +import json +import logging +import threading +import time +from enum import Enum +from typing import Any, Union + +from tsgrain_controller import config + + +def _get_np_attrs(o) -> dict: + """ + Return all non-protected attributes of the given object. + + :param o: Object + :return: Dict of attributes + """ + return {k: v for k, v in o.__dict__.items() if not k.startswith('_')} + + +def serializer(o: Any) -> Union[str, dict, int, float, bool]: + """ + Serialize object to json-storable format + + :param o: Object to serialize + :return: Serialized output data + """ + if hasattr(o, 'serialize'): + return o.serialize() + elif isinstance(o, datetime.datetime) or isinstance(o, datetime.date): + return o.isoformat() + elif isinstance(o, Enum): + return o.value + elif isinstance(o, int) or isinstance(o, float) or isinstance(o, bool): + return o + elif hasattr(o, '__dict__'): + return _get_np_attrs(o) + return str(o) + + +def to_json(o, pretty=False) -> str: + """ + Convert object to json. + Uses t7he ``serialize()`` method of the target object if available. + + :param o: Object to serialize + :param pretty: Prettify with indents + :return: JSON string + """ + return json.dumps(o, + default=serializer, + indent=2 if pretty else None, + ensure_ascii=False) + + +def to_json_file(o, path): + """ + Convert object to json and writes the result to a file. + Uses the ``serialize()`` method of the target object if available. + + :param o: Object to serialize + :param path: File path + """ + with open(path, 'w', encoding='utf-8') as f: + json.dump(o, f, default=serializer, indent=2, ensure_ascii=False) + + +def time_ms() -> int: + return round(time.time() * 1000) + + +def datetime_now() -> datetime.datetime: + return datetime.datetime.now() + + +def datetime_new(year, + month=None, + day=None, + hour=0, + minute=0, + second=0, + microsecond=0) -> datetime.datetime: + return datetime.datetime(year, month, day, hour, minute, second, + microsecond) + + +class StoppableThread(threading.Thread): + + def __init__(self, interval: float = 1): + super().__init__() + self._interval = interval + self._stop_signal = threading.Event() + + def setup(self): + pass + + def cleanup(self): + pass + + def run_cycle(self): + pass + + def run(self): + self.setup() + + while not self._stop_signal.is_set(): + self.run_cycle() + time.sleep(self._interval) + + self.cleanup() + + def stop(self): + self._stop_signal.set() + self.join() + + +class AppInterface: + + def is_auto_enabled(self) -> bool: + pass + + def get_cfg(self) -> config.Config: + pass + + def get_logger(self) -> logging.Logger: + pass + + +class ZoneUnavailableException(Exception): + pass + + +class TaskRunException(Exception): + pass