ucast/ucast/management/commands/rqenqueue.py
2022-05-21 18:36:29 +02:00

34 lines
980 B
Python

"""
Based on the django-rq package by Selwin Ong (MIT License)
https://github.com/rq/django-rq
"""
from django.core.management.base import BaseCommand
from ucast import queue
class Command(BaseCommand):
"""Queue a function with the given arguments."""
help = __doc__
args = "<function arg arg ...>"
def add_arguments(self, parser):
parser.add_argument(
"--timeout", "-t", type=int, dest="timeout", help="A timeout in seconds"
)
parser.add_argument("args", nargs="*")
def handle(self, *args, **options):
"""
Queues the function given with the first argument with the
parameters given with the rest of the argument list.
"""
verbosity = int(options.get("verbosity", 1))
timeout = options.get("timeout")
q = queue.get_queue()
job = q.enqueue_call(args[0], args=args[1:], timeout=timeout)
if verbosity:
print("Job %s created" % job.id)