Compare commits
10 commits
d1538a6096
...
98227a0a86
| Author | SHA1 | Date | |
|---|---|---|---|
| 98227a0a86 | |||
|
|
b0c58fca13 |
||
|
|
4720976009 |
||
|
|
3823b4e780 | ||
|
|
2caca5ff91 | ||
|
|
f44f349d8a | ||
|
|
b89b0c2f32 | ||
|
|
911409c310 | ||
|
|
82afd6a786 | ||
|
|
96cfcc9b08 |
10 changed files with 593 additions and 555 deletions
16
README.md
16
README.md
|
|
@ -33,7 +33,7 @@ Automatically sets the loaded spool & filament in klipper by using NFC/RFID
|
|||
- [Use with Happy-Hare](#use-with-happy-hare)
|
||||
- [Use with Prusa's OpenPrintTag tags](#use-with-prusas-openprinttag-tags)
|
||||
- [Use with OpenTag3D tags](#use-with-opentag3d-tags)
|
||||
- [See also](#see-also)
|
||||
- [Related projects](#related-projects)
|
||||
- [Developer info](#developer-info)
|
||||
|
||||
|
||||
|
|
@ -301,12 +301,14 @@ See Spoolman's API documentation [here](https://donkie.github.io/Spoolman/) to s
|
|||
You can also add extra fields in Spoolman for saving the data from the OpenTag3D tags.
|
||||
|
||||
|
||||
## See also
|
||||
If nfc2klipper doesn't work for some reason,
|
||||
[spool2klipper](https://github.com/bofh69/spool2klipper) can be use
|
||||
to automatically update the `active_filament` variable whenever the spool
|
||||
is changed in Moonraker (when changing it in the frontend for example).
|
||||
That way `ASSERT_ACTIVE_FILAMENT` will still work correctly.
|
||||
## Related projects
|
||||
|
||||
* [FilaMan](https://www.filaman.app/) - a filament management system.
|
||||
* [esp_to_spoolman](https://github.com/dimbas80/esp_to_spoolman) - like nfc2klipper, but running on an ESP32.
|
||||
* [OpenPrintTag Scanner](https://github.com/ryanch/openprinttag_scanner) - like esp_to_spoolman, but with display and for PrusaLink instead of Klipper.
|
||||
* [spool2klipper](https://github.com/bofh69/spool2klipper) - to set the filament id (and other data) when the spool id is changed.
|
||||
* [spoolman2slicer](https://github.com/bofh69/spoolman2slicer) - create filament config from Spoolman.
|
||||
* [OpenTag3d](https://opentag3d.info/) tag format.
|
||||
|
||||
## Developer info
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@
|
|||
"""Tag parsers for different data formats"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional, Protocol, Tuple
|
||||
|
||||
import ndef # pylint: disable=import-error
|
||||
|
|
@ -133,496 +132,3 @@ class TagIdentifierParser:
|
|||
identifier,
|
||||
)
|
||||
return None, None
|
||||
|
||||
|
||||
class OpenTag3DParser:
|
||||
"""Parser for OpenTag3D format tags
|
||||
|
||||
Parses tags following the OpenTag3D specification from https://opentag3d.info/spec
|
||||
Based on OpenTag3D spec version 0.012
|
||||
Extracts manufacturer, material, color, and other filament data, then creates
|
||||
or matches entries in Spoolman.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
spoolman_client: Any,
|
||||
filament_name_template: str,
|
||||
filament_field_mapping: Dict[str, str],
|
||||
spool_field_mapping: Dict[str, str],
|
||||
) -> None:
|
||||
"""Initialize with a Spoolman client instance
|
||||
|
||||
Args:
|
||||
spoolman_client: Client object with methods to interact with Spoolman API
|
||||
filament_name_template: Template string for generating filament names from tag data
|
||||
filament_field_mapping: Mapping from Spoolman filament fields to OpenTag3D fields
|
||||
spool_field_mapping: Mapping from Spoolman spool fields to OpenTag3D fields
|
||||
"""
|
||||
self.spoolman_client = spoolman_client
|
||||
self.filament_name_template = filament_name_template
|
||||
self.filament_field_mapping = filament_field_mapping
|
||||
self.spool_field_mapping = spool_field_mapping
|
||||
|
||||
def _parse_rgba_to_hex(self, octets: bytes, offset: int) -> Optional[str]:
|
||||
"""Parse RGBA color at given offset and return hex string if not transparent black
|
||||
|
||||
Args:
|
||||
octets: Raw bytes from the NFC tag
|
||||
offset: Byte offset to start reading RGBA (4 bytes)
|
||||
|
||||
Returns:
|
||||
Hex color string (e.g., "FF0000FF") or None if transparent black
|
||||
"""
|
||||
if len(octets) < offset + 4:
|
||||
return None
|
||||
|
||||
r = octets[offset]
|
||||
g = octets[offset + 1]
|
||||
b = octets[offset + 2]
|
||||
a = octets[offset + 3]
|
||||
|
||||
# Only return color if not transparent black (indicates no color)
|
||||
if r == 0 and g == 0 and b == 0 and a == 0:
|
||||
return None
|
||||
|
||||
return f"{r:02x}{g:02x}{b:02x}{a:02x}"
|
||||
|
||||
def _apply_field_mapping(
|
||||
self,
|
||||
tag_data: Dict[str, Any],
|
||||
field_mapping: Dict[str, str],
|
||||
base_data: Optional[Dict[str, Any]] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Apply field mapping from OpenTag3D data to Spoolman fields
|
||||
|
||||
Args:
|
||||
tag_data: Parsed OpenTag3D tag data
|
||||
field_mapping: Mapping from Spoolman fields to OpenTag3D fields
|
||||
base_data: Optional base dictionary to start with
|
||||
|
||||
Returns:
|
||||
Dictionary with mapped fields ready for Spoolman API
|
||||
"""
|
||||
result = base_data.copy() if base_data else {}
|
||||
|
||||
for spoolman_field, ot3d_field in field_mapping.items():
|
||||
if ot3d_field in tag_data:
|
||||
value = tag_data[ot3d_field]
|
||||
# Handle nested fields (e.g., "extra.custom_field")
|
||||
if "." in spoolman_field:
|
||||
parts = spoolman_field.split(".", 1)
|
||||
parent_key = parts[0]
|
||||
child_key = parts[1]
|
||||
if parent_key not in result:
|
||||
result[parent_key] = {}
|
||||
result[parent_key][child_key] = value
|
||||
else:
|
||||
result[spoolman_field] = value
|
||||
|
||||
return result
|
||||
|
||||
def _generate_filament_name(self, tag_data: Dict[str, Any]) -> str:
|
||||
"""Generate filament name from template using tag data
|
||||
|
||||
Args:
|
||||
tag_data: Parsed OpenTag3D tag data
|
||||
|
||||
Returns:
|
||||
Formatted filament name
|
||||
"""
|
||||
# Use string formatting with the template
|
||||
try:
|
||||
# Simple approach: format with all fields, then clean up
|
||||
name = self.filament_name_template.format(**tag_data)
|
||||
# Clean up extra spaces
|
||||
name = " ".join(name.split())
|
||||
# Clean up trailing/leading dashes and spaces around dashes
|
||||
# Remove trailing dash (with optional spaces)
|
||||
name = re.sub(r"\s*-\s*$", "", name)
|
||||
# Remove leading dash (with optional spaces)
|
||||
name = re.sub(r"^\s*-\s*", "", name)
|
||||
# Clean up double spaces again after dash removal
|
||||
name = " ".join(name.split())
|
||||
return name
|
||||
except (KeyError, ValueError) as ex:
|
||||
logger.warning("Template formatting error: %s, using fallback", ex)
|
||||
# Fallback to simple material_name
|
||||
return tag_data.get("material_name", "Unknown")
|
||||
|
||||
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
|
||||
def _parse_opentag3d_data(self, octets: bytes) -> Optional[Dict[str, Any]]:
|
||||
"""Parse OpenTag3D format data from tag octets
|
||||
|
||||
Args:
|
||||
octets: Raw bytes from the NDEF record payload
|
||||
|
||||
Returns:
|
||||
Dictionary with parsed data, or None if not a valid OpenTag3D tag
|
||||
"""
|
||||
# OpenTag3D data starts at offset 0x00 (new spec v0.010+)
|
||||
# Check if we have enough data
|
||||
if len(octets) < 0x64: # Minimum size for core fields
|
||||
return None
|
||||
|
||||
# Parse tag version (2 bytes, big endian) at offset 0x00
|
||||
tag_version = int.from_bytes(octets[0x00:0x02], byteorder="big")
|
||||
|
||||
logger.debug(
|
||||
"Parsing OpenTag3D version %d.%02d tag",
|
||||
tag_version // 256,
|
||||
tag_version % 256,
|
||||
)
|
||||
|
||||
if tag_version < 0x000C:
|
||||
# Too old, ignoring
|
||||
return None
|
||||
|
||||
# Parse base material (5 bytes UTF-8) at offset 0x02
|
||||
material_base = (
|
||||
octets[0x02:0x07].decode("utf-8", errors="ignore").rstrip("\x00")
|
||||
)
|
||||
|
||||
# Parse material modifiers (5 bytes UTF-8) at offset 0x07
|
||||
material_mod = octets[0x07:0x0C].decode("utf-8", errors="ignore").rstrip("\x00")
|
||||
|
||||
# Parse manufacturer (16 bytes UTF-8) at offset 0x1B
|
||||
manufacturer = octets[0x1B:0x2B].decode("utf-8", errors="ignore").rstrip("\x00")
|
||||
|
||||
# Parse color name (32 bytes UTF-8) at offset 0x2B
|
||||
color_name = octets[0x2B:0x4B].decode("utf-8", errors="ignore").rstrip("\x00")
|
||||
|
||||
# Parse color 1 (4 bytes RGBA) at offset 0x4B
|
||||
color_1_hex = self._parse_rgba_to_hex(octets, 0x4B)
|
||||
|
||||
# Parse target diameter (2 bytes, µm) at offset 0x5C
|
||||
target_diameter = int.from_bytes(octets[0x5C:0x5E], byteorder="big")
|
||||
diameter_mm = target_diameter / 1000.0
|
||||
|
||||
# Parse target weight (2 bytes, grams) at offset 0x5E
|
||||
target_weight = int.from_bytes(octets[0x5E:0x60], byteorder="big")
|
||||
|
||||
# Parse print temperature (1 byte, divided by 5) at offset 0x60
|
||||
print_temp_raw = octets[0x60]
|
||||
print_temp = print_temp_raw * 5
|
||||
|
||||
# Parse bed temperature (1 byte, divided by 5) at offset 0x61
|
||||
bed_temp_raw = octets[0x61]
|
||||
bed_temp = bed_temp_raw * 5
|
||||
|
||||
# Parse density (2 bytes, µg/cm³) at offset 0x62
|
||||
density_raw = int.from_bytes(octets[0x62:0x64], byteorder="big")
|
||||
density = density_raw / 1000.0
|
||||
|
||||
# Build material name
|
||||
material_name = material_base
|
||||
if material_mod:
|
||||
material_name = f"{material_base}-{material_mod}"
|
||||
|
||||
result = {
|
||||
"tag_version": tag_version,
|
||||
"manufacturer": manufacturer,
|
||||
"material_name": material_name,
|
||||
"material_base": material_base,
|
||||
"material_mod": material_mod,
|
||||
"color_name": color_name,
|
||||
"color_hex": color_1_hex,
|
||||
"diameter_mm": diameter_mm,
|
||||
"target_weight": target_weight,
|
||||
"print_temp": print_temp,
|
||||
"bed_temp": bed_temp,
|
||||
"density": density,
|
||||
}
|
||||
|
||||
# Parse color 2 at offset 0x50
|
||||
color_2_hex = self._parse_rgba_to_hex(octets, 0x50)
|
||||
if color_2_hex:
|
||||
result["color_2_hex"] = color_2_hex
|
||||
|
||||
# Parse color 3 at offset 0x54
|
||||
color_3_hex = self._parse_rgba_to_hex(octets, 0x54)
|
||||
if color_3_hex:
|
||||
result["color_3_hex"] = color_3_hex
|
||||
|
||||
# Parse color 4 at offset 0x58
|
||||
color_4_hex = self._parse_rgba_to_hex(octets, 0x58)
|
||||
if color_4_hex:
|
||||
result["color_4_hex"] = color_4_hex
|
||||
|
||||
# Parse online data URL (32 bytes ASCII) at 0x70
|
||||
if len(octets) >= 0x70 + 32:
|
||||
online_url = (
|
||||
octets[0x70:0x90].decode("ascii", errors="ignore").rstrip("\x00")
|
||||
)
|
||||
if online_url:
|
||||
result["online_data_url"] = online_url
|
||||
|
||||
# Parse extended fields if available (NTAG215/216)
|
||||
# Extended fields start at 0x90
|
||||
if len(octets) >= 0x90 + 16:
|
||||
# Parse serial number / batch ID (16 bytes UTF-8) at 0x90
|
||||
serial = octets[0x90:0xA0].decode("utf-8", errors="ignore").rstrip("\x00")
|
||||
if serial:
|
||||
result["serial"] = serial
|
||||
|
||||
if len(octets) >= 0xA0 + 4:
|
||||
# Parse manufacture date (4 bytes: year, year, month, day) at 0xA0
|
||||
mfg_year = int.from_bytes(octets[0xA0:0xA2], byteorder="big")
|
||||
mfg_month = octets[0xA2]
|
||||
mfg_day = octets[0xA3]
|
||||
if mfg_year > 0 and mfg_month > 0 and mfg_day > 0:
|
||||
result["mfg_date"] = f"{mfg_year:04d}-{mfg_month:02d}-{mfg_day:02d}"
|
||||
|
||||
if len(octets) >= 0xA4 + 3:
|
||||
# Parse manufacture time (3 bytes: hour, minute, second) at 0xA4
|
||||
mfg_hour = octets[0xA4]
|
||||
mfg_minute = octets[0xA5]
|
||||
mfg_second = octets[0xA6]
|
||||
if mfg_hour < 24 and mfg_minute < 60 and mfg_second < 60:
|
||||
result["mfg_time"] = f"{mfg_hour:02d}:{mfg_minute:02d}:{mfg_second:02d}"
|
||||
|
||||
if len(octets) >= 0xA7 + 1:
|
||||
# Parse spool core diameter (1 byte, mm) at 0xA7
|
||||
spool_core_diameter = octets[0xA7]
|
||||
if spool_core_diameter > 0:
|
||||
result["spool_core_diameter"] = spool_core_diameter
|
||||
|
||||
if len(octets) >= 0xA8 + 1:
|
||||
# Parse MFI temperature (1 byte, divided by 5) at 0xA8
|
||||
mfi_temp_raw = octets[0xA8]
|
||||
if mfi_temp_raw > 0:
|
||||
result["mfi_temp"] = mfi_temp_raw * 5
|
||||
|
||||
if len(octets) >= 0xA9 + 1:
|
||||
# Parse MFI load (1 byte, divided by 10) at 0xA9
|
||||
mfi_load_raw = octets[0xA9]
|
||||
if mfi_load_raw > 0:
|
||||
result["mfi_load"] = mfi_load_raw * 10
|
||||
|
||||
if len(octets) >= 0xAA + 1:
|
||||
# Parse MFI value (1 byte, divided by 10) at 0xAA
|
||||
mfi_value_raw = octets[0xAA]
|
||||
if mfi_value_raw > 0:
|
||||
result["mfi_value"] = mfi_value_raw / 10.0
|
||||
|
||||
if len(octets) >= 0xAB + 1:
|
||||
# Parse measured tolerance (1 byte, µm) at 0xAB
|
||||
measured_tolerance = octets[0xAB]
|
||||
if measured_tolerance > 0:
|
||||
result["measured_tolerance"] = measured_tolerance
|
||||
|
||||
if len(octets) >= 0xAC + 2:
|
||||
# Parse empty spool weight (2 bytes, grams) at 0xAC
|
||||
empty_spool_weight = int.from_bytes(octets[0xAC:0xAE], byteorder="big")
|
||||
if 0 < empty_spool_weight < 65535:
|
||||
result["empty_spool_weight"] = empty_spool_weight
|
||||
|
||||
if len(octets) >= 0xAE + 2:
|
||||
# Parse measured filament weight (2 bytes, grams) at 0xAE
|
||||
measured_filament_weight = int.from_bytes(
|
||||
octets[0xAE:0xB0], byteorder="big"
|
||||
)
|
||||
if 0 < measured_filament_weight < 65535:
|
||||
result["measured_filament_weight"] = measured_filament_weight
|
||||
|
||||
if len(octets) >= 0xB0 + 2:
|
||||
# Parse measured filament length (2 bytes, meters) at 0xB0
|
||||
measured_filament_length = int.from_bytes(
|
||||
octets[0xB0:0xB2], byteorder="big"
|
||||
)
|
||||
if 0 < measured_filament_length < 65535:
|
||||
result["measured_filament_length"] = measured_filament_length
|
||||
|
||||
if len(octets) >= 0xB2 + 2:
|
||||
# Parse transmission distance (2 bytes, µm) at 0xB2
|
||||
transmission_distance = int.from_bytes(octets[0xB2:0xB4], byteorder="big")
|
||||
if 0 < transmission_distance < 65535:
|
||||
result["transmission_distance"] = transmission_distance
|
||||
|
||||
if len(octets) >= 0xB4 + 1:
|
||||
# Parse max dry temp (1 byte, divided by 5) at 0xB4
|
||||
max_dry_temp_raw = octets[0xB4]
|
||||
if max_dry_temp_raw > 0:
|
||||
result["max_dry_temp"] = max_dry_temp_raw * 5
|
||||
|
||||
if len(octets) >= 0xB5 + 1:
|
||||
# Parse dry time (1 byte, hours) at 0xB5
|
||||
dry_time = octets[0xB5]
|
||||
if dry_time > 0:
|
||||
result["dry_time"] = dry_time
|
||||
|
||||
if len(octets) >= 0xB6 + 1:
|
||||
# Parse min print temp (1 byte, divided by 5) at 0xB6
|
||||
min_print_temp_raw = octets[0xB6]
|
||||
if min_print_temp_raw > 0:
|
||||
result["min_print_temp"] = min_print_temp_raw * 5
|
||||
|
||||
if len(octets) >= 0xB7 + 1:
|
||||
# Parse max print temp (1 byte, divided by 5) at 0xB7
|
||||
max_print_temp_raw = octets[0xB7]
|
||||
if max_print_temp_raw > 0:
|
||||
result["max_print_temp"] = max_print_temp_raw * 5
|
||||
|
||||
if len(octets) >= 0xB8 + 1:
|
||||
# Parse min volumetric speed (1 byte, mm³/s) at 0xB8
|
||||
min_vso = octets[0xB8]
|
||||
if min_vso > 0:
|
||||
result["min_volumetric_speed"] = min_vso
|
||||
|
||||
if len(octets) >= 0xB9 + 1:
|
||||
# Parse max volumetric speed (1 byte, mm³/s) at 0xB9
|
||||
max_vso = octets[0xB9]
|
||||
if max_vso > 0:
|
||||
result["max_volumetric_speed"] = max_vso
|
||||
|
||||
if len(octets) >= 0xBA + 1:
|
||||
# Parse target volumetric speed (1 byte, mm³/s) at 0xBA
|
||||
target_vso = octets[0xBA]
|
||||
if target_vso > 0:
|
||||
result["target_volumetric_speed"] = target_vso
|
||||
|
||||
return result
|
||||
|
||||
# pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements
|
||||
def parse(
|
||||
self, ndef_data: Any, identifier: str
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Parse OpenTag3D tag data and create/match entries in Spoolman
|
||||
|
||||
Args:
|
||||
ndef_data: NDEF data structure from the tag
|
||||
identifier: Tag identifier string
|
||||
|
||||
Returns:
|
||||
Tuple of (spool_id, filament_id) as strings, or (None, None) if not found
|
||||
"""
|
||||
if ndef_data is None:
|
||||
return None, None
|
||||
|
||||
# Look for the OpenTag3D NDEF record with MIME type "application/opentag3d"
|
||||
octets = None
|
||||
try:
|
||||
if hasattr(ndef_data, "records"):
|
||||
for record in ndef_data.records:
|
||||
# Check if this is an OpenTag3D MIME type record
|
||||
if (
|
||||
hasattr(record, "type")
|
||||
and record.type == "application/opentag3d"
|
||||
):
|
||||
octets = record.data
|
||||
logger.debug("Found OpenTag3D NDEF record")
|
||||
break
|
||||
except (AttributeError, TypeError) as ex:
|
||||
logger.debug("Failed to find OpenTag3D NDEF record: %s", ex)
|
||||
|
||||
if octets is None:
|
||||
logger.debug("Could not find OpenTag3D NDEF record")
|
||||
return None, None
|
||||
|
||||
# Parse OpenTag3D data
|
||||
tag_data = self._parse_opentag3d_data(octets)
|
||||
if tag_data is None:
|
||||
logger.debug("Not an OpenTag3D format tag")
|
||||
return None, None
|
||||
|
||||
logger.info(
|
||||
"Parsed OpenTag3D tag: manufacturer=%s, material=%s, color=%s",
|
||||
tag_data["manufacturer"],
|
||||
tag_data["material_name"],
|
||||
tag_data["color_name"],
|
||||
)
|
||||
logger.debug("All data: %s", tag_data)
|
||||
|
||||
# Generate filament name from template
|
||||
filament_name = self._generate_filament_name(tag_data)
|
||||
logger.info("Generated filament name from template: %s", filament_name)
|
||||
|
||||
# Find or create vendor
|
||||
vendor_id = self.spoolman_client.find_vendor_by_name(tag_data["manufacturer"])
|
||||
|
||||
if vendor_id is None:
|
||||
logger.info("Creating new vendor: %s", tag_data["manufacturer"])
|
||||
vendor_id = self.spoolman_client.create_vendor(tag_data["manufacturer"])
|
||||
if vendor_id is None:
|
||||
logger.error("Failed to create vendor")
|
||||
return None, None
|
||||
|
||||
# Find or create filament using vendor, material, and name
|
||||
# Material is constructed from base_material and material_modifiers
|
||||
material = tag_data["material_name"]
|
||||
filament_id = self.spoolman_client.find_filament_by_vendor_material_and_name(
|
||||
vendor_id, material, filament_name
|
||||
)
|
||||
|
||||
if filament_id is None:
|
||||
logger.info(
|
||||
"Creating new filament: %s %s", tag_data["manufacturer"], filament_name
|
||||
)
|
||||
|
||||
# Build base filament data with required fields
|
||||
filament_data = {
|
||||
"vendor_id": vendor_id,
|
||||
"name": filament_name,
|
||||
"material": material,
|
||||
"density": tag_data["density"],
|
||||
"diameter": tag_data["diameter_mm"],
|
||||
"color_hex": tag_data["color_hex"],
|
||||
}
|
||||
|
||||
# Build multi_color_hexes if color_2_hex is present
|
||||
if "color_2_hex" in tag_data:
|
||||
multi_color_hexes = [tag_data["color_hex"], tag_data["color_2_hex"]]
|
||||
if "color_3_hex" in tag_data:
|
||||
multi_color_hexes.append(tag_data["color_3_hex"])
|
||||
if "color_4_hex" in tag_data:
|
||||
multi_color_hexes.append(tag_data["color_4_hex"])
|
||||
filament_data["multi_color_hexes"] = multi_color_hexes
|
||||
|
||||
# Apply field mapping from config
|
||||
filament_data = self._apply_field_mapping(
|
||||
tag_data, self.filament_field_mapping, filament_data
|
||||
)
|
||||
|
||||
filament_id = self.spoolman_client.create_filament(filament_data)
|
||||
if filament_id is None:
|
||||
logger.error("Failed to create filament")
|
||||
return None, None
|
||||
|
||||
# Create spool with nfc_id
|
||||
logger.info(
|
||||
"Creating new spool for filament %s with nfc_id %s", filament_id, identifier
|
||||
)
|
||||
|
||||
# Build base spool data
|
||||
spool_data = {
|
||||
"filament_id": filament_id,
|
||||
}
|
||||
|
||||
# Apply field mapping from config
|
||||
spool_data = self._apply_field_mapping(
|
||||
tag_data, self.spool_field_mapping, spool_data
|
||||
)
|
||||
|
||||
# Add nfc_id to extra field
|
||||
if "extra" not in spool_data:
|
||||
spool_data["extra"] = {}
|
||||
spool_data["extra"]["nfc_id"] = f'"{identifier.lower()}"'
|
||||
|
||||
if "remaining_weight" not in spool_data:
|
||||
if "measured_weight" in tag_data:
|
||||
spool_data["remaining_weight"] = tag_data["measured_weight"]
|
||||
|
||||
if "initial_weight" not in spool_data:
|
||||
if "target_weight" in tag_data:
|
||||
spool_data["initial_weight"] = tag_data["target_weight"]
|
||||
|
||||
spool_id = self.spoolman_client.create_spool(spool_data)
|
||||
|
||||
if spool_id is None:
|
||||
logger.error("Failed to create spool")
|
||||
return None, None
|
||||
|
||||
logger.info(
|
||||
"Successfully created spool %s and filament %s", spool_id, filament_id
|
||||
)
|
||||
return str(spool_id), str(filament_id)
|
||||
|
|
|
|||
505
lib/opentag3d_parser.py
Normal file
505
lib/opentag3d_parser.py
Normal file
|
|
@ -0,0 +1,505 @@
|
|||
# SPDX-FileCopyrightText: 2024-2026 Sebastian Andersson <sebastian@bittr.nu>
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
"""Tag parsers for different data formats"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
logger: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
# pylint: disable=too-few-public-methods
|
||||
|
||||
|
||||
class OpenTag3DParser:
|
||||
"""Parser for OpenTag3D format tags
|
||||
|
||||
Parses tags following the OpenTag3D specification from https://opentag3d.info/spec
|
||||
Based on OpenTag3D spec version 0.012
|
||||
Extracts manufacturer, material, color, and other filament data, then creates
|
||||
or matches entries in Spoolman.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
spoolman_client: Any,
|
||||
filament_name_template: str,
|
||||
filament_field_mapping: Dict[str, str],
|
||||
spool_field_mapping: Dict[str, str],
|
||||
) -> None:
|
||||
"""Initialize with a Spoolman client instance
|
||||
|
||||
Args:
|
||||
spoolman_client: Client object with methods to interact with Spoolman API
|
||||
filament_name_template: Template string for generating filament names from tag data
|
||||
filament_field_mapping: Mapping from Spoolman filament fields to OpenTag3D fields
|
||||
spool_field_mapping: Mapping from Spoolman spool fields to OpenTag3D fields
|
||||
"""
|
||||
self.spoolman_client = spoolman_client
|
||||
self.filament_name_template = filament_name_template
|
||||
self.filament_field_mapping = filament_field_mapping
|
||||
self.spool_field_mapping = spool_field_mapping
|
||||
|
||||
def _parse_rgba_to_hex(self, octets: bytes, offset: int) -> Optional[str]:
|
||||
"""Parse RGBA color at given offset and return hex string if not transparent black
|
||||
|
||||
Args:
|
||||
octets: Raw bytes from the NFC tag
|
||||
offset: Byte offset to start reading RGBA (4 bytes)
|
||||
|
||||
Returns:
|
||||
Hex color string (e.g., "FF0000FF") or None if transparent black
|
||||
"""
|
||||
if len(octets) < offset + 4:
|
||||
return None
|
||||
|
||||
r = octets[offset]
|
||||
g = octets[offset + 1]
|
||||
b = octets[offset + 2]
|
||||
a = octets[offset + 3]
|
||||
|
||||
# Only return color if not transparent black (indicates no color)
|
||||
if r == 0 and g == 0 and b == 0 and a == 0:
|
||||
return None
|
||||
|
||||
return f"{r:02x}{g:02x}{b:02x}{a:02x}"
|
||||
|
||||
def _apply_field_mapping(
|
||||
self,
|
||||
tag_data: Dict[str, Any],
|
||||
field_mapping: Dict[str, str],
|
||||
base_data: Optional[Dict[str, Any]] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Apply field mapping from OpenTag3D data to Spoolman fields
|
||||
|
||||
Args:
|
||||
tag_data: Parsed OpenTag3D tag data
|
||||
field_mapping: Mapping from Spoolman fields to OpenTag3D fields
|
||||
base_data: Optional base dictionary to start with
|
||||
|
||||
Returns:
|
||||
Dictionary with mapped fields ready for Spoolman API
|
||||
"""
|
||||
result = base_data.copy() if base_data else {}
|
||||
|
||||
for spoolman_field, ot3d_field in field_mapping.items():
|
||||
if ot3d_field in tag_data:
|
||||
value = tag_data[ot3d_field]
|
||||
# Handle nested fields (e.g., "extra.custom_field")
|
||||
if "." in spoolman_field:
|
||||
parts = spoolman_field.split(".", 1)
|
||||
parent_key = parts[0]
|
||||
child_key = parts[1]
|
||||
if parent_key not in result:
|
||||
result[parent_key] = {}
|
||||
result[parent_key][child_key] = value
|
||||
else:
|
||||
result[spoolman_field] = value
|
||||
|
||||
return result
|
||||
|
||||
def _generate_filament_name(self, tag_data: Dict[str, Any]) -> str:
|
||||
"""Generate filament name from template using tag data
|
||||
|
||||
Args:
|
||||
tag_data: Parsed OpenTag3D tag data
|
||||
|
||||
Returns:
|
||||
Formatted filament name
|
||||
"""
|
||||
# Use string formatting with the template
|
||||
try:
|
||||
# Simple approach: format with all fields, then clean up
|
||||
name = self.filament_name_template.format(**tag_data)
|
||||
# Clean up extra spaces
|
||||
name = " ".join(name.split())
|
||||
# Clean up trailing/leading dashes and spaces around dashes
|
||||
# Remove trailing dash (with optional spaces)
|
||||
name = re.sub(r"\s*-\s*$", "", name)
|
||||
# Remove leading dash (with optional spaces)
|
||||
name = re.sub(r"^\s*-\s*", "", name)
|
||||
# Clean up double spaces again after dash removal
|
||||
name = " ".join(name.split())
|
||||
return name
|
||||
except (KeyError, ValueError) as ex:
|
||||
logger.warning("Template formatting error: %s, using fallback", ex)
|
||||
# Fallback to simple material_name
|
||||
return tag_data.get("material_name", "Unknown")
|
||||
|
||||
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
|
||||
def _parse_opentag3d_data(self, octets: bytes) -> Optional[Dict[str, Any]]:
|
||||
"""Parse OpenTag3D format data from tag octets
|
||||
|
||||
Args:
|
||||
octets: Raw bytes from the NDEF record payload
|
||||
|
||||
Returns:
|
||||
Dictionary with parsed data, or None if not a valid OpenTag3D tag
|
||||
"""
|
||||
# OpenTag3D data starts at offset 0x00 (new spec v0.010+)
|
||||
# Check if we have enough data
|
||||
if len(octets) < 0x64: # Minimum size for core fields
|
||||
return None
|
||||
|
||||
# Parse tag version (2 bytes, big endian) at offset 0x00
|
||||
tag_version = int.from_bytes(octets[0x00:0x02], byteorder="big")
|
||||
|
||||
logger.debug(
|
||||
"Parsing OpenTag3D version %d.%02d tag",
|
||||
tag_version // 256,
|
||||
tag_version % 256,
|
||||
)
|
||||
|
||||
if tag_version < 0x000C:
|
||||
# Too old, ignoring
|
||||
return None
|
||||
|
||||
# Parse base material (5 bytes UTF-8) at offset 0x02
|
||||
material_base = (
|
||||
octets[0x02:0x07].decode("utf-8", errors="ignore").rstrip("\x00")
|
||||
)
|
||||
|
||||
# Parse material modifiers (5 bytes UTF-8) at offset 0x07
|
||||
material_mod = octets[0x07:0x0C].decode("utf-8", errors="ignore").rstrip("\x00")
|
||||
|
||||
# Parse manufacturer (16 bytes UTF-8) at offset 0x1B
|
||||
manufacturer = octets[0x1B:0x2B].decode("utf-8", errors="ignore").rstrip("\x00")
|
||||
|
||||
# Parse color name (32 bytes UTF-8) at offset 0x2B
|
||||
color_name = octets[0x2B:0x4B].decode("utf-8", errors="ignore").rstrip("\x00")
|
||||
|
||||
# Parse color 1 (4 bytes RGBA) at offset 0x4B
|
||||
color_1_hex = self._parse_rgba_to_hex(octets, 0x4B)
|
||||
|
||||
# Parse target diameter (2 bytes, µm) at offset 0x5C
|
||||
target_diameter = int.from_bytes(octets[0x5C:0x5E], byteorder="big")
|
||||
diameter_mm = target_diameter / 1000.0
|
||||
|
||||
# Parse target weight (2 bytes, grams) at offset 0x5E
|
||||
target_weight = int.from_bytes(octets[0x5E:0x60], byteorder="big")
|
||||
|
||||
# Parse print temperature (1 byte, divided by 5) at offset 0x60
|
||||
print_temp_raw = octets[0x60]
|
||||
print_temp = print_temp_raw * 5
|
||||
|
||||
# Parse bed temperature (1 byte, divided by 5) at offset 0x61
|
||||
bed_temp_raw = octets[0x61]
|
||||
bed_temp = bed_temp_raw * 5
|
||||
|
||||
# Parse density (2 bytes, µg/cm³) at offset 0x62
|
||||
density_raw = int.from_bytes(octets[0x62:0x64], byteorder="big")
|
||||
density = density_raw / 1000.0
|
||||
|
||||
# Build material name
|
||||
material_name = material_base
|
||||
if material_mod:
|
||||
material_name = f"{material_base}-{material_mod}"
|
||||
|
||||
result = {
|
||||
"tag_version": tag_version,
|
||||
"manufacturer": manufacturer,
|
||||
"material_name": material_name,
|
||||
"material_base": material_base,
|
||||
"material_mod": material_mod,
|
||||
"color_name": color_name,
|
||||
"color_hex": color_1_hex,
|
||||
"diameter_mm": diameter_mm,
|
||||
"target_weight": target_weight,
|
||||
"print_temp": print_temp,
|
||||
"bed_temp": bed_temp,
|
||||
"density": density,
|
||||
}
|
||||
|
||||
# Parse color 2 at offset 0x50
|
||||
color_2_hex = self._parse_rgba_to_hex(octets, 0x50)
|
||||
if color_2_hex:
|
||||
result["color_2_hex"] = color_2_hex
|
||||
|
||||
# Parse color 3 at offset 0x54
|
||||
color_3_hex = self._parse_rgba_to_hex(octets, 0x54)
|
||||
if color_3_hex:
|
||||
result["color_3_hex"] = color_3_hex
|
||||
|
||||
# Parse color 4 at offset 0x58
|
||||
color_4_hex = self._parse_rgba_to_hex(octets, 0x58)
|
||||
if color_4_hex:
|
||||
result["color_4_hex"] = color_4_hex
|
||||
|
||||
# Parse online data URL (32 bytes ASCII) at 0x70
|
||||
if len(octets) >= 0x70 + 32:
|
||||
online_url = (
|
||||
octets[0x70:0x90].decode("ascii", errors="ignore").rstrip("\x00")
|
||||
)
|
||||
if online_url:
|
||||
result["online_data_url"] = online_url
|
||||
|
||||
# Parse extended fields if available (NTAG215/216)
|
||||
# Extended fields start at 0x90
|
||||
if len(octets) >= 0x90 + 16:
|
||||
# Parse serial number / batch ID (16 bytes UTF-8) at 0x90
|
||||
serial = octets[0x90:0xA0].decode("utf-8", errors="ignore").rstrip("\x00")
|
||||
if serial:
|
||||
result["serial"] = serial
|
||||
|
||||
if len(octets) >= 0xA0 + 4:
|
||||
# Parse manufacture date (4 bytes: year, year, month, day) at 0xA0
|
||||
mfg_year = int.from_bytes(octets[0xA0:0xA2], byteorder="big")
|
||||
mfg_month = octets[0xA2]
|
||||
mfg_day = octets[0xA3]
|
||||
if mfg_year > 0 and mfg_month > 0 and mfg_day > 0:
|
||||
result["mfg_date"] = f"{mfg_year:04d}-{mfg_month:02d}-{mfg_day:02d}"
|
||||
|
||||
if len(octets) >= 0xA4 + 3:
|
||||
# Parse manufacture time (3 bytes: hour, minute, second) at 0xA4
|
||||
mfg_hour = octets[0xA4]
|
||||
mfg_minute = octets[0xA5]
|
||||
mfg_second = octets[0xA6]
|
||||
if mfg_hour < 24 and mfg_minute < 60 and mfg_second < 60:
|
||||
result["mfg_time"] = f"{mfg_hour:02d}:{mfg_minute:02d}:{mfg_second:02d}"
|
||||
|
||||
if len(octets) >= 0xA7 + 1:
|
||||
# Parse spool core diameter (1 byte, mm) at 0xA7
|
||||
spool_core_diameter = octets[0xA7]
|
||||
if spool_core_diameter > 0:
|
||||
result["spool_core_diameter"] = spool_core_diameter
|
||||
|
||||
if len(octets) >= 0xA8 + 1:
|
||||
# Parse MFI temperature (1 byte, divided by 5) at 0xA8
|
||||
mfi_temp_raw = octets[0xA8]
|
||||
if mfi_temp_raw > 0:
|
||||
result["mfi_temp"] = mfi_temp_raw * 5
|
||||
|
||||
if len(octets) >= 0xA9 + 1:
|
||||
# Parse MFI load (1 byte, divided by 10) at 0xA9
|
||||
mfi_load_raw = octets[0xA9]
|
||||
if mfi_load_raw > 0:
|
||||
result["mfi_load"] = mfi_load_raw * 10
|
||||
|
||||
if len(octets) >= 0xAA + 1:
|
||||
# Parse MFI value (1 byte, divided by 10) at 0xAA
|
||||
mfi_value_raw = octets[0xAA]
|
||||
if mfi_value_raw > 0:
|
||||
result["mfi_value"] = mfi_value_raw / 10.0
|
||||
|
||||
if len(octets) >= 0xAB + 1:
|
||||
# Parse measured tolerance (1 byte, µm) at 0xAB
|
||||
measured_tolerance = octets[0xAB]
|
||||
if measured_tolerance > 0:
|
||||
result["measured_tolerance"] = measured_tolerance
|
||||
|
||||
if len(octets) >= 0xAC + 2:
|
||||
# Parse empty spool weight (2 bytes, grams) at 0xAC
|
||||
empty_spool_weight = int.from_bytes(octets[0xAC:0xAE], byteorder="big")
|
||||
if 0 < empty_spool_weight < 65535:
|
||||
result["empty_spool_weight"] = empty_spool_weight
|
||||
|
||||
if len(octets) >= 0xAE + 2:
|
||||
# Parse measured filament weight (2 bytes, grams) at 0xAE
|
||||
measured_filament_weight = int.from_bytes(
|
||||
octets[0xAE:0xB0], byteorder="big"
|
||||
)
|
||||
if 0 < measured_filament_weight < 65535:
|
||||
result["measured_filament_weight"] = measured_filament_weight
|
||||
|
||||
if len(octets) >= 0xB0 + 2:
|
||||
# Parse measured filament length (2 bytes, meters) at 0xB0
|
||||
measured_filament_length = int.from_bytes(
|
||||
octets[0xB0:0xB2], byteorder="big"
|
||||
)
|
||||
if 0 < measured_filament_length < 65535:
|
||||
result["measured_filament_length"] = measured_filament_length
|
||||
|
||||
if len(octets) >= 0xB2 + 2:
|
||||
# Parse transmission distance (2 bytes, µm) at 0xB2
|
||||
transmission_distance = int.from_bytes(octets[0xB2:0xB4], byteorder="big")
|
||||
if 0 < transmission_distance < 65535:
|
||||
result["transmission_distance"] = transmission_distance
|
||||
|
||||
if len(octets) >= 0xB4 + 1:
|
||||
# Parse max dry temp (1 byte, divided by 5) at 0xB4
|
||||
max_dry_temp_raw = octets[0xB4]
|
||||
if max_dry_temp_raw > 0:
|
||||
result["max_dry_temp"] = max_dry_temp_raw * 5
|
||||
|
||||
if len(octets) >= 0xB5 + 1:
|
||||
# Parse dry time (1 byte, hours) at 0xB5
|
||||
dry_time = octets[0xB5]
|
||||
if dry_time > 0:
|
||||
result["dry_time"] = dry_time
|
||||
|
||||
if len(octets) >= 0xB6 + 1:
|
||||
# Parse min print temp (1 byte, divided by 5) at 0xB6
|
||||
min_print_temp_raw = octets[0xB6]
|
||||
if min_print_temp_raw > 0:
|
||||
result["min_print_temp"] = min_print_temp_raw * 5
|
||||
|
||||
if len(octets) >= 0xB7 + 1:
|
||||
# Parse max print temp (1 byte, divided by 5) at 0xB7
|
||||
max_print_temp_raw = octets[0xB7]
|
||||
if max_print_temp_raw > 0:
|
||||
result["max_print_temp"] = max_print_temp_raw * 5
|
||||
|
||||
if len(octets) >= 0xB8 + 1:
|
||||
# Parse min volumetric speed (1 byte, mm³/s) at 0xB8
|
||||
min_vso = octets[0xB8]
|
||||
if min_vso > 0:
|
||||
result["min_volumetric_speed"] = min_vso
|
||||
|
||||
if len(octets) >= 0xB9 + 1:
|
||||
# Parse max volumetric speed (1 byte, mm³/s) at 0xB9
|
||||
max_vso = octets[0xB9]
|
||||
if max_vso > 0:
|
||||
result["max_volumetric_speed"] = max_vso
|
||||
|
||||
if len(octets) >= 0xBA + 1:
|
||||
# Parse target volumetric speed (1 byte, mm³/s) at 0xBA
|
||||
target_vso = octets[0xBA]
|
||||
if target_vso > 0:
|
||||
result["target_volumetric_speed"] = target_vso
|
||||
|
||||
return result
|
||||
|
||||
# pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements
|
||||
def parse(
|
||||
self, ndef_data: Any, identifier: str
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Parse OpenTag3D tag data and create/match entries in Spoolman
|
||||
|
||||
Args:
|
||||
ndef_data: NDEF data structure from the tag
|
||||
identifier: Tag identifier string
|
||||
|
||||
Returns:
|
||||
Tuple of (spool_id, filament_id) as strings, or (None, None) if not found
|
||||
"""
|
||||
if ndef_data is None:
|
||||
return None, None
|
||||
|
||||
# Look for the OpenTag3D NDEF record with MIME type "application/opentag3d"
|
||||
octets = None
|
||||
try:
|
||||
if hasattr(ndef_data, "records"):
|
||||
for record in ndef_data.records:
|
||||
# Check if this is an OpenTag3D MIME type record
|
||||
if (
|
||||
hasattr(record, "type")
|
||||
and record.type == "application/opentag3d"
|
||||
):
|
||||
octets = record.data
|
||||
logger.debug("Found OpenTag3D NDEF record")
|
||||
break
|
||||
except (AttributeError, TypeError) as ex:
|
||||
logger.debug("Failed to find OpenTag3D NDEF record: %s", ex)
|
||||
|
||||
if octets is None:
|
||||
logger.debug("Could not find OpenTag3D NDEF record")
|
||||
return None, None
|
||||
|
||||
# Parse OpenTag3D data
|
||||
tag_data = self._parse_opentag3d_data(octets)
|
||||
if tag_data is None:
|
||||
logger.debug("Not an OpenTag3D format tag")
|
||||
return None, None
|
||||
|
||||
logger.info(
|
||||
"Parsed OpenTag3D tag: manufacturer=%s, material=%s, color=%s",
|
||||
tag_data["manufacturer"],
|
||||
tag_data["material_name"],
|
||||
tag_data["color_name"],
|
||||
)
|
||||
logger.debug("All data: %s", tag_data)
|
||||
|
||||
# Generate filament name from template
|
||||
filament_name = self._generate_filament_name(tag_data)
|
||||
logger.info("Generated filament name from template: %s", filament_name)
|
||||
|
||||
# Find or create vendor
|
||||
vendor_id = self.spoolman_client.find_vendor_by_name(tag_data["manufacturer"])
|
||||
|
||||
if vendor_id is None:
|
||||
logger.info("Creating new vendor: %s", tag_data["manufacturer"])
|
||||
vendor_id = self.spoolman_client.create_vendor(tag_data["manufacturer"])
|
||||
if vendor_id is None:
|
||||
logger.error("Failed to create vendor")
|
||||
return None, None
|
||||
|
||||
# Find or create filament using vendor, material, and name
|
||||
# Material is constructed from base_material and material_modifiers
|
||||
material = tag_data["material_name"]
|
||||
filament_id = self.spoolman_client.find_filament_by_vendor_material_and_name(
|
||||
vendor_id, material, filament_name
|
||||
)
|
||||
|
||||
if filament_id is None:
|
||||
logger.info(
|
||||
"Creating new filament: %s %s", tag_data["manufacturer"], filament_name
|
||||
)
|
||||
|
||||
# Build base filament data with required fields
|
||||
filament_data = {
|
||||
"vendor_id": vendor_id,
|
||||
"name": filament_name,
|
||||
"material": material,
|
||||
"density": tag_data["density"],
|
||||
"diameter": tag_data["diameter_mm"],
|
||||
"color_hex": tag_data["color_hex"],
|
||||
}
|
||||
|
||||
# Build multi_color_hexes if color_2_hex is present
|
||||
if "color_2_hex" in tag_data:
|
||||
multi_color_hexes = [tag_data["color_hex"], tag_data["color_2_hex"]]
|
||||
if "color_3_hex" in tag_data:
|
||||
multi_color_hexes.append(tag_data["color_3_hex"])
|
||||
if "color_4_hex" in tag_data:
|
||||
multi_color_hexes.append(tag_data["color_4_hex"])
|
||||
filament_data["multi_color_hexes"] = multi_color_hexes
|
||||
|
||||
# Apply field mapping from config
|
||||
filament_data = self._apply_field_mapping(
|
||||
tag_data, self.filament_field_mapping, filament_data
|
||||
)
|
||||
|
||||
filament_id = self.spoolman_client.create_filament(filament_data)
|
||||
if filament_id is None:
|
||||
logger.error("Failed to create filament")
|
||||
return None, None
|
||||
|
||||
# Create spool with nfc_id
|
||||
logger.info(
|
||||
"Creating new spool for filament %s with nfc_id %s", filament_id, identifier
|
||||
)
|
||||
|
||||
# Build base spool data
|
||||
spool_data = {
|
||||
"filament_id": filament_id,
|
||||
}
|
||||
|
||||
# Apply field mapping from config
|
||||
spool_data = self._apply_field_mapping(
|
||||
tag_data, self.spool_field_mapping, spool_data
|
||||
)
|
||||
|
||||
# Add nfc_id to extra field
|
||||
if "extra" not in spool_data:
|
||||
spool_data["extra"] = {}
|
||||
spool_data["extra"]["nfc_id"] = f'"{identifier.lower()}"'
|
||||
|
||||
if "remaining_weight" not in spool_data:
|
||||
if "measured_weight" in tag_data:
|
||||
spool_data["remaining_weight"] = tag_data["measured_weight"]
|
||||
|
||||
if "initial_weight" not in spool_data:
|
||||
if "target_weight" in tag_data:
|
||||
spool_data["initial_weight"] = tag_data["target_weight"]
|
||||
|
||||
spool_id = self.spoolman_client.create_spool(spool_data)
|
||||
|
||||
if spool_id is None:
|
||||
logger.error("Failed to create spool")
|
||||
return None, None
|
||||
|
||||
logger.info(
|
||||
"Successfully created spool %s and filament %s", spool_id, filament_id
|
||||
)
|
||||
return str(spool_id), str(filament_id)
|
||||
|
|
@ -7,6 +7,7 @@ import json
|
|||
import logging
|
||||
from typing import Any, Dict, List, Optional
|
||||
import requests
|
||||
import base64
|
||||
|
||||
logger: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -15,15 +16,39 @@ logger: logging.Logger = logging.getLogger(__name__)
|
|||
class SpoolmanClient:
|
||||
"""Spoolman Web Client"""
|
||||
|
||||
def __init__(self, url: str) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
url: str,
|
||||
http_username: Optional[str],
|
||||
http_password: Optional[str],
|
||||
http_headers: Optional[str],
|
||||
) -> None:
|
||||
if url.endswith("/"):
|
||||
url = url[:-1]
|
||||
self.url: str = url
|
||||
self.http_headers = {}
|
||||
if http_headers:
|
||||
for c in http_headers.split(";"):
|
||||
c = c.strip()
|
||||
if not c:
|
||||
continue
|
||||
c_parts = c.split(":", 1)
|
||||
if len(c_parts) != 2:
|
||||
raise Exception(
|
||||
f"Section [spoolman], Option http_headers: {c}: Invalid header format"
|
||||
)
|
||||
self.http_headers[c_parts[0]] = c_parts[1]
|
||||
|
||||
if http_username and http_password:
|
||||
creds = base64.b64encode(
|
||||
f"{http_username}:{http_password}".encode()
|
||||
).decode()
|
||||
self.http_headers["Authorization"] = "Basic " + creds
|
||||
|
||||
def get_spool(self, spool_id: int) -> Dict[str, Any]:
|
||||
"""Get the spool from Spoolman"""
|
||||
url: str = self.url + f"/api/v1/spool/{spool_id}"
|
||||
response = requests.get(url, timeout=10)
|
||||
response = requests.get(url, timeout=10, headers=self.http_headers)
|
||||
if response.status_code != 200:
|
||||
raise ValueError(f"Request to spoolman failed: {response}")
|
||||
return response.json()
|
||||
|
|
@ -31,7 +56,7 @@ class SpoolmanClient:
|
|||
def get_spools(self) -> List[Dict[str, Any]]:
|
||||
"""Get the spools from spoolman"""
|
||||
url: str = self.url + "/api/v1/spool"
|
||||
response = requests.get(url, timeout=10)
|
||||
response = requests.get(url, timeout=10, headers=self.http_headers)
|
||||
if response.status_code != 200:
|
||||
raise ValueError(f"Request to spoolman failed: {response}")
|
||||
records: List[Dict[str, Any]] = json.loads(response.text)
|
||||
|
|
@ -61,7 +86,9 @@ class SpoolmanClient:
|
|||
extra["nfc_id"] = '""'
|
||||
|
||||
url: str = self.url + f"/api/v1/spool/{spool_id}"
|
||||
response = requests.patch(url, timeout=10, json={"extra": extra})
|
||||
response = requests.patch(
|
||||
url, timeout=10, headers=self.http_headers, json={"extra": extra}
|
||||
)
|
||||
if response.status_code != 200:
|
||||
raise ValueError(f"Request to spoolman failed: {response}: {response.text}")
|
||||
|
||||
|
|
@ -85,7 +112,9 @@ class SpoolmanClient:
|
|||
extra["nfc_id"] = nfc_id
|
||||
|
||||
url: str = self.url + f"/api/v1/spool/{spool_id}"
|
||||
response = requests.patch(url, timeout=10, json={"extra": extra})
|
||||
response = requests.patch(
|
||||
url, timeout=10, headers=self.http_headers, json={"extra": extra}
|
||||
)
|
||||
if response.status_code != 200:
|
||||
raise ValueError(f"Request to spoolman failed: {response}: {response.text}")
|
||||
|
||||
|
|
@ -102,7 +131,7 @@ class SpoolmanClient:
|
|||
"""
|
||||
try:
|
||||
url: str = self.url + "/api/v1/vendor"
|
||||
response = requests.get(url, timeout=10)
|
||||
response = requests.get(url, timeout=10, headers=self.http_headers)
|
||||
if response.status_code != 200:
|
||||
logger.error(
|
||||
"Failed to find vendor '%s': HTTP %d - %s",
|
||||
|
|
@ -136,7 +165,9 @@ class SpoolmanClient:
|
|||
try:
|
||||
url: str = self.url + "/api/v1/vendor"
|
||||
data = {"name": name}
|
||||
response = requests.post(url, json=data, timeout=10)
|
||||
response = requests.post(
|
||||
url, json=data, timeout=10, headers=self.http_headers
|
||||
)
|
||||
if response.status_code not in (200, 201):
|
||||
logger.error(
|
||||
"Failed to create vendor '%s': HTTP %d - %s",
|
||||
|
|
@ -167,7 +198,9 @@ class SpoolmanClient:
|
|||
try:
|
||||
url: str = self.url + "/api/v1/filament"
|
||||
params = {"vendor_id": vendor_id}
|
||||
response = requests.get(url, params=params, timeout=10)
|
||||
response = requests.get(
|
||||
url, params=params, timeout=10, headers=self.http_headers
|
||||
)
|
||||
if response.status_code != 200:
|
||||
logger.error(
|
||||
"Failed to find filament '%s' for vendor %d: HTTP %d - %s",
|
||||
|
|
@ -211,7 +244,9 @@ class SpoolmanClient:
|
|||
try:
|
||||
url: str = self.url + "/api/v1/filament"
|
||||
params = {"vendor_id": vendor_id}
|
||||
response = requests.get(url, params=params, timeout=10)
|
||||
response = requests.get(
|
||||
url, params=params, timeout=10, headers=self.http_headers
|
||||
)
|
||||
if response.status_code != 200:
|
||||
logger.error(
|
||||
"Failed to find filament '%s' (material: %s) for vendor %d: HTTP %d - %s",
|
||||
|
|
@ -262,7 +297,9 @@ class SpoolmanClient:
|
|||
try:
|
||||
url: str = self.url + "/api/v1/filament"
|
||||
logger.debug("Creating new filament: %s", data)
|
||||
response = requests.post(url, json=data, timeout=10)
|
||||
response = requests.post(
|
||||
url, json=data, timeout=10, headers=self.http_headers
|
||||
)
|
||||
if response.status_code not in (200, 201):
|
||||
logger.error(
|
||||
"Failed to create filament: HTTP %d - %s",
|
||||
|
|
@ -294,7 +331,9 @@ class SpoolmanClient:
|
|||
try:
|
||||
url: str = self.url + "/api/v1/spool"
|
||||
logger.debug("Creating new spool: %s", data)
|
||||
response = requests.post(url, json=data, timeout=10)
|
||||
response = requests.post(
|
||||
url, json=data, timeout=10, headers=self.http_headers
|
||||
)
|
||||
if response.status_code not in (200, 201):
|
||||
logger.error(
|
||||
"Failed to create spool: HTTP %d - %s",
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ nfc-device = "tty:AMA0"
|
|||
[spoolman]
|
||||
# URL for the spoolman installation
|
||||
spoolman-url = "http://mainsailos.local:7912"
|
||||
# http_username = ""
|
||||
# http_password = ""
|
||||
|
||||
[moonraker]
|
||||
# URL for the moonraker installation
|
||||
|
|
|
|||
|
|
@ -99,7 +99,7 @@ if __name__ == "__main__":
|
|||
try:
|
||||
api_cmd = [sys.executable, api_script]
|
||||
if parsed_args.config_dir:
|
||||
api_cmd.extend(["-c", parsed_args.config_dir])
|
||||
os.environ["NFC2KLIPPER_CONFIG_DIR"] = parsed_args.config_dir
|
||||
# pylint: disable=consider-using-with
|
||||
api_process = subprocess.Popen(api_cmd) # nosec
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
from typing import Any, Dict, Optional, Tuple, Union
|
||||
|
|
@ -23,23 +22,10 @@ from flask import Flask, render_template
|
|||
from lib.config import Nfc2KlipperConfig
|
||||
from lib.ipc import IPCClient
|
||||
|
||||
|
||||
Nfc2KlipperConfig.configure_logging()
|
||||
|
||||
# Parse command line arguments
|
||||
# pylint: disable=duplicate-code
|
||||
parser = argparse.ArgumentParser(description="Web API service for nfc2klipper.")
|
||||
parser.add_argument(
|
||||
"-c",
|
||||
"--config-dir",
|
||||
metavar="DIR",
|
||||
default=None,
|
||||
help=f"Configuration directory (default: {Nfc2KlipperConfig.CFG_DIR})",
|
||||
)
|
||||
parsed_args = parser.parse_args()
|
||||
|
||||
args: Optional[Dict[str, Any]] = Nfc2KlipperConfig.get_config(parsed_args.config_dir)
|
||||
# pylint: enable=duplicate-code
|
||||
cfg_path = os.environ.get("NFC2KLIPPER_CFG_DIR", Nfc2KlipperConfig.CFG_DIR)
|
||||
args: Optional[Dict[str, Any]] = Nfc2KlipperConfig.get_config(cfg_path)
|
||||
|
||||
if not args:
|
||||
print(
|
||||
|
|
|
|||
|
|
@ -25,7 +25,8 @@ from lib.config import Nfc2KlipperConfig
|
|||
from lib.ipc import IPCServer
|
||||
from lib.moonraker_web_client import MoonrakerWebClient
|
||||
from lib.nfc_handler import NfcHandler
|
||||
from lib.nfc_parsers import NdefTextParser, TagIdentifierParser, OpenTag3DParser
|
||||
from lib.nfc_parsers import NdefTextParser, TagIdentifierParser
|
||||
from lib.opentag3d_parser import OpenTag3DParser
|
||||
from lib.spoolman_client import SpoolmanClient
|
||||
|
||||
Nfc2KlipperConfig.configure_logging()
|
||||
|
|
@ -100,7 +101,12 @@ if USE_MOCK_OBJECTS:
|
|||
args["nfc"]["nfc-device"]
|
||||
)
|
||||
else:
|
||||
spoolman = SpoolmanClient(args["spoolman"]["spoolman-url"])
|
||||
spoolman = SpoolmanClient(
|
||||
args["spoolman"]["spoolman-url"],
|
||||
args["spoolman"].get("http_username"),
|
||||
args["spoolman"].get("http_password"),
|
||||
args["spoolman"].get("http_headers"),
|
||||
)
|
||||
moonraker = MoonrakerWebClient(
|
||||
args["moonraker"]["moonraker-url"],
|
||||
setting_gcode_template,
|
||||
|
|
@ -239,7 +245,6 @@ def on_nfc_tag_present(ndef_data: Any, identifier: str) -> None:
|
|||
def on_nfc_no_tag_present() -> None:
|
||||
"""Called when no tag is present (or tag without data)"""
|
||||
if should_clear_spool():
|
||||
|
||||
set_spool_and_filament(0, 0)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
flask==3.0.3
|
||||
toml==0.10.2
|
||||
nfcpy==1.0.4
|
||||
npyscreen==4.10.5
|
||||
npyscreen==5.0.2
|
||||
requests==2.32.5
|
||||
urllib3>=2.6.0
|
||||
Gunicorn==23.0.0
|
||||
Gunicorn==25.1.0
|
||||
types-toml==0.10.8.20240310
|
||||
types-requests==2.32.4.20260107
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@ import nfc
|
|||
import npyscreen
|
||||
import requests
|
||||
|
||||
from lib.config import Nfc2KlipperConfig
|
||||
from lib.spoolman_client import SpoolmanClient
|
||||
|
||||
SPOOL = "SPOOL"
|
||||
FILAMENT = "FILAMENT"
|
||||
|
|
@ -26,26 +28,19 @@ parser = argparse.ArgumentParser(
|
|||
parser.add_argument("--version", action="version", version="%(prog)s 0.0.1")
|
||||
|
||||
# pylint: disable=R0801
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--nfc-device",
|
||||
metavar="device",
|
||||
default="ttyAMA0",
|
||||
help="Which NFC reader to use, see "
|
||||
+ "https://nfcpy.readthedocs.io/en/latest/topics/get-started.html#open-a-local-device"
|
||||
+ " for format",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-u",
|
||||
"--url",
|
||||
metavar="URL",
|
||||
default="http://mainsailos.local:7912",
|
||||
help="URL for the Spoolman installation",
|
||||
)
|
||||
parser.add_argument("-c", "--config", metavar="config", help="Config directory")
|
||||
|
||||
|
||||
args = parser.parse_args()
|
||||
config = Nfc2KlipperConfig.get_config(args.config)
|
||||
assert config, "No config found"
|
||||
|
||||
spoolman = SpoolmanClient(
|
||||
config["spoolman"]["spoolman-url"],
|
||||
config["spoolman"].get("http_username"),
|
||||
config["spoolman"].get("http_password"),
|
||||
config["spoolman"].get("http_headers"),
|
||||
)
|
||||
|
||||
|
||||
def record_to_text(record):
|
||||
|
|
@ -77,9 +72,7 @@ class PostSelectForm(npyscreen.FormBaseNew):
|
|||
when_pressed_function=self.exit_app,
|
||||
)
|
||||
|
||||
url = args.url + "/api/v1/spool"
|
||||
records = requests.get(url, timeout=10)
|
||||
self.records = json.loads(records.text)
|
||||
self.records = spoolman.get_spools()
|
||||
self.records = sorted(self.records, key=lambda x: x["id"], reverse=True)
|
||||
|
||||
self.posts = self.add(
|
||||
|
|
@ -125,7 +118,7 @@ class TagWritingApp(npyscreen.NPSAppManaged):
|
|||
|
||||
self.status = "Written"
|
||||
|
||||
clf = nfc.ContactlessFrontend(args.nfc_device)
|
||||
clf = nfc.ContactlessFrontend(config["nfc"]["nfc-device"])
|
||||
clf.connect(
|
||||
rdwr={"on-connect": lambda tag: self.on_nfc_connect(tag, spool, filament)}
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue