Compare commits

..

No commits in common. "feat/persistent-queue" and "master" have entirely different histories.

2 changed files with 87 additions and 136 deletions

1
.gitignore vendored
View file

@@ -18,4 +18,3 @@ share
.mypy_cache
.pdm-build
.pytest_cache
uv.lock

View file

@@ -8,27 +8,27 @@ from __future__ import annotations
import asyncio
import time
import logging
import uuid
from moonraker.utils.exceptions import ServerError
from ..common import JobEvent, RequestType
# Annotation imports
from typing import TYPE_CHECKING, Any, Optional, Dict, List, Union, Self
from typing import (
TYPE_CHECKING,
Any,
Optional,
Dict,
List,
Union,
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..common import WebRequest, UserInfo
from .klippy_apis import KlippyAPI
from .database import MoonrakerDatabase as DBComp
from .file_manager.file_manager import FileManager
from .job_state import JobState
class JobQueue:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.db: DBComp = self.server.load_component(config, "database")
self.queued_jobs: Dict[str, QueuedJob] = {}
self.lock = asyncio.Lock()
self.pause_requested: bool = False
@@ -36,21 +36,18 @@ class JobQueue:
self.automatic = config.getboolean("automatic_transition", False)
self.queue_state: str = "paused"
self.job_delay = config.getfloat("job_transition_delay", 0.01)
if self.job_delay <= 0.0:
if self.job_delay <= 0.:
raise config.error(
"Value for option 'job_transition_delay' in section [job_queue]"
" must be above 0.0"
)
self.job_transition_gcode = config.get("job_transition_gcode", "").strip()
" must be above 0.0")
self.job_transition_gcode = config.get(
"job_transition_gcode", "").strip()
self.pop_queue_handle: Optional[asyncio.TimerHandle] = None
event_loop = self.server.get_event_loop()
event_loop.create_task(self._load_state())
self.server.register_event_handler("server:klippy_ready", self._handle_ready)
self.server.register_event_handler(
"server:klippy_shutdown", self._handle_shutdown
)
"server:klippy_ready", self._handle_ready)
self.server.register_event_handler(
"server:klippy_shutdown", self._handle_shutdown)
self.server.register_event_handler(
"job_state:state_changed", self._on_job_state_changed
)
@@ -60,9 +57,8 @@ class JobQueue:
self.server.register_remote_method("start_job_queue", self.start_queue)
self.server.register_endpoint(
"/server/job_queue/job",
RequestType.POST | RequestType.DELETE,
self._handle_job_request,
"/server/job_queue/job", RequestType.POST | RequestType.DELETE,
self._handle_job_request
)
self.server.register_endpoint(
"/server/job_queue/pause", RequestType.POST, self._handle_pause_queue
@@ -77,29 +73,16 @@ class JobQueue:
"/server/job_queue/jump", RequestType.POST, self._handle_jump
)
async def _load_state(self) -> None:
    """Restore any persisted queue entries from the database at startup."""
    stored: Dict[str, Any] = await self.db.get_item(
        "moonraker", "job_queue.queue_data", {}
    )
    self._from_ser_dict(stored)
    if self.queued_jobs:
        logging.info(f"queue loaded: {len(self.queued_jobs)} jobs")
async def _persist_state(self) -> None:
    """Write the current queue contents to the database for persistence."""
    serialized = self._as_ser_dict()
    await self.db.insert_item("moonraker", "job_queue.queue_data", serialized)
