From dcd9aaad893ee069adcda2a002c37ab05cbd06a5 Mon Sep 17 00:00:00 2001 From: ThetaDev Date: Sun, 19 Apr 2026 01:01:18 +0200 Subject: [PATCH 1/4] feat: add job queue --- .gitignore | 1 + moonraker/components/job_queue.py | 211 ++++++++++++++++++------------ 2 files changed, 126 insertions(+), 86 deletions(-) diff --git a/.gitignore b/.gitignore index 5302af4..cb4e7f8 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ share .mypy_cache .pdm-build .pytest_cache +uv.lock diff --git a/moonraker/components/job_queue.py b/moonraker/components/job_queue.py index 41312f1..9ed1d24 100644 --- a/moonraker/components/job_queue.py +++ b/moonraker/components/job_queue.py @@ -8,27 +8,28 @@ from __future__ import annotations import asyncio import time import logging +import uuid + +from moonraker.utils.exceptions import ServerError from ..common import JobEvent, RequestType # Annotation imports -from typing import ( - TYPE_CHECKING, - Any, - Optional, - Dict, - List, - Union, -) +from typing import TYPE_CHECKING, Any, Optional, Dict, List, Union, Self + if TYPE_CHECKING: from ..confighelper import ConfigHelper from ..common import WebRequest, UserInfo from .klippy_apis import KlippyAPI + from .database import MoonrakerDatabase as DBComp from .file_manager.file_manager import FileManager from .job_state import JobState + class JobQueue: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() + db: DBComp = self.server.load_component(config, "database") + self.sync_provider = db.get_provider_wrapper() self.queued_jobs: Dict[str, QueuedJob] = {} self.lock = asyncio.Lock() self.pause_requested: bool = False @@ -36,18 +37,23 @@ class JobQueue: self.automatic = config.getboolean("automatic_transition", False) self.queue_state: str = "paused" self.job_delay = config.getfloat("job_transition_delay", 0.01) - if self.job_delay <= 0.: + if self.job_delay <= 0.0: raise config.error( "Value for option 'job_transition_delay' in section [job_queue]" - " must be above 0.0") - self.job_transition_gcode = config.get( - "job_transition_gcode", "").strip() + " must be above 0.0" + ) + self.job_transition_gcode = config.get("job_transition_gcode", "").strip() self.pop_queue_handle: Optional[asyncio.TimerHandle] = None + queue_data: Dict[str, Any] = self.sync_provider.get_item( + "moonraker", "job_queue.queue_data", {} + ) + self._from_ser_dict(queue_data) + + self.server.register_event_handler("server:klippy_ready", self._handle_ready) self.server.register_event_handler( - "server:klippy_ready", self._handle_ready) - self.server.register_event_handler( - "server:klippy_shutdown", self._handle_shutdown) + "server:klippy_shutdown", self._handle_shutdown + ) self.server.register_event_handler( "job_state:state_changed", self._on_job_state_changed ) @@ -57,8 +63,9 @@ class JobQueue: self.server.register_remote_method("start_job_queue", self.start_queue) self.server.register_endpoint( - "/server/job_queue/job", RequestType.POST | RequestType.DELETE, - self._handle_job_request + "/server/job_queue/job", + RequestType.POST | RequestType.DELETE, + self._handle_job_request, ) self.server.register_endpoint( "/server/job_queue/pause", RequestType.POST, self._handle_pause_queue @@ -78,11 +85,12 @@ class JobQueue: if not self.load_on_start or not self.queued_jobs: return # start a queued print - if self.queue_state in ['ready', 'paused']: + if self.queue_state in ["ready", "paused"]: event_loop = self.server.get_event_loop() self._set_queue_state("loading") self.pop_queue_handle = event_loop.delay_callback( - 1., self._pop_job, False) + 1.0, self._pop_job, False + ) async def _handle_shutdown(self) -> None: has_requested_pause = self.pause_requested @@ -104,7 +112,8 @@ class JobQueue: event_loop = self.server.get_event_loop() self._set_queue_state("loading") self.pop_queue_handle = event_loop.delay_callback( - self.job_delay, self._pop_job) + self.job_delay, self._pop_job + ) async def _on_job_abort(self) -> None: async with self.lock: @@ -119,7 +128,7 @@ class JobQueue: if not self.queued_jobs: self._set_queue_state("paused") return - kapis: KlippyAPI = self.server.lookup_component('klippy_apis') + kapis: KlippyAPI = self.server.lookup_component("klippy_apis") uid, job = list(self.queued_jobs.items())[0] filename = str(job) can_print = await self._check_can_print() @@ -134,7 +143,8 @@ class JobQueue: if self.queue_state != "loading": self._set_queue_state("paused") raise self.server.error( - "Queue State Changed during Transition Gcode") + "Queue State Changed during Transition Gcode" + ) self._set_queue_state("starting") await kapis.start_print( filename, wait_klippy_started=True, user=job.user @@ -153,25 +163,26 @@ class JobQueue: async def _check_can_print(self) -> bool: # Query the latest stats - kapis: KlippyAPI = self.server.lookup_component('klippy_apis') + kapis: KlippyAPI = self.server.lookup_component("klippy_apis") try: result = await kapis.query_objects({"print_stats": None}) except Exception: # Klippy not connected return False - if 'print_stats' not in result: + if "print_stats" not in result: return False - state: str = result['print_stats']['state'] + state: str = result["print_stats"]["state"] if state in ["printing", "paused"]: return False return True - async def queue_job(self, - filenames: Union[str, List[str]], - check_exists: bool = True, - reset: bool = False, - user: Optional[UserInfo] = None - ) -> None: + async def queue_job( + self, + filenames: Union[str, List[str]], + check_exists: bool = True, + reset: bool = False, + user: Optional[UserInfo] = None, + ) -> None: async with self.lock: # Make sure that the file exists if isinstance(filenames, str): @@ -192,10 +203,9 @@ class JobQueue: if last_evt.is_printing or last_evt == JobEvent.PAUSED: self._set_queue_state("ready") - async def delete_job(self, - job_ids: Union[str, List[str]], - all: bool = False - ) -> None: + async def delete_job( + self, job_ids: Union[str, List[str]], all: bool = False + ) -> None: async with self.lock: if not self.queued_jobs: # No jobs in queue, nothing to delete @@ -238,14 +248,30 @@ class JobQueue: def _job_map_to_list(self) -> List[Dict[str, Any]]: cur_time = time.time() - return [job.as_dict(cur_time) for - job in self.queued_jobs.values()] + return [job.as_dict(cur_time) for job in self.queued_jobs.values()] + + def _as_ser_dict(self) -> Dict[str, Any]: + return {"queued_jobs": [job.as_ser_dict() for job in self.queued_jobs.values()]} + + def _from_ser_dict(self, data: Dict[str, Any]): + self.queued_jobs.clear() + queued_jobs = data.get("queued_jobs") + if queued_jobs: + for job_data in queued_jobs: + job = QueuedJob.from_ser_dict(job_data) + # check if the file still exists + try: + self._check_job_file(job.filename) + except ServerError: + logging.error(f"Queued print file {job.filename} does not exist") + continue + self.queued_jobs[job.job_id] = job + self._send_queue_event("jobs_added") def _check_job_file(self, job_name: str) -> None: - fm: FileManager = self.server.lookup_component('file_manager') + fm: FileManager = self.server.lookup_component("file_manager") if not fm.check_file_exists("gcodes", job_name): - raise self.server.error( - f"G-Code File {job_name} does not exist") + raise self.server.error(f"G-Code File {job_name} does not exist") def _set_queue_state(self, new_state: str) -> None: if new_state != self.queue_state: @@ -258,19 +284,25 @@ class JobQueue: updated_queue = self._job_map_to_list() event_loop = self.server.get_event_loop() event_loop.delay_callback( - .05, self.server.send_event, "job_queue:job_queue_changed", + 0.05, + self.server.send_event, + "job_queue:job_queue_changed", { - 'action': action, - 'updated_queue': updated_queue, - 'queue_state': self.queue_state - }) + "action": action, + "updated_queue": updated_queue, + "queue_state": self.queue_state, + }, + ) - async def _handle_job_request( - self, web_request: WebRequest - ) -> Dict[str, Any]: + # persist queue data + self.sync_provider.insert_item( + "moonraker", "job_queue.queue_data", self._as_ser_dict() + ) + + async def _handle_job_request(self, web_request: WebRequest) -> Dict[str, Any]: req_type = web_request.get_request_type() if req_type == RequestType.POST: - files = web_request.get_list('filenames') + files = web_request.get_list("filenames") reset = web_request.get_boolean("reset", False) # Validate that all files exist before queueing user = web_request.get_current_user() @@ -279,40 +311,22 @@ class JobQueue: if web_request.get_boolean("all", False): await self.delete_job([], all=True) else: - job_ids = web_request.get_list('job_ids') + job_ids = web_request.get_list("job_ids") await self.delete_job(job_ids) else: raise self.server.error(f"Invalid request type: {req_type}") - return { - 'queued_jobs': self._job_map_to_list(), - 'queue_state': self.queue_state - } + return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state} - async def _handle_pause_queue(self, - web_request: WebRequest - ) -> Dict[str, Any]: + async def _handle_pause_queue(self, web_request: WebRequest) -> Dict[str, Any]: await self.pause_queue() - return { - 'queued_jobs': self._job_map_to_list(), - 'queue_state': self.queue_state - } + return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state} - async def _handle_start_queue(self, - web_request: WebRequest - ) -> Dict[str, Any]: + async def _handle_start_queue(self, web_request: WebRequest) -> Dict[str, Any]: await self.start_queue() - return { - 'queued_jobs': self._job_map_to_list(), - 'queue_state': self.queue_state - } + return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state} - async def _handle_queue_status(self, - web_request: WebRequest - ) -> Dict[str, Any]: - return { - 'queued_jobs': self._job_map_to_list(), - 'queue_state': self.queue_state - } + async def _handle_queue_status(self, web_request: WebRequest) -> Dict[str, Any]: + return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state} async def _handle_jump(self, web_request: WebRequest) -> Dict[str, Any]: job_id: str = web_request.get("job_id") @@ -323,19 +337,26 @@ class JobQueue: new_queue = {job_id: job} new_queue.update(self.queued_jobs) self.queued_jobs = new_queue - return { - 'queued_jobs': self._job_map_to_list(), - 'queue_state': self.queue_state - } + return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state} async def close(self): await self.pause_queue() + class QueuedJob: - def __init__(self, filename: str, user: Optional[UserInfo] = None) -> None: + def __init__( + self, + filename: str, + user: Optional[UserInfo] = None, + job_id: Optional[str] = None, + time_added: Optional[float] = None, + ) -> None: self.filename = filename - self.job_id = f"{id(self):016X}" - self.time_added = time.time() + self.job_id = job_id or str(uuid.uuid4()) + if time_added is None: + self.time_added = time.time() + else: + self.time_added = time_added self._user = user def __str__(self) -> str: @@ -347,11 +368,29 @@ class QueuedJob: def as_dict(self, cur_time: float) -> Dict[str, Any]: return { - 'filename': self.filename, - 'job_id': self.job_id, - 'time_added': self.time_added, - 'time_in_queue': cur_time - self.time_added + "filename": self.filename, + "job_id": self.job_id, + "time_added": self.time_added, + "time_in_queue": cur_time - self.time_added, } + def as_ser_dict(self) -> Dict[str, Any]: + return { + "filename": self.filename, + "job_id": self.job_id, + "time_added": self.time_added, + "user": self._user and self._user.as_dict(), + } + + @classmethod + def from_ser_dict(cls, data: Dict[str, Any]) -> Self: + return cls( + data["filename"], + data.get("user"), + data["job_id"], + data["time_added"], + ) + + def load_component(config: ConfigHelper) -> JobQueue: return JobQueue(config) From 0bbccfce0b74d700666d0d6a2f370498c5813513 Mon Sep 17 00:00:00 2001 From: ThetaDev Date: Sun, 19 Apr 2026 01:22:04 +0200 Subject: [PATCH 2/4] fix: use eventloop for db access --- moonraker/components/job_queue.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/moonraker/components/job_queue.py b/moonraker/components/job_queue.py index 9ed1d24..fce72dd 100644 --- a/moonraker/components/job_queue.py +++ b/moonraker/components/job_queue.py @@ -28,8 +28,7 @@ if TYPE_CHECKING: class JobQueue: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() - db: DBComp = self.server.load_component(config, "database") - self.sync_provider = db.get_provider_wrapper() + self.db: DBComp = self.server.load_component(config, "database") self.queued_jobs: Dict[str, QueuedJob] = {} self.lock = asyncio.Lock() self.pause_requested: bool = False @@ -45,10 +44,8 @@ class JobQueue: self.job_transition_gcode = config.get("job_transition_gcode", "").strip() self.pop_queue_handle: Optional[asyncio.TimerHandle] = None - queue_data: Dict[str, Any] = self.sync_provider.get_item( - "moonraker", "job_queue.queue_data", {} - ) - self._from_ser_dict(queue_data) + event_loop = self.server.get_event_loop() + event_loop.create_task(self._load_state()) self.server.register_event_handler("server:klippy_ready", self._handle_ready) self.server.register_event_handler( @@ -80,6 +77,17 @@ class JobQueue: "/server/job_queue/jump", RequestType.POST, self._handle_jump ) + async def _load_state(self) -> None: + queue_data: Dict[str, Any] = await self.db.get_item( + "moonraker", "job_queue.queue_data", {} + ) + self._from_ser_dict(queue_data) + + async def _persist_state(self) -> None: + queue_data = self._as_ser_dict() + logging.info(f"persisting queue data: {queue_data}") + await self.db.insert_item("moonraker", "job_queue.queue_data", queue_data) + async def _handle_ready(self) -> None: async with self.lock: if not self.load_on_start or not self.queued_jobs: @@ -295,9 +303,8 @@ class JobQueue: ) # persist queue data - self.sync_provider.insert_item( - "moonraker", "job_queue.queue_data", self._as_ser_dict() - ) + event_loop.create_task(self._persist_state()) + async def _handle_job_request(self, web_request: WebRequest) -> Dict[str, Any]: req_type = web_request.get_request_type() From 52bb054a5fcb2b710f8d41649ac6b3c548e85e76 Mon Sep 17 00:00:00 2001 From: ThetaDev Date: Sun, 19 Apr 2026 01:31:23 +0200 Subject: [PATCH 3/4] do not persist user data --- moonraker/components/job_queue.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/moonraker/components/job_queue.py b/moonraker/components/job_queue.py index fce72dd..c12e856 100644 --- a/moonraker/components/job_queue.py +++ b/moonraker/components/job_queue.py @@ -85,7 +85,6 @@ class JobQueue: async def _persist_state(self) -> None: queue_data = self._as_ser_dict() - logging.info(f"persisting queue data: {queue_data}") await self.db.insert_item("moonraker", "job_queue.queue_data", queue_data) async def _handle_ready(self) -> None: @@ -274,6 +273,7 @@ class JobQueue: logging.error(f"Queued print file {job.filename} does not exist") continue self.queued_jobs[job.job_id] = job + logging.info(f"queue loaded: {len(self.queued_jobs)} jobs") self._send_queue_event("jobs_added") def _check_job_file(self, job_name: str) -> None: @@ -386,14 +386,13 @@ class QueuedJob: "filename": self.filename, "job_id": self.job_id, "time_added": self.time_added, - "user": self._user and self._user.as_dict(), } @classmethod def from_ser_dict(cls, data: Dict[str, Any]) -> Self: return cls( data["filename"], - data.get("user"), + None, data["job_id"], data["time_added"], ) From 3b243b883c30435e6da5ff73b0c007f6b114bfbd Mon Sep 17 00:00:00 2001 From: ThetaDev Date: Sun, 19 Apr 2026 01:50:29 +0200 Subject: [PATCH 4/4] remove / prefix from queue paths --- moonraker/components/job_queue.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/moonraker/components/job_queue.py b/moonraker/components/job_queue.py index c12e856..504fedd 100644 --- a/moonraker/components/job_queue.py +++ b/moonraker/components/job_queue.py @@ -82,6 +82,8 @@ class JobQueue: "moonraker", "job_queue.queue_data", {} ) self._from_ser_dict(queue_data) + if self.queued_jobs: + logging.info(f"queue loaded: {len(self.queued_jobs)} jobs") async def _persist_state(self) -> None: queue_data = self._as_ser_dict() @@ -194,6 +196,8 @@ class JobQueue: # Make sure that the file exists if isinstance(filenames, str): filenames = [filenames] + # Temporary fix until Mainsail is updated + filenames = [fn.removeprefix("/") for fn in filenames] if check_exists: # Make sure all files exist before adding them to the queue for fname in filenames: @@ -273,7 +277,6 @@ class JobQueue: logging.error(f"Queued print file {job.filename} does not exist") continue self.queued_jobs[job.job_id] = job - logging.info(f"queue loaded: {len(self.queued_jobs)} jobs") self._send_queue_event("jobs_added") def _check_job_file(self, job_name: str) -> None: