535 lines
15 KiB
Python
535 lines
15 KiB
Python
# coding=utf-8
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from typing import Dict, Optional, Tuple
|
|
|
|
from RPi import GPIO # pylint: disable=import-error
|
|
import smbus
|
|
|
|
from tsgrain_controller import io, models
|
|
|
|
# MCP-Register
|
|
# Quelle: https://ww1.microchip.com/downloads/en/devicedoc/20001952c.pdf
|
|
# Seite 16, Table 3-3, 3-5
|
|
|
|
MCP_IODIRA = 0x00
|
|
"""
|
|
I/O-Modus pro GPIOA-Pin (standardmäßig ``1`` / Eingang)
|
|
|
|
- ``1`` Eingang
|
|
- ``0`` Ausgang
|
|
"""
|
|
|
|
MCP_IODIRB = 0x01
|
|
"""
|
|
I/O-Modus pro GPIOB-Pin (standardmäßig ``1`` / Eingang)
|
|
|
|
- ``1`` Eingang
|
|
- ``0`` Ausgang
|
|
"""
|
|
|
|
MCP_IPOLA = 0x02
|
|
"""
|
|
Polarität pro GPIOA-Pin.
|
|
|
|
- ``1`` Low wenn aktiv
|
|
- ``0`` High wenn aktiv
|
|
|
|
Die Einstellung hatte im Test nur bei als Eingängen konfigurierten Pins
|
|
einen Effekt. Invertierte Ausgänge setzt die Library deswegen auf den
|
|
entgegengesetzten Wert.
|
|
"""
|
|
|
|
MCP_IPOLB = 0x03
|
|
"""Polarität pro GPIOB-Pin. Invertiert wenn Bit gesetzt."""
|
|
|
|
MCP_GPINTENA = 0x04
|
|
"""
|
|
Aktiviert einen Interrupt bei Zustandswechsel eines GPIOA-Pins.
|
|
|
|
Das Auslöen von Interrupts
|
|
erfordert das Setzen der Register ``DEFVAL`` und ``INTCON``.
|
|
"""
|
|
|
|
MCP_GPINTENB = 0x05
|
|
"""
|
|
Aktiviert einen Interrupt bei Zustandswechsel eines GPIOB-Pins.
|
|
|
|
Das Auslöen von Interrupts
|
|
erfordert das Setzen der Register ``DEFVAL`` und ``INTCON``.
|
|
"""
|
|
|
|
MCP_DEFVALA = 0x06
|
|
"""
|
|
Vergleichsregister für Interrupt bei Zustandswechsel eines GPIOA-Pins.
|
|
|
|
Wenn mittels ``GPINTEN`` und ``INTCON``-Register ein vergleichender
|
|
Interrupt konfiguriert wurde, verursacht der Wechsel des Pin-Zustands
|
|
auf den gegenteiligen Wert von ``DEFVAL`` einen Interrupt.
|
|
"""
|
|
|
|
MCP_DEFVALB = 0x07
|
|
"""
|
|
Vergleichsregister für Interrupt bei Zustandswechsel eines GPIOB-Pins.
|
|
|
|
Wenn mittels ``GPINTEN`` und ``INTCON``-Register ein vergleichender
|
|
Interrupt konfiguriert wurde, verursacht der Wechsel des Pin-Zustands
|
|
auf den gegenteiligen Wert von ``DEFVAL`` einen Interrupt.
|
|
"""
|
|
|
|
MCP_INTCONA = 0x08
|
|
"""
|
|
Wahl des Interruptmodus für GPIOA.
|
|
|
|
- ``1`` Der Wert des Pins wird mit dem entsprechenden Bit im ``DEFVAL``-Register
|
|
verglichen. Beim gegenteiligen Wert erfolgt ein Interrupt.
|
|
|
|
- ``0`` Ein Interrupt wird bei jeder Zustandsänderung des Pins ausgelöst.
|
|
"""
|
|
|
|
MCP_INTCONB = 0x09
|
|
"""
|
|
Wahl des Interruptmodus für GPIOB.
|
|
|
|
- ``1`` Der Wert des Pins wird mit dem entsprechenden Bit im ``DEFVAL``-Register
|
|
verglichen. Beim gegenteiligen Wert erfolgt ein Interrupt.
|
|
|
|
- ``0`` Ein Interrupt wird bei jeder Zustandsänderung des Pins ausgelöst.
|
|
"""
|
|
|
|
MCP_IOCON = 0x0a
|
|
"""
|
|
Konfiguration für den I2C-Portexpander.
|
|
|
|
Bit ``7``: **BANK**
|
|
Ändert die Adressierung der Register. Dieses Programm verwendet die
|
|
Standardkonfiguration 0, also nicht ändern.
|
|
|
|
Bit ``6``: **MIRROR**
|
|
Wenn gesetzt, sind beide Interrupt-Pins ``INTA`` und ``INTB`` miteinander
|
|
verodert, d.h. ein Interrupt auf einem der zwei Ports aktiviert beide Pins.
|
|
|
|
Bit ``5``: **SEQOP**
|
|
Sequenzieller Modus, inkrementiert den Adresszähler bei jedem Zugriff.
|
|
|
|
- ``1`` Sequenzieller Modus deaktiviert
|
|
- ``0`` Sequenzieller Modus aktiviert
|
|
|
|
Bit ``4``: **DISSLW**
|
|
Slew-Rate control
|
|
|
|
Bit ``3``: nicht implementiert
|
|
|
|
Bit ``2``: **ODR**
|
|
Konfiguriert den Interruptpin als Open-Drain-Ausgang, wenn gesetzt.
|
|
Wenn nicht gesetzt, ist der Interruptpin ein aktiv treibender Ausgang
|
|
mit der in Bit ``1`` festgelegten Polarität.
|
|
|
|
Bit ``1``: **INTPOL**
|
|
Polarität des Interrupt-Pins
|
|
|
|
- ``1`` High wenn aktiv
|
|
- ``0`` Low wenn aktiv
|
|
|
|
Bit ``0``: nicht implementiert
|
|
"""
|
|
|
|
MCP_GPPUA = 0x0c
|
|
"""GPIOA-Pullup-Widerstandsregister. 100k-Pullup aktiviert, wenn Bit gesetzt."""
|
|
|
|
MCP_GPPUB = 0x0d
|
|
"""GPIOB-Pullup-Widerstandsregister. 100k-Pullup aktiviert, wenn Bit gesetzt."""
|
|
|
|
MCP_INTFA = 0x0e
|
|
"""
|
|
Interrupt-Flag-Register. Wurde ein Interrupt auf GPIOA ausgelöst, ist
|
|
das entsprechende Bit in diesem Register gesetzt. Read-only.
|
|
"""
|
|
|
|
MCP_INTFB = 0x0f
|
|
"""
|
|
Interrupt-Flag-Register. Wurde ein Interrupt auf GPIOB ausgelöst, ist
|
|
das entsprechende Bit in diesem Register gesetzt. Read-only.
|
|
"""
|
|
|
|
MCP_INTCAPA = 0x10
|
|
"""
|
|
Interrupt-Capture-Register. Wurde ein Interrupt auf GPIOA ausgelöst,
|
|
speichert dieses Register den entsprechenden Wert.
|
|
"""
|
|
|
|
MCP_INTCAPB = 0x11
|
|
"""
|
|
Interrupt-Capture-Register. Wurde ein Interrupt auf GPIOB ausgelöst,
|
|
speichert dieses Register den entsprechenden Wert.
|
|
"""
|
|
|
|
MCP_GPIOA = 0x12
|
|
"""
|
|
GPIOA-Portregister. Wert entspricht dem Zustand der GPIO-Pins.
|
|
Wird in dieses Register geschrieben, wird das ``OLAT``-Register verändert.
|
|
"""
|
|
|
|
MCP_GPIOB = 0x13
|
|
"""
|
|
GPIOA-Portregister. Wert entspricht dem Zustand der GPIO-Pins.
|
|
Wird in dieses Register geschrieben, wird das ``OLAT``-Register verändert.
|
|
"""
|
|
|
|
MCP_OLATA = 0x14
|
|
"""
|
|
GPIOA-Output-Latch-Register. Wert entspricht dem Zustand der Output-Latches,
|
|
die wiederum die als Output konfigurierten Pins ansteuern.
|
|
"""
|
|
|
|
MCP_OLATB = 0x15
|
|
"""
|
|
GPIOB-Output-Latch-Register. Wert entspricht dem Zustand der Output-Latches,
|
|
die wiederum die als Output konfigurierten Pins ansteuern.
|
|
"""
|
|
|
|
|
|
class PinConfigInvalid(Exception):
|
|
|
|
def __init__(self, cfg_str: str):
|
|
super().__init__(f'MCP23017 pin config {cfg_str} invalid')
|
|
|
|
|
|
class _MCP23017Port(Enum):
|
|
"""IO-Port des MCP23017 (A/B)"""
|
|
|
|
A = 0
|
|
B = 1
|
|
|
|
def reg(self, reg_a: int) -> int:
|
|
return reg_a + self.value
|
|
|
|
|
|
@dataclass
|
|
class _MCP23017Device:
|
|
"""
|
|
Ein einzelnes mit einen MCP23017 I2C-Portexpander verbundenes
|
|
Eingabe/Ausgabegerät.
|
|
"""
|
|
|
|
i2c_address: int
|
|
"""I2C-Adresse des MCP23017"""
|
|
|
|
port: _MCP23017Port
|
|
"""IO-Port des MCP23017 (A/B)"""
|
|
|
|
pin: int
|
|
"""IO-Pin des MCP23017 (0-7)"""
|
|
|
|
invert: bool
|
|
"""Zustand des Pins invertieren"""
|
|
|
|
@classmethod
|
|
def from_config(cls, cfg_str: str) -> '_MCP23017Device':
|
|
"""
|
|
Parst einen Konfigurationsstring und erstellt daraus
|
|
ein neues ``_MCP23017Device``-Objekt.
|
|
|
|
Der Konfigurationsstring hat folgendes Format:
|
|
``'I2C_ADDR/PIN'``
|
|
|
|
Beispiel: ``0x27/B0`` (MCP23017 mit I2C-Adresse 0x27, Pin B0)
|
|
|
|
Um den Zustand eines Geräts zu invertieren, einfach ``/!``
|
|
an den Konfigurationsstring anfügen: ``0x27/B0/!``
|
|
|
|
:param cfg_str: Konfigurationsstring
|
|
:return: Neues ``_MCP23017Device``-Objekt
|
|
"""
|
|
|
|
cfg_parts = cfg_str.split('/')
|
|
if len(cfg_parts) < 2:
|
|
raise PinConfigInvalid(cfg_str)
|
|
|
|
i2c_addr_str = cfg_parts[0]
|
|
port_str = cfg_parts[1][0]
|
|
pin_str = cfg_parts[1][1:]
|
|
|
|
try:
|
|
i2c_addr = int(i2c_addr_str, 16)
|
|
except ValueError as e:
|
|
raise PinConfigInvalid(cfg_str) from e
|
|
|
|
try:
|
|
port = _MCP23017Port[port_str]
|
|
except KeyError as e:
|
|
raise PinConfigInvalid(cfg_str) from e
|
|
|
|
try:
|
|
pin = int(pin_str)
|
|
except ValueError as e:
|
|
raise PinConfigInvalid(cfg_str) from e
|
|
|
|
invert = False
|
|
|
|
if len(cfg_parts) >= 3:
|
|
attr_str = cfg_parts[2]
|
|
|
|
invert = '!' in attr_str
|
|
|
|
return cls(i2c_addr, port, pin, invert)
|
|
|
|
|
|
class Io(io.Io):
|
|
|
|
def __init__(self, app: models.AppInterface):
|
|
super().__init__()
|
|
|
|
self.cfg = app.get_cfg()
|
|
|
|
self.bus = smbus.SMBus(self.cfg.i2c_bus_id)
|
|
|
|
self.i2c_cache: Dict[Tuple[int, int], int] = {}
|
|
|
|
self.mcp_addresses = set()
|
|
self.output_devices: Dict[str, _MCP23017Device] = {}
|
|
self.input_devices: Dict[str, _MCP23017Device] = {}
|
|
|
|
# Parse config and initialize pins
|
|
for key, cfg_str in self.cfg.input_devices.items():
|
|
device = _MCP23017Device.from_config(cfg_str)
|
|
self.mcp_addresses.add(device.i2c_address)
|
|
self.input_devices[key] = device
|
|
|
|
for key, cfg_str in self.cfg.output_devices.items():
|
|
device = _MCP23017Device.from_config(cfg_str)
|
|
self.mcp_addresses.add(device.i2c_address)
|
|
self.output_devices[key] = device
|
|
|
|
def _i2c_read_byte(self,
|
|
i2c_address: int,
|
|
register: int,
|
|
use_cache=False) -> int:
|
|
"""
|
|
Lese ein Byte Daten von einem I2C-Gerät
|
|
|
|
:param i2c_address: I2C-Adresse
|
|
:param register: I2C-Register
|
|
:param use_cache: Daten zwischenspeichern und bei erneutem
|
|
Aufruf nicht erneut abfragen.
|
|
:return: Datenbyte
|
|
"""
|
|
key = (i2c_address, register)
|
|
|
|
if use_cache and key in self.i2c_cache:
|
|
return self.i2c_cache[key]
|
|
|
|
data = self.bus.read_byte_data(i2c_address, register)
|
|
|
|
if use_cache:
|
|
self.i2c_cache[key] = data
|
|
|
|
return data
|
|
|
|
def _i2c_write_byte(self, i2c_address: int, register: int, value: int):
|
|
"""
|
|
Schreibe ein Byte Daten an ein I2C-Gerät
|
|
|
|
:param i2c_address: I2C-Adresse
|
|
:param register: I2C-Register
|
|
:param value: Datenbyte
|
|
"""
|
|
self.bus.write_byte_data(i2c_address, register, value)
|
|
|
|
def _i2c_read_bit(self,
|
|
i2c_address: int,
|
|
register: int,
|
|
bit: int,
|
|
use_cache=False) -> bool:
|
|
"""
|
|
Lese ein Bit Daten von einem I2C-Gerät
|
|
|
|
:param i2c_address: I2C-Adresse
|
|
:param register: I2C-Register
|
|
:param bit: Nummer des Bits (0-7)
|
|
:param use_cache: Daten zwischenspeichern und bei erneutem
|
|
Aufruf nicht erneut abfragen.
|
|
:return Datenbit
|
|
"""
|
|
data = self._i2c_read_byte(i2c_address, register, use_cache)
|
|
bitmask = 1 << bit
|
|
return bool(data & bitmask)
|
|
|
|
def _i2c_write_bit(self, i2c_address: int, register: int, bit: int,
|
|
value: bool):
|
|
"""
|
|
Schreibe ein Bit Daten an ein I2C-Gerät
|
|
|
|
:param i2c_address: I2C-Adresse
|
|
:param register: I2C-Register
|
|
:param bit: Nummer des Bits (0-7)
|
|
:param value: Datenbit
|
|
"""
|
|
data = self._i2c_read_byte(i2c_address, register)
|
|
bitmask = 1 << bit
|
|
|
|
if value:
|
|
data |= bitmask # Maskiertes Bit setzen
|
|
else:
|
|
data &= ~bitmask # Maskiertes Bit löschen
|
|
|
|
self._i2c_write_byte(i2c_address, register, data)
|
|
|
|
def _i2c_clear_cache(self):
|
|
"""
|
|
Leere den I2C-Cache
|
|
(verwendet von ``_i2c_read_bit()`` und ``_i2c_read_byte()``).
|
|
"""
|
|
self.i2c_cache = {}
|
|
|
|
def _configure_mcp(self, i2c_address: int):
|
|
"""Schreibe die initiale Konfiguration an ein MCP23017-Gerät"""
|
|
# I/O-Modus: Lesend
|
|
self._i2c_write_byte(i2c_address, MCP_IODIRA, 0xff)
|
|
self._i2c_write_byte(i2c_address, MCP_IODIRB, 0xff)
|
|
|
|
# I/O-Polarität: Active-high
|
|
self._i2c_write_byte(i2c_address, MCP_IPOLA, 0)
|
|
self._i2c_write_byte(i2c_address, MCP_IPOLB, 0)
|
|
|
|
# Interrupt aus
|
|
self._i2c_write_byte(i2c_address, MCP_GPINTENA, 0)
|
|
self._i2c_write_byte(i2c_address, MCP_GPINTENB, 0)
|
|
self._i2c_write_byte(i2c_address, MCP_DEFVALA, 0)
|
|
self._i2c_write_byte(i2c_address, MCP_DEFVALB, 0)
|
|
self._i2c_write_byte(i2c_address, MCP_INTCONA, 0)
|
|
self._i2c_write_byte(i2c_address, MCP_INTCONB, 0)
|
|
|
|
# Interrupt-Polarität: Active-high
|
|
# Interrupt-Ports spiegeln
|
|
self._i2c_write_byte(i2c_address, MCP_IOCON, 0b01000010)
|
|
|
|
# Pullup aus
|
|
self._i2c_write_byte(i2c_address, MCP_GPPUA, 0)
|
|
self._i2c_write_byte(i2c_address, MCP_GPPUB, 0)
|
|
|
|
# Outputs aus
|
|
self._i2c_write_byte(i2c_address, MCP_OLATA, 0)
|
|
self._i2c_write_byte(i2c_address, MCP_OLATB, 0)
|
|
|
|
def _configure_input_device(self, device: _MCP23017Device):
|
|
"""
|
|
Konfiguriere einen MCP-Pin als Eingabegerät
|
|
|
|
:param device: Gerätedefinition
|
|
"""
|
|
if device.invert:
|
|
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA),
|
|
device.pin, True)
|
|
|
|
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_GPINTENA),
|
|
device.pin, True)
|
|
|
|
def _configure_output_device(self, device: _MCP23017Device):
|
|
"""
|
|
Konfiguriere einen MCP-Pin als Ausgabegerät
|
|
|
|
:param device: Gerätedefinition
|
|
"""
|
|
if device.invert:
|
|
# self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA),
|
|
# device.pin, True)
|
|
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_OLATA),
|
|
device.pin, True)
|
|
|
|
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IODIRA),
|
|
device.pin, False)
|
|
|
|
def _read_interrupt(self) -> Optional[str]:
|
|
"""
|
|
Rufe ab, welches Eingabegerät den Interrupt ausgelöst hat
|
|
|
|
:return: Name des Eingabegeräts (oder None)
|
|
"""
|
|
self._i2c_clear_cache()
|
|
|
|
for key, device in self.input_devices.items():
|
|
if self._i2c_read_bit(device.i2c_address,
|
|
device.port.reg(MCP_INTFA), device.pin,
|
|
True):
|
|
return key
|
|
|
|
return None
|
|
|
|
def _read_inputs(self) -> Dict[str, bool]:
|
|
"""
|
|
Lese die Zustände aller Eingabegeräte aus
|
|
|
|
:return: Dict(Gerätename => Zustand)
|
|
"""
|
|
res = {}
|
|
self._i2c_clear_cache()
|
|
|
|
for key, device in self.input_devices.items():
|
|
res[key] = self._i2c_read_bit(device.i2c_address,
|
|
device.port.reg(MCP_GPIOA),
|
|
device.pin, True)
|
|
|
|
return res
|
|
|
|
def _interrupt_handler(self, int_pin: int): # pylint: disable=unused-argument
|
|
"""
|
|
Diese Funktion wird von der RPi-GPIO-Library bei einem Interrupt des MCP23017
|
|
(Zustandswechsel eines Eingabegeräts) aufgerufen.
|
|
|
|
Es wird abgefragt, welcher Input den Interrupt ausgelöst hat und der
|
|
entsprechende Input-Callback ausgelöst.
|
|
"""
|
|
key = self._read_interrupt()
|
|
if key is None:
|
|
return
|
|
|
|
time.sleep(self.cfg.gpio_delay)
|
|
|
|
input_states = self._read_inputs()
|
|
if key not in input_states:
|
|
return
|
|
|
|
if input_states[key]:
|
|
logging.debug('%s pressed', key)
|
|
else:
|
|
logging.debug('%s released', key)
|
|
self._trigger_cb(key)
|
|
|
|
def write_output(self, key: str, val: bool):
|
|
device = self.output_devices[key]
|
|
if device.invert:
|
|
val = not val
|
|
|
|
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_OLATA),
|
|
device.pin, val)
|
|
|
|
def start(self):
|
|
for i2c_address in self.mcp_addresses:
|
|
self._configure_mcp(i2c_address)
|
|
|
|
for device in self.input_devices.values():
|
|
self._configure_input_device(device)
|
|
|
|
for device in self.output_devices.values():
|
|
self._configure_output_device(device)
|
|
|
|
# clear interrupts by reading inputs
|
|
self._read_inputs()
|
|
|
|
GPIO.setmode(GPIO.BCM)
|
|
GPIO.setwarnings(False)
|
|
|
|
GPIO.setup(self.cfg.gpio_interrupt,
|
|
GPIO.IN,
|
|
pull_up_down=GPIO.PUD_DOWN)
|
|
|
|
GPIO.add_event_detect(self.cfg.gpio_interrupt,
|
|
GPIO.RISING,
|
|
callback=self._interrupt_handler,
|
|
bouncetime=10)
|
|
|
|
def stop(self):
|
|
GPIO.cleanup()
|