# Source: ucast/ucast/feed.py
# (repository viewer metadata: 195 lines, 7.4 KiB, Python)

import re
from xml.sax import saxutils
from django import http
from django.conf import settings
from django.contrib.sites.shortcuts import get_current_site
from django.contrib.syndication.views import Feed, add_domain
from django.utils import feedgenerator
from django.utils.feedgenerator import Rss201rev2Feed, rfc2822_date
from django.utils.xmlutils import SimplerXMLGenerator
from ucast.models import Channel, Video
from ucast.service import util
# Pattern matching http:// and https:// URLs. PodcastFeedType._xml_escape uses
# it to wrap URLs found in description text in <a> tags.
# NOTE(review): inside the character class `[$-_@.&+]`, `$-_` is a range
# (U+0024 through U+005F), so it also admits characters such as `;`, `<`, `=`,
# `>` and `?` -- confirm this breadth is intended.
URL_REGEX = r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"""
class PodcastFeedType(Rss201rev2Feed):
    """RSS 2.0 feed generator extended with iTunes podcast elements.

    On top of ``Rss201rev2Feed`` this class:

    * declares the ``itunes`` XML namespace on the ``<rss>`` element,
    * writes descriptions as CDATA-wrapped HTML (URLs become links, newlines
      become ``<br>``),
    * emits ``<image>`` / ``<itunes:image>`` for the channel and
      ``<itunes:image>`` / ``<itunes:duration>`` for items when the optional
      ``image_url`` / ``duration`` values are present.
    """

    content_type = "application/xml; charset=utf-8"

    def rss_attributes(self):
        """Return the base ``<rss>`` attributes plus the iTunes namespace."""
        attrs = super().rss_attributes()
        attrs["xmlns:itunes"] = "http://www.itunes.com/dtds/podcast-1.0.dtd"
        return attrs

    @staticmethod
    def _xml_escape(text: str) -> str:
        """Convert plain text into an HTML fragment for a description.

        The text is XML-escaped first, then every match of ``URL_REGEX`` is
        wrapped in an ``<a>`` tag and each newline is replaced by ``<br>``.
        """
        text = saxutils.escape(text)
        text = re.sub(URL_REGEX, lambda m: f'<a href="{m[0]}">{m[0]}</a>', text)
        text = text.replace("\n", "<br>")
        return text

    @staticmethod
    def _add_text_element(handler: SimplerXMLGenerator, text: str):
        """Write ``text`` as a ``<description>`` element holding a CDATA section."""
        handler.startElement("description", {})
        # ignorableWhitespace() is used so the CDATA markers and the generated
        # HTML tags reach the output as-is.
        handler.ignorableWhitespace(f"<![CDATA[{PodcastFeedType._xml_escape(text)}]]>")
        handler.endElement("description")

    @staticmethod
    def _format_secs(secs: int) -> str:
        """Format a number of seconds as ``HH:MM:SS``."""
        mm, ss = divmod(secs, 60)
        hh, mm = divmod(mm, 60)
        s = "%02d:%02d:%02d" % (hh, mm, ss)
        return s

    def add_root_elements(self, handler: SimplerXMLGenerator):
        """Write the channel-level elements of the feed.

        Title, link, description and ``lastBuildDate`` are always written; the
        remaining elements are written only when the corresponding feed value
        is set.
        """
        handler.addQuickElement("title", self.feed["title"])
        handler.addQuickElement("link", self.feed["link"])

        # Nothing here guarantees the feed description is set. Fall back to an
        # empty string so escaping does not fail on None (item descriptions
        # are guarded against None in add_item_elements as well).
        description = self.feed["description"]
        self._add_text_element(handler, "" if description is None else description)

        if self.feed["feed_url"] is not None:
            handler.addQuickElement(
                "atom:link", None, {"rel": "self", "href": self.feed["feed_url"]}
            )
        if self.feed["language"] is not None:
            handler.addQuickElement("language", self.feed["language"])
        for cat in self.feed["categories"]:
            handler.addQuickElement("category", cat)
        if self.feed["feed_copyright"] is not None:
            handler.addQuickElement("copyright", self.feed["feed_copyright"])
        handler.addQuickElement("lastBuildDate", rfc2822_date(self.latest_post_date()))
        if self.feed["ttl"] is not None:
            handler.addQuickElement("ttl", self.feed["ttl"])

        # Channel artwork: image_url is an optional extra value, hence .get().
        if self.feed.get("image_url") is not None:
            handler.startElement("image", {})
            handler.addQuickElement("url", self.feed["image_url"])
            handler.addQuickElement("title", self.feed["title"])
            handler.addQuickElement("link", self.feed["link"])
            handler.endElement("image")
            handler.addQuickElement(
                "itunes:image", None, {"href": self.feed["image_url"]}
            )

    def add_item_elements(self, handler: SimplerXMLGenerator, item):
        """Write the elements of a single feed item.

        Raises:
            ValueError: if the item carries more than one enclosure.
        """
        handler.addQuickElement("title", item["title"])
        handler.addQuickElement("link", item["link"])
        if item["description"] is not None:
            self._add_text_element(handler, item["description"])

        # Author information.
        if item["author_name"] and item["author_email"]:
            handler.addQuickElement(
                "author", "%s (%s)" % (item["author_email"], item["author_name"])
            )
        elif item["author_email"]:
            handler.addQuickElement("author", item["author_email"])
        elif item["author_name"]:
            handler.addQuickElement(
                "dc:creator",
                item["author_name"],
                {"xmlns:dc": "http://purl.org/dc/elements/1.1/"},
            )

        if item["pubdate"] is not None:
            handler.addQuickElement("pubDate", rfc2822_date(item["pubdate"]))
        if item["comments"] is not None:
            handler.addQuickElement("comments", item["comments"])
        if item["unique_id"] is not None:
            guid_attrs = {}
            # isPermaLink is only written when the flag is an actual bool.
            if isinstance(item.get("unique_id_is_permalink"), bool):
                guid_attrs["isPermaLink"] = str(item["unique_id_is_permalink"]).lower()
            handler.addQuickElement("guid", item["unique_id"], guid_attrs)
        if item["ttl"] is not None:
            handler.addQuickElement("ttl", item["ttl"])

        # Enclosure.
        if item["enclosures"]:
            enclosures = list(item["enclosures"])
            if len(enclosures) > 1:
                raise ValueError(
                    "RSS feed items may only have one enclosure, see "
                    "http://www.rssboard.org/rss-profile#element-channel-item-enclosure"
                )
            enclosure = enclosures[0]
            handler.addQuickElement(
                "enclosure",
                "",
                {
                    "url": enclosure.url,
                    "length": enclosure.length,
                    "type": enclosure.mime_type,
                },
            )

        # Categories.
        for cat in item["categories"]:
            handler.addQuickElement("category", cat)

        # Cover image
        if item.get("image_url"):
            handler.addQuickElement("itunes:image", None, {"href": item["image_url"]})

        # Duration (skipped when missing or zero)
        if item.get("duration"):
            handler.addQuickElement(
                "itunes:duration", self._format_secs(item["duration"])
            )
class UcastFeed(Feed):
    """Feed view that publishes a channel's downloaded videos as podcast items."""

    feed_type = PodcastFeedType

    def get_object(self, request, *args, **kwargs):
        """Return the ``Channel`` whose slug equals the ``channel`` URL kwarg."""
        return Channel.objects.get(slug=kwargs["channel"])

    def get_feed(self, channel: Channel, request: http.HttpRequest):
        """Build the feed object for ``channel``.

        Only videos with a non-null ``downloaded`` value are included, ordered
        by ``published`` descending and limited to ``settings.FEED_MAX_ITEMS``.
        """
        channel_slug = channel.slug
        podcast = self.feed_type(
            title=channel.name,
            link=channel.get_absolute_url(),
            description=channel.description,
            language=self.language,
            feed_url=self.full_link_url(request, f"/feed/{channel_slug}"),
            image_url=self.full_link_url(
                request, f"/files/avatar/{channel_slug}.jpg"
            ),
        )

        downloaded_videos = channel.video_set.filter(downloaded__isnull=False)
        newest_first = downloaded_videos.order_by("-published")
        for video in newest_first[: settings.FEED_MAX_ITEMS]:
            cover_path = f"/files/cover/{channel_slug}/{video.slug}.png"
            podcast.add_item(
                title=video.title,
                link=video.get_absolute_url(),
                description=video.description,
                unique_id=video.get_absolute_url(),
                unique_id_is_permalink=True,
                enclosures=self.item_enclosures_domain(video, request),
                pubdate=video.published,
                updateddate=video.downloaded,
                image_url=self.full_link_url(request, cover_path),
                duration=video.duration,
            )

        return podcast

    @staticmethod
    def full_link_url(request: http.HttpRequest, page_url: str) -> str:
        """Return ``page_url`` on the current site with the user's feed key added.

        The domain is prepended with ``add_domain`` and the result is passed
        through ``util.add_key_to_url`` together with
        ``request.user.get_feed_key()``.
        """
        site = get_current_site(request)
        absolute_url = add_domain(site.domain, page_url, request.is_secure())
        feed_key = request.user.get_feed_key()
        return util.add_key_to_url(absolute_url, feed_key)

    def item_enclosures_domain(self, item: Video, request: http.HttpRequest):
        """Return a one-element list with the MP3 enclosure for ``item``."""
        audio_path = f"/files/audio/{item.channel.slug}/{item.slug}.mp3"
        audio = feedgenerator.Enclosure(
            url=self.full_link_url(request, audio_path),
            length=str(item.download_size),
            mime_type="audio/mpeg",
        )
        return [audio]