diff --git a/README.md b/README.md index 707e007..9359c6e 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ Automatically sets the loaded spool & filament in klipper by using NFC/RFID - [Use with Happy-Hare](#use-with-happy-hare) - [Use with Prusa's OpenPrintTag tags](#use-with-prusas-openprinttag-tags) - [Use with OpenTag3D tags](#use-with-opentag3d-tags) - - [Related projects](#related-projects) + - [See also](#see-also) - [Developer info](#developer-info) @@ -301,14 +301,12 @@ See Spoolman's API documentation [here](https://donkie.github.io/Spoolman/) to s You can also add extra fields in Spoolman for saving the data from the OpenTag3D tags. -## Related projects - -* [FilaMan](https://www.filaman.app/) - a filament management system. -* [esp_to_spoolman](https://github.com/dimbas80/esp_to_spoolman) - like nfc2klipper, but running on an ESP32. -* [OpenPrintTag Scanner](https://github.com/ryanch/openprinttag_scanner) - like esp_to_spoolman, but with display and for PrusaLink instead of Klipper. -* [spool2klipper](https://github.com/bofh69/spool2klipper) - to set the filament id (and other data) when the spool id is changed. -* [spoolman2slicer](https://github.com/bofh69/spoolman2slicer) - create filament config from Spoolman. -* [OpenTag3d](https://opentag3d.info/) tag format. +## See also +If nfc2klipper doesn't work for some reason, +[spool2klipper](https://github.com/bofh69/spool2klipper) can be use +to automatically update the `active_filament` variable whenever the spool +is changed in Moonraker (when changing it in the frontend for example). +That way `ASSERT_ACTIVE_FILAMENT` will still work correctly. ## Developer info diff --git a/lib/nfc_parsers.py b/lib/nfc_parsers.py index f54170c..ee11aab 100644 --- a/lib/nfc_parsers.py +++ b/lib/nfc_parsers.py @@ -4,6 +4,7 @@ """Tag parsers for different data formats""" import logging +import re from typing import Any, Dict, List, Optional, Protocol, Tuple import ndef # pylint: disable=import-error @@ -132,3 +133,496 @@ class TagIdentifierParser: identifier, ) return None, None + + +class OpenTag3DParser: + """Parser for OpenTag3D format tags + + Parses tags following the OpenTag3D specification from https://opentag3d.info/spec + Based on OpenTag3D spec version 0.012 + Extracts manufacturer, material, color, and other filament data, then creates + or matches entries in Spoolman. + """ + + def __init__( + self, + spoolman_client: Any, + filament_name_template: str, + filament_field_mapping: Dict[str, str], + spool_field_mapping: Dict[str, str], + ) -> None: + """Initialize with a Spoolman client instance + + Args: + spoolman_client: Client object with methods to interact with Spoolman API + filament_name_template: Template string for generating filament names from tag data + filament_field_mapping: Mapping from Spoolman filament fields to OpenTag3D fields + spool_field_mapping: Mapping from Spoolman spool fields to OpenTag3D fields + """ + self.spoolman_client = spoolman_client + self.filament_name_template = filament_name_template + self.filament_field_mapping = filament_field_mapping + self.spool_field_mapping = spool_field_mapping + + def _parse_rgba_to_hex(self, octets: bytes, offset: int) -> Optional[str]: + """Parse RGBA color at given offset and return hex string if not transparent black + + Args: + octets: Raw bytes from the NFC tag + offset: Byte offset to start reading RGBA (4 bytes) + + Returns: + Hex color string (e.g., "FF0000FF") or None if transparent black + """ + if len(octets) < offset + 4: + return None + + r = octets[offset] + g = octets[offset + 1] + b = octets[offset + 2] + a = octets[offset + 3] + + # Only return color if not transparent black (indicates no color) + if r == 0 and g == 0 and b == 0 and a == 0: + return None + + return f"{r:02x}{g:02x}{b:02x}{a:02x}" + + def _apply_field_mapping( + self, + tag_data: Dict[str, Any], + field_mapping: Dict[str, str], + base_data: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """Apply field mapping from OpenTag3D data to Spoolman fields + + Args: + tag_data: Parsed OpenTag3D tag data + field_mapping: Mapping from Spoolman fields to OpenTag3D fields + base_data: Optional base dictionary to start with + + Returns: + Dictionary with mapped fields ready for Spoolman API + """ + result = base_data.copy() if base_data else {} + + for spoolman_field, ot3d_field in field_mapping.items(): + if ot3d_field in tag_data: + value = tag_data[ot3d_field] + # Handle nested fields (e.g., "extra.custom_field") + if "." in spoolman_field: + parts = spoolman_field.split(".", 1) + parent_key = parts[0] + child_key = parts[1] + if parent_key not in result: + result[parent_key] = {} + result[parent_key][child_key] = value + else: + result[spoolman_field] = value + + return result + + def _generate_filament_name(self, tag_data: Dict[str, Any]) -> str: + """Generate filament name from template using tag data + + Args: + tag_data: Parsed OpenTag3D tag data + + Returns: + Formatted filament name + """ + # Use string formatting with the template + try: + # Simple approach: format with all fields, then clean up + name = self.filament_name_template.format(**tag_data) + # Clean up extra spaces + name = " ".join(name.split()) + # Clean up trailing/leading dashes and spaces around dashes + # Remove trailing dash (with optional spaces) + name = re.sub(r"\s*-\s*$", "", name) + # Remove leading dash (with optional spaces) + name = re.sub(r"^\s*-\s*", "", name) + # Clean up double spaces again after dash removal + name = " ".join(name.split()) + return name + except (KeyError, ValueError) as ex: + logger.warning("Template formatting error: %s, using fallback", ex) + # Fallback to simple material_name + return tag_data.get("material_name", "Unknown") + + # pylint: disable=too-many-locals,too-many-branches,too-many-statements + def _parse_opentag3d_data(self, octets: bytes) -> Optional[Dict[str, Any]]: + """Parse OpenTag3D format data from tag octets + + Args: + octets: Raw bytes from the NDEF record payload + + Returns: + Dictionary with parsed data, or None if not a valid OpenTag3D tag + """ + # OpenTag3D data starts at offset 0x00 (new spec v0.010+) + # Check if we have enough data + if len(octets) < 0x64: # Minimum size for core fields + return None + + # Parse tag version (2 bytes, big endian) at offset 0x00 + tag_version = int.from_bytes(octets[0x00:0x02], byteorder="big") + + logger.debug( + "Parsing OpenTag3D version %d.%02d tag", + tag_version // 256, + tag_version % 256, + ) + + if tag_version < 0x000C: + # Too old, ignoring + return None + + # Parse base material (5 bytes UTF-8) at offset 0x02 + material_base = ( + octets[0x02:0x07].decode("utf-8", errors="ignore").rstrip("\x00") + ) + + # Parse material modifiers (5 bytes UTF-8) at offset 0x07 + material_mod = octets[0x07:0x0C].decode("utf-8", errors="ignore").rstrip("\x00") + + # Parse manufacturer (16 bytes UTF-8) at offset 0x1B + manufacturer = octets[0x1B:0x2B].decode("utf-8", errors="ignore").rstrip("\x00") + + # Parse color name (32 bytes UTF-8) at offset 0x2B + color_name = octets[0x2B:0x4B].decode("utf-8", errors="ignore").rstrip("\x00") + + # Parse color 1 (4 bytes RGBA) at offset 0x4B + color_1_hex = self._parse_rgba_to_hex(octets, 0x4B) + + # Parse target diameter (2 bytes, µm) at offset 0x5C + target_diameter = int.from_bytes(octets[0x5C:0x5E], byteorder="big") + diameter_mm = target_diameter / 1000.0 + + # Parse target weight (2 bytes, grams) at offset 0x5E + target_weight = int.from_bytes(octets[0x5E:0x60], byteorder="big") + + # Parse print temperature (1 byte, divided by 5) at offset 0x60 + print_temp_raw = octets[0x60] + print_temp = print_temp_raw * 5 + + # Parse bed temperature (1 byte, divided by 5) at offset 0x61 + bed_temp_raw = octets[0x61] + bed_temp = bed_temp_raw * 5 + + # Parse density (2 bytes, µg/cm³) at offset 0x62 + density_raw = int.from_bytes(octets[0x62:0x64], byteorder="big") + density = density_raw / 1000.0 + + # Build material name + material_name = material_base + if material_mod: + material_name = f"{material_base}-{material_mod}" + + result = { + "tag_version": tag_version, + "manufacturer": manufacturer, + "material_name": material_name, + "material_base": material_base, + "material_mod": material_mod, + "color_name": color_name, + "color_hex": color_1_hex, + "diameter_mm": diameter_mm, + "target_weight": target_weight, + "print_temp": print_temp, + "bed_temp": bed_temp, + "density": density, + } + + # Parse color 2 at offset 0x50 + color_2_hex = self._parse_rgba_to_hex(octets, 0x50) + if color_2_hex: + result["color_2_hex"] = color_2_hex + + # Parse color 3 at offset 0x54 + color_3_hex = self._parse_rgba_to_hex(octets, 0x54) + if color_3_hex: + result["color_3_hex"] = color_3_hex + + # Parse color 4 at offset 0x58 + color_4_hex = self._parse_rgba_to_hex(octets, 0x58) + if color_4_hex: + result["color_4_hex"] = color_4_hex + + # Parse online data URL (32 bytes ASCII) at 0x70 + if len(octets) >= 0x70 + 32: + online_url = ( + octets[0x70:0x90].decode("ascii", errors="ignore").rstrip("\x00") + ) + if online_url: + result["online_data_url"] = online_url + + # Parse extended fields if available (NTAG215/216) + # Extended fields start at 0x90 + if len(octets) >= 0x90 + 16: + # Parse serial number / batch ID (16 bytes UTF-8) at 0x90 + serial = octets[0x90:0xA0].decode("utf-8", errors="ignore").rstrip("\x00") + if serial: + result["serial"] = serial + + if len(octets) >= 0xA0 + 4: + # Parse manufacture date (4 bytes: year, year, month, day) at 0xA0 + mfg_year = int.from_bytes(octets[0xA0:0xA2], byteorder="big") + mfg_month = octets[0xA2] + mfg_day = octets[0xA3] + if mfg_year > 0 and mfg_month > 0 and mfg_day > 0: + result["mfg_date"] = f"{mfg_year:04d}-{mfg_month:02d}-{mfg_day:02d}" + + if len(octets) >= 0xA4 + 3: + # Parse manufacture time (3 bytes: hour, minute, second) at 0xA4 + mfg_hour = octets[0xA4] + mfg_minute = octets[0xA5] + mfg_second = octets[0xA6] + if mfg_hour < 24 and mfg_minute < 60 and mfg_second < 60: + result["mfg_time"] = f"{mfg_hour:02d}:{mfg_minute:02d}:{mfg_second:02d}" + + if len(octets) >= 0xA7 + 1: + # Parse spool core diameter (1 byte, mm) at 0xA7 + spool_core_diameter = octets[0xA7] + if spool_core_diameter > 0: + result["spool_core_diameter"] = spool_core_diameter + + if len(octets) >= 0xA8 + 1: + # Parse MFI temperature (1 byte, divided by 5) at 0xA8 + mfi_temp_raw = octets[0xA8] + if mfi_temp_raw > 0: + result["mfi_temp"] = mfi_temp_raw * 5 + + if len(octets) >= 0xA9 + 1: + # Parse MFI load (1 byte, divided by 10) at 0xA9 + mfi_load_raw = octets[0xA9] + if mfi_load_raw > 0: + result["mfi_load"] = mfi_load_raw * 10 + + if len(octets) >= 0xAA + 1: + # Parse MFI value (1 byte, divided by 10) at 0xAA + mfi_value_raw = octets[0xAA] + if mfi_value_raw > 0: + result["mfi_value"] = mfi_value_raw / 10.0 + + if len(octets) >= 0xAB + 1: + # Parse measured tolerance (1 byte, µm) at 0xAB + measured_tolerance = octets[0xAB] + if measured_tolerance > 0: + result["measured_tolerance"] = measured_tolerance + + if len(octets) >= 0xAC + 2: + # Parse empty spool weight (2 bytes, grams) at 0xAC + empty_spool_weight = int.from_bytes(octets[0xAC:0xAE], byteorder="big") + if 0 < empty_spool_weight < 65535: + result["empty_spool_weight"] = empty_spool_weight + + if len(octets) >= 0xAE + 2: + # Parse measured filament weight (2 bytes, grams) at 0xAE + measured_filament_weight = int.from_bytes( + octets[0xAE:0xB0], byteorder="big" + ) + if 0 < measured_filament_weight < 65535: + result["measured_filament_weight"] = measured_filament_weight + + if len(octets) >= 0xB0 + 2: + # Parse measured filament length (2 bytes, meters) at 0xB0 + measured_filament_length = int.from_bytes( + octets[0xB0:0xB2], byteorder="big" + ) + if 0 < measured_filament_length < 65535: + result["measured_filament_length"] = measured_filament_length + + if len(octets) >= 0xB2 + 2: + # Parse transmission distance (2 bytes, µm) at 0xB2 + transmission_distance = int.from_bytes(octets[0xB2:0xB4], byteorder="big") + if 0 < transmission_distance < 65535: + result["transmission_distance"] = transmission_distance + + if len(octets) >= 0xB4 + 1: + # Parse max dry temp (1 byte, divided by 5) at 0xB4 + max_dry_temp_raw = octets[0xB4] + if max_dry_temp_raw > 0: + result["max_dry_temp"] = max_dry_temp_raw * 5 + + if len(octets) >= 0xB5 + 1: + # Parse dry time (1 byte, hours) at 0xB5 + dry_time = octets[0xB5] + if dry_time > 0: + result["dry_time"] = dry_time + + if len(octets) >= 0xB6 + 1: + # Parse min print temp (1 byte, divided by 5) at 0xB6 + min_print_temp_raw = octets[0xB6] + if min_print_temp_raw > 0: + result["min_print_temp"] = min_print_temp_raw * 5 + + if len(octets) >= 0xB7 + 1: + # Parse max print temp (1 byte, divided by 5) at 0xB7 + max_print_temp_raw = octets[0xB7] + if max_print_temp_raw > 0: + result["max_print_temp"] = max_print_temp_raw * 5 + + if len(octets) >= 0xB8 + 1: + # Parse min volumetric speed (1 byte, mm³/s) at 0xB8 + min_vso = octets[0xB8] + if min_vso > 0: + result["min_volumetric_speed"] = min_vso + + if len(octets) >= 0xB9 + 1: + # Parse max volumetric speed (1 byte, mm³/s) at 0xB9 + max_vso = octets[0xB9] + if max_vso > 0: + result["max_volumetric_speed"] = max_vso + + if len(octets) >= 0xBA + 1: + # Parse target volumetric speed (1 byte, mm³/s) at 0xBA + target_vso = octets[0xBA] + if target_vso > 0: + result["target_volumetric_speed"] = target_vso + + return result + + # pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements + def parse( + self, ndef_data: Any, identifier: str + ) -> Tuple[Optional[str], Optional[str]]: + """Parse OpenTag3D tag data and create/match entries in Spoolman + + Args: + ndef_data: NDEF data structure from the tag + identifier: Tag identifier string + + Returns: + Tuple of (spool_id, filament_id) as strings, or (None, None) if not found + """ + if ndef_data is None: + return None, None + + # Look for the OpenTag3D NDEF record with MIME type "application/opentag3d" + octets = None + try: + if hasattr(ndef_data, "records"): + for record in ndef_data.records: + # Check if this is an OpenTag3D MIME type record + if ( + hasattr(record, "type") + and record.type == "application/opentag3d" + ): + octets = record.data + logger.debug("Found OpenTag3D NDEF record") + break + except (AttributeError, TypeError) as ex: + logger.debug("Failed to find OpenTag3D NDEF record: %s", ex) + + if octets is None: + logger.debug("Could not find OpenTag3D NDEF record") + return None, None + + # Parse OpenTag3D data + tag_data = self._parse_opentag3d_data(octets) + if tag_data is None: + logger.debug("Not an OpenTag3D format tag") + return None, None + + logger.info( + "Parsed OpenTag3D tag: manufacturer=%s, material=%s, color=%s", + tag_data["manufacturer"], + tag_data["material_name"], + tag_data["color_name"], + ) + logger.debug("All data: %s", tag_data) + + # Generate filament name from template + filament_name = self._generate_filament_name(tag_data) + logger.info("Generated filament name from template: %s", filament_name) + + # Find or create vendor + vendor_id = self.spoolman_client.find_vendor_by_name(tag_data["manufacturer"]) + + if vendor_id is None: + logger.info("Creating new vendor: %s", tag_data["manufacturer"]) + vendor_id = self.spoolman_client.create_vendor(tag_data["manufacturer"]) + if vendor_id is None: + logger.error("Failed to create vendor") + return None, None + + # Find or create filament using vendor, material, and name + # Material is constructed from base_material and material_modifiers + material = tag_data["material_name"] + filament_id = self.spoolman_client.find_filament_by_vendor_material_and_name( + vendor_id, material, filament_name + ) + + if filament_id is None: + logger.info( + "Creating new filament: %s %s", tag_data["manufacturer"], filament_name + ) + + # Build base filament data with required fields + filament_data = { + "vendor_id": vendor_id, + "name": filament_name, + "material": material, + "density": tag_data["density"], + "diameter": tag_data["diameter_mm"], + "color_hex": tag_data["color_hex"], + } + + # Build multi_color_hexes if color_2_hex is present + if "color_2_hex" in tag_data: + multi_color_hexes = [tag_data["color_hex"], tag_data["color_2_hex"]] + if "color_3_hex" in tag_data: + multi_color_hexes.append(tag_data["color_3_hex"]) + if "color_4_hex" in tag_data: + multi_color_hexes.append(tag_data["color_4_hex"]) + filament_data["multi_color_hexes"] = multi_color_hexes + + # Apply field mapping from config + filament_data = self._apply_field_mapping( + tag_data, self.filament_field_mapping, filament_data + ) + + filament_id = self.spoolman_client.create_filament(filament_data) + if filament_id is None: + logger.error("Failed to create filament") + return None, None + + # Create spool with nfc_id + logger.info( + "Creating new spool for filament %s with nfc_id %s", filament_id, identifier + ) + + # Build base spool data + spool_data = { + "filament_id": filament_id, + } + + # Apply field mapping from config + spool_data = self._apply_field_mapping( + tag_data, self.spool_field_mapping, spool_data + ) + + # Add nfc_id to extra field + if "extra" not in spool_data: + spool_data["extra"] = {} + spool_data["extra"]["nfc_id"] = f'"{identifier.lower()}"' + + if "remaining_weight" not in spool_data: + if "measured_weight" in tag_data: + spool_data["remaining_weight"] = tag_data["measured_weight"] + + if "initial_weight" not in spool_data: + if "target_weight" in tag_data: + spool_data["initial_weight"] = tag_data["target_weight"] + + spool_id = self.spoolman_client.create_spool(spool_data) + + if spool_id is None: + logger.error("Failed to create spool") + return None, None + + logger.info( + "Successfully created spool %s and filament %s", spool_id, filament_id + ) + return str(spool_id), str(filament_id) diff --git a/lib/opentag3d_parser.py b/lib/opentag3d_parser.py deleted file mode 100644 index 64b2167..0000000 --- a/lib/opentag3d_parser.py +++ /dev/null @@ -1,505 +0,0 @@ -# SPDX-FileCopyrightText: 2024-2026 Sebastian Andersson -# SPDX-License-Identifier: GPL-3.0-or-later - -"""Tag parsers for different data formats""" - -import logging -import re -from typing import Any, Dict, Optional, Tuple - -logger: logging.Logger = logging.getLogger(__name__) - -# pylint: disable=too-few-public-methods - - -class OpenTag3DParser: - """Parser for OpenTag3D format tags - - Parses tags following the OpenTag3D specification from https://opentag3d.info/spec - Based on OpenTag3D spec version 0.012 - Extracts manufacturer, material, color, and other filament data, then creates - or matches entries in Spoolman. - """ - - def __init__( - self, - spoolman_client: Any, - filament_name_template: str, - filament_field_mapping: Dict[str, str], - spool_field_mapping: Dict[str, str], - ) -> None: - """Initialize with a Spoolman client instance - - Args: - spoolman_client: Client object with methods to interact with Spoolman API - filament_name_template: Template string for generating filament names from tag data - filament_field_mapping: Mapping from Spoolman filament fields to OpenTag3D fields - spool_field_mapping: Mapping from Spoolman spool fields to OpenTag3D fields - """ - self.spoolman_client = spoolman_client - self.filament_name_template = filament_name_template - self.filament_field_mapping = filament_field_mapping - self.spool_field_mapping = spool_field_mapping - - def _parse_rgba_to_hex(self, octets: bytes, offset: int) -> Optional[str]: - """Parse RGBA color at given offset and return hex string if not transparent black - - Args: - octets: Raw bytes from the NFC tag - offset: Byte offset to start reading RGBA (4 bytes) - - Returns: - Hex color string (e.g., "FF0000FF") or None if transparent black - """ - if len(octets) < offset + 4: - return None - - r = octets[offset] - g = octets[offset + 1] - b = octets[offset + 2] - a = octets[offset + 3] - - # Only return color if not transparent black (indicates no color) - if r == 0 and g == 0 and b == 0 and a == 0: - return None - - return f"{r:02x}{g:02x}{b:02x}{a:02x}" - - def _apply_field_mapping( - self, - tag_data: Dict[str, Any], - field_mapping: Dict[str, str], - base_data: Optional[Dict[str, Any]] = None, - ) -> Dict[str, Any]: - """Apply field mapping from OpenTag3D data to Spoolman fields - - Args: - tag_data: Parsed OpenTag3D tag data - field_mapping: Mapping from Spoolman fields to OpenTag3D fields - base_data: Optional base dictionary to start with - - Returns: - Dictionary with mapped fields ready for Spoolman API - """ - result = base_data.copy() if base_data else {} - - for spoolman_field, ot3d_field in field_mapping.items(): - if ot3d_field in tag_data: - value = tag_data[ot3d_field] - # Handle nested fields (e.g., "extra.custom_field") - if "." in spoolman_field: - parts = spoolman_field.split(".", 1) - parent_key = parts[0] - child_key = parts[1] - if parent_key not in result: - result[parent_key] = {} - result[parent_key][child_key] = value - else: - result[spoolman_field] = value - - return result - - def _generate_filament_name(self, tag_data: Dict[str, Any]) -> str: - """Generate filament name from template using tag data - - Args: - tag_data: Parsed OpenTag3D tag data - - Returns: - Formatted filament name - """ - # Use string formatting with the template - try: - # Simple approach: format with all fields, then clean up - name = self.filament_name_template.format(**tag_data) - # Clean up extra spaces - name = " ".join(name.split()) - # Clean up trailing/leading dashes and spaces around dashes - # Remove trailing dash (with optional spaces) - name = re.sub(r"\s*-\s*$", "", name) - # Remove leading dash (with optional spaces) - name = re.sub(r"^\s*-\s*", "", name) - # Clean up double spaces again after dash removal - name = " ".join(name.split()) - return name - except (KeyError, ValueError) as ex: - logger.warning("Template formatting error: %s, using fallback", ex) - # Fallback to simple material_name - return tag_data.get("material_name", "Unknown") - - # pylint: disable=too-many-locals,too-many-branches,too-many-statements - def _parse_opentag3d_data(self, octets: bytes) -> Optional[Dict[str, Any]]: - """Parse OpenTag3D format data from tag octets - - Args: - octets: Raw bytes from the NDEF record payload - - Returns: - Dictionary with parsed data, or None if not a valid OpenTag3D tag - """ - # OpenTag3D data starts at offset 0x00 (new spec v0.010+) - # Check if we have enough data - if len(octets) < 0x64: # Minimum size for core fields - return None - - # Parse tag version (2 bytes, big endian) at offset 0x00 - tag_version = int.from_bytes(octets[0x00:0x02], byteorder="big") - - logger.debug( - "Parsing OpenTag3D version %d.%02d tag", - tag_version // 256, - tag_version % 256, - ) - - if tag_version < 0x000C: - # Too old, ignoring - return None - - # Parse base material (5 bytes UTF-8) at offset 0x02 - material_base = ( - octets[0x02:0x07].decode("utf-8", errors="ignore").rstrip("\x00") - ) - - # Parse material modifiers (5 bytes UTF-8) at offset 0x07 - material_mod = octets[0x07:0x0C].decode("utf-8", errors="ignore").rstrip("\x00") - - # Parse manufacturer (16 bytes UTF-8) at offset 0x1B - manufacturer = octets[0x1B:0x2B].decode("utf-8", errors="ignore").rstrip("\x00") - - # Parse color name (32 bytes UTF-8) at offset 0x2B - color_name = octets[0x2B:0x4B].decode("utf-8", errors="ignore").rstrip("\x00") - - # Parse color 1 (4 bytes RGBA) at offset 0x4B - color_1_hex = self._parse_rgba_to_hex(octets, 0x4B) - - # Parse target diameter (2 bytes, µm) at offset 0x5C - target_diameter = int.from_bytes(octets[0x5C:0x5E], byteorder="big") - diameter_mm = target_diameter / 1000.0 - - # Parse target weight (2 bytes, grams) at offset 0x5E - target_weight = int.from_bytes(octets[0x5E:0x60], byteorder="big") - - # Parse print temperature (1 byte, divided by 5) at offset 0x60 - print_temp_raw = octets[0x60] - print_temp = print_temp_raw * 5 - - # Parse bed temperature (1 byte, divided by 5) at offset 0x61 - bed_temp_raw = octets[0x61] - bed_temp = bed_temp_raw * 5 - - # Parse density (2 bytes, µg/cm³) at offset 0x62 - density_raw = int.from_bytes(octets[0x62:0x64], byteorder="big") - density = density_raw / 1000.0 - - # Build material name - material_name = material_base - if material_mod: - material_name = f"{material_base}-{material_mod}" - - result = { - "tag_version": tag_version, - "manufacturer": manufacturer, - "material_name": material_name, - "material_base": material_base, - "material_mod": material_mod, - "color_name": color_name, - "color_hex": color_1_hex, - "diameter_mm": diameter_mm, - "target_weight": target_weight, - "print_temp": print_temp, - "bed_temp": bed_temp, - "density": density, - } - - # Parse color 2 at offset 0x50 - color_2_hex = self._parse_rgba_to_hex(octets, 0x50) - if color_2_hex: - result["color_2_hex"] = color_2_hex - - # Parse color 3 at offset 0x54 - color_3_hex = self._parse_rgba_to_hex(octets, 0x54) - if color_3_hex: - result["color_3_hex"] = color_3_hex - - # Parse color 4 at offset 0x58 - color_4_hex = self._parse_rgba_to_hex(octets, 0x58) - if color_4_hex: - result["color_4_hex"] = color_4_hex - - # Parse online data URL (32 bytes ASCII) at 0x70 - if len(octets) >= 0x70 + 32: - online_url = ( - octets[0x70:0x90].decode("ascii", errors="ignore").rstrip("\x00") - ) - if online_url: - result["online_data_url"] = online_url - - # Parse extended fields if available (NTAG215/216) - # Extended fields start at 0x90 - if len(octets) >= 0x90 + 16: - # Parse serial number / batch ID (16 bytes UTF-8) at 0x90 - serial = octets[0x90:0xA0].decode("utf-8", errors="ignore").rstrip("\x00") - if serial: - result["serial"] = serial - - if len(octets) >= 0xA0 + 4: - # Parse manufacture date (4 bytes: year, year, month, day) at 0xA0 - mfg_year = int.from_bytes(octets[0xA0:0xA2], byteorder="big") - mfg_month = octets[0xA2] - mfg_day = octets[0xA3] - if mfg_year > 0 and mfg_month > 0 and mfg_day > 0: - result["mfg_date"] = f"{mfg_year:04d}-{mfg_month:02d}-{mfg_day:02d}" - - if len(octets) >= 0xA4 + 3: - # Parse manufacture time (3 bytes: hour, minute, second) at 0xA4 - mfg_hour = octets[0xA4] - mfg_minute = octets[0xA5] - mfg_second = octets[0xA6] - if mfg_hour < 24 and mfg_minute < 60 and mfg_second < 60: - result["mfg_time"] = f"{mfg_hour:02d}:{mfg_minute:02d}:{mfg_second:02d}" - - if len(octets) >= 0xA7 + 1: - # Parse spool core diameter (1 byte, mm) at 0xA7 - spool_core_diameter = octets[0xA7] - if spool_core_diameter > 0: - result["spool_core_diameter"] = spool_core_diameter - - if len(octets) >= 0xA8 + 1: - # Parse MFI temperature (1 byte, divided by 5) at 0xA8 - mfi_temp_raw = octets[0xA8] - if mfi_temp_raw > 0: - result["mfi_temp"] = mfi_temp_raw * 5 - - if len(octets) >= 0xA9 + 1: - # Parse MFI load (1 byte, divided by 10) at 0xA9 - mfi_load_raw = octets[0xA9] - if mfi_load_raw > 0: - result["mfi_load"] = mfi_load_raw * 10 - - if len(octets) >= 0xAA + 1: - # Parse MFI value (1 byte, divided by 10) at 0xAA - mfi_value_raw = octets[0xAA] - if mfi_value_raw > 0: - result["mfi_value"] = mfi_value_raw / 10.0 - - if len(octets) >= 0xAB + 1: - # Parse measured tolerance (1 byte, µm) at 0xAB - measured_tolerance = octets[0xAB] - if measured_tolerance > 0: - result["measured_tolerance"] = measured_tolerance - - if len(octets) >= 0xAC + 2: - # Parse empty spool weight (2 bytes, grams) at 0xAC - empty_spool_weight = int.from_bytes(octets[0xAC:0xAE], byteorder="big") - if 0 < empty_spool_weight < 65535: - result["empty_spool_weight"] = empty_spool_weight - - if len(octets) >= 0xAE + 2: - # Parse measured filament weight (2 bytes, grams) at 0xAE - measured_filament_weight = int.from_bytes( - octets[0xAE:0xB0], byteorder="big" - ) - if 0 < measured_filament_weight < 65535: - result["measured_filament_weight"] = measured_filament_weight - - if len(octets) >= 0xB0 + 2: - # Parse measured filament length (2 bytes, meters) at 0xB0 - measured_filament_length = int.from_bytes( - octets[0xB0:0xB2], byteorder="big" - ) - if 0 < measured_filament_length < 65535: - result["measured_filament_length"] = measured_filament_length - - if len(octets) >= 0xB2 + 2: - # Parse transmission distance (2 bytes, µm) at 0xB2 - transmission_distance = int.from_bytes(octets[0xB2:0xB4], byteorder="big") - if 0 < transmission_distance < 65535: - result["transmission_distance"] = transmission_distance - - if len(octets) >= 0xB4 + 1: - # Parse max dry temp (1 byte, divided by 5) at 0xB4 - max_dry_temp_raw = octets[0xB4] - if max_dry_temp_raw > 0: - result["max_dry_temp"] = max_dry_temp_raw * 5 - - if len(octets) >= 0xB5 + 1: - # Parse dry time (1 byte, hours) at 0xB5 - dry_time = octets[0xB5] - if dry_time > 0: - result["dry_time"] = dry_time - - if len(octets) >= 0xB6 + 1: - # Parse min print temp (1 byte, divided by 5) at 0xB6 - min_print_temp_raw = octets[0xB6] - if min_print_temp_raw > 0: - result["min_print_temp"] = min_print_temp_raw * 5 - - if len(octets) >= 0xB7 + 1: - # Parse max print temp (1 byte, divided by 5) at 0xB7 - max_print_temp_raw = octets[0xB7] - if max_print_temp_raw > 0: - result["max_print_temp"] = max_print_temp_raw * 5 - - if len(octets) >= 0xB8 + 1: - # Parse min volumetric speed (1 byte, mm³/s) at 0xB8 - min_vso = octets[0xB8] - if min_vso > 0: - result["min_volumetric_speed"] = min_vso - - if len(octets) >= 0xB9 + 1: - # Parse max volumetric speed (1 byte, mm³/s) at 0xB9 - max_vso = octets[0xB9] - if max_vso > 0: - result["max_volumetric_speed"] = max_vso - - if len(octets) >= 0xBA + 1: - # Parse target volumetric speed (1 byte, mm³/s) at 0xBA - target_vso = octets[0xBA] - if target_vso > 0: - result["target_volumetric_speed"] = target_vso - - return result - - # pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements - def parse( - self, ndef_data: Any, identifier: str - ) -> Tuple[Optional[str], Optional[str]]: - """Parse OpenTag3D tag data and create/match entries in Spoolman - - Args: - ndef_data: NDEF data structure from the tag - identifier: Tag identifier string - - Returns: - Tuple of (spool_id, filament_id) as strings, or (None, None) if not found - """ - if ndef_data is None: - return None, None - - # Look for the OpenTag3D NDEF record with MIME type "application/opentag3d" - octets = None - try: - if hasattr(ndef_data, "records"): - for record in ndef_data.records: - # Check if this is an OpenTag3D MIME type record - if ( - hasattr(record, "type") - and record.type == "application/opentag3d" - ): - octets = record.data - logger.debug("Found OpenTag3D NDEF record") - break - except (AttributeError, TypeError) as ex: - logger.debug("Failed to find OpenTag3D NDEF record: %s", ex) - - if octets is None: - logger.debug("Could not find OpenTag3D NDEF record") - return None, None - - # Parse OpenTag3D data - tag_data = self._parse_opentag3d_data(octets) - if tag_data is None: - logger.debug("Not an OpenTag3D format tag") - return None, None - - logger.info( - "Parsed OpenTag3D tag: manufacturer=%s, material=%s, color=%s", - tag_data["manufacturer"], - tag_data["material_name"], - tag_data["color_name"], - ) - logger.debug("All data: %s", tag_data) - - # Generate filament name from template - filament_name = self._generate_filament_name(tag_data) - logger.info("Generated filament name from template: %s", filament_name) - - # Find or create vendor - vendor_id = self.spoolman_client.find_vendor_by_name(tag_data["manufacturer"]) - - if vendor_id is None: - logger.info("Creating new vendor: %s", tag_data["manufacturer"]) - vendor_id = self.spoolman_client.create_vendor(tag_data["manufacturer"]) - if vendor_id is None: - logger.error("Failed to create vendor") - return None, None - - # Find or create filament using vendor, material, and name - # Material is constructed from base_material and material_modifiers - material = tag_data["material_name"] - filament_id = self.spoolman_client.find_filament_by_vendor_material_and_name( - vendor_id, material, filament_name - ) - - if filament_id is None: - logger.info( - "Creating new filament: %s %s", tag_data["manufacturer"], filament_name - ) - - # Build base filament data with required fields - filament_data = { - "vendor_id": vendor_id, - "name": filament_name, - "material": material, - "density": tag_data["density"], - "diameter": tag_data["diameter_mm"], - "color_hex": tag_data["color_hex"], - } - - # Build multi_color_hexes if color_2_hex is present - if "color_2_hex" in tag_data: - multi_color_hexes = [tag_data["color_hex"], tag_data["color_2_hex"]] - if "color_3_hex" in tag_data: - multi_color_hexes.append(tag_data["color_3_hex"]) - if "color_4_hex" in tag_data: - multi_color_hexes.append(tag_data["color_4_hex"]) - filament_data["multi_color_hexes"] = multi_color_hexes - - # Apply field mapping from config - filament_data = self._apply_field_mapping( - tag_data, self.filament_field_mapping, filament_data - ) - - filament_id = self.spoolman_client.create_filament(filament_data) - if filament_id is None: - logger.error("Failed to create filament") - return None, None - - # Create spool with nfc_id - logger.info( - "Creating new spool for filament %s with nfc_id %s", filament_id, identifier - ) - - # Build base spool data - spool_data = { - "filament_id": filament_id, - } - - # Apply field mapping from config - spool_data = self._apply_field_mapping( - tag_data, self.spool_field_mapping, spool_data - ) - - # Add nfc_id to extra field - if "extra" not in spool_data: - spool_data["extra"] = {} - spool_data["extra"]["nfc_id"] = f'"{identifier.lower()}"' - - if "remaining_weight" not in spool_data: - if "measured_weight" in tag_data: - spool_data["remaining_weight"] = tag_data["measured_weight"] - - if "initial_weight" not in spool_data: - if "target_weight" in tag_data: - spool_data["initial_weight"] = tag_data["target_weight"] - - spool_id = self.spoolman_client.create_spool(spool_data) - - if spool_id is None: - logger.error("Failed to create spool") - return None, None - - logger.info( - "Successfully created spool %s and filament %s", spool_id, filament_id - ) - return str(spool_id), str(filament_id) diff --git a/lib/spoolman_client.py b/lib/spoolman_client.py index eedc411..2b08641 100644 --- a/lib/spoolman_client.py +++ b/lib/spoolman_client.py @@ -7,7 +7,6 @@ import json import logging from typing import Any, Dict, List, Optional import requests -import base64 logger: logging.Logger = logging.getLogger(__name__) @@ -16,39 +15,15 @@ logger: logging.Logger = logging.getLogger(__name__) class SpoolmanClient: """Spoolman Web Client""" - def __init__( - self, - url: str, - http_username: Optional[str], - http_password: Optional[str], - http_headers: Optional[str], - ) -> None: + def __init__(self, url: str) -> None: if url.endswith("/"): url = url[:-1] self.url: str = url - self.http_headers = {} - if http_headers: - for c in http_headers.split(";"): - c = c.strip() - if not c: - continue - c_parts = c.split(":", 1) - if len(c_parts) != 2: - raise Exception( - f"Section [spoolman], Option http_headers: {c}: Invalid header format" - ) - self.http_headers[c_parts[0]] = c_parts[1] - - if http_username and http_password: - creds = base64.b64encode( - f"{http_username}:{http_password}".encode() - ).decode() - self.http_headers["Authorization"] = "Basic " + creds def get_spool(self, spool_id: int) -> Dict[str, Any]: """Get the spool from Spoolman""" url: str = self.url + f"/api/v1/spool/{spool_id}" - response = requests.get(url, timeout=10, headers=self.http_headers) + response = requests.get(url, timeout=10) if response.status_code != 200: raise ValueError(f"Request to spoolman failed: {response}") return response.json() @@ -56,7 +31,7 @@ class SpoolmanClient: def get_spools(self) -> List[Dict[str, Any]]: """Get the spools from spoolman""" url: str = self.url + "/api/v1/spool" - response = requests.get(url, timeout=10, headers=self.http_headers) + response = requests.get(url, timeout=10) if response.status_code != 200: raise ValueError(f"Request to spoolman failed: {response}") records: List[Dict[str, Any]] = json.loads(response.text) @@ -86,9 +61,7 @@ class SpoolmanClient: extra["nfc_id"] = '""' url: str = self.url + f"/api/v1/spool/{spool_id}" - response = requests.patch( - url, timeout=10, headers=self.http_headers, json={"extra": extra} - ) + response = requests.patch(url, timeout=10, json={"extra": extra}) if response.status_code != 200: raise ValueError(f"Request to spoolman failed: {response}: {response.text}") @@ -112,9 +85,7 @@ class SpoolmanClient: extra["nfc_id"] = nfc_id url: str = self.url + f"/api/v1/spool/{spool_id}" - response = requests.patch( - url, timeout=10, headers=self.http_headers, json={"extra": extra} - ) + response = requests.patch(url, timeout=10, json={"extra": extra}) if response.status_code != 200: raise ValueError(f"Request to spoolman failed: {response}: {response.text}") @@ -131,7 +102,7 @@ class SpoolmanClient: """ try: url: str = self.url + "/api/v1/vendor" - response = requests.get(url, timeout=10, headers=self.http_headers) + response = requests.get(url, timeout=10) if response.status_code != 200: logger.error( "Failed to find vendor '%s': HTTP %d - %s", @@ -165,9 +136,7 @@ class SpoolmanClient: try: url: str = self.url + "/api/v1/vendor" data = {"name": name} - response = requests.post( - url, json=data, timeout=10, headers=self.http_headers - ) + response = requests.post(url, json=data, timeout=10) if response.status_code not in (200, 201): logger.error( "Failed to create vendor '%s': HTTP %d - %s", @@ -198,9 +167,7 @@ class SpoolmanClient: try: url: str = self.url + "/api/v1/filament" params = {"vendor_id": vendor_id} - response = requests.get( - url, params=params, timeout=10, headers=self.http_headers - ) + response = requests.get(url, params=params, timeout=10) if response.status_code != 200: logger.error( "Failed to find filament '%s' for vendor %d: HTTP %d - %s", @@ -244,9 +211,7 @@ class SpoolmanClient: try: url: str = self.url + "/api/v1/filament" params = {"vendor_id": vendor_id} - response = requests.get( - url, params=params, timeout=10, headers=self.http_headers - ) + response = requests.get(url, params=params, timeout=10) if response.status_code != 200: logger.error( "Failed to find filament '%s' (material: %s) for vendor %d: HTTP %d - %s", @@ -297,9 +262,7 @@ class SpoolmanClient: try: url: str = self.url + "/api/v1/filament" logger.debug("Creating new filament: %s", data) - response = requests.post( - url, json=data, timeout=10, headers=self.http_headers - ) + response = requests.post(url, json=data, timeout=10) if response.status_code not in (200, 201): logger.error( "Failed to create filament: HTTP %d - %s", @@ -331,9 +294,7 @@ class SpoolmanClient: try: url: str = self.url + "/api/v1/spool" logger.debug("Creating new spool: %s", data) - response = requests.post( - url, json=data, timeout=10, headers=self.http_headers - ) + response = requests.post(url, json=data, timeout=10) if response.status_code not in (200, 201): logger.error( "Failed to create spool: HTTP %d - %s", diff --git a/nfc2klipper.cfg b/nfc2klipper.cfg index 4a5b1ec..1a464be 100644 --- a/nfc2klipper.cfg +++ b/nfc2klipper.cfg @@ -22,8 +22,6 @@ nfc-device = "tty:AMA0" [spoolman] # URL for the spoolman installation spoolman-url = "http://mainsailos.local:7912" -# http_username = "" -# http_password = "" [moonraker] # URL for the moonraker installation diff --git a/nfc2klipper.py b/nfc2klipper.py index 0a1d28c..bc1738a 100755 --- a/nfc2klipper.py +++ b/nfc2klipper.py @@ -99,7 +99,7 @@ if __name__ == "__main__": try: api_cmd = [sys.executable, api_script] if parsed_args.config_dir: - os.environ["NFC2KLIPPER_CONFIG_DIR"] = parsed_args.config_dir + api_cmd.extend(["-c", parsed_args.config_dir]) # pylint: disable=consider-using-with api_process = subprocess.Popen(api_cmd) # nosec diff --git a/nfc2klipper_api.py b/nfc2klipper_api.py index bb1d9be..a821170 100755 --- a/nfc2klipper_api.py +++ b/nfc2klipper_api.py @@ -14,6 +14,7 @@ # pylint: disable=duplicate-code +import argparse import os import sys from typing import Any, Dict, Optional, Tuple, Union @@ -22,10 +23,23 @@ from flask import Flask, render_template from lib.config import Nfc2KlipperConfig from lib.ipc import IPCClient + Nfc2KlipperConfig.configure_logging() -cfg_path = os.environ.get("NFC2KLIPPER_CFG_DIR", Nfc2KlipperConfig.CFG_DIR) -args: Optional[Dict[str, Any]] = Nfc2KlipperConfig.get_config(cfg_path) +# Parse command line arguments +# pylint: disable=duplicate-code +parser = argparse.ArgumentParser(description="Web API service for nfc2klipper.") +parser.add_argument( + "-c", + "--config-dir", + metavar="DIR", + default=None, + help=f"Configuration directory (default: {Nfc2KlipperConfig.CFG_DIR})", +) +parsed_args = parser.parse_args() + +args: Optional[Dict[str, Any]] = Nfc2KlipperConfig.get_config(parsed_args.config_dir) +# pylint: enable=duplicate-code if not args: print( diff --git a/nfc2klipper_backend.py b/nfc2klipper_backend.py index 2fd7898..3b7efa7 100755 --- a/nfc2klipper_backend.py +++ b/nfc2klipper_backend.py @@ -25,8 +25,7 @@ from lib.config import Nfc2KlipperConfig from lib.ipc import IPCServer from lib.moonraker_web_client import MoonrakerWebClient from lib.nfc_handler import NfcHandler -from lib.nfc_parsers import NdefTextParser, TagIdentifierParser -from lib.opentag3d_parser import OpenTag3DParser +from lib.nfc_parsers import NdefTextParser, TagIdentifierParser, OpenTag3DParser from lib.spoolman_client import SpoolmanClient Nfc2KlipperConfig.configure_logging() @@ -101,12 +100,7 @@ if USE_MOCK_OBJECTS: args["nfc"]["nfc-device"] ) else: - spoolman = SpoolmanClient( - args["spoolman"]["spoolman-url"], - args["spoolman"].get("http_username"), - args["spoolman"].get("http_password"), - args["spoolman"].get("http_headers"), - ) + spoolman = SpoolmanClient(args["spoolman"]["spoolman-url"]) moonraker = MoonrakerWebClient( args["moonraker"]["moonraker-url"], setting_gcode_template, @@ -245,6 +239,7 @@ def on_nfc_tag_present(ndef_data: Any, identifier: str) -> None: def on_nfc_no_tag_present() -> None: """Called when no tag is present (or tag without data)""" if should_clear_spool(): + set_spool_and_filament(0, 0) diff --git a/requirements.txt b/requirements.txt index 5cb0873..791fd88 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,9 @@ flask==3.0.3 toml==0.10.2 nfcpy==1.0.4 -npyscreen==5.0.2 +npyscreen==4.10.5 requests==2.32.5 urllib3>=2.6.0 -Gunicorn==25.1.0 +Gunicorn==23.0.0 types-toml==0.10.8.20240310 types-requests==2.32.4.20260107 diff --git a/write_tags.py b/write_tags.py index 351f370..08d42d2 100755 --- a/write_tags.py +++ b/write_tags.py @@ -14,8 +14,6 @@ import nfc import npyscreen import requests -from lib.config import Nfc2KlipperConfig -from lib.spoolman_client import SpoolmanClient SPOOL = "SPOOL" FILAMENT = "FILAMENT" @@ -28,19 +26,26 @@ parser = argparse.ArgumentParser( parser.add_argument("--version", action="version", version="%(prog)s 0.0.1") # pylint: disable=R0801 -parser.add_argument("-c", "--config", metavar="config", help="Config directory") +parser.add_argument( + "-d", + "--nfc-device", + metavar="device", + default="ttyAMA0", + help="Which NFC reader to use, see " + + "https://nfcpy.readthedocs.io/en/latest/topics/get-started.html#open-a-local-device" + + " for format", +) + +parser.add_argument( + "-u", + "--url", + metavar="URL", + default="http://mainsailos.local:7912", + help="URL for the Spoolman installation", +) args = parser.parse_args() -config = Nfc2KlipperConfig.get_config(args.config) -assert config, "No config found" - -spoolman = SpoolmanClient( - config["spoolman"]["spoolman-url"], - config["spoolman"].get("http_username"), - config["spoolman"].get("http_password"), - config["spoolman"].get("http_headers"), -) def record_to_text(record): @@ -72,7 +77,9 @@ class PostSelectForm(npyscreen.FormBaseNew): when_pressed_function=self.exit_app, ) - self.records = spoolman.get_spools() + url = args.url + "/api/v1/spool" + records = requests.get(url, timeout=10) + self.records = json.loads(records.text) self.records = sorted(self.records, key=lambda x: x["id"], reverse=True) self.posts = self.add( @@ -118,7 +125,7 @@ class TagWritingApp(npyscreen.NPSAppManaged): self.status = "Written" - clf = nfc.ContactlessFrontend(config["nfc"]["nfc-device"]) + clf = nfc.ContactlessFrontend(args.nfc_device) clf.connect( rdwr={"on-connect": lambda tag: self.on_nfc_connect(tag, spool, filament)} )