244 lines
8 KiB
Python
244 lines
8 KiB
Python
"""
|
|
Based on the scrapetube package from dermasmid (MIT License)
|
|
https://github.com/dermasmid/scrapetube
|
|
"""
|
|
import json
import time
from collections import deque
from typing import Generator, Literal, Optional
from urllib.parse import quote_plus

import requests
|
|
|
|
|
|
def get_channel(
    channel_url: str,
    limit: Optional[int] = None,
    sleep: int = 1,
    sort_by: Literal["newest", "oldest", "popular"] = "newest",
) -> Generator[dict, None, None]:
    """
    Get videos for a channel.

    :param channel_url: The url of the channel you want to get the videos for.
        Trailing slashes are ignored.
    :param limit: Limit the number of videos you want to get.
        ``None`` (the default) means no limit.
    :param sleep: Seconds to sleep between API calls to youtube, in order to prevent
        getting blocked. Defaults to ``1``.
    :param sort_by: In what order to retrieve the videos. Pass one of the following values.
        ``"newest"``: Get the new videos first.
        ``"oldest"``: Get the old videos first.
        ``"popular"``: Get the popular videos first.
        Defaults to ``"newest"``.
    :return: Generator providing the videos
    :raises KeyError: If ``sort_by`` is not one of the supported values
        (raised when the generator is first advanced).
    """

    sort_by_map = {"newest": "dd", "oldest": "da", "popular": "p"}
    sort_code = sort_by_map[sort_by]
    # Strip trailing slashes so the joined path never contains "//videos".
    base_url = channel_url.rstrip("/")
    url = f"{base_url}/videos?view=0&sort={sort_code}&flow=grid"
    api_endpoint = "https://www.youtube.com/youtubei/v1/browse"
    yield from _get_videos(url, api_endpoint, "gridVideoRenderer", limit, sleep)
|
|
|
|
|
|
def get_channel_metadata(channel_url: str) -> dict:
    """
    Get metadata of a channel.

    :param channel_url: Channel URL. Trailing slashes are ignored.
    :return: Raw channel metadata: the decoded ``ytInitialData`` object found
        in the HTML of the channel's videos page.
    """
    # Strip trailing slashes so the joined path never contains "//videos".
    url = f"{channel_url.rstrip('/')}/videos?view=0&flow=grid"

    session = _new_session()
    try:
        html = _get_initial_data(session, url)
    finally:
        # The session is only needed for this single page fetch; always
        # release its connections, even when the request fails.
        session.close()

    # The extracted slice ends just before the "};" stop marker, so the
    # closing brace has to be appended again to obtain valid JSON.
    return json.loads(_get_json_from_html(html, "var ytInitialData = ", 0, "};") + "}")
|
|
|
|
|
|
def get_playlist(
    playlist_id: str, limit: int = None, sleep: int = 1
) -> Generator[dict, None, None]:
    """
    Get videos for a playlist.

    :param playlist_id: The id of the playlist whose videos should be fetched.
    :param limit: Maximum number of videos to yield.
    :param sleep: Seconds to wait between API calls to youtube, in order to
        prevent getting blocked. Defaults to ``1``.
    :return: Generator providing the videos
    """

    browse_endpoint = "https://www.youtube.com/youtubei/v1/browse"
    playlist_url = "https://www.youtube.com/playlist?list={}".format(playlist_id)
    yield from _get_videos(
        playlist_url, browse_endpoint, "playlistVideoRenderer", limit, sleep
    )
