ucast/ucast/management/commands/rqworker.py
2022-05-21 18:36:29 +02:00

103 lines
2.8 KiB
Python

"""
Based on the django-rq package by Selwin Ong (MIT License)
https://github.com/rq/django-rq
"""
import os
import sys
from django.core.management.base import BaseCommand
from django.db import connections
from redis.exceptions import ConnectionError
from rq import use_connection
from rq.logutils import setup_loghandlers
from ucast import queue
def reset_db_connections():
    """Close every database connection returned by ``connections.all()``.

    Called before the worker starts so that no already-open connection is
    carried across a fork.
    """
    for db_connection in connections.all():
        db_connection.close()
class Command(BaseCommand):
    """Runs RQ worker"""

    # The class docstring doubles as the command's help text, so it must
    # stay in sync with what ``manage.py help rqworker`` should print.
    help = __doc__

    def add_arguments(self, parser):
        """Register the worker's command-line options on ``parser``."""
        # Each entry is (flag, keyword arguments for ``add_argument``).
        # NOTE(review): the backtick in "worker`s" looks like a typo for an
        # apostrophe; it is kept verbatim so the help output is unchanged.
        option_specs = (
            (
                "--pid",
                {
                    "action": "store",
                    "dest": "pid",
                    "default": None,
                    "help": "PID file to write the worker`s pid into",
                },
            ),
            (
                "--burst",
                {
                    "action": "store_true",
                    "dest": "burst",
                    "default": False,
                    "help": "Run worker in burst mode",
                },
            ),
            (
                "--with-scheduler",
                {
                    "action": "store_true",
                    "dest": "with_scheduler",
                    "default": False,
                    "help": "Run worker with scheduler enabled",
                },
            ),
            (
                "--name",
                {
                    "action": "store",
                    "dest": "name",
                    "default": None,
                    "help": "Name of the worker",
                },
            ),
            (
                "--worker-ttl",
                {
                    "action": "store",
                    "type": int,
                    "dest": "worker_ttl",
                    "default": 420,
                    "help": "Default worker timeout to be used",
                },
            ),
        )
        for flag, spec in option_specs:
            parser.add_argument(flag, **spec)

    def handle(self, *args, **options):
        """Start an RQ worker configured from the parsed ``options``.

        Optionally writes the current process id to the ``--pid`` file,
        sets up RQ's log handlers according to ``verbosity``, then creates
        the worker through ``queue.get_worker`` and runs it. If Redis
        raises ``ConnectionError``, the message is written to stderr and
        the process exits with status 1.
        """
        pid_path = options.get("pid")
        if pid_path:
            with open(os.path.expanduser(pid_path), "w") as pid_file:
                pid_file.write(str(os.getpid()))

        # BaseCommand defines ``verbosity`` for every command:
        # 0 -> WARNING, 1 -> INFO, 2 and above -> DEBUG.
        verbosity = options.get("verbosity")
        if verbosity >= 2:
            level = "DEBUG"
        else:
            level = "WARNING" if verbosity == 0 else "INFO"
        setup_loghandlers(level)

        try:
            worker = queue.get_worker(
                name=options["name"],
                default_worker_ttl=options["worker_ttl"],
            )

            # Push the worker's redis connection onto RQ's LocalStack;
            # otherwise jobs that call RQ's get_current_job() will fail.
            use_connection(worker.connection)

            # Drop any open DB connection before the worker forks.
            reset_db_connections()

            worker.work(
                burst=options.get("burst", False),
                with_scheduler=options.get("with_scheduler", False),
                logging_level=level,
            )
        except ConnectionError as exc:
            self.stderr.write(str(exc))
            sys.exit(1)