ws_server.py 12.4 KB
Newer Older
P
pikaqiu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
# -*- coding:utf-8 -*-
# create: 2022/10/20 10:23
# author: ly1102
"""
    websocket服务端,对各个api进行了转换,可以支持收发微信消息
"""
import sys
import json
import inspect
import asyncio
import comtypes
import websockets
from wxRobot import WeChatRobot, get_wechat_pid_list

TCP_HOOK_PORT = 10808  # hook的微信消息接收端口,保证不被占用了就行
SERVER_IP = '0.0.0.0'  # websocket 监听的ip
SERVER_PORT = 14514  # websocket 监听的端口
SEC_TOKEN = ''  # websocket连接需要在headers带上token: <Authorization: Bearer SEC_TOKEN>,为空则不验证


def get_methods(obj):
    return set(filter(lambda m: not m.startswith("_") and callable(getattr(obj, m)), dir(obj)))  # 获取类所有方法名称


class WebsocketServer:
    """
    同时启动TCP server监听微信的消息hook推送 和 Websocket的服务器监听
    接收到的微信消息会通过websocket推送到连接的客户端
    客户端发过来的指令也会去WeChatRobot类找到对应的方法并执行,最后返回执行结果

    接收到微信消息:WeChat         ---> TCP server       ---> Websocket server         ---> Websocket client
    发送微信指令:Websocket client ---> Websocket server ---> WeChatRobot  ---> WeChat ---> Websocket client
    """
    def __init__(self):
        self.websocket = None  # 连接成功后赋值
        self.ip = SERVER_IP
        self.port = SERVER_PORT
        self.token = SEC_TOKEN  # 用来做用户验证的,保证安全性
        self.ws_clients = {}
        self.ping_interval = 5  # 多久发送一次ping,保持连接。单位秒
        self.wx_bot_api = get_methods(wx_bot)  # 获取所有可执行方法,方便ws通过api调用

    async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """
        TCP Server的主函数,微信收到消息后通过Hook推送到该函数,本系统使用的短链接,即微信收到
        :param reader: tcp reader
        :param writer:  tcp writer
        :return: None
        """
        comtypes.CoInitialize()
        # address = writer.get_extra_info('peername')
        # print(f'new tcp connection: {address}')
        while True:
            ptr_data = b""
            try:
                while True:
                    # Python源码中并未找到设置eof的地方,所以无奈之下采用直接修改的方式,
                    # 使得没有接收到数据就一直保持await状态,否则一await就会自动返回空字节
                    reader._eof = False  # 这一行非常非常重要,如果不添加这个语句,会导致reader一直返回空字节
                    data = await reader.read(1024)
                    # print(reader.at_eof(), reader._eof)
                    ptr_data += data
                    if len(data) == 0 or data[-1] == 0xA:
                        break
                print(f'receive:{ptr_data}')
                try:
                    if ptr_data:
                        msg = json.loads(ptr_data.decode('utf-8'))
                        await self.msg_callback(msg)
                except json.JSONDecodeError as e:
                    print(f'JSON解码失败:{e}')
            except OSError as e0:
                print(f'OS Error: {e0}')
                break
            except Exception as e:
                print(f'TCP server未知异常:{e}')
            writer.write("200 OK".encode())
            await writer.drain()
            # 每条消息都有一条新的连接,接收到空字节的时候即为接收完成,此时应跳出循环关闭链接,
            # 否则报错Task was destroyed but it is pending!
            if ptr_data == b'':
                break
        writer.close()
        comtypes.CoUninitialize()

    async def msg_callback(self, data):
        """
        回调函数,收到TCP Server的消息后,通过该函数对内容进行处理和转发到websocket客户端
        :param data: ReceiveMsgStruct
        :return: None
        """
        # 主线程中已经注入,此处禁止调用StartService和StopService
        # msg = {'pid': data.pid, 'time': data.time, 'type': data.type, 'isSendMsg': data.isSendMsg, 'wxid': data.wxid,
        #        'sendto' if data.isSendMsg else 'from': data.sender, 'message': data.message}
        msg = data
        # print(data)
        robot = comtypes.client.CreateObject("WeChatRobot.CWeChatRobot")
        event = comtypes.client.CreateObject("WeChatRobot.RobotEvent")
        wx = WeChatRobot(data['pid'], robot, event)
        userinfo = wx.GetWxUserInfo(data['wxid'])
        # print(f'userinfo: {userinfo}')
        if 'wxRemark' in userinfo and userinfo['wxRemark']:  # 有备注就显示备注名
            msg['alias'] = userinfo['wxRemark']
        else:
            msg['alias'] = userinfo['wxNumber']  # 无备注就显示微信号
        if data["isSendMsg"] == 0:
            if '@chatroom' in data['sender']:
                chatroom_info = wx.GetWxUserInfo(data['sender'])
                msg['chatroom_name'] = chatroom_info['wxNickName']
                msg['nickname'] = wx.GetChatRoomMemberNickname(data['sender'], data['wxid'])
            else:
                msg['nickname'] = userinfo['wxNickName']

        print(f'tcp server receive: {msg}')
        # for name, ws in self.ws_clients.items():  # 转发多个客户端
        #     try:
        #         await ws.send(json.dumps(msg, ensure_ascii=False))
        #     except Exception as e:
        #         print(f'转发消息到{name}ws失败:{e}')
        try:
            if self.websocket:
                await self.websocket.send(json.dumps(msg, ensure_ascii=False))
        except Exception as e:
            print(f'转发消息到ws失败:{e}')
        robot.Release()
        event.Release()

    async def start_tcp_server(self):
        """
        启动监听微信tcp发送过来的消息推送服务器
        :return:
        """
        server = await asyncio.start_server(self.handle, '127.0.0.1', TCP_HOOK_PORT)  # 开始监听微信消息
        addr = server.sockets[0].getsockname()
        print(f'TCP Serving on {addr}')
        async with server:
            await server.serve_forever()

    async def receive_connection(self, websocket, path):
        """
        接收到websocket的请求链接,然后保持连接并长期监听客户端发过来的请求
        :param websocket: websocket连接
        :param path: ws请求过来的地址 默认:/
        :return: None
        """
        # print(path)
        print(f'接收到新的连接:{websocket.remote_address} at:{path}')
        headers = dict(websocket.request_headers)  # 全部会变成小写
        if self.token:  # 如果设置了token就需要对token进行验证
            print(headers)
            if 'Authorization' not in headers or headers['Authorization'] != f'Bearer {self.token}':
                websocket.send('认证失败')
                print(f'客户端:{websocket.remote_address}token认证失败')
                websocket.close()
                return
        self.ws_clients[websocket.remote_address] = websocket
        await self.receive_ws_msg(websocket)

    async def receive_ws_msg(self, websocket):
        """
        websocket的消息接收函数,保持监听,并将客户端发过来的指令拿去调 WeChatRobot 对应的方法,比如发送消息

        接收格式为 JSON:
        {"action": "要执行的WeChatRobot函数名", "params": {"参数1": "xxx", "参数2": "xxx"}}
            例如:{ "action": "SendText", "params": {"receiver": "filehelper", "msg": "66666666666"}}
        :param websocket: websocket 链接
        :return: None
        """
        self.websocket = websocket
        while True:
            try:
                recv_text = await websocket.recv()
                print("ws server receive:", recv_text)
                try:
                    action = json.loads(recv_text)
                except json.JSONDecodeError:
                    await websocket.send('Json 解码失败,请确认json格式是否规范')
                    continue
                result = await self.call_api(action)
                try:
                    response = json.dumps(result, ensure_ascii=False)
                except TypeError as e:
                    response = json.dumps({'code': 500, 'data': None, 'message': f'json encode error: {e}'})
                await websocket.send(response)
            except websockets.ConnectionClosed:
                print(f'链接断开:{websocket.remote_address}')
                if websocket.remote_address in self.ws_clients:
                    del self.ws_clients[websocket.remote_address]
                    self.websocket = None
                return
            except Exception as e:
                print(f'未知异常:{e}')

    async def call_api(self, action):
        """
        调用wx_bot的发送消息、获取消息api

        :param action: 要执行的api,必要参数action,指明执行什么哪个方法名,params将对应参数传递进去。
        例如
            {'action': 'SendFile', params: {'receiver': 'filehelper', 'filepath': 'C:\\1.jpg'}}
        action的方法名为wxRobot.py->WeChatRobot类的方法,params则是方法名对应的参数

        :return:{
            'code': 200(成功)/500(报错),
            'data': [None, dict(), list(), object](根据函数返回值确定),
            'message': '操作成功/失败理由'
        }
        """
        if 'action' not in action:
            return {'code': 500, 'data': None, 'message': '缺少指定的action名称,请查看示例代码'}
        method_name = action['action']
        if method_name not in self.wx_bot_api:
            return {'code': 500, 'data': None, 'message': f'无{method_name}方法,请查看WeChatRobot类的方法列表'}
        params = action['params']
        func = getattr(wx_bot, method_name)
        kwargs = {}
        func_info = inspect.getfullargspec(func)
        default_length = 0
        if func_info.defaults:
            defaults = func_info.defaults
            # print(defaults)
            default_length = len(defaults)
        else:
            defaults = tuple()
        func_args = func_info.args
        args_length = len(func_args)
        for index, arg in enumerate(func_args):
            if arg == 'self':  # 方法本身的self参数不用传递
                continue
            if arg in params:
                kwargs[arg] = params[arg]
            else:
                if index < args_length - default_length:
                    print(f'缺少必要参数:{arg}')
                    return {'code': 500, 'data': None,
                            'message': f'{method_name}方法,缺少必要参数:{arg}。{{"params": {{"{arg}": "xxx"}}'}
                # print('a', index - (args_length - default_length))
                kwargs[arg] = defaults[index - (args_length - default_length)]
        # comtypes.CoInitialize()
        try:
            result = func(**kwargs)  # 调用对应api
            return {'code': 200, 'data': result, 'message': f'操作成功'}
        except Exception as e:
            return {'code': 500, 'data': None, 'message': f'执行{method_name}方法出错:{e}'}

    async def start_ws_server(self):
        """
        启动websocket监听
        :return:
        """
        async with websockets.serve(self.receive_connection, self.ip, self.port, ping_interval=self.ping_interval):
            print(f'websocket server listening on {self.ip}:{self.port}!')
            await asyncio.Future()  # run forever

    def run(self):
        """
        启动程序,开启TCP Server的监听和websocket的监听
        :return: None
        """
        server_loop = asyncio.new_event_loop()
        server_loop.create_task(self.start_tcp_server())
        server_loop.create_task(self.start_ws_server())
        # asyncio.gather(self.start_tcp_server(), self.start_ws_server())
        server_loop.run_forever()


if __name__ == '__main__':
    # 直接启动即可
    pid_list = get_wechat_pid_list()
    if not pid_list:
        print('没有发现微信进程!')
        sys.exit(-1)
    print(f'微信进程:{pid_list}')
    wx_bot = WeChatRobot(pid_list[0])
    wx_bot.StartService()
    print('StartReceiveMessage')
    wx_bot.StartReceiveMessage(port=TCP_HOOK_PORT)  # 微信的消息都会推送到TCP server来

    my_server = WebsocketServer()  # 具体配置请在文件头自己配置
    my_server.run()

    # 正常情况,以下代码不会执行
    print('StopService')
    wx_bot.StopService()