import base64 import datetime from Cryptodome import Random from django.contrib.auth.models import AbstractUser from django.db import models from django.utils import timezone from ucast.service import util def _get_unique_slug( str_in: str, objects: models.query.QuerySet, model_name: str ) -> str: """ Get a new, unique slug for a database item :param str_in: Input string to slugify :param objects: Django query set :return: Slug """ original_slug = util.get_slug(str_in) slug = original_slug for i in range(1, objects.count() + 2): if not objects.filter(slug=slug).exists(): return slug slug = f"{original_slug}_{i}" raise Exception(f"unique {model_name} slug for {original_slug} could not be found") class Channel(models.Model): channel_id = models.CharField(max_length=30, db_index=True) name = models.CharField(max_length=100) slug = models.CharField(max_length=100, db_index=True) description = models.TextField() subscribers = models.CharField(max_length=20, null=True) active = models.BooleanField(default=True) skip_livestreams = models.BooleanField(default=True) skip_shorts = models.BooleanField(default=True) avatar_url = models.CharField(max_length=250, null=True) last_update = models.DateTimeField(default=timezone.now) @classmethod def get_new_slug(cls, name: str) -> str: return _get_unique_slug(name, cls.objects, "channel") def get_full_description(self) -> str: desc = f"https://www.youtube.com/channel/{self.channel_id}" if self.description: desc = f"{self.description}\n\n{desc}" return desc def get_absolute_url(self) -> str: return "https://www.youtube.com/channel/" + self.channel_id def should_download(self, video: "Video") -> bool: if self.skip_livestreams and video.is_livestream: return False if self.skip_shorts and video.is_short: return False return True def download_size(self) -> int: return self.video_set.aggregate(models.Sum("download_size")).get( "download_size__sum" ) def __str__(self): return self.name class Video(models.Model): video_id = models.CharField(max_length=30, db_index=True) title = models.CharField(max_length=200) slug = models.CharField(max_length=209, db_index=True) channel = models.ForeignKey(Channel, on_delete=models.CASCADE) published = models.DateTimeField() downloaded = models.DateTimeField(null=True) description = models.TextField() duration = models.IntegerField() is_livestream = models.BooleanField(default=False) is_short = models.BooleanField(default=False) download_size = models.IntegerField(null=True) is_deleted = models.BooleanField(default=False) @classmethod def get_new_slug(cls, title: str, date: datetime.date, channel_id: str) -> str: title_w_date = f"{date.strftime('%Y%m%d')}_{title}" return _get_unique_slug( title_w_date, cls.objects.filter(channel__channel_id=channel_id), "video" ) def get_full_description(self) -> str: desc = f"https://youtu.be/{self.video_id}" if self.description: desc = f"{self.description}\n\n{desc}" return desc def get_absolute_url(self) -> str: return f"https://www.youtube.com/watch?v={self.video_id}" def __str__(self): return self.title class User(AbstractUser): feed_key = models.CharField(max_length=50, null=True, default=None) def generate_feed_key(self): for _ in range(0, User.objects.count()): key = base64.urlsafe_b64encode(Random.get_random_bytes(18)).decode() if not User.objects.filter(feed_key=key).exists(): self.feed_key = key self.save() return raise Exception("unique feed key could not be found") def get_feed_key(self) -> str: if self.feed_key is None: self.generate_feed_key() return self.feed_key