ucast/ucast/service/scrapetube.py

244 lines
8 KiB
Python

"""
Based on the scrapetube package from dermasmid (MIT License)
https://github.com/dermasmid/scrapetube
"""
import json
import time
from typing import Generator, Literal, Optional
import requests
def get_channel(
channel_url: str,
limit: int = None,
sleep: int = 1,
sort_by: Literal["newest", "oldest", "popular"] = "newest",
) -> Generator[dict, None, None]:
"""
Get videos for a channel.
:param channel_url: The url of the channel you want to get the videos for.
:param limit: Limit the number of videos you want to get.
:param sleep: Seconds to sleep between API calls to youtube, in order to prevent
getting blocked. Defaults to ``1``.
:param sort_by: In what order to retrive to videos. Pass one of the following values.
``"newest"``: Get the new videos first.
``"oldest"``: Get the old videos first.
``"popular"``: Get the popular videos first.
Defaults to ``"newest"``.
:return: Generator providing the videos
"""
sort_by_map = {"newest": "dd", "oldest": "da", "popular": "p"}
url = "{url}/videos?view=0&sort={sort_by}&flow=grid".format(
url=channel_url,
sort_by=sort_by_map[sort_by],
)
api_endpoint = "https://www.youtube.com/youtubei/v1/browse"
videos = _get_videos(url, api_endpoint, "gridVideoRenderer", limit, sleep)
for video in videos:
yield video
def get_channel_metadata(channel_url: str) -> dict:
"""
Get metadata of a channel.
:param channel_url: Channel URL
:return: Raw channel metadata
"""
session = _new_session()
url = f"{channel_url}/videos?view=0&flow=grid"
html = _get_initial_data(session, url)
return json.loads(_get_json_from_html(html, "var ytInitialData = ", 0, "};") + "}")
def get_playlist(
playlist_id: str, limit: int = None, sleep: int = 1
) -> Generator[dict, None, None]:
"""
Get videos for a playlist.
:param playlist_id: The playlist id from the playlist you want to get the videos for.
:param limit: Limit the number of videos you want to get.
:param sleep: Seconds to sleep between API calls to youtube, in order to prevent
getting blocked. Defaults to ``1``.
:return: Generator providing the videos
"""
url = f"https://www.youtube.com/playlist?list={playlist_id}"
api_endpoint = "https://www.youtube.com/youtubei/v1/browse"
videos = _get_videos(url, api_endpoint, "playlistVideoRenderer", limit, sleep)
for video in videos:
yield video
def get_search(
query: str,
limit: int = None,
sleep: int = 1,
sort_by: Literal["relevance", "upload_date", "view_count", "rating"] = "relevance",
results_type: Literal["video", "channel", "playlist", "movie"] = "video",
) -> Generator[dict, None, None]:
"""
Search youtube and get videos.
:param query: The term you want to search for.
:param limit: Limit the number of videos you want to get.
:param sleep: Seconds to sleep between API calls to youtube, in order to prevent
getting blocked. Defaults to ``1``.
:param sort_by: In what order to retrive to videos. Pass one of the following values.
``"relevance"``: Get the new videos in order of relevance.
``"upload_date"``: Get the new videos first.
``"view_count"``: Get the popular videos first.
``"rating"``: Get videos with more likes first.
Defaults to ``"relevance"``.
:param results_type: What type you want to search for.
Pass one of the following values: ``"video"|"channel"|
"playlist"|"movie"``. Defaults to ``"video"``.
:return: Generator providing the videos
"""
sort_by_map = {
"relevance": "A",
"upload_date": "I",
"view_count": "M",
"rating": "E",
}
results_type_map = {
"video": ["B", "videoRenderer"],
"channel": ["C", "channelRenderer"],
"playlist": ["D", "playlistRenderer"],
"movie": ["E", "videoRenderer"],
}
param_string = f"CA{sort_by_map[sort_by]}SAhA{results_type_map[results_type][0]}"
url = f"https://www.youtube.com/results?search_query={query}&sp={param_string}"
api_endpoint = "https://www.youtube.com/youtubei/v1/search"
videos = _get_videos(
url, api_endpoint, results_type_map[results_type][1], limit, sleep
)
for video in videos:
yield video
def _get_videos(
url: str, api_endpoint: str, selector: str, limit: int, sleep: int
) -> Generator[dict, None, None]:
session = _new_session()
is_first = True
quit = False
count = 0
while True:
if is_first:
html = _get_initial_data(session, url)
client = json.loads(
_get_json_from_html(html, "INNERTUBE_CONTEXT", 2, '"}},') + '"}}'
)["client"]
api_key = _get_json_from_html(html, "innertubeApiKey", 3)
session.headers["X-YouTube-Client-Name"] = "1"
session.headers["X-YouTube-Client-Version"] = client["clientVersion"]
data = json.loads(
_get_json_from_html(html, "var ytInitialData = ", 0, "};") + "}"
)
next_data = _get_next_data(data)
is_first = False
else:
data = _get_ajax_data(session, api_endpoint, api_key, next_data, client)
next_data = _get_next_data(data)
for result in _get_videos_items(data, selector):
try:
count += 1
yield result
if count == limit:
quit = True
break
except GeneratorExit:
quit = True
break
if not next_data or quit:
break
time.sleep(sleep)
session.close()
def _new_session() -> requests.Session:
session = requests.Session()
session.headers[
"User-Agent"
] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36"
session.headers["Accept-Language"] = "en"
return session
def _get_initial_data(session: requests.Session, url: str) -> str:
response = session.get(url)
response.raise_for_status()
if "uxe=" in response.request.url:
session.cookies.set("CONSENT", "YES+cb", domain=".youtube.com")
response = session.get(url)
html = response.text
return html
def _get_ajax_data(
session: requests.Session,
api_endpoint: str,
api_key: str,
next_data: dict,
client: dict,
) -> dict:
data = {
"context": {"clickTracking": next_data["click_params"], "client": client},
"continuation": next_data["token"],
}
response = session.post(api_endpoint, params={"key": api_key}, json=data)
return response.json()
def _get_json_from_html(
html: str, key: str, num_chars: int = 2, stop: str = '"'
) -> str:
pos_begin = html.find(key) + len(key) + num_chars
pos_end = html.find(stop, pos_begin)
return html[pos_begin:pos_end]
def _get_next_data(data: dict) -> Optional[dict]:
raw_next_data = next(_search_dict(data, "continuationEndpoint"), None)
if not raw_next_data:
return None
next_data = {
"token": raw_next_data["continuationCommand"]["token"],
"click_params": {"clickTrackingParams": raw_next_data["clickTrackingParams"]},
}
return next_data
def _search_dict(partial: dict, search_key: str) -> Generator[dict, None, None]:
stack = [partial]
while stack:
current_item = stack.pop(0)
if isinstance(current_item, dict):
for key, value in current_item.items():
if key == search_key:
yield value
else:
stack.append(value)
elif isinstance(current_item, list):
for value in current_item:
stack.append(value)
def _get_videos_items(data: dict, selector: str) -> Generator[dict, None, None]:
return _search_dict(data, selector)