ucast/ucast/views.py

344 lines
10 KiB
Python

import os
from functools import wraps
from pathlib import Path
from typing import Callable
from django import http
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.contrib.sites.shortcuts import get_current_site
from django.contrib.syndication.views import add_domain
from django.core.paginator import Paginator
from django.middleware.http import ConditionalGetMiddleware
from django.shortcuts import render
from django.urls import reverse
from django.utils.decorators import decorator_from_middleware
from ucast import feed, forms, queue
from ucast.models import Channel, User, Video
from ucast.service import controller, opml, storage
from ucast.tasks import download
@login_required
def home(request: http.HttpRequest):
    """Render the channel overview and process the "add channel" form.

    On POST the submitted ``AddChannelForm`` is validated, the channel is
    created through the controller and ``download.update_channel`` is enqueued
    for it. A successful submission redirects back to this view; any failure
    re-renders the page with the errors attached to the form.
    """
    all_channels = Channel.objects.all()
    base_url = add_domain(get_current_site(request).domain, "", request.is_secure())

    if request.method != "POST":
        add_form = forms.AddChannelForm()
    else:
        add_form = forms.AddChannelForm(request.POST)
        if add_form.is_valid():
            channel_str = add_form.cleaned_data["channel_str"]
            try:
                created = controller.create_channel(channel_str)
                queue.enqueue(download.update_channel, created.id)
            except controller.ChannelAlreadyExistsException:
                add_form.add_error("channel_str", "Channel already exists")
            except ValueError:
                add_form.add_error("channel_str", "Channel URL invalid")

            # add_error() above invalidates the form, so re-checking validity
            # tells us whether the channel was actually created.
            if add_form.is_valid():
                return http.HttpResponseRedirect(reverse(home))

    context = {
        "channels": all_channels,
        "site_url": base_url,
        "form": add_form,
    }
    return render(request, "ucast/channels.html", context)
@login_required
def videos(request: http.HttpRequest, channel: str):
    """Show a channel's downloaded videos and handle channel actions.

    GET renders a paginated list (20 per page, newest ``published`` first) of
    the channel's videos whose ``downloaded`` field is set, together with the
    number of videos that are neither downloaded nor marked deleted. Requests
    flagged by ``request.htmx`` are rendered with the ``videos_items.html``
    template instead of the full page.

    POST expects one of the keys ``activate``, ``deactivate``,
    ``delete_video`` (validated with ``DeleteVideoForm``) or
    ``delete_channel`` and redirects afterwards; anything else yields a
    400 response.

    :param channel: slug of the channel
    :raises Http404: if no channel with that slug exists
    """
    try:
        chan = Channel.objects.get(slug=channel)
    except Channel.DoesNotExist:
        # An unknown slug is a client error, not a server error.
        raise http.Http404 from None

    if request.method == "POST":
        if "activate" in request.POST:
            chan.active = True
            chan.save()
        elif "deactivate" in request.POST:
            chan.active = False
            chan.save()
        elif "delete_video" in request.POST:
            form = forms.DeleteVideoForm(request.POST)
            if not form.is_valid():
                return http.HttpResponseBadRequest()
            video_id = form.cleaned_data["id"]
            controller.delete_video(video_id)
        elif "delete_channel" in request.POST:
            controller.delete_channel(chan.id)
            # The channel page no longer exists, go back to the overview.
            return http.HttpResponseRedirect(reverse(home))
        else:
            return http.HttpResponseBadRequest()

        # Redirect after a successful POST so a reload does not resubmit.
        return http.HttpResponseRedirect(reverse(videos, args=[channel]))

    vids = Video.objects.filter(channel=chan, downloaded__isnull=False).order_by(
        "-published"
    )
    site_url = add_domain(get_current_site(request).domain, "", request.is_secure())

    page_number = request.GET.get("page")
    videos_p = Paginator(vids, 20)

    template_name = "ucast/videos.html"
    if request.htmx:
        template_name = "ucast/videos_items.html"

    n_pending = Video.objects.filter(
        channel=chan, downloaded__isnull=True, is_deleted=False
    ).count()

    return render(
        request,
        template_name,
        {
            "videos": videos_p.get_page(page_number),
            "channel": chan,
            "site_url": site_url,
            "n_pending": n_pending,
        },
    )
