From 07fa3fffd61f38c14d14cf9ddde492911ef50874 Mon Sep 17 00:00:00 2001 From: Eleanor Clifford Date: Thu, 30 Apr 2026 17:59:20 +0100 Subject: [PATCH] Support lockupViewModel in channel extractor --- extractors/channel.py | 88 ++++++++++++++++++++++++++++++++----------- tools/converters.py | 9 +++++ 2 files changed, 75 insertions(+), 22 deletions(-) diff --git a/extractors/channel.py b/extractors/channel.py index a965c67..bbfd9d7 100644 --- a/extractors/channel.py +++ b/extractors/channel.py @@ -12,6 +12,13 @@ channel_cache_lock = Lock() channel_latest_cache = TTLCache(maxsize=500, ttl=300) channel_latest_cache_lock = Lock() + +def maybe_get(d, *args, default=None): + if len(args) == 1: + return d.get(args[0], default) + return maybe_get(d.get(args[0], {}), *args[1:], default=default) + + def extract_channel(ucid, second__path="user"): cache_key = (ucid, second__path) @@ -81,44 +88,81 @@ def extract_channel(ucid, second__path="user"): # with messageRenderer.text.simpleText == "This channel has no videos." if "richGridRenderer" in tab_parts: videos = ( - v["richItemRenderer"]["content"]["videoRenderer"] for v in tab_parts["richGridRenderer"]["contents"] if "richItemRenderer" in v + v["richItemRenderer"]["content"] for v in tab_parts["richGridRenderer"]["contents"] if "richItemRenderer" in v ) for v in videos: live = False is_upcoming = False length_text = "UNKNOWN" length_seconds = -1 - for o in v["thumbnailOverlays"]: - if "thumbnailOverlayTimeStatusRenderer" in o: - length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"]) - length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"] - if length_text_style == "DEFAULT": - length_seconds = length_text_to_seconds(length_text) - elif length_text_style == "LIVE": - live = True - elif length_text_style == "UPCOMING": - is_upcoming = True published = 0 published_text = "Live now" premiere_timestamp = None - if "publishedTimeText" in v: - published_text = v["publishedTimeText"]["simpleText"] - published = past_text_to_time(published_text) - if "upcomingEventData" in v: - premiere_timestamp = v["upcomingEventData"]["startTime"] - published_text = time_to_past_text(int(premiere_timestamp)) - view_count_text = combine_runs(v["viewCountText"]) if "viewCountText" in v else None - view_count_text_short = combine_runs(v["shortViewCountText"]) if "shortViewCountText" in v else None + if "videoRenderer" in v: + vr = v["videoRenderer"] + + video_id = vr["videoId"] + title = combine_runs(vr["title"]) + view_count_text = combine_runs(vr["viewCountText"]) if "viewCountText" in vr else None + view_count_text_short = combine_runs(vr["shortViewCountText"]) if "shortViewCountText" in vr else None + + for o in vr["thumbnailOverlays"]: + if "thumbnailOverlayTimeStatusRenderer" in o: + length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"]) + length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"] + if length_text_style == "DEFAULT": + length_seconds = length_text_to_seconds(length_text) + elif length_text_style == "LIVE": + live = True + elif length_text_style == "UPCOMING": + is_upcoming = True + if "publishedTimeText" in vr: + published_text = vr["publishedTimeText"]["simpleText"] + published = past_text_to_time(published_text) + if "upcomingEventData" in vr: + premiere_timestamp = vr["upcomingEventData"]["startTime"] + published_text = time_to_past_text(int(premiere_timestamp)) + elif "lockupViewModel" in v: + lv = v["lockupViewModel"] + + video_id = lv.get("contentId") + title = maybe_get(lv, "metadata", "lockupMetadataViewModel", "title", "content") + + for r in maybe_get(lv, "metadata", "lockupMetadataViewModel", "metadata", "contentMetadataViewModel", "metadataRows", default=[]): + for p in r.get("metadataParts", []): + t = maybe_get(p, "text", "content", default="") + if t.startswith("Streamed") or t.endswith("ago"): + published_text = t + published = past_text_to_time(published_text) + elif t.endswith("views"): + view_count_text_short = t + view_count_text = t + + for o in lv["contentImage"]["thumbnailViewModel"]["overlays"]: + if m := o.get("thumbnailBottomOverlayViewModel"): + for b in m["badges"]: + if bvm := b.get("thumbnailBadgeViewModel"): + length_text = bvm["text"] + length_text_style = bvm["badgeStyle"] + if length_text_style == "THUMBNAIL_OVERLAY_BADGE_STYLE_DEFAULT": + length_seconds = length_text_to_seconds(length_text) + elif length_text_style == "THUMBNAIL_OVERLAY_BADGE_STYLE_LIVE": + live = True + elif length_text_style == "THUMBNAIL_OVERLAY_BADGE_STYLE_UPCOMING": + is_upcoming = True + else: + continue + latest_videos.append({ "type": "video", - "title": combine_runs(v["title"]), - "videoId": v["videoId"], + "title": title, + "videoId": video_id, "author": author, "authorId": author_id, "authorUrl": author_url, - "videoThumbnails": generate_video_thumbnails(v["videoId"]), + "videoThumbnails": generate_video_thumbnails(video_id), "description": "", "descriptionHtml": "", "viewCount": view_count_text_to_number(view_count_text), diff --git a/tools/converters.py b/tools/converters.py index a2ee3ed..075c1b2 100644 --- a/tools/converters.py +++ b/tools/converters.py @@ -48,10 +48,19 @@ def view_count_text_to_number(text): if text is None: return 0 + suffixes = { + "K": 1000, + "M": 1000000, + "B": 1000000000, + } + first_word = text.split(" ")[0].replace(",", "") if first_word == "No": return 0 else: + for s in suffixes: + if first_word.endswith(s): + return int(suffixes[s] * float(first_word[:-1])) return int(first_word) def get_view_count_or_recommended(view_count_container):