mysql_communicator.cpp 29.6 KB
Newer Older
羽飞's avatar
羽飞 已提交
1
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
羽飞's avatar
羽飞 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
         http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */

//
// Created by Wangyunlai on 2022/11/22.
//

#include <string.h>
#include <vector>

#include "common/log/log.h"
#include "common/io/io.h"
#include "net/mysql_communicator.h"
羽飞's avatar
羽飞 已提交
21
#include "net/buffered_writer.h"
羽飞's avatar
羽飞 已提交
22
#include "event/session_event.h"
羽飞's avatar
羽飞 已提交
23
#include "sql/operator/string_list_physical_operator.h"
羽飞's avatar
羽飞 已提交
24

羽飞's avatar
羽飞 已提交
25 26 27 28 29
/**
 * @brief MySQL协议相关实现
 * @defgroup MySQLProtocol
 */

羽飞's avatar
羽飞 已提交
30 31
// https://dev.mysql.com/doc/dev/mysql-server/latest/group__group__cs__capabilities__flags.html
// the flags below are negotiate by handshake packet
L
Longda Feng 已提交
32 33 34 35 36 37 38
const uint32_t CLIENT_PROTOCOL_41 = 512;
// const uint32_t CLIENT_INTERACTIVE   = 1024;  // This is an interactive client
const uint32_t CLIENT_TRANSACTIONS = 8192;          // Client knows about transactions.
const uint32_t CLIENT_SESSION_TRACK = (1UL << 23);  // Capable of handling server state change information
const uint32_t CLIENT_DEPRECATE_EOF = (1UL << 24);  // Client no longer needs EOF_Packet and will use OK_Packet instead
const uint32_t CLIENT_OPTIONAL_RESULTSET_METADATA =
    (1UL << 25);  // The client can handle optional metadata information in the resultset.
羽飞's avatar
羽飞 已提交
39
// Support optional extension for query parameters into the COM_QUERY and COM_STMT_EXECUTE packets.
L
Longda Feng 已提交
40
// const uint32_t CLIENT_QUERY_ATTRIBUTES = (1UL << 27);
羽飞's avatar
羽飞 已提交
41 42 43

// https://dev.mysql.com/doc/dev/mysql-server/latest/group__group__cs__column__definition__flags.html
// Column Definition Flags
L
Longda Feng 已提交
44 45 46 47 48 49 50
// const uint32_t NOT_NULL_FLAG  = 1;
// const uint32_t PRI_KEY_FLAG   = 2;
// const uint32_t UNIQUE_KEY_FLAG   = 4;
// const uint32_t MULTIPLE_KEY_FLAG = 8;
// const uint32_t NUM_FLAG          = 32768; // Field is num (for clients)
// const uint32_t PART_KEY_FLAG     = 16384; // Intern; Part of some key.

羽飞's avatar
羽飞 已提交
51 52 53 54 55 56 57
/**
 * @brief Resultset metadata
 * @details 这些枚举值都是从MySQL的协议中抄过来的
 * @ingroup MySQLProtocol
 */
enum ResultSetMetaData 
{
羽飞's avatar
羽飞 已提交
58 59 60 61 62
  RESULTSET_METADATA_NONE = 0,
  RESULTSET_METADATA_FULL = 1,
};

/**
羽飞's avatar
羽飞 已提交
63 64 65 66 67 68
 * @brief Column types for MySQL
 * @details 枚举值类型是从MySQL的协议中抄过来的
 * @ingroup MySQLProtocol
 */
enum enum_field_types 
{
羽飞's avatar
羽飞 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
  MYSQL_TYPE_DECIMAL,
  MYSQL_TYPE_TINY,
  MYSQL_TYPE_SHORT,
  MYSQL_TYPE_LONG,
  MYSQL_TYPE_FLOAT,
  MYSQL_TYPE_DOUBLE,
  MYSQL_TYPE_NULL,
  MYSQL_TYPE_TIMESTAMP,
  MYSQL_TYPE_LONGLONG,
  MYSQL_TYPE_INT24,
  MYSQL_TYPE_DATE,
  MYSQL_TYPE_TIME,
  MYSQL_TYPE_DATETIME,
  MYSQL_TYPE_YEAR,
  MYSQL_TYPE_NEWDATE, /**< Internal to MySQL. Not used in protocol */
  MYSQL_TYPE_VARCHAR,
  MYSQL_TYPE_BIT,
  MYSQL_TYPE_TIMESTAMP2,
  MYSQL_TYPE_DATETIME2,   /**< Internal to MySQL. Not used in protocol */
  MYSQL_TYPE_TIME2,       /**< Internal to MySQL. Not used in protocol */
  MYSQL_TYPE_TYPED_ARRAY, /**< Used for replication only */
  MYSQL_TYPE_INVALID = 243,
  MYSQL_TYPE_BOOL = 244, /**< Currently just a placeholder */
  MYSQL_TYPE_JSON = 245,
  MYSQL_TYPE_NEWDECIMAL = 246,
  MYSQL_TYPE_ENUM = 247,
  MYSQL_TYPE_SET = 248,
  MYSQL_TYPE_TINY_BLOB = 249,
  MYSQL_TYPE_MEDIUM_BLOB = 250,
  MYSQL_TYPE_LONG_BLOB = 251,
  MYSQL_TYPE_BLOB = 252,
  MYSQL_TYPE_VAR_STRING = 253,
  MYSQL_TYPE_STRING = 254,
  MYSQL_TYPE_GEOMETRY = 255
};

羽飞's avatar
羽飞 已提交
105 106 107 108 109 110 111 112 113 114 115 116 117 118
/**
 * @brief 根据MySQL协议的描述实现的数据写入函数
 * @defgroup MySQLProtocolStore
 * @note 当前仅考虑小端模式,所以当前的代码仅能运行在小端模式的机器上,比如Intel。
 */

/**
 * @brief 将数据写入到缓存中
 * 
 * @param buf  数据缓存
 * @param value 要写入的值
 * @return int 写入的字节数
 * @ingroup MySQLProtocolStore
 */
羽飞's avatar
羽飞 已提交
119 120 121 122 123 124
int store_int1(char *buf, int8_t value)
{
  *buf = value;
  return 1;
}

羽飞's avatar
羽飞 已提交
125 126 127 128 129 130 131 132
/**
 * @brief 将数据写入到缓存中
 * 
 * @param buf  数据缓存
 * @param value 要写入的值
 * @return int 写入的字节数
 * @ingroup MySQLProtocolStore
 */
羽飞's avatar
羽飞 已提交
133 134 135 136 137 138
int store_int2(char *buf, int16_t value)
{
  memcpy(buf, &value, sizeof(value));
  return 2;
}

羽飞's avatar
羽飞 已提交
139 140 141 142 143 144 145 146
/**
 * @brief 将数据写入到缓存中
 * 
 * @param buf  数据缓存
 * @param value 要写入的值
 * @return int 写入的字节数
 * @ingroup MySQLProtocolStore
 */
羽飞's avatar
羽飞 已提交
147 148 149 150 151 152
int store_int3(char *buf, int32_t value)
{
  memcpy(buf, &value, 3);
  return 3;
}

羽飞's avatar
羽飞 已提交
153 154 155 156 157 158 159 160
/**
 * @brief 将数据写入到缓存中
 * 
 * @param buf  数据缓存
 * @param value 要写入的值
 * @return int 写入的字节数
 * @ingroup MySQLProtocolStore
 */
羽飞's avatar
羽飞 已提交
161 162 163 164 165 166
int store_int4(char *buf, int32_t value)
{
  memcpy(buf, &value, 4);
  return 4;
}

羽飞's avatar
羽飞 已提交
167 168 169 170 171 172 173 174
/**
 * @brief 将数据写入到缓存中
 * 
 * @param buf  数据缓存
 * @param value 要写入的值
 * @return int 写入的字节数
 * @ingroup MySQLProtocolStore
 */
羽飞's avatar
羽飞 已提交
175 176 177 178 179 180
int store_int6(char *buf, int64_t value)
{
  memcpy(buf, &value, 6);
  return 6;
}

羽飞's avatar
羽飞 已提交
181 182 183 184 185 186 187 188
/**
 * @brief 将数据写入到缓存中
 * 
 * @param buf  数据缓存
 * @param value 要写入的值
 * @return int 写入的字节数
 * @ingroup MySQLProtocolStore
 */
羽飞's avatar
羽飞 已提交
189 190 191 192 193 194
int store_int8(char *buf, int64_t value)
{
  memcpy(buf, &value, 8);
  return 8;
}

羽飞's avatar
羽飞 已提交
195 196 197 198 199 200 201 202
/**
 * @brief 将数据写入到缓存中
 * @details 按照MySQL协议的描述,这是一个变长编码的整数,最大可以编码8个字节的整数。不同大小的数字,第一个字节的值不同。
 * @param buf  数据缓存
 * @param value 要写入的值
 * @return int 写入的字节数
 * @ingroup MySQLProtocolStore
 */
羽飞's avatar
羽飞 已提交
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
int store_lenenc_int(char *buf, uint64_t value)
{
  if (value < 251) {
    *buf = (int8_t)value;
    return 1;
  }

  if (value < (2UL << 16)) {
    *buf = 0xFC;
    memcpy(buf + 1, &value, 2);
    return 3;
  }

  if (value < (2UL << 24)) {
    *buf = 0xFD;
    memcpy(buf + 1, &value, 3);
    return 4;
  }

  *buf = 0xFE;
  memcpy(buf + 1, &value, 8);
  return 9;
}

羽飞's avatar
羽飞 已提交
227 228 229 230 231 232 233 234
/**
 * @brief 将以'\0'结尾的字符串写入到缓存中
 * 
 * @param buf  数据缓存
 * @param s 要写入的字符串
 * @return int 写入的字节数
 * @ingroup MySQLProtocolStore
 */
羽飞's avatar
羽飞 已提交
235 236 237 238 239 240 241 242 243 244 245
int store_null_terminated_string(char *buf, const char *s)
{
  if (nullptr == s || s[0] == 0) {
    return 0;
  }

  const int len = strlen(s) + 1;
  memcpy(buf, s, len);
  return len;
}

羽飞's avatar
羽飞 已提交
246 247 248 249 250 251 252 253 254
/**
 * @brief 将指定长度的字符串写入到缓存中
 * 
 * @param buf  数据缓存
 * @param s 要写入的字符串
 * @param len 字符串的长度
 * @return int 写入的字节数
 * @ingroup MySQLProtocolStore
 */
羽飞's avatar
羽飞 已提交
255 256 257 258 259 260 261 262 263 264
int store_fix_length_string(char *buf, const char *s, int len)
{
  if (len == 0) {
    return 0;
  }

  memcpy(buf, s, len);
  return len;
}

羽飞's avatar
羽飞 已提交
265 266 267 268 269 270 271 272
/**
 * @brief 按照带有长度标识的字符串写入到缓存,长度标识以变长整数编码
 * 
 * @param buf  数据缓存
 * @param s 要写入的字符串
 * @return int 写入的字节数
 * @ingroup MySQLProtocolStore
 */
羽飞's avatar
羽飞 已提交
273 274 275 276 277 278 279 280 281
int store_lenenc_string(char *buf, const char *s)
{
  int len = strlen(s);
  int pos = store_lenenc_int(buf, len);
  store_fix_length_string(buf + pos, s, len);
  return pos + len;
}

/**
羽飞's avatar
羽飞 已提交
282
 * @brief 每个包都有一个包头
羽飞's avatar
羽飞 已提交
283 284
 * @details [MySQL Basic Packet](https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_packets.html)
 * [MariaDB Packet](https://mariadb.com/kb/en/0-packet/)
羽飞's avatar
羽飞 已提交
285
 * @ingroup MySQLProtocol
羽飞's avatar
羽飞 已提交
286
 */
羽飞's avatar
羽飞 已提交
287 288
struct PacketHeader 
{
L
Longda Feng 已提交
289 290
  int32_t payload_length : 24;  //! 当前packet的除掉头的长度
  int8_t sequence_id = 0;       //! 当前packet在当前处理过程中是第几个包
羽飞's avatar
羽飞 已提交
291 292
};

羽飞's avatar
羽飞 已提交
293 294
/**
 * @brief 所有的包都继承自BasePacket
羽飞's avatar
羽飞 已提交
295
 * @details 所有的包都有一个包头,所以BasePacket中包含了一个 @ref PacketHeader
羽飞's avatar
羽飞 已提交
296 297 298 299
 * @ingroup MySQLProtocol
 */
class BasePacket 
{
L
Longda Feng 已提交
300
public:
羽飞's avatar
羽飞 已提交
301 302 303 304 305 306 307 308
  PacketHeader packet_header;

  BasePacket(int8_t sequence = 0)
  {
    packet_header.sequence_id = sequence;
  }

  virtual ~BasePacket() = default;
羽飞's avatar
羽飞 已提交
309 310 311 312

  /**
   * @brief 将当前包编码成网络包
   * 
羽飞's avatar
羽飞 已提交
313 314
   * @param[in] capabilities MySQL协议中的capability标志
   * @param[out] net_packet 编码后的网络包
羽飞's avatar
羽飞 已提交
315
   */
羽飞's avatar
羽飞 已提交
316 317 318 319
  virtual RC encode(uint32_t capabilities, std::vector<char> &net_packet) const = 0;
};

/**
羽飞's avatar
羽飞 已提交
320 321 322 323
 * @brief 握手包
 * @ingroup MySQLProtocol
 * @details 先由服务端发送到客户端。
 * 这个包会交互capability与用户名密码。
羽飞's avatar
羽飞 已提交
324
 * [MySQL Handshake]https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_connection_phase_packets_protocol_handshake_v10.html
羽飞's avatar
羽飞 已提交
325
 */
羽飞's avatar
羽飞 已提交
326 327
struct HandshakeV10 : public BasePacket 
{
L
Longda Feng 已提交
328 329 330 331 332 333 334 335 336 337 338 339
  int8_t protocol = 10;
  char server_version[7] = "5.7.25";
  int32_t thread_id = 21501807;  // conn id
  char auth_plugin_data_part_1[9] =
      "12345678";                       // first 8 bytes of the plugin provided data (scramble) // and the filler
  int16_t capability_flags_1 = 0xF7DF;  // The lower 2 bytes of the Capabilities Flags
  int8_t character_set = 83;
  int16_t status_flags = 0;
  int16_t capability_flags_2 = 0x0000;
  int8_t auth_plugin_data_len = 0;
  char reserved[10] = {0};
  char auth_plugin_data_part_2[13] = "bbbbbbbbbbbb";
羽飞's avatar
羽飞 已提交
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373

  HandshakeV10(int8_t sequence = 0) : BasePacket(sequence)
  {}
  virtual ~HandshakeV10() = default;

  /**
   * https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_connection_phase_packets_protocol_handshake_v10.html
   */
  virtual RC encode(uint32_t capabilities, std::vector<char> &net_packet) const override
  {
    net_packet.resize(100);

    char *buf = net_packet.data();
    int pos = 0;
    pos += 3;

    pos += store_int1(buf + pos, packet_header.sequence_id);
    pos += store_int1(buf + pos, protocol);

    pos += store_null_terminated_string(buf + pos, server_version);
    pos += store_int4(buf + pos, thread_id);
    pos += store_null_terminated_string(buf + pos, auth_plugin_data_part_1);
    pos += store_int2(buf + pos, capability_flags_1);
    pos += store_int1(buf + pos, character_set);
    pos += store_int2(buf + pos, status_flags);
    pos += store_int2(buf + pos, capability_flags_2);
    pos += store_int1(buf + pos, auth_plugin_data_len);
    pos += store_fix_length_string(buf + pos, reserved, 10);
    pos += store_null_terminated_string(buf + pos, auth_plugin_data_part_2);

    int payload_length = pos - 4;
    store_int3(buf, payload_length);
    net_packet.resize(pos);
    LOG_TRACE("encode handshake packet with payload length=%d", payload_length);
L
Longda Feng 已提交
374

羽飞's avatar
羽飞 已提交
375 376 377 378
    return RC::SUCCESS;
  }
};

羽飞's avatar
羽飞 已提交
379 380 381 382 383 384
/**
 * @brief 响应包,在很多场景中都会使用
 * @ingroup MySQLProtocol
 */
struct OkPacket : public BasePacket 
{
L
Longda Feng 已提交
385 386 387 388 389 390
  int8_t header = 0;  // 0x00 for ok and 0xFE for EOF
  int32_t affected_rows = 0;
  int32_t last_insert_id = 0;
  int16_t status_flags = 0x22;
  int16_t warnings = 0;
  std::string info;  // human readable status information
羽飞's avatar
羽飞 已提交
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431

  OkPacket(int8_t sequence = 0) : BasePacket(sequence)
  {}
  virtual ~OkPacket() = default;

  /**
   * https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_ok_packet.html
   */
  virtual RC encode(uint32_t capabilities, std::vector<char> &net_packet) const override
  {
    net_packet.resize(100);
    char *buf = net_packet.data();
    int pos = 0;

    pos += 3;
    pos += store_int1(buf + pos, packet_header.sequence_id);
    pos += store_int1(buf + pos, header);
    pos += store_lenenc_int(buf + pos, affected_rows);
    pos += store_lenenc_int(buf + pos, last_insert_id);

    if (capabilities & CLIENT_PROTOCOL_41) {
      pos += store_int2(buf + pos, status_flags);
      pos += store_int2(buf + pos, warnings);
    } else if (capabilities & CLIENT_TRANSACTIONS) {
      pos += store_int2(buf + pos, status_flags);
    }

    if (capabilities & CLIENT_SESSION_TRACK) {
      pos += store_lenenc_string(buf + pos, info.c_str());
    } else {
      pos += store_fix_length_string(buf + pos, info.c_str(), info.length());
    }

    int32_t payload_length = pos - 4;
    LOG_TRACE("encode ok packet with length=%d", payload_length);
    store_int3(buf, payload_length);
    net_packet.resize(pos);
    return RC::SUCCESS;
  }
};

羽飞's avatar
羽飞 已提交
432 433 434 435 436 437 438
/**
 * @brief EOF包
 * @ingroup MySQLProtocol
 * @details [basic_err_packet](https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_err_packet.html)
 */
struct EofPacket : public BasePacket 
{
L
Longda Feng 已提交
439 440 441
  int8_t header = 0xFE;
  int16_t warnings = 0;
  int16_t status_flags = 0x22;
羽飞's avatar
羽飞 已提交
442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473

  EofPacket(int8_t sequence = 0) : BasePacket(sequence)
  {}
  virtual ~EofPacket() = default;

  virtual RC encode(uint32_t capabilities, std::vector<char> &net_packet) const override
  {
    net_packet.resize(10);
    char *buf = net_packet.data();
    int pos = 0;

    pos += 3;
    store_int1(buf + pos, packet_header.sequence_id);
    pos += 1;
    store_int1(buf + pos, header);
    pos += 1;

    if (capabilities & CLIENT_PROTOCOL_41) {
      store_int2(buf + pos, warnings);
      pos += 2;
      store_int2(buf + pos, status_flags);
      pos += 2;
    }

    int payload_length = pos - 4;
    store_int3(buf, payload_length);
    net_packet.resize(pos);

    return RC::SUCCESS;
  }
};

羽飞's avatar
羽飞 已提交
474 475 476 477 478 479 480
/**
 * @brief ERR包,出现错误时返回
 * @details [eof_packet](https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_eof_packet.html)
 * @ingroup MySQLProtocol
 */
struct ErrPacket : public BasePacket 
{
L
Longda Feng 已提交
481 482 483 484 485
  int8_t header = 0xFF;
  int16_t error_code = 0;
  char sql_state_marker[1] = {'#'};
  std::string sql_state{"HY000"};
  std::string error_message;
羽飞's avatar
羽飞 已提交
486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518

  ErrPacket(int8_t sequence = 0) : BasePacket(sequence)
  {}
  virtual ~ErrPacket() = default;

  virtual RC encode(uint32_t capabilities, std::vector<char> &net_packet) const override
  {
    net_packet.resize(1000);
    char *buf = net_packet.data();
    int pos = 0;

    pos += 3;

    store_int1(buf + pos, packet_header.sequence_id);
    pos += 1;
    store_int1(buf + pos, header);
    pos += 1;
    store_int2(buf + pos, error_code);
    pos += 2;
    if (capabilities & CLIENT_PROTOCOL_41) {
      pos += store_fix_length_string(buf + pos, sql_state_marker, 1);
      pos += store_fix_length_string(buf + pos, sql_state.c_str(), 5);
    }

    pos += store_fix_length_string(buf + pos, error_message.c_str(), error_message.length());

    int payload_length = pos - 4;
    store_int3(buf, payload_length);
    net_packet.resize(pos);
    return RC::SUCCESS;
  }
};

羽飞's avatar
羽飞 已提交
519 520 521 522 523 524 525 526
/**
 * @brief MySQL客户端发过来的请求包
 * @ingroup MySQLProtocol
 * @details [MySQL Protocol Command Phase](https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_command_phase.html)
 * [MariaDB Text Protocol](https://mariadb.com/kb/en/2-text-protocol/)
 */
struct QueryPacket 
{
羽飞's avatar
羽飞 已提交
527
  PacketHeader packet_header;
L
Longda Feng 已提交
528 529
  int8_t command;     // 0x03: COM_QUERY
  std::string query;  // the text of the SQL query to execute
羽飞's avatar
羽飞 已提交
530 531 532
};

/**
羽飞's avatar
羽飞 已提交
533 534 535
 * @brief decode query packet
 * @details packet_header is not included in net_packet
 * [MySQL Protocol COM_QUERY](https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query.html)
羽飞's avatar
羽飞 已提交
536 537 538 539 540 541 542 543 544
 */
RC decode_query_packet(std::vector<char> &net_packet, QueryPacket &query_packet)
{
  // query field is a null terminated string
  query_packet.query.assign(net_packet.data() + 1, net_packet.size() - 1);
  query_packet.query.append(1, ';');
  return RC::SUCCESS;
}

羽飞's avatar
羽飞 已提交
545 546 547 548 549
/**
 * @brief MySQL客户端连接时会发起一个"select @@version_comment"的查询,这里对这个查询进行特殊处理
 * @param[out] sql_result 生成的结果
 * @ingroup MySQLProtocol
 */
羽飞's avatar
羽飞 已提交
550
RC create_version_comment_sql_result(SqlResult *sql_result)
羽飞's avatar
羽飞 已提交
551 552 553 554 555 556 557 558 559 560
{
  TupleSchema tuple_schema;
  TupleCellSpec cell_spec("", "", "@@version_comment");
  tuple_schema.append_cell(cell_spec);

  sql_result->set_return_code(RC::SUCCESS);
  sql_result->set_tuple_schema(tuple_schema);

  const char *version_comments = "MiniOB";

羽飞's avatar
羽飞 已提交
561
  StringListPhysicalOperator *oper = new StringListPhysicalOperator();
羽飞's avatar
羽飞 已提交
562
  oper->append(version_comments);
羽飞's avatar
羽飞 已提交
563
  sql_result->set_operator(std::unique_ptr<PhysicalOperator>(oper));
羽飞's avatar
羽飞 已提交
564 565 566
  return RC::SUCCESS;
}

羽飞's avatar
羽飞 已提交
567 568 569 570 571 572 573
/**
 * @brief MySQL链接做初始化,需要进行握手和一些预处理
 * @ingroup MySQLProtocol
 * @param fd 连接描述符
 * @param session 当前的会话
 * @param addr 对端地址
 */
羽飞's avatar
羽飞 已提交
574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590
RC MysqlCommunicator::init(int fd, Session *session, const std::string &addr)
{
  // https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_connection_phase.html
  // 按照协议描述,服务端在连接建立后需要先向客户端发送握手信息
  RC rc = Communicator::init(fd, session, addr);
  if (rc != RC::SUCCESS) {
    LOG_WARN("failed to init communicator: %s", strrc(rc));
    return rc;
  }

  HandshakeV10 handshake_packet;
  rc = send_packet(handshake_packet);
  if (rc != RC::SUCCESS) {
    LOG_WARN("failed to send handshake packet to client. addr=%s, error=%s", addr.c_str(), strerror(errno));
    return rc;
  }

羽飞's avatar
羽飞 已提交
591 592
  writer_->flush();

羽飞's avatar
羽飞 已提交
593 594 595
  return rc;
}

羽飞's avatar
羽飞 已提交
596 597 598 599 600
/**
 * @brief MySQL客户端连接时会发起一个"select @@version_comment"的查询,这里对这个查询进行特殊处理
 * 
 * @param[out] need_disconnect 连接上如果出现异常,通过这个标识来判断是否需要断开连接
 */
羽飞's avatar
羽飞 已提交
601 602
RC MysqlCommunicator::handle_version_comment(bool &need_disconnect)
{
羽飞's avatar
羽飞 已提交
603 604
  SessionEvent session_event(this);
  RC rc = create_version_comment_sql_result(session_event.sql_result());
羽飞's avatar
羽飞 已提交
605 606 607 608 609 610 611 612 613
  if (rc != RC::SUCCESS) {
    LOG_WARN("failed to handle version comment. rc=%s", strrc(rc));
    return rc;
  }

  rc = write_result(&session_event, need_disconnect);
  return rc;
}

羽飞's avatar
羽飞 已提交
614 615 616 617 618
/**
 * @brief 读取客户端发过来的请求
 * 
 * @param[out] event 如果有新的请求,就会生成一个SessionEvent
 */
羽飞's avatar
羽飞 已提交
619 620 621
RC MysqlCommunicator::read_event(SessionEvent *&event)
{
  RC rc = RC::SUCCESS;
羽飞's avatar
羽飞 已提交
622 623

  /// 读取一个完整的数据包
羽飞's avatar
羽飞 已提交
624 625 626
  PacketHeader packet_header;
  int ret = common::readn(fd_, &packet_header, sizeof(packet_header));
  if (ret != 0) {
L
Longda Feng 已提交
627
    LOG_WARN("failed to read packet header. length=%d, addr=%s. error=%s",
羽飞's avatar
羽飞 已提交
628
             sizeof(packet_header), addr_.c_str(), strerror(errno));
羽飞's avatar
羽飞 已提交
629
    return RC::IOERR_READ;
羽飞's avatar
羽飞 已提交
630 631 632 633
  }

  LOG_TRACE("read packet header. length=%d, sequence_id=%d", sizeof(packet_header), packet_header.sequence_id);
  sequence_id_ = packet_header.sequence_id + 1;
L
Longda Feng 已提交
634

羽飞's avatar
羽飞 已提交
635 636 637
  std::vector<char> buf(packet_header.payload_length);
  ret = common::readn(fd_, buf.data(), packet_header.payload_length);
  if (ret != 0) {
羽飞's avatar
羽飞 已提交
638 639
    LOG_WARN("failed to read packet payload. length=%d, addr=%s, error=%s", 
             packet_header.payload_length, addr_.c_str(), strerror(errno));
羽飞's avatar
羽飞 已提交
640
    return RC::IOERR_READ;
羽飞's avatar
羽飞 已提交
641 642 643
  }

  LOG_TRACE("read packet payload length=%d", packet_header.payload_length);
L
Longda Feng 已提交
644

羽飞's avatar
羽飞 已提交
645 646
  event = nullptr;
  if (!authed_) {
羽飞's avatar
羽飞 已提交
647
    /// 还没有做过认证,就先需要完成握手阶段
L
Longda Feng 已提交
648
    uint32_t client_flag = *(uint32_t *)buf.data();  // TODO should use decode (little endian as default)
羽飞's avatar
羽飞 已提交
649 650 651 652 653 654 655 656 657
    LOG_INFO("client handshake response with capabilities flag=%d", client_flag);
    client_capabilities_flag_ = client_flag;
    // send ok packet and return
    OkPacket ok_packet;
    ok_packet.packet_header.sequence_id = sequence_id_;
    rc = send_packet(ok_packet);
    if (rc != RC::SUCCESS) {
      LOG_WARN("failed to send ok packet while auth");
    }
羽飞's avatar
羽飞 已提交
658
    writer_->flush();
羽飞's avatar
羽飞 已提交
659 660 661 662 663 664 665
    authed_ = true;
    LOG_INFO("client authed. addr=%s. rc=%s", addr_.c_str(), strrc(rc));
    return rc;
  }

  int8_t command_type = buf[0];
  LOG_TRACE("recv command from client =%d", command_type);
L
Longda Feng 已提交
666

羽飞's avatar
羽飞 已提交
667 668
  /// 已经做过握手,接收普通的消息包
  if (command_type == 0x03) {  // COM_QUERY,这是一个普通的文本请求
羽飞's avatar
羽飞 已提交
669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684
    QueryPacket query_packet;
    rc = decode_query_packet(buf, query_packet);
    if (rc != RC::SUCCESS) {
      LOG_WARN("failed to decode query packet. packet length=%ld, addr=%s, error=%s", buf.size(), addr(), strrc(rc));
      return rc;
    }

    LOG_TRACE("query command: %s", query_packet.query.c_str());
    if (query_packet.query.find("select @@version_comment") != std::string::npos) {
      bool need_disconnect;
      return handle_version_comment(need_disconnect);
    }

    event = new SessionEvent(this);
    event->set_query(query_packet.query);
  } else {
羽飞's avatar
羽飞 已提交
685
    /// 其它的非文本请求,暂时不支持
羽飞's avatar
羽飞 已提交
686 687 688 689 690 691
    OkPacket ok_packet(sequence_id_);
    rc = send_packet(ok_packet);
    if (rc != RC::SUCCESS) {
      LOG_WARN("failed to send ok packet. command=%d, addr=%s, error=%s", command_type, addr(), strrc(rc));
      return rc;
    }
羽飞's avatar
羽飞 已提交
692
    writer_->flush();
羽飞's avatar
羽飞 已提交
693 694 695 696 697 698 699
  }
  return rc;
}

RC MysqlCommunicator::write_state(SessionEvent *event, bool &need_disconnect)
{
  SqlResult *sql_result = event->sql_result();
L
Longda Feng 已提交
700

羽飞's avatar
羽飞 已提交
701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732
  const int buf_size = 2048;
  char *buf = new char[buf_size];
  const std::string &state_string = sql_result->state_string();
  if (state_string.empty()) {
    const char *result = RC::SUCCESS == sql_result->return_code() ? "SUCCESS" : "FAILURE";
    snprintf(buf, buf_size, "%s", result);
  } else {
    snprintf(buf, buf_size, "%s > %s", strrc(sql_result->return_code()), state_string.c_str());
  }

  RC rc = RC::SUCCESS;
  if (sql_result->return_code() == RC::SUCCESS) {

    OkPacket ok_packet;
    ok_packet.packet_header.sequence_id = sequence_id_++;
    ok_packet.info.assign(buf);
    rc = send_packet(ok_packet);
  } else {
    ErrPacket err_packet;
    err_packet.packet_header.sequence_id = sequence_id_++;
    err_packet.error_code = static_cast<int>(sql_result->return_code());
    err_packet.error_message = buf;
    rc = send_packet(err_packet);
  }
  if (rc != RC::SUCCESS) {
    LOG_WARN("failed to send ok packet to client. addr=%s, error=%s", addr(), strrc(rc));
    need_disconnect = true;
  } else {
    need_disconnect = false;
  }

  delete[] buf;
羽飞's avatar
羽飞 已提交
733
  writer_->flush();
羽飞's avatar
羽飞 已提交
734 735 736 737 738 739
  return rc;
}

RC MysqlCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
{
  RC rc = RC::SUCCESS;
L
Longda Feng 已提交
740

羽飞's avatar
羽飞 已提交
741 742 743 744
  need_disconnect = true;
  SqlResult *sql_result = event->sql_result();
  if (nullptr == sql_result) {

羽飞's avatar
羽飞 已提交
745 746
    const char *response = "Unexpected error: no result";
    const int len = strlen(response);
L
Longda Feng 已提交
747
    OkPacket ok_packet;  // TODO if error occurs, we should send an error packet to client
羽飞's avatar
羽飞 已提交
748 749 750 751 752 753 754 755 756 757
    ok_packet.info.assign(response, len);
    rc = send_packet(ok_packet);
    if (rc != RC::SUCCESS) {
      LOG_WARN("failed to send ok packet to client. addr=%s, rc=%s", addr(), strrc(rc));
      return rc;
    }

    need_disconnect = false;
  } else {
    if (RC::SUCCESS != sql_result->return_code() || !sql_result->has_operator()) {
L
Longda Feng 已提交
758
      return write_state(event, need_disconnect);
羽飞's avatar
羽飞 已提交
759 760 761 762 763 764 765 766 767 768
    }

    // send result set
    // https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query_response_text_resultset.html
    RC rc = sql_result->open();
    if (rc != RC::SUCCESS) {
      sql_result->set_return_code(rc);
      return write_state(event, need_disconnect);
    }

羽飞's avatar
羽飞 已提交
769 770 771 772 773 774 775 776 777
    const TupleSchema &tuple_schema = sql_result->tuple_schema();
    const int cell_num = tuple_schema.cell_num();
    if (cell_num == 0) {
      // maybe a dml that send nothing to client
    } else {

      // send metadata : Column Definition
      rc = send_column_definition(sql_result, need_disconnect);
      if (rc != RC::SUCCESS) {
羽飞's avatar
羽飞 已提交
778
        sql_result->close();
羽飞's avatar
羽飞 已提交
779 780
        return rc;
      }
羽飞's avatar
羽飞 已提交
781 782
    }

羽飞's avatar
羽飞 已提交
783
    rc = send_result_rows(sql_result, cell_num == 0, need_disconnect);
羽飞's avatar
羽飞 已提交
784
  }
L
Longda Feng 已提交
785

羽飞's avatar
羽飞 已提交
786 787 788 789
  RC close_rc = sql_result->close();
  if (rc == RC::SUCCESS) {
    rc = close_rc;
  }
羽飞's avatar
羽飞 已提交
790
  writer_->flush();
羽飞's avatar
羽飞 已提交
791 792 793 794 795 796 797 798 799 800 801 802
  return rc;
}

RC MysqlCommunicator::send_packet(const BasePacket &packet)
{
  std::vector<char> net_packet;
  RC rc = packet.encode(client_capabilities_flag_, net_packet);
  if (rc != RC::SUCCESS) {
    LOG_WARN("failed to encode ok packet. rc=%s", strrc(rc));
    return rc;
  }

羽飞's avatar
羽飞 已提交
803 804
  rc = writer_->writen(net_packet.data(), net_packet.size());
  if (OB_FAIL(rc)) {
羽飞's avatar
羽飞 已提交
805
    LOG_WARN("failed to send packet to client. addr=%s, error=%s", addr(), strerror(errno));
羽飞's avatar
羽飞 已提交
806
    return rc;
羽飞's avatar
羽飞 已提交
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826
  }

  LOG_TRACE("send ok packet success. packet length=%d", net_packet.size());
  return rc;
}

/**
 * 发送列定义信息
 *  https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query_response_text_resultset.html
 *  https://mariadb.com/kb/en/result-set-packets/#column-definition-packet
 *
 * 先发送当前有多少个列
 * 然后发送N个包,告诉客户端每个列的信息
 */
RC MysqlCommunicator::send_column_definition(SqlResult *sql_result, bool &need_disconnect)
{
  RC rc = RC::SUCCESS;
  const TupleSchema &tuple_schema = sql_result->tuple_schema();
  const int cell_num = tuple_schema.cell_num();

羽飞's avatar
羽飞 已提交
827 828 829 830
  if (cell_num == 0) {
    return rc;
  }

羽飞's avatar
羽飞 已提交
831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853
  std::vector<char> net_packet;
  net_packet.resize(1024);
  char *buf = net_packet.data();
  int pos = 0;

  pos += 3;
  store_int1(buf + pos, sequence_id_++);
  pos += 1;

  if (client_capabilities_flag_ & CLIENT_OPTIONAL_RESULTSET_METADATA) {
    store_int1(buf + pos, static_cast<int>(ResultSetMetaData::RESULTSET_METADATA_FULL));
    pos += 1;
    LOG_TRACE("client with optional resultset metadata");
  } else {
    LOG_TRACE("client without optional resultset metadata");
  }

  pos += store_lenenc_int(buf + pos, cell_num);

  int payload_length = pos - 4;
  store_int3(buf, payload_length);
  net_packet.resize(pos);

羽飞's avatar
羽飞 已提交
854 855
  rc = writer_->writen(net_packet.data(), net_packet.size());
  if (OB_FAIL(rc)) {
羽飞's avatar
羽飞 已提交
856 857
    LOG_WARN("failed to send column count to client. addr=%s, error=%s", addr(), strerror(errno));
    need_disconnect = true;
羽飞's avatar
羽飞 已提交
858
    return rc;
羽飞's avatar
羽飞 已提交
859 860 861 862 863 864 865 866 867 868 869 870
  }

  for (int i = 0; i < cell_num; i++) {
    net_packet.resize(1024);
    buf = net_packet.data();
    pos = 0;

    pos += 3;
    store_int1(buf + pos, sequence_id_++);
    pos += 1;

    const TupleCellSpec &spec = tuple_schema.cell_at(i);
L
Longda Feng 已提交
871 872 873
    const char *catalog = "def";  // The catalog used. Currently always "def"
    const char *schema = "sys";   // schema name
    const char *table = spec.table_name();
羽飞's avatar
羽飞 已提交
874 875
    const char *org_table = spec.table_name();
    const char *name = spec.alias();
L
Longda Feng 已提交
876
    // const char *org_name = spec.field_name();
羽飞's avatar
羽飞 已提交
877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901
    const char *org_name = spec.alias();
    int fixed_len_fields = 0x0c;
    int character_set = 33;
    int column_length = 16384;
    int type = MYSQL_TYPE_VAR_STRING;
    int16_t flags = 0;
    int8_t decimals = 0x1f;

    pos += store_lenenc_string(buf + pos, catalog);
    pos += store_lenenc_string(buf + pos, schema);
    pos += store_lenenc_string(buf + pos, table);
    pos += store_lenenc_string(buf + pos, org_table);
    pos += store_lenenc_string(buf + pos, name);
    pos += store_lenenc_string(buf + pos, org_name);
    pos += store_lenenc_int(buf + pos, fixed_len_fields);
    store_int2(buf + pos, character_set);
    pos += 2;
    store_int4(buf + pos, column_length);
    pos += 4;
    store_int1(buf + pos, type);
    pos += 1;
    store_int2(buf + pos, flags);
    pos += 2;
    store_int1(buf + pos, decimals);
    pos += 1;
L
Longda Feng 已提交
902
    store_int2(buf + pos, 0);  // 按照mariadb的文档描述,最后还有一个unused字段int<2>,不过mysql的文档没有给出这样的描述
羽飞's avatar
羽飞 已提交
903 904 905 906 907 908
    pos += 2;

    payload_length = pos - 4;
    store_int3(buf, payload_length);
    net_packet.resize(pos);

羽飞's avatar
羽飞 已提交
909 910
    rc = writer_->writen(net_packet.data(), net_packet.size());
    if (OB_FAIL(rc)) {
羽飞's avatar
羽飞 已提交
911 912
      LOG_WARN("failed to write column definition to client. addr=%s, error=%s", addr(), strerror(errno));
      need_disconnect = true;
羽飞's avatar
羽飞 已提交
913
      return rc;
羽飞's avatar
羽飞 已提交
914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937
    }
  }

  if (!(client_capabilities_flag_ & CLIENT_DEPRECATE_EOF)) {
    EofPacket eof_packet;
    eof_packet.packet_header.sequence_id = sequence_id_++;
    eof_packet.status_flags = 0x02;
    rc = send_packet(eof_packet);
    if (rc != RC::SUCCESS) {
      need_disconnect = true;
      LOG_WARN("failed to send eof packet to client. addr=%s, error=%s", addr(), strerror(errno));
    }
  } else {
    LOG_TRACE("client use CLIENT_DEPRECATE_EOF");
  }

  LOG_TRACE("send column definition to client done");
  need_disconnect = false;
  return RC::SUCCESS;
}

/**
 * 发送每行数据
 * 一行一个包
羽飞's avatar
羽飞 已提交
938 939
 * @param no_column_def 为了特殊处理没有返回值的语句,比如insert/delete,需要做特殊处理。
 *                      这种语句只需要返回一个ok packet即可
羽飞's avatar
羽飞 已提交
940
 */
羽飞's avatar
羽飞 已提交
941
RC MysqlCommunicator::send_result_rows(SqlResult *sql_result, bool no_column_def, bool &need_disconnect)
羽飞's avatar
羽飞 已提交
942 943 944
{
  RC rc = RC::SUCCESS;
  std::vector<char> packet;
L
Longda Feng 已提交
945
  packet.resize(4 * 1024 * 1024);  // TODO warning: length cannot be fix
羽飞's avatar
羽飞 已提交
946 947

  int affected_rows = 0;
羽飞's avatar
羽飞 已提交
948 949
  Tuple *tuple = nullptr;
  while (RC::SUCCESS == (rc = sql_result->next_tuple(tuple))) {
羽飞's avatar
羽飞 已提交
950 951 952
    assert(tuple != nullptr);

    affected_rows++;
L
Longda Feng 已提交
953

羽飞's avatar
羽飞 已提交
954 955 956 957 958
    const int cell_num = tuple->cell_num();
    if (cell_num == 0) {
      continue;
    }

羽飞's avatar
羽飞 已提交
959 960 961 962 963 964 965
    // https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query_response_text_resultset.html
    // https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query_response_text_resultset_row.html
    // note: if some field is null, send a 0xFB
    char *buf = packet.data();
    int pos = 0;

    pos += 3;
羽飞's avatar
羽飞 已提交
966
    pos += store_int1(buf + pos, sequence_id_++);
羽飞's avatar
羽飞 已提交
967 968 969 970 971 972

    TupleCell tuple_cell;
    for (int i = 0; i < cell_num; i++) {
      rc = tuple->cell_at(i, tuple_cell);
      if (rc != RC::SUCCESS) {
        sql_result->set_return_code(rc);
L
Longda Feng 已提交
973
        break;  // TODO send error packet
羽飞's avatar
羽飞 已提交
974 975 976 977
      }

      std::stringstream ss;
      tuple_cell.to_string(ss);
L
Longda Feng 已提交
978
      pos += store_lenenc_string(buf + pos, ss.str().c_str());
羽飞's avatar
羽飞 已提交
979 980 981 982
    }

    int payload_length = pos - 4;
    store_int3(buf, payload_length);
羽飞's avatar
羽飞 已提交
983 984
    rc = writer_->writen(buf, pos);
    if (OB_FAIL(rc)) {
羽飞's avatar
羽飞 已提交
985 986
      LOG_WARN("failed to send row packet to client. addr=%s, error=%s", addr(), strerror(errno));
      need_disconnect = true;
羽飞's avatar
羽飞 已提交
987
      return rc;
羽飞's avatar
羽飞 已提交
988 989 990 991
    }
  }

  // 所有行发送完成后,发送一个EOF或OK包
羽飞's avatar
羽飞 已提交
992 993
  if ((client_capabilities_flag_ & CLIENT_DEPRECATE_EOF) || no_column_def) {
    LOG_TRACE("client has CLIENT_DEPRECATE_EOF or has empty column, send ok packet");
羽飞's avatar
羽飞 已提交
994 995
    OkPacket ok_packet;
    ok_packet.packet_header.sequence_id = sequence_id_++;
羽飞's avatar
羽飞 已提交
996
    ok_packet.affected_rows = affected_rows;
羽飞's avatar
羽飞 已提交
997 998
    rc = send_packet(ok_packet);
  } else {
羽飞's avatar
羽飞 已提交
999
    LOG_TRACE("send eof packet to client");
羽飞's avatar
羽飞 已提交
1000 1001 1002 1003 1004 1005 1006 1007 1008
    EofPacket eof_packet;
    eof_packet.packet_header.sequence_id = sequence_id_++;
    rc = send_packet(eof_packet);
  }

  LOG_TRACE("send rows to client done");
  need_disconnect = false;
  return rc;
}