diff --git a/src/obproxy/proxy/mysql/ob_mysql_sm.cpp b/src/obproxy/proxy/mysql/ob_mysql_sm.cpp index 3a7b4c85a35e67078adb0aed38f99a122711e370..12a6308972f0abee078b1547c7957185a34170b2 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_sm.cpp +++ b/src/obproxy/proxy/mysql/ob_mysql_sm.cpp @@ -3566,19 +3566,11 @@ int ObMysqlSM::tunnel_handler_response_transfered(int event, void *data) ObMysqlTransact::handle_pl_update(trans_state_); } - if (obmysql::COM_STMT_FETCH == trans_state_.trans_info_.sql_cmd_ + if (obmysql::OB_MYSQL_COM_STMT_FETCH == trans_state_.trans_info_.sql_cmd_ && client_session_->is_need_return_last_bound_ss()) { int ret = OB_SUCCESS; ObMysqlServerSession *last_bound_session = client_session_->get_last_bound_server_session(); if (NULL != last_bound_session) { - // 由于 tunnel_handler_server 中只有事务中才会释放 server_session, - // 正常释放 server sssion 有两个地方: - // 1. 事务中, tunnel_handler_server - // 2. 事务结束, setup_cmd_complete - // 对于事务中的 COM_STMT_FETCH, 如果需要切换到另外一台 Server: - // 1. 在 tunnel_handler_server 时, 是认为事务结束了. 因为 in_trans = false; - // 2. 由于这里修改了事务状态,在 setup_cmd_complete 中又认为是事务中 - // 所以上面两处都不会释放, 所以这里要释放一次 release_server_session(); if (OB_FAIL(ObMysqlTransact::return_last_bound_server_session(client_session_))) { LOG_WARN("fail to return last bound server session", K(ret)); diff --git a/src/obproxy/proxy/mysql/ob_mysql_transact.cpp b/src/obproxy/proxy/mysql/ob_mysql_transact.cpp index bb0520b2ab77942bb599986672b7c2a3f27fc889..8b30311d8288daf6908b6a7d82e9b16f2b269e23 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_transact.cpp +++ b/src/obproxy/proxy/mysql/ob_mysql_transact.cpp @@ -785,7 +785,7 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s) s.server_info_.set_addr(cursor_id_addr->get_addr()); s.pll_info_.lookup_success_ = true; } - } else if (obmysql::COM_STMT_GET_PIECE_DATA == s.trans_info_.sql_cmd_) { + } else if (obmysql::OB_MYSQL_COM_STMT_GET_PIECE_DATA == s.trans_info_.sql_cmd_) { ObCursorIdAddr *cursor_id_addr = NULL; if (OB_FAIL(s.sm_->get_client_session()->get_session_info().get_cursor_id_addr(cursor_id_addr))) { LOG_WARN("fail to get client cursor id addr", K(ret)); @@ -811,7 +811,12 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s) s.current_.state_ = ObMysqlTransact::INTERNAL_ERROR; TRANSACT_RETURN(SM_ACTION_INTERNAL_NOOP, NULL); } - } else if (obmysql::COM_STMT_SEND_PIECE_DATA == s.trans_info_.sql_cmd_) { + + if (OB_SUCC(ret)) { + s.server_info_.set_addr(last_session->get_netvc()->get_remote_addr()); + s.pll_info_.lookup_success_ = true; + } + } else if (obmysql::OB_MYSQL_COM_STMT_SEND_PIECE_DATA == s.trans_info_.sql_cmd_) { ObPieceInfo *info = NULL; if (OB_FAIL(s.sm_->get_client_session()->get_session_info().get_piece_info(info))) { if (OB_HASH_NOT_EXIST == ret) { @@ -2018,6 +2023,8 @@ inline int ObMysqlTransact::build_oceanbase_user_request( if (s.trans_info_.request_content_length_ > 0) { reader = client_buffer_reader; request_len = client_request_len; + } else if (OB_FAIL(rewrite_stmt_id(s, client_buffer_reader))) { + LOG_WARN("rewrite stmt id failed", K(ret)); } else { ObIOBufferReader *request_buffer_reader = client_buffer_reader; if (PROTOCOL_OB20 == ob_proxy_protocol || PROTOCOL_CHECKSUM == ob_proxy_protocol) { // convert standard mysql protocol to compression protocol @@ -3260,7 +3267,7 @@ inline void ObMysqlTransact::handle_ok_resp(ObTransState &s) } } - if (obmysql::COM_STMT_SEND_PIECE_DATA == s.trans_info_.sql_cmd_) { + if (obmysql::OB_MYSQL_COM_STMT_SEND_PIECE_DATA == s.trans_info_.sql_cmd_) { ObPieceInfo *info = NULL; ObClientSessionInfo &cs_info = get_client_session_info(s); if (OB_FAIL(cs_info.get_piece_info(info))) { @@ -3878,8 +3885,8 @@ void ObMysqlTransact::handle_oceanbase_retry_server_connection(ObTransState &s) && s.current_.attempts_ < max_connect_attempts && 0 == obproxy_route_addr && !s.trans_info_.client_request_.is_kill_query() - && obmysql::COM_STMT_FETCH != s.trans_info_.sql_cmd_ - && obmysql::COM_STMT_GET_PIECE_DATA != s.trans_info_.sql_cmd_ + && obmysql::OB_MYSQL_COM_STMT_FETCH != s.trans_info_.sql_cmd_ + && obmysql::OB_MYSQL_COM_STMT_GET_PIECE_DATA != s.trans_info_.sql_cmd_ && !second_in) { ++s.current_.attempts_; LOG_DEBUG("start next retry"); diff --git a/src/obproxy/proxy/mysqllib/ob_mysql_common_define.h b/src/obproxy/proxy/mysqllib/ob_mysql_common_define.h index 7215daf93d72f3d2fe10bd1ff22cb3cce71db63a..fef365a69dce91565e88a2dccb78d80167db9365 100644 --- a/src/obproxy/proxy/mysqllib/ob_mysql_common_define.h +++ b/src/obproxy/proxy/mysqllib/ob_mysql_common_define.h @@ -166,8 +166,8 @@ bool is_supported_mysql_cmd(const obmysql::ObMySQLCmd mysql_cmd) // Stored Procedures case obmysql::OB_MYSQL_COM_STMT_FETCH: // pieceinfo - case obmysql::COM_STMT_SEND_PIECE_DATA: - case obmysql::COM_STMT_GET_PIECE_DATA: + case obmysql::OB_MYSQL_COM_STMT_SEND_PIECE_DATA: + case obmysql::OB_MYSQL_COM_STMT_GET_PIECE_DATA: ret = true; break; case obmysql::OB_MYSQL_COM_CHANGE_USER: diff --git a/src/obproxy/proxy/mysqllib/ob_mysql_compress_ob20_analyzer.cpp b/src/obproxy/proxy/mysqllib/ob_mysql_compress_ob20_analyzer.cpp index 2dfa947f14e87c616bd9b482d850c3084510dfdc..d62dc3ce827d88ddb533ce8782d1b28163c03db4 100644 --- a/src/obproxy/proxy/mysqllib/ob_mysql_compress_ob20_analyzer.cpp +++ b/src/obproxy/proxy/mysqllib/ob_mysql_compress_ob20_analyzer.cpp @@ -492,7 +492,7 @@ int ObMysqlCompressOB20Analyzer::analyze_first_response( resp.get_analyze_result().is_resultset_resp_ = ((OB_MYSQL_COM_QUERY == result_.get_cmd() || OB_MYSQL_COM_STMT_EXECUTE == result_.get_cmd() || OB_MYSQL_COM_STMT_FETCH == result_.get_cmd()) - && (COM_STATISTICS != result_.get_cmd()) + && (OB_MYSQL_COM_STATISTICS != result_.get_cmd()) && (MYSQL_OK_PACKET_TYPE != mysql_result.meta_.pkt_type_) && (MYSQL_ERR_PACKET_TYPE != mysql_result.meta_.pkt_type_) && (MYSQL_EOF_PACKET_TYPE != mysql_result.meta_.pkt_type_) diff --git a/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp b/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp index 6d9519274768d9150412baad5eb84bb963ef253e..0e69324d5ec03857571d86a7c9b7480f26ba170b 100644 --- a/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp +++ b/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp @@ -90,7 +90,7 @@ inline int ObMysqlPacketMetaAnalyzer::update_cur_type(ObRespResult &result) } else if (result.is_recv_resultset()) { cur_type_ = OK_PACKET_ENDING_TYPE; } - } else if (COM_STMT_EXECUTE == result.get_cmd()) { + } else if (OB_MYSQL_COM_STMT_EXECUTE == result.get_cmd()) { if (1 == err_pkt_cnt) { cur_type_ = OK_PACKET_ENDING_TYPE; } else if (1 != eof_pkt_cnt) { @@ -303,7 +303,7 @@ int ObRespResult::is_resp_finished(bool &finished, ObMysqlRespEndingType &ending } else if (1 == pkt_cnt_[ERROR_PACKET_ENDING_TYPE]) { finished = true; ending_type = ERROR_PACKET_ENDING_TYPE; - } else if (1 == pkt_cnt_[EOF_PACKET_ENDING_TYPE] && COM_STMT_EXECUTE == cmd_ && is_recv_resultset_) { + } else if (1 == pkt_cnt_[EOF_PACKET_ENDING_TYPE] && OB_MYSQL_COM_STMT_EXECUTE == cmd_ && is_recv_resultset_) { finished = true; ending_type = EOF_PACKET_ENDING_TYPE; } else { @@ -657,7 +657,7 @@ inline int ObMysqlRespAnalyzer::analyze_resp_pkt( } else { ok_packet_action_type = OK_PACKET_ACTION_REWRITE; } - } else if (COM_STMT_GET_PIECE_DATA == result.get_cmd()) { + } else if (OB_MYSQL_COM_STMT_GET_PIECE_DATA == result.get_cmd()) { ok_packet_action_type = OK_PACKET_ACTION_CONSUME; } else if (1 == prepare_ok_pkt_cnt) { //stmt_prepare extra ok atfter @@ -670,7 +670,7 @@ inline int ObMysqlRespAnalyzer::analyze_resp_pkt( } else if (2 == eof_pkt_cnt) { // extra ok after result set ok_packet_action_type = OK_PACKET_ACTION_CONSUME; - } else if (1 == eof_pkt_cnt && COM_STMT_EXECUTE == result.get_cmd() && result.is_recv_resultset()) { + } else if (1 == eof_pkt_cnt && OB_MYSQL_COM_STMT_EXECUTE == result.get_cmd() && result.is_recv_resultset()) { ok_packet_action_type = OK_PACKET_ACTION_CONSUME; } else if (0 == err_pkt_cnt || 0 == eof_pkt_cnt) { // last ok packet, no err and eof in front @@ -750,7 +750,7 @@ inline int ObMysqlRespAnalyzer::analyze_resp_pkt( if (is_last_eof_pkt) { handle_last_eof(pkt_len); - if (COM_STMT_EXECUTE == result.get_cmd()) { + if (OB_MYSQL_COM_STMT_EXECUTE == result.get_cmd()) { result.set_recv_resultset(true); } } @@ -1065,7 +1065,7 @@ inline int ObMysqlRespAnalyzer::analyze_eof_pkt(obmysql::ObMySQLCmd cmd, bool &i cur_stmt_has_more_result_ = false; } - if (COM_STMT_EXECUTE == cmd && server_status.status_flags_.OB_SERVER_STATUS_CURSOR_EXISTS) { + if (OB_MYSQL_COM_STMT_EXECUTE == cmd && server_status.status_flags_.OB_SERVER_STATUS_CURSOR_EXISTS) { is_last_eof_pkt = true; } } diff --git a/src/rpc/obmysql/ob_mysql_packet.h b/src/rpc/obmysql/ob_mysql_packet.h index 618f043d1a1da692c86304577a247b41d041f973..31c05225194dc6416d4599d30f655ee5fe100ac3 100644 --- a/src/rpc/obmysql/ob_mysql_packet.h +++ b/src/rpc/obmysql/ob_mysql_packet.h @@ -71,6 +71,7 @@ enum ObMySQLCmd OB_MYSQL_COM_END, + // for obproxy // OB_MYSQL_COM_DELETE_SESSION is not a standard mysql package type. This is a package used to process delete session // When the connection is disconnected, the session needs to be deleted, but at this time it may not be obtained in the callback function disconnect diff --git a/src/rpc/obmysql/packet/ompk_handshake_response.cpp b/src/rpc/obmysql/packet/ompk_handshake_response.cpp index 3768c01295862ee81085ea18f1aca1b138c5d15e..583cc5d306082356a33e1fc1293980635f8b3d51 100644 --- a/src/rpc/obmysql/packet/ompk_handshake_response.cpp +++ b/src/rpc/obmysql/packet/ompk_handshake_response.cpp @@ -202,7 +202,7 @@ int OMPKHandshakeResponse::decode() } } - /* 如果长度不对, 又怀疑是 connector/j, 放过, 参考 bug: https://bugs.mysql.com/bug.php?id=79612 */ + /* If the length is wrong, and suspect it is connector/j, let it go, bug: https://bugs.mysql.com/bug.php?id=79612 */ if (OB_INVALID_ARGUMENT == ret && maybe_connector_j) { ret = OB_SUCCESS; connect_attrs_.reset();