103 lines
2.8 KiB
Python
103 lines
2.8 KiB
Python
"""
|
|
Based on the django-rq package by Selwin Ong (MIT License)
|
|
https://github.com/rq/django-rq
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
|
|
from django.core.management.base import BaseCommand
|
|
from django.db import connections
|
|
from redis.exceptions import ConnectionError
|
|
from rq import use_connection
|
|
from rq.logutils import setup_loghandlers
|
|
|
|
from ucast import queue
|
|
|
|
|
|
def reset_db_connections():
|
|
for c in connections.all():
|
|
c.close()
|
|
|
|
|
|
class Command(BaseCommand):
|
|
"""Runs RQ worker"""
|
|
|
|
help = __doc__
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument(
|
|
"--pid",
|
|
action="store",
|
|
dest="pid",
|
|
default=None,
|
|
help="PID file to write the worker`s pid into",
|
|
)
|
|
parser.add_argument(
|
|
"--burst",
|
|
action="store_true",
|
|
dest="burst",
|
|
default=False,
|
|
help="Run worker in burst mode",
|
|
)
|
|
parser.add_argument(
|
|
"--with-scheduler",
|
|
action="store_true",
|
|
dest="with_scheduler",
|
|
default=False,
|
|
help="Run worker with scheduler enabled",
|
|
)
|
|
parser.add_argument(
|
|
"--name",
|
|
action="store",
|
|
dest="name",
|
|
default=None,
|
|
help="Name of the worker",
|
|
)
|
|
parser.add_argument(
|
|
"--worker-ttl",
|
|
action="store",
|
|
type=int,
|
|
dest="worker_ttl",
|
|
default=420,
|
|
help="Default worker timeout to be used",
|
|
)
|
|
|
|
def handle(self, *args, **options):
|
|
pid = options.get("pid")
|
|
if pid:
|
|
with open(os.path.expanduser(pid), "w") as fp:
|
|
fp.write(str(os.getpid()))
|
|
|
|
# Verbosity is defined by default in BaseCommand for all commands
|
|
verbosity = options.get("verbosity")
|
|
if verbosity >= 2:
|
|
level = "DEBUG"
|
|
elif verbosity == 0:
|
|
level = "WARNING"
|
|
else:
|
|
level = "INFO"
|
|
setup_loghandlers(level)
|
|
|
|
try:
|
|
# Instantiate a worker
|
|
worker_kwargs = {
|
|
"name": options["name"],
|
|
"default_worker_ttl": options["worker_ttl"],
|
|
}
|
|
w = queue.get_worker(**worker_kwargs)
|
|
|
|
# Call use_connection to push the redis connection into LocalStack
|
|
# without this, jobs using RQ's get_current_job() will fail
|
|
use_connection(w.connection)
|
|
# Close any opened DB connection before any fork
|
|
reset_db_connections()
|
|
|
|
w.work(
|
|
burst=options.get("burst", False),
|
|
with_scheduler=options.get("with_scheduler", False),
|
|
logging_level=level,
|
|
)
|
|
except ConnectionError as e:
|
|
self.stderr.write(str(e))
|
|
sys.exit(1)
|