115 lines
3.5 KiB
Python
115 lines
3.5 KiB
Python
import redis
|
|
import rq
|
|
import rq_scheduler
|
|
from django.conf import settings
|
|
from django.db.models import ObjectDoesNotExist
|
|
from rq import registry
|
|
|
|
from ucast.models import Video
|
|
from ucast.service import util
|
|
|
|
|
|
def get_redis_connection() -> redis.client.Redis:
|
|
return redis.Redis.from_url(settings.REDIS_URL)
|
|
|
|
|
|
def get_queue() -> rq.Queue:
|
|
redis_conn = get_redis_connection()
|
|
return rq.Queue(default_timeout=settings.REDIS_QUEUE_TIMEOUT, connection=redis_conn)
|
|
|
|
|
|
def get_scheduler(interval=60) -> rq_scheduler.Scheduler:
|
|
redis_conn = get_redis_connection()
|
|
return rq_scheduler.Scheduler(connection=redis_conn, interval=interval)
|
|
|
|
|
|
def get_worker(**kwargs) -> rq.Worker:
|
|
queue = get_queue()
|
|
return rq.Worker(
|
|
queue,
|
|
connection=queue.connection,
|
|
default_result_ttl=settings.REDIS_QUEUE_RESULT_TTL,
|
|
**kwargs,
|
|
)
|
|
|
|
|
|
def enqueue(f, *args, **kwargs) -> rq.job.Job:
|
|
queue = get_queue()
|
|
return queue.enqueue(f, *args, **kwargs)
|
|
|
|
|
|
def get_statistics() -> dict:
|
|
"""
|
|
Return statistics from the RQ Queue.
|
|
|
|
Taken from the django-rq package by Selwin Ong (MIT License)
|
|
https://github.com/rq/django-rq
|
|
|
|
:return: RQ statistics
|
|
"""
|
|
queue = get_queue()
|
|
connection = queue.connection
|
|
connection_kwargs = connection.connection_pool.connection_kwargs
|
|
|
|
# Raw access to the first item from left of the redis list.
|
|
# This might not be accurate since new job can be added from the left
|
|
# with `at_front` parameters.
|
|
# Ideally rq should supports Queue.oldest_job
|
|
last_job_id = connection.lindex(queue.key, 0)
|
|
last_job = queue.fetch_job(last_job_id.decode("utf-8")) if last_job_id else None
|
|
if last_job:
|
|
oldest_job_timestamp = util.to_localtime(last_job.enqueued_at).strftime(
|
|
"%Y-%m-%d, %H:%M:%S"
|
|
)
|
|
else:
|
|
oldest_job_timestamp = "-"
|
|
|
|
# parse_class and connection_pool are not needed and not JSON serializable
|
|
connection_kwargs.pop("parser_class", None)
|
|
connection_kwargs.pop("connection_pool", None)
|
|
|
|
finished_job_registry = registry.FinishedJobRegistry(queue.name, queue.connection)
|
|
started_job_registry = registry.StartedJobRegistry(queue.name, queue.connection)
|
|
deferred_job_registry = registry.DeferredJobRegistry(queue.name, queue.connection)
|
|
failed_job_registry = registry.FailedJobRegistry(queue.name, queue.connection)
|
|
scheduled_job_registry = registry.ScheduledJobRegistry(queue.name, queue.connection)
|
|
|
|
return {
|
|
"name": queue.name,
|
|
"jobs": queue.count,
|
|
"oldest_job_timestamp": oldest_job_timestamp,
|
|
"connection_kwargs": connection_kwargs,
|
|
"workers": rq.Worker.count(queue=queue),
|
|
"finished_jobs": len(finished_job_registry),
|
|
"started_jobs": len(started_job_registry),
|
|
"deferred_jobs": len(deferred_job_registry),
|
|
"failed_jobs": len(failed_job_registry),
|
|
"scheduled_jobs": len(scheduled_job_registry),
|
|
}
|
|
|
|
|
|
def get_failed_job_registry():
|
|
queue = get_queue()
|
|
return registry.FailedJobRegistry(queue.name, queue.connection)
|
|
|
|
|
|
def get_downloading_videos(offset=0, limit=-1):
|
|
queue = get_queue()
|
|
v_ids = set()
|
|
|
|
for job in queue.get_jobs(offset, limit):
|
|
if (
|
|
job.func_name == "ucast.tasks.download.download_video"
|
|
and job.args
|
|
and job.args[0] > 0
|
|
):
|
|
v_ids.add(job.args[0])
|
|
|
|
videos = []
|
|
for v_id in v_ids:
|
|
try:
|
|
videos.append(Video.objects.get(id=v_id))
|
|
except ObjectDoesNotExist:
|
|
pass
|
|
|
|
return videos
|