async def _handle_ready(self) -> None:
async with self.lock:
if not self.load_on_start or not self.queued_jobs:
return
# start a queued print
if self.queue_state in ["ready", "paused"]:
if self.queue_state in ['ready', 'paused']:
event_loop = self.server.get_event_loop()
self._set_queue_state("loading")
self.pop_queue_handle = event_loop.delay_callback(
1.0, self._pop_job, False
)
1., self._pop_job, False)
async def _handle_shutdown(self) -> None:
has_requested_pause = self.pause_requested
@@ -121,8 +104,7 @@ class JobQueue:
event_loop = self.server.get_event_loop()
self._set_queue_state("loading")
self.pop_queue_handle = event_loop.delay_callback(
self.job_delay, self._pop_job
)
self.job_delay, self._pop_job)
async def _on_job_abort(self) -> None:
async with self.lock:
@@ -137,7 +119,7 @@ class JobQueue:
if not self.queued_jobs:
self._set_queue_state("paused")
return
kapis: KlippyAPI = self.server.lookup_component("klippy_apis")
kapis: KlippyAPI = self.server.lookup_component('klippy_apis')
uid, job = list(self.queued_jobs.items())[0]
filename = str(job)
can_print = await self._check_can_print()
@@ -152,8 +134,7 @@ class JobQueue:
if self.queue_state != "loading":
self._set_queue_state("paused")
raise self.server.error(
"Queue State Changed during Transition Gcode"
)
"Queue State Changed during Transition Gcode")
self._set_queue_state("starting")
await kapis.start_print(
filename, wait_klippy_started=True, user=job.user
@@ -172,32 +153,29 @@ class JobQueue:
async def _check_can_print(self) -> bool:
# Query the latest stats
kapis: KlippyAPI = self.server.lookup_component("klippy_apis")
kapis: KlippyAPI = self.server.lookup_component('klippy_apis')
try:
result = await kapis.query_objects({"print_stats": None})
except Exception:
# Klippy not connected
return False
if "print_stats" not in result:
if 'print_stats' not in result:
return False
state: str = result["print_stats"]["state"]
state: str = result['print_stats']['state']
if state in ["printing", "paused"]:
return False
return True
async def queue_job(
self,
filenames: Union[str, List[str]],
check_exists: bool = True,
reset: bool = False,
user: Optional[UserInfo] = None,
) -> None:
async def queue_job(self,
filenames: Union[str, List[str]],
check_exists: bool = True,
reset: bool = False,
user: Optional[UserInfo] = None
) -> None:
async with self.lock:
# Make sure that the file exists
if isinstance(filenames, str):
filenames = [filenames]
# Temporary fix until Mainsail is updated
filenames = [fn.removeprefix("/") for fn in filenames]
if check_exists:
# Make sure all files exist before adding them to the queue
for fname in filenames:
@@ -214,9 +192,10 @@ class JobQueue:
if last_evt.is_printing or last_evt == JobEvent.PAUSED:
self._set_queue_state("ready")
async def delete_job(
self, job_ids: Union[str, List[str]], all: bool = False
) -> None:
async def delete_job(self,
job_ids: Union[str, List[str]],
all: bool = False
) -> None:
async with self.lock:
if not self.queued_jobs:
# No jobs in queue, nothing to delete
@@ -259,30 +238,14 @@ class JobQueue:
def _job_map_to_list(self) -> List[Dict[str, Any]]:
cur_time = time.time()
return [job.as_dict(cur_time) for job in self.queued_jobs.values()]
def _as_ser_dict(self) -> Dict[str, Any]:
    """Serialize the queue into a plain dict suitable for database storage."""
    jobs = [job.as_ser_dict() for job in self.queued_jobs.values()]
    return {"queued_jobs": jobs}
def _from_ser_dict(self, data: Dict[str, Any]):
self.queued_jobs.clear()
queued_jobs = data.get("queued_jobs")
if queued_jobs:
for job_data in queued_jobs:
job = QueuedJob.from_ser_dict(job_data)
# check if the file still exists
try:
self._check_job_file(job.filename)
except ServerError:
logging.error(f"Queued print file {job.filename} does not exist")
continue
self.queued_jobs[job.job_id] = job
self._send_queue_event("jobs_added")
return [job.as_dict(cur_time) for
job in self.queued_jobs.values()]
def _check_job_file(self, job_name: str) -> None:
fm: FileManager = self.server.lookup_component("file_manager")
fm: FileManager = self.server.lookup_component('file_manager')
if not fm.check_file_exists("gcodes", job_name):
raise self.server.error(f"G-Code File {job_name} does not exist")
raise self.server.error(
f"G-Code File {job_name} does not exist")
def _set_queue_state(self, new_state: str) -> None:
if new_state != self.queue_state:
@@ -295,24 +258,19 @@ class JobQueue:
updated_queue = self._job_map_to_list()
event_loop = self.server.get_event_loop()
event_loop.delay_callback(
0.05,
self.server.send_event,
"job_queue:job_queue_changed",
.05, self.server.send_event, "job_queue:job_queue_changed",
{
"action": action,
"updated_queue": updated_queue,
"queue_state": self.queue_state,
},
)
'action': action,
'updated_queue': updated_queue,
'queue_state': self.queue_state
})
# persist queue data
event_loop.create_task(self._persist_state())
async def _handle_job_request(self, web_request: WebRequest) -> Dict[str, Any]:
async def _handle_job_request(
self, web_request: WebRequest
) -> Dict[str, Any]:
req_type = web_request.get_request_type()
if req_type == RequestType.POST:
files = web_request.get_list("filenames")
files = web_request.get_list('filenames')
reset = web_request.get_boolean("reset", False)
# Validate that all files exist before queueing
user = web_request.get_current_user()
@@ -321,22 +279,40 @@ class JobQueue:
if web_request.get_boolean("all", False):
await self.delete_job([], all=True)
else:
job_ids = web_request.get_list("job_ids")
job_ids = web_request.get_list('job_ids')
await self.delete_job(job_ids)
else:
raise self.server.error(f"Invalid request type: {req_type}")
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
return {
'queued_jobs': self._job_map_to_list(),
'queue_state': self.queue_state
}
async def _handle_pause_queue(self, web_request: WebRequest) -> Dict[str, Any]:
async def _handle_pause_queue(self,
web_request: WebRequest
) -> Dict[str, Any]:
await self.pause_queue()
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
return {
'queued_jobs': self._job_map_to_list(),
'queue_state': self.queue_state
}
async def _handle_start_queue(self, web_request: WebRequest) -> Dict[str, Any]:
async def _handle_start_queue(self,
web_request: WebRequest
) -> Dict[str, Any]:
await self.start_queue()
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
return {
'queued_jobs': self._job_map_to_list(),
'queue_state': self.queue_state
}
async def _handle_queue_status(self, web_request: WebRequest) -> Dict[str, Any]:
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
async def _handle_queue_status(self,
web_request: WebRequest
) -> Dict[str, Any]:
return {
'queued_jobs': self._job_map_to_list(),
'queue_state': self.queue_state
}
async def _handle_jump(self, web_request: WebRequest) -> Dict[str, Any]:
job_id: str = web_request.get("job_id")
@@ -347,26 +323,19 @@ class JobQueue:
new_queue = {job_id: job}
new_queue.update(self.queued_jobs)
self.queued_jobs = new_queue
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
return {
'queued_jobs': self._job_map_to_list(),
'queue_state': self.queue_state
}
async def close(self):
await self.pause_queue()
class QueuedJob:
def __init__(
self,
filename: str,
user: Optional[UserInfo] = None,
job_id: Optional[str] = None,
time_added: Optional[float] = None,
) -> None:
def __init__(self, filename: str, user: Optional[UserInfo] = None) -> None:
self.filename = filename
self.job_id = job_id or str(uuid.uuid4())
if time_added is None:
self.time_added = time.time()
else:
self.time_added = time_added
self.job_id = f"{id(self):016X}"
self.time_added = time.time()
self._user = user
def __str__(self) -> str:
@@ -378,28 +347,11 @@ class QueuedJob:
def as_dict(self, cur_time: float) -> Dict[str, Any]:
return {
"filename": self.filename,
"job_id": self.job_id,
"time_added": self.time_added,
"time_in_queue": cur_time - self.time_added,
'filename': self.filename,
'job_id': self.job_id,
'time_added': self.time_added,
'time_in_queue': cur_time - self.time_added
}
def as_ser_dict(self) -> Dict[str, Any]:
    """Serialize this job's persistent fields (no derived values) for storage."""
    return dict(
        filename=self.filename,
        job_id=self.job_id,
        time_added=self.time_added,
    )
@classmethod
def from_ser_dict(cls, data: Dict[str, Any]) -> Self:
    """Rebuild a queued job from its serialized dict representation.

    Inverse of ``as_ser_dict``; the user field is not persisted, so it
    is restored as None.
    """
    return cls(data["filename"], None, data["job_id"], data["time_added"])
def load_component(config: ConfigHelper) -> JobQueue:
    """Moonraker component entry point; constructs the JobQueue."""
    return JobQueue(config)