From 577cdd8a240c7e2300366e5cac3b077ce4e1d73a Mon Sep 17 00:00:00 2001 From: Cadence Ember Date: Tue, 11 Aug 2020 23:33:14 +1200 Subject: [PATCH] Move extract_yt_initial_data to function --- index.py | 166 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 87 insertions(+), 79 deletions(-) diff --git a/index.py b/index.py index 68e4aa9..467ae29 100644 --- a/index.py +++ b/index.py @@ -26,6 +26,16 @@ def length_text_to_seconds(text): s = text.split(":") return sum([int(x) * 60**(len(s)-i-1) for i, x in enumerate(s)]) +r_yt_intial_data = re.compile(r"""^\s*window\["ytInitialData"\] = (\{.*\});\n?$""", re.M) + +def extract_yt_initial_data(content): + m_yt_initial_data = re.search(r_yt_intial_data, content) + if m_yt_initial_data: + yt_initial_data = json.loads(m_yt_initial_data.group(1)) + return yt_initial_data + else: + raise Exception("Could not match ytInitialData in content") + class Second(object): def __init__(self): self.video_cache = TTLCache(maxsize=50, ttl=300) @@ -166,93 +176,91 @@ class Second(object): if len(possible_files) == 1: filename = possible_files[0] with open(filename) as file: - r_yt_intial_data = re.compile(r"""^\s*window\["ytInitialData"\] = (\{.*\});\n?$""") r_yt_player_config = re.compile(r"""^\s*[^"]+"cfg"[^"]+ytplayer\.config = (\{.*\});ytplayer\.web_player_context_config = {".""") - for line in file: - m_yt_initial_data = re.search(r_yt_intial_data, line) - if m_yt_initial_data: - yt_initial_data = json.loads(m_yt_initial_data.group(1)) - views = yt_initial_data["contents"]["twoColumnWatchNextResults"]["results"]["results"]["contents"][0]\ - ["videoPrimaryInfoRenderer"]["viewCount"]["videoViewCountRenderer"] - result["second__viewCountText"] = views["viewCount"]["simpleText"] - result["second__viewCountTextShort"] = views["shortViewCount"]["simpleText"] - recommendations = yt_initial_data["contents"]["twoColumnWatchNextResults"]["secondaryResults"]\ - ["secondaryResults"]["results"] + content = file.read() - def get_useful_recommendation_data(r): - if "compactVideoRenderer" in r: - return r["compactVideoRenderer"] - if "compactAutoplayRenderer" in r: - return r["compactAutoplayRenderer"]["contents"][0]["compactVideoRenderer"] - return None + yt_initial_data = extract_yt_initial_data(content) + views = yt_initial_data["contents"]["twoColumnWatchNextResults"]["results"]["results"]["contents"][0]\ + ["videoPrimaryInfoRenderer"]["viewCount"]["videoViewCountRenderer"] + result["second__viewCountText"] = views["viewCount"]["simpleText"] + result["second__viewCountTextShort"] = views["shortViewCount"]["simpleText"] + recommendations = yt_initial_data["contents"]["twoColumnWatchNextResults"]["secondaryResults"]\ + ["secondaryResults"]["results"] - def get_view_count(r): - if "runs" in r["viewCountText"]: # has live viewers - return int(r["viewCountText"]["runs"][0]["text"]) - else: - text = r["viewCountText"]["simpleText"] - if text == "Recommended for you": - return 0 # subject to change? - else: - return int(text.replace(",", "").split(" ")[0]) + def get_useful_recommendation_data(r): + if "compactVideoRenderer" in r: + return r["compactVideoRenderer"] + if "compactAutoplayRenderer" in r: + return r["compactAutoplayRenderer"]["contents"][0]["compactVideoRenderer"] + return None - def get_view_count_text(r): - if "runs" in r["viewCountText"]: # has live viewers - text = "".join([x["text"] for x in r["viewCountText"]["runs"]]) - else: # has past views - text = r["viewCountText"]["simpleText"] - if text == "Recommended for you": - return "Recommended for you" # subject to change? - else: - return text + def get_view_count(r): + if "runs" in r["viewCountText"]: # has live viewers + return int(r["viewCountText"]["runs"][0]["text"]) + else: + text = r["viewCountText"]["simpleText"] + if text == "Recommended for you": + return 0 # subject to change? + else: + return int(text.replace(",", "").split(" ")[0]) - def get_length(r): - if "lengthText" in r: - return length_text_to_seconds(r["lengthText"]["simpleText"]) - else: - return -1 + def get_view_count_text(r): + if "runs" in r["viewCountText"]: # has live viewers + text = "".join([x["text"] for x in r["viewCountText"]["runs"]]) + else: # has past views + text = r["viewCountText"]["simpleText"] + if text == "Recommended for you": + return "Recommended for you" # subject to change? + else: + return text - def get_length_text(r): - if "lengthText" in r: - return r["lengthText"]["simpleText"] - else: - return "Live now" + def get_length(r): + if "lengthText" in r: + return length_text_to_seconds(r["lengthText"]["simpleText"]) + else: + return -1 - result["recommendedVideos"] = list({ - "videoId": r["videoId"], - "title": r["title"]["simpleText"], - "videoThumbnails": [], - "author": r["longBylineText"]["runs"][0]["text"], - "authorUrl": r["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"], - "authorId": r["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"], - "lengthSeconds": get_length(r), - "second__lengthText": get_length_text(r), - "viewCountText": get_view_count_text(r), - "viewCount": get_view_count(r) - } for r in [get_useful_recommendation_data(r) for r in recommendations if get_useful_recommendation_data(r)]) + def get_length_text(r): + if "lengthText" in r: + return r["lengthText"]["simpleText"] + else: + return "Live now" - m_yt_player_config = re.search(r_yt_player_config, line) - if m_yt_player_config: - yt_player_config = json.loads(m_yt_player_config.group(1)) - player_response = json.loads(yt_player_config["args"]["player_response"]) - if "dashManifestUrl" in player_response["streamingData"]: - result["second__providedDashUrl"] = player_response["streamingData"]["dashManifestUrl"] - # result = player_response - # return result - itagDict = {} - for f in player_response["streamingData"]["adaptiveFormats"]: - if "indexRange" in f: - itagDict[str(f["itag"])] = { - "initRange": f["initRange"], - "indexRange": f["indexRange"], - "audioChannels": f["audioChannels"] if "audioChannels" in f else None - } - for f in result["adaptiveFormats"]: - if f["itag"] in itagDict: - i = itagDict[f["itag"]] - f["init"] = "{}-{}".format(i["initRange"]["start"], i["initRange"]["end"]) - f["index"] = "{}-{}".format(i["indexRange"]["start"], i["indexRange"]["end"]) - f["second__audioChannels"] = i["audioChannels"] + result["recommendedVideos"] = list({ + "videoId": r["videoId"], + "title": r["title"]["simpleText"], + "videoThumbnails": [], + "author": r["longBylineText"]["runs"][0]["text"], + "authorUrl": r["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"], + "authorId": r["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"], + "lengthSeconds": get_length(r), + "second__lengthText": get_length_text(r), + "viewCountText": get_view_count_text(r), + "viewCount": get_view_count(r) + } for r in [get_useful_recommendation_data(r) for r in recommendations if get_useful_recommendation_data(r)]) + + m_yt_player_config = re.search(r_yt_player_config, line) + if m_yt_player_config: + yt_player_config = json.loads(m_yt_player_config.group(1)) + player_response = json.loads(yt_player_config["args"]["player_response"]) + if "dashManifestUrl" in player_response["streamingData"]: + result["second__providedDashUrl"] = player_response["streamingData"]["dashManifestUrl"] + # result = player_response + # return result + itagDict = {} + for f in player_response["streamingData"]["adaptiveFormats"]: + if "indexRange" in f: + itagDict[str(f["itag"])] = { + "initRange": f["initRange"], + "indexRange": f["indexRange"], + "audioChannels": f["audioChannels"] if "audioChannels" in f else None + } + for f in result["adaptiveFormats"]: + if f["itag"] in itagDict: + i = itagDict[f["itag"]] + f["init"] = "{}-{}".format(i["initRange"]["start"], i["initRange"]["end"]) + f["index"] = "{}-{}".format(i["indexRange"]["start"], i["indexRange"]["end"]) + f["second__audioChannels"] = i["audioChannels"] except Exception: print("messed up extracting recommendations.")