# SPDX-FileCopyrightText: 2024-2026 Sebastian Andersson # SPDX-License-Identifier: GPL-3.0-or-later """Tag parsers for different data formats""" import logging import re from typing import Any, Dict, Optional, Tuple logger: logging.Logger = logging.getLogger(__name__) # pylint: disable=too-few-public-methods class OpenTag3DParser: """Parser for OpenTag3D format tags Parses tags following the OpenTag3D specification from https://opentag3d.info/spec Based on OpenTag3D spec version 0.012 Extracts manufacturer, material, color, and other filament data, then creates or matches entries in Spoolman. """ def __init__( self, spoolman_client: Any, filament_name_template: str, filament_field_mapping: Dict[str, str], spool_field_mapping: Dict[str, str], ) -> None: """Initialize with a Spoolman client instance Args: spoolman_client: Client object with methods to interact with Spoolman API filament_name_template: Template string for generating filament names from tag data filament_field_mapping: Mapping from Spoolman filament fields to OpenTag3D fields spool_field_mapping: Mapping from Spoolman spool fields to OpenTag3D fields """ self.spoolman_client = spoolman_client self.filament_name_template = filament_name_template self.filament_field_mapping = filament_field_mapping self.spool_field_mapping = spool_field_mapping def _parse_rgba_to_hex(self, octets: bytes, offset: int) -> Optional[str]: """Parse RGBA color at given offset and return hex string if not transparent black Args: octets: Raw bytes from the NFC tag offset: Byte offset to start reading RGBA (4 bytes) Returns: Hex color string (e.g., "FF0000FF") or None if transparent black """ if len(octets) < offset + 4: return None r = octets[offset] g = octets[offset + 1] b = octets[offset + 2] a = octets[offset + 3] # Only return color if not transparent black (indicates no color) if r == 0 and g == 0 and b == 0 and a == 0: return None return f"{r:02x}{g:02x}{b:02x}{a:02x}" def _apply_field_mapping( self, tag_data: Dict[str, Any], field_mapping: Dict[str, str], base_data: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Apply field mapping from OpenTag3D data to Spoolman fields Args: tag_data: Parsed OpenTag3D tag data field_mapping: Mapping from Spoolman fields to OpenTag3D fields base_data: Optional base dictionary to start with Returns: Dictionary with mapped fields ready for Spoolman API """ result = base_data.copy() if base_data else {} for spoolman_field, ot3d_field in field_mapping.items(): if ot3d_field in tag_data: value = tag_data[ot3d_field] # Handle nested fields (e.g., "extra.custom_field") if "." in spoolman_field: parts = spoolman_field.split(".", 1) parent_key = parts[0] child_key = parts[1] if parent_key not in result: result[parent_key] = {} result[parent_key][child_key] = value else: result[spoolman_field] = value return result def _generate_filament_name(self, tag_data: Dict[str, Any]) -> str: """Generate filament name from template using tag data Args: tag_data: Parsed OpenTag3D tag data Returns: Formatted filament name """ # Use string formatting with the template try: # Simple approach: format with all fields, then clean up name = self.filament_name_template.format(**tag_data) # Clean up extra spaces name = " ".join(name.split()) # Clean up trailing/leading dashes and spaces around dashes # Remove trailing dash (with optional spaces) name = re.sub(r"\s*-\s*$", "", name) # Remove leading dash (with optional spaces) name = re.sub(r"^\s*-\s*", "", name) # Clean up double spaces again after dash removal name = " ".join(name.split()) return name except (KeyError, ValueError) as ex: logger.warning("Template formatting error: %s, using fallback", ex) # Fallback to simple material_name return tag_data.get("material_name", "Unknown") # pylint: disable=too-many-locals,too-many-branches,too-many-statements def _parse_opentag3d_data(self, octets: bytes) -> Optional[Dict[str, Any]]: """Parse OpenTag3D format data from tag octets Args: octets: Raw bytes from the NDEF record payload Returns: Dictionary with parsed data, or None if not a valid OpenTag3D tag """ # OpenTag3D data starts at offset 0x00 (new spec v0.010+) # Check if we have enough data if len(octets) < 0x64: # Minimum size for core fields return None # Parse tag version (2 bytes, big endian) at offset 0x00 tag_version = int.from_bytes(octets[0x00:0x02], byteorder="big") logger.debug( "Parsing OpenTag3D version %d.%02d tag", tag_version // 256, tag_version % 256, ) if tag_version < 0x000C: # Too old, ignoring return None # Parse base material (5 bytes UTF-8) at offset 0x02 material_base = ( octets[0x02:0x07].decode("utf-8", errors="ignore").rstrip("\x00") ) # Parse material modifiers (5 bytes UTF-8) at offset 0x07 material_mod = octets[0x07:0x0C].decode("utf-8", errors="ignore").rstrip("\x00") # Parse manufacturer (16 bytes UTF-8) at offset 0x1B manufacturer = octets[0x1B:0x2B].decode("utf-8", errors="ignore").rstrip("\x00") # Parse color name (32 bytes UTF-8) at offset 0x2B color_name = octets[0x2B:0x4B].decode("utf-8", errors="ignore").rstrip("\x00") # Parse color 1 (4 bytes RGBA) at offset 0x4B color_1_hex = self._parse_rgba_to_hex(octets, 0x4B) # Parse target diameter (2 bytes, µm) at offset 0x5C target_diameter = int.from_bytes(octets[0x5C:0x5E], byteorder="big") diameter_mm = target_diameter / 1000.0 # Parse target weight (2 bytes, grams) at offset 0x5E target_weight = int.from_bytes(octets[0x5E:0x60], byteorder="big") # Parse print temperature (1 byte, divided by 5) at offset 0x60 print_temp_raw = octets[0x60] print_temp = print_temp_raw * 5 # Parse bed temperature (1 byte, divided by 5) at offset 0x61 bed_temp_raw = octets[0x61] bed_temp = bed_temp_raw * 5 # Parse density (2 bytes, µg/cm³) at offset 0x62 density_raw = int.from_bytes(octets[0x62:0x64], byteorder="big") density = density_raw / 1000.0 # Build material name material_name = material_base if material_mod: material_name = f"{material_base}-{material_mod}" result = { "tag_version": tag_version, "manufacturer": manufacturer, "material_name": material_name, "material_base": material_base, "material_mod": material_mod, "color_name": color_name, "color_hex": color_1_hex, "diameter_mm": diameter_mm, "target_weight": target_weight, "print_temp": print_temp, "bed_temp": bed_temp, "density": density, } # Parse color 2 at offset 0x50 color_2_hex = self._parse_rgba_to_hex(octets, 0x50) if color_2_hex: result["color_2_hex"] = color_2_hex # Parse color 3 at offset 0x54 color_3_hex = self._parse_rgba_to_hex(octets, 0x54) if color_3_hex: result["color_3_hex"] = color_3_hex # Parse color 4 at offset 0x58 color_4_hex = self._parse_rgba_to_hex(octets, 0x58) if color_4_hex: result["color_4_hex"] = color_4_hex # Parse online data URL (32 bytes ASCII) at 0x70 if len(octets) >= 0x70 + 32: online_url = ( octets[0x70:0x90].decode("ascii", errors="ignore").rstrip("\x00") ) if online_url: result["online_data_url"] = online_url # Parse extended fields if available (NTAG215/216) # Extended fields start at 0x90 if len(octets) >= 0x90 + 16: # Parse serial number / batch ID (16 bytes UTF-8) at 0x90 serial = octets[0x90:0xA0].decode("utf-8", errors="ignore").rstrip("\x00") if serial: result["serial"] = serial if len(octets) >= 0xA0 + 4: # Parse manufacture date (4 bytes: year, year, month, day) at 0xA0 mfg_year = int.from_bytes(octets[0xA0:0xA2], byteorder="big") mfg_month = octets[0xA2] mfg_day = octets[0xA3] if mfg_year > 0 and mfg_month > 0 and mfg_day > 0: result["mfg_date"] = f"{mfg_year:04d}-{mfg_month:02d}-{mfg_day:02d}" if len(octets) >= 0xA4 + 3: # Parse manufacture time (3 bytes: hour, minute, second) at 0xA4 mfg_hour = octets[0xA4] mfg_minute = octets[0xA5] mfg_second = octets[0xA6] if mfg_hour < 24 and mfg_minute < 60 and mfg_second < 60: result["mfg_time"] = f"{mfg_hour:02d}:{mfg_minute:02d}:{mfg_second:02d}" if len(octets) >= 0xA7 + 1: # Parse spool core diameter (1 byte, mm) at 0xA7 spool_core_diameter = octets[0xA7] if spool_core_diameter > 0: result["spool_core_diameter"] = spool_core_diameter if len(octets) >= 0xA8 + 1: # Parse MFI temperature (1 byte, divided by 5) at 0xA8 mfi_temp_raw = octets[0xA8] if mfi_temp_raw > 0: result["mfi_temp"] = mfi_temp_raw * 5 if len(octets) >= 0xA9 + 1: # Parse MFI load (1 byte, divided by 10) at 0xA9 mfi_load_raw = octets[0xA9] if mfi_load_raw > 0: result["mfi_load"] = mfi_load_raw * 10 if len(octets) >= 0xAA + 1: # Parse MFI value (1 byte, divided by 10) at 0xAA mfi_value_raw = octets[0xAA] if mfi_value_raw > 0: result["mfi_value"] = mfi_value_raw / 10.0 if len(octets) >= 0xAB + 1: # Parse measured tolerance (1 byte, µm) at 0xAB measured_tolerance = octets[0xAB] if measured_tolerance > 0: result["measured_tolerance"] = measured_tolerance if len(octets) >= 0xAC + 2: # Parse empty spool weight (2 bytes, grams) at 0xAC empty_spool_weight = int.from_bytes(octets[0xAC:0xAE], byteorder="big") if 0 < empty_spool_weight < 65535: result["empty_spool_weight"] = empty_spool_weight if len(octets) >= 0xAE + 2: # Parse measured filament weight (2 bytes, grams) at 0xAE measured_filament_weight = int.from_bytes( octets[0xAE:0xB0], byteorder="big" ) if 0 < measured_filament_weight < 65535: result["measured_filament_weight"] = measured_filament_weight if len(octets) >= 0xB0 + 2: # Parse measured filament length (2 bytes, meters) at 0xB0 measured_filament_length = int.from_bytes( octets[0xB0:0xB2], byteorder="big" ) if 0 < measured_filament_length < 65535: result["measured_filament_length"] = measured_filament_length if len(octets) >= 0xB2 + 2: # Parse transmission distance (2 bytes, µm) at 0xB2 transmission_distance = int.from_bytes(octets[0xB2:0xB4], byteorder="big") if 0 < transmission_distance < 65535: result["transmission_distance"] = transmission_distance if len(octets) >= 0xB4 + 1: # Parse max dry temp (1 byte, divided by 5) at 0xB4 max_dry_temp_raw = octets[0xB4] if max_dry_temp_raw > 0: result["max_dry_temp"] = max_dry_temp_raw * 5 if len(octets) >= 0xB5 + 1: # Parse dry time (1 byte, hours) at 0xB5 dry_time = octets[0xB5] if dry_time > 0: result["dry_time"] = dry_time if len(octets) >= 0xB6 + 1: # Parse min print temp (1 byte, divided by 5) at 0xB6 min_print_temp_raw = octets[0xB6] if min_print_temp_raw > 0: result["min_print_temp"] = min_print_temp_raw * 5 if len(octets) >= 0xB7 + 1: # Parse max print temp (1 byte, divided by 5) at 0xB7 max_print_temp_raw = octets[0xB7] if max_print_temp_raw > 0: result["max_print_temp"] = max_print_temp_raw * 5 if len(octets) >= 0xB8 + 1: # Parse min volumetric speed (1 byte, mm³/s) at 0xB8 min_vso = octets[0xB8] if min_vso > 0: result["min_volumetric_speed"] = min_vso if len(octets) >= 0xB9 + 1: # Parse max volumetric speed (1 byte, mm³/s) at 0xB9 max_vso = octets[0xB9] if max_vso > 0: result["max_volumetric_speed"] = max_vso if len(octets) >= 0xBA + 1: # Parse target volumetric speed (1 byte, mm³/s) at 0xBA target_vso = octets[0xBA] if target_vso > 0: result["target_volumetric_speed"] = target_vso return result # pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements def parse( self, ndef_data: Any, identifier: str ) -> Tuple[Optional[str], Optional[str]]: """Parse OpenTag3D tag data and create/match entries in Spoolman Args: ndef_data: NDEF data structure from the tag identifier: Tag identifier string Returns: Tuple of (spool_id, filament_id) as strings, or (None, None) if not found """ if ndef_data is None: return None, None # Look for the OpenTag3D NDEF record with MIME type "application/opentag3d" octets = None try: if hasattr(ndef_data, "records"): for record in ndef_data.records: # Check if this is an OpenTag3D MIME type record if ( hasattr(record, "type") and record.type == "application/opentag3d" ): octets = record.data logger.debug("Found OpenTag3D NDEF record") break except (AttributeError, TypeError) as ex: logger.debug("Failed to find OpenTag3D NDEF record: %s", ex) if octets is None: logger.debug("Could not find OpenTag3D NDEF record") return None, None # Parse OpenTag3D data tag_data = self._parse_opentag3d_data(octets) if tag_data is None: logger.debug("Not an OpenTag3D format tag") return None, None logger.info( "Parsed OpenTag3D tag: manufacturer=%s, material=%s, color=%s", tag_data["manufacturer"], tag_data["material_name"], tag_data["color_name"], ) logger.debug("All data: %s", tag_data) # Generate filament name from template filament_name = self._generate_filament_name(tag_data) logger.info("Generated filament name from template: %s", filament_name) # Find or create vendor vendor_id = self.spoolman_client.find_vendor_by_name(tag_data["manufacturer"]) if vendor_id is None: logger.info("Creating new vendor: %s", tag_data["manufacturer"]) vendor_id = self.spoolman_client.create_vendor(tag_data["manufacturer"]) if vendor_id is None: logger.error("Failed to create vendor") return None, None # Find or create filament using vendor, material, and name # Material is constructed from base_material and material_modifiers material = tag_data["material_name"] filament_id = self.spoolman_client.find_filament_by_vendor_material_and_name( vendor_id, material, filament_name ) if filament_id is None: logger.info( "Creating new filament: %s %s", tag_data["manufacturer"], filament_name ) # Build base filament data with required fields filament_data = { "vendor_id": vendor_id, "name": filament_name, "material": material, "density": tag_data["density"], "diameter": tag_data["diameter_mm"], "color_hex": tag_data["color_hex"], } # Build multi_color_hexes if color_2_hex is present if "color_2_hex" in tag_data: multi_color_hexes = [tag_data["color_hex"], tag_data["color_2_hex"]] if "color_3_hex" in tag_data: multi_color_hexes.append(tag_data["color_3_hex"]) if "color_4_hex" in tag_data: multi_color_hexes.append(tag_data["color_4_hex"]) filament_data["multi_color_hexes"] = multi_color_hexes # Apply field mapping from config filament_data = self._apply_field_mapping( tag_data, self.filament_field_mapping, filament_data ) filament_id = self.spoolman_client.create_filament(filament_data) if filament_id is None: logger.error("Failed to create filament") return None, None # Create spool with nfc_id logger.info( "Creating new spool for filament %s with nfc_id %s", filament_id, identifier ) # Build base spool data spool_data = { "filament_id": filament_id, } # Apply field mapping from config spool_data = self._apply_field_mapping( tag_data, self.spool_field_mapping, spool_data ) # Add nfc_id to extra field if "extra" not in spool_data: spool_data["extra"] = {} spool_data["extra"]["nfc_id"] = f'"{identifier.lower()}"' if "remaining_weight" not in spool_data: if "measured_weight" in tag_data: spool_data["remaining_weight"] = tag_data["measured_weight"] if "initial_weight" not in spool_data: if "target_weight" in tag_data: spool_data["initial_weight"] = tag_data["target_weight"] spool_id = self.spoolman_client.create_spool(spool_data) if spool_id is None: logger.error("Failed to create spool") return None, None logger.info( "Successfully created spool %s and filament %s", spool_id, filament_id ) return str(spool_id), str(filament_id)