@login_required
def channel_edit(request: http.HttpRequest, channel: str):
    """Edit the ``skip_shorts`` / ``skip_livestreams`` options of a channel.

    GET renders ``EditChannelForm`` pre-filled with the channel's current
    values. A valid POST stores the new values and redirects to the channel's
    video list; an invalid POST re-renders the bound form.

    :param channel: slug of the channel
    :raises Http404: if no channel with that slug exists
    """
    try:
        chan = Channel.objects.get(slug=channel)
    except Channel.DoesNotExist:
        # An unknown slug is a client error, not a server error.
        raise http.Http404 from None

    if request.method == "POST":
        form = forms.EditChannelForm(request.POST)
        if form.is_valid():
            chan.skip_shorts = form.cleaned_data["skip_shorts"]
            chan.skip_livestreams = form.cleaned_data["skip_livestreams"]
            chan.save()
            return http.HttpResponseRedirect(reverse(videos, args=[channel]))
    else:
        form = forms.EditChannelForm(
            initial={
                "skip_shorts": chan.skip_shorts,
                "skip_livestreams": chan.skip_livestreams,
            }
        )

    return render(
        request,
        "ucast/channel_edit.html",
        {
            "channel": chan,
            "form": form,
        },
    )
@login_required
def channel_download(request: http.HttpRequest, channel: str):
    """Enqueue a download of a channel's videos.

    GET renders an empty ``DownloadChannelForm``. A valid POST enqueues
    ``download.download_channel`` with the channel id and the submitted
    ``n_videos`` value, then redirects to the channel's video list. An invalid
    POST re-renders the bound form so its validation errors are available to
    the template.

    :param channel: slug of the channel
    :raises Http404: if no channel with that slug exists
    """
    try:
        chan = Channel.objects.get(slug=channel)
    except Channel.DoesNotExist:
        # An unknown slug is a client error, not a server error.
        raise http.Http404 from None

    if request.method == "POST":
        form = forms.DownloadChannelForm(request.POST)
        if form.is_valid():
            queue.enqueue(
                download.download_channel, chan.id, form.cleaned_data["n_videos"]
            )
            return http.HttpResponseRedirect(reverse(videos, args=[channel]))
    else:
        form = forms.DownloadChannelForm()

    return render(
        request,
        "ucast/channel_download.html",
        {
            "channel": chan,
            "form": form,
        },
    )
@login_required
def downloads(request: http.HttpRequest):
    """Render the download overview page.

    The context holds up to 50 jobs from the failed-job registry, up to 100
    currently downloading videos and the task count of the queue. Requests
    flagged by ``request.htmx`` are rendered with ``downloads_items.html``
    instead of the full page template.
    """
    registry = queue.get_failed_job_registry()
    failed_ids = registry.get_job_ids(0, 50)
    failed = registry.job_class.fetch_many(
        failed_ids, registry.connection, registry.serializer
    )
    in_progress = queue.get_downloading_videos(limit=100)

    if request.htmx:
        template = "ucast/downloads_items.html"
    else:
        template = "ucast/downloads.html"

    context = {
        "failed_jobs": failed,
        "downloading_videos": in_progress,
        "n_tasks": queue.get_queue().count,
    }
    return render(request, template, context)
@login_required
def error_details(request: http.HttpRequest, job_id: str):
    """Render the detail page of a single job looked up by ``job_id``.

    The job is fetched through the failed-job registry's job class,
    connection and serializer.
    """
    registry = queue.get_failed_job_registry()
    failed_job = registry.job_class.fetch(
        job_id, registry.connection, registry.serializer
    )
    context = {"job": failed_job}
    return render(request, "ucast/error_details.html", context)
