Controller/tsgrain_controller/io/mcp23017.py
2022-02-22 08:27:28 +01:00

535 lines
15 KiB
Python

# coding=utf-8
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional, Tuple
from RPi import GPIO # pylint: disable=import-error
import smbus
from tsgrain_controller import io, models
# MCP-Register
# Quelle: https://ww1.microchip.com/downloads/en/devicedoc/20001952c.pdf
# Seite 16, Table 3-3, 3-5
MCP_IODIRA = 0x00
"""
I/O-Modus pro GPIOA-Pin (standardmäßig ``1`` / Eingang)
- ``1`` Eingang
- ``0`` Ausgang
"""
MCP_IODIRB = 0x01
"""
I/O-Modus pro GPIOB-Pin (standardmäßig ``1`` / Eingang)
- ``1`` Eingang
- ``0`` Ausgang
"""
MCP_IPOLA = 0x02
"""
Polarität pro GPIOA-Pin.
- ``1`` Low wenn aktiv
- ``0`` High wenn aktiv
Die Einstellung hatte im Test nur bei als Eingängen konfigurierten Pins
einen Effekt. Invertierte Ausgänge setzt die Library deswegen auf den
entgegengesetzten Wert.
"""
MCP_IPOLB = 0x03
"""Polarität pro GPIOB-Pin. Invertiert wenn Bit gesetzt."""
MCP_GPINTENA = 0x04
"""
Aktiviert einen Interrupt bei Zustandswechsel eines GPIOA-Pins.
Das Auslöen von Interrupts
erfordert das Setzen der Register ``DEFVAL`` und ``INTCON``.
"""
MCP_GPINTENB = 0x05
"""
Aktiviert einen Interrupt bei Zustandswechsel eines GPIOB-Pins.
Das Auslöen von Interrupts
erfordert das Setzen der Register ``DEFVAL`` und ``INTCON``.
"""
MCP_DEFVALA = 0x06
"""
Vergleichsregister für Interrupt bei Zustandswechsel eines GPIOA-Pins.
Wenn mittels ``GPINTEN`` und ``INTCON``-Register ein vergleichender
Interrupt konfiguriert wurde, verursacht der Wechsel des Pin-Zustands
auf den gegenteiligen Wert von ``DEFVAL`` einen Interrupt.
"""
MCP_DEFVALB = 0x07
"""
Vergleichsregister für Interrupt bei Zustandswechsel eines GPIOB-Pins.
Wenn mittels ``GPINTEN`` und ``INTCON``-Register ein vergleichender
Interrupt konfiguriert wurde, verursacht der Wechsel des Pin-Zustands
auf den gegenteiligen Wert von ``DEFVAL`` einen Interrupt.
"""
MCP_INTCONA = 0x08
"""
Wahl des Interruptmodus für GPIOA.
- ``1`` Der Wert des Pins wird mit dem entsprechenden Bit im ``DEFVAL``-Register
verglichen. Beim gegenteiligen Wert erfolgt ein Interrupt.
- ``0`` Ein Interrupt wird bei jeder Zustandsänderung des Pins ausgelöst.
"""
MCP_INTCONB = 0x09
"""
Wahl des Interruptmodus für GPIOB.
- ``1`` Der Wert des Pins wird mit dem entsprechenden Bit im ``DEFVAL``-Register
verglichen. Beim gegenteiligen Wert erfolgt ein Interrupt.
- ``0`` Ein Interrupt wird bei jeder Zustandsänderung des Pins ausgelöst.
"""
MCP_IOCON = 0x0a
"""
Konfiguration für den I2C-Portexpander.
Bit ``7``: **BANK**
Ändert die Adressierung der Register. Dieses Programm verwendet die
Standardkonfiguration 0, also nicht ändern.
Bit ``6``: **MIRROR**
Wenn gesetzt, sind beide Interrupt-Pins ``INTA`` und ``INTB`` miteinander
verodert, d.h. ein Interrupt auf einem der zwei Ports aktiviert beide Pins.
Bit ``5``: **SEQOP**
Sequenzieller Modus, inkrementiert den Adresszähler bei jedem Zugriff.
- ``1`` Sequenzieller Modus deaktiviert
- ``0`` Sequenzieller Modus aktiviert
Bit ``4``: **DISSLW**
Slew-Rate control
Bit ``3``: nicht implementiert
Bit ``2``: **ODR**
Konfiguriert den Interruptpin als Open-Drain-Ausgang, wenn gesetzt.
Wenn nicht gesetzt, ist der Interruptpin ein aktiv treibender Ausgang
mit der in Bit ``1`` festgelegten Polarität.
Bit ``1``: **INTPOL**
Polarität des Interrupt-Pins
- ``1`` High wenn aktiv
- ``0`` Low wenn aktiv
Bit ``0``: nicht implementiert
"""
MCP_GPPUA = 0x0c
"""GPIOA-Pullup-Widerstandsregister. 100k-Pullup aktiviert, wenn Bit gesetzt."""
MCP_GPPUB = 0x0d
"""GPIOB-Pullup-Widerstandsregister. 100k-Pullup aktiviert, wenn Bit gesetzt."""
MCP_INTFA = 0x0e
"""
Interrupt-Flag-Register. Wurde ein Interrupt auf GPIOA ausgelöst, ist
das entsprechende Bit in diesem Register gesetzt. Read-only.
"""
MCP_INTFB = 0x0f
"""
Interrupt-Flag-Register. Wurde ein Interrupt auf GPIOB ausgelöst, ist
das entsprechende Bit in diesem Register gesetzt. Read-only.
"""
MCP_INTCAPA = 0x10
"""
Interrupt-Capture-Register. Wurde ein Interrupt auf GPIOA ausgelöst,
speichert dieses Register den entsprechenden Wert.
"""
MCP_INTCAPB = 0x11
"""
Interrupt-Capture-Register. Wurde ein Interrupt auf GPIOB ausgelöst,
speichert dieses Register den entsprechenden Wert.
"""
MCP_GPIOA = 0x12
"""
GPIOA-Portregister. Wert entspricht dem Zustand der GPIO-Pins.
Wird in dieses Register geschrieben, wird das ``OLAT``-Register verändert.
"""
MCP_GPIOB = 0x13
"""
GPIOA-Portregister. Wert entspricht dem Zustand der GPIO-Pins.
Wird in dieses Register geschrieben, wird das ``OLAT``-Register verändert.
"""
MCP_OLATA = 0x14
"""
GPIOA-Output-Latch-Register. Wert entspricht dem Zustand der Output-Latches,
die wiederum die als Output konfigurierten Pins ansteuern.
"""
MCP_OLATB = 0x15
"""
GPIOB-Output-Latch-Register. Wert entspricht dem Zustand der Output-Latches,
die wiederum die als Output konfigurierten Pins ansteuern.
"""
class PinConfigInvalid(Exception):
def __init__(self, cfg_str: str):
super().__init__(f'MCP23017 pin config {cfg_str} invalid')
class _MCP23017Port(Enum):
"""IO-Port des MCP23017 (A/B)"""
A = 0
B = 1
def reg(self, reg_a: int) -> int:
return reg_a + self.value
@dataclass
class _MCP23017Device:
"""
Ein einzelnes mit einen MCP23017 I2C-Portexpander verbundenes
Eingabe/Ausgabegerät.
"""
i2c_address: int
"""I2C-Adresse des MCP23017"""
port: _MCP23017Port
"""IO-Port des MCP23017 (A/B)"""
pin: int
"""IO-Pin des MCP23017 (0-7)"""
invert: bool
"""Zustand des Pins invertieren"""
@classmethod
def from_config(cls, cfg_str: str) -> '_MCP23017Device':
"""
Parst einen Konfigurationsstring und erstellt daraus
ein neues ``_MCP23017Device``-Objekt.
Der Konfigurationsstring hat folgendes Format:
``'I2C_ADDR/PIN'``
Beispiel: ``0x27/B0`` (MCP23017 mit I2C-Adresse 0x27, Pin B0)
Um den Zustand eines Geräts zu invertieren, einfach ``/!``
an den Konfigurationsstring anfügen: ``0x27/B0/!``
:param cfg_str: Konfigurationsstring
:return: Neues ``_MCP23017Device``-Objekt
"""
cfg_parts = cfg_str.split('/')
if len(cfg_parts) < 2:
raise PinConfigInvalid(cfg_str)
i2c_addr_str = cfg_parts[0]
port_str = cfg_parts[1][0]
pin_str = cfg_parts[1][1:]
try:
i2c_addr = int(i2c_addr_str, 16)
except ValueError as e:
raise PinConfigInvalid(cfg_str) from e
try:
port = _MCP23017Port[port_str]
except KeyError as e:
raise PinConfigInvalid(cfg_str) from e
try:
pin = int(pin_str)
except ValueError as e:
raise PinConfigInvalid(cfg_str) from e
invert = False
if len(cfg_parts) >= 3:
attr_str = cfg_parts[2]
invert = '!' in attr_str
return cls(i2c_addr, port, pin, invert)
class Io(io.Io):
def __init__(self, app: models.AppInterface):
super().__init__()
self.cfg = app.get_cfg()
self.bus = smbus.SMBus(self.cfg.i2c_bus_id)
self.i2c_cache: Dict[Tuple[int, int], int] = {}
self.mcp_addresses = set()
self.output_devices: Dict[str, _MCP23017Device] = {}
self.input_devices: Dict[str, _MCP23017Device] = {}
# Parse config and initialize pins
for key, cfg_str in self.cfg.input_devices.items():
device = _MCP23017Device.from_config(cfg_str)
self.mcp_addresses.add(device.i2c_address)
self.input_devices[key] = device
for key, cfg_str in self.cfg.output_devices.items():
device = _MCP23017Device.from_config(cfg_str)
self.mcp_addresses.add(device.i2c_address)
self.output_devices[key] = device
def _i2c_read_byte(self,
i2c_address: int,
register: int,
use_cache=False) -> int:
"""
Lese ein Byte Daten von einem I2C-Gerät
:param i2c_address: I2C-Adresse
:param register: I2C-Register
:param use_cache: Daten zwischenspeichern und bei erneutem
Aufruf nicht erneut abfragen.
:return: Datenbyte
"""
key = (i2c_address, register)
if use_cache and key in self.i2c_cache:
return self.i2c_cache[key]
data = self.bus.read_byte_data(i2c_address, register)
if use_cache:
self.i2c_cache[key] = data
return data
def _i2c_write_byte(self, i2c_address: int, register: int, value: int):
"""
Schreibe ein Byte Daten an ein I2C-Gerät
:param i2c_address: I2C-Adresse
:param register: I2C-Register
:param value: Datenbyte
"""
self.bus.write_byte_data(i2c_address, register, value)
def _i2c_read_bit(self,
i2c_address: int,
register: int,
bit: int,
use_cache=False) -> bool:
"""
Lese ein Bit Daten von einem I2C-Gerät
:param i2c_address: I2C-Adresse
:param register: I2C-Register
:param bit: Nummer des Bits (0-7)
:param use_cache: Daten zwischenspeichern und bei erneutem
Aufruf nicht erneut abfragen.
:return Datenbit
"""
data = self._i2c_read_byte(i2c_address, register, use_cache)
bitmask = 1 << bit
return bool(data & bitmask)
def _i2c_write_bit(self, i2c_address: int, register: int, bit: int,
value: bool):
"""
Schreibe ein Bit Daten an ein I2C-Gerät
:param i2c_address: I2C-Adresse
:param register: I2C-Register
:param bit: Nummer des Bits (0-7)
:param value: Datenbit
"""
data = self._i2c_read_byte(i2c_address, register)
bitmask = 1 << bit
if value:
data |= bitmask # Maskiertes Bit setzen
else:
data &= ~bitmask # Maskiertes Bit löschen
self._i2c_write_byte(i2c_address, register, data)
def _i2c_clear_cache(self):
"""
Leere den I2C-Cache
(verwendet von ``_i2c_read_bit()`` und ``_i2c_read_byte()``).
"""
self.i2c_cache = {}
def _configure_mcp(self, i2c_address: int):
"""Schreibe die initiale Konfiguration an ein MCP23017-Gerät"""
# I/O-Modus: Lesend
self._i2c_write_byte(i2c_address, MCP_IODIRA, 0xff)
self._i2c_write_byte(i2c_address, MCP_IODIRB, 0xff)
# I/O-Polarität: Active-high
self._i2c_write_byte(i2c_address, MCP_IPOLA, 0)
self._i2c_write_byte(i2c_address, MCP_IPOLB, 0)
# Interrupt aus
self._i2c_write_byte(i2c_address, MCP_GPINTENA, 0)
self._i2c_write_byte(i2c_address, MCP_GPINTENB, 0)
self._i2c_write_byte(i2c_address, MCP_DEFVALA, 0)
self._i2c_write_byte(i2c_address, MCP_DEFVALB, 0)
self._i2c_write_byte(i2c_address, MCP_INTCONA, 0)
self._i2c_write_byte(i2c_address, MCP_INTCONB, 0)
# Interrupt-Polarität: Active-high
# Interrupt-Ports spiegeln
self._i2c_write_byte(i2c_address, MCP_IOCON, 0b01000010)
# Pullup aus
self._i2c_write_byte(i2c_address, MCP_GPPUA, 0)
self._i2c_write_byte(i2c_address, MCP_GPPUB, 0)
# Outputs aus
self._i2c_write_byte(i2c_address, MCP_OLATA, 0)
self._i2c_write_byte(i2c_address, MCP_OLATB, 0)
def _configure_input_device(self, device: _MCP23017Device):
"""
Konfiguriere einen MCP-Pin als Eingabegerät
:param device: Gerätedefinition
"""
if device.invert:
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA),
device.pin, True)
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_GPINTENA),
device.pin, True)
def _configure_output_device(self, device: _MCP23017Device):
"""
Konfiguriere einen MCP-Pin als Ausgabegerät
:param device: Gerätedefinition
"""
if device.invert:
# self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA),
# device.pin, True)
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_OLATA),
device.pin, True)
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IODIRA),
device.pin, False)
def _read_interrupt(self) -> Optional[str]:
"""
Rufe ab, welches Eingabegerät den Interrupt ausgelöst hat
:return: Name des Eingabegeräts (oder None)
"""
self._i2c_clear_cache()
for key, device in self.input_devices.items():
if self._i2c_read_bit(device.i2c_address,
device.port.reg(MCP_INTFA), device.pin,
True):
return key
return None
def _read_inputs(self) -> Dict[str, bool]:
"""
Lese die Zustände aller Eingabegeräte aus
:return: Dict(Gerätename => Zustand)
"""
res = {}
self._i2c_clear_cache()
for key, device in self.input_devices.items():
res[key] = self._i2c_read_bit(device.i2c_address,
device.port.reg(MCP_GPIOA),
device.pin, True)
return res
def _interrupt_handler(self, int_pin: int): # pylint: disable=unused-argument
"""
Diese Funktion wird von der RPi-GPIO-Library bei einem Interrupt des MCP23017
(Zustandswechsel eines Eingabegeräts) aufgerufen.
Es wird abgefragt, welcher Input den Interrupt ausgelöst hat und der
entsprechende Input-Callback ausgelöst.
"""
key = self._read_interrupt()
if key is None:
return
time.sleep(self.cfg.gpio_delay)
input_states = self._read_inputs()
if key not in input_states:
return
if input_states[key]:
logging.debug('%s pressed', key)
else:
logging.debug('%s released', key)
self._trigger_cb(key)
def write_output(self, key: str, val: bool):
device = self.output_devices[key]
if device.invert:
val = not val
self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_OLATA),
device.pin, val)
def start(self):
for i2c_address in self.mcp_addresses:
self._configure_mcp(i2c_address)
for device in self.input_devices.values():
self._configure_input_device(device)
for device in self.output_devices.values():
self._configure_output_device(device)
# clear interrupts by reading inputs
self._read_inputs()
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(self.cfg.gpio_interrupt,
GPIO.IN,
pull_up_down=GPIO.PUD_DOWN)
GPIO.add_event_detect(self.cfg.gpio_interrupt,
GPIO.RISING,
callback=self._interrupt_handler,
bouncetime=10)
def stop(self):
GPIO.cleanup()