# coding=utf-8 import logging import time from dataclasses import dataclass from enum import Enum from typing import Dict, Optional, Tuple from RPi import GPIO # pylint: disable=import-error import smbus from tsgrain_controller import io, models # MCP-Register # Quelle: https://ww1.microchip.com/downloads/en/devicedoc/20001952c.pdf # Seite 16, Table 3-3, 3-5 MCP_IODIRA = 0x00 """ I/O-Modus pro GPIOA-Pin (standardmäßig ``1`` / Eingang) - ``1`` Eingang - ``0`` Ausgang """ MCP_IODIRB = 0x01 """ I/O-Modus pro GPIOB-Pin (standardmäßig ``1`` / Eingang) - ``1`` Eingang - ``0`` Ausgang """ MCP_IPOLA = 0x02 """ Polarität pro GPIOA-Pin. - ``1`` Low wenn aktiv - ``0`` High wenn aktiv Die Einstellung hatte im Test nur bei als Eingängen konfigurierten Pins einen Effekt. Invertierte Ausgänge setzt die Library deswegen auf den entgegengesetzten Wert. """ MCP_IPOLB = 0x03 """Polarität pro GPIOB-Pin. Invertiert wenn Bit gesetzt.""" MCP_GPINTENA = 0x04 """ Aktiviert einen Interrupt bei Zustandswechsel eines GPIOA-Pins. Das Auslöen von Interrupts erfordert das Setzen der Register ``DEFVAL`` und ``INTCON``. """ MCP_GPINTENB = 0x05 """ Aktiviert einen Interrupt bei Zustandswechsel eines GPIOB-Pins. Das Auslöen von Interrupts erfordert das Setzen der Register ``DEFVAL`` und ``INTCON``. """ MCP_DEFVALA = 0x06 """ Vergleichsregister für Interrupt bei Zustandswechsel eines GPIOA-Pins. Wenn mittels ``GPINTEN`` und ``INTCON``-Register ein vergleichender Interrupt konfiguriert wurde, verursacht der Wechsel des Pin-Zustands auf den gegenteiligen Wert von ``DEFVAL`` einen Interrupt. """ MCP_DEFVALB = 0x07 """ Vergleichsregister für Interrupt bei Zustandswechsel eines GPIOB-Pins. Wenn mittels ``GPINTEN`` und ``INTCON``-Register ein vergleichender Interrupt konfiguriert wurde, verursacht der Wechsel des Pin-Zustands auf den gegenteiligen Wert von ``DEFVAL`` einen Interrupt. """ MCP_INTCONA = 0x08 """ Wahl des Interruptmodus für GPIOA. - ``1`` Der Wert des Pins wird mit dem entsprechenden Bit im ``DEFVAL``-Register verglichen. Beim gegenteiligen Wert erfolgt ein Interrupt. - ``0`` Ein Interrupt wird bei jeder Zustandsänderung des Pins ausgelöst. """ MCP_INTCONB = 0x09 """ Wahl des Interruptmodus für GPIOB. - ``1`` Der Wert des Pins wird mit dem entsprechenden Bit im ``DEFVAL``-Register verglichen. Beim gegenteiligen Wert erfolgt ein Interrupt. - ``0`` Ein Interrupt wird bei jeder Zustandsänderung des Pins ausgelöst. """ MCP_IOCON = 0x0a """ Konfiguration für den I2C-Portexpander. Bit ``7``: **BANK** Ändert die Adressierung der Register. Dieses Programm verwendet die Standardkonfiguration 0, also nicht ändern. Bit ``6``: **MIRROR** Wenn gesetzt, sind beide Interrupt-Pins ``INTA`` und ``INTB`` miteinander verodert, d.h. ein Interrupt auf einem der zwei Ports aktiviert beide Pins. Bit ``5``: **SEQOP** Sequenzieller Modus, inkrementiert den Adresszähler bei jedem Zugriff. - ``1`` Sequenzieller Modus deaktiviert - ``0`` Sequenzieller Modus aktiviert Bit ``4``: **DISSLW** Slew-Rate control Bit ``3``: nicht implementiert Bit ``2``: **ODR** Konfiguriert den Interruptpin als Open-Drain-Ausgang, wenn gesetzt. Wenn nicht gesetzt, ist der Interruptpin ein aktiv treibender Ausgang mit der in Bit ``1`` festgelegten Polarität. Bit ``1``: **INTPOL** Polarität des Interrupt-Pins - ``1`` High wenn aktiv - ``0`` Low wenn aktiv Bit ``0``: nicht implementiert """ MCP_GPPUA = 0x0c """GPIOA-Pullup-Widerstandsregister. 100k-Pullup aktiviert, wenn Bit gesetzt.""" MCP_GPPUB = 0x0d """GPIOB-Pullup-Widerstandsregister. 100k-Pullup aktiviert, wenn Bit gesetzt.""" MCP_INTFA = 0x0e """ Interrupt-Flag-Register. Wurde ein Interrupt auf GPIOA ausgelöst, ist das entsprechende Bit in diesem Register gesetzt. Read-only. """ MCP_INTFB = 0x0f """ Interrupt-Flag-Register. Wurde ein Interrupt auf GPIOB ausgelöst, ist das entsprechende Bit in diesem Register gesetzt. Read-only. """ MCP_INTCAPA = 0x10 """ Interrupt-Capture-Register. Wurde ein Interrupt auf GPIOA ausgelöst, speichert dieses Register den entsprechenden Wert. """ MCP_INTCAPB = 0x11 """ Interrupt-Capture-Register. Wurde ein Interrupt auf GPIOB ausgelöst, speichert dieses Register den entsprechenden Wert. """ MCP_GPIOA = 0x12 """ GPIOA-Portregister. Wert entspricht dem Zustand der GPIO-Pins. Wird in dieses Register geschrieben, wird das ``OLAT``-Register verändert. """ MCP_GPIOB = 0x13 """ GPIOA-Portregister. Wert entspricht dem Zustand der GPIO-Pins. Wird in dieses Register geschrieben, wird das ``OLAT``-Register verändert. """ MCP_OLATA = 0x14 """ GPIOA-Output-Latch-Register. Wert entspricht dem Zustand der Output-Latches, die wiederum die als Output konfigurierten Pins ansteuern. """ MCP_OLATB = 0x15 """ GPIOB-Output-Latch-Register. Wert entspricht dem Zustand der Output-Latches, die wiederum die als Output konfigurierten Pins ansteuern. """ class PinConfigInvalid(Exception): def __init__(self, cfg_str: str): super().__init__(f'MCP23017 pin config {cfg_str} invalid') class _MCP23017Port(Enum): """IO-Port des MCP23017 (A/B)""" A = 0 B = 1 def reg(self, reg_a: int) -> int: return reg_a + self.value @dataclass class _MCP23017Device: """ Ein einzelnes mit einen MCP23017 I2C-Portexpander verbundenes Eingabe/Ausgabegerät. """ i2c_address: int """I2C-Adresse des MCP23017""" port: _MCP23017Port """IO-Port des MCP23017 (A/B)""" pin: int """IO-Pin des MCP23017 (0-7)""" invert: bool """Zustand des Pins invertieren""" @classmethod def from_config(cls, cfg_str: str) -> '_MCP23017Device': """ Parst einen Konfigurationsstring und erstellt daraus ein neues ``_MCP23017Device``-Objekt. Der Konfigurationsstring hat folgendes Format: ``'I2C_ADDR/PIN'`` Beispiel: ``0x27/B0`` (MCP23017 mit I2C-Adresse 0x27, Pin B0) Um den Zustand eines Geräts zu invertieren, einfach ``/!`` an den Konfigurationsstring anfügen: ``0x27/B0/!`` :param cfg_str: Konfigurationsstring :return: Neues ``_MCP23017Device``-Objekt """ cfg_parts = cfg_str.split('/') if len(cfg_parts) < 2: raise PinConfigInvalid(cfg_str) i2c_addr_str = cfg_parts[0] port_str = cfg_parts[1][0] pin_str = cfg_parts[1][1:] try: i2c_addr = int(i2c_addr_str, 16) except ValueError as e: raise PinConfigInvalid(cfg_str) from e try: port = _MCP23017Port[port_str] except KeyError as e: raise PinConfigInvalid(cfg_str) from e try: pin = int(pin_str) except ValueError as e: raise PinConfigInvalid(cfg_str) from e invert = False if len(cfg_parts) >= 3: attr_str = cfg_parts[2] invert = '!' in attr_str return cls(i2c_addr, port, pin, invert) class Io(io.Io): def __init__(self, app: models.AppInterface): super().__init__() self.cfg = app.get_cfg() self.bus = smbus.SMBus(self.cfg.i2c_bus_id) self.i2c_cache: Dict[Tuple[int, int], int] = {} self.mcp_addresses = set() self.output_devices: Dict[str, _MCP23017Device] = {} self.input_devices: Dict[str, _MCP23017Device] = {} # Parse config and initialize pins for key, cfg_str in self.cfg.input_devices.items(): device = _MCP23017Device.from_config(cfg_str) self.mcp_addresses.add(device.i2c_address) self.input_devices[key] = device for key, cfg_str in self.cfg.output_devices.items(): device = _MCP23017Device.from_config(cfg_str) self.mcp_addresses.add(device.i2c_address) self.output_devices[key] = device def _i2c_read_byte(self, i2c_address: int, register: int, use_cache=False) -> int: """ Lese ein Byte Daten von einem I2C-Gerät :param i2c_address: I2C-Adresse :param register: I2C-Register :param use_cache: Daten zwischenspeichern und bei erneutem Aufruf nicht erneut abfragen. :return: Datenbyte """ key = (i2c_address, register) if use_cache and key in self.i2c_cache: return self.i2c_cache[key] data = self.bus.read_byte_data(i2c_address, register) if use_cache: self.i2c_cache[key] = data return data def _i2c_write_byte(self, i2c_address: int, register: int, value: int): """ Schreibe ein Byte Daten an ein I2C-Gerät :param i2c_address: I2C-Adresse :param register: I2C-Register :param value: Datenbyte """ self.bus.write_byte_data(i2c_address, register, value) def _i2c_read_bit(self, i2c_address: int, register: int, bit: int, use_cache=False) -> bool: """ Lese ein Bit Daten von einem I2C-Gerät :param i2c_address: I2C-Adresse :param register: I2C-Register :param bit: Nummer des Bits (0-7) :param use_cache: Daten zwischenspeichern und bei erneutem Aufruf nicht erneut abfragen. :return Datenbit """ data = self._i2c_read_byte(i2c_address, register, use_cache) bitmask = 1 << bit return bool(data & bitmask) def _i2c_write_bit(self, i2c_address: int, register: int, bit: int, value: bool): """ Schreibe ein Bit Daten an ein I2C-Gerät :param i2c_address: I2C-Adresse :param register: I2C-Register :param bit: Nummer des Bits (0-7) :param value: Datenbit """ data = self._i2c_read_byte(i2c_address, register) bitmask = 1 << bit if value: data |= bitmask # Maskiertes Bit setzen else: data &= ~bitmask # Maskiertes Bit löschen self._i2c_write_byte(i2c_address, register, data) def _i2c_clear_cache(self): """ Leere den I2C-Cache (verwendet von ``_i2c_read_bit()`` und ``_i2c_read_byte()``). """ self.i2c_cache = {} def _configure_mcp(self, i2c_address: int): """Schreibe die initiale Konfiguration an ein MCP23017-Gerät""" # I/O-Modus: Lesend self._i2c_write_byte(i2c_address, MCP_IODIRA, 0xff) self._i2c_write_byte(i2c_address, MCP_IODIRB, 0xff) # I/O-Polarität: Active-high self._i2c_write_byte(i2c_address, MCP_IPOLA, 0) self._i2c_write_byte(i2c_address, MCP_IPOLB, 0) # Interrupt aus self._i2c_write_byte(i2c_address, MCP_GPINTENA, 0) self._i2c_write_byte(i2c_address, MCP_GPINTENB, 0) self._i2c_write_byte(i2c_address, MCP_DEFVALA, 0) self._i2c_write_byte(i2c_address, MCP_DEFVALB, 0) self._i2c_write_byte(i2c_address, MCP_INTCONA, 0) self._i2c_write_byte(i2c_address, MCP_INTCONB, 0) # Interrupt-Polarität: Active-high # Interrupt-Ports spiegeln self._i2c_write_byte(i2c_address, MCP_IOCON, 0b01000010) # Pullup aus self._i2c_write_byte(i2c_address, MCP_GPPUA, 0) self._i2c_write_byte(i2c_address, MCP_GPPUB, 0) # Outputs aus self._i2c_write_byte(i2c_address, MCP_OLATA, 0) self._i2c_write_byte(i2c_address, MCP_OLATB, 0) def _configure_input_device(self, device: _MCP23017Device): """ Konfiguriere einen MCP-Pin als Eingabegerät :param device: Gerätedefinition """ if device.invert: self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA), device.pin, True) self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_GPINTENA), device.pin, True) def _configure_output_device(self, device: _MCP23017Device): """ Konfiguriere einen MCP-Pin als Ausgabegerät :param device: Gerätedefinition """ if device.invert: # self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IPOLA), # device.pin, True) self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_OLATA), device.pin, True) self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_IODIRA), device.pin, False) def _read_interrupt(self) -> Optional[str]: """ Rufe ab, welches Eingabegerät den Interrupt ausgelöst hat :return: Name des Eingabegeräts (oder None) """ self._i2c_clear_cache() for key, device in self.input_devices.items(): if self._i2c_read_bit(device.i2c_address, device.port.reg(MCP_INTFA), device.pin, True): return key return None def _read_inputs(self) -> Dict[str, bool]: """ Lese die Zustände aller Eingabegeräte aus :return: Dict(Gerätename => Zustand) """ res = {} self._i2c_clear_cache() for key, device in self.input_devices.items(): res[key] = self._i2c_read_bit(device.i2c_address, device.port.reg(MCP_GPIOA), device.pin, True) return res def _interrupt_handler(self, int_pin: int): # pylint: disable=unused-argument """ Diese Funktion wird von der RPi-GPIO-Library bei einem Interrupt des MCP23017 (Zustandswechsel eines Eingabegeräts) aufgerufen. Es wird abgefragt, welcher Input den Interrupt ausgelöst hat und der entsprechende Input-Callback ausgelöst. """ key = self._read_interrupt() if key is None: return time.sleep(self.cfg.gpio_delay) input_states = self._read_inputs() if key not in input_states: return if input_states[key]: logging.debug('%s pressed', key) else: logging.debug('%s released', key) self._trigger_cb(key) def write_output(self, key: str, val: bool): device = self.output_devices[key] if device.invert: val = not val self._i2c_write_bit(device.i2c_address, device.port.reg(MCP_OLATA), device.pin, val) def start(self): for i2c_address in self.mcp_addresses: self._configure_mcp(i2c_address) for device in self.input_devices.values(): self._configure_input_device(device) for device in self.output_devices.values(): self._configure_output_device(device) # clear interrupts by reading inputs self._read_inputs() GPIO.setmode(GPIO.BCM) GPIO.setwarnings(False) GPIO.setup(self.cfg.gpio_interrupt, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) GPIO.add_event_detect(self.cfg.gpio_interrupt, GPIO.RISING, callback=self._interrupt_handler, bouncetime=10) def stop(self): GPIO.cleanup()