From 57fb71b97d4dd5cc8707ebc8722a42e6b29e2622 Mon Sep 17 00:00:00 2001 From: Cadence Ember Date: Thu, 13 Aug 2020 18:54:51 +1200 Subject: [PATCH] Write a new channel parser, using RSS for /latest --- index.py | 186 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 140 insertions(+), 46 deletions(-) diff --git a/index.py b/index.py index 7253c65..91fc6cf 100644 --- a/index.py +++ b/index.py @@ -3,6 +3,7 @@ import cherrypy import json import youtube_dl import datetime +import dateutil.parser import os import re import json @@ -63,6 +64,14 @@ def combine_runs_html(runs): result += part["text"] return result +def add_html_links(text): + r_link = re.compile(r"""https?://[a-z-]+(?:\.[a-z-]+)+(?:/[^\s,<>)]*)?""") # it's okay, I guess. + match = r_link.search(text) + if match is not None: + link = match.group() + text = text[:match.start()] + '<a href="{}">{}</a>'.format(link, link) + add_html_links(text[match.end():]) + return text + def view_count_text_to_number(text): return int(text.split(" ")[0].replace(",", "")) @@ -124,11 +133,31 @@ def generate_video_thumbnails(id): "height": type[3] } for type in types] +def normalise_url_protocol(url): + if url.startswith("//"): + url = "https:" + url + return url + +def uncompress_counter(text): + last = text[-1:].lower() + if last >= "0" and last <= "9": + return int(last) + else: + multiplier = 1 + if last == "k": + multiplier = 1000 + elif last == "m": + multiplier = 1000000 + elif last == "b": + multiplier = 1000000000 + return int(float(text[:-1]) * multiplier) + class Second(object): def __init__(self): self.video_cache = TTLCache(maxsize=50, ttl=300) self.search_cache = TTLCache(maxsize=50, ttl=300) self.search_suggestions_cache = TTLCache(maxsize=200, ttl=60) + self.channel_cache = TTLCache(maxsize=50, ttl=300) def _cp_dispatch(self, vpath): if vpath[:4] == ["api", "manifest", "dash", "id"]: @@ -404,56 +433,121 @@ class Second(object): else: [ucid, part] = suffix - try: - info = 
ytdl.extract_info("https://www.youtube.com/channel/{}".format(ucid), download=False) + if part == "latest": + # use RSS + with requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid)) as r: + r.raise_for_status() + feed = ET.fromstring(r.content) + author_container = feed.find("{http://www.w3.org/2005/Atom}author") + author = author_container.find("{http://www.w3.org/2005/Atom}name").text + author_url = author_container.find("{http://www.w3.org/2005/Atom}uri").text + channel_id = feed.find("{http://www.youtube.com/xml/schemas/2015}channelId").text + results = [] + for entry in feed.findall("{http://www.w3.org/2005/Atom}entry"): + id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text + media_group = entry.find("{http://search.yahoo.com/mrss/}group") + description = media_group.find("{http://search.yahoo.com/mrss/}description").text + media_community = media_group.find("{http://search.yahoo.com/mrss/}community") + results.append({ + "type": "video", + "title": entry.find("{http://www.w3.org/2005/Atom}title").text, + "videoId": id, + "author": author, + "authorId": channel_id, + "authorUrl": author_url, + "videoThumbnails": generate_video_thumbnails(id), + "description": description, + "descriptionHtml": add_html_links(escape_html_textcontent(description)), + "viewCount": int(media_community.find("{http://search.yahoo.com/mrss/}statistics").attrib["views"]), + "published": int(dateutil.parser.isoparse(entry.find("{http://www.w3.org/2005/Atom}published").text).timestamp()), + "lengthSeconds": None, + "liveNow": None, + "paid": None, + "premium": None, + "isUpcoming": None + }) + return results - response = { - "author": info["uploader"], - "authorId": info["uploader_id"], - "authorUrl": info["uploader_url"], - "authorBanners": [], - "authorThumbnails": [], - "subCount": None, - "totalViews": None, - "joined": None, - "paid": None, - "autoGenerated": None, - "isFamilyFriendly": None, - "description": None, - 
"descriptionHtml": None, - "allowedRegions": [], - "latestVideos": list({ - "type": "video", - "title": video["title"], - "videoId": video["id"], - "author": info["uploader"], - "authorId": info["uploader_id"], - "authorUrl": info["uploader_url"], - "videoThumbnails": generate_video_thumbnails(info["id"]), - "description": None, - "descriptionHtml": None, - "viewCount": None, - "published": None, - "publishedText": None, - "lengthSeconds": None, - "liveNow": None, + else: + if ucid in self.channel_cache: + if part == "": + return self.channel_cache[ucid] + else: # part == "videos" + return self.channel_cache[ucid]["latestVideos"] + + with requests.get("https://www.youtube.com/channel/{}/videos".format(ucid)) as r: + r.raise_for_status() + yt_initial_data = extract_yt_initial_data(r.content.decode("utf8")) + header = yt_initial_data["header"]["c4TabbedHeaderRenderer"] + author = header["title"] + author_id = header["channelId"] + author_url = header["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"] + author_banners = header["banner"]["thumbnails"] + for t in author_banners: + t["url"] = normalise_url_protocol(t["url"]) + author_thumbnails = header["avatar"]["thumbnails"] + subscriber_count = combine_runs(header["subscriberCountText"]) + description = yt_initial_data["metadata"]["channelMetadataRenderer"]["description"] + allowed_regions = yt_initial_data["metadata"]["channelMetadataRenderer"]["availableCountryCodes"] + tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"] + videos_tab = next(tab["tabRenderer"] for tab in tabs if tab["tabRenderer"]["title"] == "Videos") + videos = ( + v["gridVideoRenderer"] for v in + videos_tab["content"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"][0]["gridRenderer"]["items"] + ) + latest_videos = [] + for v in videos: + length_text = next(o for o in v["thumbnailOverlays"] if "thumbnailOverlayTimeStatusRenderer" in o) \ + 
["thumbnailOverlayTimeStatusRenderer"]["text"]["simpleText"] + latest_videos.append({ + "type": "video", + "title": v["title"]["simpleText"], + "videoId": v["videoId"], + "author": author, + "authorId": author_id, + "authorUrl": author_url, + "videoThumbnails": generate_video_thumbnails(v["videoId"]), + "description": "", + "descriptionHtml": "", + "viewCount": view_count_text_to_number(v["viewCountText"]["simpleText"]), + "second__viewCountText": v["viewCountText"]["simpleText"], + "second__viewCountTextShort": v["shortViewCountText"]["simpleText"], + "published": 0, + "publishedText": v["publishedTimeText"]["simpleText"], + "lengthSeconds": length_text_to_seconds(length_text), + "second__lengthText": length_text, + "liveNow": None, + "paid": None, + "premium": None, + "isUpcoming": None + }) + + channel = { + "author": author, + "authorId": author_id, + "authorUrl": author_url, + "authorBanners": author_banners, + "authorThumbnails": author_thumbnails, + "subCount": uncompress_counter(subscriber_count.split(" ")[0]), + "second__subCountText": subscriber_count, + "totalViews": None, + "joined": None, "paid": None, - "premium": None, - "isUpcoming": None - } for video in info["entries"]), - "relatedChannels": [] - } + "autoGenerated": None, + "isFamilyFriendly": None, + "description": description, + "descriptionHtml": add_html_links(escape_html_textcontent(description)), + "allowedRegions": allowed_regions, + "latestVideos": latest_videos, + "relatedChannels": [] + } - if part == "videos" or part == "latest": - return response["latestVideos"] - else: - return response + self.channel_cache[ucid] = channel - except youtube_dl.DownloadError: - return { - "error": "This channel does not exist.", - "identifier": "CHANNEL_DOES_NOT_EXIST" - } + if part == "": + return channel + else: + return latest_videos @cherrypy.expose @cherrypy.tools.json_out()