diff --git a/.gitignore b/.gitignore index cb4e7f8..5302af4 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,3 @@ share .mypy_cache .pdm-build .pytest_cache -uv.lock diff --git a/moonraker/components/job_queue.py b/moonraker/components/job_queue.py index 504fedd..41312f1 100644 --- a/moonraker/components/job_queue.py +++ b/moonraker/components/job_queue.py @@ -8,27 +8,27 @@ from __future__ import annotations import asyncio import time import logging -import uuid - -from moonraker.utils.exceptions import ServerError from ..common import JobEvent, RequestType # Annotation imports -from typing import TYPE_CHECKING, Any, Optional, Dict, List, Union, Self - +from typing import ( + TYPE_CHECKING, + Any, + Optional, + Dict, + List, + Union, +) if TYPE_CHECKING: from ..confighelper import ConfigHelper from ..common import WebRequest, UserInfo from .klippy_apis import KlippyAPI - from .database import MoonrakerDatabase as DBComp from .file_manager.file_manager import FileManager from .job_state import JobState - class JobQueue: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() - self.db: DBComp = self.server.load_component(config, "database") self.queued_jobs: Dict[str, QueuedJob] = {} self.lock = asyncio.Lock() self.pause_requested: bool = False @@ -36,21 +36,18 @@ class JobQueue: self.automatic = config.getboolean("automatic_transition", False) self.queue_state: str = "paused" self.job_delay = config.getfloat("job_transition_delay", 0.01) - if self.job_delay <= 0.0: + if self.job_delay <= 0.: raise config.error( "Value for option 'job_transition_delay' in section [job_queue]" - " must be above 0.0" - ) - self.job_transition_gcode = config.get("job_transition_gcode", "").strip() + " must be above 0.0") + self.job_transition_gcode = config.get( + "job_transition_gcode", "").strip() self.pop_queue_handle: Optional[asyncio.TimerHandle] = None - event_loop = self.server.get_event_loop() - event_loop.create_task(self._load_state()) - - self.server.register_event_handler("server:klippy_ready", self._handle_ready) self.server.register_event_handler( - "server:klippy_shutdown", self._handle_shutdown - ) + "server:klippy_ready", self._handle_ready) + self.server.register_event_handler( + "server:klippy_shutdown", self._handle_shutdown) self.server.register_event_handler( "job_state:state_changed", self._on_job_state_changed ) @@ -60,9 +57,8 @@ class JobQueue: self.server.register_remote_method("start_job_queue", self.start_queue) self.server.register_endpoint( - "/server/job_queue/job", - RequestType.POST | RequestType.DELETE, - self._handle_job_request, + "/server/job_queue/job", RequestType.POST | RequestType.DELETE, + self._handle_job_request ) self.server.register_endpoint( "/server/job_queue/pause", RequestType.POST, self._handle_pause_queue @@ -77,29 +73,16 @@ class JobQueue: "/server/job_queue/jump", RequestType.POST, self._handle_jump ) - async def _load_state(self) -> None: - queue_data: Dict[str, Any] = await self.db.get_item( - "moonraker", "job_queue.queue_data", {} - ) - self._from_ser_dict(queue_data) - if self.queued_jobs: - logging.info(f"queue loaded: {len(self.queued_jobs)} jobs") - - async def _persist_state(self) -> None: - queue_data = self._as_ser_dict() - await self.db.insert_item("moonraker", "job_queue.queue_data", queue_data) - async def _handle_ready(self) -> None: async with self.lock: if not self.load_on_start or not self.queued_jobs: return # start a queued print - if self.queue_state in ["ready", "paused"]: + if self.queue_state in ['ready', 'paused']: event_loop = self.server.get_event_loop() self._set_queue_state("loading") self.pop_queue_handle = event_loop.delay_callback( - 1.0, self._pop_job, False - ) + 1., self._pop_job, False) async def _handle_shutdown(self) -> None: has_requested_pause = self.pause_requested @@ -121,8 +104,7 @@ class JobQueue: event_loop = self.server.get_event_loop() self._set_queue_state("loading") self.pop_queue_handle = event_loop.delay_callback( - self.job_delay, self._pop_job - ) + self.job_delay, self._pop_job) async def _on_job_abort(self) -> None: async with self.lock: @@ -137,7 +119,7 @@ class JobQueue: if not self.queued_jobs: self._set_queue_state("paused") return - kapis: KlippyAPI = self.server.lookup_component("klippy_apis") + kapis: KlippyAPI = self.server.lookup_component('klippy_apis') uid, job = list(self.queued_jobs.items())[0] filename = str(job) can_print = await self._check_can_print() @@ -152,8 +134,7 @@ class JobQueue: if self.queue_state != "loading": self._set_queue_state("paused") raise self.server.error( - "Queue State Changed during Transition Gcode" - ) + "Queue State Changed during Transition Gcode") self._set_queue_state("starting") await kapis.start_print( filename, wait_klippy_started=True, user=job.user @@ -172,32 +153,29 @@ class JobQueue: async def _check_can_print(self) -> bool: # Query the latest stats - kapis: KlippyAPI = self.server.lookup_component("klippy_apis") + kapis: KlippyAPI = self.server.lookup_component('klippy_apis') try: result = await kapis.query_objects({"print_stats": None}) except Exception: # Klippy not connected return False - if "print_stats" not in result: + if 'print_stats' not in result: return False - state: str = result["print_stats"]["state"] + state: str = result['print_stats']['state'] if state in ["printing", "paused"]: return False return True - async def queue_job( - self, - filenames: Union[str, List[str]], - check_exists: bool = True, - reset: bool = False, - user: Optional[UserInfo] = None, - ) -> None: + async def queue_job(self, + filenames: Union[str, List[str]], + check_exists: bool = True, + reset: bool = False, + user: Optional[UserInfo] = None + ) -> None: async with self.lock: # Make sure that the file exists if isinstance(filenames, str): filenames = [filenames] - # Temporary fix until Mainsail is updated - filenames = [fn.removeprefix("/") for fn in filenames] if check_exists: # Make sure all files exist before adding them to the queue for fname in filenames: @@ -214,9 +192,10 @@ class JobQueue: if last_evt.is_printing or last_evt == JobEvent.PAUSED: self._set_queue_state("ready") - async def delete_job( - self, job_ids: Union[str, List[str]], all: bool = False - ) -> None: + async def delete_job(self, + job_ids: Union[str, List[str]], + all: bool = False + ) -> None: async with self.lock: if not self.queued_jobs: # No jobs in queue, nothing to delete @@ -259,30 +238,14 @@ class JobQueue: def _job_map_to_list(self) -> List[Dict[str, Any]]: cur_time = time.time() - return [job.as_dict(cur_time) for job in self.queued_jobs.values()] - - def _as_ser_dict(self) -> Dict[str, Any]: - return {"queued_jobs": [job.as_ser_dict() for job in self.queued_jobs.values()]} - - def _from_ser_dict(self, data: Dict[str, Any]): - self.queued_jobs.clear() - queued_jobs = data.get("queued_jobs") - if queued_jobs: - for job_data in queued_jobs: - job = QueuedJob.from_ser_dict(job_data) - # check if the file still exists - try: - self._check_job_file(job.filename) - except ServerError: - logging.error(f"Queued print file {job.filename} does not exist") - continue - self.queued_jobs[job.job_id] = job - self._send_queue_event("jobs_added") + return [job.as_dict(cur_time) for + job in self.queued_jobs.values()] def _check_job_file(self, job_name: str) -> None: - fm: FileManager = self.server.lookup_component("file_manager") + fm: FileManager = self.server.lookup_component('file_manager') if not fm.check_file_exists("gcodes", job_name): - raise self.server.error(f"G-Code File {job_name} does not exist") + raise self.server.error( + f"G-Code File {job_name} does not exist") def _set_queue_state(self, new_state: str) -> None: if new_state != self.queue_state: @@ -295,24 +258,19 @@ class JobQueue: updated_queue = self._job_map_to_list() event_loop = self.server.get_event_loop() event_loop.delay_callback( - 0.05, - self.server.send_event, - "job_queue:job_queue_changed", + .05, self.server.send_event, "job_queue:job_queue_changed", { - "action": action, - "updated_queue": updated_queue, - "queue_state": self.queue_state, - }, - ) + 'action': action, + 'updated_queue': updated_queue, + 'queue_state': self.queue_state + }) - # persist queue data - event_loop.create_task(self._persist_state()) - - - async def _handle_job_request(self, web_request: WebRequest) -> Dict[str, Any]: + async def _handle_job_request( + self, web_request: WebRequest + ) -> Dict[str, Any]: req_type = web_request.get_request_type() if req_type == RequestType.POST: - files = web_request.get_list("filenames") + files = web_request.get_list('filenames') reset = web_request.get_boolean("reset", False) # Validate that all files exist before queueing user = web_request.get_current_user() @@ -321,22 +279,40 @@ class JobQueue: if web_request.get_boolean("all", False): await self.delete_job([], all=True) else: - job_ids = web_request.get_list("job_ids") + job_ids = web_request.get_list('job_ids') await self.delete_job(job_ids) else: raise self.server.error(f"Invalid request type: {req_type}") - return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state} + return { + 'queued_jobs': self._job_map_to_list(), + 'queue_state': self.queue_state + } - async def _handle_pause_queue(self, web_request: WebRequest) -> Dict[str, Any]: + async def _handle_pause_queue(self, + web_request: WebRequest + ) -> Dict[str, Any]: await self.pause_queue() - return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state} + return { + 'queued_jobs': self._job_map_to_list(), + 'queue_state': self.queue_state + } - async def _handle_start_queue(self, web_request: WebRequest) -> Dict[str, Any]: + async def _handle_start_queue(self, + web_request: WebRequest + ) -> Dict[str, Any]: await self.start_queue() - return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state} + return { + 'queued_jobs': self._job_map_to_list(), + 'queue_state': self.queue_state + } - async def _handle_queue_status(self, web_request: WebRequest) -> Dict[str, Any]: - return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state} + async def _handle_queue_status(self, + web_request: WebRequest + ) -> Dict[str, Any]: + return { + 'queued_jobs': self._job_map_to_list(), + 'queue_state': self.queue_state + } async def _handle_jump(self, web_request: WebRequest) -> Dict[str, Any]: job_id: str = web_request.get("job_id") @@ -347,26 +323,19 @@ class JobQueue: new_queue = {job_id: job} new_queue.update(self.queued_jobs) self.queued_jobs = new_queue - return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state} + return { + 'queued_jobs': self._job_map_to_list(), + 'queue_state': self.queue_state + } async def close(self): await self.pause_queue() - class QueuedJob: - def __init__( - self, - filename: str, - user: Optional[UserInfo] = None, - job_id: Optional[str] = None, - time_added: Optional[float] = None, - ) -> None: + def __init__(self, filename: str, user: Optional[UserInfo] = None) -> None: self.filename = filename - self.job_id = job_id or str(uuid.uuid4()) - if time_added is None: - self.time_added = time.time() - else: - self.time_added = time_added + self.job_id = f"{id(self):016X}" + self.time_added = time.time() self._user = user def __str__(self) -> str: @@ -378,28 +347,11 @@ class QueuedJob: def as_dict(self, cur_time: float) -> Dict[str, Any]: return { - "filename": self.filename, - "job_id": self.job_id, - "time_added": self.time_added, - "time_in_queue": cur_time - self.time_added, + 'filename': self.filename, + 'job_id': self.job_id, + 'time_added': self.time_added, + 'time_in_queue': cur_time - self.time_added } - def as_ser_dict(self) -> Dict[str, Any]: - return { - "filename": self.filename, - "job_id": self.job_id, - "time_added": self.time_added, - } - - @classmethod - def from_ser_dict(cls, data: Dict[str, Any]) -> Self: - return cls( - data["filename"], - None, - data["job_id"], - data["time_added"], - ) - - def load_component(config: ConfigHelper) -> JobQueue: return JobQueue(config)