提交 f001a082 编写于 作者: H Hsury

实现本地历史记录

上级 d4f1c8f8
......@@ -12,15 +12,11 @@ from urllib import parse
class Bilibili:
app_key = "1d8b6e7d45233436"
def __init__(self, https=True):
def __init__(self):
self._session = requests.Session()
self._session.headers.update({'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36"})
self.get_cookies = lambda: self._session.cookies.get_dict(domain=".bilibili.com")
self.get_csrf = lambda: self.get_cookies().get("bili_jct", "")
self.get_sid = lambda: self.get_cookies().get("sid", "")
self.get_uid = lambda: self.get_cookies().get("DedeUserID", "")
self.access_token = ""
self.refresh_token = ""
self.username = ""
self.password = ""
self.info = {
......@@ -34,22 +30,18 @@ class Bilibili:
'level': 0,
'nickname': "",
}
self.protocol = "https" if https else "http"
self.proxy = None
self.proxy_pool = set()
def _log(self, message):
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}][{self.username if self.username else '#' + self.get_uid() if self.get_uid() else ''}] {message}")
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}] {message}")
def _requests(self, method, url, decode_level=2, enable_proxy=True, retry=10, timeout=15, **kwargs):
def _requests(self, method, url, decode_level=2, retry=10, timeout=15, **kwargs):
if method in ["get", "post"]:
for _ in range(retry + 1):
try:
response = getattr(self._session, method)(url, timeout=timeout, proxies=self.proxy if enable_proxy else None, **kwargs)
response = getattr(self._session, method)(url, timeout=timeout, **kwargs)
return response.json() if decode_level == 2 else response.content if decode_level == 1 else response
except:
if enable_proxy:
self.set_proxy()
pass
return None
def _solve_captcha(self, image):
......@@ -65,151 +57,71 @@ class Bilibili:
sign_hash.update(f"{param}{salt}".encode())
return sign_hash.hexdigest()
def set_proxy(self, add=None):
if isinstance(add, str):
self.proxy_pool.add(add)
elif isinstance(add, list):
self.proxy_pool.update(add)
if self.proxy_pool:
proxy = random.sample(self.proxy_pool, 1)[0]
self.proxy = {self.protocol: f"{self.protocol}://{proxy}"}
# self._log(f"使用{self.protocol.upper()}代理: {proxy}")
else:
self.proxy = None
return self.proxy
# 登录
def login(self, **kwargs):
def by_cookie():
url = f"{self.protocol}://api.bilibili.com/x/space/myinfo"
headers = {'Host': "api.bilibili.com"}
response = self._requests("get", url, headers=headers)
if response and response.get("code") != -101:
self._log("Cookie仍有效")
return True
else:
self._log("Cookie已失效")
return False
def by_token(force_refresh=False):
if not force_refresh:
param = f"access_key={self.access_token}&appkey={Bilibili.app_key}&ts={int(time.time())}"
url = f"{self.protocol}://passport.bilibili.com/api/v2/oauth2/info?{param}&sign={self.calc_sign(param)}"
response = self._requests("get", url)
def login(self, username, password):
def get_key():
url = f"https://passport.bilibili.com/api/oauth2/getKey"
payload = {
'appkey': Bilibili.app_key,
'sign': self.calc_sign(f"appkey={Bilibili.app_key}"),
}
while True:
response = self._requests("post", url, data=payload)
if response and response.get("code") == 0:
self._session.cookies.set('DedeUserID', str(response['data']['mid']), domain=".bilibili.com")
self._log(f"Token仍有效, 有效期至{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time() + int(response['data']['expires_in'])))}")
param = f"access_key={self.access_token}&appkey={Bilibili.app_key}&gourl={self.protocol}%3A%2F%2Faccount.bilibili.com%2Faccount%2Fhome&ts={int(time.time())}"
url = f"{self.protocol}://passport.bilibili.com/api/login/sso?{param}&sign={self.calc_sign(param)}"
self._requests("get", url, decode_level=0)
if all(key in self.get_cookies() for key in ["bili_jct", "DedeUserID", "DedeUserID__ckMd5", "sid", "SESSDATA"]):
self._log("Cookie获取成功")
return True
else:
self._log("Cookie获取失败")
url = f"{self.protocol}://passport.bilibili.com/api/v2/oauth2/refresh_token"
param = f"access_key={self.access_token}&appkey={Bilibili.app_key}&refresh_token={self.refresh_token}&ts={int(time.time())}"
return {
'key_hash': response['data']['hash'],
'pub_key': rsa.PublicKey.load_pkcs1_openssl_pem(response['data']['key'].encode()),
}
else:
time.sleep(1)
self.username = username
self.password = password
while True:
key = get_key()
key_hash, pub_key = key['key_hash'], key['pub_key']
url = f"https://passport.bilibili.com/api/v2/oauth2/login"
param = f"appkey={Bilibili.app_key}&password={parse.quote_plus(base64.b64encode(rsa.encrypt(f'{key_hash}{self.password}'.encode(), pub_key)))}&username={parse.quote_plus(self.username)}"
payload = f"{param}&sign={self.calc_sign(param)}"
headers = {'Content-type': "application/x-www-form-urlencoded"}
response = self._requests("post", url, data=payload, headers=headers)
if response and response.get("code") == 0:
for cookie in response['data']['cookie_info']['cookies']:
self._session.cookies.set(cookie['name'], cookie['value'], domain=".bilibili.com")
self.access_token = response['data']['token_info']['access_token']
self.refresh_token = response['data']['token_info']['refresh_token']
self._log(f"Token刷新成功, 有效期至{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time() + int(response['data']['token_info']['expires_in'])))}")
return True
else:
self.access_token = ""
self.refresh_token = ""
self._log("Token刷新失败")
return False
def by_password():
def get_key():
url = f"{self.protocol}://passport.bilibili.com/api/oauth2/getKey"
payload = {
'appkey': Bilibili.app_key,
'sign': self.calc_sign(f"appkey={Bilibili.app_key}"),
}
while True:
response = self._requests("post", url, data=payload)
if response and response.get("code") == 0:
return {
'key_hash': response['data']['hash'],
'pub_key': rsa.PublicKey.load_pkcs1_openssl_pem(response['data']['key'].encode()),
}
else:
time.sleep(1)
while True:
key = get_key()
key_hash, pub_key = key['key_hash'], key['pub_key']
url = f"{self.protocol}://passport.bilibili.com/api/v2/oauth2/login"
param = f"appkey={Bilibili.app_key}&password={parse.quote_plus(base64.b64encode(rsa.encrypt(f'{key_hash}{self.password}'.encode(), pub_key)))}&username={parse.quote_plus(self.username)}"
payload = f"{param}&sign={self.calc_sign(param)}"
headers = {'Content-type': "application/x-www-form-urlencoded"}
response = self._requests("post", url, data=payload, headers=headers)
while True:
if response and response.get("code") is not None:
if response['code'] == -105:
url = f"{self.protocol}://passport.bilibili.com/captcha"
headers = {'Host': "passport.bilibili.com"}
response = self._requests("get", url, headers=headers, decode_level=1)
captcha = self._solve_captcha(response)
if captcha:
self._log(f"登录验证码识别结果: {captcha}")
key = get_key()
key_hash, pub_key = key['key_hash'], key['pub_key']
url = f"{self.protocol}://passport.bilibili.com/api/v2/oauth2/login"
param = f"appkey={Bilibili.app_key}&captcha={captcha}&password={parse.quote_plus(base64.b64encode(rsa.encrypt(f'{key_hash}{self.password}'.encode(), pub_key)))}&username={parse.quote_plus(self.username)}"
payload = f"{param}&sign={self.calc_sign(param)}"
headers = {'Content-type': "application/x-www-form-urlencoded"}
response = self._requests("post", url, data=payload, headers=headers)
else:
self._log(f"登录验证码识别服务暂时不可用, {'尝试更换代理' if self.proxy else '10秒后重试'}")
if not self.set_proxy():
time.sleep(10)
break
elif response['code'] == 0 and response['data']['status'] == 0:
for cookie in response['data']['cookie_info']['cookies']:
self._session.cookies.set(cookie['name'], cookie['value'], domain=".bilibili.com")
self.access_token = response['data']['token_info']['access_token']
self.refresh_token = response['data']['token_info']['refresh_token']
self._log("登录成功")
return True
if response and response.get("code") is not None:
if response['code'] == -105:
url = f"https://passport.bilibili.com/captcha"
headers = {'Host': "passport.bilibili.com"}
response = self._requests("get", url, headers=headers, decode_level=1)
captcha = self._solve_captcha(response)
if captcha:
self._log(f"登录验证码识别结果: {captcha}")
key = get_key()
key_hash, pub_key = key['key_hash'], key['pub_key']
url = f"https://passport.bilibili.com/api/v2/oauth2/login"
param = f"appkey={Bilibili.app_key}&captcha={captcha}&password={parse.quote_plus(base64.b64encode(rsa.encrypt(f'{key_hash}{self.password}'.encode(), pub_key)))}&username={parse.quote_plus(self.username)}"
payload = f"{param}&sign={self.calc_sign(param)}"
headers = {'Content-type': "application/x-www-form-urlencoded"}
response = self._requests("post", url, data=payload, headers=headers)
else:
self._log(f"登录失败 {response}")
return False
self._log(f"登录验证码识别服务暂时不可用, 10秒后重试")
time.sleep(10)
break
elif response['code'] == 0 and response['data']['status'] == 0:
for cookie in response['data']['cookie_info']['cookies']:
self._session.cookies.set(cookie['name'], cookie['value'], domain=".bilibili.com")
self._log("登录成功")
return True
else:
self._log(f"当前IP登录过于频繁, {'尝试更换代理' if self.proxy else '1分钟后重试'}")
if not self.set_proxy():
time.sleep(60)
break
self._session.cookies.clear()
for name in ["bili_jct", "DedeUserID", "DedeUserID__ckMd5", "sid", "SESSDATA"]:
value = kwargs.get(name)
if value:
self._session.cookies.set(name, value, domain=".bilibili.com")
self.access_token = kwargs.get("access_token", "")
self.refresh_token = kwargs.get("refresh_token", "")
self.username = kwargs.get("username", "")
self.password = kwargs.get("password", "")
force_refresh_token = kwargs.get("force_refresh_token", False)
if (not force_refresh_token or not self.access_token or not self.refresh_token) and all(key in self.get_cookies() for key in ["bili_jct", "DedeUserID", "DedeUserID__ckMd5", "sid", "SESSDATA"]) and by_cookie():
return True
elif self.access_token and self.refresh_token and by_token(force_refresh_token):
return True
elif self.username and self.password and by_password():
return True
else:
self._session.cookies.clear()
return False
self._log(f"登录失败 {response}")
return False
else:
self._log(f"当前IP登录过于频繁, 1分钟后重试")
time.sleep(60)
break
# 获取用户信息
def get_user_info(self):
url = f"{self.protocol}://api.bilibili.com/x/space/myinfo?jsonp=jsonp"
url = f"https://api.bilibili.com/x/space/myinfo?jsonp=jsonp"
headers = {
'Host': "api.bilibili.com",
'Referer': f"https://space.bilibili.com/{self.get_uid()}/",
......
......@@ -14,26 +14,8 @@ import time
import types
from bilibili import Bilibili
def log(message):
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}] {message}")
def calc_sha1(data, hexdigest=False):
sha1 = hashlib.sha1()
if isinstance(data, types.GeneratorType):
for chunk in data:
sha1.update(chunk)
else:
sha1.update(data)
return sha1.hexdigest() if hexdigest else sha1.digest()
def read_in_chunks(file_name, chunk_size=1024 * 1024):
with open(file_name, "rb") as f:
while True:
data = f.read(chunk_size)
if data != b"":
yield data
else:
return
default_url = lambda sha1: f"http://i0.hdslb.com/bfs/album/{sha1}.x-ms-bmp"
meta_string = lambda url: ("bdrive://" + re.findall(r"[a-fA-F0-9]{40}", url)[0]) if re.match(r"^http(s?)://i0.hdslb.com/bfs/album/[a-fA-F0-9]{40}.x-ms-bmp$", url) else url
def bmp_header(data):
return b"BM" \
......@@ -54,6 +36,28 @@ def bmp_header(data):
+ b"\x00\x00\x00\x00" \
+ b"\x00\x00\x00\x00\xff\xff\xff\x00"
def calc_sha1(data, hexdigest=False):
sha1 = hashlib.sha1()
if isinstance(data, types.GeneratorType):
for chunk in data:
sha1.update(chunk)
else:
sha1.update(data)
return sha1.hexdigest() if hexdigest else sha1.digest()
def fetch_meta(string):
if re.match(r"^bdrive://[a-fA-F0-9]{40}$", string) or re.match(r"^[a-fA-F0-9]{40}$", string):
full_meta = image_download(default_url(re.findall(r'[a-fA-F0-9]{40}', string)[0]))
elif string.startswith("http://") or string.startswith("https://"):
full_meta = image_download(string)
else:
return None
try:
meta_dict = json.loads(full_meta[62:].decode("utf-8"))
return meta_dict
except:
return None
def image_upload(data, cookies):
url = "https://api.vc.bilibili.com/api/v1/drawImage/upload"
headers = {
......@@ -66,32 +70,48 @@ def image_upload(data, cookies):
'biz': "draw",
'category': "daily",
}
response = requests.post(url, headers=headers, cookies=cookies, files=files).json()
try:
response = requests.post(url, headers=headers, cookies=cookies, files=files).json()
except:
response = None
return response
def image_download(url):
response = requests.get(url)
return response.content
try:
response = requests.get(url).content
except:
response = None
return response
def fetch_meta(string):
if string.startswith("http://") or string.startswith("https://"):
full_meta = image_download(string)
elif re.match(r"^[a-fA-F0-9]{40}$", string):
full_meta = image_download(f"http://i0.hdslb.com/bfs/album/{string}.x-ms-bmp")
else:
return None
def log(message):
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}] {message}")
def read_history():
try:
meta_dict = json.loads(full_meta[62:].decode("utf-8"))
return meta_dict
with open("history.json", "r", encoding="utf-8") as f:
history = json.loads(f.read())
except:
return None
history = {}
return history
def login_handle(args):
bilibili = Bilibili()
bilibili.login(username=args.username, password=args.password)
bilibili.get_user_info()
with open(args.cookies_file, "w", encoding="utf-8") as f:
f.write(json.dumps(bilibili.get_cookies(), ensure_ascii=False, indent=2))
def read_in_chunks(file_name, chunk_size=16 * 1024 * 1024):
with open(file_name, "rb") as f:
while True:
data = f.read(chunk_size)
if data != b"":
yield data
else:
return
def history_handle(args):
history = read_history()
if history:
for index, meta_dict in enumerate(history.values()):
prefix = f"[{index}]"
print(f"{prefix} {meta_dict['filename']} ({meta_dict['size'] / 1024 / 1024:.2f} MB), 共有{len(meta_dict['block'])}个分块, 上传于{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(meta_dict['time']))}")
print(f"{' ' * len(prefix)} {meta_string(meta_dict['url'])}")
else:
print(f"暂无上传历史记录")
def info_handle(args):
meta_dict = fetch_meta(args.meta)
......@@ -106,6 +126,13 @@ def info_handle(args):
else:
log("元数据解析出错")
def login_handle(args):
bilibili = Bilibili()
if bilibili.login(username=args.username, password=args.password):
bilibili.get_user_info()
with open("cookies.json", "w", encoding="utf-8") as f:
f.write(json.dumps(bilibili.get_cookies(), ensure_ascii=False, indent=2))
def upload_handle(args):
def core(index, block):
block_sha1 = calc_sha1(block, hexdigest=True)
......@@ -113,7 +140,7 @@ def upload_handle(args):
full_block_sha1 = calc_sha1(full_block, hexdigest=True)
url = skippable(full_block_sha1)
if url:
log(f"分块{index} ({len(block) / 1024 / 1024:.2f} MB) 已存在于服务器")
# log(f"分块{index} ({len(block) / 1024 / 1024:.2f} MB) 已存在于服务器")
block_dict[index] = {
'url': url,
'size': len(block),
......@@ -121,38 +148,43 @@ def upload_handle(args):
}
done_flag.release()
else:
for _ in range(3):
# log(f"分块{index} ({len(block) / 1024 / 1024:.2f} MB) 开始上传")
for _ in range(10):
response = image_upload(full_block, cookies)
if response['code'] == 0:
url = response['data']['image_url']
log(f"分块{index} ({len(block) / 1024 / 1024:.2f} MB) 已上传")
block_dict[index] = {
'url': url,
'size': len(block),
'sha1': block_sha1,
}
done_flag.release()
break
elif response['code'] == -4:
terminate_flag.set()
log("上传失败, 请先登录")
break
if response:
if response['code'] == 0:
url = response['data']['image_url']
log(f"分块{index} ({len(block) / 1024 / 1024:.2f} MB) 上传完毕")
block_dict[index] = {
'url': url,
'size': len(block),
'sha1': block_sha1,
}
done_flag.release()
break
elif response['code'] == -4:
terminate_flag.set()
log(f"分块{index} ({len(block) / 1024 / 1024:.2f} MB) 第{_ + 1}次上传失败, 请重新登录")
break
log(f"分块{index} ({len(block) / 1024 / 1024:.2f} MB) 第{_ + 1}次上传失败")
else:
terminate_flag.set()
log(f"分块{index} ({len(block) / 1024 / 1024:.2f} MB) 上传失败, 服务器返回{response}")
def skippable(sha1):
url = f"http://i0.hdslb.com/bfs/album/{sha1}.x-ms-bmp"
url = default_url(sha1)
response = requests.head(url)
return url if response.status_code == 200 else None
done_flag = threading.Semaphore(0)
terminate_flag = threading.Event()
thread_pool = []
block_dict = {}
def write_history(meta_dict, url):
history = read_history()
history[meta_dict['sha1']] = meta_dict
history[meta_dict['sha1']]['url'] = url
with open("history.json", "w", encoding="utf-8") as f:
f.write(json.dumps(history, ensure_ascii=False, indent=2))
start_time = time.time()
try:
with open(args.cookies_file, "r", encoding="utf-8") as f:
with open("cookies.json", "r", encoding="utf-8") as f:
cookies = json.loads(f.read())
except:
log("Cookies加载失败, 请先登录")
......@@ -161,7 +193,17 @@ def upload_handle(args):
log(f"上传: {os.path.basename(file_name)} ({os.path.getsize(file_name) / 1024 / 1024:.2f} MB)")
sha1 = calc_sha1(read_in_chunks(file_name), hexdigest=True)
log(f"SHA-1: {sha1}")
history = read_history()
if sha1 in history:
url = history[sha1]['url']
log(f"该文件已于{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(history[sha1]['time']))}上传, 共有{len(history[sha1]['block'])}个分块")
log(meta_string(url))
return url
log(f"线程数: {args.thread}")
done_flag = threading.Semaphore(0)
terminate_flag = threading.Event()
thread_pool = []
block_dict = {}
for index, block in enumerate(read_in_chunks(file_name, chunk_size=args.block_size * 1024 * 1024)):
if len(thread_pool) >= args.thread:
done_flag.acquire()
......@@ -183,69 +225,79 @@ def upload_handle(args):
}
meta = json.dumps(meta_dict, ensure_ascii=False).encode("utf-8")
full_meta = bmp_header(meta) + meta
for _ in range(3):
for _ in range(10):
response = image_upload(full_meta, cookies)
if response['code'] == 0:
if response and response['code'] == 0:
url = response['data']['image_url']
log("元数据已上传")
log("元数据上传完毕")
log(f"{os.path.basename(file_name)}上传完毕, 共有{len(meta_dict['block'])}个分块, 用时{int(time.time() - start_time)}秒, 平均速度{meta_dict['size'] / 1024 / 1024 / (time.time() - start_time):.2f} MB/s")
log(f"META: {re.findall(r'[a-fA-F0-9]{40}', url)[0] if re.match(r'^http(s?)://i0.hdslb.com/bfs/album/[a-fA-F0-9]{40}.x-ms-bmp$', url) else url}")
log(meta_string(url))
write_history(meta_dict, url)
return url
log(f"元数据第{_ + 1}次上传失败")
else:
log(f"元数据上传失败, 服务器返回{response}")
return None
def download_handle(args):
def core(index, block_dict, f):
for _ in range(3):
def core(index, block_dict):
# log(f"分块{index} ({block_dict['size'] / 1024 / 1024:.2f} MB) 开始下载")
for _ in range(10):
block = image_download(block_dict['url'])[62:]
if calc_sha1(block, hexdigest=True) == block_dict['sha1']:
f.seek(block_offset(index))
f.write(block)
log(f"分块{index} ({block_dict['size'] / 1024 / 1024:.2f} MB) 已下载")
done_flag.release()
break
if block:
if calc_sha1(block, hexdigest=True) == block_dict['sha1']:
file_lock.acquire()
f.seek(block_offset(index))
f.write(block)
file_lock.release()
log(f"分块{index} ({block_dict['size'] / 1024 / 1024:.2f} MB) 下载完毕")
done_flag.release()
break
else:
log(f"分块{index} ({block_dict['size'] / 1024 / 1024:.2f} MB) 校验未通过, SHA-1与元数据中的记录{block_dict['sha1']}不匹配")
else:
log(f"分块{index} ({block_dict['size'] / 1024 / 1024:.2f} MB) 第{_ + 1}次下载失败")
else:
terminate_flag.set()
log(f"分块{index}校验未通过, SHA-1与元数据中的记录{block_dict['sha1']}不匹配")
return
def block_offset(index):
return sum(meta_dict['block'][i]['size'] for i in range(index))
done_flag = threading.Semaphore(0)
terminate_flag = threading.Event()
thread_pool = []
download_block_list = []
start_time = time.time()
meta_dict = fetch_meta(args.meta)
if meta_dict:
file_name = args.file if args.file else meta_dict['filename']
log(f"下载: {file_name} ({meta_dict['size'] / 1024 / 1024:.2f} MB), 共有{len(meta_dict['block'])}个分块, 上传于{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(meta_dict['time']))}")
log(f"下载: {os.path.basename(file_name)} ({meta_dict['size'] / 1024 / 1024:.2f} MB), 共有{len(meta_dict['block'])}个分块, 上传于{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(meta_dict['time']))}")
else:
log("元数据解析出错")
return None
log(f"线程数: {args.thread}")
download_block_list = []
if os.path.exists(file_name) and os.path.getsize(file_name) == meta_dict['size']:
if calc_sha1(read_in_chunks(file_name), hexdigest=True) == meta_dict['sha1']:
log(f"{file_name}已存在于本地")
log(f"{os.path.basename(file_name)}已存在于本地")
return file_name
else:
with open(file_name, "rb") as f:
for index, block_dict in enumerate(meta_dict['block']):
f.seek(block_offset(index))
if calc_sha1(f.read(block_dict['size']), hexdigest=True) == block_dict['sha1']:
log(f"分块{index} ({block_dict['size'] / 1024 / 1024:.2f} MB) 已存在于本地")
# log(f"分块{index} ({block_dict['size'] / 1024 / 1024:.2f} MB) 已存在于本地")
pass
else:
# log(f"分块{index} ({block_dict['size'] / 1024 / 1024:.2f} MB) 需要重新下载")
download_block_list.append(index)
else:
download_block_list = list(range(len(meta_dict['block'])))
done_flag = threading.Semaphore(0)
terminate_flag = threading.Event()
file_lock = threading.Lock()
thread_pool = []
with open(file_name, "r+b" if os.path.exists(file_name) else "wb") as f:
for index in download_block_list:
if len(thread_pool) >= args.thread:
done_flag.acquire()
if not terminate_flag.is_set():
thread_pool.append(threading.Thread(target=core, args=(index, meta_dict['block'][index], f)))
thread_pool.append(threading.Thread(target=core, args=(index, meta_dict['block'][index])))
thread_pool[-1].start()
else:
log("已终止下载, 等待线程回收")
......@@ -257,40 +309,35 @@ def download_handle(args):
sha1 = calc_sha1(read_in_chunks(file_name), hexdigest=True)
log(f"SHA-1: {sha1}")
if sha1 == meta_dict['sha1']:
log(f"{file_name}校验通过")
log(f"{file_name}下载完毕, 用时{int(time.time() - start_time)}秒, 平均速度{meta_dict['size'] / 1024 / 1024 / (time.time() - start_time):.2f} MB/s")
log(f"{os.path.basename(file_name)}校验通过")
log(f"{os.path.basename(file_name)}下载完毕, 用时{int(time.time() - start_time)}秒, 平均速度{meta_dict['size'] / 1024 / 1024 / (time.time() - start_time):.2f} MB/s")
return file_name
else:
log(f"{file_name}校验未通过, SHA-1与元数据中的记录{meta_dict['sha1']}不匹配")
log(f"{os.path.basename(file_name)}校验未通过, SHA-1与元数据中的记录{meta_dict['sha1']}不匹配")
return None
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="BiliDrive", description="Bilibili Drive", epilog="By Hsury, 2019/10/25")
parser.add_argument("-c", "--cookies-file", default="cookies.json", help="cookies json file name")
subparsers = parser.add_subparsers()
history_parser = subparsers.add_parser("history", help="view upload history")
history_parser.set_defaults(func=history_handle)
info_parser = subparsers.add_parser("info", help="view meta info")
info_parser.add_argument("meta", help="meta url")
info_parser.set_defaults(func=info_handle)
login_parser = subparsers.add_parser("login", help="login to bilibili")
login_parser.add_argument("username", help="username")
login_parser.add_argument("password", help="password")
login_parser.set_defaults(func=login_handle)
info_parser = subparsers.add_parser("info", help="get meta info")
info_parser.add_argument("meta", help="meta url")
info_parser.set_defaults(func=info_handle)
upload_parser = subparsers.add_parser("upload", help="upload a file")
upload_parser.add_argument("file", help="file name")
upload_parser.add_argument("-b", "--block-size", default=4, type=int, help="block size in MB")
upload_parser.add_argument("-t", "--thread", default=2, type=int, help="thread number")
upload_parser.add_argument("-t", "--thread", default=4, type=int, help="thread number")
upload_parser.set_defaults(func=upload_handle)
download_parser = subparsers.add_parser("download", help="download a file")
download_parser.add_argument("meta", help="meta url")
download_parser.add_argument("file", nargs="?", default="", help="save as file name")
download_parser.add_argument("-t", "--thread", default=4, type=int, help="thread number")
download_parser.add_argument("-t", "--thread", default=8, type=int, help="thread number")
download_parser.set_defaults(func=download_handle)
args = parser.parse_args()
try:
args.func(args)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册