195 lines
7.4 KiB
Python
195 lines
7.4 KiB
Python
import re
|
|
from xml.sax import saxutils
|
|
|
|
from django import http
|
|
from django.conf import settings
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from django.contrib.syndication.views import Feed, add_domain
|
|
from django.utils import feedgenerator
|
|
from django.utils.feedgenerator import Rss201rev2Feed, rfc2822_date
|
|
from django.utils.xmlutils import SimplerXMLGenerator
|
|
|
|
from ucast.models import Channel, Video
|
|
from ucast.service import util
|
|
|
|
# Pattern used to find http/https URLs inside free text. After the scheme it
# accepts one or more of: ASCII letters, digits, the characters in the class
# ``[$-_@.&+]`` (note that ``$-_`` is a *range* covering ASCII 0x24-0x5F, not
# three literal characters), the characters ``! * ( ) ,`` and percent-encoded
# bytes such as ``%2F``.
URL_REGEX = (
    r"http[s]?://"
    r"(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
)
|
|
|
|
|
|
class PodcastFeedType(Rss201rev2Feed):
    """RSS 2.0 feed generator extended with podcast-specific elements.

    On top of the parent class this generator:

    * declares the ``itunes`` XML namespace on the ``<rss>`` element,
    * writes descriptions as HTML inside a CDATA section (URLs become links,
      newlines become ``<br>``),
    * emits ``<image>`` / ``<itunes:image>`` for the channel and
      ``<itunes:image>`` / ``<itunes:duration>`` for items when the optional
      ``image_url`` / ``duration`` values are present.
    """

    content_type = "application/xml; charset=utf-8"

    def rss_attributes(self):
        """Return the ``<rss>`` attributes, adding the iTunes namespace."""
        attrs = super().rss_attributes()
        attrs["xmlns:itunes"] = "http://www.itunes.com/dtds/podcast-1.0.dtd"
        return attrs

    @staticmethod
    def _xml_escape(text: str) -> str:
        """Convert plain text to an HTML fragment.

        The text is XML-escaped first, so markup in the input cannot reach
        the output unescaped. URLs matched by ``URL_REGEX`` are then wrapped
        in ``<a>`` tags and newlines are replaced by ``<br>``.
        """
        text = saxutils.escape(text)
        text = re.sub(URL_REGEX, lambda m: f'<a href="{m[0]}">{m[0]}</a>', text)
        text = text.replace("\n", "<br>")
        return text

    @staticmethod
    def _add_text_element(handler: SimplerXMLGenerator, text: "str | None"):
        """Write a ``<description>`` element holding ``text`` as CDATA HTML.

        ``text`` may be ``None`` (for example a feed created without a
        description); in that case an empty ``<description>`` element is
        written instead of failing inside ``saxutils.escape``.
        """
        handler.startElement("description", {})
        if text is not None:
            # ignorableWhitespace() writes its argument verbatim, which is what
            # lets the CDATA markers and the generated HTML through unescaped.
            # The payload cannot contain "]]>" because _xml_escape turns every
            # ">" from the input into "&gt;".
            handler.ignorableWhitespace(
                f"<![CDATA[{PodcastFeedType._xml_escape(text)}]]>"
            )
        handler.endElement("description")

    @staticmethod
    def _format_secs(secs: int) -> str:
        """Format a duration given in seconds as ``HH:MM:SS``."""
        mm, ss = divmod(secs, 60)
        hh, mm = divmod(mm, 60)
        s = "%02d:%02d:%02d" % (hh, mm, ss)
        return s

    def add_root_elements(self, handler: SimplerXMLGenerator):
        """Write the channel-level elements of the feed.

        Emits title, link and description, then the optional ``atom:link``,
        language, categories, copyright, ``lastBuildDate`` and ttl, and
        finally the channel artwork when ``image_url`` is set.
        """
        handler.addQuickElement("title", self.feed["title"])
        handler.addQuickElement("link", self.feed["link"])
        self._add_text_element(handler, self.feed["description"])
        if self.feed["feed_url"] is not None:
            handler.addQuickElement(
                "atom:link", None, {"rel": "self", "href": self.feed["feed_url"]}
            )
        if self.feed["language"] is not None:
            handler.addQuickElement("language", self.feed["language"])
        for cat in self.feed["categories"]:
            handler.addQuickElement("category", cat)
        if self.feed["feed_copyright"] is not None:
            handler.addQuickElement("copyright", self.feed["feed_copyright"])
        handler.addQuickElement("lastBuildDate", rfc2822_date(self.latest_post_date()))
        if self.feed["ttl"] is not None:
            handler.addQuickElement("ttl", self.feed["ttl"])

        if self.feed.get("image_url") is not None:
            # Standard RSS channel image, reusing the channel title and link.
            handler.startElement("image", {})
            handler.addQuickElement("url", self.feed["image_url"])
            handler.addQuickElement("title", self.feed["title"])
            handler.addQuickElement("link", self.feed["link"])
            handler.endElement("image")

            # The same artwork again in the iTunes namespace.
            handler.addQuickElement(
                "itunes:image", None, {"href": self.feed["image_url"]}
            )

    def add_item_elements(self, handler: SimplerXMLGenerator, item):
        """Write the elements of a single feed item.

        Args:
            handler: XML generator the elements are written to.
            item: Mapping of item fields as stored by ``add_item()``.

        Raises:
            ValueError: If the item carries more than one enclosure.
        """
        handler.addQuickElement("title", item["title"])
        handler.addQuickElement("link", item["link"])

        if item["description"] is not None:
            self._add_text_element(handler, item["description"])

        # Author information.
        if item["author_name"] and item["author_email"]:
            handler.addQuickElement(
                "author", "%s (%s)" % (item["author_email"], item["author_name"])
            )
        elif item["author_email"]:
            handler.addQuickElement("author", item["author_email"])
        elif item["author_name"]:
            handler.addQuickElement(
                "dc:creator",
                item["author_name"],
                {"xmlns:dc": "http://purl.org/dc/elements/1.1/"},
            )

        if item["pubdate"] is not None:
            handler.addQuickElement("pubDate", rfc2822_date(item["pubdate"]))
        if item["comments"] is not None:
            handler.addQuickElement("comments", item["comments"])
        if item["unique_id"] is not None:
            guid_attrs = {}
            # Only emit isPermaLink when the caller gave an explicit boolean.
            if isinstance(item.get("unique_id_is_permalink"), bool):
                guid_attrs["isPermaLink"] = str(item["unique_id_is_permalink"]).lower()
            handler.addQuickElement("guid", item["unique_id"], guid_attrs)
        if item["ttl"] is not None:
            handler.addQuickElement("ttl", item["ttl"])

        # Enclosure.
        if item["enclosures"]:
            enclosures = list(item["enclosures"])
            if len(enclosures) > 1:
                raise ValueError(
                    "RSS feed items may only have one enclosure, see "
                    "http://www.rssboard.org/rss-profile#element-channel-item-enclosure"
                )
            enclosure = enclosures[0]
            handler.addQuickElement(
                "enclosure",
                "",
                {
                    "url": enclosure.url,
                    "length": enclosure.length,
                    "type": enclosure.mime_type,
                },
            )

        # Categories.
        for cat in item["categories"]:
            handler.addQuickElement("category", cat)

        # Cover image
        if item.get("image_url"):
            handler.addQuickElement("itunes:image", None, {"href": item["image_url"]})

        # Duration
        if item.get("duration"):
            handler.addQuickElement(
                "itunes:duration", self._format_secs(item["duration"])
            )
