Compare commits

...

2 commits

Author SHA1 Message Date
4b6733b9b6 add tests for yt, storage, util
All checks were successful
continuous-integration/drone/push Build is passing
2022-05-21 03:10:16 +02:00
28cb58356e fixed scheduler init, add redis to CI 2022-05-21 00:05:41 +02:00
38 changed files with 496 additions and 252 deletions

View file

@ -7,9 +7,47 @@ platform:
arch: ''
steps:
- name: Test
- name: install dependencies
image: thetadev256/ucast-dev
volumes:
- name: cache
path: /root/.cache
commands:
- poetry install
- name: lint
image: thetadev256/ucast-dev
volumes:
- name: cache
path: /root/.cache
commands:
- poetry run invoke lint
- name: start worker
image: thetadev256/ucast-dev
volumes:
- name: cache
path: /root/.cache
environment:
UCAST_REDIS_HOST: redis
commands:
- poetry run invoke worker
detach: true
- name: test
image: thetadev256/ucast-dev
volumes:
- name: cache
path: /root/.cache
environment:
UCAST_REDIS_HOST: redis
commands:
- poetry run invoke test
services:
- name: redis
image: redis:alpine
volumes:
- name: cache
temp: { }

5
.gitignore vendored
View file

@ -14,11 +14,6 @@ node_modules
# Jupyter
.ipynb_checkpoints
# Media files
*.webm
*.mp4
*.mp3
# Application data
/_run*
*.sqlite3

View file

@ -44,6 +44,9 @@ honcho = "^1.1.0"
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "ucast_project.settings"
[tool.flake8]
extend-ignore = "E501"

View file

