ob_2_0_protocol_processor.cpp 19.5 KB
Newer Older
O
oceanbase-admin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX RPC_OBMYSQL

#include "rpc/obmysql/ob_2_0_protocol_processor.h"
#include "rpc/obmysql/ob_mysql_util.h"
#include "rpc/obmysql/ob_mysql_request_utils.h"
#include "rpc/obmysql/ob_2_0_protocol_struct.h"
#include "lib/checksum/ob_crc16.h"
#include "lib/checksum/ob_crc64.h"
#include "common/object/ob_object.h"
W
wangzelin.wzl 已提交
22
#include "rpc/obmysql/obsm_struct.h"
O
oceanbase-admin 已提交
23

W
wangzelin.wzl 已提交
24 25 26 27
namespace oceanbase
{
namespace obmysql
{
O
oceanbase-admin 已提交
28 29 30
using namespace oceanbase::common;
using namespace oceanbase::rpc;
using namespace oceanbase::obmysql;
W
wangzelin.wzl 已提交
31
using namespace oceanbase::observer;
O
oceanbase-admin 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
//
// OceanBase 2.0 Client/Server Protocol Format:
//
//
//   0       1         2           3          4      Byte
//   +-----------------+----------------------+
//   |    Magic Num    |       Version        |
//   +-----------------+----------------------+
//   |            Connection Id               |
//   +-----------------------------+----------+
//   |         Request Id          |   Seq    |
//   +-----------------------------+----------+
//   |            PayLoad Length              |
//   +----------------------------------------+
//   |                Flag                    |
//   +-----------------+----------------------+
//   |    Reserved     |Header Checksum(CRC16)|
//   +-----------------+----------------------+
//   |        ... PayLoad  Data ...           |----------+
//   +----------------------------------------+          |
//   |    Tailer PayLoad Checksum (CRC32)     |          |
//   +----------------------------------------+          |
//                                                       |
//                                                       |
//                            +--------------------------+
//                            |
//                            |
//                            v
//   +-------------------+-------------------+-------------------------------------+
//   | Extra Len(4Byte)  |  Extra Info(K/V)  |  Basic Info(Standard MySQL Packet)  |
//   +-------------------+-------------------+-------------------------------------+

W
wangzelin.wzl 已提交
64
/**
 * Decode at most one OB 2.0 protocol packet from the bytes in [start, end).
 *
 * The MySQL compress header and the OB 2.0 header are parsed back to back and
 * validated. When payload and tailer are completely buffered, the payload
 * checksum is verified and the packet object is built by decode_ob20_body().
 *
 * @param conn             connection state; only sessid_ is read here
 * @param pool             memory pool handed to decode_ob20_body()
 * @param start            [in/out] read cursor. Untouched when fewer than one
 *                         header is available; moved back by the header size when
 *                         the header was parsed but the body is still incomplete
 * @param end              end of the received bytes
 * @param pkt              [out] reset to NULL on entry, set when a packet is built
 * @param next_read_bytes  [out] written only when more bytes have to be received
 * @return OB_SUCCESS, or the error code of the first check that failed
 */
int Ob20ProtocolProcessor::do_decode(ObSMConnection& conn, ObICSMemPool& pool, const char*& start, const char* end, rpc::ObPacket*& pkt, int64_t& next_read_bytes)
{
  int ret = OB_SUCCESS;
  pkt = NULL;
  const uint32_t sessid = conn.sessid_;
  // the mysql compress header is counted as part of the packet header
  const int64_t header_size = OB20_PROTOCOL_HEADER_LENGTH + OB_MYSQL_COMPRESSED_HEADER_SIZE;

  // no need duplicated check 'm' valid, ObMySQLHandler::process() has already checked
  if ((end - start) < header_size) {
    // less than one header received: ask for the rest of the header
    next_read_bytes = header_size - (end - start);
  } else {
    // remember where the header begins, the header checksum is computed from there
    const char *header_begin = start;
    Ob20ProtocolHeader header20;

    // step 1: mysql compress header
    uint32_t pktlen = 0;
    uint8_t pktseq = 0;
    uint32_t pktlen_before_compress = 0; // expected to be 0, verified below
    ObMySQLUtil::get_uint3(start, pktlen);
    ObMySQLUtil::get_uint1(start, pktseq);
    ObMySQLUtil::get_uint3(start, pktlen_before_compress);
    header20.cp_hdr_.comp_len_ = pktlen;
    header20.cp_hdr_.comp_seq_ = pktseq;
    header20.cp_hdr_.uncomp_len = pktlen_before_compress;

    // step 2: proto2.0 header, fields in wire order
    ObMySQLUtil::get_uint2(start, header20.magic_num_);
    ObMySQLUtil::get_uint2(start, header20.version_);
    ObMySQLUtil::get_uint4(start, header20.connection_id_);
    ObMySQLUtil::get_uint3(start, header20.request_id_);
    ObMySQLUtil::get_uint1(start, header20.pkt_seq_);
    ObMySQLUtil::get_uint4(start, header20.payload_len_);
    ObMySQLUtil::get_uint4(start, header20.flag_.flags_);
    ObMySQLUtil::get_uint2(start, header20.reserved_);
    ObMySQLUtil::get_uint2(start, header20.header_checksum_);

    LOG_DEBUG("decode proto20 header succ", K(header20));

    // step 3: header checksum first, then field-by-field validation
    if (OB_FAIL(do_header_checksum(header_begin, header20))) {
      LOG_ERROR("fail to do header checksum", K(header20), K(ret));
    } else if (OB_UNLIKELY(OB20_PROTOCOL_MAGIC_NUM != header20.magic_num_)) {
      ret = OB_UNKNOWN_PACKET;
      LOG_ERROR("invalid magic num", K(OB20_PROTOCOL_MAGIC_NUM),
                K(header20.magic_num_), K(sessid), K(ret));
    } else if (OB_UNLIKELY(sessid != header20.connection_id_)) {
      ret = OB_UNKNOWN_CONNECTION;
      LOG_ERROR("connection id mismatch", K(sessid), K_(header20.connection_id), K(ret));
    } else if (0 != pktlen_before_compress) {
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("pktlen_before_compress must be 0 here", K(pktlen_before_compress),
                K(sessid), K(ret));
    } else if (OB_UNLIKELY(OB20_PROTOCOL_VERSION_VALUE != header20.version_)) {
      ret = OB_UNKNOWN_PACKET;
      LOG_ERROR("invalid version", K(OB20_PROTOCOL_VERSION_VALUE),
                K(header20.version_), K(sessid), K(ret));
    } else if (OB_UNLIKELY(pktlen !=
                           (header20.payload_len_ + OB20_PROTOCOL_HEADER_LENGTH + OB20_PROTOCOL_TAILER_LENGTH))) {
      // the compressed-packet length must describe exactly one ob20 packet
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("invalid pktlen len", K(pktlen), K(header20.payload_len_), K(sessid),
                K(OB20_PROTOCOL_HEADER_LENGTH), K(OB20_PROTOCOL_TAILER_LENGTH), K(ret));
    } else {
      // bytes received after the header: payload plus tailer, possibly more
      const uint32_t received_len = static_cast<uint32_t>(end - start);

      if ((header20.payload_len_ + OB20_PROTOCOL_TAILER_LENGTH) > received_len) {
        // header is valid but the body is incomplete: report how many bytes are
        // still missing and move the cursor back to the beginning of the packet
        next_read_bytes = header20.payload_len_ + OB20_PROTOCOL_TAILER_LENGTH - received_len;
        start -= header_size;
      } else if (OB_FAIL(do_body_checksum(start, header20))) {
        // at least one complete packet is buffered, so the payload can be verified
        LOG_ERROR("fail to do body checksum", K(header20), K(sessid), K(ret));
      } else if (OB_FAIL(decode_ob20_body(pool, start, header20, pkt))) {
        // NOTE(review): the log text names decode_compressed_body although
        // decode_ob20_body() is what failed; text kept unchanged on purpose
        LOG_ERROR("fail to decode_compressed_body", K(sessid), K(header20), K(ret));
      }
    }
  }

  return ret;
}

W
wangzelin.wzl 已提交
152
inline int Ob20ProtocolProcessor::do_header_checksum(const char *origin_start, const Ob20ProtocolHeader &hdr) {
O
oceanbase-admin 已提交
153 154 155 156 157 158 159
  INIT_SUCC(ret);
  if (OB_ISNULL(origin_start)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_ERROR("invalid input value", KP(origin_start), K(ret));
  } else if (0 == hdr.header_checksum_) {
    // 0 means skip checksum
  } else {
W
wangzelin.wzl 已提交
160
    const char *header_start = origin_start;
O
oceanbase-admin 已提交
161 162 163 164 165
    // mysql compress header len + proto20 header(except 2 byte checksum)
    int64_t check_len = OB20_PROTOCOL_HEADER_LENGTH - 2 + OB_MYSQL_COMPRESSED_HEADER_SIZE;

    // 3. crc16 for header checksum
    uint16_t local_header_checksum = 0;
W
wangzelin.wzl 已提交
166
    local_header_checksum = ob_crc16(0, reinterpret_cast<const uint8_t *>(header_start), check_len);
O
oceanbase-admin 已提交
167 168
    if (local_header_checksum != hdr.header_checksum_) {
      ret = OB_CHECKSUM_ERROR;
W
wangzelin.wzl 已提交
169 170
      LOG_ERROR("ob 2.0 protocol header checksum error!", K(local_header_checksum),
                K(hdr.header_checksum_), K(check_len), KP(header_start), K(hdr), K(ret));
O
oceanbase-admin 已提交
171 172 173 174 175 176
    }
  }

  return ret;
}

W
wangzelin.wzl 已提交
177
inline int Ob20ProtocolProcessor::do_body_checksum(const char* buf, const Ob20ProtocolHeader &hdr) {
O
oceanbase-admin 已提交
178 179 180 181
  INIT_SUCC(ret);

  uint32_t payload_checksum = 0;
  int64_t payload_len = hdr.payload_len_;
W
wangzelin.wzl 已提交
182
  const char *payload_checksum_pos = buf + payload_len;
O
oceanbase-admin 已提交
183 184 185 186
  ObMySQLUtil::get_uint4(payload_checksum_pos, payload_checksum);
  if (0 == payload_checksum) {
    // 0 means skip checksum
  } else {
W
wangzelin.wzl 已提交
187 188
    const char *payload_start = buf;
    uint64_t local_payload_checksum = ob_crc64(payload_start, payload_len); // actual is crc32
O
oceanbase-admin 已提交
189
    local_payload_checksum = ob_crc64(payload_start, payload_len);
W
wangzelin.wzl 已提交
190
    if (OB_UNLIKELY(local_payload_checksum != payload_checksum))  {
O
oceanbase-admin 已提交
191
      ret = OB_CHECKSUM_ERROR;
W
wangzelin.wzl 已提交
192 193
      LOG_ERROR("body checksum error", K(local_payload_checksum),
                K(payload_checksum), K(hdr), K(ret));
O
oceanbase-admin 已提交
194 195 196 197 198 199
    }
  }

  return ret;
}

W
wangzelin.wzl 已提交
200 201 202 203 204
/**
 * Build an Ob20Packet from a fully buffered, checksum-verified payload.
 *
 * @param pool  memory pool the Ob20Packet object is allocated from
 * @param buf   [in/out] points at the payload on entry; it is advanced past the
 *              payload and the tailer before anything else happens, also when
 *              decoding fails afterwards
 * @param hdr   decoded header of this packet
 * @param pkt   [out] reset to NULL on entry, set to the new packet on success
 * @return OB_SUCCESS, the extra-info decoder's error code, or
 *         OB_ALLOCATE_MEMORY_FAILED when the pool cannot provide the packet
 */
inline int Ob20ProtocolProcessor::decode_ob20_body(
    ObICSMemPool& pool,
    const char*& buf,
    const Ob20ProtocolHeader &hdr,
    ObPacket *&pkt)
{
  INIT_SUCC(ret);
  pkt = NULL;

  // working cursor into the payload; the extra-info decoders move it past the
  // extra info block they consume
  const char *body_cursor = buf;
  buf += (hdr.payload_len_ + OB20_PROTOCOL_TAILER_LENGTH);

  // pick the extra-info decoder that matches the format announced by the header flag
  Ob20ExtraInfo extra_info;
  if (hdr.flag_.is_new_extra_info()) {
    ret = decode_new_extra_info(hdr, body_cursor, extra_info);
  } else {
    ret = decode_extra_info(hdr, body_cursor, extra_info);
  }

  if (OB_FAIL(ret)) {
    LOG_WARN("fail to decode extra info", K(hdr), K(extra_info), K(ret),
                                          K(hdr.flag_.is_new_extra_info()));
  } else {
    void *pkt_mem = pool.alloc(sizeof(Ob20Packet));
    if (OB_ISNULL(pkt_mem)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_ERROR("no memory available, close connection", "alloc_size", sizeof(Ob20Packet), K(ret));
    } else {
      // construct the packet in place inside the pool memory
      Ob20Packet *ob20_pkt = new (pkt_mem) Ob20Packet();
      ob20_pkt->set_content(body_cursor, hdr, extra_info);
      pkt = ob20_pkt;
    }
  }

  return ret;
}

int Ob20ProtocolProcessor::decode_extra_info(const Ob20ProtocolHeader &hdr,
                                             const char*& payload_start,
                                             Ob20ExtraInfo &extra_info)
{
  int ret = OB_SUCCESS;
  if (hdr.flag_.is_extra_info_exist()) {
    // extra_info:
    // extra_len, key(ObObj), value(ObObj), key, value, ..., key, value
    ObMySQLUtil::get_uint4(payload_start, extra_info.extra_len_);
    if ((0 == extra_info.extra_len_) || (extra_info.extra_len_ > hdr.payload_len_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("invalid extra_len", K(extra_info.extra_len_), K(hdr), K(ret));
    } else {
      int64_t pos = 0;
      const char *buf = payload_start;
      int64_t len = extra_info.extra_len_;
      while (pos < extra_info.extra_len_) {
        common::ObObj key;
        common::ObObj value;
        if (OB_FAIL(key.deserialize(buf, len, pos))) {
          LOG_WARN("fail to deserialize extra info", K(ret));
        } else if (!key.is_varchar()) {
          ret = OB_INVALID_ARGUMENT;
          LOG_WARN("invalid extra info key type", K(ret), K(key));
        } else if (OB_FAIL(value.deserialize(buf, len, pos))) {
          LOG_WARN("fail to deserialize extra info", K(ret));
        } else {
          LOG_TRACE("extra info", K(key), K(value));
          if (0 == key.get_string().case_compare(ObString("ob_trace_info"))) {
            extra_info.exist_trace_info_ = true;
            if (!value.is_varchar()) {
              ret = OB_INVALID_ARGUMENT;
              LOG_WARN("invalid extra info value type", K(ret), K(key), K(value));
            } else {
              extra_info.trace_info_ = value.get_string();
            }
          } else if (0 == key.get_string().case_compare(ObString("sess_inf"))) {
            if (!value.is_varchar()) {
              ret = OB_INVALID_ARGUMENT;
              LOG_WARN("invalid extra info value type", K(ret), K(key), K(value));
O
oceanbase-admin 已提交
276
            } else {
W
wangzelin.wzl 已提交
277 278
              extra_info.sync_sess_info_ = value.get_string();
              LOG_DEBUG("receive extra_info", KPHEX(extra_info.sync_sess_info_.ptr(),extra_info.sync_sess_info_.length()));
O
oceanbase-admin 已提交
279
            }
W
wangzelin.wzl 已提交
280 281 282 283 284 285 286 287 288
          } else if (0 == key.get_string().case_compare(ObString("full_trc"))) {
            if (!value.is_varchar()) {
              ret = OB_INVALID_ARGUMENT;
              LOG_WARN("invalid extra info value type", K(ret), K(key), K(value));
            } else {
              extra_info.full_link_trace_ = value.get_string();
            }
          } else {
            // do nothing
O
oceanbase-admin 已提交
289 290 291
          }
        }
      }
W
wangzelin.wzl 已提交
292
      payload_start += extra_info.extra_len_;
O
oceanbase-admin 已提交
293
    }
W
wangzelin.wzl 已提交
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
  }

  return ret;
}

int Ob20ProtocolProcessor::decode_new_extra_info(const Ob20ProtocolHeader &hdr,
                            const char*& payload_start, Ob20ExtraInfo &extra_info)
{
  int ret = OB_SUCCESS;

  if (hdr.flag_.is_extra_info_exist()) {
    // new extra_info:
    // extra_len, (extra_info_key, len, val), ..., (extra_info_key, len, val)
    extra_info.reset();
    ObMySQLUtil::get_uint4(payload_start, extra_info.extra_len_);
    if ((0 == extra_info.extra_len_) || (extra_info.extra_len_ > hdr.payload_len_)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("invalid extra_len", K(extra_info.extra_len_), K(hdr), K(ret));
    } else {
      int64_t pos = 0;
      const char *buf = payload_start;
      int64_t len = extra_info.extra_len_;
      while (OB_SUCC(ret) && pos < extra_info.extra_len_) {
        int16_t extra_id = 0;
        if (OB_FAIL(ObProtoTransUtil::resolve_type(buf, len, pos, extra_id))) {
          OB_LOG(WARN,"failed to get extra_info", K(ret), KP(buf));
        } else if (FALSE_IT(pos -= sizeof(int16_t))) {
          // do nothing, reset pos to original
        } else if (extra_id <= OBP20_PROXY_MAX_TYPE && extra_id >= OBP20_SVR_END) {
          // invalid extra_id, skip it
        } else if (OB_ISNULL(svr_decoders_[extra_id-OBP20_PROXY_MAX_TYPE-1])) {
          ret = OB_ERR_UNEXPECTED;
          LOG_ERROR("get a null encoder", K(extra_id), K(extra_id-OBP20_PROXY_MAX_TYPE-1), K(ret));
        } else if (OB_FAIL(svr_decoders_[extra_id-OBP20_PROXY_MAX_TYPE-1]->
                                                    deserialize(buf, len, pos, extra_info))) {
          LOG_WARN("failed to decode", K(ret),  KP(buf), K(len), K(pos), K(extra_id));
        } else {
          // do nothing
        }
      }

      if(OB_FAIL(ret)) {
        // do nothing
O
oceanbase-admin 已提交
337
      } else {
W
wangzelin.wzl 已提交
338
        payload_start += extra_info.extra_len_;
O
oceanbase-admin 已提交
339 340 341 342 343 344
      }
    }
  }
  return ret;
}

W
wangzelin.wzl 已提交
345 346

/**
 * Feed one decoded OB 2.0 packet into the reassembly logic of process_ob20_packet(),
 * using the per-connection contexts stored in conn.
 *
 * @param conn              connection owning the proto20 context, the mysql packet
 *                          context and the packet record wrapper
 * @param pool              memory pool forwarded to process_ob20_packet()
 * @param pkt               [in/out] forwarded unchanged to process_ob20_packet()
 * @param need_decode_more  [out] forwarded unchanged to process_ob20_packet()
 * @return the return code of process_ob20_packet()
 */
int Ob20ProtocolProcessor::do_splice(ObSMConnection& conn, ObICSMemPool& pool, void*& pkt, bool& need_decode_more)
{
  INIT_SUCC(ret);
  ret = process_ob20_packet(conn.proto20_pkt_context_,
                            conn.mysql_pkt_context_,
                            conn.pkt_rec_wrapper_,
                            pool,
                            pkt,
                            need_decode_more);
  if (OB_FAIL(ret)) {
    LOG_ERROR("fail to process_ob20_packet", K(ret));
  }
  return ret;
}

O
obdev 已提交
356 357 358 359
inline int Ob20ProtocolProcessor::process_ob20_packet(ObProto20PktContext& context,
                                                      ObMysqlPktContext &mysql_pkt_context,
                                                      obmysql::ObPacketRecordWrapper &pkt_rec_wrapper,
                                                      ObICSMemPool& pool,
W
wangzelin.wzl 已提交
360
                                                      void *&ipacket, bool &need_decode_more)
O
oceanbase-admin 已提交
361 362 363
{
  INIT_SUCC(ret);
  need_decode_more = true;
W
wangzelin.wzl 已提交
364 365
  Ob20Packet *pkt20 = NULL;
  if (OB_ISNULL(pkt20 = reinterpret_cast<Ob20Packet *>(ipacket))) {
O
oceanbase-admin 已提交
366 367
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("ipacket is null", K(ret));
W
wangzelin.wzl 已提交
368
  } else if (context.is_multi_pkt_) {
O
oceanbase-admin 已提交
369
    // check sequence
W
wangzelin.wzl 已提交
370
    if (OB_UNLIKELY(pkt20->get_comp_seq() != static_cast<uint8_t>(context.comp_last_pkt_seq_ + 1))) {
O
oceanbase-admin 已提交
371
      ret = OB_ERR_UNEXPECTED;
W
wangzelin.wzl 已提交
372 373 374
      LOG_ERROR("comp seq is unexpected", "last_seq", context.comp_last_pkt_seq_,
                "comp_seq", pkt20->get_comp_seq(), KPC(pkt20), K(ret));
    } else if (OB_UNLIKELY(pkt20->get_seq() != static_cast<uint8_t>(context.proto20_last_pkt_seq_ + 1))) {
O
oceanbase-admin 已提交
375
      ret = OB_ERR_UNEXPECTED;
W
wangzelin.wzl 已提交
376 377 378
      LOG_ERROR("ob20 packet seq is unexpected", "last_seq", context.proto20_last_pkt_seq_,
                "current seq", pkt20->get_seq(), KPC(pkt20), K(ret));
    } else if (OB_UNLIKELY(pkt20->get_request_id() != context.proto20_last_request_id_)) {
O
oceanbase-admin 已提交
379
      ret = OB_ERR_UNEXPECTED;
W
wangzelin.wzl 已提交
380 381 382
      LOG_ERROR("ob20 packet request id is unexpected", "last request id",
                context.proto20_last_request_id_, "current request id", pkt20->get_request_id(),
                KPC(pkt20), K(ret));
O
oceanbase-admin 已提交
383 384 385 386 387
    }
  }

  if (OB_SUCC(ret)) {
    uint32_t mysql_data_size = pkt20->get_mysql_packet_len();
W
wangzelin.wzl 已提交
388
    char *mysql_data_start = const_cast<char *>(pkt20->get_cdata());
O
obdev 已提交
389 390 391
    if (pkt_rec_wrapper.enable_proto_dia()) {
      pkt_rec_wrapper.record_recieve_mysql_pkt_fragment(mysql_data_size);
    }
W
wangzelin.wzl 已提交
392 393 394 395 396 397 398 399
    if (mysql_data_size == 0) {
      // waitting for a not empty packet
      need_decode_more = true;
    } else if (OB_FAIL(process_fragment_mysql_packet(
                        mysql_pkt_context, pool, mysql_data_start,
                        mysql_data_size, ipacket, need_decode_more))) {
      LOG_ERROR("fail to process fragment mysql packet", KP(mysql_data_start),
                K(mysql_data_size), K(need_decode_more), K(ret));
400 401 402 403 404 405 406 407 408 409 410 411 412
    } else if (!context.extra_info_.exist_extra_info()
        && pkt20->get_extra_info().exist_extra_info()) {
      char* tmp_buffer = NULL;
      int64_t total_len = pkt20->get_extra_info().get_total_len();
      if (OB_ISNULL(tmp_buffer = reinterpret_cast<char *>(context.arena_.alloc(total_len)))) {
        ret = OB_ALLOCATE_MEMORY_FAILED;
        LOG_ERROR("no memory available", "alloc_size", total_len, K(ret));
      } else if (OB_FAIL(context.extra_info_.assign(pkt20->get_extra_info(), tmp_buffer, total_len))) {
        LOG_ERROR("failed to deep copy extra info", K(ret));
      }
    } else {
      // do nothing
    }
W
wangzelin.wzl 已提交
413 414 415

    if (OB_FAIL(ret)) {
      // do nothing
O
oceanbase-admin 已提交
416
    } else {
W
wangzelin.wzl 已提交
417 418 419
      context.comp_last_pkt_seq_ = pkt20->get_comp_seq();
      context.proto20_last_pkt_seq_ = pkt20->get_seq(); // remember the request proto20 seq
      context.proto20_last_request_id_ = pkt20->get_request_id(); // remember the request id
O
oceanbase-admin 已提交
420
      if (need_decode_more) {
W
wangzelin.wzl 已提交
421
        context.is_multi_pkt_ = true;
O
oceanbase-admin 已提交
422 423
      } else {
        // If a MySQL package is split into multiple 2.0 protocol packages,
W
wangzelin.wzl 已提交
424 425 426 427
        // Only after all the sub-packages have been received and the group package is completed, ipacket is set as a complete MySQL package
        // Only then can we set the flag of re-routing
        // If a request is divided into multiple MySQL packages, each MySQL package will also set the re-routing flag
        ObMySQLRawPacket *input_packet = reinterpret_cast<ObMySQLRawPacket *>(ipacket);
O
oceanbase-admin 已提交
428
        input_packet->set_can_reroute_pkt(pkt20->get_flags().is_proxy_reroute());
429
        input_packet->set_is_weak_read(pkt20->get_flags().is_weak_read());
430 431 432 433 434 435 436 437 438 439

        const int64_t t_len = context.extra_info_.get_total_len();
        char *t_buffer = NULL;
        if (OB_ISNULL(t_buffer = reinterpret_cast<char *>(pool.alloc(t_len)))) {
          ret = OB_ALLOCATE_MEMORY_FAILED;
          LOG_ERROR("no memory available", "alloc_size", t_len, K(ret));
        } else if (OB_FAIL(input_packet->extra_info_.assign(context.extra_info_, t_buffer, t_len))) {
          LOG_ERROR("failed to assign extra info", K(ret));
        }

440
        input_packet->set_txn_free_route(pkt20->get_flags().txn_free_route());
W
wangzelin.wzl 已提交
441
        context.reset();
O
oceanbase-admin 已提交
442
        // set again for sending response
W
wangzelin.wzl 已提交
443 444
        context.proto20_last_pkt_seq_ = pkt20->get_seq();
        context.proto20_last_request_id_ = pkt20->get_request_id();
O
obdev 已提交
445 446 447
        if (pkt_rec_wrapper.enable_proto_dia()) {
          pkt_rec_wrapper.record_recieve_obp20_packet(*pkt20, *input_packet);
        }
O
oceanbase-admin 已提交
448 449 450 451 452 453 454
      }
    }
  }

  return ret;
}

W
wangzelin.wzl 已提交
455 456
} // end of namespace obmysql
} // end of namespace oceanbase