diff --git a/src/obproxy/proxy/mysql/ob_mysql_client_session.cpp b/src/obproxy/proxy/mysql/ob_mysql_client_session.cpp index 494e9d6276503c549a606483b9049e530fb428b7..186ef3c9a8a20abd247e663e38d26ff00df3608f 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_client_session.cpp +++ b/src/obproxy/proxy/mysql/ob_mysql_client_session.cpp @@ -64,6 +64,7 @@ ObMysqlClientSession::ObMysqlClientSession() vc_ready_killed_(false), is_waiting_trans_first_request_(false), is_need_send_trace_info_(true), is_already_send_trace_info_(false), is_first_handle_close_request_(true), is_in_trans_for_close_request_(false), + is_need_return_last_bound_ss_(false), need_delete_cluster_(false), is_first_dml_sql_got_(false), cluster_resource_(NULL), dummy_entry_(NULL), is_need_update_dummy_entry_(false), dummy_ldc_(), dummy_entry_valid_time_ns_(0), server_state_version_(0), @@ -150,6 +151,7 @@ void ObMysqlClientSession::destroy() is_already_send_trace_info_ = false; is_first_handle_close_request_ = true; is_in_trans_for_close_request_ = false; + is_need_return_last_bound_ss_ = false; is_first_dml_sql_got_ = false; schema_key_.reset(); diff --git a/src/obproxy/proxy/mysql/ob_mysql_client_session.h b/src/obproxy/proxy/mysql/ob_mysql_client_session.h index a91652de3a828ee4334593679a16f3147fc5f1a0..526454d13291634c47395ac9499cb0f8e2fe325e 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_client_session.h +++ b/src/obproxy/proxy/mysql/ob_mysql_client_session.h @@ -239,6 +239,8 @@ public: bool is_first_handle_close_request() const { return is_first_handle_close_request_; } void set_in_trans_for_close_request(bool is_in_trans_for_close_request) { is_in_trans_for_close_request_ = is_in_trans_for_close_request; } bool is_in_trans_for_close_request() const { return is_in_trans_for_close_request_; } + void set_need_return_last_bound_ss(bool is_need_return_last_bound_ss) { is_need_return_last_bound_ss_ = is_need_return_last_bound_ss; } + bool is_need_return_last_bound_ss() const { return is_need_return_last_bound_ss_; } bool enable_analyze_internal_cmd() const { return session_info_.enable_analyze_internal_cmd(); } bool is_metadb_user() const { return session_info_.is_metadb_user(); } @@ -334,6 +336,7 @@ public: bool is_already_send_trace_info_; bool is_first_handle_close_request_; bool is_in_trans_for_close_request_; + bool is_need_return_last_bound_ss_; bool need_delete_cluster_; bool is_first_dml_sql_got_;//default false, will route with merge status careless //it is true after user first dml sql arrived. diff --git a/src/obproxy/proxy/mysql/ob_mysql_sm.cpp b/src/obproxy/proxy/mysql/ob_mysql_sm.cpp index 552b8f6110f61f58f324f1ab8f66fc090fc8d7c9..3a7b4c85a35e67078adb0aed38f99a122711e370 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_sm.cpp +++ b/src/obproxy/proxy/mysql/ob_mysql_sm.cpp @@ -3566,6 +3566,31 @@ int ObMysqlSM::tunnel_handler_response_transfered(int event, void *data) ObMysqlTransact::handle_pl_update(trans_state_); } + if (obmysql::COM_STMT_FETCH == trans_state_.trans_info_.sql_cmd_ + && client_session_->is_need_return_last_bound_ss()) { + int ret = OB_SUCCESS; + ObMysqlServerSession *last_bound_session = client_session_->get_last_bound_server_session(); + if (NULL != last_bound_session) { + // 由于 tunnel_handler_server 中只有事务中才会释放 server_session, + // 正常释放 server sssion 有两个地方: + // 1. 事务中, tunnel_handler_server + // 2. 事务结束, setup_cmd_complete + // 对于事务中的 COM_STMT_FETCH, 如果需要切换到另外一台 Server: + // 1. 在 tunnel_handler_server 时, 是认为事务结束了. 因为 in_trans = false; + // 2. 由于这里修改了事务状态,在 setup_cmd_complete 中又认为是事务中 + // 所以上面两处都不会释放, 所以这里要释放一次 + release_server_session(); + if (OB_FAIL(ObMysqlTransact::return_last_bound_server_session(client_session_))) { + LOG_WARN("fail to return last bound server session", K(ret)); + } else { + trans_state_.current_.state_ = ObMysqlTransact::CMD_COMPLETE; + } + } else { + trans_state_.current_.state_ = ObMysqlTransact::INTERNAL_ERROR; + LOG_WARN("need return last bound ss, but last bound ss is NULL", K(ret)); + } + } + // each sm will be destroyed after it runs 5 secondes. if (NULL != client_session_ && (ObMysqlTransact::CMD_COMPLETE == trans_state_.current_.state_ @@ -4865,8 +4890,10 @@ inline int ObMysqlSM::do_oceanbase_internal_observer_open(ObMysqlServerSession * last_session = client_session_->get_server_session(); // if need_pl_lookup is false, we must use last server session - // allow no last server session when OB_MYSQL_COM_STMT_CLOSE and need_pl_lookup_ = false - if (!trans_state_.need_pl_lookup_ && OB_MYSQL_COM_STMT_CLOSE != trans_state_.trans_info_.sql_cmd_) { + // allow no last server session when OB_MYSQL_COM_STMT_CLOSE/OB_MYSQL_COM_STMT_FETCH and need_pl_lookup_ = false + if (!trans_state_.need_pl_lookup_ + && ((OB_MYSQL_COM_STMT_CLOSE != trans_state_.trans_info_.sql_cmd_ && OB_MYSQL_COM_STMT_FETCH != trans_state_.trans_info_.sql_cmd_) + || !client_session_->is_need_return_last_bound_ss())) { if (OB_ISNULL(last_session)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("last server session is NULL, disconnect", K_(sm_id), K(ret)); @@ -6675,6 +6702,7 @@ int ObMysqlSM::setup_cmd_complete() client_session_->set_first_handle_close_request(true); client_session_->set_in_trans_for_close_request(false); client_session_->set_sharding_select_log_plan(NULL); + client_session_->set_need_return_last_bound_ss(false); if (OB_MYSQL_COM_HANDSHAKE == trans_state_.trans_info_.sql_cmd_) { // set inactivity timeout to connect_timeout after proxy send handshake diff --git a/src/obproxy/proxy/mysql/ob_mysql_transact.cpp b/src/obproxy/proxy/mysql/ob_mysql_transact.cpp index 378d97338d895faf9c9ffaf10b9a23500fbdbab4..bb0520b2ab77942bb599986672b7c2a3f27fc889 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_transact.cpp +++ b/src/obproxy/proxy/mysql/ob_mysql_transact.cpp @@ -643,6 +643,7 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s) client_session->attach_server_session(NULL); last_session->do_io_read(client_session, 0, NULL); client_session->set_last_bound_server_session(last_session); + client_session->set_need_return_last_bound_ss(true); } if (OB_SUCC(ret)) { @@ -653,30 +654,29 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s) } } else { // move last_bound_ss to bound_ss - if (NULL != last_bound_session) { - ObMysqlServerSession *last_session = client_session->get_server_session(); - // if bound_ss not NULL, return to connection pool - if (NULL != last_session) { - last_session->release(); - client_session->attach_server_session(NULL); - } - // need set cur_server_session, because will check session != cur_ss_ in attach_server_session func - client_session->set_cur_server_session(last_bound_session); - if (OB_FAIL(client_session->attach_server_session(last_bound_session))) { - LOG_WARN("client session failed to attach server session", K(ret)); + if (client_session->is_need_return_last_bound_ss()) { + if (NULL != last_bound_session) { + if (OB_FAIL(return_last_bound_server_session(client_session))) { + LOG_WARN("fail to return last bond server session", K(ret)); + } } else { - client_session->set_last_bound_server_session(NULL); + LOG_WARN("[ObMysqlTransact::handle request] last bound session is NULL, we have to disconnect"); + ret = OB_ERR_UNEXPECTED; } } - if (!client_session->is_in_trans_for_close_request()) { - s.sm_->trans_state_.current_.state_ = ObMysqlTransact::TRANSACTION_COMPLETE; + if (OB_SUCC(ret)) { + if (!client_session->is_in_trans_for_close_request()) { + s.sm_->trans_state_.current_.state_ = ObMysqlTransact::TRANSACTION_COMPLETE; + } else { + s.sm_->trans_state_.current_.state_ = ObMysqlTransact::CMD_COMPLETE; + } + + // if close all, move to internal request and clear session cache + TRANSACT_RETURN(SM_ACTION_INTERNAL_REQUEST, handle_internal_request); } else { - s.sm_->trans_state_.current_.state_ = ObMysqlTransact::CMD_COMPLETE; + TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL); } - - // if close all, move to internal request and clear session cache - TRANSACT_RETURN(SM_ACTION_INTERNAL_REQUEST, handle_internal_request); } } } else if (s.need_pl_lookup_) { @@ -764,8 +764,28 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s) ObMysqlServerSession *last_session = s.sm_->client_session_->get_server_session(); if (OB_LIKELY(NULL != last_session)) { - if (obmysql::OB_MYSQL_COM_STMT_FETCH == s.trans_info_.sql_cmd_ - || obmysql::OB_MYSQL_COM_STMT_GET_PIECE_DATA == s.trans_info_.sql_cmd_) { + if (obmysql::OB_MYSQL_COM_STMT_FETCH == s.trans_info_.sql_cmd_) { + ObCursorIdAddr *cursor_id_addr = NULL; + if (OB_FAIL(s.sm_->get_client_session()->get_session_info().get_cursor_id_addr(cursor_id_addr))) { + LOG_WARN("fail to get client cursor id addr", K(ret)); + if (OB_HASH_NOT_EXIST == ret) { + handle_fetch_request(s); + } else { + TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL); + } + } else { + if (OB_UNLIKELY(cursor_id_addr->get_addr() != last_session->get_netvc()->get_remote_addr())) { + ObMysqlClientSession *client_session = s.sm_->get_client_session(); + client_session->attach_server_session(NULL); + last_session->do_io_read(client_session, 0, NULL); + client_session->set_last_bound_server_session(last_session); + client_session->set_need_return_last_bound_ss(true); + } + + s.server_info_.set_addr(cursor_id_addr->get_addr()); + s.pll_info_.lookup_success_ = true; + } + } else if (obmysql::COM_STMT_GET_PIECE_DATA == s.trans_info_.sql_cmd_) { ObCursorIdAddr *cursor_id_addr = NULL; if (OB_FAIL(s.sm_->get_client_session()->get_session_info().get_cursor_id_addr(cursor_id_addr))) { LOG_WARN("fail to get client cursor id addr", K(ret)); @@ -825,11 +845,17 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s) s.current_.state_ = ObMysqlTransact::INTERNAL_ERROR; TRANSACT_RETURN(SM_ACTION_INTERNAL_NOOP, NULL); } - } - if (OB_SUCC(ret)) { + if (OB_SUCC(ret)) { + s.server_info_.set_addr(last_session->get_netvc()->get_remote_addr()); + s.pll_info_.lookup_success_ = true; + } + } else { s.server_info_.set_addr(last_session->get_netvc()->get_remote_addr()); s.pll_info_.lookup_success_ = true; + } + + if (OB_SUCC(ret)) { start_access_control(s); } } else { @@ -869,6 +895,32 @@ void ObMysqlTransact::handle_fetch_request(ObTransState &s) TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL); } +int ObMysqlTransact::return_last_bound_server_session(ObMysqlClientSession *client_session) +{ + int ret = OB_SUCCESS; + + if (OB_ISNULL(client_session)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid param, client session is NULL", K(ret)); + } else { + ObMysqlServerSession *last_session = client_session->get_server_session(); + ObMysqlServerSession *last_bound_session = client_session->get_last_bound_server_session(); + if (NULL != last_session) { + last_session->release(); + client_session->attach_server_session(NULL); + } + client_session->set_cur_server_session(last_bound_session); + if (OB_FAIL(client_session->attach_server_session(last_bound_session))) { + LOG_WARN("fail to attach server session", K(ret)); + } else { + client_session->set_last_bound_server_session(NULL); + client_session->set_need_return_last_bound_ss(false); + } + } + + return ret; +} + void ObMysqlTransact::handle_request(ObTransState &s) { s.sm_->trans_stats_.client_requests_ += 1; diff --git a/src/obproxy/proxy/mysql/ob_mysql_transact.h b/src/obproxy/proxy/mysql/ob_mysql_transact.h index 1108db4f53f2268c1f66f9d42913fb7854895583..3bc7a86088120a837456e3fb7b13583df3b6a516 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_transact.h +++ b/src/obproxy/proxy/mysql/ob_mysql_transact.h @@ -827,6 +827,7 @@ public: DISALLOW_COPY_AND_ASSIGN(ObTransState); }; // End of State struct. + static int return_last_bound_server_session(ObMysqlClientSession *client_session); static void modify_request(ObTransState &s); static bool is_sequence_request(ObTransState &s); static void handle_mysql_request(ObTransState &s);