1
0
mirror of https://git.sr.ht/~cadence/NewLeaf synced 2024-11-22 07:37:29 +00:00

Move extract_yt_initial_data to function

This commit is contained in:
Cadence Ember 2020-08-11 23:33:14 +12:00
parent 080b00bc0c
commit 577cdd8a24
No known key found for this signature in database
GPG Key ID: 128B99B1B74A6412

166
index.py
View File

@ -26,6 +26,16 @@ def length_text_to_seconds(text):
s = text.split(":") s = text.split(":")
return sum([int(x) * 60**(len(s)-i-1) for i, x in enumerate(s)]) return sum([int(x) * 60**(len(s)-i-1) for i, x in enumerate(s)])
r_yt_intial_data = re.compile(r"""^\s*window\["ytInitialData"\] = (\{.*\});\n?$""", re.M)
def extract_yt_initial_data(content):
m_yt_initial_data = re.search(r_yt_intial_data, content)
if m_yt_initial_data:
yt_initial_data = json.loads(m_yt_initial_data.group(1))
return yt_initial_data
else:
raise Exception("Could not match ytInitialData in content")
class Second(object): class Second(object):
def __init__(self): def __init__(self):
self.video_cache = TTLCache(maxsize=50, ttl=300) self.video_cache = TTLCache(maxsize=50, ttl=300)
@ -166,93 +176,91 @@ class Second(object):
if len(possible_files) == 1: if len(possible_files) == 1:
filename = possible_files[0] filename = possible_files[0]
with open(filename) as file: with open(filename) as file:
r_yt_intial_data = re.compile(r"""^\s*window\["ytInitialData"\] = (\{.*\});\n?$""")
r_yt_player_config = re.compile(r"""^\s*[^"]+"cfg"[^"]+ytplayer\.config = (\{.*\});ytplayer\.web_player_context_config = {".""") r_yt_player_config = re.compile(r"""^\s*[^"]+"cfg"[^"]+ytplayer\.config = (\{.*\});ytplayer\.web_player_context_config = {".""")
for line in file: content = file.read()
m_yt_initial_data = re.search(r_yt_intial_data, line)
if m_yt_initial_data:
yt_initial_data = json.loads(m_yt_initial_data.group(1))
views = yt_initial_data["contents"]["twoColumnWatchNextResults"]["results"]["results"]["contents"][0]\
["videoPrimaryInfoRenderer"]["viewCount"]["videoViewCountRenderer"]
result["second__viewCountText"] = views["viewCount"]["simpleText"]
result["second__viewCountTextShort"] = views["shortViewCount"]["simpleText"]
recommendations = yt_initial_data["contents"]["twoColumnWatchNextResults"]["secondaryResults"]\
["secondaryResults"]["results"]
def get_useful_recommendation_data(r): yt_initial_data = extract_yt_initial_data(content)
if "compactVideoRenderer" in r: views = yt_initial_data["contents"]["twoColumnWatchNextResults"]["results"]["results"]["contents"][0]\
return r["compactVideoRenderer"] ["videoPrimaryInfoRenderer"]["viewCount"]["videoViewCountRenderer"]
if "compactAutoplayRenderer" in r: result["second__viewCountText"] = views["viewCount"]["simpleText"]
return r["compactAutoplayRenderer"]["contents"][0]["compactVideoRenderer"] result["second__viewCountTextShort"] = views["shortViewCount"]["simpleText"]
return None recommendations = yt_initial_data["contents"]["twoColumnWatchNextResults"]["secondaryResults"]\
["secondaryResults"]["results"]
def get_view_count(r): def get_useful_recommendation_data(r):
if "runs" in r["viewCountText"]: # has live viewers if "compactVideoRenderer" in r:
return int(r["viewCountText"]["runs"][0]["text"]) return r["compactVideoRenderer"]
else: if "compactAutoplayRenderer" in r:
text = r["viewCountText"]["simpleText"] return r["compactAutoplayRenderer"]["contents"][0]["compactVideoRenderer"]
if text == "Recommended for you": return None
return 0 # subject to change?
else:
return int(text.replace(",", "").split(" ")[0])
def get_view_count_text(r): def get_view_count(r):
if "runs" in r["viewCountText"]: # has live viewers if "runs" in r["viewCountText"]: # has live viewers
text = "".join([x["text"] for x in r["viewCountText"]["runs"]]) return int(r["viewCountText"]["runs"][0]["text"])
else: # has past views else:
text = r["viewCountText"]["simpleText"] text = r["viewCountText"]["simpleText"]
if text == "Recommended for you": if text == "Recommended for you":
return "Recommended for you" # subject to change? return 0 # subject to change?
else: else:
return text return int(text.replace(",", "").split(" ")[0])
def get_length(r): def get_view_count_text(r):
if "lengthText" in r: if "runs" in r["viewCountText"]: # has live viewers
return length_text_to_seconds(r["lengthText"]["simpleText"]) text = "".join([x["text"] for x in r["viewCountText"]["runs"]])
else: else: # has past views
return -1 text = r["viewCountText"]["simpleText"]
if text == "Recommended for you":
return "Recommended for you" # subject to change?
else:
return text
def get_length_text(r): def get_length(r):
if "lengthText" in r: if "lengthText" in r:
return r["lengthText"]["simpleText"] return length_text_to_seconds(r["lengthText"]["simpleText"])
else: else:
return "Live now" return -1
result["recommendedVideos"] = list({ def get_length_text(r):
"videoId": r["videoId"], if "lengthText" in r:
"title": r["title"]["simpleText"], return r["lengthText"]["simpleText"]
"videoThumbnails": [], else:
"author": r["longBylineText"]["runs"][0]["text"], return "Live now"
"authorUrl": r["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"],
"authorId": r["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"],
"lengthSeconds": get_length(r),
"second__lengthText": get_length_text(r),
"viewCountText": get_view_count_text(r),
"viewCount": get_view_count(r)
} for r in [get_useful_recommendation_data(r) for r in recommendations if get_useful_recommendation_data(r)])
m_yt_player_config = re.search(r_yt_player_config, line) result["recommendedVideos"] = list({
if m_yt_player_config: "videoId": r["videoId"],
yt_player_config = json.loads(m_yt_player_config.group(1)) "title": r["title"]["simpleText"],
player_response = json.loads(yt_player_config["args"]["player_response"]) "videoThumbnails": [],
if "dashManifestUrl" in player_response["streamingData"]: "author": r["longBylineText"]["runs"][0]["text"],
result["second__providedDashUrl"] = player_response["streamingData"]["dashManifestUrl"] "authorUrl": r["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"],
# result = player_response "authorId": r["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"],
# return result "lengthSeconds": get_length(r),
itagDict = {} "second__lengthText": get_length_text(r),
for f in player_response["streamingData"]["adaptiveFormats"]: "viewCountText": get_view_count_text(r),
if "indexRange" in f: "viewCount": get_view_count(r)
itagDict[str(f["itag"])] = { } for r in [get_useful_recommendation_data(r) for r in recommendations if get_useful_recommendation_data(r)])
"initRange": f["initRange"],
"indexRange": f["indexRange"], m_yt_player_config = re.search(r_yt_player_config, line)
"audioChannels": f["audioChannels"] if "audioChannels" in f else None if m_yt_player_config:
} yt_player_config = json.loads(m_yt_player_config.group(1))
for f in result["adaptiveFormats"]: player_response = json.loads(yt_player_config["args"]["player_response"])
if f["itag"] in itagDict: if "dashManifestUrl" in player_response["streamingData"]:
i = itagDict[f["itag"]] result["second__providedDashUrl"] = player_response["streamingData"]["dashManifestUrl"]
f["init"] = "{}-{}".format(i["initRange"]["start"], i["initRange"]["end"]) # result = player_response
f["index"] = "{}-{}".format(i["indexRange"]["start"], i["indexRange"]["end"]) # return result
f["second__audioChannels"] = i["audioChannels"] itagDict = {}
for f in player_response["streamingData"]["adaptiveFormats"]:
if "indexRange" in f:
itagDict[str(f["itag"])] = {
"initRange": f["initRange"],
"indexRange": f["indexRange"],
"audioChannels": f["audioChannels"] if "audioChannels" in f else None
}
for f in result["adaptiveFormats"]:
if f["itag"] in itagDict:
i = itagDict[f["itag"]]
f["init"] = "{}-{}".format(i["initRange"]["start"], i["initRange"]["end"])
f["index"] = "{}-{}".format(i["indexRange"]["start"], i["indexRange"]["end"])
f["second__audioChannels"] = i["audioChannels"]
except Exception: except Exception:
print("messed up extracting recommendations.") print("messed up extracting recommendations.")