@login_required
def download_errors_requeue(request: http.HttpRequest):
    """Requeue one failed job, then redirect to the downloads page.

    The job id is read from ``RequeueForm`` bound to the POST data; if the
    form does not validate nothing is requeued and only the redirect happens.
    """
    requeue_form = forms.RequeueForm(request.POST)
    if requeue_form.is_valid():
        job_id = str(requeue_form.cleaned_data["id"])
        queue.get_failed_job_registry().requeue(job_id)
    return http.HttpResponseRedirect(reverse(downloads))
@login_required
def download_errors_requeue_all(request: http.HttpRequest):
    """Requeue every job in the failed-job registry, then redirect.

    NOTE(review): the request method is not checked, so a plain GET also
    triggers the requeue. Consider restricting this view to POST after
    confirming how the templates call it.
    """
    registry = queue.get_failed_job_registry()
    for failed_id in registry.get_job_ids():
        registry.requeue(failed_id)
    target = reverse(downloads)
    return http.HttpResponseRedirect(target)
@login_required
def download_errors_delete(request: http.HttpRequest):
    """Delete one failed job, then redirect to the downloads page.

    The job id is read from ``RequeueForm`` bound to the POST data; if the
    form does not validate nothing is removed and only the redirect happens.
    """
    delete_form = forms.RequeueForm(request.POST)
    if delete_form.is_valid():
        job_id = str(delete_form.cleaned_data["id"])
        # delete_job=True is passed so the registry also deletes the job.
        queue.get_failed_job_registry().remove(job_id, delete_job=True)
    return http.HttpResponseRedirect(reverse(downloads))
@login_required
def download_errors_delete_all(request: http.HttpRequest):
    """Delete every job in the failed-job registry, then redirect.

    NOTE(review): the request method is not checked, so a plain GET also
    triggers the deletion. Consider restricting this view to POST after
    confirming how the templates call it.
    """
    registry = queue.get_failed_job_registry()
    for failed_id in registry.get_job_ids():
        registry.remove(failed_id, delete_job=True)
    target = reverse(downloads)
    return http.HttpResponseRedirect(target)
@login_required
def channels_opml(request: http.HttpRequest):
    """Return all channels as an OPML file download.

    The document is written straight into the response by
    ``opml.write_channels_opml`` using the site URL and the requesting
    user's feed key.
    """
    base_url = add_domain(get_current_site(request).domain, "", request.is_secure())
    opml_response = http.HttpResponse(
        content_type="application/xml",
        headers={"Content-Disposition": "attachment; filename=ucast_channels.opml"},
    )
    feed_key = request.user.get_feed_key()
    opml.write_channels_opml(Channel.objects.all(), base_url, feed_key, opml_response)
    return opml_response
@login_required
def search(request: http.HttpRequest):
    """Search downloaded videos by title.

    The ``q`` GET parameter is matched case-insensitively against video
    titles; at most 30 downloaded videos are returned. Without a query the
    result list is empty.
    """
    query = request.GET.get("q")
    if query:
        matches = Video.objects.filter(
            downloaded__isnull=False, title__icontains=query
        )[:30]
    else:
        matches = []

    context = {"query": query, "videos": matches}
    return render(request, "ucast/search.html", context)
def _channel_file(channel: str, get_file: Callable[[storage.ChannelFolder], Path]):
    """Serve a file that belongs to a channel folder.

    :param channel: name passed to ``Storage.get_channel_folder``
    :param get_file: callback that picks the file path from the channel folder
    :raises Http404: if the channel folder or the selected file does not exist

    If ``settings.INTERNAL_REDIRECT_HEADER`` is empty the file is streamed by
    Django itself. Otherwise an empty response is returned whose header of
    that name points to the file below ``settings.INTERNAL_FILES_ROOT``
    (presumably consumed by a reverse proxy that serves the file; confirm
    against the deployment setup).
    """
    store = storage.Storage()

    try:
        folder = store.get_channel_folder(channel)
    except FileNotFoundError:
        raise http.Http404

    target = get_file(folder)
    if not os.path.isfile(target):
        raise http.Http404

    redirect_header = settings.INTERNAL_REDIRECT_HEADER
    if redirect_header:
        relative = target.relative_to(store.dir_data)
        internal_url = f"/{settings.INTERNAL_FILES_ROOT}/{relative.as_posix()}"

        response = http.HttpResponse()
        response.headers[redirect_header] = internal_url
        # HttpResponse defaults to text/html; drop the header so the content
        # type is not fixed by this response.
        del response.headers["Content-Type"]
        return response

    # The opened file is handed to FileResponse, which streams it.
    return http.FileResponse(open(target, "rb"))
