""" Based on the django-rq package by Selwin Ong (MIT License) https://github.com/rq/django-rq """ import os from django.core.management.base import BaseCommand from rq_scheduler.utils import setup_loghandlers from ucast import queue from ucast.tasks import schedule class Command(BaseCommand): """Runs RQ Scheduler""" help = __doc__ def add_arguments(self, parser): parser.add_argument( "--pid", action="store", dest="pid", default=None, help="PID file to write the scheduler`s pid into", ) parser.add_argument( "--interval", "-i", type=int, dest="interval", default=60, help="""How often the scheduler checks for new jobs to add to the queue (in seconds).""", ) def handle(self, *args, **options): schedule.clear_scheduled_jobs() schedule.register_scheduled_jobs() pid = options.get("pid") if pid: with open(os.path.expanduser(pid), "w") as fp: fp.write(str(os.getpid())) # Verbosity is defined by default in BaseCommand for all commands verbosity = options.get("verbosity") if verbosity >= 2: level = "DEBUG" elif verbosity == 0: level = "WARNING" else: level = "INFO" setup_loghandlers(level) scheduler = queue.get_scheduler(options.get("interval")) scheduler.run()