mirror of
https://git.sr.ht/~cadence/NewLeaf
synced 2026-05-04 16:41:37 +00:00
Support lockupViewModel in channel extractor
This commit is contained in:
parent
fde9f3272a
commit
07fa3fffd6
2 changed files with 75 additions and 22 deletions
|
|
@ -12,6 +12,13 @@ channel_cache_lock = Lock()
|
|||
channel_latest_cache = TTLCache(maxsize=500, ttl=300)
|
||||
channel_latest_cache_lock = Lock()
|
||||
|
||||
|
||||
def maybe_get(d, *args, default=None):
|
||||
if len(args) == 1:
|
||||
return d.get(args[0], default)
|
||||
return maybe_get(d.get(args[0], {}), *args[1:], default=default)
|
||||
|
||||
|
||||
def extract_channel(ucid, second__path="user"):
|
||||
cache_key = (ucid, second__path)
|
||||
|
||||
|
|
@ -81,14 +88,26 @@ def extract_channel(ucid, second__path="user"):
|
|||
# with messageRenderer.text.simpleText == "This channel has no videos."
|
||||
if "richGridRenderer" in tab_parts:
|
||||
videos = (
|
||||
v["richItemRenderer"]["content"]["videoRenderer"] for v in tab_parts["richGridRenderer"]["contents"] if "richItemRenderer" in v
|
||||
v["richItemRenderer"]["content"] for v in tab_parts["richGridRenderer"]["contents"] if "richItemRenderer" in v
|
||||
)
|
||||
for v in videos:
|
||||
live = False
|
||||
is_upcoming = False
|
||||
length_text = "UNKNOWN"
|
||||
length_seconds = -1
|
||||
for o in v["thumbnailOverlays"]:
|
||||
published = 0
|
||||
published_text = "Live now"
|
||||
premiere_timestamp = None
|
||||
|
||||
if "videoRenderer" in v:
|
||||
vr = v["videoRenderer"]
|
||||
|
||||
video_id = vr["videoId"]
|
||||
title = combine_runs(vr["title"])
|
||||
view_count_text = combine_runs(vr["viewCountText"]) if "viewCountText" in vr else None
|
||||
view_count_text_short = combine_runs(vr["shortViewCountText"]) if "shortViewCountText" in vr else None
|
||||
|
||||
for o in vr["thumbnailOverlays"]:
|
||||
if "thumbnailOverlayTimeStatusRenderer" in o:
|
||||
length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"])
|
||||
length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"]
|
||||
|
|
@ -98,27 +117,52 @@ def extract_channel(ucid, second__path="user"):
|
|||
live = True
|
||||
elif length_text_style == "UPCOMING":
|
||||
is_upcoming = True
|
||||
published = 0
|
||||
published_text = "Live now"
|
||||
premiere_timestamp = None
|
||||
if "publishedTimeText" in v:
|
||||
published_text = v["publishedTimeText"]["simpleText"]
|
||||
if "publishedTimeText" in vr:
|
||||
published_text = vr["publishedTimeText"]["simpleText"]
|
||||
published = past_text_to_time(published_text)
|
||||
if "upcomingEventData" in v:
|
||||
premiere_timestamp = v["upcomingEventData"]["startTime"]
|
||||
if "upcomingEventData" in vr:
|
||||
premiere_timestamp = vr["upcomingEventData"]["startTime"]
|
||||
published_text = time_to_past_text(int(premiere_timestamp))
|
||||
elif "lockupViewModel" in v:
|
||||
lv = v["lockupViewModel"]
|
||||
|
||||
video_id = lv.get("contentId")
|
||||
title = maybe_get(lv, "metadata", "lockupMetadataViewModel", "title", "content")
|
||||
|
||||
for r in maybe_get(lv, "metadata", "lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows", default=[]):
|
||||
for p in r.get("metadataParts", []):
|
||||
t = maybe_get(p, "text", "content", default="")
|
||||
if t.startswith("Streamed") or t.endswith("ago"):
|
||||
published_text = t
|
||||
published = past_text_to_time(published_text)
|
||||
elif t.endswith("views"):
|
||||
view_count_text_short = t
|
||||
view_count_text = t
|
||||
|
||||
for o in lv["contentImage"]["thumbnailViewModel"]["overlays"]:
|
||||
if m := o.get("thumbnailBottomOverlayViewModel"):
|
||||
for b in m["badges"]:
|
||||
if bvm := b.get("thumbnailBadgeViewModel"):
|
||||
length_text = bvm["text"]
|
||||
length_text_style = bvm["badgeStyle"]
|
||||
if length_text_style == "THUMBNAIL_OVERLAY_BADGE_STYLE_DEFAULT":
|
||||
length_seconds = length_text_to_seconds(length_text)
|
||||
elif length_text_style == "THUMBNAIL_OVERLAY_BADGE_STYLE_LIVE":
|
||||
live = True
|
||||
elif length_text_style == "THUMBNAIL_OVERLAY_BADGE_STYLE_UPCOMING":
|
||||
is_upcoming = True
|
||||
else:
|
||||
continue
|
||||
|
||||
view_count_text = combine_runs(v["viewCountText"]) if "viewCountText" in v else None
|
||||
view_count_text_short = combine_runs(v["shortViewCountText"]) if "shortViewCountText" in v else None
|
||||
|
||||
latest_videos.append({
|
||||
"type": "video",
|
||||
"title": combine_runs(v["title"]),
|
||||
"videoId": v["videoId"],
|
||||
"title": title,
|
||||
"videoId": video_id,
|
||||
"author": author,
|
||||
"authorId": author_id,
|
||||
"authorUrl": author_url,
|
||||
"videoThumbnails": generate_video_thumbnails(v["videoId"]),
|
||||
"videoThumbnails": generate_video_thumbnails(video_id),
|
||||
"description": "",
|
||||
"descriptionHtml": "",
|
||||
"viewCount": view_count_text_to_number(view_count_text),
|
||||
|
|
|
|||
|
|
@ -48,10 +48,19 @@ def view_count_text_to_number(text):
|
|||
if text is None:
|
||||
return 0
|
||||
|
||||
suffixes = {
|
||||
"K": 1000,
|
||||
"M": 1000000,
|
||||
"B": 1000000000,
|
||||
}
|
||||
|
||||
first_word = text.split(" ")[0].replace(",", "")
|
||||
if first_word == "No":
|
||||
return 0
|
||||
else:
|
||||
for s in suffixes:
|
||||
if first_word.endswith(s):
|
||||
return int(suffixes[s] * float(first_word[:-1]))
|
||||
return int(first_word)
|
||||
|
||||
def get_view_count_or_recommended(view_count_container):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue