Compare commits

..

11 commits

Author SHA1 Message Date
7d72045d4f add jobschedule, MCP23017 library 2022-01-27 23:21:27 +01:00
70caa4664a integrated database, added scheduler 2022-01-27 12:01:43 +01:00
8e051600af add db 2022-01-26 19:54:43 +01:00
3ead68d492 updated queue 2022-01-22 17:23:29 +01:00
a42b106918 attempted refactor of task queue 2022-01-22 12:21:31 +01:00
81a28e9fd3 add output tests 2022-01-22 07:57:18 +01:00
bc69e81bf6 integrated new outputs 2022-01-19 21:18:26 +01:00
e86d434eda finished outputs with blinking capability 2022-01-19 17:24:09 +01:00
ae9e4c7c3d tmp 2022-01-18 14:38:36 +01:00
dd20335717 add queue 2022-01-08 17:19:27 +01:00
91115493f6 add cyra dependency 2022-01-07 12:16:05 +01:00
24 changed files with 1893 additions and 7 deletions

2
.gitignore vendored
View file

@ -3,3 +3,5 @@
/.tox /.tox
__pycache__ __pycache__
*.egg-info *.egg-info
/tsgrain.toml
/raindb.json

View file

@ -26,7 +26,7 @@ wodurch das Bewässerungsprogramm beim Neustart fortgesetzt wird.
5. Ist der Timer abgelaufen, wird der Ventilausgang deaktivert und der Bewässerungsauftrag 5. Ist der Timer abgelaufen, wird der Ventilausgang deaktivert und der Bewässerungsauftrag
aus der Warteschlange entfernt. aus der Warteschlange entfernt.
6. Wird ein Bewässerungsauftrag abgebrochen (entweder durch Tastendruck oder durch 6. Wird ein Bewässerungsauftrag abgebrochen (entweder durch Tastendruck oder durch
Löschen des Zeitplans, muss ebenfalls der Ventilausgang deaktivert und der Löschen des Zeitplans, muss ebenfalls der Ventilausgang deaktiviert und der
Bewässerungsauftrag aus der Warteschlange entfernt werden. Bewässerungsauftrag aus der Warteschlange entfernt werden.
7. Wird der Controller beendet, werden alle laufenden Aufträge angehalten. Die 7. Wird der Controller beendet, werden alle laufenden Aufträge angehalten. Die
Ventilausgänge werden deaktiviert und die gesamte Warteschlange inklusive Ventilausgänge werden deaktiviert und die gesamte Warteschlange inklusive
@ -75,15 +75,21 @@ Der Controller kann über eine GRPC-Schnittstelle mit anderen Anwendungen kommun
Datenmodelle Datenmodelle
============ ============
Source
------
``name``
Name der Quelle: manual, schedule
``priority``
**Priorität:** Priorität der Quelle
Task Task
---- ----
``source`` ``source``
**Quelle:** Zeitplan (mit Zeitplan-ID), Tastendruck **Quelle:** Zeitplan (mit Zeitplan-ID), Tastendruck
``priority``
**Priorität:** Priorität der Bewässerungsaufgabe
``zone: int`` ``zone: int``
ID der Bewässerungszone (Platz) ID der Bewässerungszone (Platz)
@ -97,7 +103,7 @@ Task
Schedule Schedule
-------- --------
``datetime: datetime`` ``date: datetime``
Datum/Uhrzeit Datum/Uhrzeit
``duration: int`` ``duration: int``
@ -108,3 +114,13 @@ Schedule
``repeat: bool`` ``repeat: bool``
Zeitplan täglich wiederholen Zeitplan täglich wiederholen
Konfiguration
=============
``MAX_ACTIVE_ZONES``
in/output pins

View file

@ -1 +1,4 @@
tinydb tinydb~=4.6.1
cyra~=1.0.2
schedule~=1.1.0
smbus~=1.1.post2

View file

@ -1,2 +1,5 @@
pytest pytest
pytest-cov pytest-cov
pytest-mock
py-defer
importlib_resources

View file

@ -23,6 +23,8 @@ setuptools.setup(
py_modules=['tsgrain_controller'], py_modules=['tsgrain_controller'],
install_requires=[ install_requires=[
'tinydb', 'tinydb',
'cyra',
'schedule',
], ],
packages=setuptools.find_packages(exclude=['tests*']), packages=setuptools.find_packages(exclude=['tests*']),
entry_points={ entry_points={

45
tests/fixtures.py Normal file
View file

@ -0,0 +1,45 @@
# coding=utf-8
from typing import Optional, Callable, Dict
from importlib_resources import files
import os
from tsgrain_controller import io, util, config
DIR_TESTFILES = str(files('tests.testfiles').joinpath(''))
FILE_CFG = os.path.join(DIR_TESTFILES, 'tsgrain.toml')
class TestingIo(io.Io):
def __init__(self, cb_manual: Optional[Callable[[int], None]],
cb_mode: Optional[Callable[[], None]]):
self.cb_manual = cb_manual
self.cb_mode = cb_mode
self.outputs: Dict[str, bool] = dict()
def trigger_cb_manual(self, zone_id: int):
if self.cb_manual is not None:
self.cb_manual(zone_id)
def trigger_cb_mode(self):
if self.cb_mode is not None:
self.cb_mode()
def write_output(self, key: str, val: bool):
self.outputs[key] = val
class TestingApp(util.AppInterface):
def __init__(self, db_file=''):
self.auto = False
self.cfg = config.Config(FILE_CFG)
self.cfg.load_file(False)
self.cfg.db_path = db_file
def is_auto_enabled(self) -> bool:
return self.auto
def get_cfg(self) -> config.Config:
return self.cfg

93
tests/test_database.py Normal file
View file

@ -0,0 +1,93 @@
# coding=utf-8
import tempfile
import os
from tests import fixtures
from tsgrain_controller import database, util, queue, models
def test_db_init():
with tempfile.TemporaryDirectory() as td:
dbfile = os.path.join(td, 'raindb.json')
db = database.RainDB(dbfile)
db.close()
assert os.path.isfile(dbfile)
def test_db_jobs():
with tempfile.TemporaryDirectory() as td:
dbfile = os.path.join(td, 'raindb.json')
db = database.RainDB(dbfile)
job1 = models.Job(util.datetime_new(2022, 1, 10, 12, 30), 60, [1, 3],
True, False)
job2 = models.Job(util.datetime_new(2022, 1, 10, 12, 30), 60, [1, 3],
True, True)
db.insert_job(job1)
db.insert_job(job2)
# Reopen database
db.close()
db = database.RainDB(dbfile)
jobs = db.get_jobs()
assert util.to_json(jobs) == '{"1": {"date": "2022-01-10T12:30:00", \
"duration": 60, "zones": [1, 3], "enable": [1, 3], "repeat": false}, "2": \
{"date": "2022-01-10T12:30:00", "duration": 60, "zones": [1, 3], \
"enable": [1, 3], "repeat": true}}'
job2.enable = False
db.update_job(2, job2)
db.delete_job(1)
jobs = db.get_jobs()
assert util.to_json(jobs) == '{"2": {"date": "2022-01-10T12:30:00", \
"duration": 60, "zones": [1, 3], "enable": [1, 3], "repeat": true}}'
def test_db_queue():
app = fixtures.TestingApp()
q = queue.TaskQueue(app)
task1 = models.Task(models.Source.MANUAL, 1, 10)
task2 = models.Task(models.Source.SCHEDULE, 1, 5)
assert q.enqueue(task1)
assert q.enqueue(task2)
with tempfile.TemporaryDirectory() as td:
dbfile = os.path.join(td, 'raindb.json')
app = fixtures.TestingApp(dbfile)
db = database.RainDB(dbfile)
db.store_queue(q)
# Reopen database
db.close()
db = database.RainDB(dbfile)
read_queue = queue.TaskQueue(app)
db.load_queue(read_queue)
assert util.to_json(
read_queue) == '[{"source": "MANUAL", "zone_id": 1, \
"duration": 10, "remaining": 10}, {"source": "SCHEDULE", "zone_id": 1, "duration": 5, \
"remaining": 5}]'
def test_db_auto_mode():
with tempfile.TemporaryDirectory() as td:
dbfile = os.path.join(td, 'raindb.json')
db = database.RainDB(dbfile)
assert db.get_auto_mode() is False
db.set_auto_mode(True)
assert db.get_auto_mode() is True
# Reopen database
db.close()
db = database.RainDB(dbfile)
assert db.get_auto_mode() is True

106
tests/test_output.py Normal file
View file

@ -0,0 +1,106 @@
# coding=utf-8
from typing import Optional
import time
import defer
from unittest import mock
from tests import fixtures
from tsgrain_controller import output, queue, models
class _TaskHolder(queue.TaskHolder):
def __init__(self, task: Optional[models.Task]):
self.task = task
def get_current_task(self) -> Optional[models.Task]:
return self.task
@defer.with_defer
def test_zone_outputs():
io = fixtures.TestingIo(None, None)
task_holder = _TaskHolder(None)
app = fixtures.TestingApp()
op = output.Outputs(io, task_holder, app)
op.start()
defer.defer(op.stop)
time.sleep(0.01)
# Expect all devices initialized to OFF
assert io.outputs == {
'VALVE_1': False,
'VALVE_2': False,
'VALVE_3': False,
'LED_Z_1': False,
'LED_Z_2': False,
'LED_Z_3': False,
'LED_M_AUTO': False,
'LED_M_MAN': False,
}
task_holder.task = models.Task(models.Source.MANUAL, 1, 100)
time.sleep(0.01)
assert io.outputs == {
'VALVE_1': True,
'VALVE_2': False,
'VALVE_3': False,
'LED_Z_1': True,
'LED_Z_2': False,
'LED_Z_3': False,
'LED_M_AUTO': False,
# Manual LED is blinking
'LED_M_MAN': mock.ANY,
}
task_holder.task = None
time.sleep(0.01)
assert io.outputs == {
'VALVE_1': False,
'VALVE_2': False,
'VALVE_3': False,
'LED_Z_1': False,
'LED_Z_2': False,
'LED_Z_3': False,
'LED_M_AUTO': False,
'LED_M_MAN': False,
}
"""
@defer.with_defer
def test_zone_led_blink():
io = fixtures.TestingIo(None, None)
task_holder = _TaskHolder(None)
app = fixtures.TestingApp()
op = output.Outputs(io, task_holder, app)
def get_blink_time(key: str) -> Optional[int]:
start_time = time.time_ns()
first_pulse_ns = 0
old_state = io.outputs[key]
while (time.time_ns() - start_time) < 1e9:
state = io.outputs[key]
if state != old_state:
old_state = state
if first_pulse_ns == 0:
first_pulse_ns = time.time_ns()
else:
return int((time.time_ns() - first_pulse_ns) / 1e6)
return None
op.start()
defer.defer(op.stop)
task_holder.task = models.Task(models.Source.MANUAL, 1, 60)
time.sleep(0.005)
assert get_blink_time('LED_Z_1') is None
task_holder.task = models.Task(models.Source.MANUAL, 1, 30)
time.sleep(0.005)
assert get_blink_time('LED_Z_1') == pytest.approx(250, 0.1)
"""

82
tests/test_queue.py Normal file
View file

@ -0,0 +1,82 @@
# coding=utf-8
import time
from datetime import datetime
from tests import fixtures
from tsgrain_controller import queue, util, models
def test_task():
task = models.Task(models.Source.MANUAL, 1, 10)
assert task.serialize() == {
'source': 'MANUAL',
'zone_id': 1,
'duration': 10,
'remaining': 10
}
assert task.serialize_rpc() == {
'source': 'MANUAL',
'zone_id': 1,
'duration': 10,
'datetime_started': None,
'datetime_finished': None
}
def test_task_started(mocker):
mocker.patch('tsgrain_controller.util.datetime_now',
return_value=datetime(2022, 1, 10, 6, 0))
task = models.Task(models.Source.MANUAL, 1, 10)
task.start()
assert task.serialize() == {
'source': 'MANUAL',
'zone_id': 1,
'duration': 10,
'remaining': 10
}
assert task.serialize_rpc() == {
'source': 'MANUAL',
'zone_id': 1,
'duration': 10,
'datetime_started': datetime(2022, 1, 10, 6, 0),
'datetime_finished': datetime(2022, 1, 10, 6, 0, 10)
}
def test_add_tasks():
app = fixtures.TestingApp()
q = queue.TaskQueue(app)
task1 = models.Task(models.Source.MANUAL, 1, 10)
task1b = models.Task(models.Source.SCHEDULE, 1, 5)
task1c = models.Task(models.Source.MANUAL, 1, 5)
assert q.enqueue(task1)
assert q.enqueue(task1b)
assert not q.enqueue(task1c)
def test_queue_runner():
app = fixtures.TestingApp()
q = queue.TaskQueue(app)
task1 = models.Task(models.Source.MANUAL, 1, 1)
task2 = models.Task(models.Source.MANUAL, 2, 5)
task3 = models.Task(models.Source.SCHEDULE, 2, 10)
assert q.enqueue(task1)
assert q.enqueue(task2)
assert q.enqueue(task3)
q.start()
time.sleep(0.9)
q.stop()
assert util.to_json(q) == \
'[{"source": "MANUAL", "zone_id": 2, "duration": 5, "remaining": 4}, \
{"source": "SCHEDULE", "zone_id": 2, "duration": 10, "remaining": 10}]'

42
tests/test_util.py Normal file
View file

@ -0,0 +1,42 @@
# coding=utf-8
from dataclasses import dataclass
from enum import Enum
import datetime
import pytest
from tsgrain_controller import util
@dataclass
class _TestCls:
name: str
money: int
class _TestClsSerializable(_TestCls):
def serialize(self):
return {'name': 'Little' + self.name, 'money': self.money}
class _TestEnum(Enum):
OPT1 = 1
OPT2 = 2
@pytest.mark.parametrize('o,expected', [
('hello', '"hello"'),
(42, '42'),
({
'k1': 'v1',
'k2': 'v2'
}, '{"k1": "v1", "k2": "v2"}'),
(_TestEnum.OPT2, '2'),
(_TestCls("Mary", 121), '{"name": "Mary", "money": 121}'),
(_TestClsSerializable("Mary",
121), '{"name": "LittleMary", "money": 121}'),
(datetime.datetime(2022, 1, 2, 15, 30, 11), '"2022-01-02T15:30:11"'),
(datetime.date(2022, 1, 2), '"2022-01-02"'),
])
def test_to_json(o, expected):
res = util.to_json(o)
assert res == expected

View file

@ -0,0 +1 @@
n_zones = 3 # Anzahl Bewässerungszonen

View file

@ -1,5 +1,30 @@
import signal
import sys
from tsgrain_controller import application
def run(): def run():
pass console_flag = False
if len(sys.argv) > 1:
x = sys.argv[1]
if x.startswith('c'):
console_flag = True
app = application.Application(console_flag)
def _signal_handler(sig, frame):
app.stop()
print('Exited.')
sys.exit(0)
signal.signal(signal.SIGINT, _signal_handler)
app.start()
signal.pause()
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -0,0 +1,64 @@
# coding=utf-8
import logging
from tsgrain_controller import config, controller, database, io, jobschedule, output, \
queue, util
class Application(util.AppInterface):
def __init__(self, console_flag: bool):
self.logger = logging.getLogger()
self.logger.setLevel(logging.DEBUG)
self.cfg = config.Config('tsgrain.toml')
self.cfg.load_file()
self.db = database.RainDB(self.cfg.db_path)
self.queue = queue.TaskQueue(self)
self.db.load_queue(self.queue)
b_ctrl = controller.ButtonController(self.queue, self.cfg)
self.io = io.new_io(self, console_flag)
self.io.set_callbacks(b_ctrl.cb_manual, self.cb_modekey)
self.outputs = output.Outputs(self.io, self.queue, self)
self.scheduler = jobschedule.Scheduler(self.db, self.queue)
self.auto_en = self.db.get_auto_mode()
def is_auto_enabled(self) -> bool:
return self.auto_en
def get_cfg(self) -> config.Config:
return self.cfg
def get_logger(self) -> logging.Logger:
return self.logger
def cb_modekey(self):
self.auto_en = not self.auto_en
if self.auto_en:
logging.info('Auto mode ON')
else:
logging.info('Auto mode OFF')
def start(self):
self.io.start()
self.outputs.start()
self.queue.start()
self.scheduler.start()
def stop(self):
self.scheduler.stop()
self.queue.stop()
self.outputs.stop()
self.io.stop()
# Store application state
self.db.store_queue(self.queue)
self.db.set_auto_mode(self.auto_en)

View file

@ -0,0 +1,66 @@
# coding=utf-8
import cyra
class Config(cyra.Config):
builder = cyra.ConfigBuilder()
builder.comment('Anzahl Bewaesserungszonen')
n_zones = builder.define('n_zones', 7)
builder.comment('Pfad der Datenbankdatei')
db_path = builder.define('db_path', 'raindb.json')
builder.comment('Manuelle Bewaesserungszeit in Sekunden')
manual_time = builder.define('manual_time', 300)
builder.comment('Debug-Ausgaben loggen')
log_debug = builder.define('log_debug', False)
builder.push('io')
builder.comment('ID des I2C-Bus')
i2c_bus_id = builder.define('i2c_bus_id', 0)
builder.comment(
'GPIO-Pin, mit dem der Interrupt-Pin des MCP23017 verbunden ist')
gpio_interrupt = builder.define('gpio_interrupt', 17)
builder.comment('Entprellzeit in Sekunden')
gpio_delay = builder.define('gpio_delay', 0.05)
builder.pop()
builder.comment('Ausgaenge')
output_devices = builder.define(
'output_devices', {
'VALVE_1': '0x27/B0/!',
'VALVE_2': '0x27/B1/!',
'VALVE_3': '0x27/B2/!',
'VALVE_4': '0x27/B3/!',
'VALVE_5': '0x27/B4/!',
'VALVE_6': '0x27/B5/!',
'VALVE_7': '0x27/B6/!',
'LED_Z_1': '0x27/A0',
'LED_Z_2': '0x27/A1',
'LED_Z_3': '0x27/A2',
'LED_Z_4': '0x27/A3',
'LED_Z_5': '0x27/A4',
'LED_Z_6': '0x27/A5',
'LED_Z_7': '0x27/A6',
'LED_M_AUTO': '0x23/B0',
'LED_M_MAN': '0x23/B1',
})
builder.comment('Eingaenge')
input_devices = builder.define(
'input_devices', {
'BT_Z_1': '0x23/A0/!',
'BT_Z_2': '0x23/A1/!',
'BT_Z_3': '0x23/A2/!',
'BT_Z_4': '0x23/A3/!',
'BT_Z_5': '0x23/A4/!',
'BT_Z_6': '0x23/A5/!',
'BT_Z_7': '0x23/A6/!',
'BT_MODE': '0x23/A7/!',
})

View file

@ -0,0 +1,36 @@
# coding=utf-8
from typing import Callable, Optional
from tsgrain_controller import config, models, queue
class ButtonController:
def __init__(self, task_queue: queue.TaskQueue, cfg: config.Config):
self.task_queue = task_queue
self.cfg = cfg
self.cb_error: Optional[Callable[[], None]] = None
def cb_manual(self, zone_id: int):
current_task = self.task_queue.get_current_task()
# Cancel manually started tasks
if current_task is not None \
and current_task.zone_id == zone_id and current_task.source == models.Source.MANUAL:
self.task_queue.cancel_current_task()
else:
task = models.Task(models.Source.MANUAL, zone_id,
self.cfg.manual_time)
self.task_queue.enqueue(task, True)
class QueuingButtonController(ButtonController):
def cb_manual(self, zone_id: int):
# If a task from this zone is in queue, cancel it
for task in self.task_queue.tasks:
if task.zone_id == zone_id and task.source == models.Source.MANUAL:
self.task_queue.cancel_task(task)
return
task = models.Task(models.Source.MANUAL, zone_id, self.cfg.manual_time)
self.task_queue.enqueue(task)

View file

@ -0,0 +1,115 @@
# coding=utf-8
import tinydb
from typing import Dict, Optional
from tsgrain_controller import queue, util, models
class RainDB:
def __init__(self, db_path: str):
self._db = tinydb.TinyDB(db_path, default=util.serializer)
self._jobs = self._db.table('jobs', cache_size=0)
self._tasks = self._db.table('tasks', cache_size=0)
self._options = self._db.table('options', cache_size=0)
def close(self):
"""Die Datenbank schließen"""
self._db.close()
def get_jobs(self) -> Dict[int, models.Job]:
"""
Gibt alle gespeicherten Bewässerungsjobs zurück
:return: Bewässerungsjobs: dict(id -> models.Job)
"""
res = dict()
for job_data in self._jobs.all():
res[job_data.doc_id] = models.Job.deserialize(job_data)
return res
def get_job(self, job_id: int) -> Optional[models.Job]:
"""
Gibt den Bewässerungsjob mit der gegebenen ID zurück
:param job_id: ID des Bewässerungsjobs
:return: Bewässerungsjob
"""
job = self._jobs.get(None, job_id)
if job is None:
return None
return models.Job.deserialize(job)
def insert_job(self, job: models.Job):
"""
Fügt der Datenbank einen neuen Bewässerungsjob hinzu
:param job: Bewässerungsjob
"""
self._jobs.insert(util.serializer(job))
def update_job(self, job_id: int, job: models.Job):
"""
Aktualisiert einen Bewässerungsjob
:param job_id: ID des Bewässerungsjobs
:param job: Bewässerungsjob
"""
self._jobs.update(util.serializer(job), None, [job_id])
def delete_job(self, job_id: int):
"""
Lösche den Bewässerungsjob mit der gegebenen ID
:param job_id: ID des Bewässerungsjobs
"""
self._jobs.remove(None, [job_id])
def store_queue(self, task_queue: queue.TaskQueue):
"""
Speichere die aktuelle Warteschlange in der Datenbank
:param task_queue: Warteschlange
"""
self.empty_queue()
for task in task_queue.serialize():
self._tasks.insert(util.serializer(task))
def load_queue(self, task_queue: queue.TaskQueue):
"""
Lade die gespeicherten Tasks aus der Datenbank in die Warteschlange
:param task_queue: Warteschlange
"""
for task_data in self._tasks.all():
task = models.Task.deserialize(task_data)
task_queue.tasks.append(task)
self.empty_queue()
def empty_queue(self):
"""Lösche die gespeicherte Warteschlange"""
self._tasks.truncate()
def set_auto_mode(self, state: bool):
"""
Speichere den Status des Automatikmodus
:param state: Automatikstatus
"""
self._options.upsert({
'key': 'auto_mode',
'val': state
},
tinydb.Query().key == 'auto_mode')
def get_auto_mode(self) -> bool:
"""
Rufe den Status des Automatikmodus ab
:return: Automatikstatus
"""
option = self._options.get(tinydb.Query().key == 'auto_mode')
if option is None:
return False
return option.get('val', False)

View file

@ -0,0 +1,32 @@
# coding=utf-8
from typing import Callable, Optional
from tsgrain_controller import util
class Io:
def set_callbacks(self, cb_manual: Optional[Callable[[int], None]],
cb_mode: Optional[Callable[[], None]]):
pass
def start(self):
pass
def stop(self):
pass
def write_output(self, key: str, val: bool):
pass
def new_io(app: util.AppInterface, console_flag: bool) -> Io:
if not console_flag:
try:
from tsgrain_controller.io import mcp23017 as io_mod
except ImportError:
from tsgrain_controller.io import console as io_mod
else:
from tsgrain_controller.io import console as io_mod
return io_mod.Io(app)

View file

@ -0,0 +1,113 @@
# coding=utf-8
import curses
import logging
import math
from typing import Callable, Dict, Optional
from tsgrain_controller import io, util
class CursesHandler(logging.Handler):
def __init__(self, screen):
logging.Handler.__init__(self)
self.screen = screen
self.screen.scrollok(True)
self.screen.idlok(True)
self.screen.leaveok(True)
self.screen.refresh()
def emit(self, record):
try:
msg = self.format(record)
screen = self.screen
fs = "\n%s"
screen.addstr(fs % msg)
screen.refresh()
except (KeyboardInterrupt, SystemExit):
raise
except: # noqa: E722
self.handleError(record)
class Io(util.StoppableThread, io.Io):
def __init__(self, app: util.AppInterface):
super().__init__(0.01)
self.app = app
self.cb_manual: Optional[Callable[[int], None]] = None
self.cb_mode: Optional[Callable[[], None]] = None
self._screen: Optional = None
self._outputs: Dict[str, bool] = dict()
def set_callbacks(self, cb_manual: Optional[Callable[[int], None]],
cb_mode: Optional[Callable[[], None]]):
self.cb_manual = cb_manual
self.cb_mode = cb_mode
def setup(self):
screen = curses.initscr()
curses.noecho()
curses.curs_set(0)
screen.nodelay(True)
max_y, max_x = screen.getmaxyx()
width_half = math.floor(max_x / 2)
self._screen = curses.newwin(max_y, width_half, 0, 0)
win_log = curses.newwin(max_y, width_half, 0, width_half)
formatter_display = logging.Formatter(
'%(asctime)-8s|%(levelname)-5s| %(message)-s', '%H:%M:%S')
mh = CursesHandler(win_log)
mh.setFormatter(formatter_display)
self.app.get_logger().handlers = []
self.app.get_logger().addHandler(mh)
self._screen.nodelay(True)
logging.info('Window size: max_x=%d, max_y=%d, width_half=%d', max_x,
max_y, width_half)
def _trigger_cb_manual(self, zone_id: int):
if self.cb_manual is not None:
self.cb_manual(zone_id)
def _trigger_cb_mode(self):
if self.cb_mode is not None:
self.cb_mode()
def write_output(self, key: str, val: bool):
self._outputs[key] = val
def run_cycle(self):
c = self._screen.getch()
def state_str(state: bool) -> str:
if state:
return ''
else:
return ''
# Mode key (0)
if c == 48:
self._trigger_cb_mode()
# Zone keys (1-7)
elif 49 <= c <= 55:
self._trigger_cb_manual(c - 48)
self._screen.erase()
self._screen.addstr(0, 0,
'Buttons: 1-7: Manual control, 0: Auto on/off')
i = 1
for key, output in self._outputs.items():
self._screen.addstr(i, 0, '%s: %s' % (key, state_str(output)))
i += 1
def cleanup(self):
curses.echo()
curses.curs_set(1)
curses.endwin()

View file

@ -0,0 +1,448 @@
# coding=utf-8
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, Optional, Tuple
import RPi.GPIO as GPIO
import smbus
from tsgrain_controller import io, util
# MCP-Register
# Quelle: https://ww1.microchip.com/downloads/en/devicedoc/20001952c.pdf
# Seite 16, Table 3-3, 3-5
MCP_IODIRA = 0x00
"""
I/O-Modus pro GPIOA-Pin (standardmäßig ``1`` / Eingang)
- ``1`` Eingang
- ``0`` Ausgang
"""
MCP_IODIRB = 0x01
"""
I/O-Modus pro GPIOB-Pin (standardmäßig ``1`` / Eingang)
- ``1`` Eingang
- ``0`` Ausgang
"""
MCP_IPOLA = 0x02
"""
Polarität pro GPIOA-Pin.
- ``1`` Low wenn aktiv
- ``0`` High wenn aktiv
Die Einstellung hatte im Test nur bei als Eingängen konfigurierten Pins
einen Effekt. Bei Ausgängen wird stattdessen der gesetzte Wert invertiert.
"""
MCP_IPOLB = 0x03
"""Polarität pro GPIOB-Pin. Invertiert wenn Bit gesetzt."""
MCP_GPINTENA = 0x04
"""
Aktiviert einen Interrupt bei Zustandswechsel eines GPIOA-Pins.
Das Auslöen von Interrupts
erfordert das Setzen der Register ``DEFVAL`` und ``INTCON``.
"""
MCP_GPINTENB = 0x05
"""
Aktiviert einen Interrupt bei Zustandswechsel eines GPIOB-Pins.
Das Auslöen von Interrupts
erfordert das Setzen der Register ``DEFVAL`` und ``INTCON``.
"""
MCP_DEFVALA = 0x06
"""
Vergleichsregister für Interrupt bei Zustandswechsel eines GPIOA-Pins.
Wenn mittels ``GPINTEN`` und ``INTCON``-Register ein vergleichender
Interrupt konfiguriert wurde, verursacht der Wechsel des Pin-Zustands
auf den gegenteiligen Wert von ``DEFVAL`` einen Interrupt.
"""
MCP_DEFVALB = 0x07
"""
Vergleichsregister für Interrupt bei Zustandswechsel eines GPIOB-Pins.
Wenn mittels ``GPINTEN`` und ``INTCON``-Register ein vergleichender
Interrupt konfiguriert wurde, verursacht der Wechsel des Pin-Zustands
auf den gegenteiligen Wert von ``DEFVAL`` einen Interrupt.
"""
MCP_INTCONA = 0x08
"""
Wahl des Interruptmodus für GPIOA.
- ``1`` Der Wert des Pins wird mit dem entsprechenden Bit im ``DEFVAL``-Register
verglichen. Beim gegenteiligen Wert erfolgt ein Interrupt.
- ``0`` Ein Interrupt wird bei jeder Zustandsänderung des Pins ausgelöst.
"""
MCP_INTCONB = 0x09
"""
Wahl des Interruptmodus für GPIOB.
- ``1`` Der Wert des Pins wird mit dem entsprechenden Bit im ``DEFVAL``-Register
verglichen. Beim gegenteiligen Wert erfolgt ein Interrupt.
- ``0`` Ein Interrupt wird bei jeder Zustandsänderung des Pins ausgelöst.
"""
MCP_IOCON = 0x0a
"""
Konfiguration für den I2C-Portexpander.
Bit ``7``: **BANK**
Ändert die Adressierung der Register. Dieses Programm verwendet die
Standardkonfiguration 0, also nicht ändern.
Bit ``6``: **MIRROR**
Wenn gesetzt, sind beide Interrupt-Pins ``INTA`` und ``INTB`` miteinander
verodert, d.h. ein Interrupt auf einem der zwei Ports aktiviert beide Pins.
Bit ``5``: **SEQOP**
Sequenzieller Modus, inkrementiert den Adresszähler bei jedem Zugriff.
- ``1`` Sequenzieller Modus deaktiviert
- ``0`` Sequenzieller Modus aktiviert
Bit ``4``: **DISSLW**
Slew-Rate control
Bit ``3``: nicht implementiert
Bit ``2``: **ODR**
Konfiguriert den Interruptpin als Open-Drain-Ausgang, wenn gesetzt.
Wenn nicht gesetzt, ist der Interruptpin ein aktiv treibender Ausgang
mit der in Bit ``1`` festgelegten Polarität.
Bit ``1``: **INTPOL**
Polarität des Interrupt-Pins
- ``1`` High wenn aktiv
- ``0`` Low wenn aktiv
Bit ``0``: nicht implementiert
"""
MCP_GPPUA = 0x0c
"""GPIOA-Pullup-Widerstandsregister. 100k-Pullup aktiviert, wenn Bit gesetzt."""
MCP_GPPUB = 0x0d
"""GPIOB-Pullup-Widerstandsregister. 100k-Pullup aktiviert, wenn Bit gesetzt."""
MCP_INTFA = 0x0e
"""
Interrupt-Flag-Register. Wurde ein Interrupt auf GPIOA ausgelöst, ist
das entsprechende Bit in diesem Register gesetzt. Read-only.
"""
MCP_INTFB = 0x0f
"""
Interrupt-Flag-Register. Wurde ein Interrupt auf GPIOB ausgelöst, ist
das entsprechende Bit in diesem Register gesetzt. Read-only.
"""
MCP_INTCAPA = 0x10
"""
Interrupt-Capture-Register. Wurde ein Interrupt auf GPIOA ausgelöst,
speichert dieses Register den entsprechenden Wert.
"""
MCP_INTCAPB = 0x11
"""
Interrupt-Capture-Register. Wurde ein Interrupt auf GPIOB ausgelöst,
speichert dieses Register den entsprechenden Wert.
"""
MCP_GPIOA = 0x12
"""
GPIOA-Portregister. Wert entspricht dem Zustand der GPIO-Pins.
Wird in dieses Register geschrieben, wird das ``OLAT``-Register verändert.
"""
MCP_GPIOB = 0x13
"""
GPIOA-Portregister. Wert entspricht dem Zustand der GPIO-Pins.
Wird in dieses Register geschrieben, wird das ``OLAT``-Register verändert.
"""
MCP_OLATA = 0x14
"""
GPIOA-Output-Latch-Register. Wert entspricht dem Zustand der Output-Latches,
die wiederum die als Output konfigurierten Pins ansteuern.
"""
MCP_OLATB = 0x15
"""
GPIOB-Output-Latch-Register. Wert entspricht dem Zustand der Output-Latches,
die wiederum die als Output konfigurierten Pins ansteuern.
"""
class PinConfigInvalid(Exception):
def __init__(self, cfg_str: str):
super().__init__('MCP23017 pin config %s invalid' % cfg_str)
class _MCP23017Port(Enum):
A = 0
B = 1
def reg(self, reg_a: int) -> int:
return reg_a + self.value
@dataclass
class _MCP23017Device:
i2c_address: int
port: _MCP23017Port
pin: int
invert: bool
@classmethod
def from_config(cls, cfg_str: str):
cfg_parts = cfg_str.split('/')
if len(cfg_parts) < 2:
raise PinConfigInvalid(cfg_str)
i2c_addr_str = cfg_parts[0]
port_str = cfg_parts[1][0]
pin_str = cfg_parts[1][1:]
try:
i2c_addr = int(i2c_addr_str, 16)
except ValueError:
raise PinConfigInvalid(cfg_str)
try:
port = _MCP23017Port[port_str]
except KeyError:
raise PinConfigInvalid(cfg_str)
try:
pin = int(pin_str)
except ValueError:
raise PinConfigInvalid(cfg_str)
invert = False
if len(cfg_parts) >= 3:
attr_str = cfg_parts[2]
invert = '!' in attr_str
return cls(i2c_addr, port, pin, invert)
class Io(io.Io):
def __init__(self, app: util.AppInterface):
self.cb_manual: Optional[Callable[[int], None]] = None
self.cb_mode: Optional[Callable[[], None]] = None
self.cfg = app.get_cfg()
self.bus = smbus.SMBus(self.cfg.i2c_bus_id)
self.i2c_cache: Dict[Tuple[int, int], int] = dict()
self.mcp_addresses = set()
self.output_devices: Dict[str, _MCP23017Device] = dict()
self.input_devices: Dict[str, _MCP23017Device] = dict()
# Parse config
for key, cfg_str in self.cfg.input_devices.items():
device = _MCP23017Device.from_config(cfg_str)
self.mcp_addresses.add(device.i2c_address)
self.input_devices[key] = device
for key, cfg_str in self.cfg.output_devices.items():
device = _MCP23017Device.from_config(cfg_str)
self.mcp_addresses.add(device.i2c_address)
self.output_devices[key] = device
def _trigger_cb_manual(self, zone_id: int):
if self.cb_manual is not None:
self.cb_manual(zone_id)
def _trigger_cb_mode(self):
if self.cb_mode is not None:
self.cb_mode()
def set_callbacks(self, cb_manual: Optional[Callable[[int], None]],
cb_mode: Optional[Callable[[], None]]):
self.cb_manual = cb_manual
self.cb_mode = cb_mode
def _i2c_read_byte(self,
i2c_address: int,
register: int,
use_cache=False) -> int:
key = (i2c_address, register)
if use_cache and key in self.i2c_cache:
return self.i2c_cache[key]
data = self.bus.read_byte_data(i2c_address, register)
if use_cache:
self.i2c_cache[key] = data
return data
def _i2c_write_byte(self, i2c_address: int, register: int, value: int):
self.bus.write_byte_data(i2c_address, register, value)
def _i2c_read_bit(self,
i2c_address: int,
register: int,
bit: int,
use_cache=False):
data = self._i2c_read_byte(i2c_address, register, use_cache)
bitmask = 1 << bit
return bool(data & bitmask)
def _i2c_write_bit(self, i2c_address: int, register: int, bit: int,
value: bool):
data = self._i2c_read_byte(i2c_address, register)
bitmask = 1 << bit
if value:
data |= bitmask # Maskiertes Bit setzen
else:
data &= ~bitmask # Maskiertes Bit löschen
self._i2c_write_byte(i2c_address, register, data)
def _i2c_clear_cache(self):
self.i2c_cache = dict()
def _configure_mcp(self, i2c_address: int):
self._i2c_write_byte(i2c_address, MCP_IODIRA, 0xff)
self._i2c_write_byte(i2c_address, MCP_IODIRB, 0xff)
self._i2c_write_byte(i2c_address, MCP_IPOLA, 0)
self._i2c_write_byte(i2c_address, MCP_IPOLB, 0)
self._i2c_write_byte(i2c_address, MCP_GPINTENA, 0)
self._i2c_write_byte(i2c_address, MCP_GPINTENB, 0)
self._i2c_write_byte(i2c_address, MCP_DEFVALA, 0)
self._i2c_write_byte(i2c_address, MCP_DEFVALB, 0)
self._i2c_write_byte(i2c_address, MCP_INTCONA, 0)
self._i2c_write_byte(i2c_address, MCP_INTCONB, 0)
self._i2c_write_byte(i2c_address, MCP_IOCON, 0b01000010)
self._i2c_write_byte(i2c_address, MCP_GPPUA, 0)
self._i2c_write_byte(i2c_address, MCP_GPPUB, 0)
self._i2c_write_byte(i2c_address, MCP_OLATA, 0)
self._i2c_write_byte(i2c_address, MCP_OLATB, 0)
def _configure_input_device(self, device: _MCP23017Device):
if device.invert:
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA),
device.pin, True)
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_GPINTENA),
device.pin, True)
def _configure_output_device(self, device: _MCP23017Device):
if device.invert:
# self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA),
# device.pin, True)
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_OLATA),
device.pin, True)
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IODIRA),
device.pin, False)
def _read_interrupt(self) -> Optional[str]:
self._i2c_clear_cache()
for key, device in self.input_devices.items():
if self._i2c_read_bit(device.i2c_address,
device.port.reg(MCP_INTFA), device.pin,
True):
return key
return None
def _read_inputs(self) -> Dict[str, bool]:
res = dict()
self._i2c_clear_cache()
for key, device in self.input_devices.items():
res[key] = self._i2c_read_bit(device.i2c_address,
device.port.reg(MCP_GPIOA),
device.pin, True)
return res
def _interrupt_handler(self, int_pin: int):
key = self._read_interrupt()
if key is None:
return
time.sleep(self.cfg.gpio_delay)
input_states = self._read_inputs()
if key not in input_states:
return
if input_states[key]:
logging.debug('%s pressed', key)
else:
logging.debug('%s released', key)
if key == 'BT_MODE':
self._trigger_cb_mode()
elif key.startswith('BT_Z_'):
zoneid_str = key[5:]
try:
zoneid = int(zoneid_str)
except ValueError:
return
self._trigger_cb_manual(zoneid)
def write_output(self, key: str, val: bool):
device = self.output_devices[key]
if device.invert:
val = not val
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_OLATA),
device.pin, val)
def start(self):
for i2c_address in self.mcp_addresses:
self._configure_mcp(i2c_address)
for device in self.input_devices.values():
self._configure_input_device(device)
for device in self.output_devices.values():
self._configure_output_device(device)
# clear interrupts by reading inputs
self._read_inputs()
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(self.cfg.gpio_interrupt,
GPIO.IN,
pull_up_down=GPIO.PUD_DOWN)
GPIO.add_event_detect(self.cfg.gpio_interrupt,
GPIO.RISING,
callback=self._interrupt_handler,
bouncetime=10)
def stop(self):
GPIO.cleanup()

