mirror of
				https://git.sr.ht/~cadence/NewLeaf
				synced 2025-10-26 00:55:36 +00:00 
			
		
		
		
	Implement new extractor for searches
This commit is contained in:
		
							parent
							
								
									577cdd8a24
								
							
						
					
					
						commit
						9ee1f3ce89
					
				
							
								
								
									
										162
									
								
								index.py
									
									
									
									
									
								
							
							
						
						
									
										162
									
								
								index.py
									
									
									
									
									
								
							| @ -36,9 +36,74 @@ def extract_yt_initial_data(content): | ||||
| 	else: | ||||
| 		raise Exception("Could not match ytInitialData in content") | ||||
| 
 | ||||
| def combine_runs(runs): | ||||
| 	if "runs" in runs: # check if already unpacked | ||||
| 		runs = runs["runs"] | ||||
| 	return "".join([r["text"] for r in runs]) | ||||
| 
 | ||||
| def escape_html_textcontent(text): | ||||
| 	return ( | ||||
| 		text | ||||
| 			.replace("&", "&") | ||||
| 			.replace("<", "<") | ||||
| 			.replace(">", ">") | ||||
| 			.replace('"', """) | ||||
| 			.replace("\n", "<br>") | ||||
| 	) | ||||
| 
 | ||||
| def combine_runs_html(runs): | ||||
| 	if "runs" in runs: # check if already unpackged | ||||
| 		runs = runs["runs"] | ||||
| 	result = "" | ||||
| 	for part in runs: | ||||
| 		if part.get("bold"): | ||||
| 			result += "<b>{}</b>".format(escape_html_textcontent(part["text"])) | ||||
| 		else: | ||||
| 			result += part["text"] | ||||
| 	return result | ||||
| 
 | ||||
| def view_count_text_to_number(text): | ||||
| 	return int(text.split(" ")[0].replace(",", "")) | ||||
| 
 | ||||
| def get_view_count_or_recommended(view_count_container): | ||||
| 	if "runs" in view_count_container["viewCountText"]: # has live viewers | ||||
| 		return int(combine_runs(view_count_container["viewCountText"])) | ||||
| 	else: | ||||
| 		text = view_count_container["viewCountText"]["simpleText"] | ||||
| 		if text == "Recommended for you": | ||||
| 			return 0 # subject to change? | ||||
| 		else: | ||||
| 			return view_count_text_to_number(text) | ||||
| 
 | ||||
| def get_view_count_text_or_recommended(view_count_container): | ||||
| 	if "runs" in view_count_container["viewCountText"]: # has live viewers | ||||
| 		text = combine_runs(view_count_container["viewCountText"]) | ||||
| 	else: # has past views | ||||
| 		text = view_count_container["viewCountText"]["simpleText"] | ||||
| 		if text == "Recommended for you": | ||||
| 			return "Recommended for you" #subject to change? | ||||
| 		else: | ||||
| 			return text | ||||
| 
 | ||||
| def is_live(length_container): | ||||
| 	return "lengthText" not in length_container | ||||
| 
 | ||||
| def get_length_or_live_now(length_container): | ||||
| 	if "lengthText" in length_container: | ||||
| 		return length_text_to_seconds(length_container["lengthText"]["simpleText"]) | ||||
| 	else: | ||||
| 		return -1 | ||||
| 
 | ||||
| def get_length_text_or_live_now(length_container): | ||||
| 	if "lengthText" in length_container: | ||||
| 		return length_container["lengthText"]["simpleText"] | ||||
| 	else: | ||||
| 		return "Live now" | ||||
| 
 | ||||
| class Second(object): | ||||
| 	def __init__(self): | ||||
| 		self.video_cache = TTLCache(maxsize=50, ttl=300) | ||||
| 		self.search_cache = TTLCache(maxsize=50, ttl=300) | ||||
| 
 | ||||
| 	def _cp_dispatch(self, vpath): | ||||
| 		if vpath[:4] == ["api", "manifest", "dash", "id"]: | ||||
| @ -128,7 +193,7 @@ class Second(object): | ||||
| 				"isUpcoming": None, | ||||
| 				"dashUrl": "/api/manifest/dash/id/{}".format(info["id"]), | ||||
| 				"second__providedDashUrl": None, | ||||
| 				"adaptiveFormats": list({ | ||||
| 				"adaptiveFormats": [{ | ||||
| 					"index": None, | ||||
| 					"bitrate": str(int(format["tbr"]*1000)), | ||||
| 					"init": None, | ||||
| @ -147,8 +212,8 @@ class Second(object): | ||||
| 					"qualityLabel": format["format_note"], | ||||
| 					"second__width": format["width"], | ||||
| 					"second__height": format["height"] | ||||
| 				} for format in info["formats"] if format_is_adaptive(format)), | ||||
| 				"formatStreams": list({ | ||||
| 				} for format in info["formats"] if format_is_adaptive(format)], | ||||
| 				"formatStreams": [{ | ||||
| 					"url": format["url"], | ||||
| 					"itag": format["format_id"], | ||||
| 					"type": format_type(format), | ||||
| @ -162,7 +227,7 @@ class Second(object): | ||||
| 					"size": "{}x{}".format(format["width"], format["height"]), | ||||
| 					"second__width": format["width"], | ||||
| 					"second__height": format["height"] | ||||
| 				} for format in info["formats"] if not format_is_adaptive(format)), | ||||
| 				} for format in info["formats"] if not format_is_adaptive(format)], | ||||
| 				"captions": [], | ||||
| 				"recommendedVideos": [] | ||||
| 			} | ||||
| @ -182,7 +247,7 @@ class Second(object): | ||||
| 						yt_initial_data = extract_yt_initial_data(content) | ||||
| 						views = yt_initial_data["contents"]["twoColumnWatchNextResults"]["results"]["results"]["contents"][0]\ | ||||
| 							["videoPrimaryInfoRenderer"]["viewCount"]["videoViewCountRenderer"] | ||||
| 						result["second__viewCountText"] = views["viewCount"]["simpleText"] | ||||
| 						result["second__viewCountText"] = get_view_count_text_or_recommended(views) | ||||
| 						result["second__viewCountTextShort"] = views["shortViewCount"]["simpleText"] | ||||
| 						recommendations = yt_initial_data["contents"]["twoColumnWatchNextResults"]["secondaryResults"]\ | ||||
| 							["secondaryResults"]["results"] | ||||
| @ -194,49 +259,17 @@ class Second(object): | ||||
| 								return r["compactAutoplayRenderer"]["contents"][0]["compactVideoRenderer"] | ||||
| 							return None | ||||
| 
 | ||||
| 						def get_view_count(r): | ||||
| 							if "runs" in r["viewCountText"]: # has live viewers | ||||
| 								return int(r["viewCountText"]["runs"][0]["text"]) | ||||
| 							else: | ||||
| 								text = r["viewCountText"]["simpleText"] | ||||
| 								if text == "Recommended for you": | ||||
| 									return 0 # subject to change? | ||||
| 								else: | ||||
| 									return int(text.replace(",", "").split(" ")[0]) | ||||
| 
 | ||||
| 						def get_view_count_text(r): | ||||
| 							if "runs" in r["viewCountText"]: # has live viewers | ||||
| 								text = "".join([x["text"] for x in r["viewCountText"]["runs"]]) | ||||
| 							else: # has past views | ||||
| 								text = r["viewCountText"]["simpleText"] | ||||
| 								if text == "Recommended for you": | ||||
| 									return "Recommended for you" # subject to change? | ||||
| 								else: | ||||
| 									return text | ||||
| 
 | ||||
| 						def get_length(r): | ||||
| 							if "lengthText" in r: | ||||
| 								return length_text_to_seconds(r["lengthText"]["simpleText"]) | ||||
| 							else: | ||||
| 								return -1 | ||||
| 
 | ||||
| 						def get_length_text(r): | ||||
| 							if "lengthText" in r: | ||||
| 								return r["lengthText"]["simpleText"] | ||||
| 							else: | ||||
| 								return "Live now" | ||||
| 
 | ||||
| 						result["recommendedVideos"] = list({ | ||||
| 							"videoId": r["videoId"], | ||||
| 							"title": r["title"]["simpleText"], | ||||
| 							"videoThumbnails": [], | ||||
| 							"author": r["longBylineText"]["runs"][0]["text"], | ||||
| 							"author": combine_runs(r["longBylineText"]), | ||||
| 							"authorUrl": r["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"], | ||||
| 							"authorId": r["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"], | ||||
| 							"lengthSeconds": get_length(r), | ||||
| 							"second__lengthText": get_length_text(r), | ||||
| 							"viewCountText": get_view_count_text(r), | ||||
| 							"viewCount": get_view_count(r) | ||||
| 							"lengthSeconds": get_length_or_live_now(r), | ||||
| 							"second__lengthText": get_length_text_or_live_now(r), | ||||
| 							"viewCountText": get_view_count_text_or_recommended(r), | ||||
| 							"viewCount": get_view_count_or_recommended(r) | ||||
| 						} for r in [get_useful_recommendation_data(r) for r in recommendations if get_useful_recommendation_data(r)]) | ||||
| 
 | ||||
| 						m_yt_player_config = re.search(r_yt_player_config, line) | ||||
| @ -401,8 +434,49 @@ class Second(object): | ||||
| 		if suffix == ("suggestions",): | ||||
| 			return self.suggestions(q=q) | ||||
| 
 | ||||
| 		if q in self.search_cache: | ||||
| 			return self.search_cache[q] | ||||
| 
 | ||||
| 		try: | ||||
| 			with requests.get("https://www.youtube.com/results", params={"q": q}) as r: | ||||
| 				r.raise_for_status() | ||||
| 				content = r.content.decode("utf8") | ||||
| 				yt_initial_data = extract_yt_initial_data(content) | ||||
| 				items = yt_initial_data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"] | ||||
| 				results = [] | ||||
| 				for item in items: | ||||
| 					if "videoRenderer" in item: | ||||
| 						video = item["videoRenderer"] | ||||
| 						results.append({ | ||||
| 							"type": "video", | ||||
| 							"title": combine_runs(video["title"]), | ||||
| 							"videoId": video["videoId"], | ||||
| 							"author": combine_runs(video["longBylineText"]), | ||||
| 							"authorId": video["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"], | ||||
| 							"authorUrl": video["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"], | ||||
| 							"videoThumbnails": [], | ||||
| 							"description": combine_runs(video["descriptionSnippet"]) if "descriptionSnippet" in video else "", | ||||
| 							"descriptionHtml": combine_runs_html(video["descriptionSnippet"]) if "descriptionSnippet" in video else "", | ||||
| 							"viewCount": get_view_count_or_recommended(video), | ||||
| 							"second__viewCountText": get_view_count_text_or_recommended(video), | ||||
| 							"published": None, | ||||
| 							"publishedText": video["publishedTimeText"]["simpleText"], | ||||
| 							"lengthSeconds": get_length_or_live_now(video), | ||||
| 							"second__lengthText": get_length_text_or_live_now(video), | ||||
| 							"liveNow": is_live(video), | ||||
| 							"paid": None, | ||||
| 							"premium": None, | ||||
| 							"isUpcoming": None | ||||
| 						}) | ||||
| 				self.search_cache[q] = results # only cache full extraction | ||||
| 				return results | ||||
| 
 | ||||
| 		except Exception: | ||||
| 			print("messed up extracting search, using youtube-dl instead") | ||||
| 			traceback.print_exc() | ||||
| 
 | ||||
| 			info = ytdl.extract_info("ytsearchall:{}".format(q), download=False) | ||||
| 		return list({ | ||||
| 			return [{ | ||||
| 				"type": "video", | ||||
| 				"title": video["title"], | ||||
| 				"videoId": video["id"], | ||||
| @ -420,7 +494,7 @@ class Second(object): | ||||
| 				"paid": None, | ||||
| 				"premium": None, | ||||
| 				"isUpcoming": None | ||||
| 		} for video in info["entries"] if "title" in video) | ||||
| 			} for video in info["entries"] if "title" in video] | ||||
| 
 | ||||
| 	@cherrypy.expose | ||||
| 	@cherrypy.tools.json_out() | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user