Compare commits
	
		
			No commits in common. "4b6733b9b6148332fc42f6ece4c55fbb91bee4b5" and "8af98a44ae4f958bf7e556367ea2b1da7fe1bae8" have entirely different histories.
		
	
	
		
			
				4b6733b9b6
			
			...
			
				8af98a44ae
			
		
	
		
							
								
								
									
										40
									
								
								.drone.yml
									
										
									
									
									
								
							
							
						
						| 
						 | 
				
			
			@ -7,47 +7,9 @@ platform:
 | 
			
		|||
  arch: ''
 | 
			
		||||
 | 
			
		||||
steps:
 | 
			
		||||
  - name: install dependencies
 | 
			
		||||
  - name: Test
 | 
			
		||||
    image: thetadev256/ucast-dev
 | 
			
		||||
    volumes:
 | 
			
		||||
      - name: cache
 | 
			
		||||
        path: /root/.cache
 | 
			
		||||
    commands:
 | 
			
		||||
      - poetry install
 | 
			
		||||
 | 
			
		||||
  - name: lint
 | 
			
		||||
    image: thetadev256/ucast-dev
 | 
			
		||||
    volumes:
 | 
			
		||||
      - name: cache
 | 
			
		||||
        path: /root/.cache
 | 
			
		||||
    commands:
 | 
			
		||||
      - poetry run invoke lint
 | 
			
		||||
 | 
			
		||||
  - name: start worker
 | 
			
		||||
    image: thetadev256/ucast-dev
 | 
			
		||||
    volumes:
 | 
			
		||||
      - name: cache
 | 
			
		||||
        path: /root/.cache
 | 
			
		||||
    environment:
 | 
			
		||||
      UCAST_REDIS_HOST: redis
 | 
			
		||||
    commands:
 | 
			
		||||
      - poetry run invoke worker
 | 
			
		||||
    detach: true
 | 
			
		||||
 | 
			
		||||
  - name: test
 | 
			
		||||
    image: thetadev256/ucast-dev
 | 
			
		||||
    volumes:
 | 
			
		||||
      - name: cache
 | 
			
		||||
        path: /root/.cache
 | 
			
		||||
    environment:
 | 
			
		||||
      UCAST_REDIS_HOST: redis
 | 
			
		||||
    commands:
 | 
			
		||||
      - poetry run invoke test
 | 
			
		||||
 | 
			
		||||
services:
 | 
			
		||||
  - name: redis
 | 
			
		||||
    image: redis:alpine
 | 
			
		||||
 | 
			
		||||
volumes:
 | 
			
		||||
  - name: cache
 | 
			
		||||
    temp: { }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										5
									
								
								.gitignore
									
										
									
									
										vendored
									
									
								
							
							
						
						| 
						 | 
				
			
			@ -14,6 +14,11 @@ node_modules
 | 
			
		|||
# Jupyter
 | 
			
		||||
.ipynb_checkpoints
 | 
			
		||||
 | 
			
		||||
# Media files
 | 
			
		||||
*.webm
 | 
			
		||||
*.mp4
 | 
			
		||||
*.mp3
 | 
			
		||||
 | 
			
		||||
# Application data
 | 
			
		||||
/_run*
 | 
			
		||||
*.sqlite3
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -44,9 +44,6 @@ honcho = "^1.1.0"
 | 
			
		|||
requires = ["poetry-core>=1.0.0"]
 | 
			
		||||
build-backend = "poetry.core.masonry.api"
 | 
			
		||||
 | 
			
		||||
[tool.pytest.ini_options]
 | 
			
		||||
DJANGO_SETTINGS_MODULE = "ucast_project.settings"
 | 
			
		||||
 | 
			
		||||
[tool.flake8]
 | 
			
		||||
extend-ignore = "E501"
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										4
									
								
								tasks.py
									
										
									
									
									
								
							
							
						
						| 
						 | 
				
			
			@ -91,8 +91,8 @@ def get_cover(c, vid=""):
 | 
			
		|||
    cv_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_gradient.png"
 | 
			
		||||
    cv_blur_file = tests.DIR_TESTFILES / "cover" / f"c{ti}_blur.png"
 | 
			
		||||
 | 
			
		||||
    youtube.download_thumbnail(vinfo, tn_file)
 | 
			
		||||
    util.download_image_file(channel_metadata.avatar_url, av_file)
 | 
			
		||||
    tn_file = youtube.download_thumbnail(vinfo, tn_file)
 | 
			
		||||
    util.download_file(channel_metadata.avatar_url, av_file)
 | 
			
		||||
 | 
			
		||||
    cover.create_cover_file(
 | 
			
		||||
        tn_file, av_file, title, channel_name, cover.COVER_STYLE_GRADIENT, cv_file
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,3 +4,8 @@ from django.apps import AppConfig
 | 
			
		|||
class UcastConfig(AppConfig):
 | 
			
		||||
    default_auto_field = "django.db.models.BigAutoField"
 | 
			
		||||
    name = "ucast"
 | 
			
		||||
 | 
			
		||||
    def ready(self):
 | 
			
		||||
        from ucast.tasks import download
 | 
			
		||||
 | 
			
		||||
        download.schedule_update_channels()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,11 +0,0 @@
 | 
			
		|||
from django_rq.management.commands import rqscheduler
 | 
			
		||||
 | 
			
		||||
from ucast.tasks import schedule
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Command(rqscheduler.Command):
 | 
			
		||||
    def handle(self, *args, **kwargs):
 | 
			
		||||
        print("Starting ucast scheduler")
 | 
			
		||||
        schedule.clear_scheduled_jobs()
 | 
			
		||||
        schedule.register_scheduled_jobs()
 | 
			
		||||
        super(Command, self).handle(*args, **kwargs)
 | 
			
		||||
| 
						 | 
				
			
			@ -197,9 +197,9 @@ def _get_baseimage(thumbnail: Image.Image, style: CoverStyle):
 | 
			
		|||
        ctn_width = int(COVER_WIDTH / thumbnail.height * thumbnail.width)
 | 
			
		||||
        ctn_x_left = int((ctn_width - COVER_WIDTH) / 2)
 | 
			
		||||
 | 
			
		||||
        ctn = thumbnail.resize(
 | 
			
		||||
            (ctn_width, COVER_WIDTH), Image.Resampling.LANCZOS
 | 
			
		||||
        ).filter(ImageFilter.GaussianBlur(20))
 | 
			
		||||
        ctn = thumbnail.resize((ctn_width, COVER_WIDTH), Image.LANCZOS).filter(
 | 
			
		||||
            ImageFilter.GaussianBlur(20)
 | 
			
		||||
        )
 | 
			
		||||
        cover.paste(ctn, (-ctn_x_left, 0))
 | 
			
		||||
 | 
			
		||||
    return cover
 | 
			
		||||
| 
						 | 
				
			
			@ -219,9 +219,9 @@ def _resize_thumbnail(thumbnail: Image.Image) -> Image.Image:
 | 
			
		|||
    tn_crop_y_top = int((tn_resize_height - tn_height) / 2)
 | 
			
		||||
    tn_crop_y_bottom = tn_resize_height - tn_crop_y_top
 | 
			
		||||
 | 
			
		||||
    return thumbnail.resize(
 | 
			
		||||
        (COVER_WIDTH, tn_resize_height), Image.Resampling.LANCZOS
 | 
			
		||||
    ).crop((0, tn_crop_y_top, COVER_WIDTH, tn_crop_y_bottom))
 | 
			
		||||
    return thumbnail.resize((COVER_WIDTH, tn_resize_height), Image.LANCZOS).crop(
 | 
			
		||||
        (0, tn_crop_y_top, COVER_WIDTH, tn_crop_y_bottom)
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _prepare_text_background(
 | 
			
		||||
| 
						 | 
				
			
			@ -357,7 +357,7 @@ def _draw_text_avatar(
 | 
			
		|||
    )
 | 
			
		||||
 | 
			
		||||
    if avatar:
 | 
			
		||||
        avt = avatar.resize((avt_size, avt_size), Image.Resampling.LANCZOS)
 | 
			
		||||
        avt = avatar.resize((avt_size, avt_size), Image.LANCZOS)
 | 
			
		||||
 | 
			
		||||
        circle_mask = Image.new("L", (avt_size, avt_size))
 | 
			
		||||
        circle_mask_draw = ImageDraw.Draw(circle_mask)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,11 +1,32 @@
 | 
			
		|||
import os
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from typing import Tuple
 | 
			
		||||
 | 
			
		||||
import slugify
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
 | 
			
		||||
UCAST_DIRNAME = "_ucast"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _get_slug(str_in: str) -> str:
 | 
			
		||||
    return slugify.slugify(str_in, lowercase=False, separator="_")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _get_unique_slug(str_in: str, root_dir: Path, extension="") -> Tuple[Path, str]:
 | 
			
		||||
    original_slug = _get_slug(str_in)
 | 
			
		||||
    slug = original_slug
 | 
			
		||||
    i = 0
 | 
			
		||||
 | 
			
		||||
    while True:
 | 
			
		||||
        testfile = root_dir / (slug + extension)
 | 
			
		||||
 | 
			
		||||
        if not testfile.exists():
 | 
			
		||||
            return testfile, slug
 | 
			
		||||
 | 
			
		||||
        i += 1
 | 
			
		||||
        slug = f"{original_slug}_{i}"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ChannelFolder:
 | 
			
		||||
    def __init__(self, dir_root: Path):
 | 
			
		||||
        self.dir_root = dir_root
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,4 +1,4 @@
 | 
			
		|||
import io
 | 
			
		||||
import shutil
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
import requests
 | 
			
		||||
| 
						 | 
				
			
			@ -15,45 +15,33 @@ def download_file(url: str, download_path: Path):
 | 
			
		|||
    open(download_path, "wb").write(r.content)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def download_image_file(url: str, download_path: Path):
 | 
			
		||||
    """
 | 
			
		||||
    Download an image and convert it to the type given
 | 
			
		||||
    by the path.
 | 
			
		||||
 | 
			
		||||
    :param url: Image URL
 | 
			
		||||
    :param download_path: Download path
 | 
			
		||||
    """
 | 
			
		||||
    r = requests.get(url, allow_redirects=True)
 | 
			
		||||
    r.raise_for_status()
 | 
			
		||||
 | 
			
		||||
    img = Image.open(io.BytesIO(r.content))
 | 
			
		||||
def download_image_file(url: str, download_path: Path) -> Path:
 | 
			
		||||
    download_file(url, download_path)
 | 
			
		||||
    img = Image.open(download_path)
 | 
			
		||||
    img_ext = img.format.lower()
 | 
			
		||||
    img.close()
 | 
			
		||||
 | 
			
		||||
    if img_ext == "jpeg":
 | 
			
		||||
        img_ext = "jpg"
 | 
			
		||||
 | 
			
		||||
    if "." + img_ext == download_path.suffix:
 | 
			
		||||
        open(download_path, "wb").write(r.content)
 | 
			
		||||
    else:
 | 
			
		||||
        img.save(download_path)
 | 
			
		||||
    new_path = download_path.with_suffix("." + img_ext)
 | 
			
		||||
    shutil.move(download_path, new_path)
 | 
			
		||||
    return new_path
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def resize_avatar(original_file: Path, new_file: Path):
 | 
			
		||||
    avatar = Image.open(original_file)
 | 
			
		||||
    avatar_new_height = int(AVATAR_SM_WIDTH / avatar.width * avatar.height)
 | 
			
		||||
    avatar = avatar.resize(
 | 
			
		||||
        (AVATAR_SM_WIDTH, avatar_new_height), Image.Resampling.LANCZOS
 | 
			
		||||
    )
 | 
			
		||||
    avatar = avatar.resize((AVATAR_SM_WIDTH, avatar_new_height), Image.LANCZOS)
 | 
			
		||||
    avatar.save(new_file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def resize_thumbnail(original_file: Path, new_file: Path):
 | 
			
		||||
    thumbnail = Image.open(original_file)
 | 
			
		||||
    tn_new_height = int(THUMBNAIL_SM_WIDTH / thumbnail.width * thumbnail.height)
 | 
			
		||||
    thumbnail = thumbnail.resize(
 | 
			
		||||
        (THUMBNAIL_SM_WIDTH, tn_new_height), Image.Resampling.LANCZOS
 | 
			
		||||
    )
 | 
			
		||||
    thumbnail = thumbnail.resize((THUMBNAIL_SM_WIDTH, tn_new_height), Image.LANCZOS)
 | 
			
		||||
    thumbnail.save(new_file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_slug(text: str) -> str:
 | 
			
		||||
    return slugify.slugify(text, lowercase=False, separator="_")
 | 
			
		||||
def get_slug(str_in: str) -> str:
 | 
			
		||||
    return slugify.slugify(str_in, lowercase=False, separator="_")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -92,7 +92,7 @@ class ChannelMetadata:
 | 
			
		|||
    avatar_url: str
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def download_thumbnail(vinfo: VideoDetails, download_path: Path):
 | 
			
		||||
def download_thumbnail(vinfo: VideoDetails, download_path: Path) -> Path:
 | 
			
		||||
    """
 | 
			
		||||
    Download the thumbnail image of a YouTube video and save it at the given filepath.
 | 
			
		||||
    The thumbnail file ending is added to the path.
 | 
			
		||||
| 
						 | 
				
			
			@ -108,8 +108,7 @@ def download_thumbnail(vinfo: VideoDetails, download_path: Path):
 | 
			
		|||
        logging.info(f"downloading thumbnail {url}...")
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            util.download_image_file(url, download_path)
 | 
			
		||||
            return
 | 
			
		||||
            return util.download_image_file(url, download_path)
 | 
			
		||||
        except requests.HTTPError:
 | 
			
		||||
            logging.warning(f"downloading thumbnail {url} failed")
 | 
			
		||||
            pass
 | 
			
		||||
| 
						 | 
				
			
			@ -159,20 +158,20 @@ def download_audio(
 | 
			
		|||
 | 
			
		||||
def tag_audio(audio_path: Path, vinfo: VideoDetails, cover_path: Path):
 | 
			
		||||
    title_text = f"{vinfo.published.date().isoformat()} {vinfo.title}"
 | 
			
		||||
    comment = f"https://youtu.be/{vinfo.id}\n\n{vinfo.description}"
 | 
			
		||||
 | 
			
		||||
    tag = id3.ID3(audio_path)
 | 
			
		||||
    tag["TPE1"] = id3.TPE1(encoding=3, text=vinfo.channel_name)  # Artist
 | 
			
		||||
    tag["TALB"] = id3.TALB(encoding=3, text=vinfo.channel_name)  # Album
 | 
			
		||||
    tag["TIT2"] = id3.TIT2(encoding=3, text=title_text)  # Title
 | 
			
		||||
    tag["TDRC"] = id3.TDRC(encoding=3, text=vinfo.published.date().isoformat())  # Date
 | 
			
		||||
    tag["COMM"] = id3.COMM(encoding=3, text=comment)  # Comment
 | 
			
		||||
    audio = id3.ID3(audio_path)
 | 
			
		||||
    audio["TPE1"] = id3.TPE1(encoding=3, text=vinfo.channel_name)  # Artist
 | 
			
		||||
    audio["TALB"] = id3.TALB(encoding=3, text=vinfo.channel_name)  # Album
 | 
			
		||||
    audio["TIT2"] = id3.TIT2(encoding=3, text=title_text)  # Title
 | 
			
		||||
    audio["TYER"] = id3.TYER(encoding=3, text=str(vinfo.published.year))  # Year
 | 
			
		||||
    audio["TDAT"] = id3.TDAT(encoding=3, text=vinfo.published.strftime("%d%m"))  # Date
 | 
			
		||||
    audio["COMM"] = id3.COMM(encoding=3, text=f"YT-ID: {vinfo.id}")  # Comment
 | 
			
		||||
 | 
			
		||||
    with open(cover_path, "rb") as albumart:
 | 
			
		||||
        tag["APIC"] = id3.APIC(
 | 
			
		||||
        audio["APIC"] = id3.APIC(
 | 
			
		||||
            encoding=3, mime="image/png", type=3, desc="Cover", data=albumart.read()
 | 
			
		||||
        )
 | 
			
		||||
    tag.save()
 | 
			
		||||
    audio.save()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def channel_url_from_id(channel_id: str) -> str:
 | 
			
		||||
| 
						 | 
				
			
			@ -232,6 +231,22 @@ def get_channel_metadata(channel_url: str) -> ChannelMetadata:
 | 
			
		|||
    return ChannelMetadata(channel_id, name, description, avatar)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def download_avatar(avatar_url: str, download_path: Path) -> Path:
 | 
			
		||||
    """
 | 
			
		||||
    Download the avatar image of a channel. The .jpg file ending
 | 
			
		||||
    is added to the path.
 | 
			
		||||
 | 
			
		||||
    :param avatar_url: Channel avatar URL
 | 
			
		||||
    :param download_path: Download path
 | 
			
		||||
    :return: Path with file ending
 | 
			
		||||
    """
 | 
			
		||||
    logging.info(f"downloading avatar {avatar_url}...")
 | 
			
		||||
 | 
			
		||||
    download_path = download_path.with_suffix(".jpg")
 | 
			
		||||
    util.download_file(avatar_url, download_path)
 | 
			
		||||
    return download_path
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_channel_videos_from_feed(channel_id: str) -> List[VideoScraped]:
 | 
			
		||||
    """
 | 
			
		||||
    Return videos of a channel using YouTube's RSS feed. Using the feed is fast,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,8 @@
 | 
			
		|||
import os
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
 | 
			
		||||
import django_rq
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
from django.utils import timezone
 | 
			
		||||
 | 
			
		||||
from ucast.models import Channel, Video
 | 
			
		||||
| 
						 | 
				
			
			@ -19,8 +21,10 @@ def _get_or_create_channel(channel_id: str) -> Channel:
 | 
			
		|||
        channel_slug = Channel.get_new_slug(channel_data.name)
 | 
			
		||||
        channel_folder = store.get_channel_folder(channel_slug)
 | 
			
		||||
 | 
			
		||||
        util.download_image_file(channel_data.avatar_url, channel_folder.file_avatar)
 | 
			
		||||
        util.resize_avatar(channel_folder.file_avatar, channel_folder.file_avatar_sm)
 | 
			
		||||
        avatar_file = youtube.download_avatar(
 | 
			
		||||
            channel_data.avatar_url, channel_folder.file_avatar
 | 
			
		||||
        )
 | 
			
		||||
        util.resize_avatar(avatar_file, channel_folder.file_avatar_sm)
 | 
			
		||||
 | 
			
		||||
        channel = Channel(
 | 
			
		||||
            id=channel_id,
 | 
			
		||||
| 
						 | 
				
			
			@ -77,8 +81,9 @@ def download_video(video: Video):
 | 
			
		|||
    details = youtube.download_audio(video.id, audio_file)
 | 
			
		||||
 | 
			
		||||
    # Download/convert thumbnails
 | 
			
		||||
    tn_path = channel_folder.get_thumbnail(video.slug)
 | 
			
		||||
    youtube.download_thumbnail(details, tn_path)
 | 
			
		||||
    tn_path = youtube.download_thumbnail(
 | 
			
		||||
        details, channel_folder.get_thumbnail(video.slug)
 | 
			
		||||
    )
 | 
			
		||||
    util.resize_thumbnail(tn_path, channel_folder.get_thumbnail(video.slug, True))
 | 
			
		||||
    cover_file = channel_folder.get_cover(video.slug)
 | 
			
		||||
    cover.create_cover_file(
 | 
			
		||||
| 
						 | 
				
			
			@ -115,6 +120,15 @@ def import_channel(channel_id: str, limit: int = None):
 | 
			
		|||
        _load_scraped_video(vid, channel)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def schedule_update_channels():
 | 
			
		||||
    django_rq.get_scheduler().schedule(
 | 
			
		||||
        datetime.utcnow(),
 | 
			
		||||
        update_channels,
 | 
			
		||||
        id="schedule_update_channels",
 | 
			
		||||
        interval=settings.YT_UPDATE_INTERVAL,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def update_channels():
 | 
			
		||||
    """
 | 
			
		||||
    Update all channels from their RSS feeds and download new videos.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,27 +0,0 @@
 | 
			
		|||
import logging
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
 | 
			
		||||
import django_rq
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
 | 
			
		||||
from ucast.tasks import download
 | 
			
		||||
 | 
			
		||||
scheduler = django_rq.get_scheduler()
 | 
			
		||||
log = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def clear_scheduled_jobs():
 | 
			
		||||
    """Delete all scheduled jobs to prevent duplicates"""
 | 
			
		||||
    for job in scheduler.get_jobs():
 | 
			
		||||
        log.debug("Deleting scheduled job %s", job)
 | 
			
		||||
        job.delete()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def register_scheduled_jobs():
 | 
			
		||||
    """Register all scheduled jobs"""
 | 
			
		||||
    scheduler.schedule(
 | 
			
		||||
        datetime.utcnow(),
 | 
			
		||||
        download.update_channels,
 | 
			
		||||
        id="schedule_update_channels",
 | 
			
		||||
        interval=settings.YT_UPDATE_INTERVAL,
 | 
			
		||||
    )
 | 
			
		||||
| 
						 | 
				
			
			@ -1,3 +1,3 @@
 | 
			
		|||
from importlib.resources import files
 | 
			
		||||
 | 
			
		||||
DIR_TESTFILES = files("ucast.tests._testfiles")
 | 
			
		||||
DIR_TESTFILES = files("ucast.tests.testfiles")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,56 +0,0 @@
 | 
			
		|||
import os
 | 
			
		||||
import tempfile
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
from ucast.service import storage
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_create_channel_folders(settings):
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    settings.DOWNLOAD_ROOT = tmpdir
 | 
			
		||||
 | 
			
		||||
    store = storage.Storage()
 | 
			
		||||
    cf1 = store.get_channel_folder("ThetaDev")
 | 
			
		||||
    cf2 = store.get_channel_folder("Jeff_Geerling")
 | 
			
		||||
    cf1b = store.get_channel_folder("ThetaDev")
 | 
			
		||||
 | 
			
		||||
    cf1_path = tmpdir / "ThetaDev"
 | 
			
		||||
    cf2_path = tmpdir / "Jeff_Geerling"
 | 
			
		||||
 | 
			
		||||
    assert cf1.dir_root == cf1_path
 | 
			
		||||
    assert cf1b.dir_root == cf1_path
 | 
			
		||||
    assert cf2.dir_root == cf2_path
 | 
			
		||||
 | 
			
		||||
    assert os.path.isdir(cf1_path)
 | 
			
		||||
    assert os.path.isdir(cf2_path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_channel_folder():
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    ucast_dir = tmpdir / "_ucast"
 | 
			
		||||
 | 
			
		||||
    cf = storage.ChannelFolder(tmpdir)
 | 
			
		||||
 | 
			
		||||
    # Verify internal paths
 | 
			
		||||
    assert cf.file_avatar == ucast_dir / "avatar.jpg"
 | 
			
		||||
    assert cf.file_avatar_sm == ucast_dir / "avatar_sm.webp"
 | 
			
		||||
    assert cf.dir_covers == ucast_dir / "covers"
 | 
			
		||||
    assert cf.dir_thumbnails == ucast_dir / "thumbnails"
 | 
			
		||||
 | 
			
		||||
    # Create the folder
 | 
			
		||||
    assert not cf.does_exist()
 | 
			
		||||
    cf.create()
 | 
			
		||||
    assert cf.does_exist()
 | 
			
		||||
 | 
			
		||||
    assert cf.get_cover("my_video_title") == ucast_dir / "covers" / "my_video_title.png"
 | 
			
		||||
    assert (
 | 
			
		||||
        cf.get_thumbnail("my_video_title")
 | 
			
		||||
        == ucast_dir / "thumbnails" / "my_video_title.webp"
 | 
			
		||||
    )
 | 
			
		||||
    assert (
 | 
			
		||||
        cf.get_thumbnail("my_video_title", True)
 | 
			
		||||
        == ucast_dir / "thumbnails" / "my_video_title_sm.webp"
 | 
			
		||||
    )
 | 
			
		||||
    assert cf.get_audio("my_video_title") == tmpdir / "my_video_title.mp3"
 | 
			
		||||
| 
						 | 
				
			
			@ -1,92 +0,0 @@
 | 
			
		|||
import tempfile
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
from PIL import Image, ImageChops
 | 
			
		||||
 | 
			
		||||
from ucast import tests
 | 
			
		||||
from ucast.service import util
 | 
			
		||||
 | 
			
		||||
TEST_FILE_URL = "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_download_file():
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    download_file = tmpdir / "download.jpg"
 | 
			
		||||
    expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
 | 
			
		||||
 | 
			
		||||
    util.download_file(TEST_FILE_URL, download_file)
 | 
			
		||||
 | 
			
		||||
    downloaded_avatar = Image.open(download_file)
 | 
			
		||||
    expected_avatar = Image.open(expected_tn_file)
 | 
			
		||||
 | 
			
		||||
    diff = ImageChops.difference(downloaded_avatar, expected_avatar)
 | 
			
		||||
    assert diff.getbbox() is None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_download_image_file():
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    download_file = tmpdir / "download.jpg"
 | 
			
		||||
    expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
 | 
			
		||||
 | 
			
		||||
    util.download_image_file(TEST_FILE_URL, download_file)
 | 
			
		||||
 | 
			
		||||
    downloaded_avatar = Image.open(download_file)
 | 
			
		||||
    expected_avatar = Image.open(expected_tn_file)
 | 
			
		||||
 | 
			
		||||
    diff = ImageChops.difference(downloaded_avatar, expected_avatar)
 | 
			
		||||
    assert diff.getbbox() is None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_download_image_file_conv():
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    download_file = tmpdir / "download.png"
 | 
			
		||||
    expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
 | 
			
		||||
 | 
			
		||||
    util.download_image_file(TEST_FILE_URL, download_file)
 | 
			
		||||
 | 
			
		||||
    downloaded_avatar = Image.open(download_file)
 | 
			
		||||
    expected_avatar = Image.open(expected_tn_file)
 | 
			
		||||
 | 
			
		||||
    diff = ImageChops.difference(downloaded_avatar, expected_avatar)
 | 
			
		||||
    assert diff.getbbox() is None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_resize_avatar():
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    source_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
 | 
			
		||||
    resized_file = tmpdir / "avatar.webp"
 | 
			
		||||
 | 
			
		||||
    util.resize_avatar(source_file, resized_file)
 | 
			
		||||
 | 
			
		||||
    resized_avatar = Image.open(resized_file)
 | 
			
		||||
    assert resized_avatar.size == (100, 100)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_resize_thumbnail():
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    source_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
 | 
			
		||||
    resized_file = tmpdir / "thumbnail.webp"
 | 
			
		||||
 | 
			
		||||
    util.resize_thumbnail(source_file, resized_file)
 | 
			
		||||
 | 
			
		||||
    resized_thumbnail = Image.open(resized_file)
 | 
			
		||||
    assert resized_thumbnail.size == (360, 202)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    "text,expected_slug",
 | 
			
		||||
    [
 | 
			
		||||
        ("Hello World 👋", "Hello_World"),
 | 
			
		||||
        ("ÄäÖöÜüß", "AaOoUuss"),
 | 
			
		||||
        ("오징어 게임", "ojingeo_geim"),
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
def test_slug(text: str, expected_slug: str):
 | 
			
		||||
    slug = util.get_slug(text)
 | 
			
		||||
    assert slug == expected_slug
 | 
			
		||||
| 
						 | 
				
			
			@ -1,214 +0,0 @@
 | 
			
		|||
import datetime
 | 
			
		||||
import io
 | 
			
		||||
import re
 | 
			
		||||
import shutil
 | 
			
		||||
import subprocess
 | 
			
		||||
import tempfile
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
from mutagen import id3
 | 
			
		||||
from PIL import Image, ImageChops
 | 
			
		||||
 | 
			
		||||
from ucast import tests
 | 
			
		||||
from ucast.service import youtube
 | 
			
		||||
 | 
			
		||||
VIDEO_ID_THETADEV = "ZPxEr4YdWt8"
 | 
			
		||||
VIDEO_ID_SHORT = "lcQZ6YwQHiw"
 | 
			
		||||
VIDEO_ID_PERSUASION = "DWjFW7Yq1fA"
 | 
			
		||||
 | 
			
		||||
CHANNEL_ID_THETADEV = "UCGiJh0NZ52wRhYKYnuZI08Q"
 | 
			
		||||
CHANNEL_ID_BLENDER = "UCSMOQeBJ2RAnuFungnQOxLg"
 | 
			
		||||
CHANNEL_URL_BLENDER = "https://www.youtube.com/c/BlenderFoundation"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope="module")
 | 
			
		||||
def video_details() -> youtube.VideoDetails:
 | 
			
		||||
    return youtube.get_video_details(VIDEO_ID_THETADEV)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_download_thumbnail(video_details):
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    tn_file = tmpdir / "thumbnail.webp"
 | 
			
		||||
    expected_tn_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
 | 
			
		||||
 | 
			
		||||
    youtube.download_thumbnail(video_details, tn_file)
 | 
			
		||||
 | 
			
		||||
    tn = Image.open(tn_file)
 | 
			
		||||
    expected_tn = Image.open(expected_tn_file)
 | 
			
		||||
 | 
			
		||||
    diff = ImageChops.difference(tn, expected_tn)
 | 
			
		||||
    assert diff.getbbox() is None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_get_video_details(video_details):
 | 
			
		||||
    assert video_details.id == VIDEO_ID_THETADEV
 | 
			
		||||
    assert video_details.title == "ThetaDev @ Embedded World 2019"
 | 
			
		||||
    assert video_details.channel_id == "UCGiJh0NZ52wRhYKYnuZI08Q"
 | 
			
		||||
    assert (
 | 
			
		||||
        video_details.description
 | 
			
		||||
        == """This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
 | 
			
		||||
 | 
			
		||||
Sorry for the late upload, I just didn't have time to edit my footage.
 | 
			
		||||
 | 
			
		||||
Embedded World: https://www.embedded-world.de/
 | 
			
		||||
 | 
			
		||||
My website: https://thdev.org
 | 
			
		||||
Twitter: https://twitter.com/Theta_Dev"""
 | 
			
		||||
    )
 | 
			
		||||
    assert video_details.duration == 267
 | 
			
		||||
    assert not video_details.is_currently_live
 | 
			
		||||
    assert not video_details.is_livestream
 | 
			
		||||
    assert not video_details.is_short
 | 
			
		||||
    assert video_details.published == datetime.datetime(
 | 
			
		||||
        2019, 6, 2, tzinfo=datetime.timezone.utc
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_get_video_details_short():
 | 
			
		||||
    vinfo = youtube.get_video_details(VIDEO_ID_SHORT)
 | 
			
		||||
    assert vinfo.id == VIDEO_ID_SHORT
 | 
			
		||||
    assert (
 | 
			
		||||
        vinfo.title
 | 
			
		||||
        == "Small pink flowers | #shorts | Free Stock Video | \
 | 
			
		||||
creative commons short videos | creative #short"
 | 
			
		||||
    )
 | 
			
		||||
    assert not vinfo.is_currently_live
 | 
			
		||||
    assert not vinfo.is_livestream
 | 
			
		||||
    assert vinfo.is_short
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_download_audio():
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    download_file = tmpdir / "download.mp3"
 | 
			
		||||
 | 
			
		||||
    vinfo = youtube.download_audio(VIDEO_ID_PERSUASION, download_file)
 | 
			
		||||
    assert vinfo.id == VIDEO_ID_PERSUASION
 | 
			
		||||
    assert vinfo.title == "Persuasion (Instrumental) – RYYZN (No Copyright Music)"
 | 
			
		||||
    assert vinfo.duration == 100
 | 
			
		||||
 | 
			
		||||
    # Check with ffmpeg if the audio file is valid
 | 
			
		||||
    res = subprocess.run(
 | 
			
		||||
        ["ffmpeg", "-i", str(download_file)],
 | 
			
		||||
        capture_output=True,
 | 
			
		||||
        universal_newlines=True,
 | 
			
		||||
    )
 | 
			
		||||
    assert "Stream #0:0: Audio: mp3" in res.stderr
 | 
			
		||||
 | 
			
		||||
    match = re.search(r"Duration: (\d{2}:\d{2}:\d{2})", res.stderr)
 | 
			
		||||
    assert match[1] == "00:01:40"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_tag_audio(video_details):
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    audio_file = tmpdir / "audio.mp3"
 | 
			
		||||
    cover_file = tests.DIR_TESTFILES / "cover" / "c1_blur.png"
 | 
			
		||||
    shutil.copyfile(tests.DIR_TESTFILES / "audio" / "audio1.mp3", audio_file)
 | 
			
		||||
 | 
			
		||||
    youtube.tag_audio(audio_file, video_details, cover_file)
 | 
			
		||||
 | 
			
		||||
    tag = id3.ID3(audio_file)
 | 
			
		||||
    assert tag["TPE1"].text[0] == "ThetaDev"
 | 
			
		||||
    assert tag["TALB"].text[0] == "ThetaDev"
 | 
			
		||||
    assert tag["TIT2"].text[0] == "2019-06-02 ThetaDev @ Embedded World 2019"
 | 
			
		||||
    assert tag["TDRC"].text[0].text == "2019-06-02"
 | 
			
		||||
    assert (
 | 
			
		||||
        tag["COMM::XXX"].text[0]
 | 
			
		||||
        == """https://youtu.be/ZPxEr4YdWt8
 | 
			
		||||
 | 
			
		||||
This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
 | 
			
		||||
 | 
			
		||||
Sorry for the late upload, I just didn't have time to edit my footage.
 | 
			
		||||
 | 
			
		||||
Embedded World: https://www.embedded-world.de/
 | 
			
		||||
 | 
			
		||||
My website: https://thdev.org
 | 
			
		||||
Twitter: https://twitter.com/Theta_Dev"""
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    tag_cover = tag["APIC:Cover"]
 | 
			
		||||
    assert tag_cover.mime == "image/png"
 | 
			
		||||
 | 
			
		||||
    tag_cover_img = Image.open(io.BytesIO(tag_cover.data))
 | 
			
		||||
    expected_cover_img = Image.open(cover_file)
 | 
			
		||||
    diff = ImageChops.difference(tag_cover_img, expected_cover_img)
 | 
			
		||||
    assert diff.getbbox() is None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    "channel_str,channel_url",
 | 
			
		||||
    [
 | 
			
		||||
        (
 | 
			
		||||
            "https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
 | 
			
		||||
            "https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
 | 
			
		||||
        ),
 | 
			
		||||
        (
 | 
			
		||||
            "https://www.youtube.com/c/MrBeast6000",
 | 
			
		||||
            "https://www.youtube.com/c/MrBeast6000",
 | 
			
		||||
        ),
 | 
			
		||||
        (
 | 
			
		||||
            "https://www.youtube.com/user/LinusTechTips",
 | 
			
		||||
            "https://www.youtube.com/user/LinusTechTips",
 | 
			
		||||
        ),
 | 
			
		||||
        (
 | 
			
		||||
            "UCGiJh0NZ52wRhYKYnuZI08Q",
 | 
			
		||||
            "https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q",
 | 
			
		||||
        ),
 | 
			
		||||
        (
 | 
			
		||||
            "https://piped.mha.fi/user/LinusTechTips",
 | 
			
		||||
            "https://www.youtube.com/user/LinusTechTips",
 | 
			
		||||
        ),
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
def test_channel_url_from_str(channel_str: str, channel_url: str):
 | 
			
		||||
    url = youtube.channel_url_from_str(channel_str)
 | 
			
		||||
    assert url == channel_url
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    "channel_url,channel_id,name,avatar_url",
 | 
			
		||||
    [
 | 
			
		||||
        (
 | 
			
		||||
            youtube.channel_url_from_id(CHANNEL_ID_THETADEV),
 | 
			
		||||
            CHANNEL_ID_THETADEV,
 | 
			
		||||
            "ThetaDev",
 | 
			
		||||
            "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj",
 | 
			
		||||
        ),
 | 
			
		||||
        (
 | 
			
		||||
            CHANNEL_URL_BLENDER,
 | 
			
		||||
            CHANNEL_ID_BLENDER,
 | 
			
		||||
            "Blender",
 | 
			
		||||
            "https://yt3.ggpht.com/ytc/AKedOLT_31fFSD3FWEBnHZnyZeJx-GPHJwYCQKcEpaq8NQ=s900-c-k-c0x00ffffff-no-rj",
 | 
			
		||||
        ),
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
def test_channel_metadata(
 | 
			
		||||
    channel_url: str, channel_id: str, name: str, avatar_url: str
 | 
			
		||||
):
 | 
			
		||||
    metadata = youtube.get_channel_metadata(channel_url)
 | 
			
		||||
    assert metadata.id == channel_id
 | 
			
		||||
    assert metadata.name == name
 | 
			
		||||
    assert metadata.avatar_url == avatar_url
 | 
			
		||||
    assert metadata.description
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_get_channel_videos_from_feed():
 | 
			
		||||
    videos = youtube.get_channel_videos_from_feed(CHANNEL_ID_THETADEV)
 | 
			
		||||
    assert videos
 | 
			
		||||
 | 
			
		||||
    v1 = videos[0]
 | 
			
		||||
    assert len(v1.id) == 11
 | 
			
		||||
    assert v1.published.tzinfo == datetime.timezone.utc
 | 
			
		||||
    assert v1.published.second > 0 or v1.published.minute > 0 or v1.published.hour > 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_get_channel_videos_from_scraper():
 | 
			
		||||
    videos = youtube.get_channel_videos_from_scraper(CHANNEL_ID_THETADEV)
 | 
			
		||||
    assert videos
 | 
			
		||||
 | 
			
		||||
    v1 = videos[0]
 | 
			
		||||
    assert len(v1.id) == 11
 | 
			
		||||
    assert v1.published is None
 | 
			
		||||
							
								
								
									
										24
									
								
								ucast/tests/test_util.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						| 
						 | 
				
			
			@ -0,0 +1,24 @@
 | 
			
		|||
import tempfile
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
from PIL import Image, ImageChops
 | 
			
		||||
 | 
			
		||||
from ucast import tests
 | 
			
		||||
from ucast.service import util
 | 
			
		||||
 | 
			
		||||
TEST_FILE_URL = "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_download_file():
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    download_file = tmpdir / "download.jpg"
 | 
			
		||||
    expected_tn_file = tests.DIR_TESTFILES / "avatar" / "a1.jpg"
 | 
			
		||||
 | 
			
		||||
    util.download_file(TEST_FILE_URL, download_file)
 | 
			
		||||
 | 
			
		||||
    downloaded_avatar = Image.open(download_file)
 | 
			
		||||
    expected_avatar = Image.open(expected_tn_file)
 | 
			
		||||
 | 
			
		||||
    diff = ImageChops.difference(downloaded_avatar, expected_avatar)
 | 
			
		||||
    assert diff.getbbox() is None
 | 
			
		||||
							
								
								
									
										126
									
								
								ucast/tests/test_youtube.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						| 
						 | 
				
			
			@ -0,0 +1,126 @@
 | 
			
		|||
import datetime
 | 
			
		||||
import re
 | 
			
		||||
import subprocess
 | 
			
		||||
import tempfile
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
from PIL import Image, ImageChops
 | 
			
		||||
 | 
			
		||||
from ucast import tests
 | 
			
		||||
from ucast.service import youtube
 | 
			
		||||
 | 
			
		||||
VIDEO_ID_THETADEV = "ZPxEr4YdWt8"
 | 
			
		||||
VIDEO_ID_SHORT = "lcQZ6YwQHiw"
 | 
			
		||||
VIDEO_ID_PERSUASION = "DWjFW7Yq1fA"
 | 
			
		||||
 | 
			
		||||
CHANNEL_ID_THETADEV = "UCGiJh0NZ52wRhYKYnuZI08Q"
 | 
			
		||||
CHANNEL_ID_BLENDER = "UCSMOQeBJ2RAnuFungnQOxLg"
 | 
			
		||||
CHANNEL_URL_BLENDER = "https://www.youtube.com/c/BlenderFoundation"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope="module")
 | 
			
		||||
def video_info() -> youtube.VideoDetails:
 | 
			
		||||
    return youtube.get_video_details(VIDEO_ID_THETADEV)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_download_thumbnail(video_info):
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    tn_file = tmpdir / "thumbnail"
 | 
			
		||||
    expected_tn_file = tests.DIR_TESTFILES / "thumbnail" / "t1.webp"
 | 
			
		||||
 | 
			
		||||
    tn_file = youtube.download_thumbnail(video_info, tn_file)
 | 
			
		||||
    assert tn_file.suffix == ".webp"
 | 
			
		||||
 | 
			
		||||
    tn = Image.open(tn_file)
 | 
			
		||||
    expected_tn = Image.open(expected_tn_file)
 | 
			
		||||
 | 
			
		||||
    diff = ImageChops.difference(tn, expected_tn)
 | 
			
		||||
    assert diff.getbbox() is None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_get_video_info(video_info):
 | 
			
		||||
    assert video_info.id == VIDEO_ID_THETADEV
 | 
			
		||||
    assert video_info.title == "ThetaDev @ Embedded World 2019"
 | 
			
		||||
    assert video_info.channel_id == "UCGiJh0NZ52wRhYKYnuZI08Q"
 | 
			
		||||
    assert (
 | 
			
		||||
        video_info.description
 | 
			
		||||
        == """This february I spent one day at the Embedded World in Nuremberg. They showed tons of interesting electronics stuff, so I had to take some pictures and videos for you to see ;-)
 | 
			
		||||
 | 
			
		||||
Sorry for the late upload, I just didn't have time to edit my footage.
 | 
			
		||||
 | 
			
		||||
Embedded World: https://www.embedded-world.de/
 | 
			
		||||
 | 
			
		||||
My website: https://thdev.org
 | 
			
		||||
Twitter: https://twitter.com/Theta_Dev"""
 | 
			
		||||
    )
 | 
			
		||||
    assert video_info.duration == 267
 | 
			
		||||
    assert not video_info.is_currently_live
 | 
			
		||||
    assert not video_info.is_livestream
 | 
			
		||||
    assert not video_info.is_short
 | 
			
		||||
    assert video_info.published == datetime.datetime(
 | 
			
		||||
        2019, 6, 2, tzinfo=datetime.timezone.utc
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_get_video_info_short():
 | 
			
		||||
    vinfo = youtube.get_video_details(VIDEO_ID_SHORT)
 | 
			
		||||
    assert vinfo.id == VIDEO_ID_SHORT
 | 
			
		||||
    assert (
 | 
			
		||||
        vinfo.title
 | 
			
		||||
        == "Small pink flowers | #shorts | Free Stock Video | \
 | 
			
		||||
creative commons short videos | creative #short"
 | 
			
		||||
    )
 | 
			
		||||
    assert not vinfo.is_currently_live
 | 
			
		||||
    assert not vinfo.is_livestream
 | 
			
		||||
    assert vinfo.is_short
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_download_video():
 | 
			
		||||
    tmpdir_o = tempfile.TemporaryDirectory()
 | 
			
		||||
    tmpdir = Path(tmpdir_o.name)
 | 
			
		||||
    download_file = tmpdir / "download.mp3"
 | 
			
		||||
 | 
			
		||||
    vinfo = youtube.download_audio(VIDEO_ID_PERSUASION, download_file)
 | 
			
		||||
    assert vinfo.id == VIDEO_ID_PERSUASION
 | 
			
		||||
    assert vinfo.title == "Persuasion (Instrumental) – RYYZN (No Copyright Music)"
 | 
			
		||||
    assert vinfo.duration == 100
 | 
			
		||||
 | 
			
		||||
    # Check with ffmpeg if the audio file is valid
 | 
			
		||||
    res = subprocess.run(
 | 
			
		||||
        ["ffmpeg", "-i", str(download_file)],
 | 
			
		||||
        capture_output=True,
 | 
			
		||||
        universal_newlines=True,
 | 
			
		||||
    )
 | 
			
		||||
    assert "Stream #0:0: Audio: mp3" in res.stderr
 | 
			
		||||
 | 
			
		||||
    match = re.search(r"Duration: (\d{2}:\d{2}:\d{2})", res.stderr)
 | 
			
		||||
    assert match[1] == "00:01:40"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    "channel_url,channel_id,name,avatar_url",
 | 
			
		||||
    [
 | 
			
		||||
        (
 | 
			
		||||
            youtube.channel_url_from_id(CHANNEL_ID_THETADEV),
 | 
			
		||||
            CHANNEL_ID_THETADEV,
 | 
			
		||||
            "ThetaDev",
 | 
			
		||||
            "https://yt3.ggpht.com/ytc/AKedOLSnFfmpibLLoqyaYdsF6bJ-zaLPzomII__FrJve1w=s900-c-k-c0x00ffffff-no-rj",
 | 
			
		||||
        ),
 | 
			
		||||
        (
 | 
			
		||||
            CHANNEL_URL_BLENDER,
 | 
			
		||||
            CHANNEL_ID_BLENDER,
 | 
			
		||||
            "Blender",
 | 
			
		||||
            "https://yt3.ggpht.com/ytc/AKedOLT_31fFSD3FWEBnHZnyZeJx-GPHJwYCQKcEpaq8NQ=s900-c-k-c0x00ffffff-no-rj",
 | 
			
		||||
        ),
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
def test_channel_metadata(
 | 
			
		||||
    channel_url: str, channel_id: str, name: str, avatar_url: str
 | 
			
		||||
):
 | 
			
		||||
    metadata = youtube.get_channel_metadata(channel_url)
 | 
			
		||||
    assert metadata.id == channel_id
 | 
			
		||||
    assert metadata.name == name
 | 
			
		||||
    assert metadata.avatar_url == avatar_url
 | 
			
		||||
    assert metadata.description
 | 
			
		||||
| 
		 Before Width: | Height: | Size: 186 KiB After Width: | Height: | Size: 186 KiB  | 
| 
		 Before Width: | Height: | Size: 32 KiB After Width: | Height: | Size: 32 KiB  | 
| 
		 Before Width: | Height: | Size: 53 KiB After Width: | Height: | Size: 53 KiB  | 
| 
		 Before Width: | Height: | Size: 26 KiB After Width: | Height: | Size: 26 KiB  | 
| 
		 Before Width: | Height: | Size: 268 KiB After Width: | Height: | Size: 268 KiB  | 
| 
		 Before Width: | Height: | Size: 234 KiB After Width: | Height: | Size: 234 KiB  | 
| 
		 Before Width: | Height: | Size: 218 KiB After Width: | Height: | Size: 218 KiB  | 
| 
		 Before Width: | Height: | Size: 215 KiB After Width: | Height: | Size: 215 KiB  | 
| 
		 Before Width: | Height: | Size: 183 KiB After Width: | Height: | Size: 183 KiB  | 
| 
		 Before Width: | Height: | Size: 216 KiB After Width: | Height: | Size: 216 KiB  | 
| 
		 Before Width: | Height: | Size: 173 KiB After Width: | Height: | Size: 173 KiB  | 
| 
						 | 
				
			
			@ -1,11 +1,10 @@
 | 
			
		|||
### Quellen der Thumbnails/Avatarbilder/Audiodateien zum Testen
 | 
			
		||||
### Quellen der Thumbnails/Avatarbilder zum Testen
 | 
			
		||||
 | 
			
		||||
- a1/t1: [ThetaDev @ Embedded World 2019](https://www.youtube.com/watch?v=ZPxEr4YdWt8), by [ThetaDev](https://www.youtube.com/channel/UCGiJh0NZ52wRhYKYnuZI08Q) (CC-BY)
 | 
			
		||||
- a2/t2: [Sintel - Open Movie by Blender Foundation](https://www.youtube.com/watch?v=eRsGyueVLvQ), by [Blender](https://www.youtube.com/c/BlenderFoundation) (CC-BY)
 | 
			
		||||
- a3/t3: [Systemabsturz Teaser zur DiVOC bb3](https://www.youtube.com/watch?v=uFqgQ35wyYY), by [media.ccc.de](https://www.youtube.com/channel/UC2TXq_t06Hjdr2g_KdKpHQg) (CC-BY)
 | 
			
		||||
- audio1: [No copyright intro free fire intro](https://www.youtube.com/watch?v=I0RRENheeTo), by [Shahzaib Hassan](https://www.youtube.com/channel/UCmLTTbctUZobNQrr8RtX8uQ), (CC-BY)
 | 
			
		||||
 | 
			
		||||
### Weitere Testvideos
 | 
			
		||||
 | 
			
		||||
- [Persuasion (Instrumental) – RYYZN (No Copyright Music)](https://www.youtube.com/watch?v=DWjFW7Yq1fA), by [RYYZN](https://soundcloud.com/ryyzn) (CC-BY)
 | 
			
		||||
- [Small pink flowers | #shorts | Free Stock Video](https://www.youtube.com/watch?v=lcQZ6YwQHiw), by [Shahzaib Hassan](https://www.youtube.com/channel/UCmLTTbctUZobNQrr8RtX8uQ), (CC-BY)
 | 
			
		||||
- [Small pink flowers | #shorts | Free Stock Video](https://www.youtube.com/watch?v=lcQZ6YwQHiw), by [Shahzaib Hassan](https://www.youtube.com/channel/UCmLTTbctUZobNQrr8RtX8uQ), (CC-BY)
 | 
			
		||||
| 
		 Before Width: | Height: | Size: 92 KiB After Width: | Height: | Size: 92 KiB  | 
| 
		 Before Width: | Height: | Size: 20 KiB After Width: | Height: | Size: 20 KiB  | 
| 
		 Before Width: | Height: | Size: 28 KiB After Width: | Height: | Size: 28 KiB  |