344 lines
10 KiB
Python
344 lines
10 KiB
Python
import os
|
|
from functools import wraps
|
|
from pathlib import Path
|
|
from typing import Callable
|
|
|
|
from django import http
|
|
from django.conf import settings
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from django.contrib.syndication.views import add_domain
|
|
from django.core.paginator import Paginator
|
|
from django.middleware.http import ConditionalGetMiddleware
|
|
from django.shortcuts import render
|
|
from django.urls import reverse
|
|
from django.utils.decorators import decorator_from_middleware
|
|
|
|
from ucast import feed, forms, queue
|
|
from ucast.models import Channel, User, Video
|
|
from ucast.service import controller, opml, storage
|
|
from ucast.tasks import download
|
|
|
|
|
|
@login_required
def home(request: http.HttpRequest):
    """Render the channel overview and process the "add channel" form.

    On POST the submitted ``AddChannelForm`` is validated, the channel is
    created via ``controller.create_channel`` and an ``update_channel`` job is
    enqueued for it. If the form is still error-free afterwards the client is
    redirected back to this view. In every other case the channel list is
    rendered together with the form, including any errors attached to
    ``channel_str``.
    """
    site_url = add_domain(get_current_site(request).domain, "", request.is_secure())
    channels = Channel.objects.all()

    if request.method != "POST":
        form = forms.AddChannelForm()
    else:
        form = forms.AddChannelForm(request.POST)
        if form.is_valid():
            channel_str = form.cleaned_data["channel_str"]
            try:
                new_channel = controller.create_channel(channel_str)
                queue.enqueue(download.update_channel, new_channel.id)
            except ValueError:
                form.add_error("channel_str", "Channel URL invalid")
            except controller.ChannelAlreadyExistsException:
                form.add_error("channel_str", "Channel already exists")

            # An error added above makes the form invalid, so validity has to
            # be checked again before redirecting.
            if form.is_valid():
                return http.HttpResponseRedirect(reverse(home))

    context = {
        "channels": channels,
        "site_url": site_url,
        "form": form,
    }
    return render(request, "ucast/channels.html", context)
|
|
|
|
|
|
@login_required
def videos(request: http.HttpRequest, channel: str):
    """List a channel's downloaded videos and handle channel/video actions.

    POST requests are dispatched on the action key present in the form data:

    * ``activate`` / ``deactivate`` -- set ``Channel.active`` and save
    * ``delete_video`` -- delete the video whose id ``DeleteVideoForm`` yields
    * ``delete_channel`` -- delete the channel and redirect to ``home``

    A POST without a known action key, or with an invalid ``DeleteVideoForm``,
    is answered with a 400 response. All other handled actions redirect back
    to this view.

    GET requests render the channel's videos that have a ``downloaded`` value,
    ordered by ``published`` descending and paginated 20 per page (``?page=``).
    When ``request.htmx`` is truthy only the items partial is rendered.
    ``n_pending`` is the number of the channel's videos that are neither
    downloaded nor marked as deleted.

    Raises:
        http.Http404: if no channel with the slug ``channel`` exists.
    """
    try:
        chan = Channel.objects.get(slug=channel)
    except Channel.DoesNotExist:
        # An unknown slug is a client error; answer 404 instead of letting
        # the lookup failure surface as a server error.
        raise http.Http404 from None

    if request.method == "POST":
        if "activate" in request.POST:
            chan.active = True
            chan.save()
        elif "deactivate" in request.POST:
            chan.active = False
            chan.save()
        elif "delete_video" in request.POST:
            form = forms.DeleteVideoForm(request.POST)
            if not form.is_valid():
                return http.HttpResponseBadRequest()
            video_id = form.cleaned_data["id"]
            controller.delete_video(video_id)
        elif "delete_channel" in request.POST:
            controller.delete_channel(chan.id)
            # The channel page no longer exists, so go back to the overview.
            return http.HttpResponseRedirect(reverse(home))
        else:
            return http.HttpResponseBadRequest()

        return http.HttpResponseRedirect(reverse(videos, args=[channel]))

    vids = Video.objects.filter(channel=chan, downloaded__isnull=False).order_by(
        "-published"
    )
    site_url = add_domain(get_current_site(request).domain, "", request.is_secure())

    page_number = request.GET.get("page")
    videos_p = Paginator(vids, 20)

    template_name = "ucast/videos.html"
    if request.htmx:
        template_name = "ucast/videos_items.html"

    n_pending = Video.objects.filter(
        channel=chan, downloaded__isnull=True, is_deleted=False
    ).count()

    return render(
        request,
        template_name,
        {
            "videos": videos_p.get_page(page_number),
            "channel": chan,
            "site_url": site_url,
            "n_pending": n_pending,
        },
    )
|
|
|
|
|
|
@login_required
def channel_edit(request: http.HttpRequest, channel: str):
    """Edit the ``skip_shorts`` / ``skip_livestreams`` flags of a channel.

    GET renders ``EditChannelForm`` pre-filled with the channel's current
    values. A valid POST stores both flags on the channel and redirects to
    the channel's ``videos`` page; an invalid POST re-renders the bound form
    so its errors can be shown.

    Raises:
        http.Http404: if no channel with the slug ``channel`` exists.
    """
    try:
        chan = Channel.objects.get(slug=channel)
    except Channel.DoesNotExist:
        # An unknown slug is a client error; answer 404 instead of letting
        # the lookup failure surface as a server error.
        raise http.Http404 from None

    if request.method == "POST":
        form = forms.EditChannelForm(request.POST)
        if form.is_valid():
            chan.skip_shorts = form.cleaned_data["skip_shorts"]
            chan.skip_livestreams = form.cleaned_data["skip_livestreams"]
            chan.save()
            return http.HttpResponseRedirect(reverse(videos, args=[channel]))
    else:
        form = forms.EditChannelForm(
            initial={
                "skip_shorts": chan.skip_shorts,
                "skip_livestreams": chan.skip_livestreams,
            }
        )

    return render(
        request,
        "ucast/channel_edit.html",
        {
            "channel": chan,
            "form": form,
        },
    )
|
|
|
|
|
|
@login_required
def channel_download(request: http.HttpRequest, channel: str):
    """Queue a download of a channel's videos.

    A valid POST enqueues ``download.download_channel`` with the channel id
    and the submitted ``n_videos`` value, then redirects to the channel's
    ``videos`` page. GET renders an empty ``DownloadChannelForm``; an invalid
    POST re-renders the bound form so its errors can be shown.

    Raises:
        http.Http404: if no channel with the slug ``channel`` exists.
    """
    try:
        chan = Channel.objects.get(slug=channel)
    except Channel.DoesNotExist:
        # An unknown slug is a client error; answer 404 instead of letting
        # the lookup failure surface as a server error.
        raise http.Http404 from None

    if request.method == "POST":
        form = forms.DownloadChannelForm(request.POST)
        if form.is_valid():
            queue.enqueue(
                download.download_channel, chan.id, form.cleaned_data["n_videos"]
            )
            return http.HttpResponseRedirect(reverse(videos, args=[channel]))
    else:
        form = forms.DownloadChannelForm()

    return render(
        request,
        "ucast/channel_download.html",
        {
            "channel": chan,
            "form": form,
        },
    )
|
|
|
|
|
|
@login_required
def downloads(request: http.HttpRequest):
    """Show failed jobs, videos currently downloading and the queue size.

    Failed jobs are fetched for the ids returned by ``get_job_ids(0, 50)`` of
    the failed-job registry; downloading videos are limited to 100. When
    ``request.htmx`` is truthy only the items partial is rendered.
    """
    registry = queue.get_failed_job_registry()
    failed_ids = registry.get_job_ids(0, 50)
    failed = registry.job_class.fetch_many(
        failed_ids, registry.connection, registry.serializer
    )
    active = queue.get_downloading_videos(limit=100)

    # htmx requests only need the list fragment, not the whole page.
    page = "ucast/downloads_items.html" if request.htmx else "ucast/downloads.html"

    context = {
        "failed_jobs": failed,
        "downloading_videos": active,
        "n_tasks": queue.get_queue().count,
    }
    return render(request, page, context)
