Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
无聊人士张
you-get
提交
5c9ec6c4
Y
you-get
项目概览
无聊人士张
/
you-get
与 Fork 源项目一致
从无法访问的项目Fork
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Y
you-get
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
5c9ec6c4
编写于
10月 04, 2020
作者:
J
johnsmith2077
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add format selection for AcFun
上级
00e2ce3f
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
203 addition
and
158 deletion
+203
-158
src/you_get/extractors/acfun.py
src/you_get/extractors/acfun.py
+203
-158
未找到文件。
src/you_get/extractors/acfun.py
浏览文件 @
5c9ec6c4
#!/usr/bin/env python
__all__
=
[
'acfun_download'
]
from
..common
import
*
from
..extractor
import
VideoExtractor
class
AcFun
(
VideoExtractor
):
name
=
"AcFun"
stream_types
=
[
{
'id'
:
'2160P'
,
'qualityType'
:
'2160p'
},
{
'id'
:
'1080P60'
,
'qualityType'
:
'1080p60'
},
{
'id'
:
'720P60'
,
'qualityType'
:
'720p60'
},
{
'id'
:
'1080P+'
,
'qualityType'
:
'1080p+'
},
{
'id'
:
'1080P'
,
'qualityType'
:
'1080p'
},
{
'id'
:
'720P'
,
'qualityType'
:
'720p'
},
{
'id'
:
'540P'
,
'qualityType'
:
'540p'
},
{
'id'
:
'360P'
,
'qualityType'
:
'360p'
}
]
def
prepare
(
self
,
**
kwargs
):
assert
re
.
match
(
r
'https?://[^\.]*\.*acfun\.[^\.]+/(\D|bangumi)/\D\D(\d+)'
,
self
.
url
)
if
re
.
match
(
r
'https?://[^\.]*\.*acfun\.[^\.]+/\D/\D\D(\d+)'
,
self
.
url
):
html
=
get_content
(
self
.
url
,
headers
=
fake_headers
)
json_text
=
match1
(
html
,
r
"(?s)videoInfo\s*=\s*(\{.*?\});"
)
json_data
=
json
.
loads
(
json_text
)
vid
=
json_data
.
get
(
'currentVideoInfo'
).
get
(
'id'
)
up
=
json_data
.
get
(
'user'
).
get
(
'name'
)
self
.
title
=
json_data
.
get
(
'title'
)
video_list
=
json_data
.
get
(
'videoList'
)
if
len
(
video_list
)
>
1
:
self
.
title
+=
" - "
+
[
p
.
get
(
'title'
)
for
p
in
video_list
if
p
.
get
(
'id'
)
==
vid
][
0
]
currentVideoInfo
=
json_data
.
get
(
'currentVideoInfo'
)
elif
re
.
match
(
"https?://[^\.]*\.*acfun\.[^\.]+/bangumi/aa(\d+)"
,
self
.
url
):
html
=
get_content
(
self
.
url
,
headers
=
fake_headers
)
tag_script
=
match1
(
html
,
r
'<script>\s*window\.pageInfo([^<]+)</script>'
)
json_text
=
tag_script
[
tag_script
.
find
(
'{'
)
:
tag_script
.
find
(
'};'
)
+
1
]
json_data
=
json
.
loads
(
json_text
)
self
.
title
=
json_data
[
'bangumiTitle'
]
+
" "
+
json_data
[
'episodeName'
]
+
" "
+
json_data
[
'title'
]
vid
=
str
(
json_data
[
'videoId'
])
up
=
"acfun"
currentVideoInfo
=
json_data
.
get
(
'currentVideoInfo'
)
from
.le
import
letvcloud_download_by_vu
from
.qq
import
qq_download_by_vid
from
.sina
import
sina_download_by_vid
from
.tudou
import
tudou_download_by_iid
from
.youku
import
youku_download_by_vid
import
json
import
re
import
base64
import
time
def get_srt_json(id):
    """Fetch the danmaku (comment) payload for the given id.

    The id is interpolated into the danmu.aixifan.com ``V2`` endpoint and the
    response is returned exactly as ``get_content`` delivers it.
    """
    return get_content('http://danmu.aixifan.com/V2/%s' % id)
def youku_acfun_proxy(vid, sign, ref):
    """Query the player.acfun.cn ``flash_data`` endpoint and collect its streams.

    Args:
        vid: value placed in the ``vid`` query parameter.
        sign: value placed in the ``sign`` query parameter.
        ref: value sent as the ``referer`` request header.

    Returns:
        A dict keyed by each stream's ``stream_type``. The value is a 2-tuple:
        ``([segment urls], total_size)`` when the stream lists ``segs``,
        otherwise ``(m3u8, total_size)``.
    """
    timestamp_ms = str(int(time.time() * 1000))
    request_url = (
        'http://player.acfun.cn/flash_data?vid={}&ct=85&ev=3&sign={}&time={}'
    ).format(vid, sign, timestamp_ms)

    response = json.loads(get_content(request_url, headers=dict(referer=ref)))
    # The 'data' field is base64 text; the decoded bytes are run through
    # rc4() with a fixed key to obtain a UTF-8 JSON document.
    cipher_bytes = base64.b64decode(response['data'])
    plain_text = rc4(b'8bdc7e1a', cipher_bytes).decode('utf8')
    decoded = json.loads(plain_text)

    collected = {}
    for entry in decoded['stream']:
        kind = entry['stream_type']
        size = entry['total_size']
        segments = entry.get('segs')
        if segments:
            collected[kind] = [seg['url'] for seg in segments], size
        else:
            collected[kind] = entry['m3u8'], size
    return collected
def acfun_download_by_vid(vid, title, output_dir='.', merge=True, info_only=False, **kwargs):
    """Download an AcFun video identified by ``vid``.

    Calls AcFun's ``getVideo.aspx`` API, reads ``sourceType`` from the reply
    and passes the job to the matching extractor. For the ``zhuzhan`` source
    type the streams are resolved through ``youku_acfun_proxy`` and
    downloaded here. Afterwards, unless ``info_only`` or the ``dry_run`` flag
    is set, the danmaku is written to ``<title>.cmt.json`` in ``output_dir``.

    Args:
        vid (str): AcFun video id; appended to the API URL.
        title (str): title used for display and for output file names.
        output_dir (str): directory that receives the downloaded files.
        merge (bool): forwarded to the download helpers.
        info_only (bool): when true, only print stream info.
        **kwargs: ``caption`` enables saving danmaku when truthy (a missing
            key counts as disabled). All kwargs are forwarded to the Youku
            extractor.

    Raises:
        NotImplementedError: ``sourceType`` is not one of the handled values.
        RuntimeError: a ``zhuzhan`` video offers none of the known stream
            types.
    """
    # First call the main parsing API.
    info = json.loads(get_content('http://www.acfun.cn/video/getVideo.aspx?id=' + vid, headers=fake_headers))
    sourceType = info['sourceType']

    # Read sourceId (when present); the extractor chosen below consumes it.
    if 'sourceId' in info:
        sourceId = info['sourceId']
    # danmakuId = info['danmakuId']

    # Call the extractor selected by sourceType.
    if sourceType == 'sina':
        sina_download_by_vid(sourceId, title, output_dir=output_dir, merge=merge, info_only=info_only)
    elif sourceType == 'youku':
        youku_download_by_vid(sourceId, title=title, output_dir=output_dir, merge=merge, info_only=info_only, **kwargs)
    elif sourceType == 'tudou':
        tudou_download_by_iid(sourceId, title, output_dir=output_dir, merge=merge, info_only=info_only)
    elif sourceType == 'qq':
        qq_download_by_vid(sourceId, title, True, output_dir=output_dir, merge=merge, info_only=info_only)
    elif sourceType == 'letv':
        letvcloud_download_by_vu(sourceId, '2d8c027396', title, output_dir=output_dir, merge=merge, info_only=info_only)
    elif sourceType == 'zhuzhan':
        # As of Jul. 28 2016 AcFun uses embsig against hotlinking, so the
        # page URL has to be passed along as referer.
        # Since Mar. 2017 there is a dedicated ``acfun_proxy'' in the youku
        # cloud player.
        page_url = 'http://www.acfun.cn/v/ac' + vid
        yk_streams = youku_acfun_proxy(info['sourceId'], info['encode'], page_url)

        # Pick the first available stream type, in this order of preference.
        for stream_type in ('mp4hd3', 'mp4hd2', 'mp4hd', 'flvhd'):
            if yk_streams.get(stream_type):
                preferred = yk_streams[stream_type]
                break
        else:
            # Previously this fell through to an UnboundLocalError.
            raise RuntimeError('No supported stream type found for vid %s' % vid)

        # total_size in the JSON can be incorrect (e.g. 0), so sum the
        # sizes reported for the individual segments instead.
        size = 0
        for seg_url in preferred[0]:
            _, _, seg_size = url_info(seg_url)
            size += seg_size

        # Falling back to flvhd is not very likely.
        if re.search(r'fid=[0-9A-Z\-]*.flv', preferred[0][0]):
            ext = 'flv'
        else:
            ext = 'mp4'
        print_info(site_info, title, ext, size)
        if not info_only:
            download_urls(preferred[0], title, ext, size, output_dir=output_dir, merge=merge)
    else:
        raise NotImplementedError(sourceType)

    if info_only or dry_run:
        return

    # A missing 'caption' key is treated as "disabled" instead of raising
    # KeyError.
    if not kwargs.get('caption'):
        print('Skipping danmaku.')
        return

    try:
        title = get_filename(title)
        print('Downloading %s ...\n' % (title + '.cmt.json'))
        cmt = get_srt_json(vid)
        with open(os.path.join(output_dir, title + '.cmt.json'), 'w', encoding='utf-8') as x:
            x.write(cmt)
    except Exception as err:
        # Saving danmaku stays best-effort, but the failure is reported and
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        print('Failed to save danmaku: %s' % err)
def
acfun_download
(
url
,
output_dir
=
'.'
,
merge
=
True
,
info_only
=
False
,
**
kwargs
):
assert
re
.
match
(
r
'https?://[^\.]*\.*acfun\.[^\.]+/(\D|bangumi)/\D\D(\d+)'
,
url
)
def
getM3u8UrlFromCurrentVideoInfo
(
currentVideoInfo
):
if
'playInfos'
in
currentVideoInfo
:
return
currentVideoInfo
[
'playInfos'
][
0
][
'playUrls'
][
0
]
elif
'ksPlayJson'
in
currentVideoInfo
:
ksPlayJson
=
json
.
loads
(
currentVideoInfo
[
'ksPlayJson'
]
)
raise
NotImplemented
if
'ksPlayJson'
in
currentVideoInfo
:
durationMillis
=
currentVideoInfo
[
'durationMillis'
]
ksPlayJson
=
ksPlayJson
=
json
.
loads
(
currentVideoInfo
[
'ksPlayJson'
]
)
representation
=
ksPlayJson
.
get
(
'adaptationSet'
)[
0
].
get
(
'representation'
)
reps
=
[]
for
one
in
representation
:
reps
.
append
(
(
one
[
'width'
]
*
one
[
'height'
],
one
[
'url'
],
one
[
'backupUrl'
])
)
return
max
(
reps
)[
1
]
if
re
.
match
(
r
'https?://[^\.]*\.*acfun\.[^\.]+/\D/\D\D(\d+)'
,
url
):
html
=
get_content
(
url
,
headers
=
fake_headers
)
json_text
=
match1
(
html
,
r
"(?s)videoInfo\s*=\s*(\{.*?\});"
)
json_data
=
json
.
loads
(
json_text
)
vid
=
json_data
.
get
(
'currentVideoInfo'
).
get
(
'id'
)
up
=
json_data
.
get
(
'user'
).
get
(
'name'
)
title
=
json_data
.
get
(
'title'
)
video_list
=
json_data
.
get
(
'videoList'
)
if
len
(
video_list
)
>
1
:
title
+=
" - "
+
[
p
.
get
(
'title'
)
for
p
in
video_list
if
p
.
get
(
'id'
)
==
vid
][
0
]
currentVideoInfo
=
json_data
.
get
(
'currentVideoInfo'
)
m3u8_url
=
getM3u8UrlFromCurrentVideoInfo
(
currentVideoInfo
)
elif
re
.
match
(
"https?://[^\.]*\.*acfun\.[^\.]+/bangumi/aa(\d+)"
,
url
):
html
=
get_content
(
url
,
headers
=
fake_headers
)
tag_script
=
match1
(
html
,
r
'<script>\s*window\.pageInfo([^<]+)</script>'
)
json_text
=
tag_script
[
tag_script
.
find
(
'{'
)
:
tag_script
.
find
(
'};'
)
+
1
]
json_data
=
json
.
loads
(
json_text
)
title
=
json_data
[
'bangumiTitle'
]
+
" "
+
json_data
[
'episodeName'
]
+
" "
+
json_data
[
'title'
]
vid
=
str
(
json_data
[
'videoId'
])
up
=
"acfun"
currentVideoInfo
=
json_data
.
get
(
'currentVideoInfo'
)
m3u8_url
=
getM3u8UrlFromCurrentVideoInfo
(
currentVideoInfo
)
else
:
raise
NotImplemented
assert
title
and
m3u8_url
title
=
unescape_html
(
title
)
title
=
escape_file_path
(
title
)
p_title
=
r1
(
'active">([^<]+)'
,
html
)
title
=
'%s (%s)'
%
(
title
,
up
)
if
p_title
:
title
=
'%s - %s'
%
(
title
,
p_title
)
print_info
(
site_info
,
title
,
'm3u8'
,
float
(
'inf'
))
if
not
info_only
:
download_url_ffmpeg
(
m3u8_url
,
title
,
'mp4'
,
output_dir
=
output_dir
,
merge
=
merge
)
stream_list
=
representation
for
stream
in
stream_list
:
m3u8_url
=
stream
[
"url"
]
size
=
durationMillis
*
stream
[
"avgBitrate"
]
/
8
# size = float('inf')
container
=
'mp4'
stream_id
=
stream
[
"qualityLabel"
]
quality
=
stream
[
"qualityType"
]
stream_data
=
dict
(
src
=
m3u8_url
,
size
=
size
,
container
=
container
,
quality
=
quality
)
self
.
streams
[
stream_id
]
=
stream_data
assert
self
.
title
and
m3u8_url
self
.
title
=
unescape_html
(
self
.
title
)
self
.
title
=
escape_file_path
(
self
.
title
)
p_title
=
r1
(
'active">([^<]+)'
,
html
)
self
.
title
=
'%s (%s)'
%
(
self
.
title
,
up
)
if
p_title
:
self
.
title
=
'%s - %s'
%
(
self
.
title
,
p_title
)
def download(self, **kwargs):
    """Print stream information and/or download the selected stream.

    Behaviour is selected by ``kwargs``:

    * ``json_output`` truthy: emit the object through ``json_output.output``.
    * ``info_only`` truthy: print the requested stream (``stream_id``) or,
      without one, all streams. The ``p_i`` printer is used instead of ``p``
      whenever an ``index`` key is present.
    * otherwise: print the chosen stream, download it with
      ``download_url_ffmpeg`` into ``kwargs['output_dir']`` and, when
      ``caption`` is truthy, write caption tracks, danmaku and lyrics next
      to it.

    When no ``stream_id`` is given, the first entry of ``self.streams_sorted``
    is used (its ``id``, or ``itag`` if there is no ``id``).

    Unless ``keep_obj`` is truthy the instance is re-initialised at the end.
    The early return taken when captions are skipped bypasses that reset.
    """
    if kwargs.get('json_output'):
        json_output.output(self)
    elif kwargs.get('info_only'):
        requested = kwargs.get('stream_id')
        if requested:
            # Display the requested stream.
            if 'index' not in kwargs:
                self.p(requested)
            else:
                self.p_i(requested)
        elif 'index' not in kwargs:
            # Display all available streams.
            self.p([])
        else:
            best = self.streams_sorted[0]
            self.p_i(best['id'] if 'id' in best else best['itag'])
    else:
        stream_id = kwargs.get('stream_id')
        if not stream_id:
            best = self.streams_sorted[0]
            stream_id = best['id'] if 'id' in best else best['itag']

        if 'index' not in kwargs:
            self.p(stream_id)
        else:
            self.p_i(stream_id)

        # An unknown stream_id used to leave url/ext unbound and crash with
        # UnboundLocalError; it now reaches the "cannot extract" report below.
        stream = self.streams[stream_id] if stream_id in self.streams else {}
        url = stream.get('src')
        ext = stream.get('container')

        if ext in ('m3u8', 'm4a'):
            ext = 'mp4'

        if not url:
            log.wtf('[Failed] Cannot extract video source.')

        download_url_ffmpeg(url, self.title, ext,
                            output_dir=kwargs['output_dir'],
                            merge=kwargs['merge'])

        if not kwargs.get('caption'):
            print('Skipping captions or danmaku.')
            return

        for lang in self.caption_tracks:
            filename = '%s.%s.srt' % (get_filename(self.title), lang)
            print('Saving %s ... ' % filename, end="", flush=True)
            srt = self.caption_tracks[lang]
            with open(os.path.join(kwargs['output_dir'], filename),
                      'w', encoding='utf-8') as x:
                x.write(srt)
            print('Done.')

        if self.danmaku is not None and not dry_run:
            filename = '{}.cmt.xml'.format(get_filename(self.title))
            print('Downloading {} ...\n'.format(filename))
            with open(os.path.join(kwargs['output_dir'], filename), 'w', encoding='utf8') as fp:
                fp.write(self.danmaku)

        if self.lyrics is not None and not dry_run:
            filename = '{}.lrc'.format(get_filename(self.title))
            print('Downloading {} ...\n'.format(filename))
            with open(os.path.join(kwargs['output_dir'], filename), 'w', encoding='utf8') as fp:
                fp.write(self.lyrics)

    keep_obj = kwargs.get('keep_obj', False)
    if not keep_obj:
        self.__init__()
def acfun_download(self, url, output_dir='.', merge=True, info_only=False, **kwargs):
    """Resolve an AcFun page URL to an m3u8 stream and download it.

    Two page layouts are handled: regular video pages, where the metadata is
    read from the ``videoInfo = {...};`` script assignment, and bangumi pages
    (``/bangumi/aa<digits>``), where it is read from ``window.pageInfo``.

    Args:
        self: unused by this function body.
        url (str): AcFun page URL.
        output_dir (str): directory passed to ``download_url_ffmpeg``.
        merge (bool): passed to ``download_url_ffmpeg``.
        info_only (bool): when true, only print the stream info.
        **kwargs: accepted for interface compatibility; not read here.

    Raises:
        AssertionError: the URL is not an AcFun video URL, or no title /
            m3u8 URL could be extracted.
        NotImplementedError: the URL passes the initial check but matches
            neither supported page layout.
    """
    assert re.match(r'https?://[^\.]*\.*acfun\.[^\.]+/(\D|bangumi)/\D\D(\d+)', url)

    def _pick_m3u8_url(current_video_info):
        # Prefer the explicit play URL; otherwise choose the representation
        # with the largest pixel area from the embedded ksPlayJson document.
        if 'playInfos' in current_video_info:
            return current_video_info['playInfos'][0]['playUrls'][0]
        if 'ksPlayJson' in current_video_info:
            ks_play_json = json.loads(current_video_info['ksPlayJson'])
            representations = ks_play_json.get('adaptationSet')[0].get('representation')
            # key= compares areas only, so ties no longer fall back to
            # comparing URLs and a 'backupUrl' entry is not required.
            largest = max(representations, key=lambda rep: rep['width'] * rep['height'])
            return largest['url']
        return None

    if re.match(r'https?://[^\.]*\.*acfun\.[^\.]+/\D/\D\D(\d+)', url):
        html = get_content(url, headers=fake_headers)
        json_text = match1(html, r"(?s)videoInfo\s*=\s*(\{.*?\});")
        json_data = json.loads(json_text)
        currentVideoInfo = json_data.get('currentVideoInfo')
        vid = currentVideoInfo.get('id')
        up = json_data.get('user').get('name')
        title = json_data.get('title')
        video_list = json_data.get('videoList')
        if len(video_list) > 1:
            # Multi-part video: append the title of the current part, if the
            # part list actually contains it (previously an IndexError).
            part_title = next((p.get('title') for p in video_list if p.get('id') == vid), None)
            if part_title is not None:
                title += " - " + part_title
        m3u8_url = _pick_m3u8_url(currentVideoInfo)
    elif re.match(r"https?://[^\.]*\.*acfun\.[^\.]+/bangumi/aa(\d+)", url):
        html = get_content(url, headers=fake_headers)
        tag_script = match1(html, r'<script>\s*window\.pageInfo([^<]+)</script>')
        # Cut the JSON object out of the "window.pageInfo = {...};" statement.
        json_text = tag_script[tag_script.find('{'):tag_script.find('};') + 1]
        json_data = json.loads(json_text)
        title = json_data['bangumiTitle'] + " " + json_data['episodeName'] + " " + json_data['title']
        up = "acfun"
        m3u8_url = _pick_m3u8_url(json_data.get('currentVideoInfo'))
    else:
        # ``raise NotImplemented`` raises TypeError in Python 3 because
        # NotImplemented is a constant, not an exception class.
        raise NotImplementedError(url)

    assert title and m3u8_url
    title = unescape_html(title)
    title = escape_file_path(title)
    p_title = r1('active">([^<]+)', html)
    title = '%s (%s)' % (title, up)
    if p_title:
        title = '%s - %s' % (title, p_title)

    print_info(site_info, title, 'm3u8', float('inf'))
    if not info_only:
        download_url_ffmpeg(m3u8_url, title, 'mp4', output_dir=output_dir, merge=merge)
site
=
AcFun
()
site_info
=
"AcFun.cn"
download
=
acfun_download
download
=
site
.
download_by_url
download_playlist
=
playlist_not_supported
(
'acfun'
)
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录