From 9ee1f3ce89c963e4730e7a8333f93172aa57efe6 Mon Sep 17 00:00:00 2001 From: Cadence Ember Date: Wed, 12 Aug 2020 00:38:26 +1200 Subject: [PATCH] Implement new extractor for searches --- index.py | 198 ++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 136 insertions(+), 62 deletions(-) diff --git a/index.py b/index.py index 467ae29..486596a 100644 --- a/index.py +++ b/index.py @@ -36,9 +36,74 @@ def extract_yt_initial_data(content): else: raise Exception("Could not match ytInitialData in content") +def combine_runs(runs): + if "runs" in runs: # check if already unpacked + runs = runs["runs"] + return "".join([r["text"] for r in runs]) + +def escape_html_textcontent(text): + return ( + text + .replace("&", "&amp;") + .replace("<", "&lt;") + .replace(">", "&gt;") + .replace('"', "&quot;") + .replace("\n", "<br>") + ) + +def combine_runs_html(runs): + if "runs" in runs: # check if already unpacked + runs = runs["runs"] + result = "" + for part in runs: + if part.get("bold"): + result += "<b>{}</b>".format(escape_html_textcontent(part["text"])) + else: + result += part["text"] + return result + +def view_count_text_to_number(text): + return int(text.split(" ")[0].replace(",", "")) + +def get_view_count_or_recommended(view_count_container): + if "runs" in view_count_container["viewCountText"]: # has live viewers + return int(combine_runs(view_count_container["viewCountText"])) + else: + text = view_count_container["viewCountText"]["simpleText"] + if text == "Recommended for you": + return 0 # subject to change? + else: + return view_count_text_to_number(text) + +def get_view_count_text_or_recommended(view_count_container): + if "runs" in view_count_container["viewCountText"]: # has live viewers + text = combine_runs(view_count_container["viewCountText"]) + else: # has past views + text = view_count_container["viewCountText"]["simpleText"] + if text == "Recommended for you": + return "Recommended for you" #subject to change?
+ else: + return text + +def is_live(length_container): + return "lengthText" not in length_container + +def get_length_or_live_now(length_container): + if "lengthText" in length_container: + return length_text_to_seconds(length_container["lengthText"]["simpleText"]) + else: + return -1 + +def get_length_text_or_live_now(length_container): + if "lengthText" in length_container: + return length_container["lengthText"]["simpleText"] + else: + return "Live now" + class Second(object): def __init__(self): self.video_cache = TTLCache(maxsize=50, ttl=300) + self.search_cache = TTLCache(maxsize=50, ttl=300) def _cp_dispatch(self, vpath): if vpath[:4] == ["api", "manifest", "dash", "id"]: @@ -128,7 +193,7 @@ class Second(object): "isUpcoming": None, "dashUrl": "/api/manifest/dash/id/{}".format(info["id"]), "second__providedDashUrl": None, - "adaptiveFormats": list({ + "adaptiveFormats": [{ "index": None, "bitrate": str(int(format["tbr"]*1000)), "init": None, @@ -147,8 +212,8 @@ class Second(object): "qualityLabel": format["format_note"], "second__width": format["width"], "second__height": format["height"] - } for format in info["formats"] if format_is_adaptive(format)), - "formatStreams": list({ + } for format in info["formats"] if format_is_adaptive(format)], + "formatStreams": [{ "url": format["url"], "itag": format["format_id"], "type": format_type(format), @@ -162,7 +227,7 @@ class Second(object): "size": "{}x{}".format(format["width"], format["height"]), "second__width": format["width"], "second__height": format["height"] - } for format in info["formats"] if not format_is_adaptive(format)), + } for format in info["formats"] if not format_is_adaptive(format)], "captions": [], "recommendedVideos": [] } @@ -182,7 +247,7 @@ class Second(object): yt_initial_data = extract_yt_initial_data(content) views = yt_initial_data["contents"]["twoColumnWatchNextResults"]["results"]["results"]["contents"][0]\ ["videoPrimaryInfoRenderer"]["viewCount"]["videoViewCountRenderer"] - 
result["second__viewCountText"] = views["viewCount"]["simpleText"] + result["second__viewCountText"] = get_view_count_text_or_recommended(views) result["second__viewCountTextShort"] = views["shortViewCount"]["simpleText"] recommendations = yt_initial_data["contents"]["twoColumnWatchNextResults"]["secondaryResults"]\ ["secondaryResults"]["results"] @@ -194,49 +259,17 @@ class Second(object): return r["compactAutoplayRenderer"]["contents"][0]["compactVideoRenderer"] return None - def get_view_count(r): - if "runs" in r["viewCountText"]: # has live viewers - return int(r["viewCountText"]["runs"][0]["text"]) - else: - text = r["viewCountText"]["simpleText"] - if text == "Recommended for you": - return 0 # subject to change? - else: - return int(text.replace(",", "").split(" ")[0]) - - def get_view_count_text(r): - if "runs" in r["viewCountText"]: # has live viewers - text = "".join([x["text"] for x in r["viewCountText"]["runs"]]) - else: # has past views - text = r["viewCountText"]["simpleText"] - if text == "Recommended for you": - return "Recommended for you" # subject to change? 
- else: - return text - - def get_length(r): - if "lengthText" in r: - return length_text_to_seconds(r["lengthText"]["simpleText"]) - else: - return -1 - - def get_length_text(r): - if "lengthText" in r: - return r["lengthText"]["simpleText"] - else: - return "Live now" - result["recommendedVideos"] = list({ "videoId": r["videoId"], "title": r["title"]["simpleText"], "videoThumbnails": [], - "author": r["longBylineText"]["runs"][0]["text"], + "author": combine_runs(r["longBylineText"]), "authorUrl": r["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"], "authorId": r["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"], - "lengthSeconds": get_length(r), - "second__lengthText": get_length_text(r), - "viewCountText": get_view_count_text(r), - "viewCount": get_view_count(r) + "lengthSeconds": get_length_or_live_now(r), + "second__lengthText": get_length_text_or_live_now(r), + "viewCountText": get_view_count_text_or_recommended(r), + "viewCount": get_view_count_or_recommended(r) } for r in [get_useful_recommendation_data(r) for r in recommendations if get_useful_recommendation_data(r)]) m_yt_player_config = re.search(r_yt_player_config, line) @@ -401,26 +434,67 @@ class Second(object): if suffix == ("suggestions",): return self.suggestions(q=q) - info = ytdl.extract_info("ytsearchall:{}".format(q), download=False) - return list({ - "type": "video", - "title": video["title"], - "videoId": video["id"], - "author": None, - "authorId": None, - "authorUrl": None, - "videoThumbnails": [], - "description": None, - "descriptionHtml": None, - "viewCount": None, - "published": None, - "publishedText": None, - "lengthSeconds": None, - "liveNow": None, - "paid": None, - "premium": None, - "isUpcoming": None - } for video in info["entries"] if "title" in video) + if q in self.search_cache: + return self.search_cache[q] + + try: + with requests.get("https://www.youtube.com/results", params={"q": q}) as r: + 
r.raise_for_status() + content = r.content.decode("utf8") + yt_initial_data = extract_yt_initial_data(content) + items = yt_initial_data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"] + results = [] + for item in items: + if "videoRenderer" in item: + video = item["videoRenderer"] + results.append({ + "type": "video", + "title": combine_runs(video["title"]), + "videoId": video["videoId"], + "author": combine_runs(video["longBylineText"]), + "authorId": video["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"], + "authorUrl": video["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"], + "videoThumbnails": [], + "description": combine_runs(video["descriptionSnippet"]) if "descriptionSnippet" in video else "", + "descriptionHtml": combine_runs_html(video["descriptionSnippet"]) if "descriptionSnippet" in video else "", + "viewCount": get_view_count_or_recommended(video), + "second__viewCountText": get_view_count_text_or_recommended(video), + "published": None, + "publishedText": video["publishedTimeText"]["simpleText"], + "lengthSeconds": get_length_or_live_now(video), + "second__lengthText": get_length_text_or_live_now(video), + "liveNow": is_live(video), + "paid": None, + "premium": None, + "isUpcoming": None + }) + self.search_cache[q] = results # only cache full extraction + return results + + except Exception: + print("messed up extracting search, using youtube-dl instead") + traceback.print_exc() + + info = ytdl.extract_info("ytsearchall:{}".format(q), download=False) + return [{ + "type": "video", + "title": video["title"], + "videoId": video["id"], + "author": None, + "authorId": None, + "authorUrl": None, + "videoThumbnails": [], + "description": None, + "descriptionHtml": None, + "viewCount": None, + "published": None, + "publishedText": None, + "lengthSeconds": None, + "liveNow": None, + "paid": 
None, + "premium": None, + "isUpcoming": None + } for video in info["entries"] if "title" in video] @cherrypy.expose @cherrypy.tools.json_out()