|
|
|
|
|
|
@login_required
def error_details(request: http.HttpRequest, job_id: str):
    """Render the detail page for the job identified by ``job_id``.

    The job is fetched through the failed-job registry's job class,
    connection and serializer.
    """
    registry = queue.get_failed_job_registry()
    context = {
        "job": registry.job_class.fetch(
            job_id, registry.connection, registry.serializer
        ),
    }
    return render(request, "ucast/error_details.html", context)
|
|
|
|
|
|
@login_required
def download_errors_requeue(request: http.HttpRequest):
    """Requeue one failed job and return to the downloads page.

    The job id comes from ``RequeueForm`` bound to the POST data; when the
    form is invalid nothing is requeued and the redirect still happens.
    """
    requeue_form = forms.RequeueForm(request.POST)
    if requeue_form.is_valid():
        registry = queue.get_failed_job_registry()
        failed_id = str(requeue_form.cleaned_data["id"])
        registry.requeue(failed_id)

    target = reverse(downloads)
    return http.HttpResponseRedirect(target)
|
|
|
|
|
|
@login_required
def download_errors_requeue_all(request: http.HttpRequest):
    """Requeue every job in the failed-job registry, then show the downloads page."""
    registry = queue.get_failed_job_registry()
    failed_ids = registry.get_job_ids()
    for failed_id in failed_ids:
        registry.requeue(failed_id)

    target = reverse(downloads)
    return http.HttpResponseRedirect(target)
|
|
|
|
|
|
@login_required
def download_errors_delete(request: http.HttpRequest):
    """Remove one failed job (and delete the job itself), then redirect.

    The job id comes from ``RequeueForm`` bound to the POST data; when the
    form is invalid nothing is removed and the redirect still happens.
    """
    delete_form = forms.RequeueForm(request.POST)
    if delete_form.is_valid():
        registry = queue.get_failed_job_registry()
        failed_id = str(delete_form.cleaned_data["id"])
        registry.remove(failed_id, delete_job=True)

    target = reverse(downloads)
    return http.HttpResponseRedirect(target)
|
|
|
|
|
|
@login_required
def download_errors_delete_all(request: http.HttpRequest):
    """Remove and delete every job in the failed-job registry, then redirect."""
    registry = queue.get_failed_job_registry()
    failed_ids = registry.get_job_ids()
    for failed_id in failed_ids:
        registry.remove(failed_id, delete_job=True)

    target = reverse(downloads)
    return http.HttpResponseRedirect(target)
|
|
|
|
|
|
@login_required
def channels_opml(request: http.HttpRequest):
    """Return all channels as an OPML attachment (``ucast_channels.opml``).

    The document is written straight into the response by
    ``opml.write_channels_opml``, which receives the site URL and the
    requesting user's feed key.
    """
    disposition = {"Content-Disposition": "attachment; filename=ucast_channels.opml"}
    response = http.HttpResponse(content_type="application/xml", headers=disposition)

    current_domain = get_current_site(request).domain
    site_url = add_domain(current_domain, "", request.is_secure())

    all_channels = Channel.objects.all()
    opml.write_channels_opml(
        all_channels, site_url, request.user.get_feed_key(), response
    )
    return response
|
|
|
|
|
|
@login_required
def search(request: http.HttpRequest):
    """Search downloaded videos by title.

    The ``q`` query parameter is matched case-insensitively against video
    titles and at most 30 results are passed to the template. Without a query
    the result list is empty.
    """
    query = request.GET.get("q")

    if query:
        matches = Video.objects.filter(
            downloaded__isnull=False, title__icontains=query
        )[:30]
    else:
        matches = []

    context = {"query": query, "videos": matches}
    return render(request, "ucast/search.html", context)
