1
0
mirror of https://git.sr.ht/~cadence/NewLeaf synced 2024-11-14 03:57:31 +00:00
NewLeaf/extractors/channel.py
Cadence Ember 57b0a88a2e
Detect channels that do not exist
If error alerts exist, they will be logged. But it is reasonable to
assume that not all errors will be fatal, so we don't necessarily quit
parsing if we find one.

This also normalises the text error of the /latest response for a
missing channel, without changing its identifier.
2021-05-02 01:20:53 +12:00

211 lines
7.6 KiB
Python

import cherrypy
import dateutil.parser
import requests
import xml.etree.ElementTree as ET
from tools.converters import *
from tools.extractors import extract_yt_initial_data
from threading import Lock
from cachetools import TTLCache
# Cache of full channel dicts built by extract_channel, keyed by the ucid
# that was requested. Holds at most 50 entries, each expiring after ttl=300
# (seconds, with cachetools' default timer).
channel_cache = TTLCache(maxsize=50, ttl=300)
# Held around every read and write of channel_cache.
channel_cache_lock = Lock()
# Cache of the video lists built by extract_channel_latest, keyed by ucid.
# Holds at most 500 entries with the same ttl=300 expiry.
channel_latest_cache = TTLCache(maxsize=500, ttl=300)
# Held around every read and write of channel_latest_cache.
channel_latest_cache_lock = Lock()
def _parse_grid_video(v, author, author_id, author_url):
    """Convert one gridVideoRenderer dict into an entry for "latestVideos".

    The author fields are copied from the channel the video was listed on.
    """
    # Start from the "live stream" defaults; a time-status overlay whose
    # style is not "LIVE" replaces them with the real duration.
    live = True
    length_text = "LIVE"
    length_seconds = -1
    for overlay in v["thumbnailOverlays"]:
        if "thumbnailOverlayTimeStatusRenderer" in overlay:
            time_status = overlay["thumbnailOverlayTimeStatusRenderer"]
            length_text = combine_runs(time_status["text"])
            if time_status["style"] != "LIVE":
                length_seconds = length_text_to_seconds(length_text)
                live = False

    # Entries without a published time keep the "Live now" placeholder.
    published = 0
    published_text = "Live now"
    if "publishedTimeText" in v:
        published_text = v["publishedTimeText"]["simpleText"]
        published = past_text_to_time(published_text)

    view_count_text = combine_runs(v["viewCountText"]) if "viewCountText" in v else None
    view_count_text_short = combine_runs(v["shortViewCountText"]) if "shortViewCountText" in v else None

    return {
        "type": "video",
        "title": combine_runs(v["title"]),
        "videoId": v["videoId"],
        "author": author,
        "authorId": author_id,
        "authorUrl": author_url,
        "videoThumbnails": generate_video_thumbnails(v["videoId"]),
        "description": "",
        "descriptionHtml": "",
        "viewCount": view_count_text_to_number(view_count_text),
        "second__viewCountText": view_count_text,
        "second__viewCountTextShort": view_count_text_short,
        "published": published,
        "publishedText": published_text,
        "lengthSeconds": length_seconds,
        "second__lengthText": length_text,
        "liveNow": live,
        "paid": None,
        "premium": None,
        "isUpcoming": None
    }


