import os from functools import wraps from pathlib import Path from typing import Callable from django import http from django.conf import settings from django.contrib.auth.decorators import login_required from django.contrib.sites.shortcuts import get_current_site from django.contrib.syndication.views import add_domain from django.core.paginator import Paginator from django.middleware.http import ConditionalGetMiddleware from django.shortcuts import render from django.urls import reverse from django.utils.decorators import decorator_from_middleware from ucast import feed, forms, queue from ucast.models import Channel, User, Video from ucast.service import controller, opml, storage from ucast.tasks import download @login_required def home(request: http.HttpRequest): channels = Channel.objects.all() site_url = add_domain(get_current_site(request).domain, "", request.is_secure()) form = forms.AddChannelForm() if request.method == "POST": form = forms.AddChannelForm(request.POST) if form.is_valid(): channel_str = form.cleaned_data["channel_str"] try: channel = controller.create_channel(channel_str) queue.enqueue(download.update_channel, channel.id) except ValueError: form.add_error("channel_str", "Channel URL invalid") except controller.ChannelAlreadyExistsException: form.add_error("channel_str", "Channel already exists") if form.is_valid(): return http.HttpResponseRedirect(reverse(home)) return render( request, "ucast/channels.html", { "channels": channels, "site_url": site_url, "form": form, }, ) @login_required def videos(request: http.HttpRequest, channel: str): chan = Channel.objects.get(slug=channel) if request.method == "POST": if "activate" in request.POST: chan.active = True chan.save() elif "deactivate" in request.POST: chan.active = False chan.save() elif "delete_video" in request.POST: form = forms.DeleteVideoForm(request.POST) if form.is_valid(): id = form.cleaned_data["id"] controller.delete_video(id) else: return http.HttpResponseBadRequest() elif "delete_channel" in request.POST: controller.delete_channel(chan.id) return http.HttpResponseRedirect(reverse(home)) else: return http.HttpResponseBadRequest() return http.HttpResponseRedirect(reverse(videos, args=[channel])) vids = Video.objects.filter(channel=chan, downloaded__isnull=False).order_by( "-published" ) site_url = add_domain(get_current_site(request).domain, "", request.is_secure()) page_number = request.GET.get("page") videos_p = Paginator(vids, 20) template_name = "ucast/videos.html" if request.htmx: template_name = "ucast/videos_items.html" n_pending = Video.objects.filter( channel=chan, downloaded__isnull=True, is_deleted=False ).count() return render( request, template_name, { "videos": videos_p.get_page(page_number), "channel": chan, "site_url": site_url, "n_pending": n_pending, }, ) @login_required def channel_edit(request: http.HttpRequest, channel: str): chan = Channel.objects.get(slug=channel) form = forms.EditChannelForm( initial={ "skip_shorts": chan.skip_shorts, "skip_livestreams": chan.skip_livestreams, } ) if request.method == "POST": form = forms.EditChannelForm(request.POST) if form.is_valid(): chan.skip_shorts = form.cleaned_data["skip_shorts"] chan.skip_livestreams = form.cleaned_data["skip_livestreams"] chan.save() return http.HttpResponseRedirect(reverse(videos, args=[channel])) return render( request, "ucast/channel_edit.html", { "channel": chan, "form": form, }, ) @login_required def channel_download(request: http.HttpRequest, channel: str): chan = Channel.objects.get(slug=channel) if request.method == "POST": form = forms.DownloadChannelForm(request.POST) if form.is_valid(): queue.enqueue( download.download_channel, chan.id, form.cleaned_data["n_videos"] ) return http.HttpResponseRedirect(reverse(videos, args=[channel])) return render( request, "ucast/channel_download.html", { "channel": chan, "form": forms.DownloadChannelForm(), }, ) @login_required def downloads(request: http.HttpRequest): freg = queue.get_failed_job_registry() ids = freg.get_job_ids(0, 50) failed_jobs = freg.job_class.fetch_many(ids, freg.connection, freg.serializer) downloading_videos = queue.get_downloading_videos(limit=100) template_name = "ucast/downloads.html" if request.htmx: template_name = "ucast/downloads_items.html" return render( request, template_name, { "failed_jobs": failed_jobs, "downloading_videos": downloading_videos, "n_tasks": queue.get_queue().count, }, ) @login_required def error_details(request: http.HttpRequest, job_id: str): freg = queue.get_failed_job_registry() job = freg.job_class.fetch(job_id, freg.connection, freg.serializer) return render(request, "ucast/error_details.html", {"job": job}) @login_required def download_errors_requeue(request: http.HttpRequest): form = forms.RequeueForm(request.POST) if form.is_valid(): freg = queue.get_failed_job_registry() freg.requeue(str(form.cleaned_data["id"])) return http.HttpResponseRedirect(reverse(downloads)) @login_required def download_errors_requeue_all(request: http.HttpRequest): freg = queue.get_failed_job_registry() for job_id in freg.get_job_ids(): freg.requeue(job_id) return http.HttpResponseRedirect(reverse(downloads)) @login_required def download_errors_delete(request: http.HttpRequest): form = forms.RequeueForm(request.POST) if form.is_valid(): freg = queue.get_failed_job_registry() freg.remove(str(form.cleaned_data["id"]), delete_job=True) return http.HttpResponseRedirect(reverse(downloads)) @login_required def download_errors_delete_all(request: http.HttpRequest): freg = queue.get_failed_job_registry() for job_id in freg.get_job_ids(): freg.remove(job_id, delete_job=True) return http.HttpResponseRedirect(reverse(downloads)) @login_required def channels_opml(request: http.HttpRequest): response = http.HttpResponse( content_type="application/xml", headers={"Content-Disposition": "attachment; filename=ucast_channels.opml"}, ) site_url = add_domain(get_current_site(request).domain, "", request.is_secure()) opml.write_channels_opml( Channel.objects.all(), site_url, request.user.get_feed_key(), response ) return response @login_required def search(request: http.HttpRequest): query = request.GET.get("q") vids = [] if query: vids = Video.objects.filter(downloaded__isnull=False, title__icontains=query)[ :30 ] return render(request, "ucast/search.html", {"query": query, "videos": vids}) def _channel_file(channel: str, get_file: Callable[[storage.ChannelFolder], Path]): store = storage.Storage() try: cf = store.get_channel_folder(channel) except FileNotFoundError: raise http.Http404 file_path = get_file(cf) if not os.path.isfile(file_path): raise http.Http404 if not settings.INTERNAL_REDIRECT_HEADER: return http.FileResponse(open(file_path, "rb")) file_path_rel = file_path.relative_to(store.dir_data) url_path_internal = f"/{settings.INTERNAL_FILES_ROOT}/{file_path_rel.as_posix()}" response = http.HttpResponse() response.headers[settings.INTERNAL_REDIRECT_HEADER] = url_path_internal # Content type is set to text/html by default and has to be unset del response.headers["Content-Type"] return response def login_or_key_required(function): def decorator(view_func): @wraps(view_func) def _wrapped_view(request, *args, **kwargs): key = request.GET.get("key") if key: try: request.user = User.objects.get(feed_key=key) except User.DoesNotExist: pass if request.user.is_authenticated: return view_func(request, *args, **kwargs) return http.HttpResponse("401 Unauthorized", status=401) return _wrapped_view return decorator(function) @login_or_key_required def audio(request: http.HttpRequest, channel: str, video: str): # Trim off file extension video_slug = video.rsplit(".")[0] return _channel_file(channel, lambda cf: cf.get_audio(video_slug)) @login_or_key_required def cover(request: http.HttpRequest, channel: str, video: str): # Trim off file extension video_slug = video.rsplit(".")[0] return _channel_file(channel, lambda cf: cf.get_cover(video_slug)) @login_or_key_required def thumbnail(request: http.HttpRequest, channel: str, video: str): # Trim off file extension video_slug = video.rsplit(".")[0] is_sm = "sm" in request.GET return _channel_file(channel, lambda cf: cf.get_thumbnail(video_slug, is_sm)) @login_or_key_required def avatar(request: http.HttpRequest, channel: str): # Trim off file extension channel_slug = channel.rsplit(".")[0] is_sm = "sm" in request.GET if is_sm: return _channel_file(channel_slug, lambda cf: cf.file_avatar_sm) return _channel_file(channel_slug, lambda cf: cf.file_avatar) @login_or_key_required @decorator_from_middleware(ConditionalGetMiddleware) def podcast_feed(request: http.HttpRequest, *args, **kwargs): return feed.UcastFeed()(request, *args, **kwargs)