Compare commits

...
Sign in to create a new pull request.

4 commits

Author SHA1 Message Date
3b243b883c remove / prefix from queue paths 2026-04-19 01:50:29 +02:00
52bb054a5f do not persist user data 2026-04-19 01:31:23 +02:00
0bbccfce0b fix: use eventloop for db access 2026-04-19 01:22:04 +02:00
dcd9aaad89 feat: add job queue 2026-04-19 01:01:18 +02:00
2 changed files with 136 additions and 87 deletions

1
.gitignore vendored
View file

@ -18,3 +18,4 @@ share
.mypy_cache .mypy_cache
.pdm-build .pdm-build
.pytest_cache .pytest_cache
uv.lock

View file

@ -8,27 +8,27 @@ from __future__ import annotations
import asyncio import asyncio
import time import time
import logging import logging
import uuid
from moonraker.utils.exceptions import ServerError
from ..common import JobEvent, RequestType from ..common import JobEvent, RequestType
# Annotation imports # Annotation imports
from typing import ( from typing import TYPE_CHECKING, Any, Optional, Dict, List, Union, Self
TYPE_CHECKING,
Any,
Optional,
Dict,
List,
Union,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from ..confighelper import ConfigHelper from ..confighelper import ConfigHelper
from ..common import WebRequest, UserInfo from ..common import WebRequest, UserInfo
from .klippy_apis import KlippyAPI from .klippy_apis import KlippyAPI
from .database import MoonrakerDatabase as DBComp
from .file_manager.file_manager import FileManager from .file_manager.file_manager import FileManager
from .job_state import JobState from .job_state import JobState
class JobQueue: class JobQueue:
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
self.db: DBComp = self.server.load_component(config, "database")
self.queued_jobs: Dict[str, QueuedJob] = {} self.queued_jobs: Dict[str, QueuedJob] = {}
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.pause_requested: bool = False self.pause_requested: bool = False
@ -36,18 +36,21 @@ class JobQueue:
self.automatic = config.getboolean("automatic_transition", False) self.automatic = config.getboolean("automatic_transition", False)
self.queue_state: str = "paused" self.queue_state: str = "paused"
self.job_delay = config.getfloat("job_transition_delay", 0.01) self.job_delay = config.getfloat("job_transition_delay", 0.01)
if self.job_delay <= 0.: if self.job_delay <= 0.0:
raise config.error( raise config.error(
"Value for option 'job_transition_delay' in section [job_queue]" "Value for option 'job_transition_delay' in section [job_queue]"
" must be above 0.0") " must be above 0.0"
self.job_transition_gcode = config.get( )
"job_transition_gcode", "").strip() self.job_transition_gcode = config.get("job_transition_gcode", "").strip()
self.pop_queue_handle: Optional[asyncio.TimerHandle] = None self.pop_queue_handle: Optional[asyncio.TimerHandle] = None
event_loop = self.server.get_event_loop()
event_loop.create_task(self._load_state())
self.server.register_event_handler("server:klippy_ready", self._handle_ready)
self.server.register_event_handler( self.server.register_event_handler(
"server:klippy_ready", self._handle_ready) "server:klippy_shutdown", self._handle_shutdown
self.server.register_event_handler( )
"server:klippy_shutdown", self._handle_shutdown)
self.server.register_event_handler( self.server.register_event_handler(
"job_state:state_changed", self._on_job_state_changed "job_state:state_changed", self._on_job_state_changed
) )
@ -57,8 +60,9 @@ class JobQueue:
self.server.register_remote_method("start_job_queue", self.start_queue) self.server.register_remote_method("start_job_queue", self.start_queue)
self.server.register_endpoint( self.server.register_endpoint(
"/server/job_queue/job", RequestType.POST | RequestType.DELETE, "/server/job_queue/job",
self._handle_job_request RequestType.POST | RequestType.DELETE,
self._handle_job_request,
) )
self.server.register_endpoint( self.server.register_endpoint(
"/server/job_queue/pause", RequestType.POST, self._handle_pause_queue "/server/job_queue/pause", RequestType.POST, self._handle_pause_queue
@ -73,16 +77,29 @@ class JobQueue:
"/server/job_queue/jump", RequestType.POST, self._handle_jump "/server/job_queue/jump", RequestType.POST, self._handle_jump
) )
async def _load_state(self) -> None:
queue_data: Dict[str, Any] = await self.db.get_item(
"moonraker", "job_queue.queue_data", {}
)
self._from_ser_dict(queue_data)
if self.queued_jobs:
logging.info(f"queue loaded: {len(self.queued_jobs)} jobs")
async def _persist_state(self) -> None:
queue_data = self._as_ser_dict()
await self.db.insert_item("moonraker", "job_queue.queue_data", queue_data)
async def _handle_ready(self) -> None: async def _handle_ready(self) -> None:
async with self.lock: async with self.lock:
if not self.load_on_start or not self.queued_jobs: if not self.load_on_start or not self.queued_jobs:
return return
# start a queued print # start a queued print
if self.queue_state in ['ready', 'paused']: if self.queue_state in ["ready", "paused"]:
event_loop = self.server.get_event_loop() event_loop = self.server.get_event_loop()
self._set_queue_state("loading") self._set_queue_state("loading")
self.pop_queue_handle = event_loop.delay_callback( self.pop_queue_handle = event_loop.delay_callback(
1., self._pop_job, False) 1.0, self._pop_job, False
)
async def _handle_shutdown(self) -> None: async def _handle_shutdown(self) -> None:
has_requested_pause = self.pause_requested has_requested_pause = self.pause_requested
@ -104,7 +121,8 @@ class JobQueue:
event_loop = self.server.get_event_loop() event_loop = self.server.get_event_loop()
self._set_queue_state("loading") self._set_queue_state("loading")
self.pop_queue_handle = event_loop.delay_callback( self.pop_queue_handle = event_loop.delay_callback(
self.job_delay, self._pop_job) self.job_delay, self._pop_job
)
async def _on_job_abort(self) -> None: async def _on_job_abort(self) -> None:
async with self.lock: async with self.lock:
@ -119,7 +137,7 @@ class JobQueue:
if not self.queued_jobs: if not self.queued_jobs:
self._set_queue_state("paused") self._set_queue_state("paused")
return return
kapis: KlippyAPI = self.server.lookup_component('klippy_apis') kapis: KlippyAPI = self.server.lookup_component("klippy_apis")
uid, job = list(self.queued_jobs.items())[0] uid, job = list(self.queued_jobs.items())[0]
filename = str(job) filename = str(job)
can_print = await self._check_can_print() can_print = await self._check_can_print()
@ -134,7 +152,8 @@ class JobQueue:
if self.queue_state != "loading": if self.queue_state != "loading":
self._set_queue_state("paused") self._set_queue_state("paused")
raise self.server.error( raise self.server.error(
"Queue State Changed during Transition Gcode") "Queue State Changed during Transition Gcode"
)
self._set_queue_state("starting") self._set_queue_state("starting")
await kapis.start_print( await kapis.start_print(
filename, wait_klippy_started=True, user=job.user filename, wait_klippy_started=True, user=job.user
@ -153,29 +172,32 @@ class JobQueue:
async def _check_can_print(self) -> bool: async def _check_can_print(self) -> bool:
# Query the latest stats # Query the latest stats
kapis: KlippyAPI = self.server.lookup_component('klippy_apis') kapis: KlippyAPI = self.server.lookup_component("klippy_apis")
try: try:
result = await kapis.query_objects({"print_stats": None}) result = await kapis.query_objects({"print_stats": None})
except Exception: except Exception:
# Klippy not connected # Klippy not connected
return False return False
if 'print_stats' not in result: if "print_stats" not in result:
return False return False
state: str = result['print_stats']['state'] state: str = result["print_stats"]["state"]
if state in ["printing", "paused"]: if state in ["printing", "paused"]:
return False return False
return True return True
async def queue_job(self, async def queue_job(
self,
filenames: Union[str, List[str]], filenames: Union[str, List[str]],
check_exists: bool = True, check_exists: bool = True,
reset: bool = False, reset: bool = False,
user: Optional[UserInfo] = None user: Optional[UserInfo] = None,
) -> None: ) -> None:
async with self.lock: async with self.lock:
# Make sure that the file exists # Make sure that the file exists
if isinstance(filenames, str): if isinstance(filenames, str):
filenames = [filenames] filenames = [filenames]
# Temporary fix until Mainsail is updated
filenames = [fn.removeprefix("/") for fn in filenames]
if check_exists: if check_exists:
# Make sure all files exist before adding them to the queue # Make sure all files exist before adding them to the queue
for fname in filenames: for fname in filenames:
@ -192,9 +214,8 @@ class JobQueue:
if last_evt.is_printing or last_evt == JobEvent.PAUSED: if last_evt.is_printing or last_evt == JobEvent.PAUSED:
self._set_queue_state("ready") self._set_queue_state("ready")
async def delete_job(self, async def delete_job(
job_ids: Union[str, List[str]], self, job_ids: Union[str, List[str]], all: bool = False
all: bool = False
) -> None: ) -> None:
async with self.lock: async with self.lock:
if not self.queued_jobs: if not self.queued_jobs:
@ -238,14 +259,30 @@ class JobQueue:
def _job_map_to_list(self) -> List[Dict[str, Any]]: def _job_map_to_list(self) -> List[Dict[str, Any]]:
cur_time = time.time() cur_time = time.time()
return [job.as_dict(cur_time) for return [job.as_dict(cur_time) for job in self.queued_jobs.values()]
job in self.queued_jobs.values()]
def _as_ser_dict(self) -> Dict[str, Any]:
return {"queued_jobs": [job.as_ser_dict() for job in self.queued_jobs.values()]}
def _from_ser_dict(self, data: Dict[str, Any]):
self.queued_jobs.clear()
queued_jobs = data.get("queued_jobs")
if queued_jobs:
for job_data in queued_jobs:
job = QueuedJob.from_ser_dict(job_data)
# check if the file still exists
try:
self._check_job_file(job.filename)
except ServerError:
logging.error(f"Queued print file {job.filename} does not exist")
continue
self.queued_jobs[job.job_id] = job
self._send_queue_event("jobs_added")
def _check_job_file(self, job_name: str) -> None: def _check_job_file(self, job_name: str) -> None:
fm: FileManager = self.server.lookup_component('file_manager') fm: FileManager = self.server.lookup_component("file_manager")
if not fm.check_file_exists("gcodes", job_name): if not fm.check_file_exists("gcodes", job_name):
raise self.server.error( raise self.server.error(f"G-Code File {job_name} does not exist")
f"G-Code File {job_name} does not exist")
def _set_queue_state(self, new_state: str) -> None: def _set_queue_state(self, new_state: str) -> None:
if new_state != self.queue_state: if new_state != self.queue_state:
@ -258,19 +295,24 @@ class JobQueue:
updated_queue = self._job_map_to_list() updated_queue = self._job_map_to_list()
event_loop = self.server.get_event_loop() event_loop = self.server.get_event_loop()
event_loop.delay_callback( event_loop.delay_callback(
.05, self.server.send_event, "job_queue:job_queue_changed", 0.05,
self.server.send_event,
"job_queue:job_queue_changed",
{ {
'action': action, "action": action,
'updated_queue': updated_queue, "updated_queue": updated_queue,
'queue_state': self.queue_state "queue_state": self.queue_state,
}) },
)
async def _handle_job_request( # persist queue data
self, web_request: WebRequest event_loop.create_task(self._persist_state())
) -> Dict[str, Any]:
async def _handle_job_request(self, web_request: WebRequest) -> Dict[str, Any]:
req_type = web_request.get_request_type() req_type = web_request.get_request_type()
if req_type == RequestType.POST: if req_type == RequestType.POST:
files = web_request.get_list('filenames') files = web_request.get_list("filenames")
reset = web_request.get_boolean("reset", False) reset = web_request.get_boolean("reset", False)
# Validate that all files exist before queueing # Validate that all files exist before queueing
user = web_request.get_current_user() user = web_request.get_current_user()
@ -279,40 +321,22 @@ class JobQueue:
if web_request.get_boolean("all", False): if web_request.get_boolean("all", False):
await self.delete_job([], all=True) await self.delete_job([], all=True)
else: else:
job_ids = web_request.get_list('job_ids') job_ids = web_request.get_list("job_ids")
await self.delete_job(job_ids) await self.delete_job(job_ids)
else: else:
raise self.server.error(f"Invalid request type: {req_type}") raise self.server.error(f"Invalid request type: {req_type}")
return { return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
'queued_jobs': self._job_map_to_list(),
'queue_state': self.queue_state
}
async def _handle_pause_queue(self, async def _handle_pause_queue(self, web_request: WebRequest) -> Dict[str, Any]:
web_request: WebRequest
) -> Dict[str, Any]:
await self.pause_queue() await self.pause_queue()
return { return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
'queued_jobs': self._job_map_to_list(),
'queue_state': self.queue_state
}
async def _handle_start_queue(self, async def _handle_start_queue(self, web_request: WebRequest) -> Dict[str, Any]:
web_request: WebRequest
) -> Dict[str, Any]:
await self.start_queue() await self.start_queue()
return { return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
'queued_jobs': self._job_map_to_list(),
'queue_state': self.queue_state
}
async def _handle_queue_status(self, async def _handle_queue_status(self, web_request: WebRequest) -> Dict[str, Any]:
web_request: WebRequest return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
) -> Dict[str, Any]:
return {
'queued_jobs': self._job_map_to_list(),
'queue_state': self.queue_state
}
async def _handle_jump(self, web_request: WebRequest) -> Dict[str, Any]: async def _handle_jump(self, web_request: WebRequest) -> Dict[str, Any]:
job_id: str = web_request.get("job_id") job_id: str = web_request.get("job_id")
@ -323,19 +347,26 @@ class JobQueue:
new_queue = {job_id: job} new_queue = {job_id: job}
new_queue.update(self.queued_jobs) new_queue.update(self.queued_jobs)
self.queued_jobs = new_queue self.queued_jobs = new_queue
return { return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
'queued_jobs': self._job_map_to_list(),
'queue_state': self.queue_state
}
async def close(self): async def close(self):
await self.pause_queue() await self.pause_queue()
class QueuedJob: class QueuedJob:
def __init__(self, filename: str, user: Optional[UserInfo] = None) -> None: def __init__(
self,
filename: str,
user: Optional[UserInfo] = None,
job_id: Optional[str] = None,
time_added: Optional[float] = None,
) -> None:
self.filename = filename self.filename = filename
self.job_id = f"{id(self):016X}" self.job_id = job_id or str(uuid.uuid4())
if time_added is None:
self.time_added = time.time() self.time_added = time.time()
else:
self.time_added = time_added
self._user = user self._user = user
def __str__(self) -> str: def __str__(self) -> str:
@ -347,11 +378,28 @@ class QueuedJob:
def as_dict(self, cur_time: float) -> Dict[str, Any]: def as_dict(self, cur_time: float) -> Dict[str, Any]:
return { return {
'filename': self.filename, "filename": self.filename,
'job_id': self.job_id, "job_id": self.job_id,
'time_added': self.time_added, "time_added": self.time_added,
'time_in_queue': cur_time - self.time_added "time_in_queue": cur_time - self.time_added,
} }
def as_ser_dict(self) -> Dict[str, Any]:
return {
"filename": self.filename,
"job_id": self.job_id,
"time_added": self.time_added,
}
@classmethod
def from_ser_dict(cls, data: Dict[str, Any]) -> Self:
return cls(
data["filename"],
None,
data["job_id"],
data["time_added"],
)
def load_component(config: ConfigHelper) -> JobQueue: def load_component(config: ConfigHelper) -> JobQueue:
return JobQueue(config) return JobQueue(config)