|
|
|
|
|
|
def _channel_file(channel: str, get_file: Callable[[storage.ChannelFolder], Path]):
    """Serve a file from a channel's storage folder.

    ``get_file`` picks the path to serve from the channel folder. If
    ``settings.INTERNAL_REDIRECT_HEADER`` is falsy the file is streamed
    directly. Otherwise an empty response is returned whose redirect header
    holds the file's location below ``settings.INTERNAL_FILES_ROOT``.

    Raises:
        http.Http404: if the channel folder lookup raises
            ``FileNotFoundError`` or the selected path is not a file.
    """
    store = storage.Storage()

    try:
        folder = store.get_channel_folder(channel)
    except FileNotFoundError:
        raise http.Http404

    path = get_file(folder)
    if not os.path.isfile(path):
        raise http.Http404

    redirect_header = settings.INTERNAL_REDIRECT_HEADER
    if redirect_header:
        # Presumably the fronting web server resolves this header and serves
        # the file itself -- confirm against the deployment configuration.
        relative = path.relative_to(store.dir_data)
        internal_url = f"/{settings.INTERNAL_FILES_ROOT}/{relative.as_posix()}"

        response = http.HttpResponse()
        response.headers[redirect_header] = internal_url
        # Content type is set to text/html by default and has to be unset
        del response.headers["Content-Type"]
        return response

    return http.FileResponse(open(path, "rb"))
|
|
|
|
|
|
def login_or_key_required(function):
    """Decorate a view so it needs a logged-in user or a valid feed key.

    When the request carries a non-empty ``key`` query parameter that matches
    a user's ``feed_key``, that user is set as ``request.user``. The view is
    then called only if ``request.user`` is authenticated; otherwise a plain
    401 response is returned.
    """

    @wraps(function)
    def _wrapped_view(request, *args, **kwargs):
        feed_key = request.GET.get("key")
        if feed_key:
            try:
                request.user = User.objects.get(feed_key=feed_key)
            except User.DoesNotExist:
                # Unknown key: keep the user already attached to the request.
                pass

        if not request.user.is_authenticated:
            return http.HttpResponse("401 Unauthorized", status=401)

        return function(request, *args, **kwargs)

    return _wrapped_view
|
|
|
|
|
|
@login_or_key_required
def audio(request: http.HttpRequest, channel: str, video: str):
    """Serve the audio file of a video.

    ``video`` may carry a file extension; everything from the first ``.`` on
    is dropped to obtain the video slug.
    """
    slug, _, _ = video.partition(".")

    def pick_audio(folder):
        return folder.get_audio(slug)

    return _channel_file(channel, pick_audio)
|
|
|
|
|
|
@login_or_key_required
def cover(request: http.HttpRequest, channel: str, video: str):
    """Serve the cover image of a video.

    ``video`` may carry a file extension; everything from the first ``.`` on
    is dropped to obtain the video slug.
    """
    slug, _, _ = video.partition(".")

    def pick_cover(folder):
        return folder.get_cover(slug)

    return _channel_file(channel, pick_cover)
|
|
|
|
|
|
@login_or_key_required
def thumbnail(request: http.HttpRequest, channel: str, video: str):
    """Serve the thumbnail of a video.

    ``video`` may carry a file extension; everything from the first ``.`` on
    is dropped to obtain the video slug. The presence of an ``sm`` query
    parameter is passed on to ``get_thumbnail`` as its second argument.
    """
    slug, _, _ = video.partition(".")
    small = "sm" in request.GET

    def pick_thumbnail(folder):
        return folder.get_thumbnail(slug, small)

    return _channel_file(channel, pick_thumbnail)
|
|
|
|
|
|
@login_or_key_required
def avatar(request: http.HttpRequest, channel: str):
    """Serve the avatar image of a channel.

    ``channel`` may carry a file extension; everything from the first ``.``
    on is dropped to obtain the channel slug. With an ``sm`` query parameter
    the folder's ``file_avatar_sm`` is served, otherwise ``file_avatar``.
    """
    slug, _, _ = channel.partition(".")
    attr_name = "file_avatar_sm" if "sm" in request.GET else "file_avatar"

    return _channel_file(slug, lambda folder: getattr(folder, attr_name))
|
|
|
|
|
|
@login_or_key_required
@decorator_from_middleware(ConditionalGetMiddleware)
def podcast_feed(request: http.HttpRequest, *args, **kwargs):
    """Delegate the request to a fresh ``feed.UcastFeed`` instance.

    The view runs behind ``ConditionalGetMiddleware`` and requires a login or
    a feed key.
    """
    feed_view = feed.UcastFeed()
    return feed_view(request, *args, **kwargs)
|