Compare commits

..

No commits in common. "4b6733b9b6148332fc42f6ece4c55fbb91bee4b5" and "8af98a44ae4f958bf7e556367ea2b1da7fe1bae8" have entirely different histories.

38 changed files with 252 additions and 496 deletions

View file

@ -7,47 +7,9 @@ platform:
arch: ''
steps:
- name: install dependencies
- name: Test
image: thetadev256/ucast-dev
volumes:
- name: cache
path: /root/.cache
commands:
- poetry install
- name: lint
image: thetadev256/ucast-dev
volumes:
- name: cache
path: /root/.cache
commands:
- poetry run invoke lint
- name: start worker
image: thetadev256/ucast-dev
volumes:
- name: cache
path: /root/.cache
environment:
UCAST_REDIS_HOST: redis
commands:
- poetry run invoke worker
detach: true
- name: test
image: thetadev256/ucast-dev
volumes:
- name: cache
path: /root/.cache
environment:
UCAST_REDIS_HOST: redis
commands:
- poetry run invoke test
services:
- name: redis
image: redis:alpine
volumes:
- name: cache
temp: { }

5
.gitignore vendored
View file

@ -14,6 +14,11 @@ node_modules
# Jupyter
.ipynb_checkpoints
# Media files
*.webm
*.mp4
*.mp3
# Application data
/_run*
*.sqlite3

View file

@ -44,9 +44,6 @@ honcho = "^1.1.0"
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "ucast_project.settings"
[tool.flake8]
extend-ignore = "E501"

View file

