提交 bb9d5d4f 编写于 作者: W wizardforcel

优化代码结构

上级 fe65b232
......@@ -22,16 +22,14 @@ from BiliDriveEx.encoder import Encoder
from BiliDriveEx.util import *
encoder = Encoder()
default_url = lambda sha1: f"http://i0.hdslb.com/bfs/album/{sha1}.png"
meta_string = lambda url: ("bdex://" + re.findall(r"[a-fA-F0-9]{40}", url)[0]) if re.match(r"^http(s?)://i0.hdslb.com/bfs/album/[a-fA-F0-9]{40}.png$", url) else url
api = Bilibili()
def fetch_meta(s):
if re.match(r"^bdex://[a-fA-F0-9]{40}$", s):
full_meta = image_download(default_url(re.findall(r"[a-fA-F0-9]{40}", s)[0]))
full_meta = image_download(api.default_url(re.findall(r"[a-fA-F0-9]{40}", s)[0]))
elif re.match(r"^bdrive://[a-fA-F0-9]{40}$", s):
full_meta = image_download(
default_url(re.findall(r"[a-fA-F0-9]{40}", s)[0]).replace('png', 'x-ms-bmp')
api.default_url(re.findall(r"[a-fA-F0-9]{40}", s)[0]).replace('png', 'x-ms-bmp')
)
elif s.startswith("http://") or s.startswith("https://"):
full_meta = image_download(s)
......@@ -64,9 +62,8 @@ def image_upload(data, cookies):
return response
def login_handle(args):
bilibili = Bilibili()
if bilibili.login(username=args.username, password=args.password):
bilibili.get_user_info()
if api.login(username=args.username, password=args.password):
api.get_user_info()
with open(os.path.join(bundle_dir, "cookies.json"), "w", encoding="utf-8") as f:
f.write(json.dumps(bilibili.get_cookies(), ensure_ascii=False, indent=2))
......@@ -76,7 +73,7 @@ def upload_handle(args):
block_sha1 = calc_sha1(block)
full_block = encoder.encode(block)
full_block_sha1 = calc_sha1(full_block)
url = is_skippable(full_block_sha1)
url = api.exist(full_block_sha1)
if url:
log(f"分块{index + 1}/{block_num}上传完毕")
block_dict[index] = {
......@@ -113,20 +110,6 @@ def upload_handle(args):
finally:
done_flag.release()
def is_skippable(sha1):
url = default_url(sha1)
headers = {
'Referer': "http://t.bilibili.com/",
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36",
}
for _ in range(5):
try:
response = requests.head(url, headers=headers, timeout=10)
return url if response.status_code == 200 else None
except:
pass
return None
def write_history(first_4mb_sha1, meta_dict, url):
history = read_history()
history[first_4mb_sha1] = meta_dict
......@@ -148,7 +131,7 @@ def upload_handle(args):
if first_4mb_sha1 in history:
url = history[first_4mb_sha1]['url']
log(f"文件已于{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(history[first_4mb_sha1]['time']))}上传, 共有{len(history[first_4mb_sha1]['block'])}个分块")
log(f"META URL -> {meta_string(url)}")
log(f"META URL -> {api.meta_string(url)}")
return url
try:
with open(os.path.join(bundle_dir, "cookies.json"), "r", encoding="utf-8") as f:
......@@ -191,7 +174,7 @@ def upload_handle(args):
url = response['data']['image_url']
log("元数据上传完毕")
log(f"{meta_dict['filename']} ({size_string(meta_dict['size'])}) 上传完毕, 用时{time.time() - start_time:.1f}秒, 平均速度{size_string(meta_dict['size'] / (time.time() - start_time))}/s")
log(f"META URL -> {meta_string(url)}")
log(f"META URL -> {api.meta_string(url)}")
write_history(first_4mb_sha1, meta_dict, url)
return url
log(f"元数据第{_ + 1}次上传失败")
......@@ -312,7 +295,7 @@ def history_handle(args):
for index, meta_dict in enumerate(history.values()):
prefix = f"[{index + 1}]"
print(f"{prefix} {meta_dict['filename']} ({size_string(meta_dict['size'])}), 共有{len(meta_dict['block'])}个分块, 上传于{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(meta_dict['time']))}")
print(f"{' ' * len(prefix)} META URL -> {meta_string(meta_dict['url'])}")
print(f"{' ' * len(prefix)} META URL -> {api.meta_string(meta_dict['url'])}")
else:
print(f"暂无历史记录")
......
......@@ -7,12 +7,17 @@ import random
import requests
import rsa
import time
import re
from urllib import parse
from BiliDriveEx.util import *
class Bilibili:
app_key = "1d8b6e7d45233436"
default_url = lambda self, sha1: f"http://i0.hdslb.com/bfs/album/{sha1}.png"
meta_string = lambda self, url: ("bdex://" + re.findall(r"[a-fA-F0-9]{40}", url)[0]) if re.match(r"^http(s?)://i0.hdslb.com/bfs/album/[a-fA-F0-9]{40}.png$", url) else url
def __init__(self):
self._session = requests.Session()
self._session.headers.update({'User-Agent': "Mozilla/5.0 BiliDroid/5.51.1 (bbcallen@gmail.com)"})
......@@ -143,3 +148,18 @@ class Bilibili:
else:
log("用户信息获取失败")
return False
def exist(self, sha1):
url = self.default_url(sha1)
headers = {
'Referer': "http://t.bilibili.com/",
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36",
}
for _ in range(5):
try:
response = requests.head(url, headers=headers, timeout=10)
return url if response.status_code == 200 else None
except:
pass
return None
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册