Compare commits
No commits in common. "feat/persistent-queue" and "master" have entirely different histories.
feat/persi
...
master
2 changed files with 87 additions and 136 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -18,4 +18,3 @@ share
|
|||
.mypy_cache
|
||||
.pdm-build
|
||||
.pytest_cache
|
||||
uv.lock
|
||||
|
|
|
|||
|
|
@ -8,27 +8,27 @@ from __future__ import annotations
|
|||
import asyncio
|
||||
import time
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from moonraker.utils.exceptions import ServerError
|
||||
from ..common import JobEvent, RequestType
|
||||
|
||||
# Annotation imports
|
||||
from typing import TYPE_CHECKING, Any, Optional, Dict, List, Union, Self
|
||||
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Optional,
|
||||
Dict,
|
||||
List,
|
||||
Union,
|
||||
)
|
||||
if TYPE_CHECKING:
|
||||
from ..confighelper import ConfigHelper
|
||||
from ..common import WebRequest, UserInfo
|
||||
from .klippy_apis import KlippyAPI
|
||||
from .database import MoonrakerDatabase as DBComp
|
||||
from .file_manager.file_manager import FileManager
|
||||
from .job_state import JobState
|
||||
|
||||
|
||||
class JobQueue:
|
||||
def __init__(self, config: ConfigHelper) -> None:
|
||||
self.server = config.get_server()
|
||||
self.db: DBComp = self.server.load_component(config, "database")
|
||||
self.queued_jobs: Dict[str, QueuedJob] = {}
|
||||
self.lock = asyncio.Lock()
|
||||
self.pause_requested: bool = False
|
||||
|
|
@ -36,21 +36,18 @@ class JobQueue:
|
|||
self.automatic = config.getboolean("automatic_transition", False)
|
||||
self.queue_state: str = "paused"
|
||||
self.job_delay = config.getfloat("job_transition_delay", 0.01)
|
||||
if self.job_delay <= 0.0:
|
||||
if self.job_delay <= 0.:
|
||||
raise config.error(
|
||||
"Value for option 'job_transition_delay' in section [job_queue]"
|
||||
" must be above 0.0"
|
||||
)
|
||||
self.job_transition_gcode = config.get("job_transition_gcode", "").strip()
|
||||
" must be above 0.0")
|
||||
self.job_transition_gcode = config.get(
|
||||
"job_transition_gcode", "").strip()
|
||||
self.pop_queue_handle: Optional[asyncio.TimerHandle] = None
|
||||
|
||||
event_loop = self.server.get_event_loop()
|
||||
event_loop.create_task(self._load_state())
|
||||
|
||||
self.server.register_event_handler("server:klippy_ready", self._handle_ready)
|
||||
self.server.register_event_handler(
|
||||
"server:klippy_shutdown", self._handle_shutdown
|
||||
)
|
||||
"server:klippy_ready", self._handle_ready)
|
||||
self.server.register_event_handler(
|
||||
"server:klippy_shutdown", self._handle_shutdown)
|
||||
self.server.register_event_handler(
|
||||
"job_state:state_changed", self._on_job_state_changed
|
||||
)
|
||||
|
|
@ -60,9 +57,8 @@ class JobQueue:
|
|||
self.server.register_remote_method("start_job_queue", self.start_queue)
|
||||
|
||||
self.server.register_endpoint(
|
||||
"/server/job_queue/job",
|
||||
RequestType.POST | RequestType.DELETE,
|
||||
self._handle_job_request,
|
||||
"/server/job_queue/job", RequestType.POST | RequestType.DELETE,
|
||||
self._handle_job_request
|
||||
)
|
||||
self.server.register_endpoint(
|
||||
"/server/job_queue/pause", RequestType.POST, self._handle_pause_queue
|
||||
|
|
@ -77,29 +73,16 @@ class JobQueue:
|
|||
"/server/job_queue/jump", RequestType.POST, self._handle_jump
|
||||
)
|
||||
|
||||
async def _load_state(self) -> None:
|
||||
queue_data: Dict[str, Any] = await self.db.get_item(
|
||||
"moonraker", "job_queue.queue_data", {}
|
||||
)
|
||||
self._from_ser_dict(queue_data)
|
||||
if self.queued_jobs:
|
||||
logging.info(f"queue loaded: {len(self.queued_jobs)} jobs")
|
||||
|
||||
async def _persist_state(self) -> None:
|
||||
queue_data = self._as_ser_dict()
|
||||
await self.db.insert_item("moonraker", "job_queue.queue_data", queue_data)
|
||||
|
||||
async def _handle_ready(self) -> None:
|
||||
async with self.lock:
|
||||
if not self.load_on_start or not self.queued_jobs:
|
||||
return
|
||||
# start a queued print
|
||||
if self.queue_state in ["ready", "paused"]:
|
||||
if self.queue_state in ['ready', 'paused']:
|
||||
event_loop = self.server.get_event_loop()
|
||||
self._set_queue_state("loading")
|
||||
self.pop_queue_handle = event_loop.delay_callback(
|
||||
1.0, self._pop_job, False
|
||||
)
|
||||
1., self._pop_job, False)
|
||||
|
||||
async def _handle_shutdown(self) -> None:
|
||||
has_requested_pause = self.pause_requested
|
||||
|
|
@ -121,8 +104,7 @@ class JobQueue:
|
|||
event_loop = self.server.get_event_loop()
|
||||
self._set_queue_state("loading")
|
||||
self.pop_queue_handle = event_loop.delay_callback(
|
||||
self.job_delay, self._pop_job
|
||||
)
|
||||
self.job_delay, self._pop_job)
|
||||
|
||||
async def _on_job_abort(self) -> None:
|
||||
async with self.lock:
|
||||
|
|
@ -137,7 +119,7 @@ class JobQueue:
|
|||
if not self.queued_jobs:
|
||||
self._set_queue_state("paused")
|
||||
return
|
||||
kapis: KlippyAPI = self.server.lookup_component("klippy_apis")
|
||||
kapis: KlippyAPI = self.server.lookup_component('klippy_apis')
|
||||
uid, job = list(self.queued_jobs.items())[0]
|
||||
filename = str(job)
|
||||
can_print = await self._check_can_print()
|
||||
|
|
@ -152,8 +134,7 @@ class JobQueue:
|
|||
if self.queue_state != "loading":
|
||||
self._set_queue_state("paused")
|
||||
raise self.server.error(
|
||||
"Queue State Changed during Transition Gcode"
|
||||
)
|
||||
"Queue State Changed during Transition Gcode")
|
||||
self._set_queue_state("starting")
|
||||
await kapis.start_print(
|
||||
filename, wait_klippy_started=True, user=job.user
|
||||
|
|
@ -172,32 +153,29 @@ class JobQueue:
|
|||
|
||||
async def _check_can_print(self) -> bool:
|
||||
# Query the latest stats
|
||||
kapis: KlippyAPI = self.server.lookup_component("klippy_apis")
|
||||
kapis: KlippyAPI = self.server.lookup_component('klippy_apis')
|
||||
try:
|
||||
result = await kapis.query_objects({"print_stats": None})
|
||||
except Exception:
|
||||
# Klippy not connected
|
||||
return False
|
||||
if "print_stats" not in result:
|
||||
if 'print_stats' not in result:
|
||||
return False
|
||||
state: str = result["print_stats"]["state"]
|
||||
state: str = result['print_stats']['state']
|
||||
if state in ["printing", "paused"]:
|
||||
return False
|
||||
return True
|
||||
|
||||
async def queue_job(
|
||||
self,
|
||||
filenames: Union[str, List[str]],
|
||||
check_exists: bool = True,
|
||||
reset: bool = False,
|
||||
user: Optional[UserInfo] = None,
|
||||
) -> None:
|
||||
async def queue_job(self,
|
||||
filenames: Union[str, List[str]],
|
||||
check_exists: bool = True,
|
||||
reset: bool = False,
|
||||
user: Optional[UserInfo] = None
|
||||
) -> None:
|
||||
async with self.lock:
|
||||
# Make sure that the file exists
|
||||
if isinstance(filenames, str):
|
||||
filenames = [filenames]
|
||||
# Temporary fix until Mainsail is updated
|
||||
filenames = [fn.removeprefix("/") for fn in filenames]
|
||||
if check_exists:
|
||||
# Make sure all files exist before adding them to the queue
|
||||
for fname in filenames:
|
||||
|
|
@ -214,9 +192,10 @@ class JobQueue:
|
|||
if last_evt.is_printing or last_evt == JobEvent.PAUSED:
|
||||
self._set_queue_state("ready")
|
||||
|
||||
async def delete_job(
|
||||
self, job_ids: Union[str, List[str]], all: bool = False
|
||||
) -> None:
|
||||
async def delete_job(self,
|
||||
job_ids: Union[str, List[str]],
|
||||
all: bool = False
|
||||
) -> None:
|
||||
async with self.lock:
|
||||
if not self.queued_jobs:
|
||||
# No jobs in queue, nothing to delete
|
||||
|
|
@ -259,30 +238,14 @@ class JobQueue:
|
|||
|
||||
def _job_map_to_list(self) -> List[Dict[str, Any]]:
|
||||
cur_time = time.time()
|
||||
return [job.as_dict(cur_time) for job in self.queued_jobs.values()]
|
||||
|
||||
def _as_ser_dict(self) -> Dict[str, Any]:
|
||||
return {"queued_jobs": [job.as_ser_dict() for job in self.queued_jobs.values()]}
|
||||
|
||||
def _from_ser_dict(self, data: Dict[str, Any]):
|
||||
self.queued_jobs.clear()
|
||||
queued_jobs = data.get("queued_jobs")
|
||||
if queued_jobs:
|
||||
for job_data in queued_jobs:
|
||||
job = QueuedJob.from_ser_dict(job_data)
|
||||
# check if the file still exists
|
||||
try:
|
||||
self._check_job_file(job.filename)
|
||||
except ServerError:
|
||||
logging.error(f"Queued print file {job.filename} does not exist")
|
||||
continue
|
||||
self.queued_jobs[job.job_id] = job
|
||||
self._send_queue_event("jobs_added")
|
||||
return [job.as_dict(cur_time) for
|
||||
job in self.queued_jobs.values()]
|
||||
|
||||
def _check_job_file(self, job_name: str) -> None:
|
||||
fm: FileManager = self.server.lookup_component("file_manager")
|
||||
fm: FileManager = self.server.lookup_component('file_manager')
|
||||
if not fm.check_file_exists("gcodes", job_name):
|
||||
raise self.server.error(f"G-Code File {job_name} does not exist")
|
||||
raise self.server.error(
|
||||
f"G-Code File {job_name} does not exist")
|
||||
|
||||
def _set_queue_state(self, new_state: str) -> None:
|
||||
if new_state != self.queue_state:
|
||||
|
|
@ -295,24 +258,19 @@ class JobQueue:
|
|||
updated_queue = self._job_map_to_list()
|
||||
event_loop = self.server.get_event_loop()
|
||||
event_loop.delay_callback(
|
||||
0.05,
|
||||
self.server.send_event,
|
||||
"job_queue:job_queue_changed",
|
||||
.05, self.server.send_event, "job_queue:job_queue_changed",
|
||||
{
|
||||
"action": action,
|
||||
"updated_queue": updated_queue,
|
||||
"queue_state": self.queue_state,
|
||||
},
|
||||
)
|
||||
'action': action,
|
||||
'updated_queue': updated_queue,
|
||||
'queue_state': self.queue_state
|
||||
})
|
||||
|
||||
# persist queue data
|
||||
event_loop.create_task(self._persist_state())
|
||||
|
||||
|
||||
async def _handle_job_request(self, web_request: WebRequest) -> Dict[str, Any]:
|
||||
async def _handle_job_request(
|
||||
self, web_request: WebRequest
|
||||
) -> Dict[str, Any]:
|
||||
req_type = web_request.get_request_type()
|
||||
if req_type == RequestType.POST:
|
||||
files = web_request.get_list("filenames")
|
||||
files = web_request.get_list('filenames')
|
||||
reset = web_request.get_boolean("reset", False)
|
||||
# Validate that all files exist before queueing
|
||||
user = web_request.get_current_user()
|
||||
|
|
@ -321,22 +279,40 @@ class JobQueue:
|
|||
if web_request.get_boolean("all", False):
|
||||
await self.delete_job([], all=True)
|
||||
else:
|
||||
job_ids = web_request.get_list("job_ids")
|
||||
job_ids = web_request.get_list('job_ids')
|
||||
await self.delete_job(job_ids)
|
||||
else:
|
||||
raise self.server.error(f"Invalid request type: {req_type}")
|
||||
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
|
||||
return {
|
||||
'queued_jobs': self._job_map_to_list(),
|
||||
'queue_state': self.queue_state
|
||||
}
|
||||
|
||||
async def _handle_pause_queue(self, web_request: WebRequest) -> Dict[str, Any]:
|
||||
async def _handle_pause_queue(self,
|
||||
web_request: WebRequest
|
||||
) -> Dict[str, Any]:
|
||||
await self.pause_queue()
|
||||
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
|
||||
return {
|
||||
'queued_jobs': self._job_map_to_list(),
|
||||
'queue_state': self.queue_state
|
||||
}
|
||||
|
||||
async def _handle_start_queue(self, web_request: WebRequest) -> Dict[str, Any]:
|
||||
async def _handle_start_queue(self,
|
||||
web_request: WebRequest
|
||||
) -> Dict[str, Any]:
|
||||
await self.start_queue()
|
||||
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
|
||||
return {
|
||||
'queued_jobs': self._job_map_to_list(),
|
||||
'queue_state': self.queue_state
|
||||
}
|
||||
|
||||
async def _handle_queue_status(self, web_request: WebRequest) -> Dict[str, Any]:
|
||||
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
|
||||
async def _handle_queue_status(self,
|
||||
web_request: WebRequest
|
||||
) -> Dict[str, Any]:
|
||||
return {
|
||||
'queued_jobs': self._job_map_to_list(),
|
||||
'queue_state': self.queue_state
|
||||
}
|
||||
|
||||
async def _handle_jump(self, web_request: WebRequest) -> Dict[str, Any]:
|
||||
job_id: str = web_request.get("job_id")
|
||||
|
|
@ -347,26 +323,19 @@ class JobQueue:
|
|||
new_queue = {job_id: job}
|
||||
new_queue.update(self.queued_jobs)
|
||||
self.queued_jobs = new_queue
|
||||
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
|
||||
return {
|
||||
'queued_jobs': self._job_map_to_list(),
|
||||
'queue_state': self.queue_state
|
||||
}
|
||||
|
||||
async def close(self):
|
||||
await self.pause_queue()
|
||||
|
||||
|
||||
class QueuedJob:
|
||||
def __init__(
|
||||
self,
|
||||
filename: str,
|
||||
user: Optional[UserInfo] = None,
|
||||
job_id: Optional[str] = None,
|
||||
time_added: Optional[float] = None,
|
||||
) -> None:
|
||||
def __init__(self, filename: str, user: Optional[UserInfo] = None) -> None:
|
||||
self.filename = filename
|
||||
self.job_id = job_id or str(uuid.uuid4())
|
||||
if time_added is None:
|
||||
self.time_added = time.time()
|
||||
else:
|
||||
self.time_added = time_added
|
||||
self.job_id = f"{id(self):016X}"
|
||||
self.time_added = time.time()
|
||||
self._user = user
|
||||
|
||||
def __str__(self) -> str:
|
||||
|
|
@ -378,28 +347,11 @@ class QueuedJob:
|
|||
|
||||
def as_dict(self, cur_time: float) -> Dict[str, Any]:
|
||||
return {
|
||||
"filename": self.filename,
|
||||
"job_id": self.job_id,
|
||||
"time_added": self.time_added,
|
||||
"time_in_queue": cur_time - self.time_added,
|
||||
'filename': self.filename,
|
||||
'job_id': self.job_id,
|
||||
'time_added': self.time_added,
|
||||
'time_in_queue': cur_time - self.time_added
|
||||
}
|
||||
|
||||
def as_ser_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"filename": self.filename,
|
||||
"job_id": self.job_id,
|
||||
"time_added": self.time_added,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_ser_dict(cls, data: Dict[str, Any]) -> Self:
|
||||
return cls(
|
||||
data["filename"],
|
||||
None,
|
||||
data["job_id"],
|
||||
data["time_added"],
|
||||
)
|
||||
|
||||
|
||||
def load_component(config: ConfigHelper) -> JobQueue:
|
||||
return JobQueue(config)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue