From b033cdfe5bde530765deefa9c0a4424cc4a283d9 Mon Sep 17 00:00:00 2001 From: wgs13579 Date: Thu, 18 Mar 2021 17:13:16 +0800 Subject: [PATCH] support pieceInfo --- src/obproxy/proxy/mysql/Makefile.am | 3 +- src/obproxy/proxy/mysql/Makemodule.am | 3 +- src/obproxy/proxy/mysql/ob_mysql_sm.cpp | 34 +- src/obproxy/proxy/mysql/ob_mysql_transact.cpp | 378 ++++++++++++------ src/obproxy/proxy/mysql/ob_mysql_transact.h | 1 + src/obproxy/proxy/mysql/ob_piece_info.h | 63 +++ .../proxy/mysqllib/ob_mysql_common_define.h | 3 + .../mysqllib/ob_mysql_compress_analyzer.cpp | 3 +- .../ob_mysql_compress_ob20_analyzer.cpp | 2 +- .../mysqllib/ob_mysql_request_analyzer.cpp | 8 +- .../proxy/mysqllib/ob_mysql_resp_analyzer.cpp | 10 +- .../proxy/mysqllib/ob_proxy_parser_utils.cpp | 6 + .../proxy/mysqllib/ob_proxy_session_info.cpp | 13 + .../proxy/mysqllib/ob_proxy_session_info.h | 26 ++ src/rpc/obmysql/ob_mysql_packet.cpp | 4 +- src/rpc/obmysql/ob_mysql_packet.h | 3 +- 16 files changed, 398 insertions(+), 162 deletions(-) create mode 100644 src/obproxy/proxy/mysql/ob_piece_info.h diff --git a/src/obproxy/proxy/mysql/Makefile.am b/src/obproxy/proxy/mysql/Makefile.am index da1a0e4..063297d 100644 --- a/src/obproxy/proxy/mysql/Makefile.am +++ b/src/obproxy/proxy/mysql/Makefile.am @@ -62,7 +62,8 @@ common_sources = \ ob_prepare_statement_struct.cpp \ ob_prepare_statement_struct.h \ ob_cursor_struct.cpp \ - ob_cursor_struct.h + ob_cursor_struct.h \ + ob_piece_info.h libmysqlproxy_a_SOURCES = ${common_sources} diff --git a/src/obproxy/proxy/mysql/Makemodule.am b/src/obproxy/proxy/mysql/Makemodule.am index 34b93e5..dd145e1 100644 --- a/src/obproxy/proxy/mysql/Makemodule.am +++ b/src/obproxy/proxy/mysql/Makemodule.am @@ -32,4 +32,5 @@ obproxy/proxy/mysql/ob_mysql_vctable.h\ obproxy/proxy/mysql/ob_prepare_statement_struct.cpp\ obproxy/proxy/mysql/ob_prepare_statement_struct.h\ obproxy/proxy/mysql/ob_cursor_struct.cpp\ -obproxy/proxy/mysql/ob_cursor_struct.h +obproxy/proxy/mysql/ob_cursor_struct.h\ +obproxy/proxy/mysql/ob_piece_info.h diff --git a/src/obproxy/proxy/mysql/ob_mysql_sm.cpp b/src/obproxy/proxy/mysql/ob_mysql_sm.cpp index 2506ea8..552b8f6 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_sm.cpp +++ b/src/obproxy/proxy/mysql/ob_mysql_sm.cpp @@ -1862,11 +1862,12 @@ void ObMysqlSM::analyze_mysql_request(ObMysqlAnalyzeStatus &status) } else if (OB_FAIL(analyze_ps_prepare_request())) { LOG_WARN("fail to analyze ps prepare request", K(ret)); } - } else if (OB_MYSQL_COM_STMT_EXECUTE == req_cmd) { + } else if (OB_MYSQL_COM_STMT_EXECUTE == req_cmd + || OB_MYSQL_COM_STMT_SEND_PIECE_DATA == req_cmd) { if (OB_FAIL(analyze_ps_execute_request())) { - LOG_WARN("fail to analyze ps execute request", K(ret)); + LOG_WARN("fail to analyze ps execute request", K(ret), K(req_cmd)); } - } else if (OB_MYSQL_COM_STMT_FETCH == req_cmd) { + } else if (OB_MYSQL_COM_STMT_FETCH == req_cmd || OB_MYSQL_COM_STMT_GET_PIECE_DATA == req_cmd) { if (OB_FAIL(analyze_fetch_request())) { LOG_WARN("fail to analyze fetch request", K(ret)); } @@ -1883,7 +1884,8 @@ void ObMysqlSM::analyze_mysql_request(ObMysqlAnalyzeStatus &status) } } else if (ANALYZE_CONT == status) { // large request means we have received enough packet(> request_buffer_len_) - if (OB_MYSQL_COM_STMT_EXECUTE == req_cmd && client_request.is_large_request()) { + if ((OB_MYSQL_COM_STMT_EXECUTE == req_cmd || OB_MYSQL_COM_STMT_SEND_PIECE_DATA == req_cmd) + && client_request.is_large_request()) { if (OB_FAIL(analyze_ps_execute_request())) { LOG_WARN("fail to analyze ps execute request", K(ret)); } @@ -2002,8 +2004,8 @@ int ObMysqlSM::analyze_ps_execute_request() LOG_WARN("com_stmt_execute packet is empty", K(ret)); } else { const char *pos = data.ptr() + MYSQL_NET_META_LENGTH; - int32_t ps_id = 0; - ObMySQLUtil::get_int4(pos, ps_id); + uint32_t ps_id = 0; + ObMySQLUtil::get_uint4(pos, ps_id); ObPsEntry *entry = NULL; if (OB_ISNULL(entry = session_info.get_ps_entry(ps_id)) || !entry->is_valid()) { ret = OB_ERR_UNEXPECTED; @@ -2165,8 +2167,8 @@ int ObMysqlSM::analyze_ps_prepare_execute_request() LOG_WARN("com_stmt_execute packet is empty", K(ret)); } else { const char *pos = data.ptr() + MYSQL_NET_META_LENGTH; - int32_t ps_id = 0; - ObMySQLUtil::get_int4(pos, ps_id); + uint32_t ps_id = 0; + ObMySQLUtil::get_uint4(pos, ps_id); if (0 == ps_id) { session_info.set_client_ps_id(client_session_->inc_and_get_ps_id()); } else { @@ -3028,6 +3030,7 @@ int ObMysqlSM::state_server_request_send(int event, void *data) ss_info.remove_ps_id_pair(client_ps_id); ss_info.remove_cursor_id_pair(client_ps_id); cs_info.remove_cursor_id_addr(client_ps_id); + cs_info.remove_piece_info(client_ps_id); if (NULL != ps_id_addrs) { ps_id_addrs->remove_addr(server_session_->get_netvc()->get_remote_addr()); } @@ -5034,7 +5037,8 @@ inline int ObMysqlSM::do_internal_observer_open() trans_state_.current_.send_action_ = ObMysqlTransact::SERVER_SEND_LAST_INSERT_ID; } else if (trans_state_.is_hold_start_trans_) { trans_state_.current_.send_action_ = ObMysqlTransact::SERVER_SEND_START_TRANS; - } else if ((OB_MYSQL_COM_STMT_EXECUTE == trans_state_.trans_info_.client_request_.get_packet_meta().cmd_) + } else if (((OB_MYSQL_COM_STMT_EXECUTE == trans_state_.trans_info_.client_request_.get_packet_meta().cmd_) + || (OB_MYSQL_COM_STMT_SEND_PIECE_DATA == trans_state_.trans_info_.client_request_.get_packet_meta().cmd_)) && client_info.need_do_prepare(server_info)) { trans_state_.current_.send_action_ = ObMysqlTransact::SERVER_SEND_PREPARE; } else if (client_info.is_text_ps_execute() && client_info.need_do_text_ps_prepare(server_info)) { @@ -6065,17 +6069,9 @@ int ObMysqlSM::setup_client_transfer(ObMysqlVCType to_vc_type) K_(sm_id), K(ret)); } else if (OB_FAIL(trans_state_.alloc_internal_buffer(MYSQL_BUFFER_SIZE))) { LOG_ERROR("fail to allocate internal buffer,", K_(sm_id), K(ret)); + } else if (OB_FAIL(ObMysqlTransact::rewrite_stmt_id(trans_state_, client_buffer_reader_ ))) { + LOG_WARN("rewrite stmt id failed", K(ret)); } else { - // rewrite stmt id for ps execute - if (obmysql::OB_MYSQL_COM_STMT_EXECUTE == trans_state_.trans_info_.sql_cmd_) { - ObServerSessionInfo &ss_info = server_session_->get_session_info(); - ObClientSessionInfo &cs_info = client_session_->get_session_info(); - uint32_t client_ps_id = cs_info.get_client_ps_id(); - // get server_ps_id by client_ps_id - uint32_t server_ps_id = ss_info.get_server_ps_id(client_ps_id); - client_buffer_reader_->replace(reinterpret_cast(&server_ps_id), sizeof(server_ps_id), MYSQL_NET_META_LENGTH); - } - // Next order of business if copy the remaining data from the // request buffer into new buffer if (OB_FAIL(trans_state_.internal_buffer_->remove_append(client_buffer_reader_, written_bytes))) { diff --git a/src/obproxy/proxy/mysql/ob_mysql_transact.cpp b/src/obproxy/proxy/mysql/ob_mysql_transact.cpp index b4f0f5e..378d973 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_transact.cpp +++ b/src/obproxy/proxy/mysql/ob_mysql_transact.cpp @@ -696,7 +696,8 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s) s.server_info_.set_addr(ip, port); s.pll_info_.lookup_success_ = true; LOG_DEBUG("@obproxy_route_addr is set", "address", s.server_info_.addr_, K(addr)); - } else if (obmysql::OB_MYSQL_COM_STMT_FETCH == s.trans_info_.sql_cmd_) { + } else if (obmysql::OB_MYSQL_COM_STMT_FETCH == s.trans_info_.sql_cmd_ + || obmysql::OB_MYSQL_COM_STMT_GET_PIECE_DATA == s.trans_info_.sql_cmd_) { ObCursorIdAddr *cursor_id_addr = NULL; if (OB_FAIL(s.sm_->get_client_session()->get_session_info().get_cursor_id_addr(cursor_id_addr))) { LOG_WARN("fail to get client cursor id addr", K(ret)); @@ -710,6 +711,31 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s) s.pll_info_.lookup_success_ = true; LOG_DEBUG("succ to set cursor target addr", "address", s.server_info_.addr_, KPC(cursor_id_addr)); } + } else if (obmysql::OB_MYSQL_COM_STMT_SEND_PIECE_DATA == s.trans_info_.sql_cmd_ + || obmysql::OB_MYSQL_COM_STMT_PREPARE_EXECUTE == s.trans_info_.sql_cmd_) { + ObPieceInfo *info = NULL; + if (OB_FAIL(s.sm_->get_client_session()->get_session_info().get_piece_info(info))) { + if (OB_HASH_NOT_EXIST == ret) { + LOG_DEBUG("fail to get piece info from hash map", K(ret)); + ret = OB_SUCCESS; + // do nothing + } else { + LOG_WARN("fail to get piece info", K(ret)); + TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL); + } + } else if (OB_ISNULL(info)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("info is null", K(ret)); + TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL); + } else if (!info->is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("info is not valid", K(ret)); + TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL); + } else { + s.server_info_.set_addr(info->get_addr()); + s.pll_info_.lookup_success_ = true; + LOG_DEBUG("succ to set send piece data target addr", "address", s.server_info_.addr_, KPC(info)); + } } else if ((s.mysql_config_params_->is_mock_routing_mode() && !s.sm_->client_session_->is_proxy_mysql_client_) || s.mysql_config_params_->is_mysql_routing_mode()) { if (OB_FAIL(s.mysql_config_params_->get_one_test_server_addr(s.server_info_.addr_))) { @@ -738,7 +764,8 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s) ObMysqlServerSession *last_session = s.sm_->client_session_->get_server_session(); if (OB_LIKELY(NULL != last_session)) { - if (obmysql::OB_MYSQL_COM_STMT_FETCH == s.trans_info_.sql_cmd_) { + if (obmysql::OB_MYSQL_COM_STMT_FETCH == s.trans_info_.sql_cmd_ + || obmysql::OB_MYSQL_COM_STMT_GET_PIECE_DATA == s.trans_info_.sql_cmd_) { ObCursorIdAddr *cursor_id_addr = NULL; if (OB_FAIL(s.sm_->get_client_session()->get_session_info().get_cursor_id_addr(cursor_id_addr))) { LOG_WARN("fail to get client cursor id addr", K(ret)); @@ -759,6 +786,40 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s) "trans server", ObIpEndpoint(last_session->get_netvc()->get_remote_addr())); } + ret = OB_ERR_DISTRIBUTED_NOT_SUPPORTED; + s.inner_errcode_ = ret; + s.current_.state_ = ObMysqlTransact::INTERNAL_ERROR; + TRANSACT_RETURN(SM_ACTION_INTERNAL_NOOP, NULL); + } + } else if (obmysql::COM_STMT_SEND_PIECE_DATA == s.trans_info_.sql_cmd_) { + ObPieceInfo *info = NULL; + if (OB_FAIL(s.sm_->get_client_session()->get_session_info().get_piece_info(info))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + // do nothing + } else { + TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL); + } + } else if (NULL == info) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("info is null unexpected", K(ret)); + TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL); + } else if (!info->is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("info is invalid", K(ret)); + TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL); + } else if (OB_UNLIKELY(info->get_addr() != last_session->get_netvc()->get_remote_addr())) { + s.mysql_errcode_ = OB_ERR_DISTRIBUTED_NOT_SUPPORTED; + s.mysql_errmsg_ = "send piece info target server is not the trans server"; + int tmp_ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = build_error_packet(s)))) { + LOG_WARN("fail to build err packet", K(tmp_ret)); + } else { + LOG_WARN("send piece target server is not the trans server", + "send piece target server", info->get_addr(), + "trans server", ObIpEndpoint(last_session->get_netvc()->get_remote_addr())); + } + ret = OB_ERR_DISTRIBUTED_NOT_SUPPORTED; s.inner_errcode_ = ret; s.current_.state_ = ObMysqlTransact::INTERNAL_ERROR; @@ -1820,6 +1881,69 @@ inline int ObMysqlTransact::build_user_request( return ret; } +int ObMysqlTransact::rewrite_stmt_id(ObTransState &s, ObIOBufferReader *client_buffer_reader) +{ + int ret = OB_SUCCESS; + + // rewrite stmt id for ps execute + if (obmysql::OB_MYSQL_COM_STMT_EXECUTE == s.trans_info_.client_request_.get_packet_meta().cmd_ + || obmysql::OB_MYSQL_COM_STMT_SEND_PIECE_DATA == s.trans_info_.client_request_.get_packet_meta().cmd_) { + ObServerSessionInfo &ss_info = get_server_session_info(s); + ObClientSessionInfo &cs_info = get_client_session_info(s); + uint32_t client_ps_id = cs_info.get_client_ps_id(); + // get server_ps_id by client_ps_id + uint32_t server_ps_id = ss_info.get_server_ps_id(client_ps_id); + client_buffer_reader->replace(reinterpret_cast(&server_ps_id), sizeof(server_ps_id), MYSQL_NET_META_LENGTH); + } else if (obmysql::OB_MYSQL_COM_STMT_FETCH == s.trans_info_.client_request_.get_packet_meta().cmd_ + || obmysql::OB_MYSQL_COM_STMT_GET_PIECE_DATA == s.trans_info_.client_request_.get_packet_meta().cmd_) { + ObServerSessionInfo &ss_info = get_server_session_info(s); + ObClientSessionInfo &cs_info = get_client_session_info(s); + uint32_t client_cursor_id = cs_info.get_client_cursor_id(); + uint32_t server_cursor_id = 0; + // get server_cursor_id by client_cursor_id + // if no server_cursor_id, mayby server session has disconnected, disconnect client session + if (OB_FAIL(ss_info.get_server_cursor_id(client_cursor_id, server_cursor_id))) { + LOG_WARN("fail to get server cursor id", K(client_cursor_id), K(ret)); + } else { + client_buffer_reader->replace(reinterpret_cast(&server_cursor_id), sizeof(server_cursor_id), MYSQL_NET_META_LENGTH); + } + } else if (obmysql::OB_MYSQL_COM_STMT_CLOSE == s.trans_info_.client_request_.get_packet_meta().cmd_) { + ObServerSessionInfo &ss_info = get_server_session_info(s); + ObClientSessionInfo &cs_info = get_client_session_info(s); + uint32_t client_ps_id = cs_info.get_client_ps_id(); + uint32_t server_ps_id = 0; + // get server_ps_id or server_cursor_id + if (client_ps_id >= (CURSOR_ID_START)) { + if (OB_FAIL(ss_info.get_server_cursor_id(client_ps_id, server_ps_id))) { + LOG_WARN("fail to get server cursor id", "client_cursor_id", client_ps_id, K(ret)); + } + } else { + server_ps_id = ss_info.get_server_ps_id(client_ps_id); + } + + client_buffer_reader->replace(reinterpret_cast(&server_ps_id), sizeof(server_ps_id), MYSQL_NET_META_LENGTH); + } else if (obmysql::OB_MYSQL_COM_STMT_PREPARE_EXECUTE == s.trans_info_.client_request_.get_packet_meta().cmd_) { + ObClientSessionInfo &cs_info = get_client_session_info(s); + uint32_t recv_client_ps_id = cs_info.get_recv_client_ps_id(); + uint32_t client_ps_id = cs_info.get_client_ps_id(); + + /* if recv_client_ps_id == 0, first request, send to server directly + * if recv_client_ps_id ! = 0, have two case: + * 1. if first send to this server, need set stmt_id to 0 in packet + * 2. if not first send to this server, need replace server_ps_id + */ + if (0 != recv_client_ps_id) { + ObServerSessionInfo &ss_info = get_server_session_info(s); + /* if not first send to this server, get real server_ps_id + * if first send to this server, get 0 + */ + uint32_t server_ps_id = ss_info.get_server_ps_id(client_ps_id); + client_buffer_reader->replace(reinterpret_cast(&server_ps_id), sizeof(server_ps_id), MYSQL_NET_META_LENGTH); + } + } + return ret; +} + inline int ObMysqlTransact::build_oceanbase_user_request( ObTransState &s, ObIOBufferReader *client_buffer_reader, ObIOBufferReader *&reader, int64_t &request_len) @@ -1843,149 +1967,92 @@ inline int ObMysqlTransact::build_oceanbase_user_request( reader = client_buffer_reader; request_len = client_request_len; } else { - // rewrite stmt id for ps execute - if (obmysql::OB_MYSQL_COM_STMT_EXECUTE == s.trans_info_.client_request_.get_packet_meta().cmd_) { - ObServerSessionInfo &ss_info = get_server_session_info(s); - ObClientSessionInfo &cs_info = get_client_session_info(s); - uint32_t client_ps_id = cs_info.get_client_ps_id(); - // get server_ps_id by client_ps_id - uint32_t server_ps_id = ss_info.get_server_ps_id(client_ps_id); - client_buffer_reader->replace(reinterpret_cast(&server_ps_id), sizeof(server_ps_id), MYSQL_NET_META_LENGTH); - } else if (obmysql::OB_MYSQL_COM_STMT_FETCH == s.trans_info_.client_request_.get_packet_meta().cmd_) { - ObServerSessionInfo &ss_info = get_server_session_info(s); - ObClientSessionInfo &cs_info = get_client_session_info(s); - uint32_t client_cursor_id = cs_info.get_client_cursor_id(); - uint32_t server_cursor_id = 0; - // get server_cursor_id by client_cursor_id - // if no server_cursor_id, mayby server session has disconnected, disconnect client session - if (OB_FAIL(ss_info.get_server_cursor_id(client_cursor_id, server_cursor_id))) { - LOG_WARN("fail to get server cursor id", K(client_cursor_id), K(ret)); + ObIOBufferReader *request_buffer_reader = client_buffer_reader; + if (PROTOCOL_OB20 == ob_proxy_protocol || PROTOCOL_CHECKSUM == ob_proxy_protocol) { // convert standard mysql protocol to compression protocol + ObMIOBuffer *write_buffer = NULL; + uint8_t compress_seq = 0; + if (OB_ISNULL(write_buffer = new_miobuffer(MYSQL_BUFFER_SIZE))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc mio_buffer", K(ret)); + } else if (OB_ISNULL(reader = write_buffer->alloc_reader())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("[ObMysqlTransact::build_user_request] failed to allocate iobuffer reader", K(ret)); + } else if (obmysql::OB_MYSQL_COM_STMT_CLOSE == s.trans_info_.client_request_.get_packet_meta().cmd_ + && OB_ISNULL(request_buffer_reader = client_buffer_reader->clone())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("[ObMysqlTransact::build_user_request] failed to clone client buffer reader", K(ret)); } else { - client_buffer_reader->replace(reinterpret_cast(&server_cursor_id), sizeof(server_cursor_id), MYSQL_NET_META_LENGTH); - } - } else if (obmysql::OB_MYSQL_COM_STMT_CLOSE == s.trans_info_.client_request_.get_packet_meta().cmd_) { - ObServerSessionInfo &ss_info = get_server_session_info(s); - ObClientSessionInfo &cs_info = get_client_session_info(s); - uint32_t client_ps_id = cs_info.get_client_ps_id(); - uint32_t server_ps_id = 0; - // get server_ps_id or server_cursor_id - if (client_ps_id >= (CURSOR_ID_START)) { - if (OB_FAIL(ss_info.get_server_cursor_id(client_ps_id, server_ps_id))) { - LOG_WARN("fail to get server cursor id", "client_cursor_id", client_ps_id, K(ret)); + if (PROTOCOL_OB20 == ob_proxy_protocol) { + const bool is_last_packet = true; + // here and handle_error_resp need have same reroute conditions + const bool need_reroute = is_need_reroute(s); + ObSEArray extro_info; + char client_ip_buf[MAX_IP_BUFFER_LEN] = "\0"; + ObMysqlClientSession *client_session = s.sm_->get_client_session(); + if (!client_session->is_proxy_mysql_client_ + && client_session->is_need_send_trace_info() + && is_last_packet) { + ObAddr client_ip = client_session->get_real_client_addr(); + if (OB_FAIL(ObProxyTraceUtils::build_client_ip(extro_info, client_ip_buf, client_ip))) { + LOG_ERROR("fail to build client ip", K(client_ip), K(ret)); + } else { + client_session->set_already_send_trace_info(true); + } + } + + if (OB_SUCC(ret)) { + if (OB_FAIL(ObProto20Utils::consume_and_compress_data( + request_buffer_reader, write_buffer, client_request_len, compress_seq, compress_seq, + s.sm_->get_server_session()->get_next_server_request_id(), + s.sm_->get_server_session()->get_server_sessid(), + is_last_packet, need_reroute, &extro_info))) { + LOG_ERROR("fail to consume_and_compress_data", K(ret)); + } + } + } else { + const bool use_fast_compress = true; + const bool is_checksum_on = s.sm_->is_checksum_on(); + if (OB_FAIL(ObMysqlAnalyzerUtils::consume_and_compress_data( + request_buffer_reader, write_buffer, client_request_len, use_fast_compress, + compress_seq, is_checksum_on))) { + LOG_WARN("fail to consume_and_compress_data", K(ret)); + } } - } else { - server_ps_id = ss_info.get_server_ps_id(client_ps_id); - } - client_buffer_reader->replace(reinterpret_cast(&server_ps_id), sizeof(server_ps_id), MYSQL_NET_META_LENGTH); - } else if (obmysql::OB_MYSQL_COM_STMT_PREPARE_EXECUTE == s.trans_info_.client_request_.get_packet_meta().cmd_) { - ObClientSessionInfo &cs_info = get_client_session_info(s); - uint32_t recv_client_ps_id = cs_info.get_recv_client_ps_id(); - uint32_t client_ps_id = cs_info.get_client_ps_id(); + if (obmysql::OB_MYSQL_COM_STMT_CLOSE == s.trans_info_.client_request_.get_packet_meta().cmd_) { + request_buffer_reader->dealloc(); + request_buffer_reader = NULL; + } - /* if recv_client_ps_id == 0, first request, send to server directly - * if recv_client_ps_id ! = 0, have two case: - * 1. if first send to this server, need set stmt_id to 0 in packet - * 2. if not first send to this server, need replace server_ps_id - */ - if (0 != recv_client_ps_id) { - ObServerSessionInfo &ss_info = get_server_session_info(s); - /* if not first send to this server, get real server_ps_id - * if first send to this server, get 0 - */ - uint32_t server_ps_id = ss_info.get_server_ps_id(client_ps_id); - client_buffer_reader->replace(reinterpret_cast(&server_ps_id), sizeof(server_ps_id), MYSQL_NET_META_LENGTH); + if (OB_SUCC(ret)) { + s.sm_->get_server_session()->set_compressed_seq(compress_seq); + request_len = reader->read_avail(); + LOG_DEBUG("build user compressed request succ", K(ob_proxy_protocol), + "origin len", client_request_len, "compress len", request_len); + } } - } + } else { + // no need compress, send directly + request_len = client_request_len; - if (OB_SUCC(ret)) { - ObIOBufferReader *request_buffer_reader = client_buffer_reader; - if (PROTOCOL_OB20 == ob_proxy_protocol || PROTOCOL_CHECKSUM == ob_proxy_protocol) { // convert standard mysql protocol to compression protocol - ObMIOBuffer *write_buffer = NULL; - uint8_t compress_seq = 0; + ObMIOBuffer *write_buffer = NULL; + if (obmysql::OB_MYSQL_COM_STMT_CLOSE == s.trans_info_.client_request_.get_packet_meta().cmd_) { + int64_t written_len = 0; if (OB_ISNULL(write_buffer = new_miobuffer(MYSQL_BUFFER_SIZE))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc mio_buffer", K(ret)); - } else if (OB_ISNULL(reader = write_buffer->alloc_reader())) { + } else if (OB_ISNULL(request_buffer_reader = write_buffer->alloc_reader())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("[ObMysqlTransact::build_user_request] failed to allocate iobuffer reader", K(ret)); - } else if (obmysql::OB_MYSQL_COM_STMT_CLOSE == s.trans_info_.client_request_.get_packet_meta().cmd_ - && OB_ISNULL(request_buffer_reader = client_buffer_reader->clone())) { + } else if (OB_FAIL(write_buffer->write(client_buffer_reader, client_request_len, written_len))) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("[ObMysqlTransact::build_user_request] failed to clone client buffer reader", K(ret)); - } else { - if (PROTOCOL_OB20 == ob_proxy_protocol) { - const bool is_last_packet = true; - // here and handle_error_resp need have same reroute conditions - const bool need_reroute = is_need_reroute(s); - ObSEArray extro_info; - char client_ip_buf[MAX_IP_BUFFER_LEN] = "\0"; - ObMysqlClientSession *client_session = s.sm_->get_client_session(); - if (!client_session->is_proxy_mysql_client_ - && client_session->is_need_send_trace_info() - && is_last_packet) { - ObAddr client_ip = client_session->get_real_client_addr(); - if (OB_FAIL(ObProxyTraceUtils::build_client_ip(extro_info, client_ip_buf, client_ip))) { - LOG_ERROR("fail to build client ip", K(client_ip), K(ret)); - } else { - client_session->set_already_send_trace_info(true); - } - } - - if (OB_SUCC(ret)) { - if (OB_FAIL(ObProto20Utils::consume_and_compress_data( - request_buffer_reader, write_buffer, client_request_len, compress_seq, compress_seq, - s.sm_->get_server_session()->get_next_server_request_id(), - s.sm_->get_server_session()->get_server_sessid(), - is_last_packet, need_reroute, &extro_info))) { - LOG_ERROR("fail to consume_and_compress_data", K(ret)); - } - } - } else { - const bool use_fast_compress = true; - const bool is_checksum_on = s.sm_->is_checksum_on(); - if (OB_FAIL(ObMysqlAnalyzerUtils::consume_and_compress_data( - request_buffer_reader, write_buffer, client_request_len, use_fast_compress, - compress_seq, is_checksum_on))) { - LOG_WARN("fail to consume_and_compress_data", K(ret)); - } - } - - if (obmysql::OB_MYSQL_COM_STMT_CLOSE == s.trans_info_.client_request_.get_packet_meta().cmd_) { - request_buffer_reader->dealloc(); - request_buffer_reader = NULL; - } - - if (OB_SUCC(ret)) { - s.sm_->get_server_session()->set_compressed_seq(compress_seq); - request_len = reader->read_avail(); - LOG_DEBUG("build user compressed request succ", K(ob_proxy_protocol), - "origin len", client_request_len, "compress len", request_len); - } - } - } else { - // no need compress, send directly - request_len = client_request_len; - - ObMIOBuffer *write_buffer = NULL; - if (obmysql::OB_MYSQL_COM_STMT_CLOSE == s.trans_info_.client_request_.get_packet_meta().cmd_) { - int64_t written_len = 0; - if (OB_ISNULL(write_buffer = new_miobuffer(MYSQL_BUFFER_SIZE))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc mio_buffer", K(ret)); - } else if (OB_ISNULL(request_buffer_reader = write_buffer->alloc_reader())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("[ObMysqlTransact::build_user_request] failed to allocate iobuffer reader", K(ret)); - } else if (OB_FAIL(write_buffer->write(client_buffer_reader, client_request_len, written_len))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("[ObMysqlTransact::build_user_request] fail to write com_stmt_close packet into new iobuffer", K(client_request_len), K(ret)); - } else if (OB_UNLIKELY(client_request_len != written_len)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("[ObMysqlTransact::build_user_request] written_len dismatch", K(client_request_len), K(written_len), K(ret)); - } + LOG_WARN("[ObMysqlTransact::build_user_request] fail to write com_stmt_close packet into new iobuffer", K(client_request_len), K(ret)); + } else if (OB_UNLIKELY(client_request_len != written_len)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("[ObMysqlTransact::build_user_request] written_len dismatch", K(client_request_len), K(written_len), K(ret)); } - reader = request_buffer_reader; } + reader = request_buffer_reader; } } } @@ -2437,6 +2504,11 @@ inline int ObMysqlTransact::do_handle_execute_succ(ObTransState &s) cursor_id_addr->set_addr(addr); } } + + if (OB_SUCC(ret)) { + ObClientSessionInfo &cs_info = get_client_session_info(s); + cs_info.remove_piece_info(cs_info.get_client_ps_id()); + } return ret; } @@ -3077,6 +3149,7 @@ inline void ObMysqlTransact::handle_resultset_resp(ObTransState &s) inline void ObMysqlTransact::handle_ok_resp(ObTransState &s) { + int ret = OB_SUCCESS; // in fact, we handle ok, eof, resultset and other responses in this function bool is_user_request = false; @@ -3135,6 +3208,32 @@ inline void ObMysqlTransact::handle_ok_resp(ObTransState &s) } } + if (obmysql::COM_STMT_SEND_PIECE_DATA == s.trans_info_.sql_cmd_) { + ObPieceInfo *info = NULL; + ObClientSessionInfo &cs_info = get_client_session_info(s); + if (OB_FAIL(cs_info.get_piece_info(info))) { + if (OB_HASH_NOT_EXIST == ret) { + if (OB_ISNULL(info = op_alloc(ObPieceInfo))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + PROXY_API_LOG(ERROR, "fail to allocate memory for piece info", K(ret)); + } else { + info->set_ps_id(cs_info.get_client_ps_id()); + info->set_addr(s.sm_->get_server_session()->get_netvc()->get_remote_addr()); + if (OB_FAIL(cs_info.add_piece_info(info))) { + PROXY_API_LOG(WARN, "fail to add piece info", K(ret)); + op_free(info); + info = NULL; + } + } + } + } + + if (OB_FAIL(ret)) { + s.current_.state_ = INTERNAL_ERROR; + LOG_WARN("add piece info failed", K(ret)); + } + } + if (!is_user_request) { consume_response_packet(s); } else { @@ -3694,6 +3793,9 @@ inline void ObMysqlTransact::handle_response_from_server(ObTransState &s) void ObMysqlTransact::handle_oceanbase_retry_server_connection(ObTransState &s) { + int ret = OB_SUCCESS; + bool second_in = false; + ObPieceInfo *info = NULL; ObSSRetryStatus retry_status = NO_NEED_RETRY; const int64_t max_connect_attempts = get_max_connect_attempts(s); @@ -3705,6 +3807,11 @@ void ObMysqlTransact::handle_oceanbase_retry_server_connection(ObTransState &s) obproxy_route_addr = s.sm_->client_session_->get_session_info().get_obproxy_route_addr(); } + if (OB_FAIL(s.sm_->get_client_session()->get_session_info().get_piece_info(info))) { + // do nothing + } else { + second_in = true; + } // in mysql mode, no need retry if (OB_UNLIKELY(s.mysql_config_params_->is_mysql_routing_mode())) { LOG_DEBUG("in mysql mode, no need retry, will disconnect"); @@ -3714,10 +3821,14 @@ void ObMysqlTransact::handle_oceanbase_retry_server_connection(ObTransState &s) // 1. not in transaction // 2. attempts_ is less then max_connect_attempts // 3. is not kill query + // 4. is not piece info } else if (!is_in_trans(s) && s.current_.attempts_ < max_connect_attempts && 0 == obproxy_route_addr - && !s.trans_info_.client_request_.is_kill_query()) { + && !s.trans_info_.client_request_.is_kill_query() + && obmysql::COM_STMT_FETCH != s.trans_info_.sql_cmd_ + && obmysql::COM_STMT_GET_PIECE_DATA != s.trans_info_.sql_cmd_ + && !second_in) { ++s.current_.attempts_; LOG_DEBUG("start next retry"); @@ -4097,7 +4208,8 @@ void ObMysqlTransact::handle_on_forward_server_response(ObTransState &s) s.current_.send_action_ = SERVER_SEND_LAST_INSERT_ID; } else if (s.is_hold_start_trans_) { s.current_.send_action_ = SERVER_SEND_START_TRANS; - } else if ((obmysql::OB_MYSQL_COM_STMT_EXECUTE == s.trans_info_.client_request_.get_packet_meta().cmd_) + } else if (((obmysql::OB_MYSQL_COM_STMT_EXECUTE == s.trans_info_.client_request_.get_packet_meta().cmd_) || + (obmysql::OB_MYSQL_COM_STMT_SEND_PIECE_DATA == s.trans_info_.client_request_.get_packet_meta().cmd_)) && client_info.need_do_prepare(server_info)) { s.current_.send_action_ = SERVER_SEND_PREPARE; } else if (client_info.is_text_ps_execute() && client_info.need_do_text_ps_prepare(server_info)) { diff --git a/src/obproxy/proxy/mysql/ob_mysql_transact.h b/src/obproxy/proxy/mysql/ob_mysql_transact.h index 6958c09..1108db4 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_transact.h +++ b/src/obproxy/proxy/mysql/ob_mysql_transact.h @@ -839,6 +839,7 @@ public: static int build_server_request(ObTransState &s, event::ObIOBufferReader *&reader, int64_t &request_len); + static int rewrite_stmt_id(ObTransState &s, event::ObIOBufferReader *client_buffer_reader); static int build_oceanbase_user_request(ObTransState &s, event::ObIOBufferReader *client_buffer_reader, event::ObIOBufferReader *&reader, int64_t &request_len); static int build_user_request(ObTransState &s, event::ObIOBufferReader *client_buffer_reader, diff --git a/src/obproxy/proxy/mysql/ob_piece_info.h b/src/obproxy/proxy/mysql/ob_piece_info.h new file mode 100644 index 0000000..4745e36 --- /dev/null +++ b/src/obproxy/proxy/mysql/ob_piece_info.h @@ -0,0 +1,63 @@ +// Copyright (c) 2021-2026 Alibaba Inc. All Rights Reserved. +// Author: +// zhixin.lm@antgroup.com +// + +#ifndef OBPROXY_PIECE_INFO_H +#define OBPROXY_PIECE_INFO_H + +#include "lib/hash/ob_build_in_hashmap.h" +#include "iocore/net/ob_inet.h" + +namespace oceanbase +{ +namespace obproxy +{ +namespace proxy +{ + +class ObPieceInfo +{ +public: + ObPieceInfo() : ps_id_(0), addr_() {} + ~ObPieceInfo() {} + bool is_valid() const { return 0 != ps_id_ && addr_.is_valid(); } + uint32_t get_ps_id() const { return ps_id_; } + net::ObIpEndpoint &get_addr() { return addr_; } + void set_ps_id(const uint32_t ps_id) { ps_id_ = ps_id; } + void set_addr(const struct sockaddr &addr) { addr_.assign(addr); } + int64_t to_string(char *buf, const int64_t buf_len) const + { + int64_t pos = 0; + J_OBJ_START(); + J_KV(K_(ps_id), + K_(addr)); + J_OBJ_END(); + return pos; + } + +public: + LINK(ObPieceInfo, piece_info_link_); +private: + uint32_t ps_id_; + net::ObIpEndpoint addr_; +}; + +struct ObPieceInfoHashing +{ + typedef uint32_t Key; + typedef ObPieceInfo Value; + typedef ObDLList(ObPieceInfo, piece_info_link_) ListHead; + + static uint64_t hash(Key key) { return key; } + static Key key(Value *value) { return value->get_ps_id(); } + static bool equal(Key lhs, Key rhs) { return lhs == rhs; } +}; + +typedef common::hash::ObBuildInHashMap ObPieceInfoMap; + +} // end proxy +} // end obproxy +} // end oceanbase + +#endif diff --git a/src/obproxy/proxy/mysqllib/ob_mysql_common_define.h b/src/obproxy/proxy/mysqllib/ob_mysql_common_define.h index 4b9d34d..7215daf 100644 --- a/src/obproxy/proxy/mysqllib/ob_mysql_common_define.h +++ b/src/obproxy/proxy/mysqllib/ob_mysql_common_define.h @@ -165,6 +165,9 @@ bool is_supported_mysql_cmd(const obmysql::ObMySQLCmd mysql_cmd) case obmysql::OB_MYSQL_COM_STMT_RESET: // Stored Procedures case obmysql::OB_MYSQL_COM_STMT_FETCH: + // pieceinfo + case obmysql::COM_STMT_SEND_PIECE_DATA: + case obmysql::COM_STMT_GET_PIECE_DATA: ret = true; break; case obmysql::OB_MYSQL_COM_CHANGE_USER: diff --git a/src/obproxy/proxy/mysqllib/ob_mysql_compress_analyzer.cpp b/src/obproxy/proxy/mysqllib/ob_mysql_compress_analyzer.cpp index 4f92d15..6de3072 100644 --- a/src/obproxy/proxy/mysqllib/ob_mysql_compress_analyzer.cpp +++ b/src/obproxy/proxy/mysqllib/ob_mysql_compress_analyzer.cpp @@ -434,7 +434,8 @@ int ObMysqlCompressAnalyzer::do_analyze_last_compress_packet(ObMysqlResp &resp) body_start = last_packet_buffer_ + MYSQL_NET_HEADER_LENGTH; body_len = meta1.pkt_len_ - MYSQL_NET_HEADER_LENGTH; // only one ok packet, we need rewrite later - if (OB_MYSQL_COM_FIELD_LIST == mysql_cmd_|| OB_MYSQL_COM_STMT_PREPARE == mysql_cmd_) { + if (OB_MYSQL_COM_FIELD_LIST == mysql_cmd_|| OB_MYSQL_COM_STMT_PREPARE == mysql_cmd_ + || OB_MYSQL_COM_STMT_GET_PIECE_DATA == mysql_cmd_) { resp.get_analyze_result().ok_packet_action_type_ = OK_PACKET_ACTION_CONSUME; } else { resp.get_analyze_result().ok_packet_action_type_ = OK_PACKET_ACTION_REWRITE; diff --git a/src/obproxy/proxy/mysqllib/ob_mysql_compress_ob20_analyzer.cpp b/src/obproxy/proxy/mysqllib/ob_mysql_compress_ob20_analyzer.cpp index d62dc3c..2dfa947 100644 --- a/src/obproxy/proxy/mysqllib/ob_mysql_compress_ob20_analyzer.cpp +++ b/src/obproxy/proxy/mysqllib/ob_mysql_compress_ob20_analyzer.cpp @@ -492,7 +492,7 @@ int ObMysqlCompressOB20Analyzer::analyze_first_response( resp.get_analyze_result().is_resultset_resp_ = ((OB_MYSQL_COM_QUERY == result_.get_cmd() || OB_MYSQL_COM_STMT_EXECUTE == result_.get_cmd() || OB_MYSQL_COM_STMT_FETCH == result_.get_cmd()) - && (OB_MYSQL_COM_STATISTICS != result_.get_cmd()) + && (COM_STATISTICS != result_.get_cmd()) && (MYSQL_OK_PACKET_TYPE != mysql_result.meta_.pkt_type_) && (MYSQL_ERR_PACKET_TYPE != mysql_result.meta_.pkt_type_) && (MYSQL_EOF_PACKET_TYPE != mysql_result.meta_.pkt_type_) diff --git a/src/obproxy/proxy/mysqllib/ob_mysql_request_analyzer.cpp b/src/obproxy/proxy/mysqllib/ob_mysql_request_analyzer.cpp index 9b186b4..9fe7fe8 100644 --- a/src/obproxy/proxy/mysqllib/ob_mysql_request_analyzer.cpp +++ b/src/obproxy/proxy/mysqllib/ob_mysql_request_analyzer.cpp @@ -553,7 +553,9 @@ inline int ObMysqlRequestAnalyzer::do_analyze_request( } case OB_MYSQL_COM_STMT_FETCH: case OB_MYSQL_COM_STMT_CLOSE: - case OB_MYSQL_COM_STMT_EXECUTE: { + case OB_MYSQL_COM_STMT_EXECUTE: + case OB_MYSQL_COM_STMT_SEND_PIECE_DATA: + case OB_MYSQL_COM_STMT_GET_PIECE_DATA: { // add packet's buffer to mysql common request, for parsing later if (OB_FAIL(client_request.add_request(ctx.reader_, ctx.request_buffer_length_))) { LOG_WARN("fail to add com request", K(ret)); @@ -607,7 +609,9 @@ inline int ObMysqlRequestAnalyzer::do_analyze_request( } } case OB_MYSQL_COM_HANDSHAKE: - case OB_MYSQL_COM_QUIT: + case OB_MYSQL_COM_QUIT: { + break; + } default: { break; } diff --git a/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp b/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp index 6217502..a799605 100644 --- a/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp +++ b/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp @@ -273,7 +273,9 @@ int ObRespResult::is_resp_finished(bool &finished, ObMysqlRespEndingType &ending } case OB_MYSQL_COM_STMT_FETCH: case OB_MYSQL_COM_STMT_EXECUTE: - case OB_MYSQL_COM_QUERY : { + case OB_MYSQL_COM_QUERY: + case OB_MYSQL_COM_STMT_SEND_PIECE_DATA: + case OB_MYSQL_COM_STMT_GET_PIECE_DATA: { if (RESULT_SET_RESP_TYPE == resp_type_ || OTHERS_RESP_TYPE == resp_type_) { if (OB_UNLIKELY(is_mysql_mode())) { if (2 == pkt_cnt_[EOF_PACKET_ENDING_TYPE]) { @@ -506,7 +508,9 @@ inline int ObMysqlRespAnalyzer::read_pkt_type(ObBufferReader &buf_reader, ObResp || OB_MYSQL_COM_STMT_PREPARE == result.get_cmd() || OB_MYSQL_COM_STMT_FETCH == result.get_cmd() || OB_MYSQL_COM_STMT_EXECUTE == result.get_cmd() - || OB_MYSQL_COM_STMT_PREPARE_EXECUTE == result.get_cmd()) + || OB_MYSQL_COM_STMT_PREPARE_EXECUTE == result.get_cmd() + || OB_MYSQL_COM_STMT_GET_PIECE_DATA == result.get_cmd() + || OB_MYSQL_COM_STMT_SEND_PIECE_DATA == result.get_cmd()) && OB_LIKELY(MAX_RESP_TYPE == result.get_resp_type())) { ObMysqlRespEndingType type = meta_analyzer_.get_cur_type(); if (ERROR_PACKET_ENDING_TYPE == type || OK_PACKET_ENDING_TYPE == type) { @@ -642,6 +646,8 @@ inline int ObMysqlRespAnalyzer::analyze_resp_pkt( } else { ok_packet_action_type = OK_PACKET_ACTION_REWRITE; } + } else if (COM_STMT_GET_PIECE_DATA == result.get_cmd()) { + ok_packet_action_type = OK_PACKET_ACTION_CONSUME; } else if (1 == prepare_ok_pkt_cnt) { //stmt_prepare extra ok atfter ok_packet_action_type = OK_PACKET_ACTION_CONSUME; diff --git a/src/obproxy/proxy/mysqllib/ob_proxy_parser_utils.cpp b/src/obproxy/proxy/mysqllib/ob_proxy_parser_utils.cpp index 1254763..5aa3c1e 100644 --- a/src/obproxy/proxy/mysqllib/ob_proxy_parser_utils.cpp +++ b/src/obproxy/proxy/mysqllib/ob_proxy_parser_utils.cpp @@ -195,6 +195,12 @@ const char *ObProxyParserUtils::get_sql_cmd_name(const ObMySQLCmd cmd) case OB_MYSQL_COM_MAX_NUM: name = "OB_MYSQL_COM_MAX_NUM"; break; + case OB_MYSQL_COM_STMT_SEND_PIECE_DATA: + name ="OB_MYSQL_COM_STMT_SEND_PIECE_DATA"; + break; + case OB_MYSQL_COM_STMT_GET_PIECE_DATA: + name = "OB_MYSQL_COM_STMT_GET_PIECE_DATA"; + break; default: name = "unknown sql command"; diff --git a/src/obproxy/proxy/mysqllib/ob_proxy_session_info.cpp b/src/obproxy/proxy/mysqllib/ob_proxy_session_info.cpp index 85291ea..367b31a 100644 --- a/src/obproxy/proxy/mysqllib/ob_proxy_session_info.cpp +++ b/src/obproxy/proxy/mysqllib/ob_proxy_session_info.cpp @@ -1124,6 +1124,7 @@ void ObClientSessionInfo::destroy() destroy_ps_id_entry_map(); destroy_cursor_id_addr_map(); destroy_ps_id_addrs_map(); + destroy_piece_info_map(); is_trans_specified_ = false; is_global_vars_changed_ = false; is_user_idc_name_set_ = false; @@ -1180,6 +1181,18 @@ void ObClientSessionInfo::destroy_cursor_id_addr_map() cursor_id_addr_map_.reset(); } +void ObClientSessionInfo::destroy_piece_info_map() +{ + ObPieceInfoMap::iterator last = piece_info_map_.end(); + ObPieceInfoMap::iterator tmp_iter; + for (ObPieceInfoMap::iterator iter = piece_info_map_.begin(); iter != last;) { + tmp_iter = iter; + ++iter; + op_free(&(*tmp_iter)); + } + piece_info_map_.reset(); +} + void ObClientSessionInfo::destroy_ps_id_addrs_map() { ObPsIdAddrsMap::iterator last = ps_id_addrs_map_.end(); diff --git a/src/obproxy/proxy/mysqllib/ob_proxy_session_info.h b/src/obproxy/proxy/mysqllib/ob_proxy_session_info.h index dccb15c..d1abd90 100644 --- a/src/obproxy/proxy/mysqllib/ob_proxy_session_info.h +++ b/src/obproxy/proxy/mysqllib/ob_proxy_session_info.h @@ -26,6 +26,7 @@ #include "proxy/route/ob_ldc_struct.h" #include "proxy/mysql/ob_prepare_statement_struct.h" #include "proxy/mysql/ob_cursor_struct.h" +#include "proxy/mysql/ob_piece_info.h" #include "rpc/obmysql/packet/ompk_handshake.h" #include "rpc/obmysql/packet/ompk_ssl_request.h" #include "utils/ob_proxy_hot_upgrader.h" @@ -762,6 +763,30 @@ public: } void destroy_cursor_id_addr_map(); + int add_piece_info(ObPieceInfo *info) + { + return piece_info_map_.unique_set(info); + } + + int get_piece_info(ObPieceInfo* &info) + { + return piece_info_map_.get_refactored(ps_id_, info); + } + int get_piece_info(const uint32_t ps_id, ObPieceInfo* &info) + { + return piece_info_map_.get_refactored(ps_id, info); + } + void remove_piece_info(const uint32_t ps_id) + { + ObPieceInfo *info = piece_info_map_.remove(ps_id); + if (NULL != info) { + op_free(info); + info = NULL; + } + } + + void destroy_piece_info_map(); + int add_ps_id_addrs(ObPsIdAddrs *ps_id_addrs) { return ps_id_addrs_map_.unique_set(ps_id_addrs); } @@ -891,6 +916,7 @@ private: ObCursorIdAddrMap cursor_id_addr_map_; ObPsIdAddrsMap ps_id_addrs_map_; + ObPieceInfoMap piece_info_map_; bool is_read_only_user_; bool is_request_follower_user_; diff --git a/src/rpc/obmysql/ob_mysql_packet.cpp b/src/rpc/obmysql/ob_mysql_packet.cpp index 9a3fe45..d4f3cd7 100644 --- a/src/rpc/obmysql/ob_mysql_packet.cpp +++ b/src/rpc/obmysql/ob_mysql_packet.cpp @@ -195,7 +195,9 @@ char const *get_mysql_cmd_str(ObMySQLCmd mysql_cmd) "Binlog dump gtid", // OB_MYSQL_COM_BINLOG_DUMP_GTID, // "Reset connection", // COM_RESET_CONNECTION, - "Prepare Execute" // OB_MYSQL_COM_STMT_PREPARE_EXECUTE, + "Prepare Execute", // OB_MYSQL_COM_STMT_PREPARE_EXECUTE, + "SEND PIECE DATA", + "GET PIECE DATA", "End", // OB_MYSQL_COM_END, "Delete session", // OB_MYSQL_COM_DELETE_SESSION diff --git a/src/rpc/obmysql/ob_mysql_packet.h b/src/rpc/obmysql/ob_mysql_packet.h index f76d58b..618f043 100644 --- a/src/rpc/obmysql/ob_mysql_packet.h +++ b/src/rpc/obmysql/ob_mysql_packet.h @@ -66,10 +66,11 @@ enum ObMySQLCmd // COM_RESET_CONNECTION, OB_MYSQL_COM_STMT_PREPARE_EXECUTE = OBPROXY_NEW_MYSQL_CMD_START, + OB_MYSQL_COM_STMT_SEND_PIECE_DATA, + OB_MYSQL_COM_STMT_GET_PIECE_DATA, OB_MYSQL_COM_END, - // for obproxy // OB_MYSQL_COM_DELETE_SESSION is not a standard mysql package type. This is a package used to process delete session // When the connection is disconnected, the session needs to be deleted, but at this time it may not be obtained in the callback function disconnect -- GitLab