diff --git a/index.py b/index.py
index 467ae29..486596a 100644
--- a/index.py
+++ b/index.py
@@ -36,9 +36,74 @@ def extract_yt_initial_data(content):
else:
raise Exception("Could not match ytInitialData in content")
+def combine_runs(runs):
+ if "runs" in runs: # check if already unpacked
+ runs = runs["runs"]
+ return "".join([r["text"] for r in runs])
+
+def escape_html_textcontent(text):
+ return (
+ text
+ .replace("&", "&")
+ .replace("<", "<")
+ .replace(">", ">")
+ .replace('"', """)
+ .replace("\n", "
")
+ )
+
+def combine_runs_html(runs):
+ if "runs" in runs: # check if already unpackged
+ runs = runs["runs"]
+ result = ""
+ for part in runs:
+ if part.get("bold"):
+ result += "{}".format(escape_html_textcontent(part["text"]))
+ else:
+ result += part["text"]
+ return result
+
+def view_count_text_to_number(text):
+ return int(text.split(" ")[0].replace(",", ""))
+
+def get_view_count_or_recommended(view_count_container):
+ if "runs" in view_count_container["viewCountText"]: # has live viewers
+ return int(combine_runs(view_count_container["viewCountText"]))
+ else:
+ text = view_count_container["viewCountText"]["simpleText"]
+ if text == "Recommended for you":
+ return 0 # subject to change?
+ else:
+ return view_count_text_to_number(text)
+
+def get_view_count_text_or_recommended(view_count_container):
+ if "runs" in view_count_container["viewCountText"]: # has live viewers
+ text = combine_runs(view_count_container["viewCountText"])
+ else: # has past views
+ text = view_count_container["viewCountText"]["simpleText"]
+ if text == "Recommended for you":
+ return "Recommended for you" #subject to change?
+ else:
+ return text
+
+def is_live(length_container):
+ return "lengthText" not in length_container
+
+def get_length_or_live_now(length_container):
+ if "lengthText" in length_container:
+ return length_text_to_seconds(length_container["lengthText"]["simpleText"])
+ else:
+ return -1
+
+def get_length_text_or_live_now(length_container):
+ if "lengthText" in length_container:
+ return length_container["lengthText"]["simpleText"]
+ else:
+ return "Live now"
+
class Second(object):
def __init__(self):
self.video_cache = TTLCache(maxsize=50, ttl=300)
+ self.search_cache = TTLCache(maxsize=50, ttl=300)
def _cp_dispatch(self, vpath):
if vpath[:4] == ["api", "manifest", "dash", "id"]:
@@ -128,7 +193,7 @@ class Second(object):
"isUpcoming": None,
"dashUrl": "/api/manifest/dash/id/{}".format(info["id"]),
"second__providedDashUrl": None,
- "adaptiveFormats": list({
+ "adaptiveFormats": [{
"index": None,
"bitrate": str(int(format["tbr"]*1000)),
"init": None,
@@ -147,8 +212,8 @@ class Second(object):
"qualityLabel": format["format_note"],
"second__width": format["width"],
"second__height": format["height"]
- } for format in info["formats"] if format_is_adaptive(format)),
- "formatStreams": list({
+ } for format in info["formats"] if format_is_adaptive(format)],
+ "formatStreams": [{
"url": format["url"],
"itag": format["format_id"],
"type": format_type(format),
@@ -162,7 +227,7 @@ class Second(object):
"size": "{}x{}".format(format["width"], format["height"]),
"second__width": format["width"],
"second__height": format["height"]
- } for format in info["formats"] if not format_is_adaptive(format)),
+ } for format in info["formats"] if not format_is_adaptive(format)],
"captions": [],
"recommendedVideos": []
}
@@ -182,7 +247,7 @@ class Second(object):
yt_initial_data = extract_yt_initial_data(content)
views = yt_initial_data["contents"]["twoColumnWatchNextResults"]["results"]["results"]["contents"][0]\
["videoPrimaryInfoRenderer"]["viewCount"]["videoViewCountRenderer"]
- result["second__viewCountText"] = views["viewCount"]["simpleText"]
+ result["second__viewCountText"] = get_view_count_text_or_recommended(views)
result["second__viewCountTextShort"] = views["shortViewCount"]["simpleText"]
recommendations = yt_initial_data["contents"]["twoColumnWatchNextResults"]["secondaryResults"]\
["secondaryResults"]["results"]
@@ -194,49 +259,17 @@ class Second(object):
return r["compactAutoplayRenderer"]["contents"][0]["compactVideoRenderer"]
return None
- def get_view_count(r):
- if "runs" in r["viewCountText"]: # has live viewers
- return int(r["viewCountText"]["runs"][0]["text"])
- else:
- text = r["viewCountText"]["simpleText"]
- if text == "Recommended for you":
- return 0 # subject to change?
- else:
- return int(text.replace(",", "").split(" ")[0])
-
- def get_view_count_text(r):
- if "runs" in r["viewCountText"]: # has live viewers
- text = "".join([x["text"] for x in r["viewCountText"]["runs"]])
- else: # has past views
- text = r["viewCountText"]["simpleText"]
- if text == "Recommended for you":
- return "Recommended for you" # subject to change?
- else:
- return text
-
- def get_length(r):
- if "lengthText" in r:
- return length_text_to_seconds(r["lengthText"]["simpleText"])
- else:
- return -1
-
- def get_length_text(r):
- if "lengthText" in r:
- return r["lengthText"]["simpleText"]
- else:
- return "Live now"
-
result["recommendedVideos"] = list({
"videoId": r["videoId"],
"title": r["title"]["simpleText"],
"videoThumbnails": [],
- "author": r["longBylineText"]["runs"][0]["text"],
+ "author": combine_runs(r["longBylineText"]),
"authorUrl": r["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"],
"authorId": r["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"],
- "lengthSeconds": get_length(r),
- "second__lengthText": get_length_text(r),
- "viewCountText": get_view_count_text(r),
- "viewCount": get_view_count(r)
+ "lengthSeconds": get_length_or_live_now(r),
+ "second__lengthText": get_length_text_or_live_now(r),
+ "viewCountText": get_view_count_text_or_recommended(r),
+ "viewCount": get_view_count_or_recommended(r)
} for r in [get_useful_recommendation_data(r) for r in recommendations if get_useful_recommendation_data(r)])
m_yt_player_config = re.search(r_yt_player_config, line)
@@ -401,26 +434,67 @@ class Second(object):
if suffix == ("suggestions",):
return self.suggestions(q=q)
- info = ytdl.extract_info("ytsearchall:{}".format(q), download=False)
- return list({
- "type": "video",
- "title": video["title"],
- "videoId": video["id"],
- "author": None,
- "authorId": None,
- "authorUrl": None,
- "videoThumbnails": [],
- "description": None,
- "descriptionHtml": None,
- "viewCount": None,
- "published": None,
- "publishedText": None,
- "lengthSeconds": None,
- "liveNow": None,
- "paid": None,
- "premium": None,
- "isUpcoming": None
- } for video in info["entries"] if "title" in video)
+ if q in self.search_cache:
+ return self.search_cache[q]
+
+ try:
+ with requests.get("https://www.youtube.com/results", params={"q": q}) as r:
+ r.raise_for_status()
+ content = r.content.decode("utf8")
+ yt_initial_data = extract_yt_initial_data(content)
+ items = yt_initial_data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"]
+ results = []
+ for item in items:
+ if "videoRenderer" in item:
+ video = item["videoRenderer"]
+ results.append({
+ "type": "video",
+ "title": combine_runs(video["title"]),
+ "videoId": video["videoId"],
+ "author": combine_runs(video["longBylineText"]),
+ "authorId": video["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"],
+ "authorUrl": video["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"],
+ "videoThumbnails": [],
+ "description": combine_runs(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
+ "descriptionHtml": combine_runs_html(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
+ "viewCount": get_view_count_or_recommended(video),
+ "second__viewCountText": get_view_count_text_or_recommended(video),
+ "published": None,
+ "publishedText": video["publishedTimeText"]["simpleText"],
+ "lengthSeconds": get_length_or_live_now(video),
+ "second__lengthText": get_length_text_or_live_now(video),
+ "liveNow": is_live(video),
+ "paid": None,
+ "premium": None,
+ "isUpcoming": None
+ })
+ self.search_cache[q] = results # only cache full extraction
+ return results
+
+ except Exception:
+ print("messed up extracting search, using youtube-dl instead")
+ traceback.print_exc()
+
+ info = ytdl.extract_info("ytsearchall:{}".format(q), download=False)
+ return [{
+ "type": "video",
+ "title": video["title"],
+ "videoId": video["id"],
+ "author": None,
+ "authorId": None,
+ "authorUrl": None,
+ "videoThumbnails": [],
+ "description": None,
+ "descriptionHtml": None,
+ "viewCount": None,
+ "published": None,
+ "publishedText": None,
+ "lengthSeconds": None,
+ "liveNow": None,
+ "paid": None,
+ "premium": None,
+ "isUpcoming": None
+ } for video in info["entries"] if "title" in video]
@cherrypy.expose
@cherrypy.tools.json_out()