|
|
|
|
|
|
def get_search(
    query: str,
    limit: Optional[int] = None,
    sleep: int = 1,
    sort_by: Literal["relevance", "upload_date", "view_count", "rating"] = "relevance",
    results_type: Literal["video", "channel", "playlist", "movie"] = "video",
) -> Generator[dict, None, None]:
    """
    Search youtube and get videos.

    :param query: The term you want to search for. Pass the plain text; it is
        URL-encoded here, so characters such as ``&``, ``#`` or ``+`` are safe.
    :param limit: Limit the number of videos you want to get.
        ``None`` (the default) means no limit.
    :param sleep: Seconds to sleep between API calls to youtube, in order to prevent
        getting blocked. Defaults to ``1``.
    :param sort_by: In what order to retrieve the videos. Pass one of the following values.
        ``"relevance"``: Get the new videos in order of relevance.
        ``"upload_date"``: Get the new videos first.
        ``"view_count"``: Get the popular videos first.
        ``"rating"``: Get videos with more likes first.
        Defaults to ``"relevance"``.
    :param results_type: What type you want to search for.
        Pass one of the following values: ``"video"|"channel"|
        "playlist"|"movie"``. Defaults to ``"video"``.
    :return: Generator providing the videos
    :raises KeyError: If ``sort_by`` or ``results_type`` is not one of the
        supported values (raised when the generator is first advanced).
    """

    sort_by_map = {
        "relevance": "A",
        "upload_date": "I",
        "view_count": "M",
        "rating": "E",
    }

    # Each entry: (code used in the "sp" URL parameter, key that holds a
    # result of this type in the response data).
    results_type_map = {
        "video": ("B", "videoRenderer"),
        "channel": ("C", "channelRenderer"),
        "playlist": ("D", "playlistRenderer"),
        "movie": ("E", "videoRenderer"),
    }

    sort_code = sort_by_map[sort_by]
    type_code, selector = results_type_map[results_type]
    param_string = f"CA{sort_code}SAhA{type_code}"

    # Encode the query so reserved characters cannot terminate or split the
    # "search_query" parameter.
    encoded_query = quote_plus(query)
    url = f"https://www.youtube.com/results?search_query={encoded_query}&sp={param_string}"
    api_endpoint = "https://www.youtube.com/youtubei/v1/search"
    yield from _get_videos(url, api_endpoint, selector, limit, sleep)
|
|
|
|
|
|
def _get_videos(
    url: str, api_endpoint: str, selector: str, limit: Optional[int], sleep: int
) -> Generator[dict, None, None]:
    """
    Yield results from a youtube page and from its continuation requests.

    The first batch of results is read from the data embedded in the HTML of
    ``url``; further batches are requested from ``api_endpoint`` for as long
    as the previous batch contains continuation data.

    :param url: Page to load first.
    :param api_endpoint: Endpoint used for the follow-up requests.
    :param selector: Dictionary key under which each result is stored.
    :param limit: Stop after this many results. With ``None`` the counter
        never matches, so iteration continues until no continuation is left.
    :param sleep: Seconds to wait before each follow-up request.
    :return: Generator providing the results
    """
    session = _new_session()
    try:
        html = _get_initial_data(session, url)

        # Each slice ends just before its stop marker, so the characters
        # stripped with the marker are appended again to form valid JSON.
        client = json.loads(
            _get_json_from_html(html, "INNERTUBE_CONTEXT", 2, '"}},') + '"}}'
        )["client"]
        api_key = _get_json_from_html(html, "innertubeApiKey", 3)
        session.headers["X-YouTube-Client-Name"] = "1"
        session.headers["X-YouTube-Client-Version"] = client["clientVersion"]
        data = json.loads(
            _get_json_from_html(html, "var ytInitialData = ", 0, "};") + "}"
        )

        count = 0
        while True:
            next_data = _get_next_data(data)

            for result in _get_videos_items(data, selector):
                yield result
                count += 1
                if count == limit:
                    return

            if not next_data:
                return

            time.sleep(sleep)
            data = _get_ajax_data(session, api_endpoint, api_key, next_data, client)
    finally:
        # Runs on normal exhaustion, when the limit is reached, when the
        # consumer closes the generator early and when a request or parsing
        # step raises, so the session never leaks.
        session.close()
