#!/usr/bin/env python3.7
# -*- coding: utf-8 -*-

import base64
import hashlib
import random
import requests
import rsa
import time
import re
from urllib import parse
from BiliDriveEx.util import *

class Bilibili:
    """Minimal Bilibili client: app-style login, profile lookup and image upload.

    Network helpers are best-effort: they return ``None`` (or ``False``)
    instead of raising when a request fails or a reply cannot be decoded.
    """

    app_key = "1d8b6e7d45233436"

    # Endpoints used by the login flow.
    _GET_KEY_URL = "https://passport.bilibili.com/api/oauth2/getKey"
    _LOGIN_URL = "https://passport.bilibili.com/api/v2/oauth2/login"
    _CAPTCHA_URL = "https://passport.bilibili.com/captcha"

    # Desktop browser User-Agent sent with the image existence check and upload.
    _BROWSER_UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36"

    # Canonical album image URL; the dots are escaped so that only the real
    # host name and the ".png" suffix match.
    _ALBUM_URL_RE = re.compile(r"^https?://i0\.hdslb\.com/bfs/album/([a-fA-F0-9]{40})\.png$")

    def default_url(self, sha1):
        """Return the album image URL for the given SHA-1 hex digest."""
        return f"http://i0.hdslb.com/bfs/album/{sha1}.png"

    def meta_string(self, url):
        """Shorten a canonical album image URL to ``bdex://<sha1>``.

        Any other URL is returned unchanged.
        """
        matched = self._ALBUM_URL_RE.match(url)
        return "bdex://" + matched.group(1) if matched else url

    def __init__(self):
        """Create the HTTP session and initialise empty credentials/profile."""
        self._session = requests.Session()
        self._session.headers.update({'User-Agent': "Mozilla/5.0 BiliDroid/5.51.1 (bbcallen@gmail.com)"})
        self.username = ""
        self.password = ""
        self.info = {
            'ban': False,
            'coins': 0,
            'experience': {
                'current': 0,
                'next': 0,
            },
            'face': "",
            'level': 0,
            'nickname': "",
        }

    def get_cookies(self):
        """Return the session cookies stored for ``.bilibili.com`` as a dict."""
        return self._session.cookies.get_dict(domain=".bilibili.com")

    def get_uid(self):
        """Return the ``DedeUserID`` cookie value, or ``""`` when it is absent."""
        return self.get_cookies().get("DedeUserID", "")

    def _requests(self, method, url, decode_level=2, retry=0, timeout=10, **kwargs):
        """Send a GET/POST through the session, swallowing failures.

        Args:
            method: ``"get"`` or ``"post"``; anything else yields ``None``.
            url: Target URL.
            decode_level: ``2`` returns the decoded JSON, ``1`` the raw
                ``content`` bytes, any other value the response object.
            retry: Number of extra attempts after the first failure.
            timeout: Per-attempt timeout passed to the session call.
            **kwargs: Forwarded to the session method.

        Returns:
            The decoded result, or ``None`` if every attempt failed.
        """
        if method not in ("get", "post"):
            return None
        send = getattr(self._session, method)
        for _ in range(retry + 1):
            try:
                response = send(url, timeout=timeout, **kwargs)
                if decode_level == 2:
                    return response.json()
                if decode_level == 1:
                    return response.content
                return response
            except Exception:
                # Best-effort: transport and decoding errors just trigger the
                # next attempt. KeyboardInterrupt/SystemExit still propagate.
                continue
        return None

    def _solve_captcha(self, image):
        """Ask the captcha service to recognise ``image`` (raw bytes).

        Returns the reply's ``message`` field when the service answers with
        ``code == 0``; otherwise ``None``. A missing or empty image (for
        example because the captcha download failed) also yields ``None``
        without contacting the service.
        """
        if not image:
            return None
        url = "https://bili.dev:2233/captcha"
        payload = {'image': base64.b64encode(image).decode("utf-8")}
        response = self._requests("post", url, json=payload)
        return response['message'] if response and response.get("code") == 0 else None

    @staticmethod
    def calc_sign(param):
        """Return the MD5 hex digest of ``param`` followed by the fixed salt."""
        salt = "560c52ccd288fed045859ed18bffd973"
        return hashlib.md5(f"{param}{salt}".encode()).hexdigest()

    def _get_login_key(self):
        """Fetch the login hash and RSA public key, retrying every second.

        Blocks until the endpoint answers with ``code == 0``.
        """
        payload = {
            'appkey': Bilibili.app_key,
            'sign': self.calc_sign(f"appkey={Bilibili.app_key}"),
        }
        while True:
            response = self._requests("post", self._GET_KEY_URL, data=payload)
            if response and response.get("code") == 0:
                return {
                    'key_hash': response['data']['hash'],
                    'pub_key': rsa.PublicKey.load_pkcs1_openssl_pem(response['data']['key'].encode()),
                }
            time.sleep(1)

    def _build_login_payload(self, key_hash, pub_key, captcha=None):
        """Build the signed, form-encoded login body.

        The fields are emitted in the fixed order appkey, captcha (optional),
        password, username, followed by the signature over that string.
        """
        encrypted = rsa.encrypt(f'{key_hash}{self.password}'.encode(), pub_key)
        fields = [f"appkey={Bilibili.app_key}"]
        if captcha:
            fields.append(f"captcha={captcha}")
        fields.append(f"password={parse.quote_plus(base64.b64encode(encrypted))}")
        fields.append(f"username={parse.quote_plus(self.username)}")
        param = "&".join(fields)
        return f"{param}&sign={self.calc_sign(param)}"

    def _post_login(self, payload):
        """POST a prepared login body and return the decoded JSON (or ``None``)."""
        headers = {'Content-type': "application/x-www-form-urlencoded"}
        return self._requests("post", self._LOGIN_URL, data=payload, headers=headers)

    def login(self, username, password):
        """Log in with a username and password.

        Stores the credentials on the instance, then loops until the server
        gives a definitive answer: code ``-105`` triggers a captcha round,
        code ``-449`` is retried after one second, and a missing/undecodable
        reply is retried from scratch after one minute.

        Returns:
            ``True`` once the login cookies have been stored in the session,
            ``False`` when the server rejects the login.
        """
        self.username = username
        self.password = password

        while True:
            key = self._get_login_key()
            payload = self._build_login_payload(key['key_hash'], key['pub_key'])
            response = self._post_login(payload)
            while True:
                if not response or response.get("code") is None:
                    log("当前IP登录过于频繁, 1分钟后重试")
                    time.sleep(60)
                    break

                code = response['code']
                if code == -105:
                    # Captcha required: download it and ask the solver.
                    image = self._requests(
                        "get",
                        self._CAPTCHA_URL,
                        headers={'Host': "passport.bilibili.com"},
                        decode_level=1,
                    )
                    captcha = self._solve_captcha(image)
                    if not captcha:
                        log("登录验证码识别服务暂时不可用, 10秒后重试")
                        time.sleep(10)
                        break
                    log(f"登录验证码识别结果: {captcha}")
                    # A fresh key is fetched for the captcha-bearing attempt.
                    key = self._get_login_key()
                    payload = self._build_login_payload(key['key_hash'], key['pub_key'], captcha)
                    response = self._post_login(payload)
                elif code == -449:
                    # Resend the same payload after a short pause.
                    time.sleep(1)
                    response = self._post_login(payload)
                elif code == 0 and response['data']['status'] == 0:
                    for cookie in response['data']['cookie_info']['cookies']:
                        self._session.cookies.set(cookie['name'], cookie['value'], domain=".bilibili.com")
                    log("登录成功")
                    return True
                else:
                    log(f"登录失败 {response}")
                    return False

    def get_user_info(self):
        """Fetch the logged-in user's profile into ``self.info``.

        Returns:
            ``True`` when the profile was retrieved and logged, else ``False``.
        """
        url = "https://api.bilibili.com/x/space/myinfo?jsonp=jsonp"
        headers = {
            'Host': "api.bilibili.com",
            'Referer': f"https://space.bilibili.com/{self.get_uid()}/",
        }
        response = self._requests("get", url, headers=headers)
        if not response or response.get("code") != 0:
            log("用户信息获取失败")
            return False

        data = response['data']
        self.info['ban'] = bool(data['silence'])
        self.info['coins'] = data['coins']
        self.info['experience']['current'] = data['level_exp']['current_exp']
        self.info['experience']['next'] = data['level_exp']['next_exp']
        self.info['face'] = data['face']
        self.info['level'] = data['level']
        self.info['nickname'] = data['name']
        log(f"{self.info['nickname']}(UID={self.get_uid()}), Lv.{self.info['level']}({self.info['experience']['current']}/{self.info['experience']['next']}), 拥有{self.info['coins']}枚硬币, 账号{'状态正常' if not self.info['ban'] else '被封禁'}")
        return True

    def exist(self, sha1):
        """Check whether the album image for ``sha1`` is already hosted.

        Issues a HEAD request, retrying up to five times on request errors.

        Returns:
            The image URL when the server answers 200, otherwise ``None``.
        """
        url = self.default_url(sha1)
        headers = {
            'Referer': "http://t.bilibili.com/",
            'User-Agent': self._BROWSER_UA,
        }
        for _ in range(5):
            try:
                response = requests.head(url, headers=headers, timeout=10)
            except Exception:
                # Best-effort: try again on any request failure.
                continue
            return url if response.status_code == 200 else None
        return None

    def image_upload(self, data, cookies):
        """Upload PNG bytes to the draw-image endpoint.

        The upload is skipped when an image with the same SHA-1 already
        exists; in that case a success-shaped dict pointing at the existing
        URL is returned.

        Args:
            data: Raw image bytes.
            cookies: Cookies to authenticate the upload request.

        Returns:
            The decoded JSON reply, or ``None`` when the request or the JSON
            decoding fails.
        """
        sha1 = calc_sha1(data)
        existing_url = self.exist(sha1)
        if existing_url:
            return {'code': 0, 'data': {'image_url': existing_url}}

        url = "https://api.vc.bilibili.com/api/v1/drawImage/upload"
        headers = {
            'Origin': "https://t.bilibili.com",
            'Referer': "https://t.bilibili.com/",
            'User-Agent': self._BROWSER_UA,
        }
        files = {
            'file_up': (f"{int(time.time() * 1000)}.png", data),
        }
        # Kept separate from the ``data`` parameter, which holds the image bytes.
        form = {
            'biz': "draw",
            'category': "daily",
        }
        try:
            return requests.post(url, data=form, headers=headers, cookies=cookies, files=files, timeout=300).json()
        except Exception:
            # Best-effort: report failure as None rather than raising.
            return None