提交 650993d3 编写于 作者: W wizardforcel

增加API的异常捕获

上级 9f7b5aa0
......@@ -37,11 +37,14 @@ class Bilibili:
def _solve_captcha(self, image):
url = "https://bili.dev:2233/captcha"
payload = {'image': base64.b64encode(image).decode("utf-8")}
response = request_retry("post", url,
headers=Bilibili.default_hdrs,
json=payload
).json()
return response['message'] if response and response.get("code") == 0 else None
try:
j = request_retry("post", url,
headers=Bilibili.default_hdrs,
json=payload
).json()
except:
return None
return j['message'] if j['code'] == 0 else None
@staticmethod
def calc_sign(param):
......@@ -51,29 +54,37 @@ class Bilibili:
return sign_hash.hexdigest()
def get_key(self):
def _get_key(self):
url = f"https://passport.bilibili.com/api/oauth2/getKey"
payload = {
'appkey': Bilibili.app_key,
'sign': self.calc_sign(f"appkey={Bilibili.app_key}"),
}
r = request_retry("post", url, data=payload,
headers=Bilibili.default_hdrs,
cookies=self.cookies, retry=999999
)
try:
r = request_retry("post", url, data=payload,
headers=Bilibili.default_hdrs,
cookies=self.cookies, retry=999999
)
except:
return None
for k, v in r.cookies.items(): self.cookies[k] = v
j = r.json()
if j and j['code'] == 0:
if j['code'] == 0:
return {
'key_hash': j['data']['hash'],
'pub_key': rsa.PublicKey.load_pkcs1_openssl_pem(j['data']['key'].encode()),
}
else:
return None
def login_once(self, username, password, captcha=None):
key = self.get_key()
def _login_once(self, username, password, captcha=None):
key = self._get_key()
if not key: return {'code': 114514, 'message': 'key 获取失败'}
key_hash, pub_key = key['key_hash'], key['pub_key']
username = parse.quote_plus(username)
password = parse.quote_plus(base64.b64encode(rsa.encrypt(f'{key_hash}{password}'.encode(), pub_key)))
url = f"https://passport.bilibili.com/api/v2/oauth2/login"
param = f"appkey={Bilibili.app_key}"
if captcha: param += f'&captcha={captcha}'
......@@ -81,36 +92,46 @@ class Bilibili:
payload = f"{param}&sign={self.calc_sign(param)}"
headers = Bilibili.default_hdrs.copy()
headers.update({'Content-type': "application/x-www-form-urlencoded"})
j = request_retry("POST", url, data=payload,
headers=headers,
cookies=self.cookies
).json()
try:
j = request_retry("POST", url, data=payload,
headers=headers,
cookies=self.cookies
).json()
except Exception as ex:
return {'code': 114514, 'message': str(ex)}
return j
def get_captcha(self):
def _get_captcha(self):
url = f"https://passport.bilibili.com/captcha"
data = request_retry('GET', url,
headers=Bilibili.default_hdrs,
cookies=self.cookies
).content
return data
try:
img = request_retry('GET', url,
headers=Bilibili.default_hdrs,
cookies=self.cookies
).content
except:
return None
return img
# 登录
def login(self, username, password):
captcha = None
while True:
response = self.login_once(username, password, captcha)
# print(response, self.cookies)
j = self._login_once(username, password, captcha)
if not response or 'code' not in response:
if 'code' not in j:
log(f"当前IP登录过于频繁, 1分钟后重试")
time.sleep(60)
continue
if response['code'] == -105:
response = self.get_captcha()
captcha = self._solve_captcha(response)
if j['code'] == -105:
img = self._get_captcha()
if not img:
log(f"验证码获取失败")
time.sleep(1)
continue
captcha = self._solve_captcha(img)
if captcha:
log(f"登录验证码识别结果: {captcha}")
else:
......@@ -118,36 +139,40 @@ class Bilibili:
time.sleep(10)
continue
if response['code'] == -449:
if j['code'] == -449:
time.sleep(1)
continue
if response['code'] == 0 and response['data']['status'] == 0:
if j['code'] == 0 and j['data']['status'] == 0:
self.cookies = {}
for cookie in response['data']['cookie_info']['cookies']:
for cookie in j['data']['cookie_info']['cookies']:
self.cookies[cookie['name']] = cookie['value']
log("登录成功")
self.save_cookies()
return True
log(f"登录失败 {response}")
log(f"登录失败 {j['message']}")
return False
# 获取用户信息
def get_user_info(self, fmt=True):
url = f"https://api.bilibili.com/x/space/myinfo"
headers = {
headers = Bilibili.default_hdrs.copy()
headers.update({
'Referer': f"https://space.bilibili.com",
}
response = request_retry("get", url,
headers=headers,
cookies=self.cookies
).json()
})
if not response or response.get("code") != 0:
try:
j = request_retry("get", url,
headers=headers,
cookies=self.cookies
).json()
except:
return
if j['code'] != 0: return
info = {
'ban': False,
'coins': 0,
......@@ -160,14 +185,14 @@ class Bilibili:
'nickname': "",
'uid': 0,
}
info['ban'] = bool(response['data']['silence'])
info['coins'] = response['data']['coins']
info['experience']['current'] = response['data']['level_exp']['current_exp']
info['experience']['next'] = response['data']['level_exp']['next_exp']
info['face'] = response['data']['face']
info['level'] = response['data']['level']
info['nickname'] = response['data']['name']
info['uid'] = response['data']['mid']
info['ban'] = bool(j['data']['silence'])
info['coins'] = j['data']['coins']
info['experience']['current'] = j['data']['level_exp']['current_exp']
info['experience']['next'] = j['data']['level_exp']['next_exp']
info['face'] = j['data']['face']
info['level'] = j['data']['level']
info['nickname'] = j['data']['name']
info['uid'] = j['data']['mid']
if fmt:
return f"{info['nickname']}(UID={info['uid']}), Lv.{info['level']}({info['experience']['current']}/{info['experience']['next']}), 拥有{info['coins']}枚硬币, 账号{'状态正常' if not info['ban'] else '被封禁'}"
......@@ -186,17 +211,12 @@ class Bilibili:
pass
def exist(self, sha1):
url = self.default_url(sha1)
try:
url = self.default_url(sha1)
headers = {
'Referer': "http://t.bilibili.com/",
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36",
}
res = request_retry('HEAD', url, headers=headers, timeout=10)
return url if res.status_code == 200 else None
r = request_retry('HEAD', url, headers=Bilibili.default_hdrs)
except:
return
return url if r.status_code == 200 else None
def image_upload(self, data):
sha1 = calc_sha1(data)
......@@ -204,11 +224,11 @@ class Bilibili:
if url: return {'code': 0, 'data': {'image_url': url}}
url = "https://api.vc.bilibili.com/api/v1/drawImage/upload"
headers = {
headers = Bilibili.default_hdrs.copy()
headers.update({
'Origin': "https://t.bilibili.com",
'Referer': "https://t.bilibili.com/",
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36",
}
})
files = {
'file_up': (f"{int(time.time() * 1000)}.png", data),
}
......@@ -217,11 +237,14 @@ class Bilibili:
'category': "daily",
}
try:
response = requests.post(url, data=data,
r = requests.post(url, data=data,
headers=headers,
cookies=self.cookies,
files=files, timeout=300
).json()
except:
response = None
return response
\ No newline at end of file
)
except Exception as ex:
return {'code': 114514, 'message': str(ex)}
if r.status_code != 200:
return {'code': r.status_code, 'message': r.text}
return r.json()
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册