Compare commits

..

10 commits

Author SHA1 Message Date
98227a0a86 add spoolman auth 2026-04-12 12:04:22 +02:00
Sebastian Andersson
b0c58fca13
Update README.md
Update description of link to https://github.com/ryanch/openprinttag_scanner
2026-02-16 10:59:08 +01:00
Sebastian Andersson
4720976009
Update README.md
Added link to https://github.com/ryanch/openprinttag_scanner
2026-02-16 10:57:15 +01:00
dependabot[bot]
3823b4e780 Bump gunicorn from 25.0.3 to 25.1.0
Bumps [gunicorn](https://github.com/benoitc/gunicorn) from 25.0.3 to 25.1.0.
- [Release notes](https://github.com/benoitc/gunicorn/releases)
- [Commits](https://github.com/benoitc/gunicorn/compare/25.0.3...25.1.0)

---
updated-dependencies:
- dependency-name: gunicorn
  dependency-version: 25.1.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-02-16 06:33:36 +01:00
dependabot[bot]
2caca5ff91 Bump npyscreen from 4.10.5 to 5.0.2
Bumps [npyscreen](https://github.com/npcole/npyscreen) from 4.10.5 to 5.0.2.
- [Release notes](https://github.com/npcole/npyscreen/releases)
- [Changelog](https://github.com/npcole/npyscreen/blob/master/CHANGELOG)
- [Commits](https://github.com/npcole/npyscreen/commits/v5.0.2)

---
updated-dependencies:
- dependency-name: npyscreen
  dependency-version: 5.0.2
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-02-09 09:21:07 +01:00
Sebastian Andersson
f44f349d8a Fix nfc2klipper_api with gunicorn and formatting 2026-02-09 09:18:56 +01:00
dependabot[bot]
b89b0c2f32 Bump gunicorn from 24.1.1 to 25.0.3
Bumps [gunicorn](https://github.com/benoitc/gunicorn) from 24.1.1 to 25.0.3.
- [Release notes](https://github.com/benoitc/gunicorn/releases)
- [Commits](https://github.com/benoitc/gunicorn/compare/24.1.1...25.0.3)

---
updated-dependencies:
- dependency-name: gunicorn
  dependency-version: 25.0.3
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-02-09 09:18:56 +01:00
dependabot[bot]
911409c310 Bump gunicorn from 23.0.0 to 24.1.1
Bumps [gunicorn](https://github.com/benoitc/gunicorn) from 23.0.0 to 24.1.1.
- [Release notes](https://github.com/benoitc/gunicorn/releases)
- [Commits](https://github.com/benoitc/gunicorn/compare/23.0.0...24.1.1)

---
updated-dependencies:
- dependency-name: gunicorn
  dependency-version: 24.1.1
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-01-26 08:12:33 +01:00
Sebastian Andersson
82afd6a786 Added links to related projects 2026-01-14 16:33:46 +01:00
Sebastian Andersson
96cfcc9b08 Move OpenTag3DParser to own file 2026-01-13 11:08:21 +01:00
10 changed files with 593 additions and 555 deletions

View file

@ -33,7 +33,7 @@ Automatically sets the loaded spool &amp; filament in klipper by using NFC/RFID
- [Use with Happy-Hare](#use-with-happy-hare) - [Use with Happy-Hare](#use-with-happy-hare)
- [Use with Prusa's OpenPrintTag tags](#use-with-prusas-openprinttag-tags) - [Use with Prusa's OpenPrintTag tags](#use-with-prusas-openprinttag-tags)
- [Use with OpenTag3D tags](#use-with-opentag3d-tags) - [Use with OpenTag3D tags](#use-with-opentag3d-tags)
- [See also](#see-also) - [Related projects](#related-projects)
- [Developer info](#developer-info) - [Developer info](#developer-info)
@ -301,12 +301,14 @@ See Spoolman's API documentation [here](https://donkie.github.io/Spoolman/) to s
You can also add extra fields in Spoolman for saving the data from the OpenTag3D tags. You can also add extra fields in Spoolman for saving the data from the OpenTag3D tags.
## See also ## Related projects
If nfc2klipper doesn't work for some reason,
[spool2klipper](https://github.com/bofh69/spool2klipper) can be use * [FilaMan](https://www.filaman.app/) - a filament management system.
to automatically update the `active_filament` variable whenever the spool * [esp_to_spoolman](https://github.com/dimbas80/esp_to_spoolman) - like nfc2klipper, but running on an ESP32.
is changed in Moonraker (when changing it in the frontend for example). * [OpenPrintTag Scanner](https://github.com/ryanch/openprinttag_scanner) - like esp_to_spoolman, but with display and for PrusaLink instead of Klipper.
That way `ASSERT_ACTIVE_FILAMENT` will still work correctly. * [spool2klipper](https://github.com/bofh69/spool2klipper) - to set the filament id (and other data) when the spool id is changed.
* [spoolman2slicer](https://github.com/bofh69/spoolman2slicer) - create filament config from Spoolman.
* [OpenTag3d](https://opentag3d.info/) tag format.
## Developer info ## Developer info

View file

@ -4,7 +4,6 @@
"""Tag parsers for different data formats""" """Tag parsers for different data formats"""
import logging import logging
import re
from typing import Any, Dict, List, Optional, Protocol, Tuple from typing import Any, Dict, List, Optional, Protocol, Tuple
import ndef # pylint: disable=import-error import ndef # pylint: disable=import-error
@ -133,496 +132,3 @@ class TagIdentifierParser:
identifier, identifier,
) )
return None, None return None, None
class OpenTag3DParser:
"""Parser for OpenTag3D format tags
Parses tags following the OpenTag3D specification from https://opentag3d.info/spec
Based on OpenTag3D spec version 0.012
Extracts manufacturer, material, color, and other filament data, then creates
or matches entries in Spoolman.
"""
def __init__(
self,
spoolman_client: Any,
filament_name_template: str,
filament_field_mapping: Dict[str, str],
spool_field_mapping: Dict[str, str],
) -> None:
"""Initialize with a Spoolman client instance
Args:
spoolman_client: Client object with methods to interact with Spoolman API
filament_name_template: Template string for generating filament names from tag data
filament_field_mapping: Mapping from Spoolman filament fields to OpenTag3D fields
spool_field_mapping: Mapping from Spoolman spool fields to OpenTag3D fields
"""
self.spoolman_client = spoolman_client
self.filament_name_template = filament_name_template
self.filament_field_mapping = filament_field_mapping
self.spool_field_mapping = spool_field_mapping
def _parse_rgba_to_hex(self, octets: bytes, offset: int) -> Optional[str]:
"""Parse RGBA color at given offset and return hex string if not transparent black
Args:
octets: Raw bytes from the NFC tag
offset: Byte offset to start reading RGBA (4 bytes)
Returns:
Hex color string (e.g., "FF0000FF") or None if transparent black
"""
if len(octets) < offset + 4:
return None
r = octets[offset]
g = octets[offset + 1]
b = octets[offset + 2]
a = octets[offset + 3]
# Only return color if not transparent black (indicates no color)
if r == 0 and g == 0 and b == 0 and a == 0:
return None
return f"{r:02x}{g:02x}{b:02x}{a:02x}"
def _apply_field_mapping(
self,
tag_data: Dict[str, Any],
field_mapping: Dict[str, str],
base_data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Apply field mapping from OpenTag3D data to Spoolman fields
Args:
tag_data: Parsed OpenTag3D tag data
field_mapping: Mapping from Spoolman fields to OpenTag3D fields
base_data: Optional base dictionary to start with
Returns:
Dictionary with mapped fields ready for Spoolman API
"""
result = base_data.copy() if base_data else {}
for spoolman_field, ot3d_field in field_mapping.items():
if ot3d_field in tag_data:
value = tag_data[ot3d_field]
# Handle nested fields (e.g., "extra.custom_field")
if "." in spoolman_field:
parts = spoolman_field.split(".", 1)
parent_key = parts[0]
child_key = parts[1]
if parent_key not in result:
result[parent_key] = {}
result[parent_key][child_key] = value
else:
result[spoolman_field] = value
return result
def _generate_filament_name(self, tag_data: Dict[str, Any]) -> str:
"""Generate filament name from template using tag data
Args:
tag_data: Parsed OpenTag3D tag data
Returns:
Formatted filament name
"""
# Use string formatting with the template
try:
# Simple approach: format with all fields, then clean up
name = self.filament_name_template.format(**tag_data)
# Clean up extra spaces
name = " ".join(name.split())
# Clean up trailing/leading dashes and spaces around dashes
# Remove trailing dash (with optional spaces)
name = re.sub(r"\s*-\s*$", "", name)
# Remove leading dash (with optional spaces)
name = re.sub(r"^\s*-\s*", "", name)
# Clean up double spaces again after dash removal
name = " ".join(name.split())
return name
except (KeyError, ValueError) as ex:
logger.warning("Template formatting error: %s, using fallback", ex)
# Fallback to simple material_name
return tag_data.get("material_name", "Unknown")
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def _parse_opentag3d_data(self, octets: bytes) -> Optional[Dict[str, Any]]:
"""Parse OpenTag3D format data from tag octets
Args:
octets: Raw bytes from the NDEF record payload
Returns:
Dictionary with parsed data, or None if not a valid OpenTag3D tag
"""
# OpenTag3D data starts at offset 0x00 (new spec v0.010+)
# Check if we have enough data
if len(octets) < 0x64: # Minimum size for core fields
return None
# Parse tag version (2 bytes, big endian) at offset 0x00
tag_version = int.from_bytes(octets[0x00:0x02], byteorder="big")
logger.debug(
"Parsing OpenTag3D version %d.%02d tag",
tag_version // 256,
tag_version % 256,
)
if tag_version < 0x000C:
# Too old, ignoring
return None
# Parse base material (5 bytes UTF-8) at offset 0x02
material_base = (
octets[0x02:0x07].decode("utf-8", errors="ignore").rstrip("\x00")
)
# Parse material modifiers (5 bytes UTF-8) at offset 0x07
material_mod = octets[0x07:0x0C].decode("utf-8", errors="ignore").rstrip("\x00")
# Parse manufacturer (16 bytes UTF-8) at offset 0x1B
manufacturer = octets[0x1B:0x2B].decode("utf-8", errors="ignore").rstrip("\x00")
# Parse color name (32 bytes UTF-8) at offset 0x2B
color_name = octets[0x2B:0x4B].decode("utf-8", errors="ignore").rstrip("\x00")
# Parse color 1 (4 bytes RGBA) at offset 0x4B
color_1_hex = self._parse_rgba_to_hex(octets, 0x4B)
# Parse target diameter (2 bytes, µm) at offset 0x5C
target_diameter = int.from_bytes(octets[0x5C:0x5E], byteorder="big")
diameter_mm = target_diameter / 1000.0
# Parse target weight (2 bytes, grams) at offset 0x5E
target_weight = int.from_bytes(octets[0x5E:0x60], byteorder="big")
# Parse print temperature (1 byte, divided by 5) at offset 0x60
print_temp_raw = octets[0x60]
print_temp = print_temp_raw * 5
# Parse bed temperature (1 byte, divided by 5) at offset 0x61
bed_temp_raw = octets[0x61]
bed_temp = bed_temp_raw * 5
# Parse density (2 bytes, µg/cm³) at offset 0x62
density_raw = int.from_bytes(octets[0x62:0x64], byteorder="big")
density = density_raw / 1000.0
# Build material name
material_name = material_base
if material_mod:
material_name = f"{material_base}-{material_mod}"
result = {
"tag_version": tag_version,
"manufacturer": manufacturer,
"material_name": material_name,
"material_base": material_base,
"material_mod": material_mod,
"color_name": color_name,
"color_hex": color_1_hex,
"diameter_mm": diameter_mm,
"target_weight": target_weight,
"print_temp": print_temp,
"bed_temp": bed_temp,
"density": density,
}
# Parse color 2 at offset 0x50
color_2_hex = self._parse_rgba_to_hex(octets, 0x50)
if color_2_hex:
result["color_2_hex"] = color_2_hex
# Parse color 3 at offset 0x54
color_3_hex = self._parse_rgba_to_hex(octets, 0x54)
if color_3_hex:
result["color_3_hex"] = color_3_hex
# Parse color 4 at offset 0x58
color_4_hex = self._parse_rgba_to_hex(octets, 0x58)
if color_4_hex:
result["color_4_hex"] = color_4_hex
# Parse online data URL (32 bytes ASCII) at 0x70
if len(octets) >= 0x70 + 32:
online_url = (
octets[0x70:0x90].decode("ascii", errors="ignore").rstrip("\x00")
)
if online_url:
result["online_data_url"] = online_url
# Parse extended fields if available (NTAG215/216)
# Extended fields start at 0x90
if len(octets) >= 0x90 + 16:
# Parse serial number / batch ID (16 bytes UTF-8) at 0x90
serial = octets[0x90:0xA0].decode("utf-8", errors="ignore").rstrip("\x00")
if serial:
result["serial"] = serial
if len(octets) >= 0xA0 + 4:
# Parse manufacture date (4 bytes: year, year, month, day) at 0xA0
mfg_year = int.from_bytes(octets[0xA0:0xA2], byteorder="big")
mfg_month = octets[0xA2]
mfg_day = octets[0xA3]
if mfg_year > 0 and mfg_month > 0 and mfg_day > 0:
result["mfg_date"] = f"{mfg_year:04d}-{mfg_month:02d}-{mfg_day:02d}"
if len(octets) >= 0xA4 + 3:
# Parse manufacture time (3 bytes: hour, minute, second) at 0xA4
mfg_hour = octets[0xA4]
mfg_minute = octets[0xA5]
mfg_second = octets[0xA6]
if mfg_hour < 24 and mfg_minute < 60 and mfg_second < 60:
result["mfg_time"] = f"{mfg_hour:02d}:{mfg_minute:02d}:{mfg_second:02d}"
if len(octets) >= 0xA7 + 1:
# Parse spool core diameter (1 byte, mm) at 0xA7
spool_core_diameter = octets[0xA7]
if spool_core_diameter > 0:
result["spool_core_diameter"] = spool_core_diameter
if len(octets) >= 0xA8 + 1:
# Parse MFI temperature (1 byte, divided by 5) at 0xA8
mfi_temp_raw = octets[0xA8]
if mfi_temp_raw > 0:
result["mfi_temp"] = mfi_temp_raw * 5
if len(octets) >= 0xA9 + 1:
# Parse MFI load (1 byte, divided by 10) at 0xA9
mfi_load_raw = octets[0xA9]
if mfi_load_raw > 0:
result["mfi_load"] = mfi_load_raw * 10
if len(octets) >= 0xAA + 1:
# Parse MFI value (1 byte, divided by 10) at 0xAA
mfi_value_raw = octets[0xAA]
if mfi_value_raw > 0:
result["mfi_value"] = mfi_value_raw / 10.0
if len(octets) >= 0xAB + 1:
# Parse measured tolerance (1 byte, µm) at 0xAB
measured_tolerance = octets[0xAB]
if measured_tolerance > 0:
result["measured_tolerance"] = measured_tolerance
if len(octets) >= 0xAC + 2:
# Parse empty spool weight (2 bytes, grams) at 0xAC
empty_spool_weight = int.from_bytes(octets[0xAC:0xAE], byteorder="big")
if 0 < empty_spool_weight < 65535:
result["empty_spool_weight"] = empty_spool_weight
if len(octets) >= 0xAE + 2:
# Parse measured filament weight (2 bytes, grams) at 0xAE
measured_filament_weight = int.from_bytes(
octets[0xAE:0xB0], byteorder="big"
)
if 0 < measured_filament_weight < 65535:
result["measured_filament_weight"] = measured_filament_weight
if len(octets) >= 0xB0 + 2:
# Parse measured filament length (2 bytes, meters) at 0xB0
measured_filament_length = int.from_bytes(
octets[0xB0:0xB2], byteorder="big"
)
if 0 < measured_filament_length < 65535:
result["measured_filament_length"] = measured_filament_length
if len(octets) >= 0xB2 + 2:
# Parse transmission distance (2 bytes, µm) at 0xB2
transmission_distance = int.from_bytes(octets[0xB2:0xB4], byteorder="big")
if 0 < transmission_distance < 65535:
result["transmission_distance"] = transmission_distance
if len(octets) >= 0xB4 + 1:
# Parse max dry temp (1 byte, divided by 5) at 0xB4
max_dry_temp_raw = octets[0xB4]
if max_dry_temp_raw > 0:
result["max_dry_temp"] = max_dry_temp_raw * 5
if len(octets) >= 0xB5 + 1:
# Parse dry time (1 byte, hours) at 0xB5
dry_time = octets[0xB5]
if dry_time > 0:
result["dry_time"] = dry_time
if len(octets) >= 0xB6 + 1:
# Parse min print temp (1 byte, divided by 5) at 0xB6
min_print_temp_raw = octets[0xB6]
if min_print_temp_raw > 0:
result["min_print_temp"] = min_print_temp_raw * 5
if len(octets) >= 0xB7 + 1:
# Parse max print temp (1 byte, divided by 5) at 0xB7
max_print_temp_raw = octets[0xB7]
if max_print_temp_raw > 0:
result["max_print_temp"] = max_print_temp_raw * 5
if len(octets) >= 0xB8 + 1:
# Parse min volumetric speed (1 byte, mm³/s) at 0xB8
min_vso = octets[0xB8]
if min_vso > 0:
result["min_volumetric_speed"] = min_vso
if len(octets) >= 0xB9 + 1:
# Parse max volumetric speed (1 byte, mm³/s) at 0xB9
max_vso = octets[0xB9]
if max_vso > 0:
result["max_volumetric_speed"] = max_vso
if len(octets) >= 0xBA + 1:
# Parse target volumetric speed (1 byte, mm³/s) at 0xBA
target_vso = octets[0xBA]
if target_vso > 0:
result["target_volumetric_speed"] = target_vso
return result
# pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements
def parse(
self, ndef_data: Any, identifier: str
) -> Tuple[Optional[str], Optional[str]]:
"""Parse OpenTag3D tag data and create/match entries in Spoolman
Args:
ndef_data: NDEF data structure from the tag
identifier: Tag identifier string
Returns:
Tuple of (spool_id, filament_id) as strings, or (None, None) if not found
"""
if ndef_data is None:
return None, None
# Look for the OpenTag3D NDEF record with MIME type "application/opentag3d"
octets = None
try:
if hasattr(ndef_data, "records"):
for record in ndef_data.records:
# Check if this is an OpenTag3D MIME type record
if (
hasattr(record, "type")
and record.type == "application/opentag3d"
):
octets = record.data
logger.debug("Found OpenTag3D NDEF record")
break
except (AttributeError, TypeError) as ex:
logger.debug("Failed to find OpenTag3D NDEF record: %s", ex)
if octets is None:
logger.debug("Could not find OpenTag3D NDEF record")
return None, None
# Parse OpenTag3D data
tag_data = self._parse_opentag3d_data(octets)
if tag_data is None:
logger.debug("Not an OpenTag3D format tag")
return None, None
logger.info(
"Parsed OpenTag3D tag: manufacturer=%s, material=%s, color=%s",
tag_data["manufacturer"],
tag_data["material_name"],
tag_data["color_name"],
)
logger.debug("All data: %s", tag_data)
# Generate filament name from template
filament_name = self._generate_filament_name(tag_data)
logger.info("Generated filament name from template: %s", filament_name)
# Find or create vendor
vendor_id = self.spoolman_client.find_vendor_by_name(tag_data["manufacturer"])
if vendor_id is None:
logger.info("Creating new vendor: %s", tag_data["manufacturer"])
vendor_id = self.spoolman_client.create_vendor(tag_data["manufacturer"])
if vendor_id is None:
logger.error("Failed to create vendor")
return None, None
# Find or create filament using vendor, material, and name
# Material is constructed from base_material and material_modifiers
material = tag_data["material_name"]
filament_id = self.spoolman_client.find_filament_by_vendor_material_and_name(
vendor_id, material, filament_name
)
if filament_id is None:
logger.info(
"Creating new filament: %s %s", tag_data["manufacturer"], filament_name
)
# Build base filament data with required fields
filament_data = {
"vendor_id": vendor_id,
"name": filament_name,
"material": material,
"density": tag_data["density"],
"diameter": tag_data["diameter_mm"],
"color_hex": tag_data["color_hex"],
}
# Build multi_color_hexes if color_2_hex is present
if "color_2_hex" in tag_data:
multi_color_hexes = [tag_data["color_hex"], tag_data["color_2_hex"]]
if "color_3_hex" in tag_data:
multi_color_hexes.append(tag_data["color_3_hex"])
if "color_4_hex" in tag_data:
multi_color_hexes.append(tag_data["color_4_hex"])
filament_data["multi_color_hexes"] = multi_color_hexes
# Apply field mapping from config
filament_data = self._apply_field_mapping(
tag_data, self.filament_field_mapping, filament_data
)
filament_id = self.spoolman_client.create_filament(filament_data)
if filament_id is None:
logger.error("Failed to create filament")
return None, None
# Create spool with nfc_id
logger.info(
"Creating new spool for filament %s with nfc_id %s", filament_id, identifier
)
# Build base spool data
spool_data = {
"filament_id": filament_id,
}
# Apply field mapping from config
spool_data = self._apply_field_mapping(
tag_data, self.spool_field_mapping, spool_data
)
# Add nfc_id to extra field
if "extra" not in spool_data:
spool_data["extra"] = {}
spool_data["extra"]["nfc_id"] = f'"{identifier.lower()}"'
if "remaining_weight" not in spool_data:
if "measured_weight" in tag_data:
spool_data["remaining_weight"] = tag_data["measured_weight"]
if "initial_weight" not in spool_data:
if "target_weight" in tag_data:
spool_data["initial_weight"] = tag_data["target_weight"]
spool_id = self.spoolman_client.create_spool(spool_data)
if spool_id is None:
logger.error("Failed to create spool")
return None, None
logger.info(
"Successfully created spool %s and filament %s", spool_id, filament_id
)
return str(spool_id), str(filament_id)

505
lib/opentag3d_parser.py Normal file
View file

@ -0,0 +1,505 @@
# SPDX-FileCopyrightText: 2024-2026 Sebastian Andersson <sebastian@bittr.nu>
# SPDX-License-Identifier: GPL-3.0-or-later
"""Tag parsers for different data formats"""
import logging
import re
from typing import Any, Dict, Optional, Tuple
logger: logging.Logger = logging.getLogger(__name__)
# pylint: disable=too-few-public-methods
class OpenTag3DParser:
"""Parser for OpenTag3D format tags
Parses tags following the OpenTag3D specification from https://opentag3d.info/spec
Based on OpenTag3D spec version 0.012
Extracts manufacturer, material, color, and other filament data, then creates
or matches entries in Spoolman.
"""
def __init__(
self,
spoolman_client: Any,
filament_name_template: str,
filament_field_mapping: Dict[str, str],
spool_field_mapping: Dict[str, str],
) -> None:
"""Initialize with a Spoolman client instance
Args:
spoolman_client: Client object with methods to interact with Spoolman API
filament_name_template: Template string for generating filament names from tag data
filament_field_mapping: Mapping from Spoolman filament fields to OpenTag3D fields
spool_field_mapping: Mapping from Spoolman spool fields to OpenTag3D fields
"""
self.spoolman_client = spoolman_client
self.filament_name_template = filament_name_template
self.filament_field_mapping = filament_field_mapping
self.spool_field_mapping = spool_field_mapping
def _parse_rgba_to_hex(self, octets: bytes, offset: int) -> Optional[str]:
"""Parse RGBA color at given offset and return hex string if not transparent black
Args:
octets: Raw bytes from the NFC tag
offset: Byte offset to start reading RGBA (4 bytes)
Returns:
Hex color string (e.g., "FF0000FF") or None if transparent black
"""
if len(octets) < offset + 4:
return None
r = octets[offset]
g = octets[offset + 1]
b = octets[offset + 2]
a = octets[offset + 3]
# Only return color if not transparent black (indicates no color)
if r == 0 and g == 0 and b == 0 and a == 0:
return None
return f"{r:02x}{g:02x}{b:02x}{a:02x}"
def _apply_field_mapping(
self,
tag_data: Dict[str, Any],
field_mapping: Dict[str, str],
base_data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Apply field mapping from OpenTag3D data to Spoolman fields
Args:
tag_data: Parsed OpenTag3D tag data
field_mapping: Mapping from Spoolman fields to OpenTag3D fields
base_data: Optional base dictionary to start with
Returns:
Dictionary with mapped fields ready for Spoolman API
"""
result = base_data.copy() if base_data else {}
for spoolman_field, ot3d_field in field_mapping.items():
if ot3d_field in tag_data:
value = tag_data[ot3d_field]
# Handle nested fields (e.g., "extra.custom_field")
if "." in spoolman_field:
parts = spoolman_field.split(".", 1)
parent_key = parts[0]
child_key = parts[1]
if parent_key not in result:
result[parent_key] = {}
result[parent_key][child_key] = value
else:
result[spoolman_field] = value
return result
def _generate_filament_name(self, tag_data: Dict[str, Any]) -> str:
"""Generate filament name from template using tag data
Args:
tag_data: Parsed OpenTag3D tag data
Returns:
Formatted filament name
"""
# Use string formatting with the template
try:
# Simple approach: format with all fields, then clean up
name = self.filament_name_template.format(**tag_data)
# Clean up extra spaces
name = " ".join(name.split())
# Clean up trailing/leading dashes and spaces around dashes
# Remove trailing dash (with optional spaces)
name = re.sub(r"\s*-\s*$", "", name)
# Remove leading dash (with optional spaces)
name = re.sub(r"^\s*-\s*", "", name)
# Clean up double spaces again after dash removal
name = " ".join(name.split())
return name
except (KeyError, ValueError) as ex:
logger.warning("Template formatting error: %s, using fallback", ex)
# Fallback to simple material_name
return tag_data.get("material_name", "Unknown")
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def _parse_opentag3d_data(self, octets: bytes) -> Optional[Dict[str, Any]]:
"""Parse OpenTag3D format data from tag octets
Args:
octets: Raw bytes from the NDEF record payload
Returns:
Dictionary with parsed data, or None if not a valid OpenTag3D tag
"""
# OpenTag3D data starts at offset 0x00 (new spec v0.010+)
# Check if we have enough data
if len(octets) < 0x64: # Minimum size for core fields
return None
# Parse tag version (2 bytes, big endian) at offset 0x00
tag_version = int.from_bytes(octets[0x00:0x02], byteorder="big")
logger.debug(
"Parsing OpenTag3D version %d.%02d tag",
tag_version // 256,
tag_version % 256,
)
if tag_version < 0x000C:
# Too old, ignoring
return None
# Parse base material (5 bytes UTF-8) at offset 0x02
material_base = (
octets[0x02:0x07].decode("utf-8", errors="ignore").rstrip("\x00")
)
# Parse material modifiers (5 bytes UTF-8) at offset 0x07
material_mod = octets[0x07:0x0C].decode("utf-8", errors="ignore").rstrip("\x00")
# Parse manufacturer (16 bytes UTF-8) at offset 0x1B
manufacturer = octets[0x1B:0x2B].decode("utf-8", errors="ignore").rstrip("\x00")
# Parse color name (32 bytes UTF-8) at offset 0x2B
color_name = octets[0x2B:0x4B].decode("utf-8", errors="ignore").rstrip("\x00")
# Parse color 1 (4 bytes RGBA) at offset 0x4B
color_1_hex = self._parse_rgba_to_hex(octets, 0x4B)
# Parse target diameter (2 bytes, µm) at offset 0x5C
target_diameter = int.from_bytes(octets[0x5C:0x5E], byteorder="big")
diameter_mm = target_diameter / 1000.0
# Parse target weight (2 bytes, grams) at offset 0x5E
target_weight = int.from_bytes(octets[0x5E:0x60], byteorder="big")
# Parse print temperature (1 byte, divided by 5) at offset 0x60
print_temp_raw = octets[0x60]
print_temp = print_temp_raw * 5
# Parse bed temperature (1 byte, divided by 5) at offset 0x61
bed_temp_raw = octets[0x61]
bed_temp = bed_temp_raw * 5
# Parse density (2 bytes, µg/cm³) at offset 0x62
density_raw = int.from_bytes(octets[0x62:0x64], byteorder="big")
density = density_raw / 1000.0
# Build material name
material_name = material_base
if material_mod:
material_name = f"{material_base}-{material_mod}"
result = {
"tag_version": tag_version,
"manufacturer": manufacturer,
"material_name": material_name,
"material_base": material_base,
"material_mod": material_mod,
"color_name": color_name,
"color_hex": color_1_hex,
"diameter_mm": diameter_mm,
"target_weight": target_weight,
"print_temp": print_temp,
"bed_temp": bed_temp,
"density": density,
}
# Parse color 2 at offset 0x50
color_2_hex = self._parse_rgba_to_hex(octets, 0x50)
if color_2_hex:
result["color_2_hex"] = color_2_hex
# Parse color 3 at offset 0x54
color_3_hex = self._parse_rgba_to_hex(octets, 0x54)
if color_3_hex:
result["color_3_hex"] = color_3_hex
# Parse color 4 at offset 0x58
color_4_hex = self._parse_rgba_to_hex(octets, 0x58)
if color_4_hex:
result["color_4_hex"] = color_4_hex
# Parse online data URL (32 bytes ASCII) at 0x70
if len(octets) >= 0x70 + 32:
online_url = (
octets[0x70:0x90].decode("ascii", errors="ignore").rstrip("\x00")
)
if online_url:
result["online_data_url"] = online_url
# Parse extended fields if available (NTAG215/216)
# Extended fields start at 0x90
if len(octets) >= 0x90 + 16:
# Parse serial number / batch ID (16 bytes UTF-8) at 0x90
serial = octets[0x90:0xA0].decode("utf-8", errors="ignore").rstrip("\x00")
if serial:
result["serial"] = serial
if len(octets) >= 0xA0 + 4:
# Parse manufacture date (4 bytes: year, year, month, day) at 0xA0
mfg_year = int.from_bytes(octets[0xA0:0xA2], byteorder="big")
mfg_month = octets[0xA2]
mfg_day = octets[0xA3]
if mfg_year > 0 and mfg_month > 0 and mfg_day > 0:
result["mfg_date"] = f"{mfg_year:04d}-{mfg_month:02d}-{mfg_day:02d}"
if len(octets) >= 0xA4 + 3:
# Parse manufacture time (3 bytes: hour, minute, second) at 0xA4
mfg_hour = octets[0xA4]
mfg_minute = octets[0xA5]
mfg_second = octets[0xA6]
if mfg_hour < 24 and mfg_minute < 60 and mfg_second < 60:
result["mfg_time"] = f"{mfg_hour:02d}:{mfg_minute:02d}:{mfg_second:02d}"
if len(octets) >= 0xA7 + 1:
# Parse spool core diameter (1 byte, mm) at 0xA7
spool_core_diameter = octets[0xA7]
if spool_core_diameter > 0:
result["spool_core_diameter"] = spool_core_diameter
if len(octets) >= 0xA8 + 1:
# Parse MFI temperature (1 byte, divided by 5) at 0xA8
mfi_temp_raw = octets[0xA8]
if mfi_temp_raw > 0:
result["mfi_temp"] = mfi_temp_raw * 5
if len(octets) >= 0xA9 + 1:
# Parse MFI load (1 byte, divided by 10) at 0xA9
mfi_load_raw = octets[0xA9]
if mfi_load_raw > 0:
result["mfi_load"] = mfi_load_raw * 10
if len(octets) >= 0xAA + 1:
# Parse MFI value (1 byte, divided by 10) at 0xAA
mfi_value_raw = octets[0xAA]
if mfi_value_raw > 0:
result["mfi_value"] = mfi_value_raw / 10.0
if len(octets) >= 0xAB + 1:
# Parse measured tolerance (1 byte, µm) at 0xAB
measured_tolerance = octets[0xAB]
if measured_tolerance > 0:
result["measured_tolerance"] = measured_tolerance
if len(octets) >= 0xAC + 2:
# Parse empty spool weight (2 bytes, grams) at 0xAC
empty_spool_weight = int.from_bytes(octets[0xAC:0xAE], byteorder="big")
if 0 < empty_spool_weight < 65535:
result["empty_spool_weight"] = empty_spool_weight
if len(octets) >= 0xAE + 2:
# Parse measured filament weight (2 bytes, grams) at 0xAE
measured_filament_weight = int.from_bytes(
octets[0xAE:0xB0], byteorder="big"
)
if 0 < measured_filament_weight < 65535:
result["measured_filament_weight"] = measured_filament_weight
if len(octets) >= 0xB0 + 2:
# Parse measured filament length (2 bytes, meters) at 0xB0
measured_filament_length = int.from_bytes(
octets[0xB0:0xB2], byteorder="big"
)
if 0 < measured_filament_length < 65535:
result["measured_filament_length"] = measured_filament_length
if len(octets) >= 0xB2 + 2:
# Parse transmission distance (2 bytes, µm) at 0xB2
transmission_distance = int.from_bytes(octets[0xB2:0xB4], byteorder="big")
if 0 < transmission_distance < 65535:
result["transmission_distance"] = transmission_distance
if len(octets) >= 0xB4 + 1:
# Parse max dry temp (1 byte, divided by 5) at 0xB4
max_dry_temp_raw = octets[0xB4]
if max_dry_temp_raw > 0:
result["max_dry_temp"] = max_dry_temp_raw * 5
if len(octets) >= 0xB5 + 1:
# Parse dry time (1 byte, hours) at 0xB5
dry_time = octets[0xB5]
if dry_time > 0:
result["dry_time"] = dry_time
if len(octets) >= 0xB6 + 1:
# Parse min print temp (1 byte, divided by 5) at 0xB6
min_print_temp_raw = octets[0xB6]
if min_print_temp_raw > 0:
result["min_print_temp"] = min_print_temp_raw * 5
if len(octets) >= 0xB7 + 1:
# Parse max print temp (1 byte, divided by 5) at 0xB7
max_print_temp_raw = octets[0xB7]
if max_print_temp_raw > 0:
result["max_print_temp"] = max_print_temp_raw * 5
if len(octets) >= 0xB8 + 1:
# Parse min volumetric speed (1 byte, mm³/s) at 0xB8
min_vso = octets[0xB8]
if min_vso > 0:
result["min_volumetric_speed"] = min_vso
if len(octets) >= 0xB9 + 1:
# Parse max volumetric speed (1 byte, mm³/s) at 0xB9
max_vso = octets[0xB9]
if max_vso > 0:
result["max_volumetric_speed"] = max_vso
if len(octets) >= 0xBA + 1:
# Parse target volumetric speed (1 byte, mm³/s) at 0xBA
target_vso = octets[0xBA]
if target_vso > 0:
result["target_volumetric_speed"] = target_vso
return result
# pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements
def parse(
self, ndef_data: Any, identifier: str
) -> Tuple[Optional[str], Optional[str]]:
"""Parse OpenTag3D tag data and create/match entries in Spoolman
Args:
ndef_data: NDEF data structure from the tag
identifier: Tag identifier string
Returns:
Tuple of (spool_id, filament_id) as strings, or (None, None) if not found
"""
if ndef_data is None:
return None, None
# Look for the OpenTag3D NDEF record with MIME type "application/opentag3d"
octets = None
try:
if hasattr(ndef_data, "records"):
for record in ndef_data.records:
# Check if this is an OpenTag3D MIME type record
if (
hasattr(record, "type")
and record.type == "application/opentag3d"
):
octets = record.data
logger.debug("Found OpenTag3D NDEF record")
break
except (AttributeError, TypeError) as ex:
logger.debug("Failed to find OpenTag3D NDEF record: %s", ex)
if octets is None:
logger.debug("Could not find OpenTag3D NDEF record")
return None, None
# Parse OpenTag3D data
tag_data = self._parse_opentag3d_data(octets)
if tag_data is None:
logger.debug("Not an OpenTag3D format tag")
return None, None
logger.info(
"Parsed OpenTag3D tag: manufacturer=%s, material=%s, color=%s",
tag_data["manufacturer"],
tag_data["material_name"],
tag_data["color_name"],
)
logger.debug("All data: %s", tag_data)
# Generate filament name from template
filament_name = self._generate_filament_name(tag_data)
logger.info("Generated filament name from template: %s", filament_name)
# Find or create vendor
vendor_id = self.spoolman_client.find_vendor_by_name(tag_data["manufacturer"])
if vendor_id is None:
logger.info("Creating new vendor: %s", tag_data["manufacturer"])
vendor_id = self.spoolman_client.create_vendor(tag_data["manufacturer"])
if vendor_id is None:
logger.error("Failed to create vendor")
return None, None
# Find or create filament using vendor, material, and name
# Material is constructed from base_material and material_modifiers
material = tag_data["material_name"]
filament_id = self.spoolman_client.find_filament_by_vendor_material_and_name(
vendor_id, material, filament_name
)
if filament_id is None:
logger.info(
"Creating new filament: %s %s", tag_data["manufacturer"], filament_name
)
# Build base filament data with required fields
filament_data = {
"vendor_id": vendor_id,
"name": filament_name,
"material": material,
"density": tag_data["density"],
"diameter": tag_data["diameter_mm"],
"color_hex": tag_data["color_hex"],
}
# Build multi_color_hexes if color_2_hex is present
if "color_2_hex" in tag_data:
multi_color_hexes = [tag_data["color_hex"], tag_data["color_2_hex"]]
if "color_3_hex" in tag_data:
multi_color_hexes.append(tag_data["color_3_hex"])
if "color_4_hex" in tag_data:
multi_color_hexes.append(tag_data["color_4_hex"])
filament_data["multi_color_hexes"] = multi_color_hexes
# Apply field mapping from config
filament_data = self._apply_field_mapping(
tag_data, self.filament_field_mapping, filament_data
)
filament_id = self.spoolman_client.create_filament(filament_data)
if filament_id is None:
logger.error("Failed to create filament")
return None, None
# Create spool with nfc_id
logger.info(
"Creating new spool for filament %s with nfc_id %s", filament_id, identifier
)
# Build base spool data
spool_data = {
"filament_id": filament_id,
}
# Apply field mapping from config
spool_data = self._apply_field_mapping(
tag_data, self.spool_field_mapping, spool_data
)
# Add nfc_id to extra field
if "extra" not in spool_data:
spool_data["extra"] = {}
spool_data["extra"]["nfc_id"] = f'"{identifier.lower()}"'
if "remaining_weight" not in spool_data:
if "measured_weight" in tag_data:
spool_data["remaining_weight"] = tag_data["measured_weight"]
if "initial_weight" not in spool_data:
if "target_weight" in tag_data:
spool_data["initial_weight"] = tag_data["target_weight"]
spool_id = self.spoolman_client.create_spool(spool_data)
if spool_id is None:
logger.error("Failed to create spool")
return None, None
logger.info(
"Successfully created spool %s and filament %s", spool_id, filament_id
)
return str(spool_id), str(filament_id)

View file

@ -7,6 +7,7 @@ import json
import logging import logging
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
import requests import requests
import base64
logger: logging.Logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
@ -15,15 +16,39 @@ logger: logging.Logger = logging.getLogger(__name__)
class SpoolmanClient: class SpoolmanClient:
"""Spoolman Web Client""" """Spoolman Web Client"""
def __init__(self, url: str) -> None: def __init__(
self,
url: str,
http_username: Optional[str],
http_password: Optional[str],
http_headers: Optional[str],
) -> None:
if url.endswith("/"): if url.endswith("/"):
url = url[:-1] url = url[:-1]
self.url: str = url self.url: str = url
self.http_headers = {}
if http_headers:
for c in http_headers.split(";"):
c = c.strip()
if not c:
continue
c_parts = c.split(":", 1)
if len(c_parts) != 2:
raise Exception(
f"Section [spoolman], Option http_headers: {c}: Invalid header format"
)
self.http_headers[c_parts[0]] = c_parts[1]
if http_username and http_password:
creds = base64.b64encode(
f"{http_username}:{http_password}".encode()
).decode()
self.http_headers["Authorization"] = "Basic " + creds
def get_spool(self, spool_id: int) -> Dict[str, Any]: def get_spool(self, spool_id: int) -> Dict[str, Any]:
"""Get the spool from Spoolman""" """Get the spool from Spoolman"""
url: str = self.url + f"/api/v1/spool/{spool_id}" url: str = self.url + f"/api/v1/spool/{spool_id}"
response = requests.get(url, timeout=10) response = requests.get(url, timeout=10, headers=self.http_headers)
if response.status_code != 200: if response.status_code != 200:
raise ValueError(f"Request to spoolman failed: {response}") raise ValueError(f"Request to spoolman failed: {response}")
return response.json() return response.json()
@ -31,7 +56,7 @@ class SpoolmanClient:
def get_spools(self) -> List[Dict[str, Any]]: def get_spools(self) -> List[Dict[str, Any]]:
"""Get the spools from spoolman""" """Get the spools from spoolman"""
url: str = self.url + "/api/v1/spool" url: str = self.url + "/api/v1/spool"
response = requests.get(url, timeout=10) response = requests.get(url, timeout=10, headers=self.http_headers)
if response.status_code != 200: if response.status_code != 200:
raise ValueError(f"Request to spoolman failed: {response}") raise ValueError(f"Request to spoolman failed: {response}")
records: List[Dict[str, Any]] = json.loads(response.text) records: List[Dict[str, Any]] = json.loads(response.text)
@ -61,7 +86,9 @@ class SpoolmanClient:
extra["nfc_id"] = '""' extra["nfc_id"] = '""'
url: str = self.url + f"/api/v1/spool/{spool_id}" url: str = self.url + f"/api/v1/spool/{spool_id}"
response = requests.patch(url, timeout=10, json={"extra": extra}) response = requests.patch(
url, timeout=10, headers=self.http_headers, json={"extra": extra}
)
if response.status_code != 200: if response.status_code != 200:
raise ValueError(f"Request to spoolman failed: {response}: {response.text}") raise ValueError(f"Request to spoolman failed: {response}: {response.text}")
@ -85,7 +112,9 @@ class SpoolmanClient:
extra["nfc_id"] = nfc_id extra["nfc_id"] = nfc_id
url: str = self.url + f"/api/v1/spool/{spool_id}" url: str = self.url + f"/api/v1/spool/{spool_id}"
response = requests.patch(url, timeout=10, json={"extra": extra}) response = requests.patch(
url, timeout=10, headers=self.http_headers, json={"extra": extra}
)
if response.status_code != 200: if response.status_code != 200:
raise ValueError(f"Request to spoolman failed: {response}: {response.text}") raise ValueError(f"Request to spoolman failed: {response}: {response.text}")
@ -102,7 +131,7 @@ class SpoolmanClient:
""" """
try: try:
url: str = self.url + "/api/v1/vendor" url: str = self.url + "/api/v1/vendor"
response = requests.get(url, timeout=10) response = requests.get(url, timeout=10, headers=self.http_headers)
if response.status_code != 200: if response.status_code != 200:
logger.error( logger.error(
"Failed to find vendor '%s': HTTP %d - %s", "Failed to find vendor '%s': HTTP %d - %s",
@ -136,7 +165,9 @@ class SpoolmanClient:
try: try:
url: str = self.url + "/api/v1/vendor" url: str = self.url + "/api/v1/vendor"
data = {"name": name} data = {"name": name}
response = requests.post(url, json=data, timeout=10) response = requests.post(
url, json=data, timeout=10, headers=self.http_headers
)
if response.status_code not in (200, 201): if response.status_code not in (200, 201):
logger.error( logger.error(
"Failed to create vendor '%s': HTTP %d - %s", "Failed to create vendor '%s': HTTP %d - %s",
@ -167,7 +198,9 @@ class SpoolmanClient:
try: try:
url: str = self.url + "/api/v1/filament" url: str = self.url + "/api/v1/filament"
params = {"vendor_id": vendor_id} params = {"vendor_id": vendor_id}
response = requests.get(url, params=params, timeout=10) response = requests.get(
url, params=params, timeout=10, headers=self.http_headers
)
if response.status_code != 200: if response.status_code != 200:
logger.error( logger.error(
"Failed to find filament '%s' for vendor %d: HTTP %d - %s", "Failed to find filament '%s' for vendor %d: HTTP %d - %s",
@ -211,7 +244,9 @@ class SpoolmanClient:
try: try:
url: str = self.url + "/api/v1/filament" url: str = self.url + "/api/v1/filament"
params = {"vendor_id": vendor_id} params = {"vendor_id": vendor_id}
response = requests.get(url, params=params, timeout=10) response = requests.get(
url, params=params, timeout=10, headers=self.http_headers
)
if response.status_code != 200: if response.status_code != 200:
logger.error( logger.error(
"Failed to find filament '%s' (material: %s) for vendor %d: HTTP %d - %s", "Failed to find filament '%s' (material: %s) for vendor %d: HTTP %d - %s",
@ -262,7 +297,9 @@ class SpoolmanClient:
try: try:
url: str = self.url + "/api/v1/filament" url: str = self.url + "/api/v1/filament"
logger.debug("Creating new filament: %s", data) logger.debug("Creating new filament: %s", data)
response = requests.post(url, json=data, timeout=10) response = requests.post(
url, json=data, timeout=10, headers=self.http_headers
)
if response.status_code not in (200, 201): if response.status_code not in (200, 201):
logger.error( logger.error(
"Failed to create filament: HTTP %d - %s", "Failed to create filament: HTTP %d - %s",
@ -294,7 +331,9 @@ class SpoolmanClient:
try: try:
url: str = self.url + "/api/v1/spool" url: str = self.url + "/api/v1/spool"
logger.debug("Creating new spool: %s", data) logger.debug("Creating new spool: %s", data)
response = requests.post(url, json=data, timeout=10) response = requests.post(
url, json=data, timeout=10, headers=self.http_headers
)
if response.status_code not in (200, 201): if response.status_code not in (200, 201):
logger.error( logger.error(
"Failed to create spool: HTTP %d - %s", "Failed to create spool: HTTP %d - %s",

View file

@ -22,6 +22,8 @@ nfc-device = "tty:AMA0"
[spoolman] [spoolman]
# URL for the spoolman installation # URL for the spoolman installation
spoolman-url = "http://mainsailos.local:7912" spoolman-url = "http://mainsailos.local:7912"
# http_username = ""
# http_password = ""
[moonraker] [moonraker]
# URL for the moonraker installation # URL for the moonraker installation

View file

@ -99,7 +99,7 @@ if __name__ == "__main__":
try: try:
api_cmd = [sys.executable, api_script] api_cmd = [sys.executable, api_script]
if parsed_args.config_dir: if parsed_args.config_dir:
api_cmd.extend(["-c", parsed_args.config_dir]) os.environ["NFC2KLIPPER_CONFIG_DIR"] = parsed_args.config_dir
# pylint: disable=consider-using-with # pylint: disable=consider-using-with
api_process = subprocess.Popen(api_cmd) # nosec api_process = subprocess.Popen(api_cmd) # nosec

View file

@ -14,7 +14,6 @@
# pylint: disable=duplicate-code # pylint: disable=duplicate-code
import argparse
import os import os
import sys import sys
from typing import Any, Dict, Optional, Tuple, Union from typing import Any, Dict, Optional, Tuple, Union
@ -23,23 +22,10 @@ from flask import Flask, render_template
from lib.config import Nfc2KlipperConfig from lib.config import Nfc2KlipperConfig
from lib.ipc import IPCClient from lib.ipc import IPCClient
Nfc2KlipperConfig.configure_logging() Nfc2KlipperConfig.configure_logging()
# Parse command line arguments cfg_path = os.environ.get("NFC2KLIPPER_CFG_DIR", Nfc2KlipperConfig.CFG_DIR)
# pylint: disable=duplicate-code args: Optional[Dict[str, Any]] = Nfc2KlipperConfig.get_config(cfg_path)
parser = argparse.ArgumentParser(description="Web API service for nfc2klipper.")
parser.add_argument(
"-c",
"--config-dir",
metavar="DIR",
default=None,
help=f"Configuration directory (default: {Nfc2KlipperConfig.CFG_DIR})",
)
parsed_args = parser.parse_args()
args: Optional[Dict[str, Any]] = Nfc2KlipperConfig.get_config(parsed_args.config_dir)
# pylint: enable=duplicate-code
if not args: if not args:
print( print(

View file

@ -25,7 +25,8 @@ from lib.config import Nfc2KlipperConfig
from lib.ipc import IPCServer from lib.ipc import IPCServer
from lib.moonraker_web_client import MoonrakerWebClient from lib.moonraker_web_client import MoonrakerWebClient
from lib.nfc_handler import NfcHandler from lib.nfc_handler import NfcHandler
from lib.nfc_parsers import NdefTextParser, TagIdentifierParser, OpenTag3DParser from lib.nfc_parsers import NdefTextParser, TagIdentifierParser
from lib.opentag3d_parser import OpenTag3DParser
from lib.spoolman_client import SpoolmanClient from lib.spoolman_client import SpoolmanClient
Nfc2KlipperConfig.configure_logging() Nfc2KlipperConfig.configure_logging()
@ -100,7 +101,12 @@ if USE_MOCK_OBJECTS:
args["nfc"]["nfc-device"] args["nfc"]["nfc-device"]
) )
else: else:
spoolman = SpoolmanClient(args["spoolman"]["spoolman-url"]) spoolman = SpoolmanClient(
args["spoolman"]["spoolman-url"],
args["spoolman"].get("http_username"),
args["spoolman"].get("http_password"),
args["spoolman"].get("http_headers"),
)
moonraker = MoonrakerWebClient( moonraker = MoonrakerWebClient(
args["moonraker"]["moonraker-url"], args["moonraker"]["moonraker-url"],
setting_gcode_template, setting_gcode_template,
@ -239,7 +245,6 @@ def on_nfc_tag_present(ndef_data: Any, identifier: str) -> None:
def on_nfc_no_tag_present() -> None: def on_nfc_no_tag_present() -> None:
"""Called when no tag is present (or tag without data)""" """Called when no tag is present (or tag without data)"""
if should_clear_spool(): if should_clear_spool():
set_spool_and_filament(0, 0) set_spool_and_filament(0, 0)

View file

@ -1,9 +1,9 @@
flask==3.0.3 flask==3.0.3
toml==0.10.2 toml==0.10.2
nfcpy==1.0.4 nfcpy==1.0.4
npyscreen==4.10.5 npyscreen==5.0.2
requests==2.32.5 requests==2.32.5
urllib3>=2.6.0 urllib3>=2.6.0
Gunicorn==23.0.0 Gunicorn==25.1.0
types-toml==0.10.8.20240310 types-toml==0.10.8.20240310
types-requests==2.32.4.20260107 types-requests==2.32.4.20260107

View file

@ -14,6 +14,8 @@ import nfc
import npyscreen import npyscreen
import requests import requests
from lib.config import Nfc2KlipperConfig
from lib.spoolman_client import SpoolmanClient
SPOOL = "SPOOL" SPOOL = "SPOOL"
FILAMENT = "FILAMENT" FILAMENT = "FILAMENT"
@ -26,26 +28,19 @@ parser = argparse.ArgumentParser(
parser.add_argument("--version", action="version", version="%(prog)s 0.0.1") parser.add_argument("--version", action="version", version="%(prog)s 0.0.1")
# pylint: disable=R0801 # pylint: disable=R0801
parser.add_argument( parser.add_argument("-c", "--config", metavar="config", help="Config directory")
"-d",
"--nfc-device",
metavar="device",
default="ttyAMA0",
help="Which NFC reader to use, see "
+ "https://nfcpy.readthedocs.io/en/latest/topics/get-started.html#open-a-local-device"
+ " for format",
)
parser.add_argument(
"-u",
"--url",
metavar="URL",
default="http://mainsailos.local:7912",
help="URL for the Spoolman installation",
)
args = parser.parse_args() args = parser.parse_args()
config = Nfc2KlipperConfig.get_config(args.config)
assert config, "No config found"
spoolman = SpoolmanClient(
config["spoolman"]["spoolman-url"],
config["spoolman"].get("http_username"),
config["spoolman"].get("http_password"),
config["spoolman"].get("http_headers"),
)
def record_to_text(record): def record_to_text(record):
@ -77,9 +72,7 @@ class PostSelectForm(npyscreen.FormBaseNew):
when_pressed_function=self.exit_app, when_pressed_function=self.exit_app,
) )
url = args.url + "/api/v1/spool" self.records = spoolman.get_spools()
records = requests.get(url, timeout=10)
self.records = json.loads(records.text)
self.records = sorted(self.records, key=lambda x: x["id"], reverse=True) self.records = sorted(self.records, key=lambda x: x["id"], reverse=True)
self.posts = self.add( self.posts = self.add(
@ -125,7 +118,7 @@ class TagWritingApp(npyscreen.NPSAppManaged):
self.status = "Written" self.status = "Written"
clf = nfc.ContactlessFrontend(args.nfc_device) clf = nfc.ContactlessFrontend(config["nfc"]["nfc-device"])
clf.connect( clf.connect(
rdwr={"on-connect": lambda tag: self.on_nfc_connect(tag, spool, filament)} rdwr={"on-connect": lambda tag: self.on_nfc_connect(tag, spool, filament)}
) )