ucast/ucast/tasks/library.py
2022-07-05 20:34:58 +02:00

154 lines
4.3 KiB
Python

import os
from django.db.models import ObjectDoesNotExist
from django.utils import timezone
from PIL import Image
from ucast import queue
from ucast.models import Channel, Video
from ucast.service import cover, storage, util, videoutil, youtube
def recreate_cover(v_id: int):
    """
    Rebuild the cover image of a single video and re-tag its audio file.

    The cover is created from the video thumbnail and the channel avatar
    using the blur cover style, then the audio file is tagged again with the
    video metadata and the new cover. Returns silently if no video with the
    given id exists.

    :param v_id: Database id of the video
    :raises FileNotFoundError: if the channel avatar or the video thumbnail
        is not an existing file
    """
    try:
        video = Video.objects.get(id=v_id)
    except ObjectDoesNotExist:
        # The video is gone from the database, nothing left to do.
        return

    folder = storage.Storage().get_channel_folder(video.channel.slug)

    thumb_path = folder.get_thumbnail(video.slug)
    cover_path = folder.get_cover(video.slug)
    audio_path = folder.get_audio(video.slug)

    # Both input images are required to build the cover.
    if not os.path.isfile(folder.file_avatar):
        raise FileNotFoundError(f"could not find avatar for channel {video.channel_id}")
    if not os.path.isfile(thumb_path):
        raise FileNotFoundError(f"could not find thumbnail for video {video.video_id}")

    cover.create_cover_file(
        thumb_path,
        folder.file_avatar,
        video.title,
        video.channel.name,
        cover.COVER_STYLE_BLUR,
        cover_path,
    )

    # Re-tag the audio file so the freshly created cover gets attached.
    videoutil.tag_audio(
        audio_path,
        video.title,
        video.channel.name,
        video.published.date(),
        video.get_full_description(),
        cover_path,
    )
def recreate_covers():
    """
    Enqueue a ``recreate_cover`` job for every video whose ``downloaded``
    field is set.
    """
    downloaded_videos = Video.objects.filter(downloaded__isnull=False)

    for vid in downloaded_videos:
        queue.enqueue(recreate_cover, vid.id)
def resize_thumbnail(v_id: int):
    """
    Bring the thumbnail of a single video to ``videoutil.THUMBNAIL_SIZE``
    and regenerate its small variant.

    The full-size thumbnail is only rewritten if its size differs from the
    target size. Returns silently if no video with the given id exists.

    :param v_id: Database id of the video
    """
    try:
        video = Video.objects.get(id=v_id)
    except ObjectDoesNotExist:
        return

    store = storage.Storage()
    cf = store.get_channel_folder(video.channel.slug)

    tn_path = cf.get_thumbnail(video.slug)

    # Open the image in a context manager so the underlying file handle is
    # always released, even if the size already matches (in which case the
    # image data is never loaded) or if resizing/saving raises.
    with Image.open(tn_path) as tn_img:
        if tn_img.size != videoutil.THUMBNAIL_SIZE:
            resized = util.resize_image(tn_img, videoutil.THUMBNAIL_SIZE)
            resized.save(tn_path)

    # Create the small thumbnail from the (possibly rewritten) full-size file.
    videoutil.resize_thumbnail(tn_path, cf.get_thumbnail(video.slug, True))
def resize_thumbnails():
    """
    Enqueue a ``resize_thumbnail`` job for every video whose ``downloaded``
    field is set.

    Unifies the thumbnail sizes of collections created before v0.4.2.
    This task has to be started by hand:
    ``manage.py rqenqueue ucast.tasks.library.resize_thumbnails``.
    """
    downloaded_videos = Video.objects.filter(downloaded__isnull=False)

    for vid in downloaded_videos:
        queue.enqueue(resize_thumbnail, vid.id)
def _mark_not_downloaded(video):
    """Reset the download state of a video whose files are missing."""
    video.downloaded = None
    video.download_size = None
    video.save()


def update_file_storage():
    """
    Synchronise the download state of all videos with the files on disk.

    For every video in the database:

    - if looking up the channel folder raises ``FileNotFoundError``, or the
      audio file or the thumbnail is missing, the video is marked as not
      downloaded (``downloaded`` and ``download_size`` are cleared)
    - otherwise a missing small thumbnail and a missing cover are recreated,
      ``downloaded`` is set to the current time if it was empty and
      ``download_size`` is refreshed from the audio file
    """
    store = storage.Storage()

    for video in Video.objects.all():
        try:
            cf = store.get_channel_folder(video.channel.slug)
        except FileNotFoundError:
            _mark_not_downloaded(video)
            # Skip only this video; the remaining videos still need to be
            # checked (a ``return`` here would abort the whole scan).
            continue

        audio_file = cf.get_audio(video.slug)
        cover_file = cf.get_cover(video.slug)
        tn_file = cf.get_thumbnail(video.slug)
        tn_file_sm = cf.get_thumbnail(video.slug, True)

        # Audio and thumbnail cannot be regenerated locally, so without
        # them the video counts as not downloaded.
        if not os.path.isfile(audio_file) or not os.path.isfile(tn_file):
            _mark_not_downloaded(video)
            continue

        # The derived files can be rebuilt from what is present.
        if not os.path.isfile(tn_file_sm):
            videoutil.resize_thumbnail(tn_file, tn_file_sm)

        if not os.path.isfile(cover_file):
            # recreate_cover looks the video up by its database id, so pass
            # the id and not the model instance.
            recreate_cover(video.id)

        if video.downloaded is None:
            video.downloaded = timezone.now()

        video.download_size = os.path.getsize(audio_file)
        video.save()
def update_channel_info(ch_id: int):
    """
    Refresh the stored metadata of a single channel from YouTube.

    If the avatar URL has changed, the avatar is downloaded again and its
    small variant is regenerated. Name, description and subscriber count are
    always overwritten with the fetched values. Returns silently if no
    channel with the given id exists.

    :param ch_id: Database id of the channel
    """
    try:
        channel = Channel.objects.get(id=ch_id)
    except ObjectDoesNotExist:
        return

    channel_url = youtube.channel_url_from_id(channel.channel_id)
    metadata = youtube.get_channel_metadata(channel_url)

    avatar_changed = metadata.avatar_url != channel.avatar_url
    if avatar_changed:
        folder = storage.Storage().get_or_create_channel_folder(channel.slug)

        # Fetch the new avatar, then derive the small version from it.
        util.download_image_file(
            metadata.avatar_url, folder.file_avatar, videoutil.AVATAR_SIZE
        )
        videoutil.resize_avatar(folder.file_avatar, folder.file_avatar_sm)

        channel.avatar_url = metadata.avatar_url

    channel.name = metadata.name
    channel.description = metadata.description
    channel.subscribers = metadata.subscribers
    channel.save()
def update_channel_infos():
    """
    Enqueue an ``update_channel_info`` job for every active channel.
    """
    active_channels = Channel.objects.filter(active=True)

    for ch in active_channels:
        queue.enqueue(update_channel_info, ch.id)
def clean_cache():
    """
    Run the cleanup routine of the storage cache.
    """
    storage.Cache().cleanup()