diff --git a/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp b/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp index a799605c46e260b34eabcd2a62c90c740fa7ac6e..1ed61cef90e7dbfbbc7bac9030b85492e76d8b97 100644 --- a/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp +++ b/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp @@ -90,6 +90,14 @@ inline int ObMysqlPacketMetaAnalyzer::update_cur_type(ObRespResult &result) } else if (result.is_recv_resultset()) { cur_type_ = OK_PACKET_ENDING_TYPE; } + } else if (COM_STMT_EXECUTE == result.get_cmd()) { + if (1 == err_pkt_cnt) { + cur_type_ = OK_PACKET_ENDING_TYPE; + } else if (1 != eof_pkt_cnt) { + cur_type_ = OK_PACKET_ENDING_TYPE; + } else if (result.is_recv_resultset()) { + cur_type_ = OK_PACKET_ENDING_TYPE; + } } else if (1 != eof_pkt_cnt) { cur_type_ = OK_PACKET_ENDING_TYPE; // in OCEANBASE_MYSQL_MODE, if we got an erro in ResultSet Protocol(maybe timeout), the packet @@ -295,6 +303,9 @@ int ObRespResult::is_resp_finished(bool &finished, ObMysqlRespEndingType &ending } else if (1 == pkt_cnt_[ERROR_PACKET_ENDING_TYPE]) { finished = true; ending_type = ERROR_PACKET_ENDING_TYPE; + } else if (1 == pkt_cnt_[EOF_PACKET_ENDING_TYPE] && COM_STMT_EXECUTE == cmd_ && is_recv_resultset_) { + finished = true; + ending_type = EOF_PACKET_ENDING_TYPE; } else { finished = true; ending_type = OK_PACKET_ENDING_TYPE; @@ -659,6 +670,8 @@ inline int ObMysqlRespAnalyzer::analyze_resp_pkt( } else if (2 == eof_pkt_cnt) { // extra ok after result set ok_packet_action_type = OK_PACKET_ACTION_CONSUME; + } else if (1 == eof_pkt_cnt && COM_STMT_EXECUTE == result.get_cmd() && result.is_recv_resultset()) { + ok_packet_action_type = OK_PACKET_ACTION_CONSUME; } else if (0 == err_pkt_cnt || 0 == eof_pkt_cnt) { // last ok packet, no err and eof in front // NOTE: in multi stmt, we will reset pkt cnt if it isn't the last stmt @@ -726,7 +739,7 @@ inline int ObMysqlRespAnalyzer::analyze_resp_pkt( if (0 == eof_pkt_cnt) { // analyze the first eof packet bool is_in_trans = false; - if (OB_FAIL(analyze_eof_pkt(is_in_trans))) { + if (OB_FAIL(analyze_eof_pkt(is_in_trans, is_last_eof_pkt))) { LOG_WARN("fail to analyze_eof_pkt", K(ret)); } else { if (is_in_trans) { @@ -734,20 +747,16 @@ inline int ObMysqlRespAnalyzer::analyze_resp_pkt( } else { result.set_trans_state(NOT_IN_TRANS_STATE_BY_PARSE); } - } - } else if (is_last_eof_pkt) { - if (OB_LIKELY(is_oceanbase_mode())) { - if (cur_stmt_has_more_result_) { - // in multi stmt, send directly - reserved_len_ = 0; - } else { - // last eof packet, must following by an extra ok packet - reserved_len_ = pkt_len + MYSQL_NET_HEADER_LENGTH; + + if (is_last_eof_pkt) { + handle_last_eof(pkt_len); + if (COM_STMT_EXECUTE == result.get_cmd()) { + result.set_recv_resultset(true); + } } - } else { - // in mysql mode send directly - reserved_len_ = 0; } + } else if (is_last_eof_pkt) { + handle_last_eof(pkt_len); } else if (OB_MYSQL_COM_STMT_PREPARE_EXECUTE != result.get_cmd()){ ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected eof packet", K(err_pkt_cnt), K(eof_pkt_cnt), K(ret)); @@ -829,6 +838,22 @@ inline int ObMysqlRespAnalyzer::analyze_resp_pkt( return ret; } +void ObMysqlRespAnalyzer::handle_last_eof(uint32_t pkt_len) +{ + if (OB_LIKELY(is_oceanbase_mode())) { + if (cur_stmt_has_more_result_) { + // in multi stmt, send directly + reserved_len_ = 0; + } else { + // last eof packet, must following by an extra ok packet + reserved_len_ = pkt_len + MYSQL_NET_HEADER_LENGTH; + } + } else { + // in mysql mode send directly + reserved_len_ = 0; + } +} + int ObMysqlRespAnalyzer::analyze_mysql_resp( ObBufferReader &buf_reader, ObRespResult &result, @@ -1015,7 +1040,7 @@ inline int ObMysqlRespAnalyzer::analyze_prepare_ok_pkt(ObRespResult &result) } // ref:http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html -inline int ObMysqlRespAnalyzer::analyze_eof_pkt(bool &is_in_trans) +inline int ObMysqlRespAnalyzer::analyze_eof_pkt(bool &is_in_trans, bool &is_last_eof_pkt) { int ret = OB_SUCCESS; int64_t len = body_buf_.len(); @@ -1039,6 +1064,10 @@ inline int ObMysqlRespAnalyzer::analyze_eof_pkt(bool &is_in_trans) } else { cur_stmt_has_more_result_ = false; } + + if (server_status.status_flags_.OB_SERVER_STATUS_CURSOR_EXISTS) { + is_last_eof_pkt = true; + } } return ret; diff --git a/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.h b/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.h index ef07007276b9505e03b810c1b9442fd4a26672e2..f394ac1171f2f43efff18bcc00be294265c8fbbd 100644 --- a/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.h +++ b/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.h @@ -212,7 +212,7 @@ public: private: int analyze_prepare_ok_pkt(ObRespResult &result); int analyze_ok_pkt(bool &is_in_trans); - int analyze_eof_pkt(bool &is_in_trans); + int analyze_eof_pkt(bool &is_in_trans, bool &is_last_eof_pkt); int analyze_error_pkt(ObMysqlResp *resp); int analyze_hanshake_pkt(ObMysqlResp *resp);//extract connection id @@ -220,6 +220,7 @@ private: int read_pkt_type(ObBufferReader &buf_reader, ObRespResult &result); int read_pkt_body(ObBufferReader &buf_reader, ObRespResult &result); int analyze_resp_pkt(ObRespResult &result, ObMysqlResp *resp); + void handle_last_eof(uint32_t pkt_len); int build_packet_content(obutils::ObVariableLenBuffer &content_buf);