ob_mysql_request_utils.h 10.4 KB
Newer Older
O
oceanbase-admin 已提交
1 2 3 4 5 6 7 8 9 10 11 12
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

W
wangzelin.wzl 已提交
13 14
#ifndef  _OB_MYSQL_REQUEST
#define  _OB_MYSQL_REQUEST
O
oceanbase-admin 已提交
15 16 17 18
#include "io/easy_io.h"
#include "lib/ob_define.h"
#include "lib/utility/ob_print_utils.h"
#include "lib/allocator/ob_mod_define.h"
N
nroskill 已提交
19
#include "lib/allocator/page_arena.h"
O
oceanbase-admin 已提交
20 21
#include "rpc/obmysql/ob_mysql_packet.h"

W
wangzelin.wzl 已提交
22 23 24 25
namespace oceanbase
{
// Forward declarations of types from sibling namespaces. Only the names are
// introduced here; the definitions live outside this header.
namespace common
{
class ObDataBuffer;
class ObArenaAllocator;
}
namespace rpc
{
class ObRequest;
}
namespace observer
{
class ObSMConnection;
}
O
oceanbase-admin 已提交
37

W
wangzelin.wzl 已提交
38 39
namespace obmysql
{
O
oceanbase-admin 已提交
40 41 42 43
// Forward declarations for the obmysql namespace. ObEasyBuffer and
// ObCompressionContext are defined further down in this header.
// NOTE(review): ObMySQLPacket is presumably defined in
// rpc/obmysql/ob_mysql_packet.h, which is included above -- confirm.
class ObMySQLPacket;
class ObEasyBuffer;
class ObCompressionContext;

W
wangzelin.wzl 已提交
44 45 46
// Size limits used by the compression path.
// ObCompressionContext::get_max_read_step() returns the first constant for the
// proxy-based compress types and the second one for every other type.
static const int64_t OB_PROXY_MAX_COMPRESSED_PACKET_LENGTH = (1L << 15); //32K
static const int64_t OB_MAX_COMPRESSED_PACKET_LENGTH = (1L << 20); //1M
// Alias of common::OB_MALLOC_BIG_BLOCK_SIZE (noted by the author as 2M-1k).
static const int64_t MAX_COMPRESSED_BUF_SIZE = common::OB_MALLOC_BIG_BLOCK_SIZE;//2M-1k
O
oceanbase-admin 已提交
47

W
wangzelin.wzl 已提交
48 49
class ObMysqlPktContext
{
G
gm 已提交
50
public:
W
wangzelin.wzl 已提交
51 52 53 54 55 56 57
  enum ObMysqlPktReadStep {
    READ_HEADER = 0,
    READ_BODY,
    READ_COMPLETE
  };
   ObMysqlPktContext() : arena_(common::ObModIds::LIB_MULTI_PACKETS) { reset(); }
   ~ObMysqlPktContext() {}
O
oceanbase-admin 已提交
58 59 60 61 62 63 64 65 66 67 68 69 70 71
  void reset()
  {
    MEMSET(header_buf_, 0, common::OB_MYSQL_HEADER_LENGTH);
    header_buffered_len_ = 0;
    payload_buf_alloc_len_ = 0;
    payload_buf_ = NULL;
    payload_buffered_len_ = 0;
    payload_buffered_total_len_ = 0;
    payload_len_ = 0;
    last_pkt_seq_ = 0;
    curr_pkt_seq_ = 0;
    next_read_step_ = READ_HEADER;
    raw_pkt_.reset();
    is_multi_pkt_ = false;
W
wangzelin.wzl 已提交
72
    arena_.reset(); //fast free memory
O
oceanbase-admin 已提交
73 74
  }

W
wangzelin.wzl 已提交
75
  int save_fragment_mysql_packet(const char *start, const int64_t len);
O
oceanbase-admin 已提交
76

W
wangzelin.wzl 已提交
77
  static const char *get_read_step_str(const ObMysqlPktReadStep step)
O
oceanbase-admin 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90
  {
    switch (step) {
      case READ_HEADER:
        return "READ_HEADER";
      case READ_BODY:
        return "READ_BODY";
      case READ_COMPLETE:
        return "READ_COMPLETE";
      default:
        return "UNKNOWN";
    }
  }

W
wangzelin.wzl 已提交
91 92 93 94
  TO_STRING_KV(K_(header_buffered_len), K_(payload_buffered_len), K_(payload_buffered_total_len),
               K_(last_pkt_seq), K_(payload_len), K_(curr_pkt_seq), K_(payload_buf_alloc_len),
               "next_read_step", get_read_step_str(next_read_step_), K_(raw_pkt),
               "used", arena_.used(), "total", arena_.total(), K_(is_multi_pkt));
O
oceanbase-admin 已提交
95

G
gm 已提交
96
public:
O
oceanbase-admin 已提交
97 98
  char header_buf_[common::OB_MYSQL_HEADER_LENGTH];
  int64_t header_buffered_len_;
W
wangzelin.wzl 已提交
99
  char *payload_buf_;
O
oceanbase-admin 已提交
100
  int64_t payload_buf_alloc_len_;
W
wangzelin.wzl 已提交
101 102
  int64_t payload_buffered_len_; // not include header
  int64_t payload_buffered_total_len_; // not include header
O
oceanbase-admin 已提交
103 104 105 106 107 108 109 110
  int64_t payload_len_;
  uint8_t last_pkt_seq_;
  uint8_t curr_pkt_seq_;
  ObMysqlPktReadStep next_read_step_;
  ObMySQLRawPacket raw_pkt_;
  bool is_multi_pkt_;
  common::ObArenaAllocator arena_;

G
gm 已提交
111
private:
O
oceanbase-admin 已提交
112 113 114
  DISALLOW_COPY_AND_ASSIGN(ObMysqlPktContext);
};

W
wangzelin.wzl 已提交
115 116
class ObCompressedPktContext
{
G
gm 已提交
117
public:
W
wangzelin.wzl 已提交
118 119
   ObCompressedPktContext() { reset(); }
   ~ObCompressedPktContext() { }
O
oceanbase-admin 已提交
120 121 122 123 124 125
  void reset()
  {
    last_pkt_seq_ = 0;
    is_multi_pkt_ = false;
  }

W
wangzelin.wzl 已提交
126 127
  TO_STRING_KV(K_(last_pkt_seq),
               K_(is_multi_pkt));
O
oceanbase-admin 已提交
128

G
gm 已提交
129
public:
O
oceanbase-admin 已提交
130 131 132
  uint8_t last_pkt_seq_;
  bool is_multi_pkt_;

G
gm 已提交
133
private:
O
oceanbase-admin 已提交
134 135 136
  DISALLOW_COPY_AND_ASSIGN(ObCompressedPktContext);
};

W
wangzelin.wzl 已提交
137 138
class ObProto20PktContext
{
G
gm 已提交
139
public:
140
  ObProto20PktContext() : arena_(common::ObModIds::LIB_MULTI_PACKETS){ reset(); }
W
wangzelin.wzl 已提交
141
  ~ObProto20PktContext() { }
O
oceanbase-admin 已提交
142 143 144 145 146 147
  void reset()
  {
    comp_last_pkt_seq_ = 0;
    is_multi_pkt_ = false;
    proto20_last_request_id_ = 0;
    proto20_last_pkt_seq_ = 0;
148 149
    extra_info_.reset();
    arena_.reset(); //fast free memory
O
oceanbase-admin 已提交
150 151
  }

W
wangzelin.wzl 已提交
152 153 154
  TO_STRING_KV(K_(comp_last_pkt_seq),
               K_(is_multi_pkt),
               K_(proto20_last_request_id),
155 156 157 158
               K_(proto20_last_pkt_seq),
               K_(extra_info),
               "used", arena_.used(),
               "total", arena_.total());
O
oceanbase-admin 已提交
159

G
gm 已提交
160
public:
O
oceanbase-admin 已提交
161 162 163 164
  uint8_t comp_last_pkt_seq_;
  bool is_multi_pkt_;
  uint32_t proto20_last_request_id_;
  uint8_t proto20_last_pkt_seq_;
165 166
  Ob20ExtraInfo extra_info_;
  common::ObArenaAllocator arena_;
O
oceanbase-admin 已提交
167

G
gm 已提交
168
private:
O
oceanbase-admin 已提交
169 170 171
  DISALLOW_COPY_AND_ASSIGN(ObProto20PktContext);
};

W
wangzelin.wzl 已提交
172 173
class ObEasyBuffer
{
G
gm 已提交
174
public:
W
wangzelin.wzl 已提交
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
  explicit ObEasyBuffer(easy_buf_t &ezbuf) : buf_(ezbuf), read_pos_(ezbuf.pos) { }
  ~ObEasyBuffer() {}
  int64_t read_avail_size() const { return buf_.last - read_pos_; }
  int64_t write_avail_size() const { return buf_.end - buf_.last; }
  int64_t proxy_read_avail_size(const char * const proxy_pos) const { return buf_.last - proxy_pos;}
  int64_t orig_data_size() const { return buf_.last - buf_.pos; }
  int64_t orig_buf_size() const { return buf_.end - buf_.pos; }
  bool is_valid() const { return (orig_buf_size() >= 0 && orig_data_size() >= 0); }
  bool is_read_avail() const { return buf_.last > read_pos_; }
  char *read_pos() const { return read_pos_; }
  char *begin() const { return buf_.pos; }
  char *last() const { return buf_.last; }
  char *end() const { return buf_.end; }
  void read(const int64_t size) { read_pos_ += size;}
  void write(const int64_t size) { buf_.last += size;}
  void fall_back(const int64_t size) { buf_.last -= size; }

