RpcClientTcp.php 4.1 KB
Newer Older
lzc828's avatar
rpc v2  
lzc828 已提交
1 2 3 4 5 6 7 8
<?php
/**
 * Created by PhpStorm.
 * User: admin
 * Date: 2018/12/4
 * Time: 17:52
 */

lzc828's avatar
lzc828 已提交
9
namespace One\Swoole {
lzc828's avatar
rpc v2  
lzc828 已提交
10

lzc828's avatar
lzc828 已提交
11
    class RpcClientTcp
lzc828's avatar
lzc828 已提交
12 13
    {
        const RPC_REMOTE_OBJ = '#RpcRemoteObj#';
lzc828's avatar
rpc v2  
lzc828 已提交
14

lzc828's avatar
lzc828 已提交
15
        private $_need_close = 0;
lzc828's avatar
rpc v2  
lzc828 已提交
16

lzc828's avatar
lzc828 已提交
17 18
        private static $_connection = null;

lzc828's avatar
lzc828 已提交
19
        private static $_is_static = 0;
lzc828's avatar
rpc v2  
lzc828 已提交
20

lzc828's avatar
lzc828 已提交
21
        protected $_rpc_server = '';
lzc828's avatar
rpc v2  
lzc828 已提交
22

lzc828's avatar
lzc828 已提交
23
        protected $_remote_class_name = '';
lzc828's avatar
rpc v2  
lzc828 已提交
24

lzc828's avatar
lzc828 已提交
25 26
        protected $_token = '';

lzc828's avatar
lzc828 已提交
27 28
        protected $_time_out = 1;

lzc828's avatar
lzc828 已提交
29
        public static $_call_id = '';
lzc828's avatar
rpc v2  
lzc828 已提交
30

lzc828's avatar
lzc828 已提交
31 32 33 34 35 36
        public function __construct(...$args)
        {
            // 这里可以设置为 requestId方便调用链跟踪
            $this->id    = self::$_call_id ? self::$_call_id : $this->uuid();
            $this->calss = $this->_remote_class_name ? $this->_remote_class_name : get_called_class();
            $this->args  = $args;
lzc828's avatar
lzc828 已提交
37 38 39
            if (self::$_connection === null) {
                self::$_connection = $this->connect();
            }
lzc828's avatar
lzc828 已提交
40
        }
lzc828's avatar
rpc v2  
lzc828 已提交
41

lzc828's avatar
lzc828 已提交
42 43 44
        public function __call($name, $arguments)
        {
            return $this->_callRpc([
lzc828's avatar
lzc828 已提交
45 46 47 48 49 50 51
                'i' => $this->id, // 分布式唯一id
                'c' => $this->calss, // 调用class
                'f' => $name, // 调用方法名称
                'a' => $arguments, // 调用方法参数
                't' => $this->args, // 构造函数参数 __construct
                's' => self::$_is_static, // 是否是静态方法
                'o' => $this->_token, // token 在中间件可获取
lzc828's avatar
lzc828 已提交
52 53
            ]);
        }
lzc828's avatar
lzc828 已提交
54

lzc828's avatar
lzc828 已提交
55 56 57 58 59 60 61
        private function uuid()
        {
            $str = uniqid('', true);
            $arr = explode('.', $str);
            $str = $arr[0] . base_convert($arr[1], 10, 16);
            $len = 32;
            while (strlen($str) <= $len) {
lzc828's avatar
lzc828 已提交
62
                $str .= bin2hex(openssl_random_pseudo_bytes(4));
lzc828's avatar
lzc828 已提交
63 64 65 66
            }
            $str = substr($str, 0, $len);
            $str = str_replace(['+', '/', '='], '', base64_encode(hex2bin($str)));
            return $str;
lzc828's avatar
rpc v2  
lzc828 已提交
67 68
        }

lzc828's avatar
lzc828 已提交
69 70 71
        private function _callRpc($data)
        {
            self::$_is_static = 0;
lzc828's avatar
rpc v2  
lzc828 已提交
72

lzc828's avatar
lzc828 已提交
73 74 75 76
            $buffer = msgpack_pack($data);
            $buffer = pack('N', 4 + strlen($buffer)) . $buffer;
            $len    = fwrite(self::$_connection, $buffer);
            if ($len !== strlen($buffer)) {
lzc828's avatar
lzc828 已提交
77
                throw new \Exception('writeToRemote fail', 11);
lzc828's avatar
lzc828 已提交
78 79
            }
            $data = msgpack_unpack($this->read());
lzc828's avatar
lzc828 已提交
80 81 82 83 84 85 86 87
            if ($data === self::RPC_REMOTE_OBJ) {
                $this->_need_close = 1;
                return $this;
            } else if (is_array($data) && isset($data['err'], $data['msg'])) {
                throw new \Exception($data['msg'], $data['err']);
            } else {
                return $data;
            }
lzc828's avatar
rpc v2  
lzc828 已提交
88 89
        }

lzc828's avatar
lzc828 已提交
90 91 92 93 94 95 96 97
        private function read()
        {
            $all_buffer = '';
            $total_len  = 4;
            $head_read  = false;
            while (1) {
                $buffer = fread(self::$_connection, 8192);
                if ($buffer === '' || $buffer === false) {
lzc828's avatar
lzc828 已提交
98
                    throw new \Exception('read from remote fail', 2);
lzc828's avatar
lzc828 已提交
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
                }
                $all_buffer .= $buffer;
                $recv_len   = strlen($all_buffer);
                if ($recv_len >= $total_len) {
                    if ($head_read) {
                        break;
                    }
                    $unpack_data = unpack('Ntotal_length', $all_buffer);
                    $total_len   = $unpack_data['total_length'];
                    if ($recv_len >= $total_len) {
                        break;
                    }
                    $head_read = true;
                }
            }
            return substr($all_buffer, 4);
        }

        private function connect()
        {
lzc828's avatar
lzc828 已提交
119
            $connection = stream_socket_client($this->_rpc_server, $code, $msg, 3);
lzc828's avatar
lzc828 已提交
120
            if (!$connection) {
lzc828's avatar
lzc828 已提交
121
                throw new \Exception($msg,3);
lzc828's avatar
lzc828 已提交
122 123 124 125 126 127
            }
            stream_set_timeout($connection, $this->_time_out);
            return $connection;

        }

lzc828's avatar
lzc828 已提交
128 129 130 131 132 133 134
        public static function __callStatic($name, $arguments)
        {
            self::$_is_static = 1;
            return (new static)->{$name}(...$arguments);
        }

    }
lzc828's avatar
rpc v2  
lzc828 已提交
135
}