@ -91,8 +91,8 @@ def get_cover(c, vid=""):
cv_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_gradient.png"
cv_blur_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_blur.png"
youtube.download_thumbnail(vinfo, tn_file)
util.download_image_file(channel_metadata.avatar_url, av_file)
tn_file = youtube.download_thumbnail(vinfo, tn_file)
util.download_file(channel_metadata.avatar_url, av_file)
cover.create_cover_file(
tn_file, av_file, title, channel_name, cover.COVER_STYLE_GRADIENT, cv_file

View file

@ -4,3 +4,8 @@ from django.apps import AppConfig
class UcastConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "ucast"
def ready(self):
from ucast.tasks import download
download.schedule_update_channels()

View file

@ -1,11 +0,0 @@
from django_rq.management.commands import rqscheduler
from ucast.tasks import schedule
class Command(rqscheduler.Command):
def handle(self, *args, **kwargs):
print("Starting ucast scheduler")
schedule.clear_scheduled_jobs()
schedule.register_scheduled_jobs()
super(Command, self).handle(*args, **kwargs)

View file

@ -197,9 +197,9 @@ def _get_baseimage(thumbnail: Image.Image, style: CoverStyle):
ctn_width = int(COVER_WIDTH / thumbnail.height * thumbnail.width)
ctn_x_left = int((ctn_width - COVER_WIDTH) / 2)
ctn = thumbnail.resize(
(ctn_width, COVER_WIDTH), Image.Resampling.LANCZOS
).filter(ImageFilter.GaussianBlur(20))
ctn = thumbnail.resize((ctn_width, COVER_WIDTH), Image.LANCZOS).filter(
ImageFilter.GaussianBlur(20)
)
cover.paste(ctn, (-ctn_x_left, 0))
return cover
@ -219,9 +219,9 @@ def _resize_thumbnail(thumbnail: Image.Image) -> Image.Image:
tn_crop_y_top = int((tn_resize_height - tn_height) / 2)
tn_crop_y_bottom = tn_resize_height - tn_crop_y_top
return thumbnail.resize(
(COVER_WIDTH, tn_resize_height), Image.Resampling.LANCZOS
).crop((0, tn_crop_y_top, COVER_WIDTH, tn_crop_y_bottom))
return thumbnail.resize((COVER_WIDTH, tn_resize_height), Image.LANCZOS).crop(
(0, tn_crop_y_top, COVER_WIDTH, tn_crop_y_bottom)
)
def _prepare_text_background(
@ -357,7 +357,7 @@ def _draw_text_avatar(
)
if avatar:
avt = avatar.resize((avt_size, avt_size), Image.Resampling.LANCZOS)
avt = avatar.resize((avt_size, avt_size), Image.LANCZOS)
circle_mask = Image.new("L", (avt_size, avt_size))
circle_mask_draw = ImageDraw.Draw(circle_mask)

View file

@ -1,11 +1,32 @@
import os
from pathlib import Path
from typing import Tuple
import slugify
from django.conf import settings
UCAST_DIRNAME = "_ucast"
def _get_slug(str_in: str) -> str:
return slugify.slugify(str_in, lowercase=False, separator="_")
def _get_unique_slug(str_in: str, root_dir: Path, extension="") -> Tuple[Path, str]:
original_slug = _get_slug(str_in)
slug = original_slug
i = 0
while True:
testfile = root_dir / (slug + extension)
if not testfile.exists():
return testfile, slug
i += 1
slug = f"{original_slug}_{i}"
class ChannelFolder:
def __init__(self, dir_root: Path):
self.dir_root = dir_root

View file

@ -1,4 +1,4 @@
import io
import shutil
from pathlib import Path
import requests
@ -15,45 +15,33 @@ def download_file(url: str, download_path: Path):
open(download_path, "wb").write(r.content)
def download_image_file(url: str, download_path: Path):
"""
Download an image and convert it to the type given
by the path.
:param url: Image URL
:param download_path: Download path
"""
r = requests.get(url, allow_redirects=True)
r.raise_for_status()
img = Image.open(io.BytesIO(r.content))
def download_image_file(url: str, download_path: Path) -> Path:
download_file(url, download_path)
img = Image.open(download_path)
img_ext = img.format.lower()
img.close()
if img_ext == "jpeg":
img_ext = "jpg"
if "." + img_ext == download_path.suffix:
open(download_path, "wb").write(r.content)
else:
img.save(download_path)
new_path = download_path.with_suffix("." + img_ext)
shutil.move(download_path, new_path)
return new_path
def resize_avatar(original_file: Path, new_file: Path):
avatar = Image.open(original_file)
avatar_new_height = int(AVATAR_SM_WIDTH / avatar.width * avatar.height)
avatar = avatar.resize(
(AVATAR_SM_WIDTH, avatar_new_height), Image.Resampling.LANCZOS
)
avatar = avatar.resize((AVATAR_SM_WIDTH, avatar_new_height), Image.LANCZOS)
avatar.save(new_file)
def resize_thumbnail(original_file: Path, new_file: Path):
thumbnail = Image.open(original_file)
tn_new_height = int(THUMBNAIL_SM_WIDTH / thumbnail.width * thumbnail.height)
thumbnail = thumbnail.resize(
(THUMBNAIL_SM_WIDTH, tn_new_height), Image.Resampling.LANCZOS
)
thumbnail = thumbnail.resize((THUMBNAIL_SM_WIDTH, tn_new_height), Image.LANCZOS)
thumbnail.save(new_file)
def get_slug(text: str) -> str:
return slugify.slugify(text, lowercase=False, separator="_")
def get_slug(str_in: str) -> str:
return slugify.slugify(str_in, lowercase=False, separator="_")

View file

@ -92,7 +92,7 @@ class ChannelMetadata:
avatar_url: str
def download_thumbnail(vinfo: VideoDetails, download_path: Path):
def download_thumbnail(vinfo: VideoDetails, download_path: Path) -> Path:
"""
Download the thumbnail image of a YouTube video and save it at the given filepath.
The thumbnail file ending is added to the path.
@ -108,8 +108,7 @@ def download_thumbnail(vinfo: VideoDetails, download_path: Path):
logging.info(f"downloading thumbnail {url}...")
try:
util.download_image_file(url, download_path)
return
return util.download_image_file(url, download_path)
except requests.HTTPError:
logging.warning(f"downloading thumbnail {url} failed")
pass
@ -159,20 +158,20 @@ def download_audio(
def tag_audio(audio_path: Path, vinfo: VideoDetails, cover_path: Path):
title_text = f"{vinfo.published.date().isoformat()} {vinfo.title}"
comment = f"https://youtu.be/{vinfo.id}\n\n{vinfo.description}"
tag = id3.ID3(audio_path)
tag["TPE1"] = id3.TPE1(encoding=3, text=vinfo.channel_name) # Artist
tag["TALB"] = id3.TALB(encoding=3, text=vinfo.channel_name) # Album
tag["TIT2"] = id3.TIT2(encoding=3, text=title_text) # Title
tag["TDRC"] = id3.TDRC(encoding=3, text=vinfo.published.date().isoformat()) # Date
tag["COMM"] = id3.COMM(encoding=3, text=comment) # Comment
audio = id3.ID3(audio_path)
audio["TPE1"] = id3.TPE1(encoding=3, text=vinfo.channel_name) # Artist
audio["TALB"] = id3.TALB(encoding=3, text=vinfo.channel_name) # Album
audio["TIT2"] = id3.TIT2(encoding=3, text=title_text) # Title
audio["TYER"] = id3.TYER(encoding=3, text=str(vinfo.published.year)) # Year
audio["TDAT"] = id3.TDAT(encoding=3, text=vinfo.published.strftime("%d%m")) # Date
audio["COMM"] = id3.COMM(encoding=3, text=f"YT-ID: {vinfo.id}") # Comment
with open(cover_path, "rb") as albumart:
tag["APIC"] = id3.APIC(
audio["APIC"] = id3.APIC(
encoding=3, mime="image/png", type=3, desc="Cover", data=albumart.read()
)
tag.save()
audio.save()
def channel_url_from_id(channel_id: str) -> str:
@ -232,6 +231,22 @@ def get_channel_metadata(channel_url: str) -> ChannelMetadata:
return ChannelMetadata(channel_id, name, description, avatar)
def download_avatar(avatar_url: str, download_path: Path) -> Path:
"""
Download the avatar image of a channel. The .jpg file ending
is added to the path.
:param avatar_url: Channel avatar URL
:param download_path: Download path
:return: Path with file ending
"""
logging.info(f"downloading avatar {avatar_url}...")
download_path = download_path.with_suffix(".jpg")
util.download_file(avatar_url, download_path)
return download_path
def get_channel_videos_from_feed(channel_id: str) -> List[VideoScraped]:
"""
Return videos of a channel using YouTube's RSS feed. Using the feed is fast,

View file

@ -1,6 +1,8 @@
import os
from datetime import datetime
import django_rq
from django.conf import settings
from django.utils import timezone
from ucast.models import Channel, Video
@ -19,8 +21,10 @@ def _get_or_create_channel(channel_id: str) -> Channel:
channel_slug = Channel.get_new_slug(channel_data.name)
channel_folder = store.get_channel_folder(channel_slug)
util.download_image_file(channel_data.avatar_url, channel_folder.file_avatar)
util.resize_avatar(channel_folder.file_avatar, channel_folder.file_avatar_sm)
avatar_file = youtube.download_avatar(
channel_data.avatar_url, channel_folder.file_avatar
)
util.resize_avatar(avatar_file, channel_folder.file_avatar_sm)
channel = Channel(
id=channel_id,
@ -77,8 +81,9 @@ def download_video(video: Video):
details = youtube.download_audio(video.id, audio_file)
# Download/convert thumbnails
tn_path = channel_folder.get_thumbnail(video.slug)
youtube.download_thumbnail(details, tn_path)
tn_path = youtube.download_thumbnail(
details, channel_folder.get_thumbnail(video.slug)
)
util.resize_thumbnail(tn_path, channel_folder.get_thumbnail(video.slug, True))
cover_file = channel_folder.get_cover(video.slug)
cover.create_cover_file(
@ -115,6 +120,15 @@ def import_channel(channel_id: str, limit: int = None):
_load_scraped_video(vid, channel)
def schedule_update_channels():
django_rq.get_scheduler().schedule(
datetime.utcnow(),
update_channels,
id="schedule_update_channels",
interval=settings.YT_UPDATE_INTERVAL,
)
def update_channels():
"""
Update all channels from their RSS feeds and download new videos.

View file

@ -1,27 +0,0 @@
import logging
from datetime import datetime
import django_rq
from django.conf import settings
from ucast.tasks import download
scheduler = django_rq.get_scheduler()
log = logging.getLogger(__name__)
def clear_scheduled_jobs():
"""Delete all scheduled jobs to prevent duplicates"""
for job in scheduler.get_jobs():
log.debug("Deleting scheduled job %s", job)
job.delete()
def register_scheduled_jobs():
"""Register all scheduled jobs"""
scheduler.schedule(
datetime.utcnow(),
download.update_channels,
id="schedule_update_channels",
interval=settings.YT_UPDATE_INTERVAL,
)

View file

@ -1,3 +1,3 @@
from importlib.resources import files
DIR_TESTFILES = files("ucast.tests._testfiles")
DIR_TESTFILES = files("ucast.tests.testfiles")

View file

@ -1,56 +0,0 @@
import os
import tempfile
from pathlib import Path
from ucast.service import storage
def test_create_channel_folders(settings):
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
settings.DOWNLOAD_ROOT = tmpdir
store = storage.Storage()
cf1 = store.get_channel_folder("ThetaDev")
cf2 = store.get_channel_folder("Jeff_Geerling")
cf1b = store.get_channel_folder("ThetaDev")
cf1_path = tmpdir / "ThetaDev"
cf2_path = tmpdir / "Jeff_Geerling"
assert cf1.dir_root == cf1_path
assert cf1b.dir_root == cf1_path
assert cf2.dir_root == cf2_path
assert os.path.isdir(cf1_path)
assert os.path.isdir(cf2_path)
def test_channel_folder():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
ucast_dir = tmpdir / "_ucast"
cf = storage.ChannelFolder(tmpdir)
# Verify internal paths
assert cf.file_avatar == ucast_dir / "avatar.jpg"
assert cf.file_avatar_sm == ucast_dir / "avatar_sm.webp"
assert cf.dir_covers == ucast_dir / "covers"
assert cf.dir_thumbnails == ucast_dir / "thumbnails"
# Create the folder
assert not cf.does_exist()
cf.create()
assert cf.does_exist()
assert cf.get_cover("my_video_title") == ucast_dir / "covers" / "my_video_title.png"
assert (
cf.get_thumbnail("my_video_title")
== ucast_dir / "thumbnails" / "my_video_title.webp"
)
assert (
cf.get_thumbnail("my_video_title", True)
== ucast_dir / "thumbnails" / "my_video_title_sm.webp"
)
assert cf.get_audio("my_video_title") == tmpdir / "my_video_title.mp3"

View file

@ -1,92 +0,0 @@
import tempfile
from pathlib import Path
import pytest
from PIL import Image, ImageChops
from ucast import tests
from ucast.service import util
TEST_FILE_URL = "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj"
def test_download_file():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.jpg"
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
util.download_file(TEST_FILE_URL, download_file)
downloaded_avatar = Image.open(download_file)
expected_avatar = Image.open(expected_tn_file)
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
assert diff.getbbox() is None
def test_download_image_file():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.jpg"
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
util.download_image_file(TEST_FILE_URL, download_file)
downloaded_avatar = Image.open(download_file)
expected_avatar = Image.open(expected_tn_file)
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
assert diff.getbbox() is None
def test_download_image_file_conv():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.png"
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
util.download_image_file(TEST_FILE_URL, download_file)
downloaded_avatar = Image.open(download_file)
expected_avatar = Image.open(expected_tn_file)
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
assert diff.getbbox() is None
def test_resize_avatar():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
source_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
resized_file = tmpdir / "avatar.webp"
util.resize_avatar(source_file, resized_file)
resized_avatar = Image.open(resized_file)
assert resized_avatar.size == (100, 100)
def test_resize_thumbnail():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
source_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
resized_file = tmpdir / "thumbnail.webp"
util.resize_thumbnail(source_file, resized_file)
resized_thumbnail = Image.open(resized_file)
assert resized_thumbnail.size == (360, 202)
@pytest.mark.parametrize(
"text,expected_slug",
[
("Hello World 👋", "Hello_World"),
("ÄäÖöÜüß", "AaOoUuss"),
("오징어 게임", "ojingeo_geim"),
],
)
def test_slug(text: str, expected_slug: str):
slug = util.get_slug(text)
assert slug == expected_slug

View file

@ -1,214 +0,0 @@
import datetime
import io
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
import pytest
from mutagen import id3
from PIL import Image, ImageChops
from ucast import tests
from ucast.service import youtube
VIDEO_ID_THETADEV = "ZPxEr4YdWt8"
VIDEO_ID_SHORT = "lcQZ6YwQHiw"
VIDEO_ID_PERSUASION = "DWjFW7Yq1fA"
CHANNEL_ID_THETADEV = "UCGiJh0NZ52wRhYKYnuZI08Q"
CHANNEL_ID_BLENDER = "UCSMOQeBJ2RAnuFungnQOxLg"
CHANNEL_URL_BLENDER = "https://www.youtube.com/c/BlenderFoundation"
@pytest.fixture(scope="module")
def video_details() -> youtube.VideoDetails:
return youtube.get_video_details(VIDEO_ID_THETADEV)
def test_download_thumbnail(video_details):
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
tn_file = tmpdir / "thumbnail.webp"
expected_tn_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
youtube.download_thumbnail(video_details, tn_file)
tn = Image.open(tn_file)
expected_tn = Image.open(expected_tn_file)
diff = ImageChops.difference(tn, expected_tn)
assert diff.getbbox() is None
def test_get_video_details(video_details):
assert video_details.id == VIDEO_ID_THETADEV
assert video_details.title == "ThetaDev @ Embedded World 2019"
assert video_details.channel_id == "UCGiJh0NZ52wRhYKYnuZI08Q"
assert (
video_details.description
== """This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
Sorry for the late upload, I just didn't have time to edit my footage.
Embedded World: https://www.embedded-world.de/
My website: https://thdev.org
Twitter: https://twitter.com/Theta_Dev"""
)
assert video_details.duration == 267
assert not video_details.is_currently_live
assert not video_details.is_livestream
assert not video_details.is_short
assert video_details.published == datetime.datetime(
2019, 6, 2, tzinfo=datetime.timezone.utc
)
def test_get_video_details_short():
vinfo = youtube.get_video_details(VIDEO_ID_SHORT)
assert vinfo.id == VIDEO_ID_SHORT
assert (
vinfo.title
== "Small pink flowers | #shorts | Free Stock Video | \
creative commons short videos | creative #short"
)
assert not vinfo.is_currently_live
assert not vinfo.is_livestream
assert vinfo.is_short
def test_download_audio():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.mp3"
vinfo = youtube.download_audio(VIDEO_ID_PERSUASION, download_file)
assert vinfo.id == VIDEO_ID_PERSUASION
assert vinfo.title == "Persuasion (Instrumental) RYYZN (No Copyright Music)"
assert vinfo.duration == 100
# Check with ffmpeg if the audio file is valid
res = subprocess.run(
["ffmpeg", "-i", str(download_file)],
capture_output=True,
universal_newlines=True,
)
assert "Stream #0:0: Audio: mp3" in res.stderr
match = re.search(r"Duration: (\d{2}:\d{2}:\d{2})", res.stderr)
assert match[1] == "00:01:40"
def test_tag_audio(video_details):
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
audio_file = tmpdir / "audio.mp3"
cover_file = tests.DIR_TESTFILES / "cover" / "c1_blur.png"
shutil.copyfile(tests.DIR_TESTFILES / "audio" / "audio1.mp3", audio_file)
youtube.tag_audio(audio_file, video_details, cover_file)
tag = id3.ID3(audio_file)
assert tag["TPE1"].text[0] == "ThetaDev"
assert tag["TALB"].text[0] == "ThetaDev"
assert tag["TIT2"].text[0] == "2019-06-02 ThetaDev @ Embedded World 2019"
assert tag["TDRC"].text[0].text == "2019-06-02"
assert (
tag["COMM::XXX"].text[0]
== """https://youtu.be/ZPxEr4YdWt8
This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
Sorry for the late upload, I just didn't have time to edit my footage.
Embedded World: https://www.embedded-world.de/
My website: https://thdev.org
Twitter: https://twitter.com/Theta_Dev"""
)
tag_cover = tag["APIC:Cover"]
assert tag_cover.mime == "image/png"
tag_cover_img = Image.open(io.BytesIO(tag_cover.data))
expected_cover_img = Image.open(cover_file)
diff = ImageChops.difference(tag_cover_img, expected_cover_img)
assert diff.getbbox() is None
@pytest.mark.parametrize(
"channel_str,channel_url",
[
(
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
),
(
"https://www.youtube.com/c/MrBeast6000",
"https://www.youtube.com/c/MrBeast6000",
),
(
"https://www.youtube.com/user/LinusTechTips",
"https://www.youtube.com/user/LinusTechTips",
),
(
"UCGiJh0NZ52wRhYKYnuZI08Q",
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
),
(
"https://piped.mha.fi/user/LinusTechTips",
"https://www.youtube.com/user/LinusTechTips",
),
],
)
def test_channel_url_from_str(channel_str: str, channel_url: str):
url = youtube.channel_url_from_str(channel_str)
assert url == channel_url
@pytest.mark.parametrize(
"channel_url,channel_id,name,avatar_url",
[
(
youtube.channel_url_from_id(CHANNEL_ID_THETADEV),
CHANNEL_ID_THETADEV,
"ThetaDev",
"https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj",
),
(
CHANNEL_URL_BLENDER,
CHANNEL_ID_BLENDER,
"Blender",
"https://yt3.ggpht.com/ytc/AKedOLT_31fFSD3FWEBnHZnyZeJx-GPHJwYCQKcEpaq8NQ=s900-c-k-c0x00ffffff-no-rj",
),
],
)
def test_channel_metadata(
channel_url: str, channel_id: str, name: str, avatar_url: str
):
metadata = youtube.get_channel_metadata(channel_url)
assert metadata.id == channel_id
assert metadata.name == name
assert metadata.avatar_url == avatar_url
assert metadata.description
def test_get_channel_videos_from_feed():
videos = youtube.get_channel_videos_from_feed(CHANNEL_ID_THETADEV)
assert videos
v1 = videos[0]
assert len(v1.id) == 11
assert v1.published.tzinfo == datetime.timezone.utc
assert v1.published.second > 0 or v1.published.minute > 0 or v1.published.hour > 0
def test_get_channel_videos_from_scraper():
videos = youtube.get_channel_videos_from_scraper(CHANNEL_ID_THETADEV)
assert videos
v1 = videos[0]
assert len(v1.id) == 11
assert v1.published is None

24
ucast/tests/test_util.py Normal file
View file

@ -0,0 +1,24 @@
import tempfile
from pathlib import Path
from PIL import Image, ImageChops
from ucast import tests
from ucast.service import util
TEST_FILE_URL = "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj"
def test_download_file():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.jpg"
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
util.download_file(TEST_FILE_URL, download_file)
downloaded_avatar = Image.open(download_file)
expected_avatar = Image.open(expected_tn_file)
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
assert diff.getbbox() is None

126
ucast/tests/test_youtube.py Normal file
View file

@ -0,0 +1,126 @@
import datetime
import re
import subprocess
import tempfile
from pathlib import Path
import pytest
from PIL import Image, ImageChops
from ucast import tests
from ucast.service import youtube
VIDEO_ID_THETADEV = "ZPxEr4YdWt8"
VIDEO_ID_SHORT = "lcQZ6YwQHiw"
VIDEO_ID_PERSUASION = "DWjFW7Yq1fA"
CHANNEL_ID_THETADEV = "UCGiJh0NZ52wRhYKYnuZI08Q"
CHANNEL_ID_BLENDER = "UCSMOQeBJ2RAnuFungnQOxLg"
CHANNEL_URL_BLENDER = "https://www.youtube.com/c/BlenderFoundation"
@pytest.fixture(scope="module")
def video_info() -> youtube.VideoDetails:
return youtube.get_video_details(VIDEO_ID_THETADEV)
def test_download_thumbnail(video_info):
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
tn_file = tmpdir / "thumbnail"
expected_tn_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
tn_file = youtube.download_thumbnail(video_info, tn_file)
assert tn_file.suffix == ".webp"
tn = Image.open(tn_file)
expected_tn = Image.open(expected_tn_file)
diff = ImageChops.difference(tn, expected_tn)
assert diff.getbbox() is None
def test_get_video_info(video_info):
assert video_info.id == VIDEO_ID_THETADEV
assert video_info.title == "ThetaDev @ Embedded World 2019"
assert video_info.channel_id == "UCGiJh0NZ52wRhYKYnuZI08Q"
assert (
video_info.description
== """This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
Sorry for the late upload, I just didn't have time to edit my footage.
Embedded World: https://www.embedded-world.de/
My website: https://thdev.org
Twitter: https://twitter.com/Theta_Dev"""
)
assert video_info.duration == 267
assert not video_info.is_currently_live
assert not video_info.is_livestream
assert not video_info.is_short
assert video_info.published == datetime.datetime(
2019, 6, 2, tzinfo=datetime.timezone.utc
)
def test_get_video_info_short():
vinfo = youtube.get_video_details(VIDEO_ID_SHORT)
assert vinfo.id == VIDEO_ID_SHORT
assert (
vinfo.title
== "Small pink flowers | #shorts | Free Stock Video | \
creative commons short videos | creative #short"
)
assert not vinfo.is_currently_live
assert not vinfo.is_livestream
assert vinfo.is_short
def test_download_video():
tmpdir_o = tempfile.TemporaryDirectory()
tmpdir = Path(tmpdir_o.name)
download_file = tmpdir / "download.mp3"
vinfo = youtube.download_audio(VIDEO_ID_PERSUASION, download_file)
assert vinfo.id == VIDEO_ID_PERSUASION
assert vinfo.title == "Persuasion (Instrumental) RYYZN (No Copyright Music)"
assert vinfo.duration == 100
# Check with ffmpeg if the audio file is valid
res = subprocess.run(
["ffmpeg", "-i", str(download_file)],
capture_output=True,
universal_newlines=True,
)
assert "Stream #0:0: Audio: mp3" in res.stderr
match = re.search(r"Duration: (\d{2}:\d{2}:\d{2})", res.stderr)
assert match[1] == "00:01:40"
@pytest.mark.parametrize(
"channel_url,channel_id,name,avatar_url",
[
(
youtube.channel_url_from_id(CHANNEL_ID_THETADEV),
CHANNEL_ID_THETADEV,
"ThetaDev",
"https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj",
),
(
CHANNEL_URL_BLENDER,
CHANNEL_ID_BLENDER,
"Blender",
"https://yt3.ggpht.com/ytc/AKedOLT_31fFSD3FWEBnHZnyZeJx-GPHJwYCQKcEpaq8NQ=s900-c-k-c0x00ffffff-no-rj",
),
],
)
def test_channel_metadata(
channel_url: str, channel_id: str, name: str, avatar_url: str
):
metadata = youtube.get_channel_metadata(channel_url)
assert metadata.id == channel_id
assert metadata.name == name
assert metadata.avatar_url == avatar_url
assert metadata.description

View file

Before

Width:  |  Height:  |  Size: 186 KiB

After

Width:  |  Height:  |  Size: 186 KiB

View file

Before

Width:  |  Height:  |  Size: 32 KiB

After

Width:  |  Height:  |  Size: 32 KiB

View file

Before

Width:  |  Height:  |  Size: 53 KiB

After

Width:  |  Height:  |  Size: 53 KiB

View file

Before

Width:  |  Height:  |  Size: 26 KiB

After

Width:  |  Height:  |  Size: 26 KiB

View file

Before

Width:  |  Height:  |  Size: 268 KiB

After

Width:  |  Height:  |  Size: 268 KiB

View file

Before

Width:  |  Height:  |  Size: 234 KiB

After

Width:  |  Height:  |  Size: 234 KiB

View file

Before

Width:  |  Height:  |  Size: 218 KiB

After

Width:  |  Height:  |  Size: 218 KiB

View file

Before

Width:  |  Height:  |  Size: 215 KiB

After

Width:  |  Height:  |  Size: 215 KiB

View file

Before

Width:  |  Height:  |  Size: 183 KiB

After

Width:  |  Height:  |  Size: 183 KiB

View file

Before

Width:  |  Height:  |  Size: 216 KiB

After

Width:  |  Height:  |  Size: 216 KiB

View file

Before

Width:  |  Height:  |  Size: 173 KiB

After

Width:  |  Height:  |  Size: 173 KiB

View file

@ -1,9 +1,8 @@
### Quellen der Thumbnails/Avatarbilder/Audiodateien zum Testen
### Quellen der Thumbnails/Avatarbilder zum Testen
- a1/t1: [ThetaDev @ Embedded World 2019](https://www.youtube.com/watch?v=ZPxEr4YdWt8), by [ThetaDev](https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q) (CC-BY)
- a2/t2: [Sintel - Open Movie by Blender Foundation](https://www.youtube.com/watch?v=eRsGyueVLvQ), by [Blender](https://www.youtube.com/c/BlenderFoundation) (CC-BY)
- a3/t3: [Systemabsturz Teaser zur DiVOC bb3](https://www.youtube.com/watch?v=uFqgQ35wyYY), by [media.ccc.de](https://www.youtube.com/channel/UC2TXq_t06Hjdr2g_KdKpHQg) (CC-BY)
- audio1: [No copyright intro free fire intro](https://www.youtube.com/watch?v=I0RRENheeTo), by [Shahzaib Hassan](https://www.youtube.com/channel/UCmLTTbctUZobNQrr8RtX8uQ), (CC-BY)
### Weitere Testvideos

View file

Before

Width:  |  Height:  |  Size: 92 KiB

After

Width:  |  Height:  |  Size: 92 KiB

View file

Before

Width:  |  Height:  |  Size: 20 KiB

After

Width:  |  Height:  |  Size: 20 KiB

View file

Before

Width:  |  Height:  |  Size: 28 KiB

After

Width:  |  Height:  |  Size: 28 KiB