def login_or_key_required(function):
    """Decorator granting access to logged-in users or holders of a feed key.

    If the request has a non-empty ``key`` GET parameter and a user with that
    ``feed_key`` exists, ``request.user`` is replaced by that user. The view
    is then called when ``request.user`` is authenticated; otherwise a plain
    401 response is returned.
    """

    @wraps(function)
    def _wrapped_view(request, *args, **kwargs):
        feed_key = request.GET.get("key")
        if feed_key:
            try:
                request.user = User.objects.get(feed_key=feed_key)
            except User.DoesNotExist:
                # Unknown key: keep whatever user the request already has.
                pass

        if not request.user.is_authenticated:
            return http.HttpResponse("401 Unauthorized", status=401)
        return function(request, *args, **kwargs)

    return _wrapped_view
@login_or_key_required
def audio(request: http.HttpRequest, channel: str, video: str):
    """Serve the audio file of a video.

    :param channel: channel folder name
    :param video: video slug, optionally followed by a file extension
    """
    # Keep only the part before the first dot (drops the file extension).
    slug = video.split(".", 1)[0]

    def pick(folder):
        return folder.get_audio(slug)

    return _channel_file(channel, pick)
@login_or_key_required
def cover(request: http.HttpRequest, channel: str, video: str):
    """Serve the cover image of a video.

    :param channel: channel folder name
    :param video: video slug, optionally followed by a file extension
    """
    # Keep only the part before the first dot (drops the file extension).
    slug = video.split(".", 1)[0]

    def pick(folder):
        return folder.get_cover(slug)

    return _channel_file(channel, pick)
@login_or_key_required
def thumbnail(request: http.HttpRequest, channel: str, video: str):
    """Serve the thumbnail of a video.

    The presence of an ``sm`` GET parameter (with any value) is passed on to
    ``get_thumbnail`` as its second argument.

    :param channel: channel folder name
    :param video: video slug, optionally followed by a file extension
    """
    small = "sm" in request.GET
    # Keep only the part before the first dot (drops the file extension).
    slug = video.split(".", 1)[0]

    def pick(folder):
        return folder.get_thumbnail(slug, small)

    return _channel_file(channel, pick)
@login_or_key_required
def avatar(request: http.HttpRequest, channel: str):
    """Serve the avatar image of a channel.

    If an ``sm`` GET parameter is present (with any value) the folder's
    ``file_avatar_sm`` is served, otherwise ``file_avatar``.

    :param channel: channel slug, optionally followed by a file extension
    """
    # Keep only the part before the first dot (drops the file extension).
    slug = channel.split(".", 1)[0]
    attr_name = "file_avatar_sm" if "sm" in request.GET else "file_avatar"
    return _channel_file(slug, lambda folder: getattr(folder, attr_name))
@login_or_key_required
@decorator_from_middleware(ConditionalGetMiddleware)
def podcast_feed(request: http.HttpRequest, *args, **kwargs):
    """Serve the podcast feed by delegating to a fresh ``feed.UcastFeed``.

    All positional and keyword arguments are forwarded unchanged. The view is
    wrapped with ``ConditionalGetMiddleware`` and accepts a login or feed key.
    """
    feed_view = feed.UcastFeed()
    return feed_view(request, *args, **kwargs)