80 lines
1.9 KiB
Python
80 lines
1.9 KiB
Python
# coding=utf-8
|
|
from typing import Optional
|
|
import time
|
|
import defer
|
|
from unittest import mock
|
|
|
|
from tests import fixtures
|
|
from tsgrain_controller import output, task_queue, models
|
|
|
|
|
|
class _TaskHolder(task_queue.TaskHolder):
    """Task holder stub that reports whatever is stored in ``task``."""

    def __init__(self, task: Optional[models.Task]):
        """Store the initial task; ``None`` means no task is active."""
        # Kept as a plain public attribute so it can be reassigned directly.
        self.task: Optional[models.Task] = task

    def get_current_task(self) -> Optional[models.Task]:
        """Return the task currently stored on this holder (may be ``None``)."""
        return self.task
|
|
|
|
|
|
@defer.with_defer
def test_zone_outputs():
    """Check that Outputs mirrors the current task onto valves and LEDs.

    Three states are verified in sequence: no task (everything off), a
    manual task on zone 1 (valve 1 and zone LED 1 on), and no task again
    (everything back off).
    """
    # Expected state whenever no task is active: every output is off.
    all_off = {
        'VALVE_1': False,
        'VALVE_2': False,
        'VALVE_3': False,
        'VALVE_4': False,
        'VALVE_5': False,
        'LED_Z_1': False,
        'LED_Z_2': False,
        'LED_Z_3': False,
        'LED_Z_4': False,
        'LED_Z_5': False,
        'LED_M_AUTO': False,
        'LED_M_MAN': False,
    }
    # Expected state while a manual task runs on zone 1.
    zone_1_manual = {
        **all_off,
        'VALVE_1': True,
        'LED_Z_1': True,
        # Manual LED is blinking, so either value is acceptable.
        'LED_M_MAN': mock.ANY,
    }

    testing_io = fixtures.TestingIo()
    holder = _TaskHolder(None)
    testing_app = fixtures.TestingApp()
    zone_outputs = output.Outputs(testing_io, holder, testing_app)

    zone_outputs.start()
    # Make sure the outputs are stopped again even if an assertion fails.
    defer.defer(zone_outputs.stop)

    # Short pause so the started Outputs instance can publish its state
    # (presumably from a background worker -- confirm in output.Outputs).
    time.sleep(0.02)

    # Expect all devices initialized to OFF
    assert testing_io.outputs == all_off

    # Activate a manual task on zone 1.
    holder.task = models.Task(models.Source.MANUAL, 1, 100)
    time.sleep(0.02)
    assert testing_io.outputs == zone_1_manual

    # Clearing the task must switch everything off again.
    holder.task = None
    time.sleep(0.02)
    assert testing_io.outputs == all_off
|