Compare commits
2 commits
8af98a44ae
...
4b6733b9b6
Author | SHA1 | Date | |
---|---|---|---|
4b6733b9b6 | |||
28cb58356e |
40
.drone.yml
|
@ -7,9 +7,47 @@ platform:
|
|||
arch: ''
|
||||
|
||||
steps:
|
||||
- name: Test
|
||||
- name: install dependencies
|
||||
image: thetadev256/ucast-dev
|
||||
volumes:
|
||||
- name: cache
|
||||
path: /root/.cache
|
||||
commands:
|
||||
- poetry install
|
||||
|
||||
- name: lint
|
||||
image: thetadev256/ucast-dev
|
||||
volumes:
|
||||
- name: cache
|
||||
path: /root/.cache
|
||||
commands:
|
||||
- poetry run invoke lint
|
||||
|
||||
- name: start worker
|
||||
image: thetadev256/ucast-dev
|
||||
volumes:
|
||||
- name: cache
|
||||
path: /root/.cache
|
||||
environment:
|
||||
UCAST_REDIS_HOST: redis
|
||||
commands:
|
||||
- poetry run invoke worker
|
||||
detach: true
|
||||
|
||||
- name: test
|
||||
image: thetadev256/ucast-dev
|
||||
volumes:
|
||||
- name: cache
|
||||
path: /root/.cache
|
||||
environment:
|
||||
UCAST_REDIS_HOST: redis
|
||||
commands:
|
||||
- poetry run invoke test
|
||||
|
||||
services:
|
||||
- name: redis
|
||||
image: redis:alpine
|
||||
|
||||
volumes:
|
||||
- name: cache
|
||||
temp: { }
|
||||
|
|
5
.gitignore
vendored
|
@ -14,11 +14,6 @@ node_modules
|
|||
# Jupyter
|
||||
.ipynb_checkpoints
|
||||
|
||||
# Media files
|
||||
*.webm
|
||||
*.mp4
|
||||
*.mp3
|
||||
|
||||
# Application data
|
||||
/_run*
|
||||
*.sqlite3
|
||||
|
|
|
@ -44,6 +44,9 @@ honcho = "^1.1.0"
|
|||
requires = ["poetry-core>=1.0.0"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
DJANGO_SETTINGS_MODULE = "ucast_project.settings"
|
||||
|
||||
[tool.flake8]
|
||||
extend-ignore = "E501"
|
||||
|
||||
|
|
4
tasks.py
|
@ -91,8 +91,8 @@ def get_cover(c, vid=""):
|
|||
cv_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_gradient.png"
|
||||
cv_blur_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_blur.png"
|
||||
|
||||
tn_file = youtube.download_thumbnail(vinfo, tn_file)
|
||||
util.download_file(channel_metadata.avatar_url, av_file)
|
||||
youtube.download_thumbnail(vinfo, tn_file)
|
||||
util.download_image_file(channel_metadata.avatar_url, av_file)
|
||||
|
||||
cover.create_cover_file(
|
||||
tn_file, av_file, title, channel_name, cover.COVER_STYLE_GRADIENT, cv_file
|
||||
|
|
|
@ -4,8 +4,3 @@ from django.apps import AppConfig
|
|||
class UcastConfig(AppConfig):
|
||||
default_auto_field = "django.db.models.BigAutoField"
|
||||
name = "ucast"
|
||||
|
||||
def ready(self):
|
||||
from ucast.tasks import download
|
||||
|
||||
download.schedule_update_channels()
|
||||
|
|
0
ucast/management/__init__.py
Normal file
0
ucast/management/commands/__init__.py
Normal file
11
ucast/management/commands/rqscheduler.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
from django_rq.management.commands import rqscheduler
|
||||
|
||||
from ucast.tasks import schedule
|
||||
|
||||
|
||||
class Command(rqscheduler.Command):
|
||||
def handle(self, *args, **kwargs):
|
||||
print("Starting ucast scheduler")
|
||||
schedule.clear_scheduled_jobs()
|
||||
schedule.register_scheduled_jobs()
|
||||
super(Command, self).handle(*args, **kwargs)
|
|
@ -197,9 +197,9 @@ def _get_baseimage(thumbnail: Image.Image, style: CoverStyle):
|
|||
ctn_width = int(COVER_WIDTH / thumbnail.height * thumbnail.width)
|
||||
ctn_x_left = int((ctn_width - COVER_WIDTH) / 2)
|
||||
|
||||
ctn = thumbnail.resize((ctn_width, COVER_WIDTH), Image.LANCZOS).filter(
|
||||
ImageFilter.GaussianBlur(20)
|
||||
)
|
||||
ctn = thumbnail.resize(
|
||||
(ctn_width, COVER_WIDTH), Image.Resampling.LANCZOS
|
||||
).filter(ImageFilter.GaussianBlur(20))
|
||||
cover.paste(ctn, (-ctn_x_left, 0))
|
||||
|
||||
return cover
|
||||
|
@ -219,9 +219,9 @@ def _resize_thumbnail(thumbnail: Image.Image) -> Image.Image:
|
|||
tn_crop_y_top = int((tn_resize_height - tn_height) / 2)
|
||||
tn_crop_y_bottom = tn_resize_height - tn_crop_y_top
|
||||
|
||||
return thumbnail.resize((COVER_WIDTH, tn_resize_height), Image.LANCZOS).crop(
|
||||
(0, tn_crop_y_top, COVER_WIDTH, tn_crop_y_bottom)
|
||||
)
|
||||
return thumbnail.resize(
|
||||
(COVER_WIDTH, tn_resize_height), Image.Resampling.LANCZOS
|
||||
).crop((0, tn_crop_y_top, COVER_WIDTH, tn_crop_y_bottom))
|
||||
|
||||
|
||||
def _prepare_text_background(
|
||||
|
@ -357,7 +357,7 @@ def _draw_text_avatar(
|
|||
)
|
||||
|
||||
if avatar:
|
||||
avt = avatar.resize((avt_size, avt_size), Image.LANCZOS)
|
||||
avt = avatar.resize((avt_size, avt_size), Image.Resampling.LANCZOS)
|
||||
|
||||
circle_mask = Image.new("L", (avt_size, avt_size))
|
||||
circle_mask_draw = ImageDraw.Draw(circle_mask)
|
||||
|
|
|
@ -1,32 +1,11 @@
|
|||
import os
|
||||
from pathlib import Path
|
||||
from typing import Tuple
|
||||
|
||||
import slugify
|
||||
from django.conf import settings
|
||||
|
||||
UCAST_DIRNAME = "_ucast"
|
||||
|
||||
|
||||
def _get_slug(str_in: str) -> str:
|
||||
return slugify.slugify(str_in, lowercase=False, separator="_")
|
||||
|
||||
|
||||
def _get_unique_slug(str_in: str, root_dir: Path, extension="") -> Tuple[Path, str]:
|
||||
original_slug = _get_slug(str_in)
|
||||
slug = original_slug
|
||||
i = 0
|
||||
|
||||
while True:
|
||||
testfile = root_dir / (slug + extension)
|
||||
|
||||
if not testfile.exists():
|
||||
return testfile, slug
|
||||
|
||||
i += 1
|
||||
slug = f"{original_slug}_{i}"
|
||||
|
||||
|
||||
class ChannelFolder:
|
||||
def __init__(self, dir_root: Path):
|
||||
self.dir_root = dir_root
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import shutil
|
||||
import io
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
|
@ -15,33 +15,45 @@ def download_file(url: str, download_path: Path):
|
|||
open(download_path, "wb").write(r.content)
|
||||
|
||||
|
||||
def download_image_file(url: str, download_path: Path) -> Path:
|
||||
download_file(url, download_path)
|
||||
img = Image.open(download_path)
|
||||
img_ext = img.format.lower()
|
||||
img.close()
|
||||
def download_image_file(url: str, download_path: Path):
|
||||
"""
|
||||
Download an image and convert it to the type given
|
||||
by the path.
|
||||
|
||||
:param url: Image URL
|
||||
:param download_path: Download path
|
||||
"""
|
||||
r = requests.get(url, allow_redirects=True)
|
||||
r.raise_for_status()
|
||||
|
||||
img = Image.open(io.BytesIO(r.content))
|
||||
img_ext = img.format.lower()
|
||||
if img_ext == "jpeg":
|
||||
img_ext = "jpg"
|
||||
|
||||
new_path = download_path.with_suffix("." + img_ext)
|
||||
shutil.move(download_path, new_path)
|
||||
return new_path
|
||||
if "." + img_ext == download_path.suffix:
|
||||
open(download_path, "wb").write(r.content)
|
||||
else:
|
||||
img.save(download_path)
|
||||
|
||||
|
||||
def resize_avatar(original_file: Path, new_file: Path):
|
||||
avatar = Image.open(original_file)
|
||||
avatar_new_height = int(AVATAR_SM_WIDTH / avatar.width * avatar.height)
|
||||
avatar = avatar.resize((AVATAR_SM_WIDTH, avatar_new_height), Image.LANCZOS)
|
||||
avatar = avatar.resize(
|
||||
(AVATAR_SM_WIDTH, avatar_new_height), Image.Resampling.LANCZOS
|
||||
)
|
||||
avatar.save(new_file)
|
||||
|
||||
|
||||
def resize_thumbnail(original_file: Path, new_file: Path):
|
||||
thumbnail = Image.open(original_file)
|
||||
tn_new_height = int(THUMBNAIL_SM_WIDTH / thumbnail.width * thumbnail.height)
|
||||
thumbnail = thumbnail.resize((THUMBNAIL_SM_WIDTH, tn_new_height), Image.LANCZOS)
|
||||
thumbnail = thumbnail.resize(
|
||||
(THUMBNAIL_SM_WIDTH, tn_new_height), Image.Resampling.LANCZOS
|
||||
)
|
||||
thumbnail.save(new_file)
|
||||
|
||||
|
||||
def get_slug(str_in: str) -> str:
|
||||
return slugify.slugify(str_in, lowercase=False, separator="_")
|
||||
def get_slug(text: str) -> str:
|
||||
return slugify.slugify(text, lowercase=False, separator="_")
|
||||
|
|
|
@ -92,7 +92,7 @@ class ChannelMetadata:
|
|||
avatar_url: str
|
||||
|
||||
|
||||
def download_thumbnail(vinfo: VideoDetails, download_path: Path) -> Path:
|
||||
def download_thumbnail(vinfo: VideoDetails, download_path: Path):
|
||||
"""
|
||||
Download the thumbnail image of a YouTube video and save it at the given filepath.
|
||||
The thumbnail file ending is added to the path.
|
||||
|
@ -108,7 +108,8 @@ def download_thumbnail(vinfo: VideoDetails, download_path: Path) -> Path:
|
|||
logging.info(f"downloading thumbnail {url}...")
|
||||
|
||||
try:
|
||||
return util.download_image_file(url, download_path)
|
||||
util.download_image_file(url, download_path)
|
||||
return
|
||||
except requests.HTTPError:
|
||||
logging.warning(f"downloading thumbnail {url} failed")
|
||||
pass
|
||||
|
@ -158,20 +159,20 @@ def download_audio(
|
|||
|
||||
def tag_audio(audio_path: Path, vinfo: VideoDetails, cover_path: Path):
|
||||
title_text = f"{vinfo.published.date().isoformat()} {vinfo.title}"
|
||||
comment = f"https://youtu.be/{vinfo.id}\n\n{vinfo.description}"
|
||||
|
||||
audio = id3.ID3(audio_path)
|
||||
audio["TPE1"] = id3.TPE1(encoding=3, text=vinfo.channel_name) # Artist
|
||||
audio["TALB"] = id3.TALB(encoding=3, text=vinfo.channel_name) # Album
|
||||
audio["TIT2"] = id3.TIT2(encoding=3, text=title_text) # Title
|
||||
audio["TYER"] = id3.TYER(encoding=3, text=str(vinfo.published.year)) # Year
|
||||
audio["TDAT"] = id3.TDAT(encoding=3, text=vinfo.published.strftime("%d%m")) # Date
|
||||
audio["COMM"] = id3.COMM(encoding=3, text=f"YT-ID: {vinfo.id}") # Comment
|
||||
tag = id3.ID3(audio_path)
|
||||
tag["TPE1"] = id3.TPE1(encoding=3, text=vinfo.channel_name) # Artist
|
||||
tag["TALB"] = id3.TALB(encoding=3, text=vinfo.channel_name) # Album
|
||||
tag["TIT2"] = id3.TIT2(encoding=3, text=title_text) # Title
|
||||
tag["TDRC"] = id3.TDRC(encoding=3, text=vinfo.published.date().isoformat()) # Date
|
||||
tag["COMM"] = id3.COMM(encoding=3, text=comment) # Comment
|
||||
|
||||
with open(cover_path, "rb") as albumart:
|
||||
audio["APIC"] = id3.APIC(
|
||||
tag["APIC"] = id3.APIC(
|
||||
encoding=3, mime="image/png", type=3, desc="Cover", data=albumart.read()
|
||||
)
|
||||
audio.save()
|
||||
tag.save()
|
||||
|
||||
|
||||
def channel_url_from_id(channel_id: str) -> str:
|
||||
|
@ -231,22 +232,6 @@ def get_channel_metadata(channel_url: str) -> ChannelMetadata:
|
|||
return ChannelMetadata(channel_id, name, description, avatar)
|
||||
|
||||
|
||||
def download_avatar(avatar_url: str, download_path: Path) -> Path:
|
||||
"""
|
||||
Download the avatar image of a channel. The .jpg file ending
|
||||
is added to the path.
|
||||
|
||||
:param avatar_url: Channel avatar URL
|
||||
:param download_path: Download path
|
||||
:return: Path with file ending
|
||||
"""
|
||||
logging.info(f"downloading avatar {avatar_url}...")
|
||||
|
||||
download_path = download_path.with_suffix(".jpg")
|
||||
util.download_file(avatar_url, download_path)
|
||||
return download_path
|
||||
|
||||
|
||||
def get_channel_videos_from_feed(channel_id: str) -> List[VideoScraped]:
|
||||
"""
|
||||
Return videos of a channel using YouTube's RSS feed. Using the feed is fast,
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
import os
|
||||
from datetime import datetime
|
||||
|
||||
import django_rq
|
||||
from django.conf import settings
|
||||
from django.utils import timezone
|
||||
|
||||
from ucast.models import Channel, Video
|
||||
|
@ -21,10 +19,8 @@ def _get_or_create_channel(channel_id: str) -> Channel:
|
|||
channel_slug = Channel.get_new_slug(channel_data.name)
|
||||
channel_folder = store.get_channel_folder(channel_slug)
|
||||
|
||||
avatar_file = youtube.download_avatar(
|
||||
channel_data.avatar_url, channel_folder.file_avatar
|
||||
)
|
||||
util.resize_avatar(avatar_file, channel_folder.file_avatar_sm)
|
||||
util.download_image_file(channel_data.avatar_url, channel_folder.file_avatar)
|
||||
util.resize_avatar(channel_folder.file_avatar, channel_folder.file_avatar_sm)
|
||||
|
||||
channel = Channel(
|
||||
id=channel_id,
|
||||
|
@ -81,9 +77,8 @@ def download_video(video: Video):
|
|||
details = youtube.download_audio(video.id, audio_file)
|
||||
|
||||
# Download/convert thumbnails
|
||||
tn_path = youtube.download_thumbnail(
|
||||
details, channel_folder.get_thumbnail(video.slug)
|
||||
)
|
||||
tn_path = channel_folder.get_thumbnail(video.slug)
|
||||
youtube.download_thumbnail(details, tn_path)
|
||||
util.resize_thumbnail(tn_path, channel_folder.get_thumbnail(video.slug, True))
|
||||
cover_file = channel_folder.get_cover(video.slug)
|
||||
cover.create_cover_file(
|
||||
|
@ -120,15 +115,6 @@ def import_channel(channel_id: str, limit: int = None):
|
|||
_load_scraped_video(vid, channel)
|
||||
|
||||
|
||||
def schedule_update_channels():
|
||||
django_rq.get_scheduler().schedule(
|
||||
datetime.utcnow(),
|
||||
update_channels,
|
||||
id="schedule_update_channels",
|
||||
interval=settings.YT_UPDATE_INTERVAL,
|
||||
)
|
||||
|
||||
|
||||
def update_channels():
|
||||
"""
|
||||
Update all channels from their RSS feeds and download new videos.
|
||||
|
|
27
ucast/tasks/schedule.py
Normal file
|
@ -0,0 +1,27 @@
|
|||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
import django_rq
|
||||
from django.conf import settings
|
||||
|
||||
from ucast.tasks import download
|
||||
|
||||
scheduler = django_rq.get_scheduler()
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def clear_scheduled_jobs():
|
||||
"""Delete all scheduled jobs to prevent duplicates"""
|
||||
for job in scheduler.get_jobs():
|
||||
log.debug("Deleting scheduled job %s", job)
|
||||
job.delete()
|
||||
|
||||
|
||||
def register_scheduled_jobs():
|
||||
"""Register all scheduled jobs"""
|
||||
scheduler.schedule(
|
||||
datetime.utcnow(),
|
||||
download.update_channels,
|
||||
id="schedule_update_channels",
|
||||
interval=settings.YT_UPDATE_INTERVAL,
|
||||
)
|
|
@ -1,3 +1,3 @@
|
|||
from importlib.resources import files
|
||||
|
||||
DIR_TESTFILES = files("ucast.tests.testfiles")
|
||||
DIR_TESTFILES = files("ucast.tests._testfiles")
|
||||
|
|
BIN
ucast/tests/_testfiles/audio/audio1.mp3
Normal file
Before Width: | Height: | Size: 186 KiB After Width: | Height: | Size: 186 KiB |
Before Width: | Height: | Size: 32 KiB After Width: | Height: | Size: 32 KiB |
Before Width: | Height: | Size: 53 KiB After Width: | Height: | Size: 53 KiB |
Before Width: | Height: | Size: 26 KiB After Width: | Height: | Size: 26 KiB |
Before Width: | Height: | Size: 268 KiB After Width: | Height: | Size: 268 KiB |
Before Width: | Height: | Size: 234 KiB After Width: | Height: | Size: 234 KiB |
Before Width: | Height: | Size: 218 KiB After Width: | Height: | Size: 218 KiB |
Before Width: | Height: | Size: 215 KiB After Width: | Height: | Size: 215 KiB |
Before Width: | Height: | Size: 183 KiB After Width: | Height: | Size: 183 KiB |
Before Width: | Height: | Size: 216 KiB After Width: | Height: | Size: 216 KiB |
Before Width: | Height: | Size: 173 KiB After Width: | Height: | Size: 173 KiB |
|
@ -1,8 +1,9 @@
|
|||
### Quellen der Thumbnails/Avatarbilder zum Testen
|
||||
### Quellen der Thumbnails/Avatarbilder/Audiodateien zum Testen
|
||||
|
||||
- a1/t1: [ThetaDev @ Embedded World 2019](https://www.youtube.com/watch?v=ZPxEr4YdWt8), by [ThetaDev](https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q) (CC-BY)
|
||||
- a2/t2: [Sintel - Open Movie by Blender Foundation](https://www.youtube.com/watch?v=eRsGyueVLvQ), by [Blender](https://www.youtube.com/c/BlenderFoundation) (CC-BY)
|
||||
- a3/t3: [Systemabsturz Teaser zur DiVOC bb3](https://www.youtube.com/watch?v=uFqgQ35wyYY), by [media.ccc.de](https://www.youtube.com/channel/UC2TXq_t06Hjdr2g_KdKpHQg) (CC-BY)
|
||||
- audio1: [No copyright intro free fire intro](https://www.youtube.com/watch?v=I0RRENheeTo), by [Shahzaib Hassan](https://www.youtube.com/channel/UCmLTTbctUZobNQrr8RtX8uQ), (CC-BY)
|
||||
|
||||
### Weitere Testvideos
|
||||
|
Before Width: | Height: | Size: 92 KiB After Width: | Height: | Size: 92 KiB |
Before Width: | Height: | Size: 20 KiB After Width: | Height: | Size: 20 KiB |
Before Width: | Height: | Size: 28 KiB After Width: | Height: | Size: 28 KiB |
0
ucast/tests/service/__init__.py
Normal file
56
ucast/tests/service/test_storage.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
from ucast.service import storage
|
||||
|
||||
|
||||
def test_create_channel_folders(settings):
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
settings.DOWNLOAD_ROOT = tmpdir
|
||||
|
||||
store = storage.Storage()
|
||||
cf1 = store.get_channel_folder("ThetaDev")
|
||||
cf2 = store.get_channel_folder("Jeff_Geerling")
|
||||
cf1b = store.get_channel_folder("ThetaDev")
|
||||
|
||||
cf1_path = tmpdir / "ThetaDev"
|
||||
cf2_path = tmpdir / "Jeff_Geerling"
|
||||
|
||||
assert cf1.dir_root == cf1_path
|
||||
assert cf1b.dir_root == cf1_path
|
||||
assert cf2.dir_root == cf2_path
|
||||
|
||||
assert os.path.isdir(cf1_path)
|
||||
assert os.path.isdir(cf2_path)
|
||||
|
||||
|
||||
def test_channel_folder():
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
ucast_dir = tmpdir / "_ucast"
|
||||
|
||||
cf = storage.ChannelFolder(tmpdir)
|
||||
|
||||
# Verify internal paths
|
||||
assert cf.file_avatar == ucast_dir / "avatar.jpg"
|
||||
assert cf.file_avatar_sm == ucast_dir / "avatar_sm.webp"
|
||||
assert cf.dir_covers == ucast_dir / "covers"
|
||||
assert cf.dir_thumbnails == ucast_dir / "thumbnails"
|
||||
|
||||
# Create the folder
|
||||
assert not cf.does_exist()
|
||||
cf.create()
|
||||
assert cf.does_exist()
|
||||
|
||||
assert cf.get_cover("my_video_title") == ucast_dir / "covers" / "my_video_title.png"
|
||||
assert (
|
||||
cf.get_thumbnail("my_video_title")
|
||||
== ucast_dir / "thumbnails" / "my_video_title.webp"
|
||||
)
|
||||
assert (
|
||||
cf.get_thumbnail("my_video_title", True)
|
||||
== ucast_dir / "thumbnails" / "my_video_title_sm.webp"
|
||||
)
|
||||
assert cf.get_audio("my_video_title") == tmpdir / "my_video_title.mp3"
|
92
ucast/tests/service/test_util.py
Normal file
|
@ -0,0 +1,92 @@
|
|||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from PIL import Image, ImageChops
|
||||
|
||||
from ucast import tests
|
||||
from ucast.service import util
|
||||
|
||||
TEST_FILE_URL = "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj"
|
||||
|
||||
|
||||
def test_download_file():
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
download_file = tmpdir / "download.jpg"
|
||||
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
|
||||
|
||||
util.download_file(TEST_FILE_URL, download_file)
|
||||
|
||||
downloaded_avatar = Image.open(download_file)
|
||||
expected_avatar = Image.open(expected_tn_file)
|
||||
|
||||
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
|
||||
assert diff.getbbox() is None
|
||||
|
||||
|
||||
def test_download_image_file():
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
download_file = tmpdir / "download.jpg"
|
||||
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
|
||||
|
||||
util.download_image_file(TEST_FILE_URL, download_file)
|
||||
|
||||
downloaded_avatar = Image.open(download_file)
|
||||
expected_avatar = Image.open(expected_tn_file)
|
||||
|
||||
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
|
||||
assert diff.getbbox() is None
|
||||
|
||||
|
||||
def test_download_image_file_conv():
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
download_file = tmpdir / "download.png"
|
||||
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
|
||||
|
||||
util.download_image_file(TEST_FILE_URL, download_file)
|
||||
|
||||
downloaded_avatar = Image.open(download_file)
|
||||
expected_avatar = Image.open(expected_tn_file)
|
||||
|
||||
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
|
||||
assert diff.getbbox() is None
|
||||
|
||||
|
||||
def test_resize_avatar():
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
source_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
|
||||
resized_file = tmpdir / "avatar.webp"
|
||||
|
||||
util.resize_avatar(source_file, resized_file)
|
||||
|
||||
resized_avatar = Image.open(resized_file)
|
||||
assert resized_avatar.size == (100, 100)
|
||||
|
||||
|
||||
def test_resize_thumbnail():
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
source_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
|
||||
resized_file = tmpdir / "thumbnail.webp"
|
||||
|
||||
util.resize_thumbnail(source_file, resized_file)
|
||||
|
||||
resized_thumbnail = Image.open(resized_file)
|
||||
assert resized_thumbnail.size == (360, 202)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"text,expected_slug",
|
||||
[
|
||||
("Hello World 👋", "Hello_World"),
|
||||
("ÄäÖöÜüß", "AaOoUuss"),
|
||||
("오징어 게임", "ojingeo_geim"),
|
||||
],
|
||||
)
|
||||
def test_slug(text: str, expected_slug: str):
|
||||
slug = util.get_slug(text)
|
||||
assert slug == expected_slug
|
214
ucast/tests/service/test_youtube.py
Normal file
|
@ -0,0 +1,214 @@
|
|||
import datetime
|
||||
import io
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from mutagen import id3
|
||||
from PIL import Image, ImageChops
|
||||
|
||||
from ucast import tests
|
||||
from ucast.service import youtube
|
||||
|
||||
VIDEO_ID_THETADEV = "ZPxEr4YdWt8"
|
||||
VIDEO_ID_SHORT = "lcQZ6YwQHiw"
|
||||
VIDEO_ID_PERSUASION = "DWjFW7Yq1fA"
|
||||
|
||||
CHANNEL_ID_THETADEV = "UCGiJh0NZ52wRhYKYnuZI08Q"
|
||||
CHANNEL_ID_BLENDER = "UCSMOQeBJ2RAnuFungnQOxLg"
|
||||
CHANNEL_URL_BLENDER = "https://www.youtube.com/c/BlenderFoundation"
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def video_details() -> youtube.VideoDetails:
|
||||
return youtube.get_video_details(VIDEO_ID_THETADEV)
|
||||
|
||||
|
||||
def test_download_thumbnail(video_details):
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
tn_file = tmpdir / "thumbnail.webp"
|
||||
expected_tn_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
|
||||
|
||||
youtube.download_thumbnail(video_details, tn_file)
|
||||
|
||||
tn = Image.open(tn_file)
|
||||
expected_tn = Image.open(expected_tn_file)
|
||||
|
||||
diff = ImageChops.difference(tn, expected_tn)
|
||||
assert diff.getbbox() is None
|
||||
|
||||
|
||||
def test_get_video_details(video_details):
|
||||
assert video_details.id == VIDEO_ID_THETADEV
|
||||
assert video_details.title == "ThetaDev @ Embedded World 2019"
|
||||
assert video_details.channel_id == "UCGiJh0NZ52wRhYKYnuZI08Q"
|
||||
assert (
|
||||
video_details.description
|
||||
== """This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
|
||||
|
||||
Sorry for the late upload, I just didn't have time to edit my footage.
|
||||
|
||||
Embedded World: https://www.embedded-world.de/
|
||||
|
||||
My website: https://thdev.org
|
||||
Twitter: https://twitter.com/Theta_Dev"""
|
||||
)
|
||||
assert video_details.duration == 267
|
||||
assert not video_details.is_currently_live
|
||||
assert not video_details.is_livestream
|
||||
assert not video_details.is_short
|
||||
assert video_details.published == datetime.datetime(
|
||||
2019, 6, 2, tzinfo=datetime.timezone.utc
|
||||
)
|
||||
|
||||
|
||||
def test_get_video_details_short():
|
||||
vinfo = youtube.get_video_details(VIDEO_ID_SHORT)
|
||||
assert vinfo.id == VIDEO_ID_SHORT
|
||||
assert (
|
||||
vinfo.title
|
||||
== "Small pink flowers | #shorts | Free Stock Video | \
|
||||
creative commons short videos | creative #short"
|
||||
)
|
||||
assert not vinfo.is_currently_live
|
||||
assert not vinfo.is_livestream
|
||||
assert vinfo.is_short
|
||||
|
||||
|
||||
def test_download_audio():
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
download_file = tmpdir / "download.mp3"
|
||||
|
||||
vinfo = youtube.download_audio(VIDEO_ID_PERSUASION, download_file)
|
||||
assert vinfo.id == VIDEO_ID_PERSUASION
|
||||
assert vinfo.title == "Persuasion (Instrumental) – RYYZN (No Copyright Music)"
|
||||
assert vinfo.duration == 100
|
||||
|
||||
# Check with ffmpeg if the audio file is valid
|
||||
res = subprocess.run(
|
||||
["ffmpeg", "-i", str(download_file)],
|
||||
capture_output=True,
|
||||
universal_newlines=True,
|
||||
)
|
||||
assert "Stream #0:0: Audio: mp3" in res.stderr
|
||||
|
||||
match = re.search(r"Duration: (\d{2}:\d{2}:\d{2})", res.stderr)
|
||||
assert match[1] == "00:01:40"
|
||||
|
||||
|
||||
def test_tag_audio(video_details):
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
audio_file = tmpdir / "audio.mp3"
|
||||
cover_file = tests.DIR_TESTFILES / "cover" / "c1_blur.png"
|
||||
shutil.copyfile(tests.DIR_TESTFILES / "audio" / "audio1.mp3", audio_file)
|
||||
|
||||
youtube.tag_audio(audio_file, video_details, cover_file)
|
||||
|
||||
tag = id3.ID3(audio_file)
|
||||
assert tag["TPE1"].text[0] == "ThetaDev"
|
||||
assert tag["TALB"].text[0] == "ThetaDev"
|
||||
assert tag["TIT2"].text[0] == "2019-06-02 ThetaDev @ Embedded World 2019"
|
||||
assert tag["TDRC"].text[0].text == "2019-06-02"
|
||||
assert (
|
||||
tag["COMM::XXX"].text[0]
|
||||
== """https://youtu.be/ZPxEr4YdWt8
|
||||
|
||||
This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
|
||||
|
||||
Sorry for the late upload, I just didn't have time to edit my footage.
|
||||
|
||||
Embedded World: https://www.embedded-world.de/
|
||||
|
||||
My website: https://thdev.org
|
||||
Twitter: https://twitter.com/Theta_Dev"""
|
||||
)
|
||||
|
||||
tag_cover = tag["APIC:Cover"]
|
||||
assert tag_cover.mime == "image/png"
|
||||
|
||||
tag_cover_img = Image.open(io.BytesIO(tag_cover.data))
|
||||
expected_cover_img = Image.open(cover_file)
|
||||
diff = ImageChops.difference(tag_cover_img, expected_cover_img)
|
||||
assert diff.getbbox() is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"channel_str,channel_url",
|
||||
[
|
||||
(
|
||||
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
|
||||
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
|
||||
),
|
||||
(
|
||||
"https://www.youtube.com/c/MrBeast6000",
|
||||
"https://www.youtube.com/c/MrBeast6000",
|
||||
),
|
||||
(
|
||||
"https://www.youtube.com/user/LinusTechTips",
|
||||
"https://www.youtube.com/user/LinusTechTips",
|
||||
),
|
||||
(
|
||||
"UCGiJh0NZ52wRhYKYnuZI08Q",
|
||||
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
|
||||
),
|
||||
(
|
||||
"https://piped.mha.fi/user/LinusTechTips",
|
||||
"https://www.youtube.com/user/LinusTechTips",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_channel_url_from_str(channel_str: str, channel_url: str):
|
||||
url = youtube.channel_url_from_str(channel_str)
|
||||
assert url == channel_url
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"channel_url,channel_id,name,avatar_url",
|
||||
[
|
||||
(
|
||||
youtube.channel_url_from_id(CHANNEL_ID_THETADEV),
|
||||
CHANNEL_ID_THETADEV,
|
||||
"ThetaDev",
|
||||
"https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj",
|
||||
),
|
||||
(
|
||||
CHANNEL_URL_BLENDER,
|
||||
CHANNEL_ID_BLENDER,
|
||||
"Blender",
|
||||
"https://yt3.ggpht.com/ytc/AKedOLT_31fFSD3FWEBnHZnyZeJx-GPHJwYCQKcEpaq8NQ=s900-c-k-c0x00ffffff-no-rj",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_channel_metadata(
|
||||
channel_url: str, channel_id: str, name: str, avatar_url: str
|
||||
):
|
||||
metadata = youtube.get_channel_metadata(channel_url)
|
||||
assert metadata.id == channel_id
|
||||
assert metadata.name == name
|
||||
assert metadata.avatar_url == avatar_url
|
||||
assert metadata.description
|
||||
|
||||
|
||||
def test_get_channel_videos_from_feed():
|
||||
videos = youtube.get_channel_videos_from_feed(CHANNEL_ID_THETADEV)
|
||||
assert videos
|
||||
|
||||
v1 = videos[0]
|
||||
assert len(v1.id) == 11
|
||||
assert v1.published.tzinfo == datetime.timezone.utc
|
||||
assert v1.published.second > 0 or v1.published.minute > 0 or v1.published.hour > 0
|
||||
|
||||
|
||||
def test_get_channel_videos_from_scraper():
|
||||
videos = youtube.get_channel_videos_from_scraper(CHANNEL_ID_THETADEV)
|
||||
assert videos
|
||||
|
||||
v1 = videos[0]
|
||||
assert len(v1.id) == 11
|
||||
assert v1.published is None
|
|
@ -1,24 +0,0 @@
|
|||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
from PIL import Image, ImageChops
|
||||
|
||||
from ucast import tests
|
||||
from ucast.service import util
|
||||
|
||||
TEST_FILE_URL = "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj"
|
||||
|
||||
|
||||
def test_download_file():
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
download_file = tmpdir / "download.jpg"
|
||||
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
|
||||
|
||||
util.download_file(TEST_FILE_URL, download_file)
|
||||
|
||||
downloaded_avatar = Image.open(download_file)
|
||||
expected_avatar = Image.open(expected_tn_file)
|
||||
|
||||
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
|
||||
assert diff.getbbox() is None
|
|
@ -1,126 +0,0 @@
|
|||
import datetime
|
||||
import re
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from PIL import Image, ImageChops
|
||||
|
||||
from ucast import tests
|
||||
from ucast.service import youtube
|
||||
|
||||
VIDEO_ID_THETADEV = "ZPxEr4YdWt8"
|
||||
VIDEO_ID_SHORT = "lcQZ6YwQHiw"
|
||||
VIDEO_ID_PERSUASION = "DWjFW7Yq1fA"
|
||||
|
||||
CHANNEL_ID_THETADEV = "UCGiJh0NZ52wRhYKYnuZI08Q"
|
||||
CHANNEL_ID_BLENDER = "UCSMOQeBJ2RAnuFungnQOxLg"
|
||||
CHANNEL_URL_BLENDER = "https://www.youtube.com/c/BlenderFoundation"
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def video_info() -> youtube.VideoDetails:
|
||||
return youtube.get_video_details(VIDEO_ID_THETADEV)
|
||||
|
||||
|
||||
def test_download_thumbnail(video_info):
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
tn_file = tmpdir / "thumbnail"
|
||||
expected_tn_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
|
||||
|
||||
tn_file = youtube.download_thumbnail(video_info, tn_file)
|
||||
assert tn_file.suffix == ".webp"
|
||||
|
||||
tn = Image.open(tn_file)
|
||||
expected_tn = Image.open(expected_tn_file)
|
||||
|
||||
diff = ImageChops.difference(tn, expected_tn)
|
||||
assert diff.getbbox() is None
|
||||
|
||||
|
||||
def test_get_video_info(video_info):
|
||||
assert video_info.id == VIDEO_ID_THETADEV
|
||||
assert video_info.title == "ThetaDev @ Embedded World 2019"
|
||||
assert video_info.channel_id == "UCGiJh0NZ52wRhYKYnuZI08Q"
|
||||
assert (
|
||||
video_info.description
|
||||
== """This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
|
||||
|
||||
Sorry for the late upload, I just didn't have time to edit my footage.
|
||||
|
||||
Embedded World: https://www.embedded-world.de/
|
||||
|
||||
My website: https://thdev.org
|
||||
Twitter: https://twitter.com/Theta_Dev"""
|
||||
)
|
||||
assert video_info.duration == 267
|
||||
assert not video_info.is_currently_live
|
||||
assert not video_info.is_livestream
|
||||
assert not video_info.is_short
|
||||
assert video_info.published == datetime.datetime(
|
||||
2019, 6, 2, tzinfo=datetime.timezone.utc
|
||||
)
|
||||
|
||||
|
||||
def test_get_video_info_short():
|
||||
vinfo = youtube.get_video_details(VIDEO_ID_SHORT)
|
||||
assert vinfo.id == VIDEO_ID_SHORT
|
||||
assert (
|
||||
vinfo.title
|
||||
== "Small pink flowers | #shorts | Free Stock Video | \
|
||||
creative commons short videos | creative #short"
|
||||
)
|
||||
assert not vinfo.is_currently_live
|
||||
assert not vinfo.is_livestream
|
||||
assert vinfo.is_short
|
||||
|
||||
|
||||
def test_download_video():
|
||||
tmpdir_o = tempfile.TemporaryDirectory()
|
||||
tmpdir = Path(tmpdir_o.name)
|
||||
download_file = tmpdir / "download.mp3"
|
||||
|
||||
vinfo = youtube.download_audio(VIDEO_ID_PERSUASION, download_file)
|
||||
assert vinfo.id == VIDEO_ID_PERSUASION
|
||||
assert vinfo.title == "Persuasion (Instrumental) – RYYZN (No Copyright Music)"
|
||||
assert vinfo.duration == 100
|
||||
|
||||
# Check with ffmpeg if the audio file is valid
|
||||
res = subprocess.run(
|
||||
["ffmpeg", "-i", str(download_file)],
|
||||
capture_output=True,
|
||||
universal_newlines=True,
|
||||
)
|
||||
assert "Stream #0:0: Audio: mp3" in res.stderr
|
||||
|
||||
match = re.search(r"Duration: (\d{2}:\d{2}:\d{2})", res.stderr)
|
||||
assert match[1] == "00:01:40"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"channel_url,channel_id,name,avatar_url",
|
||||
[
|
||||
(
|
||||
youtube.channel_url_from_id(CHANNEL_ID_THETADEV),
|
||||
CHANNEL_ID_THETADEV,
|
||||
"ThetaDev",
|
||||
"https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj",
|
||||
),
|
||||
(
|
||||
CHANNEL_URL_BLENDER,
|
||||
CHANNEL_ID_BLENDER,
|
||||
"Blender",
|
||||
"https://yt3.ggpht.com/ytc/AKedOLT_31fFSD3FWEBnHZnyZeJx-GPHJwYCQKcEpaq8NQ=s900-c-k-c0x00ffffff-no-rj",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_channel_metadata(
|
||||
channel_url: str, channel_id: str, name: str, avatar_url: str
|
||||
):
|
||||
metadata = youtube.get_channel_metadata(channel_url)
|
||||
assert metadata.id == channel_id
|
||||
assert metadata.name == name
|
||||
assert metadata.avatar_url == avatar_url
|
||||
assert metadata.description
|