View file

@ -0,0 +1,38 @@
# coding=utf-8
from datetime import datetime
from typing import Optional
import schedule
from tsgrain_controller import database, models, queue, util
class Scheduler(util.StoppableThread):
def __init__(self, db: database.RainDB, task_queue: queue.TaskQueue):
super().__init__(1)
self.db = db
self.queue = task_queue
self._job: Optional[schedule.Job] = None
def _minute_handler(self):
jobs = self.db.get_jobs()
for job in jobs.values():
if job.check(datetime.now()):
for zone in job.zones:
task = models.Task(models.Source.SCHEDULE, zone,
job.duration)
self.queue.enqueue(task)
def start(self):
self._job = schedule.every().minute.at(':00').do(self._minute_handler)
super().start()
def run_cycle(self):
schedule.run_pending()
def stop(self):
super().stop()
if self._job:
schedule.cancel_job(self._job)
self._job = None

View file

@ -0,0 +1,164 @@
# coding=utf-8
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional, Dict, Any
from tsgrain_controller import util
@dataclass
class Job:
date: datetime
duration: int
zones: List[int]
enable: bool
repeat: bool
@property
def is_active(self) -> bool:
if not self.enable:
return False
if self.repeat:
return True
return self.date > util.datetime_now()
def check(self, date_now: datetime) -> bool:
if not self.enable:
return False
if self.repeat:
check_datetime = datetime.combine(date_now.date(),
self.date.time())
else:
check_datetime = self.date
return date_now - check_datetime < timedelta(minutes=1)
@classmethod
def deserialize(cls, data: dict) -> 'Job':
return cls(
date=datetime.fromisoformat(data['date']),
duration=data['duration'],
zones=data['zones'],
enable=data['zones'],
repeat=data['repeat'],
)
class Source(Enum):
MANUAL = 2
SCHEDULE = 1
@dataclass
class Task:
source: Source
"""Quelle des Tasks (Manuell/Zeitplan)"""
zone_id: int
"""Nummer der Zone"""
duration: int
"""Beregnungsdauer in Sekunden"""
_remaining: int = 0
"""Interne Variable, um die verbleibende Zeit eines gestoppten Tasks zu speichern"""
_id: int = 0
datetime_started: Optional[datetime] = None
"""Zeitpunkt, wann der Task gestartet wurde"""
def __post_init__(self):
self._remaining = self.duration
self._id = uuid.uuid1().int
@property
def is_running(self) -> bool:
"""
:return: True falls der Task momentan läuft
"""
return self.datetime_started is not None
@property
def remaining(self) -> int:
"""
:return: Verbleibende Zeit in Sekunden
"""
if not self.is_running:
return self._remaining
d = self.datetime_finished - util.datetime_now()
return d.seconds
@property
def is_done(self) -> bool:
"""
:return: True wenn der Task bereits abgeschlossen ist.
"""
return self.remaining <= 0
@property
def datetime_finished(self) -> Optional[datetime]:
"""
:return: Zeitpunkt, zu dem der Task abgeschlossen sein wird.
None falls der Task momentan nicht läuft.
"""
if self.datetime_started is None:
return None
return self.datetime_started + timedelta(seconds=self._remaining)
def start(self):
"""Startet den Task zur aktuellen Zeit."""
if self.is_running:
raise util.TaskRunException('already running')
self.datetime_started = util.datetime_now()
def stop(self):
"""Stoppt den Task und speichert die verbleibende Zeit"""
if not self.is_running:
raise util.TaskRunException('not running')
self._remaining = self.remaining
self.datetime_started = None
def serialize(self) -> Dict[str, Any]:
"""Task zur Speicherung in der Datenbank in dict umwandeln."""
return {
'source': self.source.name,
'zone_id': self.zone_id,
'duration': self.duration,
'remaining': self.remaining
}
def serialize_rpc(self) -> Dict[str, Any]:
"""Task zur aktuellen Statusübertragung in dict umwandeln."""
return {
'source': self.source.name,
'zone_id': self.zone_id,
'duration': self.duration,
'datetime_started': self.datetime_started,
'datetime_finished': self.datetime_finished
}
@classmethod
def deserialize(cls, data: dict) -> 'Task':
task = cls(source=Source[data['source']],
zone_id=data['zone_id'],
duration=data['duration'])
task._remaining = data['remaining']
return task
def __eq__(self, other: 'Task') -> bool:
return self._id == other._id
def __hash__(self) -> int:
return hash(self._id)
def __str__(self):
return 'ZONE %d: %ds (%s)' % (self.zone_id, self.duration,
self.source.name)