@ -91,8 +91,8 @@ def get_cover(c, vid=""):
cv_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_gradient.png"
cv_blur_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_blur.png"
tn_file = youtube.download_thumbnail(vinfo, tn_file)
util.download_file(channel_metadata.avatar_url, av_file)
youtube.download_thumbnail(vinfo, tn_file)
util.download_image_file(channel_metadata.avatar_url, av_file)
cover.create_cover_file(
tn_file, av_file, title, channel_name, cover.COVER_STYLE_GRADIENT, cv_file

View file

@ -4,8 +4,3 @@ from django.apps import AppConfig
class UcastConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "ucast"
def ready(self):
from ucast.tasks import download
download.schedule_update_channels()

View file

View file

View file

@ -0,0 +1,11 @@
from django_rq.management.commands import rqscheduler
from ucast.tasks import schedule
class Command(rqscheduler.Command):
def handle(self, *args, **kwargs):
print("Starting ucast scheduler")
schedule.clear_scheduled_jobs()
schedule.register_scheduled_jobs()
super(Command, self).handle(*args, **kwargs)

View file

@ -197,9 +197,9 @@ def _get_baseimage(thumbnail: Image.Image, style: CoverStyle):
ctn_width = int(COVER_WIDTH / thumbnail.height * thumbnail.width)
ctn_x_left = int((ctn_width - COVER_WIDTH) / 2)
ctn = thumbnail.resize((ctn_width, COVER_WIDTH), Image.LANCZOS).filter(
ImageFilter.GaussianBlur(20)
)
ctn = thumbnail.resize(
(ctn_width, COVER_WIDTH), Image.Resampling.LANCZOS
).filter(ImageFilter.GaussianBlur(20))
cover.paste(ctn, (-ctn_x_left, 0))
return cover
@ -219,9 +219,9 @@ def _resize_thumbnail(thumbnail: Image.Image) -> Image.Image:
tn_crop_y_top = int((tn_resize_height - tn_height) / 2)
tn_crop_y_bottom = tn_resize_height - tn_crop_y_top
return thumbnail.resize((COVER_WIDTH, tn_resize_height), Image.LANCZOS).crop(
(0, tn_crop_y_top, COVER_WIDTH, tn_crop_y_bottom)
)
return thumbnail.resize(
(COVER_WIDTH, tn_resize_height), Image.Resampling.LANCZOS
).crop((0, tn_crop_y_top, COVER_WIDTH, tn_crop_y_bottom))
def _prepare_text_background(
@ -357,7 +357,7 @@ def _draw_text_avatar(
)
if avatar:
avt = avatar.resize((avt_size, avt_size), Image.LANCZOS)
avt = avatar.resize((avt_size, avt_size), Image.Resampling.LANCZOS)
circle_mask = Image.new("L", (avt_size, avt_size))
circle_mask_draw = ImageDraw.Draw(circle_mask)

View file

@ -1,32 +1,11 @@
import os
from pathlib import Path
from typing import Tuple
import slugify
from django.conf import settings
UCAST_DIRNAME = "_ucast"
def _get_slug(str_in: str) -> str:
return slugify.slugify(str_in, lowercase=False, separator="_")
def _get_unique_slug(str_in: str, root_dir: Path, extension="") -> Tuple[Path, str]:
original_slug = _get_slug(str_in)
slug = original_slug
i = 0
while True:
testfile = root_dir / (slug + extension)
if not testfile.exists():
return testfile, slug
i += 1
slug = f"{original_slug}_{i}"
class ChannelFolder:
def __init__(self, dir_root: Path):
self.dir_root = dir_root

View file

@ -1,4 +1,4 @@
import shutil
import io
from pathlib import Path
import requests
@ -15,33 +15,45 @@ def download_file(url: str, download_path: Path):
open(download_path, "wb").write(r.content)
def download_image_file(url: str, download_path: Path) -> Path:
download_file(url, download_path)
img = Image.open(download_path)
img_ext = img.format.lower()
img.close()
def download_image_file(url: str, download_path: Path):
"""
Download an image and convert it to the type given
by the path.
:param url: Image URL
:param download_path: Download path
"""
r = requests.get(url, allow_redirects=True)
r.raise_for_status()
img = Image.open(io.BytesIO(r.content))
img_ext = img.format.lower()
if img_ext == "jpeg":
img_ext = "jpg"
new_path = download_path.with_suffix("." + img_ext)
shutil.move(download_path, new_path)
return new_path
if "." + img_ext == download_path.suffix:
open(download_path, "wb").write(r.content)
else:
img.save(download_path)
def resize_avatar(original_file: Path, new_file: Path):
avatar = Image.open(original_file)
avatar_new_height = int(AVATAR_SM_WIDTH / avatar.width * avatar.height)
avatar = avatar.resize((AVATAR_SM_WIDTH, avatar_new_height), Image.LANCZOS)
avatar = avatar.resize(
(AVATAR_SM_WIDTH, avatar_new_height), Image.Resampling.LANCZOS
)
avatar.save(new_file)
def resize_thumbnail(original_file: Path, new_file: Path):
thumbnail = Image.open(original_file)
tn_new_height = int(THUMBNAIL_SM_WIDTH / thumbnail.width * thumbnail.height)
thumbnail = thumbnail.resize((THUMBNAIL_SM_WIDTH, tn_new_height), Image.LANCZOS)
thumbnail = thumbnail.resize(
(THUMBNAIL_SM_WIDTH, tn_new_height), Image.Resampling.LANCZOS
)
thumbnail.save(new_file)
def get_slug(str_in: str) -> str:
return slugify.slugify(str_in, lowercase=False, separator="_")
def get_slug(text: str) -> str:
return slugify.slugify(text, lowercase=False, separator="_")

View file

@ -92,7 +92,7 @@ class ChannelMetadata:
avatar_url: str
def download_thumbnail(vinfo: VideoDetails, download_path: Path) -> Path:
def download_thumbnail(vinfo: VideoDetails, download_path: Path):
"""
Download the thumbnail image of a YouTube video and save it at the given filepath.
The thumbnail file ending is added to the path.
@ -108,7 +108,8 @@ def download_thumbnail(vinfo: VideoDetails, download_path: Path) -> Path:
logging.info(f"downloading thumbnail {url}...")
try:
return util.download_image_file(url, download_path)
util.download_image_file(url, download_path)
return
except requests.HTTPError:
logging.warning(f"downloading thumbnail {url} failed")
pass
@ -158,20 +159,20 @@ def download_audio(
def tag_audio(audio_path: Path, vinfo: VideoDetails, cover_path: Path):
title_text = f"{vinfo.published.date().isoformat()} {vinfo.title}"
comment = f"https://youtu.be/{vinfo.id}\n\n{vinfo.description}"
audio = id3.ID3(audio_path)
audio["TPE1"] = id3.TPE1(encoding=3, text=vinfo.channel_name) # Artist
audio["TALB"] = id3.TALB(encoding=3, text=vinfo.channel_name) # Album
audio["TIT2"] = id3.TIT2(encoding=3, text=title_text) # Title
audio["TYER"] = id3.TYER(encoding=3, text=str(vinfo.published.year)) # Year
audio["TDAT"] = id3.TDAT(encoding=3, text=vinfo.published.strftime("%d%m")) # Date
audio["COMM"] = id3.COMM(encoding=3, text=f"YT-ID: {vinfo.id}") # Comment
tag = id3.ID3(audio_path)
tag["TPE1"] = id3.TPE1(encoding=3, text=vinfo.channel_name) # Artist
tag["TALB"] = id3.TALB(encoding=3, text=vinfo.channel_name) # Album
tag["TIT2"] = id3.TIT2(encoding=3, text=title_text) # Title
tag["TDRC"] = id3.TDRC(encoding=3, text=vinfo.published.date().isoformat()) # Date
tag["COMM"] = id3.COMM(encoding=3, text=comment) # Comment
with open(cover_path, "rb") as albumart:
audio["APIC"] = id3.APIC(
tag["APIC"] = id3.APIC(
encoding=3, mime="image/png", type=3, desc="Cover", data=albumart.read()
)
audio.save()
tag.save()
def channel_url_from_id(channel_id: str) -> str:
@ -231,22 +232,6 @@ def get_channel_metadata(channel_url: str) -> ChannelMetadata:
return ChannelMetadata(channel_id, name, description, avatar)
def download_avatar(avatar_url: str, download_path: Path) -> Path:
"""
Download the avatar image of a channel. The .jpg file ending
is added to the path.
:param avatar_url: Channel avatar URL
:param download_path: Download path
:return: Path with file ending
"""
logging.info(f"downloading avatar {avatar_url}...")
download_path = download_path.with_suffix(".jpg")
util.download_file(avatar_url, download_path)
return download_path
def get_channel_videos_from_feed(channel_id: str) -> List[VideoScraped]:
"""
Return videos of a channel using YouTube's RSS feed. Using the feed is fast,

View file

@ -1,8 +1,6 @@
import os
from datetime import datetime
import django_rq
from django.conf import settings
from django.utils import timezone
from ucast.models import Channel, Video
@ -21,10 +19,8 @@ def _get_or_create_channel(channel_id: str) -> Channel:
channel_slug = Channel.get_new_slug(channel_data.name)
channel_folder = store.get_channel_folder(channel_slug)
avatar_file = youtube.download_avatar(
channel_data.avatar_url, channel_folder.file_avatar
)
util.resize_avatar(avatar_file, channel_folder.file_avatar_sm)
util.download_image_file(channel_data.avatar_url, channel_folder.file_avatar)
util.resize_avatar(channel_folder.file_avatar, channel_folder.file_avatar_sm)
channel = Channel(
id=channel_id,
@ -81,9 +77,8 @@ def download_video(video: Video):
details = youtube.download_audio(video.id, audio_file)
# Download/convert thumbnails
tn_path = youtube.download_thumbnail(
details, channel_folder.get_thumbnail(video.slug)
)
tn_path = channel_folder.get_thumbnail(video.slug)
youtube.download_thumbnail(details, tn_path)
util.resize_thumbnail(tn_path, channel_folder.get_thumbnail(video.slug, True))
cover_file = channel_folder.get_cover(video.slug)
cover.create_cover_file(
@ -120,15 +115,6 @@ def import_channel(channel_id: str, limit: int = None):
_load_scraped_video(vid, channel)
def schedule_update_channels():
django_rq.get_scheduler().schedule(
datetime.utcnow(),
update_channels,
id="schedule_update_channels",
interval=settings.YT_UPDATE_INTERVAL,
)
def update_channels():
"""
Update all channels from their RSS feeds and download new videos.

27
ucast/tasks/schedule.py Normal file
View file

@ -0,0 +1,27 @@
import logging
from datetime import datetime
import django_rq
from django.conf import settings
from ucast.tasks import download
scheduler = django_rq.get_scheduler()
log = logging.getLogger(__name__)
def clear_scheduled_jobs():
"""Delete all scheduled jobs to prevent duplicates"""
for job in scheduler.get_jobs():
log.debug("Deleting scheduled job %s", job)
job.delete()
def register_scheduled_jobs():
"""Register all scheduled jobs"""
scheduler.schedule(
datetime.utcnow(),
download.update_channels,
id="schedule_update_channels",
interval=settings.YT_UPDATE_INTERVAL,
)

View file

@ -1,3 +1,3 @@
from importlib.resources import files
DIR_TESTFILES = files("ucast.tests.testfiles")
DIR_TESTFILES = files("ucast.tests._testfiles")

Binary file not shown.

View file

Before

Width:  |  Height:  |  Size: 186 KiB

After

Width:  |  Height:  |  Size: 186 KiB

View file

Before

Width:  |  Height:  |  Size: 32 KiB

After

Width:  |  Height:  |  Size: 32 KiB

View file

Before

Width:  |  Height:  |  Size: 53 KiB

After

Width:  |  Height:  |  Size: 53 KiB

View file

Before

Width:  |  Height:  |  Size: 26 KiB

After

Width:  |  Height:  |  Size: 26 KiB

View file

Before

Width:  |  Height:  |  Size: 268 KiB

After

Width:  |  Height:  |  Size: 268 KiB

View file

Before

Width:  |  Height:  |  Size: 234 KiB

After

Width:  |  Height:  |  Size: 234 KiB

View file

Before

Width:  |  Height:  |  Size: 218 KiB

After

Width:  |  Height:  |  Size: 218 KiB

View file

Before

Width:  |  Height:  |  Size: 215 KiB

After

Width:  |  Height:  |  Size: 215 KiB

View file

Before

Width:  |  Height:  |  Size: 183 KiB

After

Width:  |  Height:  |  Size: 183 KiB

View file

Before

Width:  |  Height:  |  Size: 216 KiB

After

Width:  |  Height:  |  Size: 216 KiB

View file

Before

Width:  |  Height:  |  Size: 173 KiB

After

Width:  |  Height:  |  Size: 173 KiB

View file

@ -1,10 +1,11 @@
### Quellen der Thumbnails/Avatarbilder zum Testen
### Quellen der Thumbnails/Avatarbilder/Audiodateien zum Testen
- a1/t1: [ThetaDev @ Embedded World 2019](https://www.youtube.com/watch?v=ZPxEr4YdWt8), by [ThetaDev](https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q) (CC-BY)
- a2/t2: [Sintel - Open Movie by Blender Foundation](https://www.youtube.com/watch?v=eRsGyueVLvQ), by [Blender](https://www.youtube.com/c/BlenderFoundation) (CC-BY)
- a3/t3: [Systemabsturz Teaser zur DiVOC bb3](https://www.youtube.com/watch?v=uFqgQ35wyYY), by [media.ccc.de](https://www.youtube.com/channel/UC2TXq_t06Hjdr2g_KdKpHQg) (CC-BY)
- audio1: [No copyright intro free fire intro](https://www.youtube.com/watch?v=I0RRENheeTo), by [Shahzaib Hassan](https://www.youtube.com/channel/UCmLTTbctUZobNQrr8RtX8uQ), (CC-BY)
### Weitere Testvideos
- [Persuasion (Instrumental) RYYZN (No Copyright Music)](https://www.youtube.com/watch?v=DWjFW7Yq1fA), by [RYYZN](https://soundcloud.com/ryyzn) (CC-BY)
- [Small pink flowers | #shorts | Free Stock Video](https://www.youtube.com/watch?v=lcQZ6YwQHiw), by [Shahzaib Hassan](https://www.youtube.com/channel/UCmLTTbctUZobNQrr8RtX8uQ), (CC-BY)
- [Small pink flowers | #shorts | Free Stock Video](https://www.youtube.com/watch?v=lcQZ6YwQHiw), by [Shahzaib Hassan](https://www.youtube.com/channel/UCmLTTbctUZobNQrr8RtX8uQ), (CC-BY)

View file

Before

Width:  |  Height:  |  Size: 92 KiB

After

Width:  |  Height:  |  Size: 92 KiB

View file

Before

Width:  |  Height:  |  Size: 20 KiB

After

Width:  |  Height:  |  Size: 20 KiB

View file

Before

Width:  |  Height:  |  Size: 28 KiB

After

Width:  |  Height:  |  Size: 28 KiB

View file

View file

@ -0,0 +1,56 @@
import os
import tempfile
from pathlib import Path
from ucast.service import storage
def test_create_channel_folders(settings):
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
settings.DOWNLOAD_ROOT = tmpdir
store = storage.Storage()
cf1 = store.get_channel_folder("ThetaDev")
cf2 = store.get_channel_folder("Jeff_Geerling")
cf1b = store.get_channel_folder("ThetaDev")
cf1_path = tmpdir / "ThetaDev"
cf2_path = tmpdir / "Jeff_Geerling"
assert cf1.dir_root == cf1_path
assert cf1b.dir_root == cf1_path
assert cf2.dir_root == cf2_path
assert os.path.isdir(cf1_path)
assert os.path.isdir(cf2_path)
def test_channel_folder():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
ucast_dir = tmpdir / "_ucast"
cf = storage.ChannelFolder(tmpdir)
# Verify internal paths
assert cf.file_avatar == ucast_dir / "avatar.jpg"
assert cf.file_avatar_sm == ucast_dir / "avatar_sm.webp"
assert cf.dir_covers == ucast_dir / "covers"
assert cf.dir_thumbnails == ucast_dir / "thumbnails"
# Create the folder
assert not cf.does_exist()
cf.create()
assert cf.does_exist()
assert cf.get_cover("my_video_title") == ucast_dir / "covers" / "my_video_title.png"
assert (
cf.get_thumbnail("my_video_title")
== ucast_dir / "thumbnails" / "my_video_title.webp"
)
assert (
cf.get_thumbnail("my_video_title", True)
== ucast_dir / "thumbnails" / "my_video_title_sm.webp"
)
assert cf.get_audio("my_video_title") == tmpdir / "my_video_title.mp3"

View file

@ -0,0 +1,92 @@
import tempfile
from pathlib import Path
import pytest
from PIL import Image, ImageChops
from ucast import tests
from ucast.service import util
TEST_FILE_URL = "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj"
def test_download_file():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.jpg"
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
util.download_file(TEST_FILE_URL, download_file)
downloaded_avatar = Image.open(download_file)
expected_avatar = Image.open(expected_tn_file)
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
assert diff.getbbox() is None
def test_download_image_file():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.jpg"
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
util.download_image_file(TEST_FILE_URL, download_file)
downloaded_avatar = Image.open(download_file)
expected_avatar = Image.open(expected_tn_file)
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
assert diff.getbbox() is None
def test_download_image_file_conv():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.png"
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
util.download_image_file(TEST_FILE_URL, download_file)
downloaded_avatar = Image.open(download_file)
expected_avatar = Image.open(expected_tn_file)
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
assert diff.getbbox() is None
def test_resize_avatar():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
source_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
resized_file = tmpdir / "avatar.webp"
util.resize_avatar(source_file, resized_file)
resized_avatar = Image.open(resized_file)
assert resized_avatar.size == (100, 100)
def test_resize_thumbnail():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
source_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
resized_file = tmpdir / "thumbnail.webp"
util.resize_thumbnail(source_file, resized_file)
resized_thumbnail = Image.open(resized_file)
assert resized_thumbnail.size == (360, 202)
@pytest.mark.parametrize(
"text,expected_slug",
[
("Hello World 👋", "Hello_World"),
("ÄäÖöÜüß", "AaOoUuss"),
("오징어 게임", "ojingeo_geim"),
],
)
def test_slug(text: str, expected_slug: str):
slug = util.get_slug(text)
assert slug == expected_slug

View file

@ -0,0 +1,214 @@
import datetime
import io
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
import pytest
from mutagen import id3
from PIL import Image, ImageChops
from ucast import tests
from ucast.service import youtube
VIDEO_ID_THETADEV = "ZPxEr4YdWt8"
VIDEO_ID_SHORT = "lcQZ6YwQHiw"
VIDEO_ID_PERSUASION = "DWjFW7Yq1fA"
CHANNEL_ID_THETADEV = "UCGiJh0NZ52wRhYKYnuZI08Q"
CHANNEL_ID_BLENDER = "UCSMOQeBJ2RAnuFungnQOxLg"
CHANNEL_URL_BLENDER = "https://www.youtube.com/c/BlenderFoundation"
@pytest.fixture(scope="module")
def video_details() -> youtube.VideoDetails:
return youtube.get_video_details(VIDEO_ID_THETADEV)
def test_download_thumbnail(video_details):
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
tn_file = tmpdir / "thumbnail.webp"
expected_tn_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
youtube.download_thumbnail(video_details, tn_file)
tn = Image.open(tn_file)
expected_tn = Image.open(expected_tn_file)
diff = ImageChops.difference(tn, expected_tn)
assert diff.getbbox() is None
def test_get_video_details(video_details):
assert video_details.id == VIDEO_ID_THETADEV
assert video_details.title == "ThetaDev @ Embedded World 2019"
assert video_details.channel_id == "UCGiJh0NZ52wRhYKYnuZI08Q"
assert (
video_details.description
== """This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
Sorry for the late upload, I just didn't have time to edit my footage.
Embedded World: https://www.embedded-world.de/
My website: https://thdev.org
Twitter: https://twitter.com/Theta_Dev"""
)
assert video_details.duration == 267
assert not video_details.is_currently_live
assert not video_details.is_livestream
assert not video_details.is_short
assert video_details.published == datetime.datetime(
2019, 6, 2, tzinfo=datetime.timezone.utc
)
def test_get_video_details_short():
vinfo = youtube.get_video_details(VIDEO_ID_SHORT)
assert vinfo.id == VIDEO_ID_SHORT
assert (
vinfo.title
== "Small pink flowers | #shorts | Free Stock Video | \
creative commons short videos | creative #short"
)
assert not vinfo.is_currently_live
assert not vinfo.is_livestream
assert vinfo.is_short
def test_download_audio():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.mp3"
vinfo = youtube.download_audio(VIDEO_ID_PERSUASION, download_file)
assert vinfo.id == VIDEO_ID_PERSUASION
assert vinfo.title == "Persuasion (Instrumental) RYYZN (No Copyright Music)"
assert vinfo.duration == 100
# Check with ffmpeg if the audio file is valid
res = subprocess.run(
["ffmpeg", "-i", str(download_file)],
capture_output=True,
universal_newlines=True,
)
assert "Stream #0:0: Audio: mp3" in res.stderr
match = re.search(r"Duration: (\d{2}:\d{2}:\d{2})", res.stderr)
assert match[1] == "00:01:40"
def test_tag_audio(video_details):
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
audio_file = tmpdir / "audio.mp3"
cover_file = tests.DIR_TESTFILES / "cover" / "c1_blur.png"
shutil.copyfile(tests.DIR_TESTFILES / "audio" / "audio1.mp3", audio_file)
youtube.tag_audio(audio_file, video_details, cover_file)
tag = id3.ID3(audio_file)
assert tag["TPE1"].text[0] == "ThetaDev"
assert tag["TALB"].text[0] == "ThetaDev"
assert tag["TIT2"].text[0] == "2019-06-02 ThetaDev @ Embedded World 2019"
assert tag["TDRC"].text[0].text == "2019-06-02"
assert (
tag["COMM::XXX"].text[0]
== """https://youtu.be/ZPxEr4YdWt8
This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
Sorry for the late upload, I just didn't have time to edit my footage.
Embedded World: https://www.embedded-world.de/
My website: https://thdev.org
Twitter: https://twitter.com/Theta_Dev"""
)
tag_cover = tag["APIC:Cover"]
assert tag_cover.mime == "image/png"
tag_cover_img = Image.open(io.BytesIO(tag_cover.data))
expected_cover_img = Image.open(cover_file)
diff = ImageChops.difference(tag_cover_img, expected_cover_img)
assert diff.getbbox() is None
@pytest.mark.parametrize(
"channel_str,channel_url",
[
(
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
),
(
"https://www.youtube.com/c/MrBeast6000",
"https://www.youtube.com/c/MrBeast6000",
),
(
"https://www.youtube.com/user/LinusTechTips",
"https://www.youtube.com/user/LinusTechTips",
),
(
"UCGiJh0NZ52wRhYKYnuZI08Q",
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
),
(
"https://piped.mha.fi/user/LinusTechTips",
"https://www.youtube.com/user/LinusTechTips",
),
],
)
def test_channel_url_from_str(channel_str: str, channel_url: str):
url = youtube.channel_url_from_str(channel_str)
assert url == channel_url
@pytest.mark.parametrize(
"channel_url,channel_id,name,avatar_url",
[
(
youtube.channel_url_from_id(CHANNEL_ID_THETADEV),
CHANNEL_ID_THETADEV,
"ThetaDev",
"https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj",
),
(
CHANNEL_URL_BLENDER,
CHANNEL_ID_BLENDER,
"Blender",
"https://yt3.ggpht.com/ytc/AKedOLT_31fFSD3FWEBnHZnyZeJx-GPHJwYCQKcEpaq8NQ=s900-c-k-c0x00ffffff-no-rj",
),
],
)
def test_channel_metadata(
channel_url: str, channel_id: str, name: str, avatar_url: str
):
metadata = youtube.get_channel_metadata(channel_url)
assert metadata.id == channel_id
assert metadata.name == name
assert metadata.avatar_url == avatar_url
assert metadata.description
def test_get_channel_videos_from_feed():
videos = youtube.get_channel_videos_from_feed(CHANNEL_ID_THETADEV)
assert videos
v1 = videos[0]
assert len(v1.id) == 11
assert v1.published.tzinfo == datetime.timezone.utc
assert v1.published.second > 0 or v1.published.minute > 0 or v1.published.hour > 0
def test_get_channel_videos_from_scraper():
videos = youtube.get_channel_videos_from_scraper(CHANNEL_ID_THETADEV)
assert videos
v1 = videos[0]
assert len(v1.id) == 11
assert v1.published is None

View file

@ -1,24 +0,0 @@
import tempfile
from pathlib import Path
from PIL import Image, ImageChops
from ucast import tests
from ucast.service import util
TEST_FILE_URL = "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj"
def test_download_file():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.jpg"
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
util.download_file(TEST_FILE_URL, download_file)
downloaded_avatar = Image.open(download_file)
expected_avatar = Image.open(expected_tn_file)
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
assert diff.getbbox() is None

View file

@ -1,126 +0,0 @@
import datetime
import re
import subprocess
import tempfile
from pathlib import Path
import pytest
from PIL import Image, ImageChops
from ucast import tests
from ucast.service import youtube
VIDEO_ID_THETADEV = "ZPxEr4YdWt8"
VIDEO_ID_SHORT = "lcQZ6YwQHiw"
VIDEO_ID_PERSUASION = "DWjFW7Yq1fA"
CHANNEL_ID_THETADEV = "UCGiJh0NZ52wRhYKYnuZI08Q"
CHANNEL_ID_BLENDER = "UCSMOQeBJ2RAnuFungnQOxLg"
CHANNEL_URL_BLENDER = "https://www.youtube.com/c/BlenderFoundation"
@pytest.fixture(scope="module")
def video_info() -> youtube.VideoDetails:
return youtube.get_video_details(VIDEO_ID_THETADEV)
def test_download_thumbnail(video_info):
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
tn_file = tmpdir / "thumbnail"
expected_tn_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
tn_file = youtube.download_thumbnail(video_info, tn_file)
assert tn_file.suffix == ".webp"
tn = Image.open(tn_file)
expected_tn = Image.open(expected_tn_file)
diff = ImageChops.difference(tn, expected_tn)
assert diff.getbbox() is None
def test_get_video_info(video_info):
assert video_info.id == VIDEO_ID_THETADEV
assert video_info.title == "ThetaDev @ Embedded World 2019"
assert video_info.channel_id == "UCGiJh0NZ52wRhYKYnuZI08Q"
assert (
video_info.description
== """This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
Sorry for the late upload, I just didn't have time to edit my footage.
Embedded World: https://www.embedded-world.de/
My website: https://thdev.org
Twitter: https://twitter.com/Theta_Dev"""
)
assert video_info.duration == 267
assert not video_info.is_currently_live
assert not video_info.is_livestream
assert not video_info.is_short
assert video_info.published == datetime.datetime(
2019, 6, 2, tzinfo=datetime.timezone.utc
)
def test_get_video_info_short():
vinfo = youtube.get_video_details(VIDEO_ID_SHORT)
assert vinfo.id == VIDEO_ID_SHORT
assert (
vinfo.title
== "Small pink flowers | #shorts | Free Stock Video | \
creative commons short videos | creative #short"
)
assert not vinfo.is_currently_live
assert not vinfo.is_livestream
assert vinfo.is_short
def test_download_video():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.mp3"
vinfo = youtube.download_audio(VIDEO_ID_PERSUASION, download_file)
assert vinfo.id == VIDEO_ID_PERSUASION
assert vinfo.title == "Persuasion (Instrumental) RYYZN (No Copyright Music)"
assert vinfo.duration == 100
# Check with ffmpeg if the audio file is valid
res = subprocess.run(
["ffmpeg", "-i", str(download_file)],
capture_output=True,
universal_newlines=True,
)
assert "Stream #0:0: Audio: mp3" in res.stderr
match = re.search(r"Duration: (\d{2}:\d{2}:\d{2})", res.stderr)
assert match[1] == "00:01:40"
@pytest.mark.parametrize(
"channel_url,channel_id,name,avatar_url",
[
(
youtube.channel_url_from_id(CHANNEL_ID_THETADEV),
CHANNEL_ID_THETADEV,
"ThetaDev",
"https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj",
),
(
CHANNEL_URL_BLENDER,
CHANNEL_ID_BLENDER,
"Blender",
"https://yt3.ggpht.com/ytc/AKedOLT_31fFSD3FWEBnHZnyZeJx-GPHJwYCQKcEpaq8NQ=s900-c-k-c0x00ffffff-no-rj",
),
],
)
def test_channel_metadata(
channel_url: str, channel_id: str, name: str, avatar_url: str
):
metadata = youtube.get_channel_metadata(channel_url)
assert metadata.id == channel_id
assert metadata.name == name
assert metadata.avatar_url == avatar_url
assert metadata.description