  int64_t get_next_read_size(char *proxy_pos, const int64_t max_read_step)
O
oceanbase-admin 已提交
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
  {
    int64_t ret = 0;
    bool is_last_proxy_pkt = false;
    if (NULL == proxy_pos) {
      ret = read_avail_size();
    } else if (proxy_pos <= read_pos()) {
      ret = read_avail_size();
      is_last_proxy_pkt = true;
    } else {
      ret = std::min(last(), proxy_pos) - read_pos();
    }

    if (!is_last_proxy_pkt && ret > max_read_step) {
      ret = max_read_step;
    }
    return ret;
  }

W
wangzelin.wzl 已提交
211 212 213 214 215
  TO_STRING_KV(KP_(read_pos), KP(buf_.pos), KP(buf_.last), KP(buf_.end),
               "orig_buf_size", orig_buf_size(),
               "orig_data_size", orig_data_size(),
               "read_avail_size", read_avail_size(),
               "write_avail_size", write_avail_size());
O
oceanbase-admin 已提交
216

G
gm 已提交
217
public:
W
wangzelin.wzl 已提交
218 219
  easy_buf_t &buf_;
  char *read_pos_;
O
oceanbase-admin 已提交
220

G
gm 已提交
221
private:
O
oceanbase-admin 已提交
222 223 224
  DISALLOW_COPY_AND_ASSIGN(ObEasyBuffer);
};

W
wangzelin.wzl 已提交
225 226
// Compression mode of a connection. The numeric values are spelled out
// explicitly; NO_COMPRESS must stay 0.
enum ObCompressType
{
  NO_COMPRESS = 0,
  DEFAULT_COMPRESS = 1, //compress the whole buf every 1M
  PROXY_COMPRESS = 2,   //1. compress every 32K buf,
                        //2. put error+ok/eof+ok/ok in one compressed packet, and seq=last seq
  DEFAULT_CHECKSUM = 3, //use level 0 compress based on DEFAULT_COMPRESS
  PROXY_CHECKSUM = 4,   //use level 0 compress based on PROXY_COMPRESS
};

W
wangzelin.wzl 已提交
235 236
class ObCompressionContext
{
G
gm 已提交
237
public:
W
wangzelin.wzl 已提交
238 239 240 241 242 243 244 245 246 247 248 249 250
  ObCompressionContext() { reset(); }
  ~ObCompressionContext() {}

  void reset() { memset(this, 0, sizeof(ObCompressionContext)); }
  bool use_compress() const { return NO_COMPRESS != type_; }
  bool use_uncompress() const { return NO_COMPRESS == type_; }
  bool is_proxy_compress() const { return PROXY_COMPRESS == type_; }
  bool is_default_compress() const { return DEFAULT_COMPRESS == type_; }
  bool is_default_checksum() const { return DEFAULT_CHECKSUM == type_; }
  bool is_proxy_checksum() const { return PROXY_CHECKSUM == type_; }
  bool is_proxy_compress_based() const { return is_proxy_checksum() || is_proxy_compress(); }
  bool use_checksum() const { return is_proxy_checksum() || is_default_checksum(); }
  void update_last_pkt_pos(char *pkt_pos)
O
oceanbase-admin 已提交
251 252
  {
    if (is_proxy_compress_based() && NULL == last_pkt_pos_) {
W
wangzelin.wzl 已提交
253
      //if has updated, no need update again
O
oceanbase-admin 已提交
254 255 256 257
      last_pkt_pos_ = pkt_pos;
    }
  }

W
wangzelin.wzl 已提交
258

O
oceanbase-admin 已提交
259 260
  int64_t get_max_read_step() const
  {
W
wangzelin.wzl 已提交
261 262 263
    return (is_proxy_compress_based()
        ? OB_PROXY_MAX_COMPRESSED_PACKET_LENGTH
        : OB_MAX_COMPRESSED_PACKET_LENGTH);
O
oceanbase-admin 已提交
264 265 266 267
  }

  bool need_hold_last_pkt(const bool is_last) const
  {
W
wangzelin.wzl 已提交
268 269 270 271
    //if error(eof) + ok can not in one buf, we need hold error(eof) packet for proxy
    return (is_proxy_compress_based()
            && NULL != last_pkt_pos_
            && !is_last );
O
oceanbase-admin 已提交
272 273
  }

W
wangzelin.wzl 已提交
274
  int64_t to_string(char *buf, const int64_t buf_len) const
O
oceanbase-admin 已提交
275 276 277 278 279 280 281 282 283 284 285 286 287 288
  {
    int64_t pos = 0;
    J_OBJ_START();
    J_KV(K_(sessid), K_(type), K_(is_checksum_off), K_(seq), KP_(last_pkt_pos));
    J_COMMA();
    if (NULL != send_buf_) {
      J_KV("send_buf", ObEasyBuffer(*send_buf_));
    } else {
      J_KV(KP_(send_buf));
    }
    J_OBJ_END();
    return pos;
  }

G
gm 已提交
289
public:
O
oceanbase-admin 已提交
290 291
  ObCompressType type_;
  bool is_checksum_off_;
W
wangzelin.wzl 已提交
292 293 294
  uint8_t seq_;//compressed pkt seq
  easy_buf_t *send_buf_;
  char *last_pkt_pos_;//proxy last pkt(error+ok, eof+ok, ok)'s pos in orig_ezbuf, default is null
O
oceanbase-admin 已提交
295
  uint32_t sessid_;
O
obdev 已提交
296
  observer::ObSMConnection *conn_;
O
oceanbase-admin 已提交
297

G
gm 已提交
298
private:
O
oceanbase-admin 已提交
299 300 301
  DISALLOW_COPY_AND_ASSIGN(ObCompressionContext);
};

W
wangzelin.wzl 已提交
302 303
class ObFlushBufferParam
{
G
gm 已提交
304
public:
W
wangzelin.wzl 已提交
305 306 307 308 309 310 311
  ObFlushBufferParam(easy_buf_t &ez_buf, easy_request_t &ez_req, ObCompressionContext &context,
                     bool &conn_valid, bool &req_has_wokenup,
                     const bool pkt_has_completed)
    : orig_send_buf_(ez_buf), ez_req_(ez_req), comp_context_(context), conn_valid_(conn_valid),
      req_has_wokenup_(req_has_wokenup),
      pkt_has_completed_(pkt_has_completed)
    {}
O
oceanbase-admin 已提交
312

G
gm 已提交
313
public:
O
oceanbase-admin 已提交
314
  ObEasyBuffer orig_send_buf_;
W
wangzelin.wzl 已提交
315 316 317 318
  easy_request_t &ez_req_;
  ObCompressionContext &comp_context_;
  bool &conn_valid_;
  bool &req_has_wokenup_;
O
oceanbase-admin 已提交
319 320
  const bool pkt_has_completed_;

G
gm 已提交
321
private:
O
oceanbase-admin 已提交
322 323 324
  DISALLOW_COPY_AND_ASSIGN(ObFlushBufferParam);
};

W
wangzelin.wzl 已提交
325 326
class ObMySQLRequestUtils
{
G
gm 已提交
327
public:
O
oceanbase-admin 已提交
328 329 330
  ObMySQLRequestUtils();
  virtual ~ObMySQLRequestUtils();

W
wangzelin.wzl 已提交
331 332 333
  static int flush_buffer(ObFlushBufferParam &param);
  static int flush_compressed_buffer(bool pkt_has_completed, ObCompressionContext &comp_context, 
                                                  ObEasyBuffer &orig_send_buf, rpc::ObRequest &req);
G
gm 已提交
334
private:
W
wangzelin.wzl 已提交
335 336 337 338 339 340 341 342 343 344 345
  static void disconnect(easy_request_t &ez_req);
  static void wakeup_easy_request(easy_request_t &ez_req, bool &req_has_wokenup);
  static int check_flush_param(ObFlushBufferParam &param);
  static int consume_compressed_buffer(ObFlushBufferParam &param,
                                       const bool flush_immediately = false);
  static int reuse_compressed_buffer(ObFlushBufferParam &param, int64_t comp_buf_size,
                                     const bool is_last_flush);

  //once last flushed, ObRequest may be destroyed
  static int flush_buffer_internal(easy_buf_t *send_buf, ObFlushBufferParam &param,
                                   const bool is_last_flush);
G
gm 已提交
346
private:
O
oceanbase-admin 已提交
347 348 349 350 351
  DISALLOW_COPY_AND_ASSIGN(ObMySQLRequestUtils);
};

// Declared here, defined in another translation unit.
// NOTE(review): presumably invoked when a request finishes -- confirm at the
// definition and call sites.
extern void request_finish_callback();

W
wangzelin.wzl 已提交
352 353 354
} //end of namespace obmysql
} //end of namespace oceanbase

O
oceanbase-admin 已提交
355 356

#endif