View file

@ -0,0 +1,151 @@
from dataclasses import dataclass
from typing import Dict
from tsgrain_controller import io, util, queue, models
@dataclass
class OutputState:
on: bool
# Blinks/second
blink_freq: float = 0
def is_on(self, ms_ticks: int) -> bool:
if self.on and self.blink_freq > 0:
period = 1000 / self.blink_freq
period_progress = ms_ticks % period
return period_progress < (period / 2)
return self.on
def __str__(self):
if not self.on:
return 'OFF'
if self.blink_freq == 0:
return 'ON'
return '((%d))' % self.blink_freq
class OutputDevice:
"""Ein einzelnes digitales Ausgabegerät"""
def __init__(self, name: str, o_io: io.Io):
self.name = name
self._io = o_io
self.set_state = OutputState(False)
self._state = False
self._io.write_output(self.name, False)
def _write_state(self, state: bool):
if state != self._state:
self._state = state
self._io.write_output(self.name, self._state)
def update_state(self, ms_ticks: int):
self._write_state(self.set_state.is_on(ms_ticks))
class Outputs(util.StoppableThread):
"""
Outputs ist für die Ausgabegeräte der Bewässerungssteuerung zuständig
(Ventilausgänge und LEDs)
"""
def __init__(self, o_io: io.Io, task_holder: queue.TaskHolder,
app: util.AppInterface):
super().__init__(0.01)
self.task_holder = task_holder
self.app = app
self.n_zones = self.app.get_cfg().n_zones
self.valve_outputs: Dict[int, OutputDevice] = {
i: OutputDevice('VALVE_%d' % i, o_io)
for i in range(1, self.n_zones + 1)
}
self.zone_leds: Dict[int, OutputDevice] = {
i: OutputDevice('LED_Z_%d' % i, o_io)
for i in range(1, self.n_zones + 1)
}
self.mode_led_auto = OutputDevice('LED_M_AUTO', o_io)
self.mode_led_man = OutputDevice('LED_M_MAN', o_io)
self._output_devices = [
*self.valve_outputs.values(),
*self.zone_leds.values(),
self.mode_led_auto,
self.mode_led_man,
]
def _get_valve_op(self, valve_id: int) -> OutputDevice:
if valve_id not in self.valve_outputs:
raise Exception('Valve does not exist')
return self.valve_outputs[valve_id]
def _get_zoneled_op(self, valve_id: int) -> OutputDevice:
if valve_id not in self.zone_leds:
raise Exception('Zone LED does not exist')
return self.zone_leds[valve_id]
def _set_zone(self, zone_id: int, state: bool):
self._get_valve_op(zone_id).set_state.on = state
self._get_zoneled_op(zone_id).set_state.on = state
def _set_zone_time(self, zone_id: int, remaining_seconds: int):
is_on = remaining_seconds > 0
blink_freq = 0
if remaining_seconds < 60:
blink_freq = round((60 - remaining_seconds) / 15)
self._get_valve_op(zone_id).set_state.on = is_on
zone_led = self._get_zoneled_op(zone_id)
zone_led.set_state.on = is_on
zone_led.set_state.blink_freq = blink_freq
def _set_mode_led_auto(self, enabled: bool, current: bool):
blink_freq = 0
if current:
blink_freq = 2
self.mode_led_auto.set_state.on = enabled
self.mode_led_auto.set_state.blink_freq = blink_freq
def _set_mode_led_manual(self, current: bool):
self.mode_led_man.set_state.on = current
self.mode_led_man.set_state.blink_freq = 2
def _reset_states(self):
for output in self._output_devices:
output.set_state = OutputState(False)
def _update_states(self):
ms_ticks = util.time_ms()
for output in self._output_devices:
output.update_state(ms_ticks)
def reset(self):
self._reset_states()
self._update_states()
def run_cycle(self):
self._reset_states()
task = self.task_holder.get_current_task()
if task is not None:
self._set_zone_time(task.zone_id, task.remaining)
self._set_mode_led_manual(task.source == models.Source.MANUAL)
self._set_mode_led_auto(self.app.is_auto_enabled(),
task.source == models.Source.SCHEDULE)
else:
self._set_mode_led_auto(self.app.is_auto_enabled(), False)
self._update_states()
def setup(self):
self.reset()
def cleanup(self):
self.reset()

