Mirror of https://git.sr.ht/~cadence/NewLeaf

Remove with requests when it is unnecessary

Cadence Ember 2022-01-16 21:51:26 +13:00
parent 73b4fbabf7
commit 68cfbb809f
No known key found for this signature in database
GPG Key ID: BC1C2C61CF521B17
6 changed files with 244 additions and 244 deletions
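
For context on the pattern being removed: requests.Response works both as a plain object and as a context manager (the with form closes the response on exit). When stream=True is not passed, requests reads the whole body before get() returns and releases the connection back to the pool once the content is consumed, so the with wrapper mostly costs an extra indentation level. A minimal sketch of the two equivalent forms (the URL is a placeholder, not from this codebase):

    import requests

    # Before: the with-block closes the response when it exits, and
    # forces everything that uses it one indentation level deeper.
    with requests.get("https://example.com/feed.xml") as r:
        r.raise_for_status()
        body = r.content

    # After: without stream=True, requests has already read the full
    # body by the time get() returns, so nothing is left open that
    # the context manager would need to clean up.
    r = requests.get("https://example.com/feed.xml")
    r.raise_for_status()
    body = r.content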


@@ -16,12 +16,12 @@ def extract_captions_from_dict(captions, *, lang=None, label=None):
         return captions
     url = next(caption["second__remoteUrl"] for caption in captions["captions"] if caption["languageCode"] == lang or caption["label"] == label)
-    with requests.get(url) as r:
-        r.raise_for_status()
-        # remove extraneous " align:start position:0%" on timestamps lines on auto-generated captions
-        if (lang and "auto-generated" in lang) or (label and "auto-generated" in label):
-            return re.sub(r"^([0-9:.]+ --> [0-9:.]+).*$", r"\1", r.content.decode("utf8"), flags=re.MULTILINE)
-        return r
+    r = requests.get(url)
+    r.raise_for_status()
+    # remove extraneous " align:start position:0%" on timestamps lines on auto-generated captions
+    if (lang and "auto-generated" in lang) or (label and "auto-generated" in label):
+        return re.sub(r"^([0-9:.]+ --> [0-9:.]+).*$", r"\1", r.content.decode("utf8"), flags=re.MULTILINE)
+    return r
 
 def extract_captions_from_video(id):
     return {


@@ -18,142 +18,142 @@ def extract_channel(ucid):
         return channel_cache[ucid]
     channel_type = "channel" if len(ucid) == 24 and ucid[:2] == "UC" else "user"
-    with requests.get("https://www.youtube.com/{}/{}/videos?hl=en".format(channel_type, ucid), cookies=eu_consent_cookie()) as r:
-        r.raise_for_status()
-        yt_initial_data = extract_yt_initial_data(r.content.decode("utf8"))
-        for alert in yt_initial_data.get("alerts", []):
-            alert_text = combine_runs(alert["alertRenderer"]["text"])
-            if alert_text == "This channel does not exist.":
-                return {
-                    "error": alert_text,
-                    "identifier": "NOT_FOUND"
-                }
-            elif alert_text.startswith("This account has been terminated"):
-                return {
-                    "error": alert_text,
-                    "identifier": "ACCOUNT_TERMINATED"
-                }
-            else:
-                print("Seen alert text '{}'".format(alert_text))
-        header = yt_initial_data["header"]["c4TabbedHeaderRenderer"] if "c4TabbedHeaderRenderer" in yt_initial_data["header"] else {}
-        channel_metadata = yt_initial_data["metadata"]["channelMetadataRenderer"]
-        if header:
-            author = header["title"]
-            author_id = header["channelId"]
-            author_url = header["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"]
-        else:
-            author = channel_metadata["title"]
-            author_id = channel_metadata["externalId"]
-            author_url = channel_metadata["channelUrl"]
-        subscriber_count = combine_runs(header["subscriberCountText"]) if "subscriberCountText" in header else "Unknown subscribers"
-        description = channel_metadata["description"]
-        allowed_regions = channel_metadata["availableCountryCodes"]
-        author_banners = []
-        if "banner" in header:
-            author_banners = header["banner"]["thumbnails"]
-            for t in author_banners:
-                t["url"] = normalise_url_protocol(t["url"])
-        author_thumbnails = []
-        avatar = header.get("avatar") or channel_metadata.get("avatar")
-        if avatar:
-            author_thumbnails = generate_full_author_thumbnails(avatar["thumbnails"])
-        latest_videos = []
-        tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
-        try:
-            videos_tab = next(tab["tabRenderer"] for tab in tabs if tab["tabRenderer"]["title"] == "Videos")
-            tab_parts = videos_tab["content"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"][0]
-        except StopIteration:
-            tab_parts = {}
-        # check that the channel actually has videos - this may be replaced
-        # with messageRenderer.text.simpleText == "This channel has no videos."
-        if "gridRenderer" in tab_parts:
-            videos = (
-                v["gridVideoRenderer"] for v in tab_parts["gridRenderer"]["items"] if "gridVideoRenderer" in v
-            )
-            for v in videos:
-                live = False
-                is_upcoming = False
-                length_text = "UNKNOWN"
-                length_seconds = -1
-                for o in v["thumbnailOverlays"]:
-                    if "thumbnailOverlayTimeStatusRenderer" in o:
-                        length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"])
-                        length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"]
-                        if length_text_style == "DEFAULT":
-                            length_seconds = length_text_to_seconds(length_text)
-                        elif length_text_style == "LIVE":
-                            live = True
-                        elif length_text_style == "UPCOMING":
-                            is_upcoming = True
-                published = 0
-                published_text = "Live now"
-                premiere_timestamp = None
-                if "publishedTimeText" in v:
-                    published_text = v["publishedTimeText"]["simpleText"]
-                    published = past_text_to_time(published_text)
-                if "upcomingEventData" in v:
-                    premiere_timestamp = v["upcomingEventData"]["startTime"]
-                    published_text = time_to_past_text(int(premiere_timestamp))
-                view_count_text = combine_runs(v["viewCountText"]) if "viewCountText" in v else None
-                view_count_text_short = combine_runs(v["shortViewCountText"]) if "shortViewCountText" in v else None
-                latest_videos.append({
-                    "type": "video",
-                    "title": combine_runs(v["title"]),
-                    "videoId": v["videoId"],
-                    "author": author,
-                    "authorId": author_id,
-                    "authorUrl": author_url,
-                    "videoThumbnails": generate_video_thumbnails(v["videoId"]),
-                    "description": "",
-                    "descriptionHtml": "",
-                    "viewCount": view_count_text_to_number(view_count_text),
-                    "second__viewCountText": view_count_text,
-                    "second__viewCountTextShort": view_count_text_short,
-                    "published": published,
-                    "publishedText": published_text,
-                    "lengthSeconds": length_seconds,
-                    "second__lengthText": length_text,
-                    "liveNow": live,
-                    "paid": None,
-                    "premium": None,
-                    "isUpcoming": is_upcoming,
-                    "premiereTimestamp": premiere_timestamp
-                })
-        channel = {
-            "author": author,
-            "authorId": author_id,
-            "authorUrl": author_url,
-            "authorBanners": author_banners,
-            "authorThumbnails": author_thumbnails,
-            "subCount": uncompress_counter(subscriber_count.split(" ")[0]),
-            "second__subCountText": subscriber_count,
-            "totalViews": None,
-            "joined": None,
-            "paid": None,
-            "autoGenerated": None,
-            "isFamilyFriendly": None,
-            "description": description,
-            "descriptionHtml": add_html_links(escape_html_textcontent(description)),
-            "allowedRegions": allowed_regions,
-            "latestVideos": latest_videos,
-            "relatedChannels": []
-        }
-        with channel_cache_lock:
-            channel_cache[ucid] = channel
-        return channel
+    r = requests.get("https://www.youtube.com/{}/{}/videos?hl=en".format(channel_type, ucid), cookies=eu_consent_cookie())
+    r.raise_for_status()
+    yt_initial_data = extract_yt_initial_data(r.content.decode("utf8"))
+    for alert in yt_initial_data.get("alerts", []):
+        alert_text = combine_runs(alert["alertRenderer"]["text"])
+        if alert_text == "This channel does not exist.":
+            return {
+                "error": alert_text,
+                "identifier": "NOT_FOUND"
+            }
+        elif alert_text.startswith("This account has been terminated"):
+            return {
+                "error": alert_text,
+                "identifier": "ACCOUNT_TERMINATED"
+            }
+        else:
+            print("Seen alert text '{}'".format(alert_text))
+    header = yt_initial_data["header"]["c4TabbedHeaderRenderer"] if "c4TabbedHeaderRenderer" in yt_initial_data["header"] else {}
+    channel_metadata = yt_initial_data["metadata"]["channelMetadataRenderer"]
+    if header:
+        author = header["title"]
+        author_id = header["channelId"]
+        author_url = header["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"]
+    else:
+        author = channel_metadata["title"]
+        author_id = channel_metadata["externalId"]
+        author_url = channel_metadata["channelUrl"]
+    subscriber_count = combine_runs(header["subscriberCountText"]) if "subscriberCountText" in header else "Unknown subscribers"
+    description = channel_metadata["description"]
+    allowed_regions = channel_metadata["availableCountryCodes"]
+    author_banners = []
+    if "banner" in header:
+        author_banners = header["banner"]["thumbnails"]
+        for t in author_banners:
+            t["url"] = normalise_url_protocol(t["url"])
+    author_thumbnails = []
+    avatar = header.get("avatar") or channel_metadata.get("avatar")
+    if avatar:
+        author_thumbnails = generate_full_author_thumbnails(avatar["thumbnails"])
+    latest_videos = []
+    tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
+    try:
+        videos_tab = next(tab["tabRenderer"] for tab in tabs if tab["tabRenderer"]["title"] == "Videos")
+        tab_parts = videos_tab["content"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"][0]
+    except StopIteration:
+        tab_parts = {}
+    # check that the channel actually has videos - this may be replaced
+    # with messageRenderer.text.simpleText == "This channel has no videos."
+    if "gridRenderer" in tab_parts:
+        videos = (
+            v["gridVideoRenderer"] for v in tab_parts["gridRenderer"]["items"] if "gridVideoRenderer" in v
+        )
+        for v in videos:
+            live = False
+            is_upcoming = False
+            length_text = "UNKNOWN"
+            length_seconds = -1
+            for o in v["thumbnailOverlays"]:
+                if "thumbnailOverlayTimeStatusRenderer" in o:
+                    length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"])
+                    length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"]
+                    if length_text_style == "DEFAULT":
+                        length_seconds = length_text_to_seconds(length_text)
+                    elif length_text_style == "LIVE":
+                        live = True
+                    elif length_text_style == "UPCOMING":
+                        is_upcoming = True
+            published = 0
+            published_text = "Live now"
+            premiere_timestamp = None
+            if "publishedTimeText" in v:
+                published_text = v["publishedTimeText"]["simpleText"]
+                published = past_text_to_time(published_text)
+            if "upcomingEventData" in v:
+                premiere_timestamp = v["upcomingEventData"]["startTime"]
+                published_text = time_to_past_text(int(premiere_timestamp))
+            view_count_text = combine_runs(v["viewCountText"]) if "viewCountText" in v else None
+            view_count_text_short = combine_runs(v["shortViewCountText"]) if "shortViewCountText" in v else None
+            latest_videos.append({
+                "type": "video",
+                "title": combine_runs(v["title"]),
+                "videoId": v["videoId"],
+                "author": author,
+                "authorId": author_id,
+                "authorUrl": author_url,
+                "videoThumbnails": generate_video_thumbnails(v["videoId"]),
+                "description": "",
+                "descriptionHtml": "",
+                "viewCount": view_count_text_to_number(view_count_text),
+                "second__viewCountText": view_count_text,
+                "second__viewCountTextShort": view_count_text_short,
+                "published": published,
+                "publishedText": published_text,
+                "lengthSeconds": length_seconds,
+                "second__lengthText": length_text,
+                "liveNow": live,
+                "paid": None,
+                "premium": None,
+                "isUpcoming": is_upcoming,
+                "premiereTimestamp": premiere_timestamp
+            })
+    channel = {
+        "author": author,
+        "authorId": author_id,
+        "authorUrl": author_url,
+        "authorBanners": author_banners,
+        "authorThumbnails": author_thumbnails,
+        "subCount": uncompress_counter(subscriber_count.split(" ")[0]),
+        "second__subCountText": subscriber_count,
+        "totalViews": None,
+        "joined": None,
+        "paid": None,
+        "autoGenerated": None,
+        "isFamilyFriendly": None,
+        "description": description,
+        "descriptionHtml": add_html_links(escape_html_textcontent(description)),
+        "allowedRegions": allowed_regions,
+        "latestVideos": latest_videos,
+        "relatedChannels": []
+    }
+    with channel_cache_lock:
+        channel_cache[ucid] = channel
+    return channel
 
 def extract_channel_videos(ucid):
     channel = extract_channel(ucid)
@@ -167,59 +167,59 @@ def extract_channel_latest(ucid):
     if ucid in channel_latest_cache:
         return channel_latest_cache[ucid]
-    with requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid)) as r:
-        if r.status_code == 404:
-            cherrypy.response.status = 404
-            return {
-                "error": "This channel does not exist.",
-                "identifier": "NOT_FOUND"
-            }
-        feed = ET.fromstring(r.content)
-        author_container = feed.find("{http://www.w3.org/2005/Atom}author")
-        author = author_container.find("{http://www.w3.org/2005/Atom}name").text
-        author_url = author_container.find("{http://www.w3.org/2005/Atom}uri").text
-        channel_id = feed.find("{http://www.youtube.com/xml/schemas/2015}channelId").text
-        results = []
-        missing_published = False
-        for entry in feed.findall("{http://www.w3.org/2005/Atom}entry"):
-            id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text
-            media_group = entry.find("{http://search.yahoo.com/mrss/}group")
-            description = media_group.find("{http://search.yahoo.com/mrss/}description").text or ""
-            media_community = media_group.find("{http://search.yahoo.com/mrss/}community")
-            published_entry = entry.find("{http://www.w3.org/2005/Atom}published")
-            if published_entry is not None: # sometimes youtube does not provide published dates, no idea why.
-                published = int(dateutil.parser.isoparse(published_entry.text).timestamp())
-                results.append({
-                    "type": "video",
-                    "title": entry.find("{http://www.w3.org/2005/Atom}title").text,
-                    "videoId": id,
-                    "author": author,
-                    "authorId": channel_id,
-                    "authorUrl": author_url,
-                    "videoThumbnails": generate_video_thumbnails(id),
-                    "description": description,
-                    "descriptionHtml": add_html_links(escape_html_textcontent(description)),
-                    "viewCount": int(media_community.find("{http://search.yahoo.com/mrss/}statistics").attrib["views"]),
-                    "published": published,
-                    "publishedText": time_to_past_text(published),
-                    "lengthSeconds": None,
-                    "liveNow": None,
-                    "paid": None,
-                    "premium": None,
-                    "isUpcoming": None
-                })
-            else:
-                missing_published = True
-        if len(results) == 0 and missing_published: # no results due to all missing published
-            cherrypy.response.status = 503
-            return {
-                "error": "YouTube did not provide published dates for any feed items. This is usually temporary - refresh in a few minutes.",
-                "identifier": "PUBLISHED_DATES_NOT_PROVIDED"
-            }
-        with channel_latest_cache_lock:
-            channel_latest_cache[ucid] = results
-        return results
+    r = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid))
+    if r.status_code == 404:
+        cherrypy.response.status = 404
+        return {
+            "error": "This channel does not exist.",
+            "identifier": "NOT_FOUND"
+        }
+    feed = ET.fromstring(r.content)
+    author_container = feed.find("{http://www.w3.org/2005/Atom}author")
+    author = author_container.find("{http://www.w3.org/2005/Atom}name").text
+    author_url = author_container.find("{http://www.w3.org/2005/Atom}uri").text
+    channel_id = feed.find("{http://www.youtube.com/xml/schemas/2015}channelId").text
+    results = []
+    missing_published = False
+    for entry in feed.findall("{http://www.w3.org/2005/Atom}entry"):
+        id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text
+        media_group = entry.find("{http://search.yahoo.com/mrss/}group")
+        description = media_group.find("{http://search.yahoo.com/mrss/}description").text or ""
+        media_community = media_group.find("{http://search.yahoo.com/mrss/}community")
+        published_entry = entry.find("{http://www.w3.org/2005/Atom}published")
+        if published_entry is not None: # sometimes youtube does not provide published dates, no idea why.
+            published = int(dateutil.parser.isoparse(published_entry.text).timestamp())
+            results.append({
+                "type": "video",
+                "title": entry.find("{http://www.w3.org/2005/Atom}title").text,
+                "videoId": id,
+                "author": author,
+                "authorId": channel_id,
+                "authorUrl": author_url,
+                "videoThumbnails": generate_video_thumbnails(id),
+                "description": description,
+                "descriptionHtml": add_html_links(escape_html_textcontent(description)),
+                "viewCount": int(media_community.find("{http://search.yahoo.com/mrss/}statistics").attrib["views"]),
+                "published": published,
+                "publishedText": time_to_past_text(published),
+                "lengthSeconds": None,
+                "liveNow": None,
+                "paid": None,
+                "premium": None,
+                "isUpcoming": None
+            })
+        else:
+            missing_published = True
+    if len(results) == 0 and missing_published: # no results due to all missing published
+        cherrypy.response.status = 503
+        return {
+            "error": "YouTube did not provide published dates for any feed items. This is usually temporary - refresh in a few minutes.",
+            "identifier": "PUBLISHED_DATES_NOT_PROVIDED"
+        }
+    with channel_latest_cache_lock:
+        channel_latest_cache[ucid] = results
+    return results


@@ -11,9 +11,9 @@ def extract_manifest(id):
         return video
     if video["second__providedDashUrl"]:
-        with requests.get(video["second__providedDashUrl"]) as r:
-            r.raise_for_status()
-            return r
+        r = requests.get(video["second__providedDashUrl"])
+        r.raise_for_status()
+        return r
     adaptation_sets_dict = {}
     for f in video["adaptiveFormats"]:


@@ -17,51 +17,51 @@ ytdl = yt_dlp.YoutubeDL(ytdl_opts)
 def extract_search(q):
     try:
-        with requests.get("https://www.youtube.com/results", params={"q": q, "hl": "en"}, cookies=eu_consent_cookie()) as r:
-            r.raise_for_status()
-            content = r.content.decode("utf8")
-            yt_initial_data = extract_yt_initial_data(content)
-            sections = yt_initial_data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"]["contents"]
-            # youtube searches contain a lot of random stuff, just grab it all for now, then filter to `videoRenderer` later
-            itemSections = [s for s in sections if "itemSectionRenderer" in s]
-            items = []
-            for section in itemSections:
-                items += section["itemSectionRenderer"]["contents"]
-            results = []
-            for item in items:
-                if "videoRenderer" in item:
-                    video = item["videoRenderer"]
-                    published = 0
-                    published_text = "Live now"
-                    if "publishedTimeText" in video:
-                        published_text = video["publishedTimeText"]["simpleText"]
-                        published = past_text_to_time(published_text)
-                    results.append({
-                        "type": "video",
-                        "title": combine_runs(video["title"]),
-                        "videoId": video["videoId"],
-                        "author": combine_runs(video["longBylineText"]),
-                        "authorId": video["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"],
-                        "authorUrl": video["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"],
-                        "videoThumbnails": generate_video_thumbnails(video["videoId"]),
-                        "description": combine_runs(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
-                        "descriptionHtml": combine_runs_html(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
-                        "viewCount": get_view_count_or_recommended(video),
-                        "second__viewCountText": get_view_count_text_or_recommended(video),
-                        "published": published,
-                        "publishedText": published_text,
-                        "lengthSeconds": get_length_or_live_now(video),
-                        "second__lengthText": get_length_text_or_live_now(video),
-                        "liveNow": is_live(video),
-                        "paid": None,
-                        "premium": None,
-                        "isUpcoming": None
-                    })
-            search_cache[q] = results # only cache full extraction
-            return results
+        r = requests.get("https://www.youtube.com/results", params={"q": q, "hl": "en"}, cookies=eu_consent_cookie())
+        r.raise_for_status()
+        content = r.content.decode("utf8")
+        yt_initial_data = extract_yt_initial_data(content)
+        sections = yt_initial_data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"]["contents"]
+        # youtube searches contain a lot of random stuff, just grab it all for now, then filter to `videoRenderer` later
+        itemSections = [s for s in sections if "itemSectionRenderer" in s]
+        items = []
+        for section in itemSections:
+            items += section["itemSectionRenderer"]["contents"]
+        results = []
+        for item in items:
+            if "videoRenderer" in item:
+                video = item["videoRenderer"]
+                published = 0
+                published_text = "Live now"
+                if "publishedTimeText" in video:
+                    published_text = video["publishedTimeText"]["simpleText"]
+                    published = past_text_to_time(published_text)
+                results.append({
+                    "type": "video",
+                    "title": combine_runs(video["title"]),
+                    "videoId": video["videoId"],
+                    "author": combine_runs(video["longBylineText"]),
+                    "authorId": video["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"],
+                    "authorUrl": video["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"],
+                    "videoThumbnails": generate_video_thumbnails(video["videoId"]),
+                    "description": combine_runs(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
+                    "descriptionHtml": combine_runs_html(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
+                    "viewCount": get_view_count_or_recommended(video),
+                    "second__viewCountText": get_view_count_text_or_recommended(video),
+                    "published": published,
+                    "publishedText": published_text,
+                    "lengthSeconds": get_length_or_live_now(video),
+                    "second__lengthText": get_length_text_or_live_now(video),
+                    "liveNow": is_live(video),
+                    "paid": None,
+                    "premium": None,
+                    "isUpcoming": None
+                })
+        search_cache[q] = results # only cache full extraction
+        return results
     except Exception:
         print("messed up extracting search, using youtube-dl instead")


@@ -20,12 +20,12 @@ def extract_search_suggestions(q):
         "xhr": "t",
         # "xssi": "t"
     }
-    with requests.get("https://clients1.google.com/complete/search", params=params) as r:
-        r.raise_for_status()
-        response = r.json()
-        result = {
-            "query": q,
-            "suggestions": [s[0] for s in response[1]]
-        }
-        suggestions_cache[q] = result
-        return result
+    r = requests.get("https://clients1.google.com/complete/search", params=params)
+    r.raise_for_status()
+    response = r.json()
+    result = {
+        "query": q,
+        "suggestions": [s[0] for s in response[1]]
+    }
+    suggestions_cache[q] = result
+    return result


@@ -123,17 +123,17 @@ class NewLeaf(object):
     @cherrypy.expose
     def vi(self, id, file):
-        with requests.get("https://i.ytimg.com/vi/{}/{}".format(id, file), stream=True) as r:
-            r.raise_for_status()
-            cherrypy.response.headers["content-type"] = r.headers["content-type"]
-            return next(r.iter_content(chunk_size=None))
+        r = requests.get("https://i.ytimg.com/vi/{}/{}".format(id, file), stream=True)
+        r.raise_for_status()
+        cherrypy.response.headers["content-type"] = r.headers["content-type"]
+        return next(r.iter_content(chunk_size=None))
 
     @cherrypy.expose
     def ggpht(self, *path):
-        with requests.get("https://yt3.ggpht.com/{}".format("/".join(path)), stream=True) as r:
-            r.raise_for_status()
-            cherrypy.response.headers["content-type"] = r.headers["content-type"]
-            return next(r.iter_content(chunk_size=None))
+        r = requests.get("https://yt3.ggpht.com/{}".format("/".join(path)), stream=True)
+        r.raise_for_status()
+        cherrypy.response.headers["content-type"] = r.headers["content-type"]
+        return next(r.iter_content(chunk_size=None))
 
 bind_port = getattr(configuration, "bind_port", 3000)
 bind_host = getattr(configuration, "bind_host", "0.0.0.0")
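
One nuance in the last file: the vi and ggpht proxy endpoints pass stream=True, where requests reads the body lazily and holds the connection until the response is consumed, closed, or garbage-collected. The sketch below is a general requests pattern, not code from this commit, showing how a streaming fetch can still be closed deterministically without a with-block (the thumbnail URL is illustrative):

    import requests

    # stream=True defers the body download; the connection stays
    # checked out until the response is consumed, closed, or
    # garbage-collected.
    r = requests.get("https://i.ytimg.com/vi/jNQXAC9IVRw/default.jpg", stream=True)
    try:
        r.raise_for_status()
        first_chunk = next(r.iter_content(chunk_size=None))  # one chunk, as received
    finally:
        r.close()  # deterministic cleanup, equivalent to leaving a with-block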