Compare commits
2 commits
8af98a44ae
...
4b6733b9b6
Author | SHA1 | Date | |
---|---|---|---|
4b6733b9b6 | |||
28cb58356e |
40
.drone.yml
|
@ -7,9 +7,47 @@ platform:
|
||||||
arch: ''
|
arch: ''
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: Test
|
- name: install dependencies
|
||||||
image: thetadev256/ucast-dev
|
image: thetadev256/ucast-dev
|
||||||
|
volumes:
|
||||||
|
- name: cache
|
||||||
|
path: /root/.cache
|
||||||
commands:
|
commands:
|
||||||
- poetry install
|
- poetry install
|
||||||
|
|
||||||
|
- name: lint
|
||||||
|
image: thetadev256/ucast-dev
|
||||||
|
volumes:
|
||||||
|
- name: cache
|
||||||
|
path: /root/.cache
|
||||||
|
commands:
|
||||||
- poetry run invoke lint
|
- poetry run invoke lint
|
||||||
|
|
||||||
|
- name: start worker
|
||||||
|
image: thetadev256/ucast-dev
|
||||||
|
volumes:
|
||||||
|
- name: cache
|
||||||
|
path: /root/.cache
|
||||||
|
environment:
|
||||||
|
UCAST_REDIS_HOST: redis
|
||||||
|
commands:
|
||||||
|
- poetry run invoke worker
|
||||||
|
detach: true
|
||||||
|
|
||||||
|
- name: test
|
||||||
|
image: thetadev256/ucast-dev
|
||||||
|
volumes:
|
||||||
|
- name: cache
|
||||||
|
path: /root/.cache
|
||||||
|
environment:
|
||||||
|
UCAST_REDIS_HOST: redis
|
||||||
|
commands:
|
||||||
- poetry run invoke test
|
- poetry run invoke test
|
||||||
|
|
||||||
|
services:
|
||||||
|
- name: redis
|
||||||
|
image: redis:alpine
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
- name: cache
|
||||||
|
temp: { }
|
||||||
|
|
5
.gitignore
vendored
|
@ -14,11 +14,6 @@ node_modules
|
||||||
# Jupyter
|
# Jupyter
|
||||||
.ipynb_checkpoints
|
.ipynb_checkpoints
|
||||||
|
|
||||||
# Media files
|
|
||||||
*.webm
|
|
||||||
*.mp4
|
|
||||||
*.mp3
|
|
||||||
|
|
||||||
# Application data
|
# Application data
|
||||||
/_run*
|
/_run*
|
||||||
*.sqlite3
|
*.sqlite3
|
||||||
|
|
|
@ -44,6 +44,9 @@ honcho = "^1.1.0"
|
||||||
requires = ["poetry-core>=1.0.0"]
|
requires = ["poetry-core>=1.0.0"]
|
||||||
build-backend = "poetry.core.masonry.api"
|
build-backend = "poetry.core.masonry.api"
|
||||||
|
|
||||||
|
[tool.pytest.ini_options]
|
||||||
|
DJANGO_SETTINGS_MODULE = "ucast_project.settings"
|
||||||
|
|
||||||
[tool.flake8]
|
[tool.flake8]
|
||||||
extend-ignore = "E501"
|
extend-ignore = "E501"
|
||||||
|
|
||||||
|
|
4
tasks.py
|
@ -91,8 +91,8 @@ def get_cover(c, vid=""):
|
||||||
cv_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_gradient.png"
|
cv_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_gradient.png"
|
||||||
cv_blur_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_blur.png"
|
cv_blur_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_blur.png"
|
||||||
|
|
||||||
tn_file = youtube.download_thumbnail(vinfo, tn_file)
|
youtube.download_thumbnail(vinfo, tn_file)
|
||||||
util.download_file(channel_metadata.avatar_url, av_file)
|
util.download_image_file(channel_metadata.avatar_url, av_file)
|
||||||
|
|
||||||
cover.create_cover_file(
|
cover.create_cover_file(
|
||||||
tn_file, av_file, title, channel_name, cover.COVER_STYLE_GRADIENT, cv_file
|
tn_file, av_file, title, channel_name, cover.COVER_STYLE_GRADIENT, cv_file
|
||||||
|
|
|
@ -4,8 +4,3 @@ from django.apps import AppConfig
|
||||||
class UcastConfig(AppConfig):
|
class UcastConfig(AppConfig):
|
||||||
default_auto_field = "django.db.models.BigAutoField"
|
default_auto_field = "django.db.models.BigAutoField"
|
||||||
name = "ucast"
|
name = "ucast"
|
||||||
|
|
||||||
def ready(self):
|
|
||||||
from ucast.tasks import download
|
|
||||||
|
|
||||||
download.schedule_update_channels()
|
|
||||||
|
|
0
ucast/management/__init__.py
Normal file
0
ucast/management/commands/__init__.py
Normal file
11
ucast/management/commands/rqscheduler.py
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
from django_rq.management.commands import rqscheduler
|
||||||
|
|
||||||
|
from ucast.tasks import schedule
|
||||||
|
|
||||||
|
|
||||||
|
class Command(rqscheduler.Command):
|
||||||
|
def handle(self, *args, **kwargs):
|
||||||
|
print("Starting ucast scheduler")
|
||||||
|
schedule.clear_scheduled_jobs()
|
||||||
|
schedule.register_scheduled_jobs()
|
||||||
|
super(Command, self).handle(*args, **kwargs)
|
|
@ -197,9 +197,9 @@ def _get_baseimage(thumbnail: Image.Image, style: CoverStyle):
|
||||||
ctn_width = int(COVER_WIDTH / thumbnail.height * thumbnail.width)
|
ctn_width = int(COVER_WIDTH / thumbnail.height * thumbnail.width)
|
||||||
ctn_x_left = int((ctn_width - COVER_WIDTH) / 2)
|
ctn_x_left = int((ctn_width - COVER_WIDTH) / 2)
|
||||||
|
|
||||||
ctn = thumbnail.resize((ctn_width, COVER_WIDTH), Image.LANCZOS).filter(
|
ctn = thumbnail.resize(
|
||||||
ImageFilter.GaussianBlur(20)
|
(ctn_width, COVER_WIDTH), Image.Resampling.LANCZOS
|
||||||
)
|
).filter(ImageFilter.GaussianBlur(20))
|
||||||
cover.paste(ctn, (-ctn_x_left, 0))
|
cover.paste(ctn, (-ctn_x_left, 0))
|
||||||
|
|
||||||
return cover
|
return cover
|
||||||
|
@ -219,9 +219,9 @@ def _resize_thumbnail(thumbnail: Image.Image) -> Image.Image:
|
||||||
tn_crop_y_top = int((tn_resize_height - tn_height) / 2)
|
tn_crop_y_top = int((tn_resize_height - tn_height) / 2)
|
||||||
tn_crop_y_bottom = tn_resize_height - tn_crop_y_top
|
tn_crop_y_bottom = tn_resize_height - tn_crop_y_top
|
||||||
|
|
||||||
return thumbnail.resize((COVER_WIDTH, tn_resize_height), Image.LANCZOS).crop(
|
return thumbnail.resize(
|
||||||
(0, tn_crop_y_top, COVER_WIDTH, tn_crop_y_bottom)
|
(COVER_WIDTH, tn_resize_height), Image.Resampling.LANCZOS
|
||||||
)
|
).crop((0, tn_crop_y_top, COVER_WIDTH, tn_crop_y_bottom))
|
||||||
|
|
||||||
|
|
||||||
def _prepare_text_background(
|
def _prepare_text_background(
|
||||||
|
@ -357,7 +357,7 @@ def _draw_text_avatar(
|
||||||
)
|
)
|
||||||
|
|
||||||
if avatar:
|
if avatar:
|
||||||
avt = avatar.resize((avt_size, avt_size), Image.LANCZOS)
|
avt = avatar.resize((avt_size, avt_size), Image.Resampling.LANCZOS)
|
||||||
|
|
||||||
circle_mask = Image.new("L", (avt_size, avt_size))
|
circle_mask = Image.new("L", (avt_size, avt_size))
|
||||||
circle_mask_draw = ImageDraw.Draw(circle_mask)
|
circle_mask_draw = ImageDraw.Draw(circle_mask)
|
||||||
|
|
|
@ -1,32 +1,11 @@
|
||||||
import os
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Tuple
|
|
||||||
|
|
||||||
import slugify
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
|
||||||
UCAST_DIRNAME = "_ucast"
|
UCAST_DIRNAME = "_ucast"
|
||||||
|
|
||||||
|
|
||||||
def _get_slug(str_in: str) -> str:
|
|
||||||
return slugify.slugify(str_in, lowercase=False, separator="_")
|
|
||||||
|
|
||||||
|
|
||||||
def _get_unique_slug(str_in: str, root_dir: Path, extension="") -> Tuple[Path, str]:
|
|
||||||
original_slug = _get_slug(str_in)
|
|
||||||
slug = original_slug
|
|
||||||
i = 0
|
|
||||||
|
|
||||||
while True:
|
|
||||||
testfile = root_dir / (slug + extension)
|
|
||||||
|
|
||||||
if not testfile.exists():
|
|
||||||
return testfile, slug
|
|
||||||
|
|
||||||
i += 1
|
|
||||||
slug = f"{original_slug}_{i}"
|
|
||||||
|
|
||||||
|
|
||||||
class ChannelFolder:
|
class ChannelFolder:
|
||||||
def __init__(self, dir_root: Path):
|
def __init__(self, dir_root: Path):
|
||||||
self.dir_root = dir_root
|
self.dir_root = dir_root
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import shutil
|
import io
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
@ -15,33 +15,45 @@ def download_file(url: str, download_path: Path):
|
||||||
open(download_path, "wb").write(r.content)
|
open(download_path, "wb").write(r.content)
|
||||||
|
|
||||||
|
|
||||||
def download_image_file(url: str, download_path: Path) -> Path:
|
def download_image_file(url: str, download_path: Path):
|
||||||
download_file(url, download_path)
|
"""
|
||||||
img = Image.open(download_path)
|
Download an image and convert it to the type given
|
||||||
img_ext = img.format.lower()
|
by the path.
|
||||||
img.close()
|
|
||||||
|
|
||||||
|
:param url: Image URL
|
||||||
|
:param download_path: Download path
|
||||||
|
"""
|
||||||
|
r = requests.get(url, allow_redirects=True)
|
||||||
|
r.raise_for_status()
|
||||||
|
|
||||||
|
img = Image.open(io.BytesIO(r.content))
|
||||||
|
img_ext = img.format.lower()
|
||||||
if img_ext == "jpeg":
|
if img_ext == "jpeg":
|
||||||
img_ext = "jpg"
|
img_ext = "jpg"
|
||||||
|
|
||||||
new_path = download_path.with_suffix("." + img_ext)
|
if "." + img_ext == download_path.suffix:
|
||||||
shutil.move(download_path, new_path)
|
open(download_path, "wb").write(r.content)
|
||||||
return new_path
|
else:
|
||||||
|
img.save(download_path)
|
||||||
|
|
||||||
|
|
||||||
def resize_avatar(original_file: Path, new_file: Path):
|
def resize_avatar(original_file: Path, new_file: Path):
|
||||||
avatar = Image.open(original_file)
|
avatar = Image.open(original_file)
|
||||||
avatar_new_height = int(AVATAR_SM_WIDTH / avatar.width * avatar.height)
|
avatar_new_height = int(AVATAR_SM_WIDTH / avatar.width * avatar.height)
|
||||||
avatar = avatar.resize((AVATAR_SM_WIDTH, avatar_new_height), Image.LANCZOS)
|
avatar = avatar.resize(
|
||||||
|
(AVATAR_SM_WIDTH, avatar_new_height), Image.Resampling.LANCZOS
|
||||||
|
)
|
||||||
avatar.save(new_file)
|
avatar.save(new_file)
|
||||||
|
|
||||||
|
|
||||||
def resize_thumbnail(original_file: Path, new_file: Path):
|
def resize_thumbnail(original_file: Path, new_file: Path):
|
||||||
thumbnail = Image.open(original_file)
|
thumbnail = Image.open(original_file)
|
||||||
tn_new_height = int(THUMBNAIL_SM_WIDTH / thumbnail.width * thumbnail.height)
|
tn_new_height = int(THUMBNAIL_SM_WIDTH / thumbnail.width * thumbnail.height)
|
||||||
thumbnail = thumbnail.resize((THUMBNAIL_SM_WIDTH, tn_new_height), Image.LANCZOS)
|
thumbnail = thumbnail.resize(
|
||||||
|
(THUMBNAIL_SM_WIDTH, tn_new_height), Image.Resampling.LANCZOS
|
||||||
|
)
|
||||||
thumbnail.save(new_file)
|
thumbnail.save(new_file)
|
||||||
|
|
||||||
|
|
||||||
def get_slug(str_in: str) -> str:
|
def get_slug(text: str) -> str:
|
||||||
return slugify.slugify(str_in, lowercase=False, separator="_")
|
return slugify.slugify(text, lowercase=False, separator="_")
|
||||||
|
|
|
@ -92,7 +92,7 @@ class ChannelMetadata:
|
||||||
avatar_url: str
|
avatar_url: str
|
||||||
|
|
||||||
|
|
||||||
def download_thumbnail(vinfo: VideoDetails, download_path: Path) -> Path:
|
def download_thumbnail(vinfo: VideoDetails, download_path: Path):
|
||||||
"""
|
"""
|
||||||
Download the thumbnail image of a YouTube video and save it at the given filepath.
|
Download the thumbnail image of a YouTube video and save it at the given filepath.
|
||||||
The thumbnail file ending is added to the path.
|
The thumbnail file ending is added to the path.
|
||||||
|
@ -108,7 +108,8 @@ def download_thumbnail(vinfo: VideoDetails, download_path: Path) -> Path:
|
||||||
logging.info(f"downloading thumbnail {url}...")
|
logging.info(f"downloading thumbnail {url}...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return util.download_image_file(url, download_path)
|
util.download_image_file(url, download_path)
|
||||||
|
return
|
||||||
except requests.HTTPError:
|
except requests.HTTPError:
|
||||||
logging.warning(f"downloading thumbnail {url} failed")
|
logging.warning(f"downloading thumbnail {url} failed")
|
||||||
pass
|
pass
|
||||||
|
@ -158,20 +159,20 @@ def download_audio(
|
||||||
|
|
||||||
def tag_audio(audio_path: Path, vinfo: VideoDetails, cover_path: Path):
|
def tag_audio(audio_path: Path, vinfo: VideoDetails, cover_path: Path):
|
||||||
title_text = f"{vinfo.published.date().isoformat()} {vinfo.title}"
|
title_text = f"{vinfo.published.date().isoformat()} {vinfo.title}"
|
||||||
|
comment = f"https://youtu.be/{vinfo.id}\n\n{vinfo.description}"
|
||||||
|
|
||||||
audio = id3.ID3(audio_path)
|
tag = id3.ID3(audio_path)
|
||||||
audio["TPE1"] = id3.TPE1(encoding=3, text=vinfo.channel_name) # Artist
|
tag["TPE1"] = id3.TPE1(encoding=3, text=vinfo.channel_name) # Artist
|
||||||
audio["TALB"] = id3.TALB(encoding=3, text=vinfo.channel_name) # Album
|
tag["TALB"] = id3.TALB(encoding=3, text=vinfo.channel_name) # Album
|
||||||
audio["TIT2"] = id3.TIT2(encoding=3, text=title_text) # Title
|
tag["TIT2"] = id3.TIT2(encoding=3, text=title_text) # Title
|
||||||
audio["TYER"] = id3.TYER(encoding=3, text=str(vinfo.published.year)) # Year
|
tag["TDRC"] = id3.TDRC(encoding=3, text=vinfo.published.date().isoformat()) # Date
|
||||||
audio["TDAT"] = id3.TDAT(encoding=3, text=vinfo.published.strftime("%d%m")) # Date
|
tag["COMM"] = id3.COMM(encoding=3, text=comment) # Comment
|
||||||
audio["COMM"] = id3.COMM(encoding=3, text=f"YT-ID: {vinfo.id}") # Comment
|
|
||||||
|
|
||||||
with open(cover_path, "rb") as albumart:
|
with open(cover_path, "rb") as albumart:
|
||||||
audio["APIC"] = id3.APIC(
|
tag["APIC"] = id3.APIC(
|
||||||
encoding=3, mime="image/png", type=3, desc="Cover", data=albumart.read()
|
encoding=3, mime="image/png", type=3, desc="Cover", data=albumart.read()
|
||||||
)
|
)
|
||||||
audio.save()
|
tag.save()
|
||||||
|
|
||||||
|
|
||||||
def channel_url_from_id(channel_id: str) -> str:
|
def channel_url_from_id(channel_id: str) -> str:
|
||||||
|
@ -231,22 +232,6 @@ def get_channel_metadata(channel_url: str) -> ChannelMetadata:
|
||||||
return ChannelMetadata(channel_id, name, description, avatar)
|
return ChannelMetadata(channel_id, name, description, avatar)
|
||||||
|
|
||||||
|
|
||||||
def download_avatar(avatar_url: str, download_path: Path) -> Path:
|
|
||||||
"""
|
|
||||||
Download the avatar image of a channel. The .jpg file ending
|
|
||||||
is added to the path.
|
|
||||||
|
|
||||||
:param avatar_url: Channel avatar URL
|
|
||||||
:param download_path: Download path
|
|
||||||
:return: Path with file ending
|
|
||||||
"""
|
|
||||||
logging.info(f"downloading avatar {avatar_url}...")
|
|
||||||
|
|
||||||
download_path = download_path.with_suffix(".jpg")
|
|
||||||
util.download_file(avatar_url, download_path)
|
|
||||||
return download_path
|
|
||||||
|
|
||||||
|
|
||||||
def get_channel_videos_from_feed(channel_id: str) -> List[VideoScraped]:
|
def get_channel_videos_from_feed(channel_id: str) -> List[VideoScraped]:
|
||||||
"""
|
"""
|
||||||
Return videos of a channel using YouTube's RSS feed. Using the feed is fast,
|
Return videos of a channel using YouTube's RSS feed. Using the feed is fast,
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
import os
|
import os
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
import django_rq
|
import django_rq
|
||||||
from django.conf import settings
|
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
|
||||||
from ucast.models import Channel, Video
|
from ucast.models import Channel, Video
|
||||||
|
@ -21,10 +19,8 @@ def _get_or_create_channel(channel_id: str) -> Channel:
|
||||||
channel_slug = Channel.get_new_slug(channel_data.name)
|
channel_slug = Channel.get_new_slug(channel_data.name)
|
||||||
channel_folder = store.get_channel_folder(channel_slug)
|
channel_folder = store.get_channel_folder(channel_slug)
|
||||||
|
|
||||||
avatar_file = youtube.download_avatar(
|
util.download_image_file(channel_data.avatar_url, channel_folder.file_avatar)
|
||||||
channel_data.avatar_url, channel_folder.file_avatar
|
util.resize_avatar(channel_folder.file_avatar, channel_folder.file_avatar_sm)
|
||||||
)
|
|
||||||
util.resize_avatar(avatar_file, channel_folder.file_avatar_sm)
|
|
||||||
|
|
||||||
channel = Channel(
|
channel = Channel(
|
||||||
id=channel_id,
|
id=channel_id,
|
||||||
|
@ -81,9 +77,8 @@ def download_video(video: Video):
|
||||||
details = youtube.download_audio(video.id, audio_file)
|
details = youtube.download_audio(video.id, audio_file)
|
||||||
|
|
||||||
# Download/convert thumbnails
|
# Download/convert thumbnails
|
||||||
tn_path = youtube.download_thumbnail(
|
tn_path = channel_folder.get_thumbnail(video.slug)
|
||||||
details, channel_folder.get_thumbnail(video.slug)
|
youtube.download_thumbnail(details, tn_path)
|
||||||
)
|
|
||||||
util.resize_thumbnail(tn_path, channel_folder.get_thumbnail(video.slug, True))
|
util.resize_thumbnail(tn_path, channel_folder.get_thumbnail(video.slug, True))
|
||||||
cover_file = channel_folder.get_cover(video.slug)
|
cover_file = channel_folder.get_cover(video.slug)
|
||||||
cover.create_cover_file(
|
cover.create_cover_file(
|
||||||
|
@ -120,15 +115,6 @@ def import_channel(channel_id: str, limit: int = None):
|
||||||
_load_scraped_video(vid, channel)
|
_load_scraped_video(vid, channel)
|
||||||
|
|
||||||
|
|
||||||
def schedule_update_channels():
|
|
||||||
django_rq.get_scheduler().schedule(
|
|
||||||
datetime.utcnow(),
|
|
||||||
update_channels,
|
|
||||||
id="schedule_update_channels",
|
|
||||||
interval=settings.YT_UPDATE_INTERVAL,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def update_channels():
|
def update_channels():
|
||||||
"""
|
"""
|
||||||
Update all channels from their RSS feeds and download new videos.
|
Update all channels from their RSS feeds and download new videos.
|
||||||
|
|
27
ucast/tasks/schedule.py
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import django_rq
|
||||||
|
from django.conf import settings
|
||||||
|
|
||||||
|
from ucast.tasks import download
|
||||||
|
|
||||||
|
scheduler = django_rq.get_scheduler()
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def clear_scheduled_jobs():
|
||||||
|
"""Delete all scheduled jobs to prevent duplicates"""
|
||||||
|
for job in scheduler.get_jobs():
|
||||||
|
log.debug("Deleting scheduled job %s", job)
|
||||||
|
job.delete()
|
||||||
|
|
||||||
|
|
||||||
|
def register_scheduled_jobs():
|
||||||
|
"""Register all scheduled jobs"""
|
||||||
|
scheduler.schedule(
|
||||||
|
datetime.utcnow(),
|
||||||
|
download.update_channels,
|
||||||
|
id="schedule_update_channels",
|
||||||
|
interval=settings.YT_UPDATE_INTERVAL,
|
||||||
|
)
|
|
@ -1,3 +1,3 @@
|
||||||
from importlib.resources import files
|
from importlib.resources import files
|
||||||
|
|
||||||
DIR_TESTFILES = files("ucast.tests.testfiles")
|
DIR_TESTFILES = files("ucast.tests._testfiles")
|
||||||
|
|
BIN
ucast/tests/_testfiles/audio/audio1.mp3
Normal file
Before Width: | Height: | Size: 186 KiB After Width: | Height: | Size: 186 KiB |
Before Width: | Height: | Size: 32 KiB After Width: | Height: | Size: 32 KiB |
Before Width: | Height: | Size: 53 KiB After Width: | Height: | Size: 53 KiB |
Before Width: | Height: | Size: 26 KiB After Width: | Height: | Size: 26 KiB |
Before Width: | Height: | Size: 268 KiB After Width: | Height: | Size: 268 KiB |
Before Width: | Height: | Size: 234 KiB After Width: | Height: | Size: 234 KiB |
Before Width: | Height: | Size: 218 KiB After Width: | Height: | Size: 218 KiB |
Before Width: | Height: | Size: 215 KiB After Width: | Height: | Size: 215 KiB |
Before Width: | Height: | Size: 183 KiB After Width: | Height: | Size: 183 KiB |
Before Width: | Height: | Size: 216 KiB After Width: | Height: | Size: 216 KiB |
Before Width: | Height: | Size: 173 KiB After Width: | Height: | Size: 173 KiB |
|
@ -1,10 +1,11 @@
|
||||||
### Quellen der Thumbnails/Avatarbilder zum Testen
|
### Quellen der Thumbnails/Avatarbilder/Audiodateien zum Testen
|
||||||
|
|
||||||
- a1/t1: [ThetaDev @ Embedded World 2019](https://www.youtube.com/watch?v=ZPxEr4YdWt8), by [ThetaDev](https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q) (CC-BY)
|
- a1/t1: [ThetaDev @ Embedded World 2019](https://www.youtube.com/watch?v=ZPxEr4YdWt8), by [ThetaDev](https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q) (CC-BY)
|
||||||
- a2/t2: [Sintel - Open Movie by Blender Foundation](https://www.youtube.com/watch?v=eRsGyueVLvQ), by [Blender](https://www.youtube.com/c/BlenderFoundation) (CC-BY)
|
- a2/t2: [Sintel - Open Movie by Blender Foundation](https://www.youtube.com/watch?v=eRsGyueVLvQ), by [Blender](https://www.youtube.com/c/BlenderFoundation) (CC-BY)
|
||||||
- a3/t3: [Systemabsturz Teaser zur DiVOC bb3](https://www.youtube.com/watch?v=uFqgQ35wyYY), by [media.ccc.de](https://www.youtube.com/channel/UC2TXq_t06Hjdr2g_KdKpHQg) (CC-BY)
|
- a3/t3: [Systemabsturz Teaser zur DiVOC bb3](https://www.youtube.com/watch?v=uFqgQ35wyYY), by [media.ccc.de](https://www.youtube.com/channel/UC2TXq_t06Hjdr2g_KdKpHQg) (CC-BY)
|
||||||
|
- audio1: [No copyright intro free fire intro](https://www.youtube.com/watch?v=I0RRENheeTo), by [Shahzaib Hassan](https://www.youtube.com/channel/UCmLTTbctUZobNQrr8RtX8uQ), (CC-BY)
|
||||||
|
|
||||||
### Weitere Testvideos
|
### Weitere Testvideos
|
||||||
|
|
||||||
- [Persuasion (Instrumental) – RYYZN (No Copyright Music)](https://www.youtube.com/watch?v=DWjFW7Yq1fA), by [RYYZN](https://soundcloud.com/ryyzn) (CC-BY)
|
- [Persuasion (Instrumental) – RYYZN (No Copyright Music)](https://www.youtube.com/watch?v=DWjFW7Yq1fA), by [RYYZN](https://soundcloud.com/ryyzn) (CC-BY)
|
||||||
- [Small pink flowers | #shorts | Free Stock Video](https://www.youtube.com/watch?v=lcQZ6YwQHiw), by [Shahzaib Hassan](https://www.youtube.com/channel/UCmLTTbctUZobNQrr8RtX8uQ), (CC-BY)
|
- [Small pink flowers | #shorts | Free Stock Video](https://www.youtube.com/watch?v=lcQZ6YwQHiw), by [Shahzaib Hassan](https://www.youtube.com/channel/UCmLTTbctUZobNQrr8RtX8uQ), (CC-BY)
|
Before Width: | Height: | Size: 92 KiB After Width: | Height: | Size: 92 KiB |
Before Width: | Height: | Size: 20 KiB After Width: | Height: | Size: 20 KiB |
Before Width: | Height: | Size: 28 KiB After Width: | Height: | Size: 28 KiB |
0
ucast/tests/service/__init__.py
Normal file
56
ucast/tests/service/test_storage.py
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from ucast.service import storage
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_channel_folders(settings):
|
||||||
|
tmpdir_o = tempfile.TemporaryDirectory()
|
||||||
|
tmpdir = Path(tmpdir_o.name)
|
||||||
|
settings.DOWNLOAD_ROOT = tmpdir
|
||||||
|
|
||||||
|
store = storage.Storage()
|
||||||
|
cf1 = store.get_channel_folder("ThetaDev")
|
||||||
|
cf2 = store.get_channel_folder("Jeff_Geerling")
|
||||||
|
cf1b = store.get_channel_folder("ThetaDev")
|
||||||
|
|
||||||
|
cf1_path = tmpdir / "ThetaDev"
|
||||||
|
cf2_path = tmpdir / "Jeff_Geerling"
|
||||||
|
|
||||||
|
assert cf1.dir_root == cf1_path
|
||||||
|
assert cf1b.dir_root == cf1_path
|
||||||
|
assert cf2.dir_root == cf2_path
|
||||||
|
|
||||||
|
assert os.path.isdir(cf1_path)
|
||||||
|
assert os.path.isdir(cf2_path)
|
||||||
|
|
||||||
|
|
||||||
|
def test_channel_folder():
|
||||||
|
tmpdir_o = tempfile.TemporaryDirectory()
|
||||||
|
tmpdir = Path(tmpdir_o.name)
|
||||||
|
ucast_dir = tmpdir / "_ucast"
|
||||||
|
|
||||||
|
cf = storage.ChannelFolder(tmpdir)
|
||||||
|
|
||||||
|
# Verify internal paths
|
||||||
|
assert cf.file_avatar == ucast_dir / "avatar.jpg"
|
||||||
|
assert cf.file_avatar_sm == ucast_dir / "avatar_sm.webp"
|
||||||
|
assert cf.dir_covers == ucast_dir / "covers"
|
||||||
|
assert cf.dir_thumbnails == ucast_dir / "thumbnails"
|
||||||
|
|
||||||
|
# Create the folder
|
||||||
|
assert not cf.does_exist()
|
||||||
|
cf.create()
|
||||||
|
assert cf.does_exist()
|
||||||
|
|
||||||
|
assert cf.get_cover("my_video_title") == ucast_dir / "covers" / "my_video_title.png"
|
||||||
|
assert (
|
||||||
|
cf.get_thumbnail("my_video_title")
|
||||||
|
== ucast_dir / "thumbnails" / "my_video_title.webp"
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
cf.get_thumbnail("my_video_title", True)
|
||||||
|
== ucast_dir / "thumbnails" / "my_video_title_sm.webp"
|
||||||
|
)
|
||||||
|
assert cf.get_audio("my_video_title") == tmpdir / "my_video_title.mp3"
|
92
ucast/tests/service/test_util.py
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from PIL import Image, ImageChops
|
||||||
|
|
||||||
|
from ucast import tests
|
||||||
|
from ucast.service import util
|
||||||
|
|
||||||
|
TEST_FILE_URL = "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj"
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_file():
|
||||||
|
tmpdir_o = tempfile.TemporaryDirectory()
|
||||||
|
tmpdir = Path(tmpdir_o.name)
|
||||||
|
download_file = tmpdir / "download.jpg"
|
||||||
|
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
|
||||||
|
|
||||||
|
util.download_file(TEST_FILE_URL, download_file)
|
||||||
|
|
||||||
|
downloaded_avatar = Image.open(download_file)
|
||||||
|
expected_avatar = Image.open(expected_tn_file)
|
||||||
|
|
||||||
|
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
|
||||||
|
assert diff.getbbox() is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_image_file():
|
||||||
|
tmpdir_o = tempfile.TemporaryDirectory()
|
||||||
|
tmpdir = Path(tmpdir_o.name)
|
||||||
|
download_file = tmpdir / "download.jpg"
|
||||||
|
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
|
||||||
|
|
||||||
|
util.download_image_file(TEST_FILE_URL, download_file)
|
||||||
|
|
||||||
|
downloaded_avatar = Image.open(download_file)
|
||||||
|
expected_avatar = Image.open(expected_tn_file)
|
||||||
|
|
||||||
|
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
|
||||||
|
assert diff.getbbox() is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_image_file_conv():
|
||||||
|
tmpdir_o = tempfile.TemporaryDirectory()
|
||||||
|
tmpdir = Path(tmpdir_o.name)
|
||||||
|
download_file = tmpdir / "download.png"
|
||||||
|
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
|
||||||
|
|
||||||
|
util.download_image_file(TEST_FILE_URL, download_file)
|
||||||
|
|
||||||
|
downloaded_avatar = Image.open(download_file)
|
||||||
|
expected_avatar = Image.open(expected_tn_file)
|
||||||
|
|
||||||
|
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
|
||||||
|
assert diff.getbbox() is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_resize_avatar():
|
||||||
|
tmpdir_o = tempfile.TemporaryDirectory()
|
||||||
|
tmpdir = Path(tmpdir_o.name)
|
||||||
|
source_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
|
||||||
|
resized_file = tmpdir / "avatar.webp"
|
||||||
|
|
||||||
|
util.resize_avatar(source_file, resized_file)
|
||||||
|
|
||||||
|
resized_avatar = Image.open(resized_file)
|
||||||
|
assert resized_avatar.size == (100, 100)
|
||||||
|
|
||||||
|
|
||||||
|
def test_resize_thumbnail():
|
||||||
|
tmpdir_o = tempfile.TemporaryDirectory()
|
||||||
|
tmpdir = Path(tmpdir_o.name)
|
||||||
|
source_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
|
||||||
|
resized_file = tmpdir / "thumbnail.webp"
|
||||||
|
|
||||||
|
util.resize_thumbnail(source_file, resized_file)
|
||||||
|
|
||||||
|
resized_thumbnail = Image.open(resized_file)
|
||||||
|
assert resized_thumbnail.size == (360, 202)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"text,expected_slug",
|
||||||
|
[
|
||||||
|
("Hello World 👋", "Hello_World"),
|
||||||
|
("ÄäÖöÜüß", "AaOoUuss"),
|
||||||
|
("오징어 게임", "ojingeo_geim"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_slug(text: str, expected_slug: str):
|
||||||
|
slug = util.get_slug(text)
|
||||||
|
assert slug == expected_slug
|
214
ucast/tests/service/test_youtube.py
Normal file
|
@ -0,0 +1,214 @@
|
||||||
|
import datetime
|
||||||
|
import io
|
||||||
|
import re
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from mutagen import id3
|
||||||
|
from PIL import Image, ImageChops
|
||||||
|
|
||||||
|
from ucast import tests
|
||||||
|
from ucast.service import youtube
|
||||||
|
|
||||||
|
VIDEO_ID_THETADEV = "ZPxEr4YdWt8"
|
||||||
|
VIDEO_ID_SHORT = "lcQZ6YwQHiw"
|
||||||
|
VIDEO_ID_PERSUASION = "DWjFW7Yq1fA"
|
||||||
|
|
||||||
|
CHANNEL_ID_THETADEV = "UCGiJh0NZ52wRhYKYnuZI08Q"
|
||||||
|
CHANNEL_ID_BLENDER = "UCSMOQeBJ2RAnuFungnQOxLg"
|
||||||
|
CHANNEL_URL_BLENDER = "https://www.youtube.com/c/BlenderFoundation"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def video_details() -> youtube.VideoDetails:
|
||||||
|
return youtube.get_video_details(VIDEO_ID_THETADEV)
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_thumbnail(video_details):
|
||||||
|
tmpdir_o = tempfile.TemporaryDirectory()
|
||||||
|
tmpdir = Path(tmpdir_o.name)
|
||||||
|
tn_file = tmpdir / "thumbnail.webp"
|
||||||
|
expected_tn_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
|
||||||
|
|
||||||
|
youtube.download_thumbnail(video_details, tn_file)
|
||||||
|
|
||||||
|
tn = Image.open(tn_file)
|
||||||
|
expected_tn = Image.open(expected_tn_file)
|
||||||
|
|
||||||
|
diff = ImageChops.difference(tn, expected_tn)
|
||||||
|
assert diff.getbbox() is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_video_details(video_details):
|
||||||
|
assert video_details.id == VIDEO_ID_THETADEV
|
||||||
|
assert video_details.title == "ThetaDev @ Embedded World 2019"
|
||||||
|
assert video_details.channel_id == "UCGiJh0NZ52wRhYKYnuZI08Q"
|
||||||
|
assert (
|
||||||
|
video_details.description
|
||||||
|
== """This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
|
||||||
|
|
||||||
|
Sorry for the late upload, I just didn't have time to edit my footage.
|
||||||
|
|
||||||
|
Embedded World: https://www.embedded-world.de/
|
||||||
|
|
||||||
|
My website: https://thdev.org
|
||||||
|
Twitter: https://twitter.com/Theta_Dev"""
|
||||||
|
)
|
||||||
|
assert video_details.duration == 267
|
||||||
|
assert not video_details.is_currently_live
|
||||||
|
assert not video_details.is_livestream
|
||||||
|
assert not video_details.is_short
|
||||||
|
assert video_details.published == datetime.datetime(
|
||||||
|
2019, 6, 2, tzinfo=datetime.timezone.utc
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_video_details_short():
|
||||||
|
vinfo = youtube.get_video_details(VIDEO_ID_SHORT)
|
||||||
|
assert vinfo.id == VIDEO_ID_SHORT
|
||||||
|
assert (
|
||||||
|
vinfo.title
|
||||||
|
== "Small pink flowers | #shorts | Free Stock Video | \
|
||||||
|
creative commons short videos | creative #short"
|
||||||
|
)
|
||||||
|
assert not vinfo.is_currently_live
|
||||||
|
assert not vinfo.is_livestream
|
||||||
|
assert vinfo.is_short
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_audio():
|
||||||
|
tmpdir_o = tempfile.TemporaryDirectory()
|
||||||
|
tmpdir = Path(tmpdir_o.name)
|
||||||
|
download_file = tmpdir / "download.mp3"
|
||||||
|
|
||||||
|
vinfo = youtube.download_audio(VIDEO_ID_PERSUASION, download_file)
|
||||||
|
assert vinfo.id == VIDEO_ID_PERSUASION
|
||||||
|
assert vinfo.title == "Persuasion (Instrumental) – RYYZN (No Copyright Music)"
|
||||||
|
assert vinfo.duration == 100
|
||||||
|
|
||||||
|
# Check with ffmpeg if the audio file is valid
|
||||||
|
res = subprocess.run(
|
||||||
|
["ffmpeg", "-i", str(download_file)],
|
||||||
|
capture_output=True,
|
||||||
|
universal_newlines=True,
|
||||||
|
)
|
||||||
|
assert "Stream #0:0: Audio: mp3" in res.stderr
|
||||||
|
|
||||||
|
match = re.search(r"Duration: (\d{2}:\d{2}:\d{2})", res.stderr)
|
||||||
|
assert match[1] == "00:01:40"
|
||||||
|
|
||||||
|
|
||||||
|
def test_tag_audio(video_details):
|
||||||
|
tmpdir_o = tempfile.TemporaryDirectory()
|
||||||
|
tmpdir = Path(tmpdir_o.name)
|
||||||
|
audio_file = tmpdir / "audio.mp3"
|
||||||
|
cover_file = tests.DIR_TESTFILES / "cover" / "c1_blur.png"
|
||||||
|
shutil.copyfile(tests.DIR_TESTFILES / "audio" / "audio1.mp3", audio_file)
|
||||||
|
|
||||||
|
youtube.tag_audio(audio_file, video_details, cover_file)
|
||||||
|
|
||||||
|
tag = id3.ID3(audio_file)
|
||||||
|
assert tag["TPE1"].text[0] == "ThetaDev"
|
||||||
|
assert tag["TALB"].text[0] == "ThetaDev"
|
||||||
|
assert tag["TIT2"].text[0] == "2019-06-02 ThetaDev @ Embedded World 2019"
|
||||||
|
assert tag["TDRC"].text[0].text == "2019-06-02"
|
||||||
|
assert (
|
||||||
|
tag["COMM::XXX"].text[0]
|
||||||
|
== """https://youtu.be/ZPxEr4YdWt8
|
||||||
|
|
||||||
|
This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
|
||||||
|
|
||||||
|
Sorry for the late upload, I just didn't have time to edit my footage.
|
||||||
|
|
||||||
|
Embedded World: https://www.embedded-world.de/
|
||||||
|
|
||||||
|
My website: https://thdev.org
|
||||||
|
Twitter: https://twitter.com/Theta_Dev"""
|
||||||
|
)
|
||||||
|
|
||||||
|
tag_cover = tag["APIC:Cover"]
|
||||||
|
assert tag_cover.mime == "image/png"
|
||||||
|
|
||||||
|
tag_cover_img = Image.open(io.BytesIO(tag_cover.data))
|
||||||
|
expected_cover_img = Image.open(cover_file)
|
||||||
|
diff = ImageChops.difference(tag_cover_img, expected_cover_img)
|
||||||
|
assert diff.getbbox() is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"channel_str,channel_url",
|
||||||
|
[
|
||||||
|
(
|
||||||
|
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
|
||||||
|
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"https://www.youtube.com/c/MrBeast6000",
|
||||||
|
"https://www.youtube.com/c/MrBeast6000",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"https://www.youtube.com/user/LinusTechTips",
|
||||||
|
"https://www.youtube.com/user/LinusTechTips",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"UCGiJh0NZ52wRhYKYnuZI08Q",
|
||||||
|
"https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"https://piped.mha.fi/user/LinusTechTips",
|
||||||
|
"https://www.youtube.com/user/LinusTechTips",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_channel_url_from_str(channel_str: str, channel_url: str):
|
||||||
|
url = youtube.channel_url_from_str(channel_str)
|
||||||
|
assert url == channel_url
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"channel_url,channel_id,name,avatar_url",
|
||||||
|
[
|
||||||
|
(
|
||||||
|
youtube.channel_url_from_id(CHANNEL_ID_THETADEV),
|
||||||
|
CHANNEL_ID_THETADEV,
|
||||||
|
"ThetaDev",
|
||||||
|
"https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
CHANNEL_URL_BLENDER,
|
||||||
|
CHANNEL_ID_BLENDER,
|
||||||
|
"Blender",
|
||||||
|
"https://yt3.ggpht.com/ytc/AKedOLT_31fFSD3FWEBnHZnyZeJx-GPHJwYCQKcEpaq8NQ=s900-c-k-c0x00ffffff-no-rj",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_channel_metadata(
|
||||||
|
channel_url: str, channel_id: str, name: str, avatar_url: str
|
||||||
|
):
|
||||||
|
metadata = youtube.get_channel_metadata(channel_url)
|
||||||
|
assert metadata.id == channel_id
|
||||||
|
assert metadata.name == name
|
||||||
|
assert metadata.avatar_url == avatar_url
|
||||||
|
assert metadata.description
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_channel_videos_from_feed():
|
||||||
|
videos = youtube.get_channel_videos_from_feed(CHANNEL_ID_THETADEV)
|
||||||
|
assert videos
|
||||||
|
|
||||||
|
v1 = videos[0]
|
||||||
|
assert len(v1.id) == 11
|
||||||
|
assert v1.published.tzinfo == datetime.timezone.utc
|
||||||
|
assert v1.published.second > 0 or v1.published.minute > 0 or v1.published.hour > 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_channel_videos_from_scraper():
|
||||||
|
videos = youtube.get_channel_videos_from_scraper(CHANNEL_ID_THETADEV)
|
||||||
|
assert videos
|
||||||
|
|
||||||
|
v1 = videos[0]
|
||||||
|
assert len(v1.id) == 11
|
||||||
|
assert v1.published is None
|
|
@ -1,24 +0,0 @@
|
||||||
import tempfile
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
from PIL import Image, ImageChops
|
|
||||||
|
|
||||||
from ucast import tests
|
|
||||||
from ucast.service import util
|
|
||||||
|
|
||||||
TEST_FILE_URL = "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj"
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_file():
|
|
||||||
tmpdir_o = tempfile.TemporaryDirectory()
|
|
||||||
tmpdir = Path(tmpdir_o.name)
|
|
||||||
download_file = tmpdir / "download.jpg"
|
|
||||||
expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
|
|
||||||
|
|
||||||
util.download_file(TEST_FILE_URL, download_file)
|
|
||||||
|
|
||||||
downloaded_avatar = Image.open(download_file)
|
|
||||||
expected_avatar = Image.open(expected_tn_file)
|
|
||||||
|
|
||||||
diff = ImageChops.difference(downloaded_avatar, expected_avatar)
|
|
||||||
assert diff.getbbox() is None
|
|
|
@ -1,126 +0,0 @@
|
||||||
import datetime
|
|
||||||
import re
|
|
||||||
import subprocess
|
|
||||||
import tempfile
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from PIL import Image, ImageChops
|
|
||||||
|
|
||||||
from ucast import tests
|
|
||||||
from ucast.service import youtube
|
|
||||||
|
|
||||||
VIDEO_ID_THETADEV = "ZPxEr4YdWt8"
|
|
||||||
VIDEO_ID_SHORT = "lcQZ6YwQHiw"
|
|
||||||
VIDEO_ID_PERSUASION = "DWjFW7Yq1fA"
|
|
||||||
|
|
||||||
CHANNEL_ID_THETADEV = "UCGiJh0NZ52wRhYKYnuZI08Q"
|
|
||||||
CHANNEL_ID_BLENDER = "UCSMOQeBJ2RAnuFungnQOxLg"
|
|
||||||
CHANNEL_URL_BLENDER = "https://www.youtube.com/c/BlenderFoundation"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="module")
|
|
||||||
def video_info() -> youtube.VideoDetails:
|
|
||||||
return youtube.get_video_details(VIDEO_ID_THETADEV)
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_thumbnail(video_info):
|
|
||||||
tmpdir_o = tempfile.TemporaryDirectory()
|
|
||||||
tmpdir = Path(tmpdir_o.name)
|
|
||||||
tn_file = tmpdir / "thumbnail"
|
|
||||||
expected_tn_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
|
|
||||||
|
|
||||||
tn_file = youtube.download_thumbnail(video_info, tn_file)
|
|
||||||
assert tn_file.suffix == ".webp"
|
|
||||||
|
|
||||||
tn = Image.open(tn_file)
|
|
||||||
expected_tn = Image.open(expected_tn_file)
|
|
||||||
|
|
||||||
diff = ImageChops.difference(tn, expected_tn)
|
|
||||||
assert diff.getbbox() is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_video_info(video_info):
|
|
||||||
assert video_info.id == VIDEO_ID_THETADEV
|
|
||||||
assert video_info.title == "ThetaDev @ Embedded World 2019"
|
|
||||||
assert video_info.channel_id == "UCGiJh0NZ52wRhYKYnuZI08Q"
|
|
||||||
assert (
|
|
||||||
video_info.description
|
|
||||||
== """This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
|
|
||||||
|
|
||||||
Sorry for the late upload, I just didn't have time to edit my footage.
|
|
||||||
|
|
||||||
Embedded World: https://www.embedded-world.de/
|
|
||||||
|
|
||||||
My website: https://thdev.org
|
|
||||||
Twitter: https://twitter.com/Theta_Dev"""
|
|
||||||
)
|
|
||||||
assert video_info.duration == 267
|
|
||||||
assert not video_info.is_currently_live
|
|
||||||
assert not video_info.is_livestream
|
|
||||||
assert not video_info.is_short
|
|
||||||
assert video_info.published == datetime.datetime(
|
|
||||||
2019, 6, 2, tzinfo=datetime.timezone.utc
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_video_info_short():
|
|
||||||
vinfo = youtube.get_video_details(VIDEO_ID_SHORT)
|
|
||||||
assert vinfo.id == VIDEO_ID_SHORT
|
|
||||||
assert (
|
|
||||||
vinfo.title
|
|
||||||
== "Small pink flowers | #shorts | Free Stock Video | \
|
|
||||||
creative commons short videos | creative #short"
|
|
||||||
)
|
|
||||||
assert not vinfo.is_currently_live
|
|
||||||
assert not vinfo.is_livestream
|
|
||||||
assert vinfo.is_short
|
|
||||||
|
|
||||||
|
|
||||||
def test_download_video():
|
|
||||||
tmpdir_o = tempfile.TemporaryDirectory()
|
|
||||||
tmpdir = Path(tmpdir_o.name)
|
|
||||||
download_file = tmpdir / "download.mp3"
|
|
||||||
|
|
||||||
vinfo = youtube.download_audio(VIDEO_ID_PERSUASION, download_file)
|
|
||||||
assert vinfo.id == VIDEO_ID_PERSUASION
|
|
||||||
assert vinfo.title == "Persuasion (Instrumental) – RYYZN (No Copyright Music)"
|
|
||||||
assert vinfo.duration == 100
|
|
||||||
|
|
||||||
# Check with ffmpeg if the audio file is valid
|
|
||||||
res = subprocess.run(
|
|
||||||
["ffmpeg", "-i", str(download_file)],
|
|
||||||
capture_output=True,
|
|
||||||
universal_newlines=True,
|
|
||||||
)
|
|
||||||
assert "Stream #0:0: Audio: mp3" in res.stderr
|
|
||||||
|
|
||||||
match = re.search(r"Duration: (\d{2}:\d{2}:\d{2})", res.stderr)
|
|
||||||
assert match[1] == "00:01:40"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"channel_url,channel_id,name,avatar_url",
|
|
||||||
[
|
|
||||||
(
|
|
||||||
youtube.channel_url_from_id(CHANNEL_ID_THETADEV),
|
|
||||||
CHANNEL_ID_THETADEV,
|
|
||||||
"ThetaDev",
|
|
||||||
"https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj",
|
|
||||||
),
|
|
||||||
(
|
|
||||||
CHANNEL_URL_BLENDER,
|
|
||||||
CHANNEL_ID_BLENDER,
|
|
||||||
"Blender",
|
|
||||||
"https://yt3.ggpht.com/ytc/AKedOLT_31fFSD3FWEBnHZnyZeJx-GPHJwYCQKcEpaq8NQ=s900-c-k-c0x00ffffff-no-rj",
|
|
||||||
),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
def test_channel_metadata(
|
|
||||||
channel_url: str, channel_id: str, name: str, avatar_url: str
|
|
||||||
):
|
|
||||||
metadata = youtube.get_channel_metadata(channel_url)
|
|
||||||
assert metadata.id == channel_id
|
|
||||||
assert metadata.name == name
|
|
||||||
assert metadata.avatar_url == avatar_url
|
|
||||||
assert metadata.description
|
|