encode.py 5.6 KB
Newer Older
H
hjdhnx 已提交
1 2 3 4 5 6 7 8
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File  : encode.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Date  : 2022/8/29

import base64
import requests
9 10
import requests.utils
from time import sleep
11
import os
H
hjdhnx 已提交
12
from utils.web import UC_UA,PC_UA
H
hjdhnx 已提交
13

14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
def getPreJs():
    base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__)))  # 上级目
    lib_path = os.path.join(base_path, f'libs/pre.js')
    with open(lib_path,encoding='utf-8') as f:
        code = f.read()
    return code

def getCryptoJS():
    base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__)))  # 上级目
    os.makedirs(os.path.join(base_path, f'libs'), exist_ok=True)
    lib_path = os.path.join(base_path, f'libs/crypto-hiker.js')
    # print('加密库地址:', lib_path)
    if not os.path.exists(lib_path):
        return 'undefiend'
    with open(lib_path,encoding='utf-8') as f:
        code = f.read()
    return code

32 33 34 35 36 37
def getHome(url):
    # http://www.baidu.com:9000/323
    urls = url.split('//')
    homeUrl = urls[0] + '//' + urls[1].split('/')[0]
    return homeUrl

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
class OcrApi:
    def __init__(self,api):
        self.api = api

    def classification(self,img):
        try:
            code = requests.post(self.api,data=img,headers={'user-agent':PC_UA}).text
        except Exception as e:
            print(f'ocr识别发生错误:{e}')
            code = ''
        return code

def verifyCode(url,headers,timeout=5,total_cnt=3,api=None):
    if not api:
        api = 'http://192.168.3.224:9000/api/ocr_img'
53 54 55 56 57 58
    lower_keys = list(map(lambda x: x.lower(), headers.keys()))
    host = getHome(url)
    if not 'referer' in lower_keys:
        headers['Referer'] = host
    print(f'开始自动过验证,请求头:{headers}')
    cnt = 0
59
    ocr = OcrApi(api)
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
    while cnt < total_cnt:
        s = requests.session()
        try:
            img = s.get(url=f"{host}/index.php/verify/index.html", headers=headers,timeout=timeout).content
            code = ocr.classification(img)
            print(f'第{cnt+1}次验证码识别结果:{code}')
            res = s.post(
                url=f"{host}/index.php/ajax/verify_check?type=search&verify={code}",
                headers=headers).json()
            if res["msg"] == "ok":
                cookies_dict = requests.utils.dict_from_cookiejar(s.cookies)
                cookie_str = ';'.join([f'{k}={cookies_dict[k]}' for k in cookies_dict])
                # return cookies_dict
                return cookie_str
        except:
            print(f'第{cnt+1}次验证码提交失败')
            pass
        cnt += 1
        sleep(1)
    return ''

H
hjdhnx 已提交
81 82 83 84 85 86
def base64Encode(text):
    return base64.b64encode(text.encode("utf8")).decode("utf-8") #base64编码

def baseDecode(text):
    return base64.b64decode(text).decode("utf-8") #base64解码

H
hjdhnx 已提交
87 88 89
def dealObj(obj=None):
    if not obj:
        obj = {}
H
hjdhnx 已提交
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
    encoding = obj.get('encoding') or 'utf-8'
    encoding = str(encoding).replace("'", "")
    # print(type(url),url)
    # headers = dict(obj.get('headers')) if obj.get('headers') else {}
    # headers = obj.get('headers').to_dict() if obj.get('headers') else {}
    headers = obj.get('headers') if obj.get('headers') else {}
    new_headers = {}
    # print(type(headers),headers)
    for i in headers:
        new_headers[str(i).replace("'", "")] = str(headers[i]).replace("'", "")
    # print(type(new_headers), new_headers)

    timeout = float(obj.get('timeout').to_int()) if obj.get('timeout') else None
    # print(type(timeout), timeout)
    body = obj.get('body') if obj.get('body') else {}
    new_body = {}
    for i in body:
        new_body[str(i).replace("'", "")] = str(body[i]).replace("'", "")
H
hjdhnx 已提交
108 109 110 111 112 113 114 115 116 117 118 119
    return {
        'encoding':encoding,
        'headers':new_headers,
        'timeout':timeout,
        'body': new_body,
    }

def base_request(url,obj,method=None):
    url = str(url).replace("'", "")
    if not method:
        method = 'get'
    # print(obj)
H
hjdhnx 已提交
120
    print(f'{method}:{url}')
H
hjdhnx 已提交
121 122 123
    try:
        # r = requests.get(url, headers=headers, params=body, timeout=timeout)
        if method.lower() == 'get':
H
hjdhnx 已提交
124
            r = requests.get(url, headers=obj['headers'], params=obj['body'], timeout=obj['timeout'])
H
hjdhnx 已提交
125
        else:
H
hjdhnx 已提交
126
            r = requests.post(url, headers=obj['headers'], data=obj['body'], timeout=obj['timeout'])
H
hjdhnx 已提交
127 128 129
        # r = requests.get(url, timeout=timeout)
        # r = requests.get(url)
        # print(encoding)
H
hjdhnx 已提交
130
        r.encoding = obj['encoding']
H
hjdhnx 已提交
131 132 133 134 135 136
        # print(f'源码:{r.text}')
        return r.text
    except Exception as e:
        print(f'{method}请求发生错误:{e}')
        return ''

H
hjdhnx 已提交
137 138 139 140
def fetch(url,obj,method=None):
    if not method:
        method = 'get'
    obj = dealObj(obj)
H
hjdhnx 已提交
141 142 143
    # print(f'{method}:{url}')
    if not obj.get('headers') or not obj['headers'].get('User-Agent'):
        obj['headers']['User-Agent'] = PC_UA
H
hjdhnx 已提交
144
    return base_request(url,obj,method)
H
hjdhnx 已提交
145 146

def post(url,obj):
H
hjdhnx 已提交
147 148 149 150 151 152 153
    obj = dealObj(obj)
    return base_request(url,obj,'post')

def request(url,obj,method=None):
    if not method:
        method = 'get'
    obj = dealObj(obj)
H
hjdhnx 已提交
154
    # print(f'{method}:{url}')
H
hjdhnx 已提交
155 156 157
    if not obj.get('headers') or not obj['headers'].get('User-Agent'):
        obj['headers']['User-Agent'] = UC_UA

H
hjdhnx 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
    return base_request(url, obj, method)

def buildUrl(url,obj=None):
    url = str(url).replace("'", "")
    if not obj:
        obj = {}
    new_obj = {}
    for i in obj:
        new_obj[str(i).replace("'", "")] = str(obj[i]).replace("'", "")
    if not str(url).endswith('?'):
        url = str(url) + '?'
    prs = '&'.join([f'{i}={obj[i]}' for i in obj])
    url = (url + prs).replace('"','').replace("'",'')
    # print(url)
    return url