103
tsgrain_controller/queue.py Normal file
View file

@ -0,0 +1,103 @@
# coding=utf-8
import logging
from typing import Any, Dict, List, Optional
from tsgrain_controller import models, util
class TaskHolder:
def get_current_task(self) -> Optional[models.Task]:
pass
class TaskQueue(util.StoppableThread, TaskHolder):
def __init__(self, app: util.AppInterface):
super().__init__(0.1)
self.app = app
self.tasks: List[models.Task] = list()
self.running_task: Optional[models.Task] = None
def enqueue(self, task: models.Task, exclusive: bool = False) -> bool:
"""
Fügt der Warteschlange einen neuen Task hinzu. Die Warteschlange
kann nicht mehrere Tasks der selben Quelle und Zone aufnehmen.
Kann ein Task nicht aufgenommen werden, wird False zurückgegeben.
:param task: Neuer Task
:param exclusive: Wenn True, verbiete mehrere Tasks von der selben Quelle
:return: True wenn Task erfolgreich hinzugefügt
"""
for t in self.tasks:
if t.source == task.source and (exclusive
or t.zone_id == task.zone_id):
return False
self.tasks.append(task)
logging.info('Task added to queue (%s)', task)
return True
def get_current_task(self) -> Optional[models.Task]:
"""
Gib den aktuell laufenden Task zurück.
:return: aktuell laufender Task
"""
return self.running_task
def cancel_task(self, task: models.Task):
self.tasks.remove(task)
logging.info('Task cancelled (%s)', task)
if self.running_task == task:
self.running_task = None
def cancel_current_task(self):
"""
Bricht den aktuell laufenden Task ab
(z.B. bei manuellem Stopp mittels Taster)
"""
if self.running_task is not None:
self.tasks.remove(self.running_task)
logging.info('Running task cancelled (%s)', self.running_task)
self.running_task = None
def serialize(self) -> List[models.Task]:
"""Task zur Speicherung in der Datenbank in dict umwandeln."""
return self.tasks
def serialize_rpc(self) -> Dict[str, Any]:
"""Task zur aktuellen Statusübertragung in dict umwandeln."""
return {'current_time': util.datetime_now(), 'tasks': self.serialize()}
def run_cycle(self):
# Get a new task if none is running
if self.running_task is None:
for task in self.tasks:
# Only start scheduled tasks if automatic mode is enabled
if task.source == models.Source.SCHEDULE and not self.app.is_auto_enabled(
):
continue
self.running_task = task
self.running_task.start()
logging.info('Queued task started (%s)', self.running_task)
break
# Check currently running task
if self.running_task is not None:
# Stop scheduled tasks if auto mode is disabled
if self.running_task.source == models.Source.SCHEDULE and not self.app.is_auto_enabled(
):
self.running_task.stop()
logging.info('Running task stopped (%s)', self.running_task)
self.running_task = None
elif self.running_task.is_done:
self.tasks.remove(self.running_task)
logging.info('Running task done (%s)', self.running_task)
self.running_task = None
def cleanup(self):
if self.running_task:
self.running_task.stop()
self.running_task = None