|
|
|
|
|
|
class UcastFeed(Feed):
    """Feed view that serves one channel's downloaded videos as a podcast."""

    feed_type = PodcastFeedType

    def get_object(self, request, *args, **kwargs):
        """Return the ``Channel`` whose slug is the ``channel`` URL kwarg."""
        return Channel.objects.get(slug=kwargs["channel"])

    def get_feed(self, channel: Channel, request: http.HttpRequest):
        """Build the feed object for ``channel``.

        Only videos with a non-null ``downloaded`` value are included,
        ordered by ``published`` descending and limited to
        ``settings.FEED_MAX_ITEMS`` entries.
        """
        podcast = self.feed_type(
            title=channel.name,
            link=channel.get_absolute_url(),
            description=channel.description,
            language=self.language,
            feed_url=self.full_link_url(request, f"/feed/{channel.slug}"),
            image_url=self.full_link_url(request, f"/files/avatar/{channel.slug}.jpg"),
        )

        downloaded = channel.video_set.filter(downloaded__isnull=False)
        newest_first = downloaded.order_by("-published")
        for video in newest_first[: settings.FEED_MAX_ITEMS]:
            podcast.add_item(
                title=video.title,
                link=video.get_absolute_url(),
                description=video.description,
                # The item's own URL doubles as its permanent GUID.
                unique_id=video.get_absolute_url(),
                unique_id_is_permalink=True,
                enclosures=self.item_enclosures_domain(video, request),
                pubdate=video.published,
                updateddate=video.downloaded,
                image_url=self.full_link_url(
                    request, f"/files/cover/{channel.slug}/{video.slug}.png"
                ),
                duration=video.duration,
            )
        return podcast

    @staticmethod
    def full_link_url(request: http.HttpRequest, page_url: str) -> str:
        """Turn a site-relative path into an absolute URL carrying a feed key.

        The domain comes from the current site and the scheme follows
        ``request.is_secure()``; the requesting user's feed key is then added
        through ``util.add_key_to_url``.
        """
        domain = get_current_site(request).domain
        absolute_url = add_domain(domain, page_url, request.is_secure())
        feed_key = request.user.get_feed_key()
        return util.add_key_to_url(absolute_url, feed_key)

    def item_enclosures_domain(self, item: Video, request: http.HttpRequest):
        """Return a one-element list with the MP3 enclosure for ``item``."""
        audio_path = f"/files/audio/{item.channel.slug}/{item.slug}.mp3"
        audio = feedgenerator.Enclosure(
            url=self.full_link_url(request, audio_path),
            length=str(item.download_size),
            mime_type="audio/mpeg",
        )
        return [audio]
|