Write a new channel parser, using RSS for /latest

Cadence Ember 2020-08-13 18:54:51 +12:00
parent 7ae5232424
commit 57fb71b97d
1 changed file with 140 additions and 46 deletions

index.py

@@ -3,6 +3,7 @@ import cherrypy
 import json
 import youtube_dl
 import datetime
+import dateutil.parser
 import os
 import re
 import json
@@ -63,6 +64,14 @@ def combine_runs_html(runs):
         result += part["text"]
     return result
 
+def add_html_links(text):
+    r_link = re.compile(r"""https?://[a-z-]+(?:\.[a-z-]+)+(?:/[^\s,<>)]*)?""") # it's okay, I guess.
+    match = r_link.search(text)
+    if match is not None:
+        link = match.group()
+        text = text[:match.start()] + '<a href="{}">{}</a>'.format(link, link) + add_html_links(text[match.end():])
+    return text
+
 def view_count_text_to_number(text):
     return int(text.split(" ")[0].replace(",", ""))
 
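The new add_html_links helper linkifies bare URLs by recursing only on the text after each match, so the anchor tags it inserts are never rescanned. A quick sketch of the expected behaviour, assuming the function above is in scope (the sample URLs are illustrative):

# Assuming add_html_links as defined above; sample URLs are made up.
text = "docs at https://example.com/watch?v=abc and https://example.org"
print(add_html_links(text))
# docs at <a href="https://example.com/watch?v=abc">https://example.com/watch?v=abc</a>
# and <a href="https://example.org">https://example.org</a>
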
@@ -124,11 +133,31 @@ def generate_video_thumbnails(id):
         "height": type[3]
     } for type in types]
 
+def normalise_url_protocol(url):
+    if url.startswith("//"):
+        url = "https:" + url
+    return url
+
+def uncompress_counter(text):
+    last = text[-1:].lower()
+    if last >= "0" and last <= "9":
+        return int(text)
+    else:
+        multiplier = 1
+        if last == "k":
+            multiplier = 1000
+        elif last == "m":
+            multiplier = 1000000
+        elif last == "b":
+            multiplier = 1000000000
+        return int(float(text[:-1]) * multiplier)
+
 class Second(object):
     def __init__(self):
         self.video_cache = TTLCache(maxsize=50, ttl=300)
         self.search_cache = TTLCache(maxsize=50, ttl=300)
         self.search_suggestions_cache = TTLCache(maxsize=200, ttl=60)
+        self.channel_cache = TTLCache(maxsize=50, ttl=300)
 
     def _cp_dispatch(self, vpath):
         if vpath[:4] == ["api", "manifest", "dash", "id"]:
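uncompress_counter undoes YouTube's abbreviated counters ("1.2K", "3M", and so on) so that subCount can be returned as a plain integer; all-digit strings pass through unchanged. A short sketch of the expected mapping, assuming the helper above:

# Assuming uncompress_counter as defined above.
assert uncompress_counter("523") == 523
assert uncompress_counter("1.2K") == 1200
assert uncompress_counter("3M") == 3000000
assert uncompress_counter("1B") == 1000000000
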
@@ -404,56 +433,121 @@ class Second(object):
         else:
             [ucid, part] = suffix
-        try:
-            info = ytdl.extract_info("https://www.youtube.com/channel/{}".format(ucid), download=False)
-            response = {
-                "author": info["uploader"],
-                "authorId": info["uploader_id"],
-                "authorUrl": info["uploader_url"],
-                "authorBanners": [],
-                "authorThumbnails": [],
-                "subCount": None,
-                "totalViews": None,
-                "joined": None,
-                "paid": None,
-                "autoGenerated": None,
-                "isFamilyFriendly": None,
-                "description": None,
-                "descriptionHtml": None,
-                "allowedRegions": [],
-                "latestVideos": list({
-                    "type": "video",
-                    "title": video["title"],
-                    "videoId": video["id"],
-                    "author": info["uploader"],
-                    "authorId": info["uploader_id"],
-                    "authorUrl": info["uploader_url"],
-                    "videoThumbnails": generate_video_thumbnails(info["id"]),
-                    "description": None,
-                    "descriptionHtml": None,
-                    "viewCount": None,
-                    "published": None,
-                    "publishedText": None,
-                    "lengthSeconds": None,
-                    "liveNow": None,
-                    "paid": None,
-                    "premium": None,
-                    "isUpcoming": None
-                } for video in info["entries"]),
-                "relatedChannels": []
-            }
-
-            if part == "videos" or part == "latest":
-                return response["latestVideos"]
-            else:
-                return response
-        except youtube_dl.DownloadError:
-            return {
-                "error": "This channel does not exist.",
-                "identifier": "CHANNEL_DOES_NOT_EXIST"
-            }
+        if part == "latest":
+            # use RSS
+            with requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid)) as r:
+                r.raise_for_status()
+                feed = ET.fromstring(r.content)
+                author_container = feed.find("{http://www.w3.org/2005/Atom}author")
+                author = author_container.find("{http://www.w3.org/2005/Atom}name").text
+                author_url = author_container.find("{http://www.w3.org/2005/Atom}uri").text
+                channel_id = feed.find("{http://www.youtube.com/xml/schemas/2015}channelId").text
+                results = []
+                for entry in feed.findall("{http://www.w3.org/2005/Atom}entry"):
+                    id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text
+                    media_group = entry.find("{http://search.yahoo.com/mrss/}group")
+                    description = media_group.find("{http://search.yahoo.com/mrss/}description").text
+                    media_community = media_group.find("{http://search.yahoo.com/mrss/}community")
+                    results.append({
+                        "type": "video",
+                        "title": entry.find("{http://www.w3.org/2005/Atom}title").text,
+                        "videoId": id,
+                        "author": author,
+                        "authorId": channel_id,
+                        "authorUrl": author_url,
+                        "videoThumbnails": generate_video_thumbnails(id),
+                        "description": description,
+                        "descriptionHtml": add_html_links(escape_html_textcontent(description)),
+                        "viewCount": int(media_community.find("{http://search.yahoo.com/mrss/}statistics").attrib["views"]),
+                        "published": int(dateutil.parser.isoparse(entry.find("{http://www.w3.org/2005/Atom}published").text).timestamp()),
+                        "lengthSeconds": None,
+                        "liveNow": None,
+                        "paid": None,
+                        "premium": None,
+                        "isUpcoming": None
+                    })
+                return results
+
+        else:
+            if ucid in self.channel_cache:
+                if part == "":
+                    return self.channel_cache[ucid]
+                else: # part == "videos"
+                    return self.channel_cache[ucid]["latestVideos"]
+
+            with requests.get("https://www.youtube.com/channel/{}/videos".format(ucid)) as r:
+                r.raise_for_status()
+                yt_initial_data = extract_yt_initial_data(r.content.decode("utf8"))
+                header = yt_initial_data["header"]["c4TabbedHeaderRenderer"]
+                author = header["title"]
+                author_id = header["channelId"]
+                author_url = header["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"]
+                author_banners = header["banner"]["thumbnails"]
+                for t in author_banners:
+                    t["url"] = normalise_url_protocol(t["url"])
+                author_thumbnails = header["avatar"]["thumbnails"]
+                subscriber_count = combine_runs(header["subscriberCountText"])
+                description = yt_initial_data["metadata"]["channelMetadataRenderer"]["description"]
+                allowed_regions = yt_initial_data["metadata"]["channelMetadataRenderer"]["availableCountryCodes"]
+                tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
+                videos_tab = next(tab["tabRenderer"] for tab in tabs if tab["tabRenderer"]["title"] == "Videos")
+                videos = (
+                    v["gridVideoRenderer"] for v in
+                    videos_tab["content"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"][0]["gridRenderer"]["items"]
+                )
+                latest_videos = []
+                for v in videos:
+                    length_text = next(o for o in v["thumbnailOverlays"] if "thumbnailOverlayTimeStatusRenderer" in o) \
+                        ["thumbnailOverlayTimeStatusRenderer"]["text"]["simpleText"]
+                    latest_videos.append({
+                        "type": "video",
+                        "title": v["title"]["simpleText"],
+                        "videoId": v["videoId"],
+                        "author": author,
+                        "authorId": author_id,
+                        "authorUrl": author_url,
+                        "videoThumbnails": generate_video_thumbnails(v["videoId"]),
+                        "description": "",
+                        "descriptionHtml": "",
+                        "viewCount": view_count_text_to_number(v["viewCountText"]["simpleText"]),
+                        "second__viewCountText": v["viewCountText"]["simpleText"],
+                        "second__viewCountTextShort": v["shortViewCountText"]["simpleText"],
+                        "published": 0,
+                        "publishedText": v["publishedTimeText"]["simpleText"],
+                        "lengthSeconds": length_text_to_seconds(length_text),
+                        "second__lengthText": length_text,
+                        "liveNow": None,
+                        "paid": None,
+                        "premium": None,
+                        "isUpcoming": None
+                    })
+                channel = {
+                    "author": author,
+                    "authorId": author_id,
+                    "authorUrl": author_url,
+                    "authorBanners": author_banners,
+                    "authorThumbnails": author_thumbnails,
+                    "subCount": uncompress_counter(subscriber_count.split(" ")[0]),
+                    "second__subCountText": subscriber_count,
+                    "totalViews": None,
+                    "joined": None,
+                    "paid": None,
+                    "autoGenerated": None,
+                    "isFamilyFriendly": None,
+                    "description": description,
+                    "descriptionHtml": add_html_links(escape_html_textcontent(description)),
+                    "allowedRegions": allowed_regions,
+                    "latestVideos": latest_videos,
+                    "relatedChannels": []
+                }
+                self.channel_cache[ucid] = channel
+
+                if part == "":
+                    return channel
+                else:
+                    return latest_videos
 
     @cherrypy.expose
     @cherrypy.tools.json_out()
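
The /latest branch reads YouTube's public per-channel Atom feed, which mixes three XML namespaces: Atom itself, the yt: video schema, and Media RSS for descriptions and view statistics. A minimal standalone sketch of the same fetch-and-parse round trip, with the imports spelled out (the latest_videos function name and the placeholder channel ID are illustrative, not part of the commit):

import requests
import xml.etree.ElementTree as ET

# The three namespaces used by https://www.youtube.com/feeds/videos.xml
NS = {
    "atom": "http://www.w3.org/2005/Atom",
    "yt": "http://www.youtube.com/xml/schemas/2015",
    "media": "http://search.yahoo.com/mrss/",
}

def latest_videos(channel_id):
    # Public per-channel feed; no API key or HTML scraping required.
    r = requests.get("https://www.youtube.com/feeds/videos.xml", params={"channel_id": channel_id})
    r.raise_for_status()
    feed = ET.fromstring(r.content)
    # prefix:tag lookups through the NS map, instead of spelled-out {uri}tag names
    for entry in feed.findall("atom:entry", NS):
        yield {
            "videoId": entry.find("yt:videoId", NS).text,
            "title": entry.find("atom:title", NS).text,
            "views": int(entry.find("media:group/media:community/media:statistics", NS).attrib["views"]),
        }

for video in latest_videos("UC..."):  # placeholder channel ID
    print(video["videoId"], video["title"])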