Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
XianxinMao
Yt Dlp
提交
47193e02
Y
Yt Dlp
项目概览
XianxinMao
/
Yt Dlp
11 个月 前同步成功
通知
27
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Y
Yt Dlp
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
47193e02
编写于
7月 15, 2021
作者:
C
coletdjnz
提交者:
GitHub
7月 15, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[youtube:tab] Extract playlist availability (#504)
Authored by: colethedj
上级
49bd8c66
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
136 addition
and
88 deletion
+136
-88
yt_dlp/extractor/youtube.py
yt_dlp/extractor/youtube.py
+136
-88
未找到文件。
yt_dlp/extractor/youtube.py
浏览文件 @
47193e02
...
...
@@ -645,6 +645,28 @@ def _report_alerts(self, alerts, expected=True):
def _extract_and_report_alerts(self, data, *args, **kwargs):
    """
    Extract the alerts from a response and report them in one step.

    @param data: response passed to self._extract_alerts
    Any extra positional/keyword arguments are forwarded unchanged to
    self._report_alerts, whose return value is returned as-is.
    """
    alerts = self._extract_alerts(data)
    return self._report_alerts(alerts, *args, **kwargs)
def _extract_badges(self, renderer: dict):
    """
    Collect the badge labels attached to a renderer.

    Reads renderer['badges'] and, for each entry, the string at
    ['metadataBadgeRenderer']['label'].
    NOTE(review): try_get is defined elsewhere; presumably it yields None when
    a key is missing or the value has the wrong type, so such entries are skipped.

    @param renderer: renderer dict that may carry a 'badges' list
    @return: set of lower-cased badge labels (empty set when there are none)
    """
    badge_entries = try_get(renderer, lambda x: x['badges'], list) or []
    raw_labels = (
        try_get(entry, lambda x: x['metadataBadgeRenderer']['label'], compat_str)
        for entry in badge_entries)
    return {raw_label.lower() for raw_label in raw_labels if raw_label}
@staticmethod
def _join_text_entries(runs):
    """
    Concatenate the 'text' of every run into a single string.

    Entries that are not dicts, or whose 'text' is empty or not obtained by
    try_get, contribute nothing.

    @param runs: iterable of run entries (dicts with a 'text' key)
    @return: the joined text, or None when no run supplied any text
    """
    pieces = []
    for entry in runs:
        if not isinstance(entry, dict):
            continue
        piece = try_get(entry, lambda x: x['text'], compat_str)
        if piece:
            pieces.append(piece)
    # None (not '') signals "no text at all", matching what callers test for
    return ''.join(pieces) if pieces else None
def
_extract_response
(
self
,
item_id
,
query
,
note
=
'Downloading API JSON'
,
headers
=
None
,
ytcfg
=
None
,
check_get_keys
=
None
,
ep
=
'browse'
,
fatal
=
True
,
api_hostname
=
None
,
default_client
=
'WEB'
):
...
...
@@ -1971,20 +1993,6 @@ def parse_time_text(time_text):
if
len
(
time_text_split
)
>=
3
:
return
datetime_from_str
(
'now-%s%s'
%
(
time_text_split
[
0
],
time_text_split
[
1
]),
precision
=
'auto'
)
@staticmethod
def _join_text_entries(runs):
    """
    Concatenate the 'text' of every run into a single string.

    Entries that are not dicts, or whose 'text' is empty or not obtained by
    try_get, contribute nothing.

    @param runs: iterable of run entries (dicts with a 'text' key)
    @return: the joined text, or None when no run supplied any text
    """
    fragments = []
    for run in runs:
        if isinstance(run, dict):
            fragment = try_get(run, lambda x: x['text'], compat_str)
            if fragment:
                fragments.append(fragment)
    if not fragments:
        return None
    return ''.join(fragments)
def
_extract_comment
(
self
,
comment_renderer
,
parent
=
None
):
comment_id
=
comment_renderer
.
get
(
'commentId'
)
if
not
comment_id
:
...
...
@@ -2959,21 +2967,20 @@ def chapter_time(mmlir):
if
initial_data
and
is_private
is
not
None
:
is_membersonly
=
False
is_premium
=
False
contents
=
try_get
(
initial_data
,
lambda
x
:
x
[
'contents'
][
'twoColumnWatchNextResults'
][
'results'
][
'results'
][
'contents'
],
list
)
for
content
in
contents
or
[]:
badges
=
try_get
(
content
,
lambda
x
:
x
[
'videoPrimaryInfoRenderer'
][
'badges'
],
list
)
for
badge
in
badges
or
[]
:
label
=
try_get
(
badge
,
lambda
x
:
x
[
'metadataBadgeRenderer'
][
'label'
])
or
''
if
label
.
lower
()
==
'members only'
:
is_membersonly
=
True
break
elif
label
.
lower
()
==
'premium'
:
is_premium
=
True
break
if
is_membersonly
or
is_premium
:
break
contents
=
try_get
(
initial_data
,
lambda
x
:
x
[
'contents'
][
'twoColumnWatchNextResults'
][
'results'
][
'results'
][
'contents'
],
list
)
or
[]
badge_labels
=
set
()
for
content
in
contents
:
if
not
isinstance
(
content
,
dict
)
:
continue
badge_labels
.
update
(
self
.
_extract_badges
(
content
.
get
(
'videoPrimaryInfoRenderer'
)))
for
badge_label
in
badge_labels
:
if
badge_label
.
lower
()
==
'members only'
:
is_membersonly
=
True
elif
badge_label
.
lower
()
==
'premium'
:
is_premium
=
True
elif
badge_label
.
lower
()
==
'unlisted'
:
is_unlisted
=
True
# TODO: Add this for playlists
info
[
'availability'
]
=
self
.
_availability
(
is_private
=
is_private
,
needs_premium
=
is_premium
,
...
...
@@ -3447,6 +3454,17 @@ class YoutubeTabIE(YoutubeBaseInfoExtractor):
'title'
:
'Album - Royalty Free Music Library V2 (50 Songs)'
,
},
'playlist_count'
:
50
,
},
{
'note'
:
'unlisted single video playlist'
,
'url'
:
'https://www.youtube.com/playlist?list=PLwL24UFy54GrB3s2KMMfjZscDi1x5Dajf'
,
'info_dict'
:
{
'uploader_id'
:
'UC9zHu_mHU96r19o-wV5Qs1Q'
,
'uploader'
:
'colethedj'
,
'id'
:
'PLwL24UFy54GrB3s2KMMfjZscDi1x5Dajf'
,
'title'
:
'yt-dlp unlisted playlist test'
,
'availability'
:
'unlisted'
},
'playlist_count'
:
1
,
}]
@
classmethod
...
...
@@ -3768,27 +3786,19 @@ def _extract_selected_tab(tabs):
else
:
raise
ExtractorError
(
'Unable to find selected tab'
)
@classmethod
def _extract_uploader(cls, data):
    """
    Extract uploader details from the playlist sidebar of a response.

    Looks up the 'playlistSidebarSecondaryInfoRenderer' sidebar item and reads
    the first run of its video owner title.

    @param data: response
    @return: dict with any of 'uploader', 'uploader_id' and 'uploader_url';
             keys whose value is None are left out, so the result may be empty
    """
    uploader = {}
    renderer = cls._extract_sidebar_info_renderer(data, 'playlistSidebarSecondaryInfoRenderer') or {}
    owner = try_get(
        renderer, lambda x: x['videoOwner']['videoOwnerRenderer']['title']['runs'][0], dict)
    if owner:
        uploader['uploader'] = owner.get('text')
        uploader['uploader_id'] = try_get(
            owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str)
        uploader['uploader_url'] = urljoin(
            'https://www.youtube.com/',
            try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str))
    # Drop unset fields so callers can metadata.update() without clobbering existing values
    return {k: v for k, v in uploader.items() if v is not None}
def
_extract_from_tabs
(
self
,
item_id
,
webpage
,
data
,
tabs
):
...
...
@@ -3814,8 +3824,8 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs):
thumbnails_list
=
(
try_get
(
renderer
,
lambda
x
:
x
[
'avatar'
][
'thumbnails'
],
list
)
or
try_get
(
data
,
lambda
x
:
x
[
'
sidebar'
][
'playlistSidebarRenderer'
][
'items'
][
0
][
'playlistSidebarPrimaryInfoRenderer'
][
'
thumbnailRenderer'
][
'playlistVideoThumbnailRenderer'
][
'thumbnail'
][
'thumbnails'
],
self
.
_extract_sidebar_info_renderer
(
data
,
'playlistSidebarPrimaryInfoRenderer'
)
,
lambda
x
:
x
[
'thumbnailRenderer'
][
'playlistVideoThumbnailRenderer'
][
'thumbnail'
][
'thumbnails'
],
list
)
or
[])
...
...
@@ -3839,7 +3849,6 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs):
or
playlist_id
)
title
+=
format_field
(
selected_tab
,
'title'
,
' - %s'
)
title
+=
format_field
(
selected_tab
,
'expandedText'
,
' - %s'
)
metadata
=
{
'playlist_id'
:
playlist_id
,
'playlist_title'
:
title
,
...
...
@@ -3850,6 +3859,9 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs):
'thumbnails'
:
thumbnails
,
'tags'
:
tags
,
}
availability
=
self
.
_extract_availability
(
data
)
if
availability
:
metadata
[
'availability'
]
=
availability
if
not
channel_id
:
metadata
.
update
(
self
.
_extract_uploader
(
data
))
metadata
.
update
({
...
...
@@ -3921,49 +3933,86 @@ def _extract_from_playlist(self, item_id, url, data, playlist, webpage):
self
.
_extract_mix_playlist
(
playlist
,
playlist_id
,
data
,
webpage
),
playlist_id
=
playlist_id
,
playlist_title
=
title
)
def _extract_availability(self, data):
    """
    Work out the availability of a playlist/tab from its sidebar.

    Labels are gathered from the badges of the
    'playlistSidebarPrimaryInfoRenderer' and, if present, from the selected
    entry of its privacy dropdown.
    Note: nothing is assumed to be public unless a 'public' label says so;
    without any recognised label both flags stay None.

    @param data: response
    @return: whatever self._availability returns for the detected flags
    """
    renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer') or {}
    labels = self._extract_badges(renderer)

    # Personal playlists, when authenticated, have a dropdown visibility selector instead of a badge
    dropdown_entries = try_get(
        renderer,
        lambda x: x['privacyForm']['dropdownFormFieldRenderer']['dropdown']['dropdownRenderer']['entries'],
        list) or []
    for entry in dropdown_entries:
        if not try_get(entry, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool):
            continue
        label_runs = try_get(
            entry, lambda x: x['privacyDropdownItemRenderer']['label']['runs'], list) or []
        selected_label = self._join_text_entries(label_runs)
        if selected_label:
            labels.add(selected_label.lower())
            break

    is_private = is_unlisted = None
    for label in labels:
        if label == 'unlisted':
            is_unlisted = True
        elif label == 'private':
            is_private = True
        elif label == 'public':
            is_unlisted = is_private = False
    return self._availability(is_private, False, False, False, is_unlisted)
@staticmethod
def _extract_sidebar_info_renderer(data, info_renderer, expected_type=dict):
    """
    Find a named info renderer among the playlist sidebar items.

    @param data: response holding ['sidebar']['playlistSidebarRenderer']['items']
    @param info_renderer: key to look up in each sidebar item
    @param expected_type: type passed to try_get for the looked-up value
    @return: the first truthy renderer found, or None when there is none
    """
    sidebar_items = try_get(
        data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list) or []
    candidates = (
        try_get(entry, lambda x: x[info_renderer], expected_type)
        for entry in sidebar_items)
    # Lazy scan: stops at the first sidebar item that yields a renderer
    return next((found for found in candidates if found), None)
def _reload_with_unavailable_videos(self, item_id, data, webpage):
    """
    Get playlist with unavailable videos if the 'show unavailable videos' button exists.

    The button is searched for in the menu of the
    'playlistSidebarPrimaryInfoRenderer'. When that renderer is missing,
    nothing is requested. When the renderer exists but the button does not,
    the request is still made with the fallback params and browse id below.

    @param item_id: playlist id, used for the fallback browse id and for requests
    @param data: response
    @param webpage: webpage passed to the ytcfg / identity token extraction
    @return: result of self._extract_response (called with fatal=False),
             or None when the sidebar renderer is missing
    """
    browse_id = params = None
    renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer')
    if not renderer:
        return
    menu_renderer = try_get(
        renderer, lambda x: x['menu']['menuRenderer']['items'], list) or []
    for menu_item in menu_renderer:
        if not isinstance(menu_item, dict):
            continue
        nav_item_renderer = menu_item.get('menuNavigationItemRenderer')
        text = try_get(
            nav_item_renderer, lambda x: x['text']['simpleText'], compat_str)
        if not text or text.lower() != 'show unavailable videos':
            continue
        browse_endpoint = try_get(
            nav_item_renderer, lambda x: x['navigationEndpoint']['browseEndpoint'], dict) or {}
        browse_id = browse_endpoint.get('browseId')
        params = browse_endpoint.get('params')
        break

    ytcfg = self._extract_ytcfg(item_id, webpage)
    headers = self._generate_api_headers(
        ytcfg, account_syncid=self._extract_account_syncid(ytcfg),
        identity_token=self._extract_identity_token(webpage, item_id=item_id),
        visitor_data=try_get(
            self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str))
    # Fall back to fixed params / 'VL<playlist id>' when the button gave no endpoint
    query = {
        'params': params or 'wgYCCAA=',
        'browseId': browse_id or 'VL%s' % item_id
    }
    return self._extract_response(
        item_id=item_id, headers=headers, query=query,
        check_get_keys='contents', fatal=False,
        note='Downloading API JSON with unavailable videos')
def
_extract_webpage
(
self
,
url
,
item_id
):
retries
=
self
.
get_param
(
'extractor_retries'
,
3
)
...
...
@@ -4100,7 +4149,6 @@ def get_mobj(url):
if
'no-youtube-unavailable-videos'
not
in
compat_opts
:
data
=
self
.
_reload_with_unavailable_videos
(
item_id
,
data
,
webpage
)
or
data
self
.
_extract_and_report_alerts
(
data
)
tabs
=
try_get
(
data
,
lambda
x
:
x
[
'contents'
][
'twoColumnBrowseResultsRenderer'
][
'tabs'
],
list
)
if
tabs
:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录