326 lines
10 KiB
Python
Executable file
326 lines
10 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
# SPDX-FileCopyrightText: 2024-2025 Sebastian Andersson <sebastian@bittr.nu>
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
# /// script
|
|
# requires-python = ">=3.9"
|
|
# dependencies = [
|
|
# "toml==0.10.2",
|
|
# "nfcpy==1.0.4",
|
|
# "requests==2.32.4",
|
|
# ]
|
|
# ///
|
|
|
|
"""Backend service for NFC handling and communication with Moonraker/Spoolman."""
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import threading
|
|
from typing import Any, Dict, List, Optional, Union
|
|
|
|
from lib.config import Nfc2KlipperConfig
|
|
from lib.ipc import IPCServer
|
|
from lib.moonraker_web_client import MoonrakerWebClient
|
|
from lib.nfc_handler import NfcHandler
|
|
from lib.nfc_parsers import NdefTextParser, TagIdentifierParser
|
|
from lib.opentag3d_parser import OpenTag3DParser
|
|
from lib.spoolman_client import SpoolmanClient
|
|
|
|
Nfc2KlipperConfig.configure_logging()
|
|
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
# Parse command line arguments
|
|
# pylint: disable=duplicate-code
|
|
parser = argparse.ArgumentParser(
|
|
description="Backend service for NFC handling and communication with Moonraker/Spoolman."
|
|
)
|
|
parser.add_argument(
|
|
"-c",
|
|
"--config-dir",
|
|
metavar="DIR",
|
|
default=None,
|
|
help=f"Configuration directory (default: {Nfc2KlipperConfig.CFG_DIR})",
|
|
)
|
|
parsed_args = parser.parse_args()
|
|
|
|
args: Optional[Dict[str, Any]] = Nfc2KlipperConfig.get_config(parsed_args.config_dir)
|
|
# pylint: enable=duplicate-code
|
|
|
|
if not args:
|
|
print(
|
|
"WARNING: The config file is missing, installing a default version.",
|
|
file=sys.stderr,
|
|
)
|
|
Nfc2KlipperConfig.install_config(parsed_args.config_dir)
|
|
sys.exit(1)
|
|
|
|
args: Dict[str, Any] = args
|
|
|
|
# Get socket path from config, with fallback to default
|
|
socket_path: str = args.get("webserver", {}).get(
|
|
"socket_path", Nfc2KlipperConfig.DEFAULT_SOCKET_PATH
|
|
)
|
|
socket_path = os.path.expanduser(socket_path)
|
|
|
|
# Get command templates from config
|
|
setting_gcode_template: List[str] = Nfc2KlipperConfig.get_setting_gcode(args)
|
|
clearing_gcode_template: List[str] = Nfc2KlipperConfig.get_clearing_gcode(args)
|
|
|
|
logger.info("Using setting_gcode: %s", setting_gcode_template)
|
|
logger.info("Using clearing_gcode: %s", clearing_gcode_template)
|
|
|
|
# Check if we should use mock objects
|
|
USE_MOCK_OBJECTS: bool = os.environ.get("NFC2KLIPPER_USE_MOCKS", "").lower() in (
|
|
"1",
|
|
"true",
|
|
"yes",
|
|
)
|
|
|
|
if USE_MOCK_OBJECTS:
|
|
logger.info("Using mock objects for testing")
|
|
from lib.mock_objects import (
|
|
MockNfcHandler,
|
|
MockSpoolmanClient,
|
|
MockMoonrakerWebClient,
|
|
)
|
|
|
|
spoolman: Union[SpoolmanClient, "MockSpoolmanClient"] = MockSpoolmanClient(
|
|
args["spoolman"]["spoolman-url"]
|
|
)
|
|
moonraker: Union[MoonrakerWebClient, "MockMoonrakerWebClient"] = (
|
|
MockMoonrakerWebClient(
|
|
args["moonraker"]["moonraker-url"],
|
|
setting_gcode_template,
|
|
clearing_gcode_template,
|
|
)
|
|
)
|
|
nfc_handler: Union[NfcHandler, "MockNfcHandler"] = MockNfcHandler(
|
|
args["nfc"]["nfc-device"]
|
|
)
|
|
else:
|
|
spoolman = SpoolmanClient(
|
|
args["spoolman"]["spoolman-url"],
|
|
args["spoolman"].get("http_username"),
|
|
args["spoolman"].get("http_password"),
|
|
args["spoolman"].get("http_headers"),
|
|
)
|
|
moonraker = MoonrakerWebClient(
|
|
args["moonraker"]["moonraker-url"],
|
|
setting_gcode_template,
|
|
clearing_gcode_template,
|
|
)
|
|
nfc_handler = NfcHandler(args["nfc"]["nfc-device"])
|
|
|
|
last_nfc_id: Optional[str] = None # pylint: disable=C0103
|
|
last_spool_id: Optional[str] = None # pylint: disable=C0103
|
|
|
|
# Create IPC server instance
|
|
ipc_server: IPCServer = IPCServer(socket_path)
|
|
|
|
# Get OpenTag3D filament name template
|
|
opentag3d_filament_template: str = (
|
|
Nfc2KlipperConfig.get_opentag3d_filament_name_template(args)
|
|
)
|
|
logger.info("Using OpenTag3D filament name template: %s", opentag3d_filament_template)
|
|
|
|
# Get OpenTag3D field mappings
|
|
opentag3d_filament_mapping: Dict[str, str] = (
|
|
Nfc2KlipperConfig.get_opentag3d_filament_field_mapping(args)
|
|
)
|
|
opentag3d_spool_mapping: Dict[str, str] = (
|
|
Nfc2KlipperConfig.get_opentag3d_spool_field_mapping(args)
|
|
)
|
|
logger.info("OpenTag3D filament field mapping: %s", opentag3d_filament_mapping)
|
|
logger.info("OpenTag3D spool field mapping: %s", opentag3d_spool_mapping)
|
|
|
|
# Create parsers for different tag formats
|
|
# List of parsers to try in order:
|
|
# 1. NDEF text parser for simple SPOOL:X FILAMENT:Y format
|
|
# 2. Tag ID lookup in Spoolman's nfc_id extra field
|
|
# 3. OpenTag3D parser - only called if tag not found via nfc_id
|
|
parsers: List[Any] = [
|
|
NdefTextParser(),
|
|
TagIdentifierParser(spoolman),
|
|
OpenTag3DParser(
|
|
spoolman,
|
|
opentag3d_filament_template,
|
|
opentag3d_filament_mapping,
|
|
opentag3d_spool_mapping,
|
|
),
|
|
]
|
|
|
|
|
|
def should_always_send() -> bool:
|
|
"""Should SET_ACTIVE_* macros always be called when tag is read,
|
|
or only when different?"""
|
|
assert args is not None # nosec
|
|
always_send: Optional[bool] = args["moonraker"].get("always-send")
|
|
|
|
if always_send is None:
|
|
return False
|
|
|
|
return always_send
|
|
|
|
|
|
def set_spool_and_filament(spool: int, filament: int) -> None:
|
|
"""Calls moonraker with the current spool & filament"""
|
|
|
|
if "old_spool" not in set_spool_and_filament.__dict__: # type: ignore[attr-defined]
|
|
set_spool_and_filament.old_spool = None # type: ignore[attr-defined]
|
|
set_spool_and_filament.old_filament = None # type: ignore[attr-defined]
|
|
|
|
if not should_always_send() and (
|
|
set_spool_and_filament.old_spool == spool # type: ignore[attr-defined]
|
|
and set_spool_and_filament.old_filament == filament # type: ignore[attr-defined]
|
|
):
|
|
logger.info("Read same spool & filament")
|
|
return
|
|
|
|
logger.info("Sending spool #%s, filament #%s to klipper", spool, filament)
|
|
|
|
# In case the post fails, we might not know if the server has received
|
|
# it or not, so set them to None:
|
|
set_spool_and_filament.old_spool = None # type: ignore[attr-defined]
|
|
set_spool_and_filament.old_filament = None # type: ignore[attr-defined]
|
|
|
|
try:
|
|
if spool and filament:
|
|
moonraker.set_spool_and_filament(spool, filament)
|
|
else:
|
|
moonraker.clear_spool_and_filament()
|
|
except Exception as ex: # pylint: disable=W0718
|
|
logger.error(ex)
|
|
return
|
|
|
|
set_spool_and_filament.old_spool = spool # type: ignore[attr-defined]
|
|
set_spool_and_filament.old_filament = filament # type: ignore[attr-defined]
|
|
|
|
|
|
def should_clear_spool() -> bool:
|
|
"""Returns True if the config says the spool should be cleared"""
|
|
assert args is not None # nosec
|
|
if args["moonraker"].get("clear-spool"):
|
|
return True
|
|
return False
|
|
|
|
|
|
def on_nfc_tag_present(ndef_data: Any, identifier: str) -> None:
|
|
"""Handles a read tag"""
|
|
|
|
if identifier:
|
|
global last_nfc_id # pylint: disable=W0603
|
|
last_nfc_id = identifier
|
|
|
|
# Try each parser in order until one returns valid spool and filament data
|
|
spool: Optional[str] = None
|
|
filament: Optional[str] = None
|
|
|
|
for tag_parser in parsers:
|
|
spool_and_filament = tag_parser.parse(ndef_data, identifier)
|
|
if spool_and_filament:
|
|
spool, filament = spool_and_filament
|
|
if spool and filament:
|
|
# Found valid data, stop trying other parsers
|
|
break
|
|
|
|
if spool:
|
|
global last_spool_id # pylint: disable=W0603
|
|
last_spool_id = spool
|
|
|
|
if spool and filament:
|
|
# Convert string to int
|
|
spool_int = int(spool)
|
|
filament_int = int(filament)
|
|
set_spool_and_filament(spool_int, filament_int)
|
|
else:
|
|
logger.info(
|
|
"Did not find spool and filament data in tag (%s)",
|
|
identifier,
|
|
)
|
|
|
|
|
|
def on_nfc_no_tag_present() -> None:
|
|
"""Called when no tag is present (or tag without data)"""
|
|
if should_clear_spool():
|
|
set_spool_and_filament(0, 0)
|
|
|
|
|
|
@ipc_server.register_handler("write_tag")
|
|
def handle_write_tag(spool: int, filament: int) -> Dict[str, Any]:
|
|
"""Handle write_tag command"""
|
|
logger.info(" write spool=%s, filament=%s", spool, filament)
|
|
if nfc_handler.write_to_tag(spool, filament):
|
|
return {"status": "ok"}
|
|
return {"status": "error", "message": "Failed to write to tag"}
|
|
|
|
|
|
@ipc_server.register_handler("set_nfc_id")
|
|
def handle_set_nfc_id(spool: int) -> Dict[str, Any]:
|
|
"""Handle set_nfc_id command"""
|
|
global last_nfc_id # pylint: disable=W0602,W0603
|
|
logger.info("Set nfc_id=%s to spool=%s in Spoolman", last_nfc_id, spool)
|
|
|
|
if last_nfc_id is None:
|
|
return {"status": "error", "message": "No nfc_id to write"}
|
|
|
|
if spoolman.set_nfc_id_for_spool(spool, last_nfc_id):
|
|
return {"status": "ok"}
|
|
|
|
return {"status": "error", "message": "Failed to send nfc_id to Spoolman"}
|
|
|
|
|
|
@ipc_server.register_handler("get_spools")
|
|
def handle_get_spools() -> Dict[str, Any]:
|
|
"""Handle get_spools command"""
|
|
spools = spoolman.get_spools()
|
|
return {"status": "ok", "spools": spools}
|
|
|
|
|
|
@ipc_server.register_handler("get_state")
|
|
def handle_get_state() -> Dict[str, Any]:
|
|
"""Handle get_state command"""
|
|
return {
|
|
"status": "ok",
|
|
"nfc_id": last_nfc_id,
|
|
"spool_id": last_spool_id,
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
def signal_handler(signum, _frame):
|
|
"""Handle termination signals"""
|
|
logger.info("Received signal %s, shutting down...", signum)
|
|
nfc_handler.stop()
|
|
# Clean up socket file
|
|
if os.path.exists(socket_path):
|
|
try:
|
|
os.unlink(socket_path)
|
|
except OSError:
|
|
pass
|
|
sys.exit(0)
|
|
|
|
# Register signal handlers
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
if should_clear_spool():
|
|
# Start by unsetting current spool & filament:
|
|
set_spool_and_filament(0, 0)
|
|
|
|
nfc_handler.set_no_tag_present_callback(on_nfc_no_tag_present)
|
|
nfc_handler.set_tag_present_callback(on_nfc_tag_present)
|
|
|
|
logger.info("Starting socket server")
|
|
socket_thread = threading.Thread(target=ipc_server.start)
|
|
socket_thread.daemon = True
|
|
socket_thread.start()
|
|
|
|
logger.info("Starting nfc-handler")
|
|
try:
|
|
nfc_handler.run()
|
|
except (KeyboardInterrupt, SystemExit):
|
|
signal_handler(signal.SIGINT, None)
|