|
|
|
|
|
|
def _new_session() -> requests.Session:
    """
    Create a requests session preconfigured with a desktop Chrome
    ``User-Agent`` and ``Accept-Language: en``.

    :return: The new session; the caller is responsible for closing it.
    """
    default_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36",
        "Accept-Language": "en",
    }
    new_session = requests.Session()
    new_session.headers.update(default_headers)
    return new_session
|
|
|
|
|
|
def _get_initial_data(session: requests.Session, url: str) -> str:
    """
    Download ``url`` and return the response body as text.

    :param session: Session used for the request(s); its cookie jar may be
        modified.
    :param url: Page to download.
    :return: The HTML of the page.
    :raises requests.HTTPError: If a request finishes with an error status.
    """
    response = session.get(url)
    response.raise_for_status()

    # NOTE(review): a final URL containing "uxe=" appears to mean the request
    # was redirected to youtube's cookie-consent page -- confirm. In that case
    # a consent cookie is set and the page is requested once more.
    if "uxe=" in response.request.url:
        session.cookies.set("CONSENT", "YES+cb", domain=".youtube.com")
        response = session.get(url)
        # Check the retry as well, so an error page is never handed to the
        # HTML parsing code as if it were the requested page.
        response.raise_for_status()

    return response.text
|
|
|
|
|
|
def _get_ajax_data(
    session: requests.Session,
    api_endpoint: str,
    api_key: str,
    next_data: dict,
    client: dict,
) -> dict:
    """
    Request the next batch of results from ``api_endpoint``.

    :param session: Session used to send the POST request.
    :param api_endpoint: URL the request is sent to.
    :param api_key: Value sent as the ``key`` query parameter.
    :param next_data: Continuation data providing ``"click_params"`` and ``"token"``.
    :param client: Client description placed in the request context.
    :return: The decoded JSON body of the response.
    """
    request_context = {"clickTracking": next_data["click_params"], "client": client}
    payload = {"context": request_context, "continuation": next_data["token"]}
    return session.post(api_endpoint, params={"key": api_key}, json=payload).json()
|
|
|
|
|
|
def _get_json_from_html(
|
|
html: str, key: str, num_chars: int = 2, stop: str = '"'
|
|
) -> str:
|
|
pos_begin = html.find(key) + len(key) + num_chars
|
|
pos_end = html.find(stop, pos_begin)
|
|
return html[pos_begin:pos_end]
|
|
|
|
|
|
def _get_next_data(data: dict) -> Optional[dict]:
    """
    Extract what is needed to request the next batch of results.

    :param data: Response data to search for a ``continuationEndpoint`` entry.
    :return: A dict with the continuation ``"token"`` and the
        ``"click_params"``, or ``None`` when ``data`` has no (non-empty)
        ``continuationEndpoint``.
    """
    endpoint = next(_search_dict(data, "continuationEndpoint"), None)
    if endpoint:
        return {
            "token": endpoint["continuationCommand"]["token"],
            "click_params": {"clickTrackingParams": endpoint["clickTrackingParams"]},
        }
    return None
|
|
|
|
|
|
def _search_dict(partial: dict, search_key: str) -> Generator[dict, None, None]:
|
|
stack = [partial]
|
|
while stack:
|
|
current_item = stack.pop(0)
|
|
if isinstance(current_item, dict):
|
|
for key, value in current_item.items():
|
|
if key == search_key:
|
|
yield value
|
|
else:
|
|
stack.append(value)
|
|
elif isinstance(current_item, list):
|
|
for value in current_item:
|
|
stack.append(value)
|
|
|
|
|
|
def _get_videos_items(data: dict, selector: str) -> Generator[dict, None, None]:
    """
    Find all results of one kind in a block of response data.

    :param data: Response data to search.
    :param selector: Dictionary key under which each result is stored.
    :return: Generator providing every value stored under ``selector``
    """
    matches = _search_dict(data, selector)
    return matches
|