def extract_channel(ucid):
    """Fetch a channel's /videos page from YouTube and build a channel dict.

    A 24-character ucid starting with "UC" is requested under /channel/,
    anything else under /user/. Successful results are stored in
    channel_cache under the given ucid and served from there while cached.

    Returns the channel dict (author details, banners, thumbnails,
    subscriber count, description, allowed regions and "latestVideos"), or
    {"error": ..., "identifier": "NOT_FOUND"} when the page carries the
    "This channel does not exist." alert. Error results are not cached.

    Raises whatever r.raise_for_status() raises for an HTTP error status.
    """
    with channel_cache_lock:
        if ucid in channel_cache:
            return channel_cache[ucid]

    channel_type = "channel" if len(ucid) == 24 and ucid[:2] == "UC" else "user"
    with requests.get("https://www.youtube.com/{}/{}/videos?hl=en".format(channel_type, ucid), cookies={"CONSENT": "PENDING+999"}) as r:
        r.raise_for_status()
        yt_initial_data = extract_yt_initial_data(r.content.decode("utf8"))

    # Only the "does not exist" alert is treated as fatal; any other alert
    # is printed and parsing carries on.
    for alert in yt_initial_data.get("alerts", []):
        alert_text = combine_runs(alert["alertRenderer"]["text"])
        if alert_text == "This channel does not exist.":
            return {
                "error": alert_text,
                "identifier": "NOT_FOUND"
            }
        print("Seen alert text '{}'".format(alert_text))

    header = yt_initial_data["header"].get("c4TabbedHeaderRenderer", {})
    channel_metadata = yt_initial_data["metadata"]["channelMetadataRenderer"]

    # Prefer the tabbed header; fall back to the metadata renderer when the
    # header is missing or empty.
    if header:
        author = header["title"]
        author_id = header["channelId"]
        author_url = header["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"]
    else:
        author = channel_metadata["title"]
        author_id = channel_metadata["externalId"]
        author_url = channel_metadata["channelUrl"]

    subscriber_count = combine_runs(header["subscriberCountText"]) if "subscriberCountText" in header else "Unknown subscribers"
    description = channel_metadata["description"]
    allowed_regions = channel_metadata["availableCountryCodes"]

    author_banners = []
    if "banner" in header:
        author_banners = header["banner"]["thumbnails"]
        for t in author_banners:
            t["url"] = normalise_url_protocol(t["url"])

    author_thumbnails = []
    avatar = header.get("avatar") or channel_metadata.get("avatar")
    if avatar:
        author_thumbnails = generate_full_author_thumbnails(avatar["thumbnails"])

    tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
    # Tab entries without a "tabRenderer" key are skipped rather than raising
    # KeyError, so a channel with no Videos tab falls through to an empty
    # video list as intended.
    tab_renderers = (tab["tabRenderer"] for tab in tabs if "tabRenderer" in tab)
    videos_tab = next((t for t in tab_renderers if t.get("title") == "Videos"), None)
    if videos_tab is None:
        tab_parts = {}
    else:
        tab_parts = videos_tab["content"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"][0]

    latest_videos = []
    # check that the channel actually has videos - this may be replaced
    # with messageRenderer.text.simpleText == "This channel has no videos."
    if "gridRenderer" in tab_parts:
        latest_videos = [
            _parse_grid_video(item["gridVideoRenderer"], author, author_id, author_url)
            for item in tab_parts["gridRenderer"]["items"]
            if "gridVideoRenderer" in item
        ]

    channel = {
        "author": author,
        "authorId": author_id,
        "authorUrl": author_url,
        "authorBanners": author_banners,
        "authorThumbnails": author_thumbnails,
        "subCount": uncompress_counter(subscriber_count.split(" ")[0]),
        "second__subCountText": subscriber_count,
        "totalViews": None,
        "joined": None,
        "paid": None,
        "autoGenerated": None,
        "isFamilyFriendly": None,
        "description": description,
        "descriptionHtml": add_html_links(escape_html_textcontent(description)),
        "allowedRegions": allowed_regions,
        "latestVideos": latest_videos,
        "relatedChannels": []
    }

    with channel_cache_lock:
        channel_cache[ucid] = channel

    return channel
def extract_channel_videos(ucid):
    """Return the "latestVideos" list of the channel identified by ucid.

    When extract_channel reports an error, its error dict is returned
    unchanged instead of a list.
    """
    result = extract_channel(ucid)
    return result if "error" in result else result["latestVideos"]
def extract_channel_latest(ucid):
    """Build the list of a channel's latest videos from its YouTube Atom feed.

    Successful results are stored in channel_latest_cache under ucid and
    served from there while cached.

    Returns a list of video dicts, or an error dict:
    - identifier "NOT_FOUND" (cherrypy response status set to 404) when the
      feed request answers 404;
    - identifier "PUBLISHED_DATES_NOT_PROVIDED" (status set to 503) when
      every entry was dropped because it had no published date.

    Raises whatever r.raise_for_status() raises for any other HTTP error
    status, instead of handing an error page to the XML parser.
    """
    with channel_latest_cache_lock:
        if ucid in channel_latest_cache:
            return channel_latest_cache[ucid]

    # XML namespaces used by the feed, in ElementTree's "{uri}tag" notation.
    atom = "{http://www.w3.org/2005/Atom}"
    yt = "{http://www.youtube.com/xml/schemas/2015}"
    media = "{http://search.yahoo.com/mrss/}"

    with requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid)) as r:
        if r.status_code == 404:
            cherrypy.response.status = 404
            return {
                "error": "This channel does not exist.",
                "identifier": "NOT_FOUND"
            }
        # Same handling as extract_channel: fail loudly on other HTTP errors.
        r.raise_for_status()
        feed = ET.fromstring(r.content)

    author_container = feed.find(atom + "author")
    author = author_container.find(atom + "name").text
    author_url = author_container.find(atom + "uri").text
    channel_id = feed.find(yt + "channelId").text

    results = []
    missing_published = False
    for entry in feed.findall(atom + "entry"):
        published_entry = entry.find(atom + "published")
        if published_entry is None:
            # sometimes youtube does not provide published dates, no idea why.
            missing_published = True
            continue

        video_id = entry.find(yt + "videoId").text
        media_group = entry.find(media + "group")
        description = media_group.find(media + "description").text or ""
        media_community = media_group.find(media + "community")
        published = int(dateutil.parser.isoparse(published_entry.text).timestamp())
        results.append({
            "type": "video",
            "title": entry.find(atom + "title").text,
            "videoId": video_id,
            "author": author,
            "authorId": channel_id,
            "authorUrl": author_url,
            "videoThumbnails": generate_video_thumbnails(video_id),
            "description": description,
            "descriptionHtml": add_html_links(escape_html_textcontent(description)),
            "viewCount": int(media_community.find(media + "statistics").attrib["views"]),
            "published": published,
            "publishedText": time_to_past_text(published),
            "lengthSeconds": None,
            "liveNow": None,
            "paid": None,
            "premium": None,
            "isUpcoming": None
        })

    if not results and missing_published:  # no results due to all missing published
        cherrypy.response.status = 503
        return {
            "error": "YouTube did not provide published dates for any feed items. This is usually temporary - refresh in a few minutes.",
            "identifier": "PUBLISHED_DATES_NOT_PROVIDED"
        }

    with channel_latest_cache_lock:
        channel_latest_cache[ucid] = results

    return results