nfc2klipper/lib/opentag3d_parser.py
2026-01-13 11:08:21 +01:00

505 lines
19 KiB
Python

# SPDX-FileCopyrightText: 2024-2026 Sebastian Andersson <sebastian@bittr.nu>
# SPDX-License-Identifier: GPL-3.0-or-later
"""Tag parsers for different data formats"""
import logging
import re
from typing import Any, Dict, Optional, Tuple
logger: logging.Logger = logging.getLogger(__name__)


# pylint: disable=too-few-public-methods
class OpenTag3DParser:
    """Parser for OpenTag3D format tags.

    Parses tags following the OpenTag3D specification from
    https://opentag3d.info/spec (based on OpenTag3D spec version 0.012).
    Extracts manufacturer, material, color, and other filament data, then
    finds or creates the vendor and filament in Spoolman and creates a spool
    that carries the tag identifier.
    """

    # MIME type of the NDEF record that carries the OpenTag3D payload.
    _MIME_TYPE = "application/opentag3d"
    # Tags with a version word below this value are ignored as too old.
    _MIN_TAG_VERSION = 0x000C
    # Number of payload bytes needed to read every core field (0x00-0x63).
    _CORE_SIZE = 0x64

    # Optional numeric fields of the extended area, in offset order:
    # (offset, size in bytes, result key, converter or None for the raw value).
    # A raw value of 0 means "not set"; for 2-byte fields 0xFFFF is skipped too.
    _EXTENDED_NUMERIC_FIELDS = (
        (0xA7, 1, "spool_core_diameter", None),  # mm
        (0xA8, 1, "mfi_temp", lambda raw: raw * 5),  # stored as value / 5
        (0xA9, 1, "mfi_load", lambda raw: raw * 10),  # stored as value / 10
        (0xAA, 1, "mfi_value", lambda raw: raw / 10.0),  # stored as value * 10
        (0xAB, 1, "measured_tolerance", None),  # µm
        (0xAC, 2, "empty_spool_weight", None),  # grams
        (0xAE, 2, "measured_filament_weight", None),  # grams
        (0xB0, 2, "measured_filament_length", None),  # meters
        (0xB2, 2, "transmission_distance", None),  # µm
        (0xB4, 1, "max_dry_temp", lambda raw: raw * 5),  # stored as value / 5
        (0xB5, 1, "dry_time", None),  # hours
        (0xB6, 1, "min_print_temp", lambda raw: raw * 5),  # stored as value / 5
        (0xB7, 1, "max_print_temp", lambda raw: raw * 5),  # stored as value / 5
        (0xB8, 1, "min_volumetric_speed", None),  # mm³/s
        (0xB9, 1, "max_volumetric_speed", None),  # mm³/s
        (0xBA, 1, "target_volumetric_speed", None),  # mm³/s
    )

    def __init__(
        self,
        spoolman_client: Any,
        filament_name_template: str,
        filament_field_mapping: Dict[str, str],
        spool_field_mapping: Dict[str, str],
    ) -> None:
        """Initialize with a Spoolman client instance

        Args:
            spoolman_client: Client object with methods to interact with Spoolman API
            filament_name_template: Template string for generating filament names from tag data
            filament_field_mapping: Mapping from Spoolman filament fields to OpenTag3D fields
            spool_field_mapping: Mapping from Spoolman spool fields to OpenTag3D fields
        """
        self.spoolman_client = spoolman_client
        self.filament_name_template = filament_name_template
        self.filament_field_mapping = filament_field_mapping
        self.spool_field_mapping = spool_field_mapping

    @staticmethod
    def _decode_text(
        octets: bytes, start: int, end: int, encoding: str = "utf-8"
    ) -> str:
        """Decode the fixed-width text field octets[start:end]

        Undecodable bytes are dropped and trailing NUL padding is removed.
        """
        return octets[start:end].decode(encoding, errors="ignore").rstrip("\x00")

    def _parse_rgba_to_hex(self, octets: bytes, offset: int) -> Optional[str]:
        """Parse RGBA color at given offset and return hex string if not transparent black

        Args:
            octets: Raw bytes from the NFC tag
            offset: Byte offset to start reading RGBA (4 bytes)

        Returns:
            Lower-case hex color string (e.g., "ff0000ff"), or None if the
            data is too short or the color is transparent black
        """
        if len(octets) < offset + 4:
            return None
        r, g, b, a = octets[offset : offset + 4]
        # All-zero RGBA (transparent black) indicates that no color is set
        if r == 0 and g == 0 and b == 0 and a == 0:
            return None
        return f"{r:02x}{g:02x}{b:02x}{a:02x}"

    def _apply_field_mapping(
        self,
        tag_data: Dict[str, Any],
        field_mapping: Dict[str, str],
        base_data: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Apply field mapping from OpenTag3D data to Spoolman fields

        Mapping entries whose OpenTag3D field is absent from tag_data are
        skipped. base_data and any dictionaries nested in it are never
        modified; the result holds its own copies.

        Args:
            tag_data: Parsed OpenTag3D tag data
            field_mapping: Mapping from Spoolman fields to OpenTag3D fields
            base_data: Optional base dictionary to start with

        Returns:
            Dictionary with mapped fields ready for Spoolman API
        """
        result: Dict[str, Any] = dict(base_data) if base_data else {}
        for spoolman_field, ot3d_field in field_mapping.items():
            if ot3d_field not in tag_data:
                continue
            value = tag_data[ot3d_field]
            # Handle nested fields (e.g., "extra.custom_field")
            if "." in spoolman_field:
                parent_key, child_key = spoolman_field.split(".", 1)
                existing = result.get(parent_key)
                # Copy before writing so a dict shared with base_data (the
                # top-level copy above is shallow) is left untouched.
                nested = dict(existing) if isinstance(existing, dict) else {}
                nested[child_key] = value
                result[parent_key] = nested
            else:
                result[spoolman_field] = value
        return result

    def _generate_filament_name(self, tag_data: Dict[str, Any]) -> str:
        """Generate filament name from template using tag data

        The template is filled with str.format() using the tag fields as
        keyword arguments. Whitespace runs are collapsed and a leading or
        trailing dash (left over when a field is empty) is removed.

        Args:
            tag_data: Parsed OpenTag3D tag data

        Returns:
            Formatted filament name, or the tag's material_name ("Unknown"
            if that is missing too) when the template cannot be formatted
        """
        try:
            name = self.filament_name_template.format(**tag_data)
        except (KeyError, IndexError, AttributeError, ValueError) as ex:
            # KeyError: field not on this tag; IndexError: positional
            # placeholder such as "{0}"; AttributeError: "{field.attr}";
            # ValueError: malformed template or format spec.
            logger.warning("Template formatting error: %s, using fallback", ex)
            return tag_data.get("material_name", "Unknown")
        # Clean up extra spaces
        name = " ".join(name.split())
        # Remove trailing dash (with optional spaces)
        name = re.sub(r"\s*-\s*$", "", name)
        # Remove leading dash (with optional spaces)
        name = re.sub(r"^\s*-\s*", "", name)
        # Clean up spaces again after dash removal
        return " ".join(name.split())

    # pylint: disable=too-many-locals
    def _parse_opentag3d_data(self, octets: bytes) -> Optional[Dict[str, Any]]:
        """Parse OpenTag3D format data from tag octets

        Args:
            octets: Raw bytes from the NDEF record payload

        Returns:
            Dictionary with parsed data, or None if the payload is shorter
            than the core fields or its version is older than 0.12
        """
        # OpenTag3D data starts at offset 0x00 (new spec v0.010+)
        if len(octets) < self._CORE_SIZE:
            return None

        # Tag version (2 bytes, big endian) at offset 0x00
        tag_version = int.from_bytes(octets[0x00:0x02], byteorder="big")
        logger.debug(
            "Parsing OpenTag3D version %d.%02d tag",
            tag_version // 256,
            tag_version % 256,
        )
        if tag_version < self._MIN_TAG_VERSION:
            # Too old, ignoring
            return None

        # Base material (5 bytes) at 0x02 and material modifiers (5 bytes) at 0x07
        material_base = self._decode_text(octets, 0x02, 0x07)
        material_mod = self._decode_text(octets, 0x07, 0x0C)
        material_name = material_base
        if material_mod:
            material_name = f"{material_base}-{material_mod}"

        # Target diameter (2 bytes, µm) at 0x5C, reported in mm
        target_diameter = int.from_bytes(octets[0x5C:0x5E], byteorder="big")
        # Density (2 bytes) at 0x62; the raw value is divided by 1000
        density_raw = int.from_bytes(octets[0x62:0x64], byteorder="big")

        result: Dict[str, Any] = {
            "tag_version": tag_version,
            # Manufacturer (16 bytes UTF-8) at 0x1B
            "manufacturer": self._decode_text(octets, 0x1B, 0x2B),
            "material_name": material_name,
            "material_base": material_base,
            "material_mod": material_mod,
            # Color name (32 bytes UTF-8) at 0x2B
            "color_name": self._decode_text(octets, 0x2B, 0x4B),
            # Color 1 (4 bytes RGBA) at 0x4B; None when transparent black
            "color_hex": self._parse_rgba_to_hex(octets, 0x4B),
            "diameter_mm": target_diameter / 1000.0,
            # Target weight (2 bytes, grams) at 0x5E
            "target_weight": int.from_bytes(octets[0x5E:0x60], byteorder="big"),
            # Print and bed temperature (1 byte each, stored as value / 5)
            "print_temp": octets[0x60] * 5,
            "bed_temp": octets[0x61] * 5,
            "density": density_raw / 1000.0,
        }

        # Additional colors 2-4 (4 bytes RGBA each) are only reported when set
        for key, offset in (
            ("color_2_hex", 0x50),
            ("color_3_hex", 0x54),
            ("color_4_hex", 0x58),
        ):
            color = self._parse_rgba_to_hex(octets, offset)
            if color:
                result[key] = color

        # Online data URL (32 bytes ASCII) at 0x70
        if len(octets) >= 0x70 + 32:
            online_url = self._decode_text(octets, 0x70, 0x90, "ascii")
            if online_url:
                result["online_data_url"] = online_url

        # Extended fields (NTAG215/216) start at 0x90
        # Serial number / batch ID (16 bytes UTF-8) at 0x90
        if len(octets) >= 0x90 + 16:
            serial = self._decode_text(octets, 0x90, 0xA0)
            if serial:
                result["serial"] = serial

        # Manufacture date (4 bytes: year, year, month, day) at 0xA0
        if len(octets) >= 0xA0 + 4:
            mfg_year = int.from_bytes(octets[0xA0:0xA2], byteorder="big")
            mfg_month = octets[0xA2]
            mfg_day = octets[0xA3]
            # Range-check like the time below, so blank (0x00 or 0xFF filled)
            # memory is not reported as a date.
            if mfg_year > 0 and 1 <= mfg_month <= 12 and 1 <= mfg_day <= 31:
                result["mfg_date"] = f"{mfg_year:04d}-{mfg_month:02d}-{mfg_day:02d}"

        # Manufacture time (3 bytes: hour, minute, second) at 0xA4
        if len(octets) >= 0xA4 + 3:
            mfg_hour = octets[0xA4]
            mfg_minute = octets[0xA5]
            mfg_second = octets[0xA6]
            if mfg_hour < 24 and mfg_minute < 60 and mfg_second < 60:
                result["mfg_time"] = f"{mfg_hour:02d}:{mfg_minute:02d}:{mfg_second:02d}"

        # Remaining optional numeric fields, each read only if fully present
        for offset, size, key, convert in self._EXTENDED_NUMERIC_FIELDS:
            if len(octets) < offset + size:
                continue
            raw = int.from_bytes(octets[offset : offset + size], byteorder="big")
            if raw == 0 or (size == 2 and raw == 0xFFFF):
                continue
            result[key] = convert(raw) if convert else raw

        return result

    def _find_payload(self, ndef_data: Any) -> Optional[bytes]:
        """Return the data of the first OpenTag3D MIME record, or None"""
        try:
            if hasattr(ndef_data, "records"):
                for record in ndef_data.records:
                    # Check if this is an OpenTag3D MIME type record
                    if getattr(record, "type", None) == self._MIME_TYPE:
                        payload = record.data
                        logger.debug("Found OpenTag3D NDEF record")
                        return payload
        except (AttributeError, TypeError) as ex:
            logger.debug("Failed to find OpenTag3D NDEF record: %s", ex)
        return None

    def _find_or_create_vendor(self, manufacturer: str) -> Optional[Any]:
        """Return the Spoolman vendor id for manufacturer, creating the vendor if needed"""
        vendor_id = self.spoolman_client.find_vendor_by_name(manufacturer)
        if vendor_id is None:
            logger.info("Creating new vendor: %s", manufacturer)
            vendor_id = self.spoolman_client.create_vendor(manufacturer)
            if vendor_id is None:
                logger.error("Failed to create vendor")
        return vendor_id

    def _find_or_create_filament(
        self, vendor_id: Any, filament_name: str, tag_data: Dict[str, Any]
    ) -> Optional[Any]:
        """Return the Spoolman filament id matching the tag, creating the filament if needed"""
        # Material is constructed from base_material and material_modifiers
        material = tag_data["material_name"]
        filament_id = self.spoolman_client.find_filament_by_vendor_material_and_name(
            vendor_id, material, filament_name
        )
        if filament_id is not None:
            return filament_id

        logger.info(
            "Creating new filament: %s %s", tag_data["manufacturer"], filament_name
        )
        # Build base filament data with required fields
        filament_data: Dict[str, Any] = {
            "vendor_id": vendor_id,
            "name": filament_name,
            "material": material,
            "density": tag_data["density"],
            "diameter": tag_data["diameter_mm"],
            "color_hex": tag_data["color_hex"],
        }
        # Build multi_color_hexes if color_2_hex is present
        if "color_2_hex" in tag_data:
            multi_color_hexes = [tag_data["color_hex"], tag_data["color_2_hex"]]
            for key in ("color_3_hex", "color_4_hex"):
                if key in tag_data:
                    multi_color_hexes.append(tag_data[key])
            filament_data["multi_color_hexes"] = multi_color_hexes
        # Apply field mapping from config
        filament_data = self._apply_field_mapping(
            tag_data, self.filament_field_mapping, filament_data
        )
        filament_id = self.spoolman_client.create_filament(filament_data)
        if filament_id is None:
            logger.error("Failed to create filament")
        return filament_id

    def _build_spool_data(
        self, tag_data: Dict[str, Any], filament_id: Any, identifier: str
    ) -> Dict[str, Any]:
        """Build the Spoolman spool payload for a tag"""
        spool_data = self._apply_field_mapping(
            tag_data, self.spool_field_mapping, {"filament_id": filament_id}
        )
        # Add nfc_id to extra field; the value is the lower-cased identifier
        # wrapped in double quotes.
        spool_data.setdefault("extra", {})["nfc_id"] = f'"{identifier.lower()}"'
        # Weights not supplied through the configured mapping fall back to the
        # tag's own values. The parser reports the measured weight under the
        # key "measured_filament_weight".
        if (
            "remaining_weight" not in spool_data
            and "measured_filament_weight" in tag_data
        ):
            spool_data["remaining_weight"] = tag_data["measured_filament_weight"]
        if "initial_weight" not in spool_data and "target_weight" in tag_data:
            spool_data["initial_weight"] = tag_data["target_weight"]
        return spool_data

    # pylint: disable=too-many-return-statements
    def parse(
        self, ndef_data: Any, identifier: str
    ) -> Tuple[Optional[str], Optional[str]]:
        """Parse OpenTag3D tag data and create/match entries in Spoolman

        The vendor and filament are looked up and created when missing; a new
        spool is always created for the tag.

        Args:
            ndef_data: NDEF data structure from the tag
            identifier: Tag identifier string

        Returns:
            Tuple of (spool_id, filament_id) as strings, or (None, None) if
            the tag is not a usable OpenTag3D tag or a Spoolman call failed
        """
        if ndef_data is None:
            return None, None

        # Look for the OpenTag3D NDEF record with MIME type "application/opentag3d"
        octets = self._find_payload(ndef_data)
        if octets is None:
            logger.debug("Could not find OpenTag3D NDEF record")
            return None, None

        # Parse OpenTag3D data
        tag_data = self._parse_opentag3d_data(octets)
        if tag_data is None:
            logger.debug("Not an OpenTag3D format tag")
            return None, None
        logger.info(
            "Parsed OpenTag3D tag: manufacturer=%s, material=%s, color=%s",
            tag_data["manufacturer"],
            tag_data["material_name"],
            tag_data["color_name"],
        )
        logger.debug("All data: %s", tag_data)

        # Generate filament name from template
        filament_name = self._generate_filament_name(tag_data)
        logger.info("Generated filament name from template: %s", filament_name)

        # Find or create vendor
        vendor_id = self._find_or_create_vendor(tag_data["manufacturer"])
        if vendor_id is None:
            return None, None

        # Find or create filament using vendor, material, and name
        filament_id = self._find_or_create_filament(vendor_id, filament_name, tag_data)
        if filament_id is None:
            return None, None

        # Create spool with nfc_id
        logger.info(
            "Creating new spool for filament %s with nfc_id %s", filament_id, identifier
        )
        spool_data = self._build_spool_data(tag_data, filament_id, identifier)
        spool_id = self.spoolman_client.create_spool(spool_data)
        if spool_id is None:
            logger.error("Failed to create spool")
            return None, None
        logger.info(
            "Successfully created spool %s and filament %s", spool_id, filament_id
        )
        return str(spool_id), str(filament_id)