Compare commits
4 commits
master
...
feat/persi
| Author | SHA1 | Date | |
|---|---|---|---|
| 3b243b883c | |||
| 52bb054a5f | |||
| 0bbccfce0b | |||
| dcd9aaad89 |
2 changed files with 136 additions and 87 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -18,3 +18,4 @@ share
|
||||||
.mypy_cache
|
.mypy_cache
|
||||||
.pdm-build
|
.pdm-build
|
||||||
.pytest_cache
|
.pytest_cache
|
||||||
|
uv.lock
|
||||||
|
|
|
||||||
|
|
@ -8,27 +8,27 @@ from __future__ import annotations
|
||||||
import asyncio
|
import asyncio
|
||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from moonraker.utils.exceptions import ServerError
|
||||||
from ..common import JobEvent, RequestType
|
from ..common import JobEvent, RequestType
|
||||||
|
|
||||||
# Annotation imports
|
# Annotation imports
|
||||||
from typing import (
|
from typing import TYPE_CHECKING, Any, Optional, Dict, List, Union, Self
|
||||||
TYPE_CHECKING,
|
|
||||||
Any,
|
|
||||||
Optional,
|
|
||||||
Dict,
|
|
||||||
List,
|
|
||||||
Union,
|
|
||||||
)
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ..confighelper import ConfigHelper
|
from ..confighelper import ConfigHelper
|
||||||
from ..common import WebRequest, UserInfo
|
from ..common import WebRequest, UserInfo
|
||||||
from .klippy_apis import KlippyAPI
|
from .klippy_apis import KlippyAPI
|
||||||
|
from .database import MoonrakerDatabase as DBComp
|
||||||
from .file_manager.file_manager import FileManager
|
from .file_manager.file_manager import FileManager
|
||||||
from .job_state import JobState
|
from .job_state import JobState
|
||||||
|
|
||||||
|
|
||||||
class JobQueue:
|
class JobQueue:
|
||||||
def __init__(self, config: ConfigHelper) -> None:
|
def __init__(self, config: ConfigHelper) -> None:
|
||||||
self.server = config.get_server()
|
self.server = config.get_server()
|
||||||
|
self.db: DBComp = self.server.load_component(config, "database")
|
||||||
self.queued_jobs: Dict[str, QueuedJob] = {}
|
self.queued_jobs: Dict[str, QueuedJob] = {}
|
||||||
self.lock = asyncio.Lock()
|
self.lock = asyncio.Lock()
|
||||||
self.pause_requested: bool = False
|
self.pause_requested: bool = False
|
||||||
|
|
@ -36,18 +36,21 @@ class JobQueue:
|
||||||
self.automatic = config.getboolean("automatic_transition", False)
|
self.automatic = config.getboolean("automatic_transition", False)
|
||||||
self.queue_state: str = "paused"
|
self.queue_state: str = "paused"
|
||||||
self.job_delay = config.getfloat("job_transition_delay", 0.01)
|
self.job_delay = config.getfloat("job_transition_delay", 0.01)
|
||||||
if self.job_delay <= 0.:
|
if self.job_delay <= 0.0:
|
||||||
raise config.error(
|
raise config.error(
|
||||||
"Value for option 'job_transition_delay' in section [job_queue]"
|
"Value for option 'job_transition_delay' in section [job_queue]"
|
||||||
" must be above 0.0")
|
" must be above 0.0"
|
||||||
self.job_transition_gcode = config.get(
|
)
|
||||||
"job_transition_gcode", "").strip()
|
self.job_transition_gcode = config.get("job_transition_gcode", "").strip()
|
||||||
self.pop_queue_handle: Optional[asyncio.TimerHandle] = None
|
self.pop_queue_handle: Optional[asyncio.TimerHandle] = None
|
||||||
|
|
||||||
|
event_loop = self.server.get_event_loop()
|
||||||
|
event_loop.create_task(self._load_state())
|
||||||
|
|
||||||
|
self.server.register_event_handler("server:klippy_ready", self._handle_ready)
|
||||||
self.server.register_event_handler(
|
self.server.register_event_handler(
|
||||||
"server:klippy_ready", self._handle_ready)
|
"server:klippy_shutdown", self._handle_shutdown
|
||||||
self.server.register_event_handler(
|
)
|
||||||
"server:klippy_shutdown", self._handle_shutdown)
|
|
||||||
self.server.register_event_handler(
|
self.server.register_event_handler(
|
||||||
"job_state:state_changed", self._on_job_state_changed
|
"job_state:state_changed", self._on_job_state_changed
|
||||||
)
|
)
|
||||||
|
|
@ -57,8 +60,9 @@ class JobQueue:
|
||||||
self.server.register_remote_method("start_job_queue", self.start_queue)
|
self.server.register_remote_method("start_job_queue", self.start_queue)
|
||||||
|
|
||||||
self.server.register_endpoint(
|
self.server.register_endpoint(
|
||||||
"/server/job_queue/job", RequestType.POST | RequestType.DELETE,
|
"/server/job_queue/job",
|
||||||
self._handle_job_request
|
RequestType.POST | RequestType.DELETE,
|
||||||
|
self._handle_job_request,
|
||||||
)
|
)
|
||||||
self.server.register_endpoint(
|
self.server.register_endpoint(
|
||||||
"/server/job_queue/pause", RequestType.POST, self._handle_pause_queue
|
"/server/job_queue/pause", RequestType.POST, self._handle_pause_queue
|
||||||
|
|
@ -73,16 +77,29 @@ class JobQueue:
|
||||||
"/server/job_queue/jump", RequestType.POST, self._handle_jump
|
"/server/job_queue/jump", RequestType.POST, self._handle_jump
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def _load_state(self) -> None:
|
||||||
|
queue_data: Dict[str, Any] = await self.db.get_item(
|
||||||
|
"moonraker", "job_queue.queue_data", {}
|
||||||
|
)
|
||||||
|
self._from_ser_dict(queue_data)
|
||||||
|
if self.queued_jobs:
|
||||||
|
logging.info(f"queue loaded: {len(self.queued_jobs)} jobs")
|
||||||
|
|
||||||
|
async def _persist_state(self) -> None:
|
||||||
|
queue_data = self._as_ser_dict()
|
||||||
|
await self.db.insert_item("moonraker", "job_queue.queue_data", queue_data)
|
||||||
|
|
||||||
async def _handle_ready(self) -> None:
|
async def _handle_ready(self) -> None:
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
if not self.load_on_start or not self.queued_jobs:
|
if not self.load_on_start or not self.queued_jobs:
|
||||||
return
|
return
|
||||||
# start a queued print
|
# start a queued print
|
||||||
if self.queue_state in ['ready', 'paused']:
|
if self.queue_state in ["ready", "paused"]:
|
||||||
event_loop = self.server.get_event_loop()
|
event_loop = self.server.get_event_loop()
|
||||||
self._set_queue_state("loading")
|
self._set_queue_state("loading")
|
||||||
self.pop_queue_handle = event_loop.delay_callback(
|
self.pop_queue_handle = event_loop.delay_callback(
|
||||||
1., self._pop_job, False)
|
1.0, self._pop_job, False
|
||||||
|
)
|
||||||
|
|
||||||
async def _handle_shutdown(self) -> None:
|
async def _handle_shutdown(self) -> None:
|
||||||
has_requested_pause = self.pause_requested
|
has_requested_pause = self.pause_requested
|
||||||
|
|
@ -104,7 +121,8 @@ class JobQueue:
|
||||||
event_loop = self.server.get_event_loop()
|
event_loop = self.server.get_event_loop()
|
||||||
self._set_queue_state("loading")
|
self._set_queue_state("loading")
|
||||||
self.pop_queue_handle = event_loop.delay_callback(
|
self.pop_queue_handle = event_loop.delay_callback(
|
||||||
self.job_delay, self._pop_job)
|
self.job_delay, self._pop_job
|
||||||
|
)
|
||||||
|
|
||||||
async def _on_job_abort(self) -> None:
|
async def _on_job_abort(self) -> None:
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
|
|
@ -119,7 +137,7 @@ class JobQueue:
|
||||||
if not self.queued_jobs:
|
if not self.queued_jobs:
|
||||||
self._set_queue_state("paused")
|
self._set_queue_state("paused")
|
||||||
return
|
return
|
||||||
kapis: KlippyAPI = self.server.lookup_component('klippy_apis')
|
kapis: KlippyAPI = self.server.lookup_component("klippy_apis")
|
||||||
uid, job = list(self.queued_jobs.items())[0]
|
uid, job = list(self.queued_jobs.items())[0]
|
||||||
filename = str(job)
|
filename = str(job)
|
||||||
can_print = await self._check_can_print()
|
can_print = await self._check_can_print()
|
||||||
|
|
@ -134,7 +152,8 @@ class JobQueue:
|
||||||
if self.queue_state != "loading":
|
if self.queue_state != "loading":
|
||||||
self._set_queue_state("paused")
|
self._set_queue_state("paused")
|
||||||
raise self.server.error(
|
raise self.server.error(
|
||||||
"Queue State Changed during Transition Gcode")
|
"Queue State Changed during Transition Gcode"
|
||||||
|
)
|
||||||
self._set_queue_state("starting")
|
self._set_queue_state("starting")
|
||||||
await kapis.start_print(
|
await kapis.start_print(
|
||||||
filename, wait_klippy_started=True, user=job.user
|
filename, wait_klippy_started=True, user=job.user
|
||||||
|
|
@ -153,29 +172,32 @@ class JobQueue:
|
||||||
|
|
||||||
async def _check_can_print(self) -> bool:
|
async def _check_can_print(self) -> bool:
|
||||||
# Query the latest stats
|
# Query the latest stats
|
||||||
kapis: KlippyAPI = self.server.lookup_component('klippy_apis')
|
kapis: KlippyAPI = self.server.lookup_component("klippy_apis")
|
||||||
try:
|
try:
|
||||||
result = await kapis.query_objects({"print_stats": None})
|
result = await kapis.query_objects({"print_stats": None})
|
||||||
except Exception:
|
except Exception:
|
||||||
# Klippy not connected
|
# Klippy not connected
|
||||||
return False
|
return False
|
||||||
if 'print_stats' not in result:
|
if "print_stats" not in result:
|
||||||
return False
|
return False
|
||||||
state: str = result['print_stats']['state']
|
state: str = result["print_stats"]["state"]
|
||||||
if state in ["printing", "paused"]:
|
if state in ["printing", "paused"]:
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def queue_job(self,
|
async def queue_job(
|
||||||
|
self,
|
||||||
filenames: Union[str, List[str]],
|
filenames: Union[str, List[str]],
|
||||||
check_exists: bool = True,
|
check_exists: bool = True,
|
||||||
reset: bool = False,
|
reset: bool = False,
|
||||||
user: Optional[UserInfo] = None
|
user: Optional[UserInfo] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
# Make sure that the file exists
|
# Make sure that the file exists
|
||||||
if isinstance(filenames, str):
|
if isinstance(filenames, str):
|
||||||
filenames = [filenames]
|
filenames = [filenames]
|
||||||
|
# Temporary fix until Mainsail is updated
|
||||||
|
filenames = [fn.removeprefix("/") for fn in filenames]
|
||||||
if check_exists:
|
if check_exists:
|
||||||
# Make sure all files exist before adding them to the queue
|
# Make sure all files exist before adding them to the queue
|
||||||
for fname in filenames:
|
for fname in filenames:
|
||||||
|
|
@ -192,9 +214,8 @@ class JobQueue:
|
||||||
if last_evt.is_printing or last_evt == JobEvent.PAUSED:
|
if last_evt.is_printing or last_evt == JobEvent.PAUSED:
|
||||||
self._set_queue_state("ready")
|
self._set_queue_state("ready")
|
||||||
|
|
||||||
async def delete_job(self,
|
async def delete_job(
|
||||||
job_ids: Union[str, List[str]],
|
self, job_ids: Union[str, List[str]], all: bool = False
|
||||||
all: bool = False
|
|
||||||
) -> None:
|
) -> None:
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
if not self.queued_jobs:
|
if not self.queued_jobs:
|
||||||
|
|
@ -238,14 +259,30 @@ class JobQueue:
|
||||||
|
|
||||||
def _job_map_to_list(self) -> List[Dict[str, Any]]:
|
def _job_map_to_list(self) -> List[Dict[str, Any]]:
|
||||||
cur_time = time.time()
|
cur_time = time.time()
|
||||||
return [job.as_dict(cur_time) for
|
return [job.as_dict(cur_time) for job in self.queued_jobs.values()]
|
||||||
job in self.queued_jobs.values()]
|
|
||||||
|
def _as_ser_dict(self) -> Dict[str, Any]:
|
||||||
|
return {"queued_jobs": [job.as_ser_dict() for job in self.queued_jobs.values()]}
|
||||||
|
|
||||||
|
def _from_ser_dict(self, data: Dict[str, Any]):
|
||||||
|
self.queued_jobs.clear()
|
||||||
|
queued_jobs = data.get("queued_jobs")
|
||||||
|
if queued_jobs:
|
||||||
|
for job_data in queued_jobs:
|
||||||
|
job = QueuedJob.from_ser_dict(job_data)
|
||||||
|
# check if the file still exists
|
||||||
|
try:
|
||||||
|
self._check_job_file(job.filename)
|
||||||
|
except ServerError:
|
||||||
|
logging.error(f"Queued print file {job.filename} does not exist")
|
||||||
|
continue
|
||||||
|
self.queued_jobs[job.job_id] = job
|
||||||
|
self._send_queue_event("jobs_added")
|
||||||
|
|
||||||
def _check_job_file(self, job_name: str) -> None:
|
def _check_job_file(self, job_name: str) -> None:
|
||||||
fm: FileManager = self.server.lookup_component('file_manager')
|
fm: FileManager = self.server.lookup_component("file_manager")
|
||||||
if not fm.check_file_exists("gcodes", job_name):
|
if not fm.check_file_exists("gcodes", job_name):
|
||||||
raise self.server.error(
|
raise self.server.error(f"G-Code File {job_name} does not exist")
|
||||||
f"G-Code File {job_name} does not exist")
|
|
||||||
|
|
||||||
def _set_queue_state(self, new_state: str) -> None:
|
def _set_queue_state(self, new_state: str) -> None:
|
||||||
if new_state != self.queue_state:
|
if new_state != self.queue_state:
|
||||||
|
|
@ -258,19 +295,24 @@ class JobQueue:
|
||||||
updated_queue = self._job_map_to_list()
|
updated_queue = self._job_map_to_list()
|
||||||
event_loop = self.server.get_event_loop()
|
event_loop = self.server.get_event_loop()
|
||||||
event_loop.delay_callback(
|
event_loop.delay_callback(
|
||||||
.05, self.server.send_event, "job_queue:job_queue_changed",
|
0.05,
|
||||||
|
self.server.send_event,
|
||||||
|
"job_queue:job_queue_changed",
|
||||||
{
|
{
|
||||||
'action': action,
|
"action": action,
|
||||||
'updated_queue': updated_queue,
|
"updated_queue": updated_queue,
|
||||||
'queue_state': self.queue_state
|
"queue_state": self.queue_state,
|
||||||
})
|
},
|
||||||
|
)
|
||||||
|
|
||||||
async def _handle_job_request(
|
# persist queue data
|
||||||
self, web_request: WebRequest
|
event_loop.create_task(self._persist_state())
|
||||||
) -> Dict[str, Any]:
|
|
||||||
|
|
||||||
|
async def _handle_job_request(self, web_request: WebRequest) -> Dict[str, Any]:
|
||||||
req_type = web_request.get_request_type()
|
req_type = web_request.get_request_type()
|
||||||
if req_type == RequestType.POST:
|
if req_type == RequestType.POST:
|
||||||
files = web_request.get_list('filenames')
|
files = web_request.get_list("filenames")
|
||||||
reset = web_request.get_boolean("reset", False)
|
reset = web_request.get_boolean("reset", False)
|
||||||
# Validate that all files exist before queueing
|
# Validate that all files exist before queueing
|
||||||
user = web_request.get_current_user()
|
user = web_request.get_current_user()
|
||||||
|
|
@ -279,40 +321,22 @@ class JobQueue:
|
||||||
if web_request.get_boolean("all", False):
|
if web_request.get_boolean("all", False):
|
||||||
await self.delete_job([], all=True)
|
await self.delete_job([], all=True)
|
||||||
else:
|
else:
|
||||||
job_ids = web_request.get_list('job_ids')
|
job_ids = web_request.get_list("job_ids")
|
||||||
await self.delete_job(job_ids)
|
await self.delete_job(job_ids)
|
||||||
else:
|
else:
|
||||||
raise self.server.error(f"Invalid request type: {req_type}")
|
raise self.server.error(f"Invalid request type: {req_type}")
|
||||||
return {
|
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
|
||||||
'queued_jobs': self._job_map_to_list(),
|
|
||||||
'queue_state': self.queue_state
|
|
||||||
}
|
|
||||||
|
|
||||||
async def _handle_pause_queue(self,
|
async def _handle_pause_queue(self, web_request: WebRequest) -> Dict[str, Any]:
|
||||||
web_request: WebRequest
|
|
||||||
) -> Dict[str, Any]:
|
|
||||||
await self.pause_queue()
|
await self.pause_queue()
|
||||||
return {
|
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
|
||||||
'queued_jobs': self._job_map_to_list(),
|
|
||||||
'queue_state': self.queue_state
|
|
||||||
}
|
|
||||||
|
|
||||||
async def _handle_start_queue(self,
|
async def _handle_start_queue(self, web_request: WebRequest) -> Dict[str, Any]:
|
||||||
web_request: WebRequest
|
|
||||||
) -> Dict[str, Any]:
|
|
||||||
await self.start_queue()
|
await self.start_queue()
|
||||||
return {
|
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
|
||||||
'queued_jobs': self._job_map_to_list(),
|
|
||||||
'queue_state': self.queue_state
|
|
||||||
}
|
|
||||||
|
|
||||||
async def _handle_queue_status(self,
|
async def _handle_queue_status(self, web_request: WebRequest) -> Dict[str, Any]:
|
||||||
web_request: WebRequest
|
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
|
||||||
) -> Dict[str, Any]:
|
|
||||||
return {
|
|
||||||
'queued_jobs': self._job_map_to_list(),
|
|
||||||
'queue_state': self.queue_state
|
|
||||||
}
|
|
||||||
|
|
||||||
async def _handle_jump(self, web_request: WebRequest) -> Dict[str, Any]:
|
async def _handle_jump(self, web_request: WebRequest) -> Dict[str, Any]:
|
||||||
job_id: str = web_request.get("job_id")
|
job_id: str = web_request.get("job_id")
|
||||||
|
|
@ -323,19 +347,26 @@ class JobQueue:
|
||||||
new_queue = {job_id: job}
|
new_queue = {job_id: job}
|
||||||
new_queue.update(self.queued_jobs)
|
new_queue.update(self.queued_jobs)
|
||||||
self.queued_jobs = new_queue
|
self.queued_jobs = new_queue
|
||||||
return {
|
return {"queued_jobs": self._job_map_to_list(), "queue_state": self.queue_state}
|
||||||
'queued_jobs': self._job_map_to_list(),
|
|
||||||
'queue_state': self.queue_state
|
|
||||||
}
|
|
||||||
|
|
||||||
async def close(self):
|
async def close(self):
|
||||||
await self.pause_queue()
|
await self.pause_queue()
|
||||||
|
|
||||||
|
|
||||||
class QueuedJob:
|
class QueuedJob:
|
||||||
def __init__(self, filename: str, user: Optional[UserInfo] = None) -> None:
|
def __init__(
|
||||||
|
self,
|
||||||
|
filename: str,
|
||||||
|
user: Optional[UserInfo] = None,
|
||||||
|
job_id: Optional[str] = None,
|
||||||
|
time_added: Optional[float] = None,
|
||||||
|
) -> None:
|
||||||
self.filename = filename
|
self.filename = filename
|
||||||
self.job_id = f"{id(self):016X}"
|
self.job_id = job_id or str(uuid.uuid4())
|
||||||
|
if time_added is None:
|
||||||
self.time_added = time.time()
|
self.time_added = time.time()
|
||||||
|
else:
|
||||||
|
self.time_added = time_added
|
||||||
self._user = user
|
self._user = user
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
|
|
@ -347,11 +378,28 @@ class QueuedJob:
|
||||||
|
|
||||||
def as_dict(self, cur_time: float) -> Dict[str, Any]:
|
def as_dict(self, cur_time: float) -> Dict[str, Any]:
|
||||||
return {
|
return {
|
||||||
'filename': self.filename,
|
"filename": self.filename,
|
||||||
'job_id': self.job_id,
|
"job_id": self.job_id,
|
||||||
'time_added': self.time_added,
|
"time_added": self.time_added,
|
||||||
'time_in_queue': cur_time - self.time_added
|
"time_in_queue": cur_time - self.time_added,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def as_ser_dict(self) -> Dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"filename": self.filename,
|
||||||
|
"job_id": self.job_id,
|
||||||
|
"time_added": self.time_added,
|
||||||
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_ser_dict(cls, data: Dict[str, Any]) -> Self:
|
||||||
|
return cls(
|
||||||
|
data["filename"],
|
||||||
|
None,
|
||||||
|
data["job_id"],
|
||||||
|
data["time_added"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def load_component(config: ConfigHelper) -> JobQueue:
|
def load_component(config: ConfigHelper) -> JobQueue:
|
||||||
return JobQueue(config)
|
return JobQueue(config)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue