Commit 72df45bc authored by Mort Yao

refactor, code cleanup for YouTube

Parent fdc9d81c
......@@ -7,6 +7,7 @@ import os
import re
import sys
from urllib import request, parse
import platform
from .version import __version__
......@@ -33,20 +34,63 @@ def tr(s):
except:
return str(s.encode('utf-8'))[2:-1]
# DEPRECATED in favor of match1()
def r1(pattern, text):
m = re.search(pattern, text)
if m:
return m.group(1)
# DEPRECATED in favor of match1()
def r1_of(patterns, text):
for p in patterns:
x = r1(p, text)
if x:
return x
def match1(text, *patterns):
"""Scans through a string for substrings matched some patterns (first-subgroups only).
Args:
text: A string to be scanned.
patterns: Arbitrary number of regex patterns.
Returns:
When only one pattern is given, returns a string (None if no match found).
When more than one pattern is given, returns a list of strings ([] if no match found).
"""
if len(patterns) == 1:
pattern = patterns[0]
match = re.search(pattern, text)
if match:
return match.group(1)
else:
return None
else:
ret = []
for pattern in patterns:
match = re.search(pattern, text)
if match:
ret.append(match.group(1))
return ret
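For illustration, a minimal usage sketch of the new match1() helper; the sample string and patterns below are hypothetical:

    # One pattern: the first subgroup, or None if nothing matches.
    match1('itag=22&quality=hd720', r'itag=(\d+)')                      # -> '22'
    match1('itag=22&quality=hd720', r'signature=(\w+)')                 # -> None
    # Several patterns: a list of first subgroups (possibly empty).
    match1('itag=22&quality=hd720', r'itag=(\d+)', r'quality=(\w+)')    # -> ['22', 'hd720']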
def parse_query_param(url, param):
"""Parses the query string of a URL and returns the value of a parameter.
Args:
url: A URL.
param: A string representing the name of the parameter.
Returns:
The value of the parameter.
"""
return parse.parse_qs(parse.urlparse(url).query)[param][0]
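A hypothetical example of parse_query_param():

    parse_query_param('http://www.youtube.com/watch?v=dQw4w9WgXcQ&feature=share', 'v')
    # -> 'dQw4w9WgXcQ'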
def unicodize(text):
return re.sub(r'\\u([0-9A-Fa-f][0-9A-Fa-f][0-9A-Fa-f][0-9A-Fa-f])', lambda x: chr(int(x.group(0)[2:], 16)), text)
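unicodize() replaces literal \uXXXX escape sequences with the characters they encode, e.g. (hypothetical input):

    unicodize('Bad Romance \\u2013 Lady Gaga')   # -> 'Bad Romance – Lady Gaga'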
# DEPRECATED in favor of filenameable()
def escape_file_path(path):
path = path.replace('/', '-')
path = path.replace('\\', '-')
......@@ -54,23 +98,57 @@ def escape_file_path(path):
path = path.replace('?', '-')
return path
def filenameable(text):
"""Converts a string to a legal filename through various OSes.
"""
# All POSIX systems
text = text.translate({
0: None,
ord('/'): '-',
})
if platform.system() == 'Darwin': # For Mac OS
text = text.translate({
ord(':'): '-',
})
elif platform.system() == 'Windows': # For Windows
text = text.translate({
ord(':'): '-',
ord('*'): '-',
ord('?'): '-',
ord('\\'): '-',
ord('\"'): '\'',
ord('<'): '-',
ord('>'): '-',
ord('|'): '-',
ord('+'): '-',
ord('['): '(',
ord(']'): ')',
})
return text
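A hypothetical example of filenameable(), run on a Windows host:

    filenameable('AC/DC: Back in Black [HD]')   # -> 'AC-DC- Back in Black (HD)'
    # On Mac OS only '/' and ':' are replaced; other POSIX systems replace only '/' and NUL.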
def unescape_html(html):
from html import parser
html = parser.HTMLParser().unescape(html)
html = re.sub(r'&#(\d+);', lambda x: chr(int(x.group(1))), html)
return html
def ungzip(s):
def ungzip(data):
"""Decompresses data for Content-Encoding: gzip.
"""
from io import BytesIO
import gzip
buffer = BytesIO(s)
f = gzip.GzipFile(fileobj = buffer)
buffer = BytesIO(data)
f = gzip.GzipFile(fileobj=buffer)
return f.read()
def undeflate(s):
def undeflate(data):
"""Decompresses data for Content-Encoding: deflate.
(the zlib compression is used.)
"""
import zlib
return zlib.decompress(s, -zlib.MAX_WBITS)
return zlib.decompress(data, -zlib.MAX_WBITS)
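A quick round-trip check of both helpers, using synthetic data rather than a real HTTP response:

    import gzip, zlib
    body = 'hello'.encode('utf-8')
    assert ungzip(gzip.compress(body)) == body
    # 'deflate' here is a raw DEFLATE stream (no zlib header), hence the negative window bits.
    co = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
    assert undeflate(co.compress(body) + co.flush()) == body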
# DEPRECATED in favor of get_content()
def get_response(url, faker = False):
if faker:
response = request.urlopen(request.Request(url, headers = fake_headers), None)
......@@ -85,10 +163,12 @@ def get_response(url, faker = False):
response.data = data
return response
# DEPRECATED in favor of get_content()
def get_html(url, encoding = None, faker = False):
content = get_response(url, faker).data
return str(content, 'utf-8', 'ignore')
# DEPRECATED in favor of get_content()
def get_decoded_html(url, faker = False):
response = get_response(url, faker)
data = response.data
......@@ -98,6 +178,38 @@ def get_decoded_html(url, faker = False):
else:
return data
def get_content(url, headers={}, decoded=True):
"""Gets the content of a URL via sending a HTTP GET request.
Args:
url: A URL.
headers: Request headers used by the client.
decoded: Whether to decode the response body using UTF-8 or the charset specified in Content-Type.
Returns:
The content as a string.
"""
response = request.urlopen(request.Request(url, headers=headers))
data = response.read()
# Handle HTTP compression for gzip and deflate (zlib)
content_encoding = response.getheader('Content-Encoding')
if content_encoding == 'gzip':
data = ungzip(data)
elif content_encoding == 'deflate':
data = undeflate(data)
# Decode the response body
if decoded:
charset = match1(response.getheader('Content-Type'), r'charset=([\w-]+)')
if charset is not None:
data = data.decode(charset)
else:
data = data.decode('utf-8')
return data
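Typical use of get_content(), with a hypothetical URL; fake_headers is the header dict already defined in this module:

    html = get_content('http://www.youtube.com/watch?v=dQw4w9WgXcQ', headers=fake_headers)
    raw = get_content('http://www.youtube.com/favicon.ico', decoded=False)   # raw bytes, still gunzipped/inflated if needed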
def url_size(url, faker = False):
if faker:
response = request.urlopen(request.Request(url, headers = fake_headers), None)
......@@ -388,7 +500,9 @@ def download_urls(urls, title, ext, total_size, output_dir = '.', refer = None,
import sys
traceback.print_exc(file = sys.stdout)
pass
title = escape_file_path(title)
title = filenameable(title)
filename = '%s.%s' % (title, ext)
filepath = os.path.join(output_dir, filename)
if total_size:
......@@ -463,7 +577,9 @@ def download_urls_chunked(urls, title, ext, total_size, output_dir = '.', refer
return
assert ext in ('ts')
title = escape_file_path(title)
title = filenameable(title)
filename = '%s.%s' % (title, 'ts')
filepath = os.path.join(output_dir, filename)
if total_size:
......
......@@ -6,7 +6,7 @@ from ..common import *
# YouTube media encoding options, in descending quality order.
# taken from http://en.wikipedia.org/wiki/YouTube#Quality_and_codecs, 3/22/2013.
youtube_codecs = [
yt_codecs = [
{'itag': 38, 'container': 'MP4', 'video_resolution': '3072p', 'video_encoding': 'H.264', 'video_profile': 'High', 'video_bitrate': '3.5-5', 'audio_encoding': 'AAC', 'audio_bitrate': '192'},
{'itag': 46, 'container': 'WebM', 'video_resolution': '1080p', 'video_encoding': 'VP8', 'video_profile': '', 'video_bitrate': '', 'audio_encoding': 'Vorbis', 'audio_bitrate': '192'},
{'itag': 37, 'container': 'MP4', 'video_resolution': '1080p', 'video_encoding': 'H.264', 'video_profile': 'High', 'video_bitrate': '3-4.3', 'audio_encoding': 'AAC', 'audio_bitrate': '192'},
......@@ -32,52 +32,6 @@ youtube_codecs = [
{'itag': 17, 'container': '3GP', 'video_resolution': '144p', 'video_encoding': 'MPEG-4 Visual', 'video_profile': 'Simple', 'video_bitrate': '0.05', 'audio_encoding': 'AAC', 'audio_bitrate': '24'},
]
def parse_video_info(raw_info):
"""Parser for YouTube's get_video_info data.
Returns a dict, where 'url_encoded_fmt_stream_map' maps to a sorted list.
"""
# Percent-encodings of reserved characters, used as separators.
sepr = {
'&': '%26',
',': '%2C',
'=': '%3D',
}
# fmt_level = {'itag': level, ...}
# itag of a higher quality maps to a lower level number.
# The highest quality has level number 0.
fmt_level = dict(
zip(
[str(codec['itag'])
for codec in
youtube_codecs],
range(len(youtube_codecs))))
# {key1: value1, key2: value2, ...,
# 'url_encoded_fmt_stream_map': [{'itag': '38', ...}, ...]
# }
return dict(
[(lambda metadata:
['url_encoded_fmt_stream_map', (
lambda stream_map:
sorted(
[dict(
[subitem.split(sepr['='])
for subitem in
item.split(sepr['&'])])
for item in
stream_map.split(sepr[','])],
key =
lambda stream:
fmt_level[stream['itag']]))
(metadata[1])]
if metadata[0] == 'url_encoded_fmt_stream_map'
else metadata)
(item.split('='))
for item in
raw_info.split('&')])
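For reference, a sketch of the shape this (now removed) parser produced, with hypothetical values:

    raw_info = 'status=ok&title=Some+Title&url_encoded_fmt_stream_map=itag%3D22%26sig%3DAAA%2Citag%3D18%26sig%3DBBB'
    # parse_video_info(raw_info) ->
    # {'status': 'ok',
    #  'title': 'Some+Title',
    #  'url_encoded_fmt_stream_map': [{'itag': '22', 'sig': 'AAA'},    # sorted by fmt_level
    #                                 {'itag': '18', 'sig': 'BBB'}]}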
# Signature decryption algorithm, reused code from youtube-dl
def decrypt_signature(s):
if len(s) == 88:
......@@ -97,56 +51,42 @@ def decrypt_signature(s):
else:
raise Exception('Unable to decrypt signature, key length %d not supported; retrying might work' % (len(s)))
def youtube_download_by_id(id, title = None, output_dir = '.', merge = True, info_only = False):
raw_info = request.urlopen('http://www.youtube.com/get_video_info?video_id=%s' % id).read().decode('utf-8')
def youtube_download_by_id(id, title=None, output_dir='.', merge=True, info_only=False):
"""Downloads a YouTube video by its unique id.
"""
video_info = parse_video_info(raw_info)
raw_video_info = get_content('http://www.youtube.com/get_video_info?video_id=%s' % id)
video_info = parse.parse_qs(raw_video_info)
if video_info['status'] == 'ok' and not video_info['use_cipher_signature'] == 'True': # use get_video_info data
title = parse.unquote(video_info['title'].replace('+', ' '))
signature = video_info['url_encoded_fmt_stream_map'][0]['sig']
url = parse.unquote(parse.unquote(video_info['url_encoded_fmt_stream_map'][0]['url'])) + "&signature=%s" % signature
if video_info['status'] == ['ok'] and ('use_cipher_signature' not in video_info or video_info['use_cipher_signature'] == ['False']):
title = parse.unquote_plus(video_info['title'][0])
stream_list = parse.parse_qs(raw_video_info)['url_encoded_fmt_stream_map'][0].split(',')
else: # parse video page when "embedding disabled by request"
import json
html = request.urlopen('http://www.youtube.com/watch?v=' + id).read().decode('utf-8')
html = unescape_html(html)
yt_player_config = json.loads(r1(r'ytplayer.config = ([^\n]+);', html))
title = yt_player_config['args']['title']
title = unicodize(title)
title = parse.unquote(title)
title = escape_file_path(title)
else:
# Parse video page when video_info is not usable.
video_page = get_content('http://www.youtube.com/watch?v=%s' % id)
ytplayer_config = json.loads(match1(video_page, r'ytplayer.config\s*=\s*([^\n]+);'))
for itag in [
'38',
'46', '37',
'102', '45', '22',
'84',
'120',
'85',
'44', '35',
'101', '100', '43', '34', '82', '18',
'6', '83', '13', '5', '36', '17',
]:
fmt = r1(r'([^,\"]*itag=' + itag + "[^,\"]*)", html)
if fmt:
url = r1(r'url=([^\\]+)', fmt)
url = unicodize(url)
url = parse.unquote(url)
sig = r1(r'sig=([^\\]+)', fmt) or decrypt_signature(r1(r's=([^\\]+)', fmt))
url = url + '&signature=' + sig
break
try:
url
except NameError:
url = r1(r'ytdns.ping\("([^"]+)"[^;]*;</script>', html)
url = unicodize(url)
url = re.sub(r'\\/', '/', url)
url = re.sub(r'generate_204', 'videoplayback', url)
title = ytplayer_config['args']['title']
stream_list = ytplayer_config['args']['url_encoded_fmt_stream_map'].split(',')
streams = {
parse.parse_qs(stream)['itag'][0] : parse.parse_qs(stream)
for stream in stream_list
}
for codec in yt_codecs:
itag = str(codec['itag'])
if itag in streams:
download_stream = streams[itag]
break
url = download_stream['url'][0]
if 'sig' in download_stream:
sig = download_stream['sig'][0]
else:
sig = decrypt_signature(download_stream['s'][0])
url = '%s&signature=%s' % (url, sig)
type, ext, size = url_info(url)
......@@ -154,13 +94,14 @@ def youtube_download_by_id(id, title = None, output_dir = '.', merge = True, inf
if not info_only:
download_urls([url], title, ext, size, output_dir, merge = merge)
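To illustrate the new itag-priority selection, with a hypothetical stream_list:

    stream_list = ['itag=18&url=http%3A%2F%2Fexample&sig=AAA', 'itag=22&url=http%3A%2F%2Fexample&sig=BBB']
    streams = {parse.parse_qs(s)['itag'][0]: parse.parse_qs(s) for s in stream_list}
    # yt_codecs lists itag 22 before 18, so the loop picks streams['22'] and appends '&signature=BBB' to its url.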
def youtube_download(url, output_dir = '.', merge = True, info_only = False):
id = r1(r'youtu.be/(.*)', url)
if not id:
id = parse.parse_qs(parse.urlparse(url).query)['v'][0]
def youtube_download(url, output_dir='.', merge=True, info_only=False):
"""Downloads YouTube videos by URL.
"""
id = match1(url, r'youtu.be/([^/]+)') or parse_query_param(url, 'v')
assert id
youtube_download_by_id(id, None, output_dir, merge = merge, info_only = info_only)
youtube_download_by_id(id, title=None, output_dir=output_dir, merge=merge, info_only=info_only)
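Both URL forms are handled, e.g. (hypothetical usage):

    youtube_download('http://youtu.be/dQw4w9WgXcQ', info_only=True)
    youtube_download('http://www.youtube.com/watch?v=dQw4w9WgXcQ', info_only=True)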
site_info = "YouTube.com"
download = youtube_download
......