# ucast/ucast/queue.py — RQ task-queue helpers (115 lines, 3.5 KiB, Python)
import redis
import rq
import rq_scheduler
from django.conf import settings
from django.db.models import ObjectDoesNotExist
from rq import registry
from ucast.models import Video
from ucast.service import util
def get_redis_connection() -> redis.client.Redis:
    """Create a Redis client from the ``REDIS_URL`` Django setting."""
    url = settings.REDIS_URL
    return redis.Redis.from_url(url)
def get_queue() -> rq.Queue:
    """Return the default RQ queue, with the configured job timeout."""
    return rq.Queue(
        default_timeout=settings.REDIS_QUEUE_TIMEOUT,
        connection=get_redis_connection(),
    )
def get_scheduler(interval=60) -> rq_scheduler.Scheduler:
    """Return an rq-scheduler instance polling every *interval* seconds."""
    conn = get_redis_connection()
    scheduler = rq_scheduler.Scheduler(connection=conn, interval=interval)
    return scheduler
def get_worker(**kwargs) -> rq.Worker:
    """Build an RQ worker bound to the default queue.

    Extra keyword arguments are forwarded to ``rq.Worker``.
    """
    q = get_queue()
    worker = rq.Worker(
        q,
        connection=q.connection,
        default_result_ttl=settings.REDIS_QUEUE_RESULT_TTL,
        **kwargs,
    )
    return worker
def enqueue(f, *args, **kwargs) -> rq.job.Job:
    """Enqueue *f* on the default queue and return the created job."""
    return get_queue().enqueue(f, *args, **kwargs)
def get_statistics() -> dict:
    """
    Return statistics from the RQ Queue.
    Taken from the django-rq package by Selwin Ong (MIT License)
    https://github.com/rq/django-rq
    :return: RQ statistics
    """
    queue = get_queue()
    connection = queue.connection
    # Copy before mutating: connection_pool.connection_kwargs is the live
    # dict redis-py uses to create new connections; popping keys from it
    # in place would corrupt the pool for later reconnects.
    connection_kwargs = dict(connection.connection_pool.connection_kwargs)
    # Raw access to the first item from left of the redis list.
    # This might not be accurate since new job can be added from the left
    # with `at_front` parameters.
    # Ideally rq should supports Queue.oldest_job
    last_job_id = connection.lindex(queue.key, 0)
    last_job = queue.fetch_job(last_job_id.decode("utf-8")) if last_job_id else None
    if last_job:
        oldest_job_timestamp = util.to_localtime(last_job.enqueued_at).strftime(
            "%Y-%m-%d, %H:%M:%S"
        )
    else:
        oldest_job_timestamp = "-"
    # parser_class and connection_pool are not needed and not JSON serializable
    connection_kwargs.pop("parser_class", None)
    connection_kwargs.pop("connection_pool", None)
    finished_job_registry = registry.FinishedJobRegistry(queue.name, queue.connection)
    started_job_registry = registry.StartedJobRegistry(queue.name, queue.connection)
    deferred_job_registry = registry.DeferredJobRegistry(queue.name, queue.connection)
    failed_job_registry = registry.FailedJobRegistry(queue.name, queue.connection)
    scheduled_job_registry = registry.ScheduledJobRegistry(queue.name, queue.connection)
    return {
        "name": queue.name,
        "jobs": queue.count,
        "oldest_job_timestamp": oldest_job_timestamp,
        "connection_kwargs": connection_kwargs,
        "workers": rq.Worker.count(queue=queue),
        "finished_jobs": len(finished_job_registry),
        "started_jobs": len(started_job_registry),
        "deferred_jobs": len(deferred_job_registry),
        "failed_jobs": len(failed_job_registry),
        "scheduled_jobs": len(scheduled_job_registry),
    }
def get_failed_job_registry():
    """Return the FailedJobRegistry of the default queue."""
    q = get_queue()
    return registry.FailedJobRegistry(q.name, q.connection)
def get_downloading_videos(offset=0, limit=-1):
    """Return the Video objects that currently have a download job queued.

    Scans the queued jobs for ``ucast.tasks.download.download_video`` calls
    and resolves their first argument (a positive video id) to Video rows;
    ids with no matching row are silently skipped.
    """
    jobs = get_queue().get_jobs(offset, limit)
    video_ids = {
        job.args[0]
        for job in jobs
        if job.func_name == "ucast.tasks.download.download_video"
        and job.args
        and job.args[0] > 0
    }
    videos = []
    for video_id in video_ids:
        try:
            videos.append(Video.objects.get(id=video_id))
        except ObjectDoesNotExist:
            pass
    return videos