136
tsgrain_controller/util.py Normal file
View file

@ -0,0 +1,136 @@
# coding=utf-8
import datetime
import json
import logging
import threading
import time
from enum import Enum
from typing import Any, Union
from tsgrain_controller import config
def _get_np_attrs(o) -> dict:
"""
Return all non-protected attributes of the given object.
:param o: Object
:return: Dict of attributes
"""
return {k: v for k, v in o.__dict__.items() if not k.startswith('_')}
def serializer(o: Any) -> Union[str, dict, int, float, bool]:
"""
Serialize object to json-storable format
:param o: Object to serialize
:return: Serialized output data
"""
if hasattr(o, 'serialize'):
return o.serialize()
elif isinstance(o, datetime.datetime) or isinstance(o, datetime.date):
return o.isoformat()
elif isinstance(o, Enum):
return o.value
elif isinstance(o, int) or isinstance(o, float) or isinstance(o, bool):
return o
elif hasattr(o, '__dict__'):
return _get_np_attrs(o)
return str(o)
def to_json(o, pretty=False) -> str:
"""
Convert object to json.
Uses t7he ``serialize()`` method of the target object if available.
:param o: Object to serialize
:param pretty: Prettify with indents
:return: JSON string
"""
return json.dumps(o,
default=serializer,
indent=2 if pretty else None,
ensure_ascii=False)
def to_json_file(o, path):
"""
Convert object to json and writes the result to a file.
Uses the ``serialize()`` method of the target object if available.
:param o: Object to serialize
:param path: File path
"""
with open(path, 'w', encoding='utf-8') as f:
json.dump(o, f, default=serializer, indent=2, ensure_ascii=False)
def time_ms() -> int:
return round(time.time() * 1000)
def datetime_now() -> datetime.datetime:
return datetime.datetime.now()
def datetime_new(year,
month=None,
day=None,
hour=0,
minute=0,
second=0,
microsecond=0) -> datetime.datetime:
return datetime.datetime(year, month, day, hour, minute, second,
microsecond)
class StoppableThread(threading.Thread):
def __init__(self, interval: float = 1):
super().__init__()
self._interval = interval
self._stop_signal = threading.Event()
def setup(self):
pass
def cleanup(self):
pass
def run_cycle(self):
pass
def run(self):
self.setup()
while not self._stop_signal.is_set():
self.run_cycle()
time.sleep(self._interval)
self.cleanup()
def stop(self):
self._stop_signal.set()
self.join()
class AppInterface:
def is_auto_enabled(self) -> bool:
pass
def get_cfg(self) -> config.Config:
pass
def get_logger(self) -> logging.Logger:
pass
class ZoneUnavailableException(Exception):
pass
class TaskRunException(Exception):
pass