提交 67c4c481 编写于 作者: W wgs13579 提交者: guangshu.wgs

Support cursor and transaction unbinding

上级 47eb493f
......@@ -64,6 +64,7 @@ ObMysqlClientSession::ObMysqlClientSession()
vc_ready_killed_(false), is_waiting_trans_first_request_(false),
is_need_send_trace_info_(true), is_already_send_trace_info_(false),
is_first_handle_close_request_(true), is_in_trans_for_close_request_(false),
is_need_return_last_bound_ss_(false),
need_delete_cluster_(false), is_first_dml_sql_got_(false),
cluster_resource_(NULL), dummy_entry_(NULL), is_need_update_dummy_entry_(false),
dummy_ldc_(), dummy_entry_valid_time_ns_(0), server_state_version_(0),
......@@ -150,6 +151,7 @@ void ObMysqlClientSession::destroy()
is_already_send_trace_info_ = false;
is_first_handle_close_request_ = true;
is_in_trans_for_close_request_ = false;
is_need_return_last_bound_ss_ = false;
is_first_dml_sql_got_ = false;
schema_key_.reset();
......
......@@ -239,6 +239,8 @@ public:
bool is_first_handle_close_request() const { return is_first_handle_close_request_; }
void set_in_trans_for_close_request(bool is_in_trans_for_close_request) { is_in_trans_for_close_request_ = is_in_trans_for_close_request; }
bool is_in_trans_for_close_request() const { return is_in_trans_for_close_request_; }
void set_need_return_last_bound_ss(bool is_need_return_last_bound_ss) { is_need_return_last_bound_ss_ = is_need_return_last_bound_ss; }
bool is_need_return_last_bound_ss() const { return is_need_return_last_bound_ss_; }
bool enable_analyze_internal_cmd() const { return session_info_.enable_analyze_internal_cmd(); }
bool is_metadb_user() const { return session_info_.is_metadb_user(); }
......@@ -334,6 +336,7 @@ public:
bool is_already_send_trace_info_;
bool is_first_handle_close_request_;
bool is_in_trans_for_close_request_;
bool is_need_return_last_bound_ss_;
bool need_delete_cluster_;
bool is_first_dml_sql_got_;//default false, will route with merge status careless
//it is true after user first dml sql arrived.
......
......@@ -3566,6 +3566,31 @@ int ObMysqlSM::tunnel_handler_response_transfered(int event, void *data)
ObMysqlTransact::handle_pl_update(trans_state_);
}
if (obmysql::COM_STMT_FETCH == trans_state_.trans_info_.sql_cmd_
&& client_session_->is_need_return_last_bound_ss()) {
int ret = OB_SUCCESS;
ObMysqlServerSession *last_bound_session = client_session_->get_last_bound_server_session();
if (NULL != last_bound_session) {
// 由于 tunnel_handler_server 中只有事务中才会释放 server_session,
// 正常释放 server sssion 有两个地方:
// 1. 事务中, tunnel_handler_server
// 2. 事务结束, setup_cmd_complete
// 对于事务中的 COM_STMT_FETCH, 如果需要切换到另外一台 Server:
// 1. 在 tunnel_handler_server 时, 是认为事务结束了. 因为 in_trans = false;
// 2. 由于这里修改了事务状态,在 setup_cmd_complete 中又认为是事务中
// 所以上面两处都不会释放, 所以这里要释放一次
release_server_session();
if (OB_FAIL(ObMysqlTransact::return_last_bound_server_session(client_session_))) {
LOG_WARN("fail to return last bound server session", K(ret));
} else {
trans_state_.current_.state_ = ObMysqlTransact::CMD_COMPLETE;
}
} else {
trans_state_.current_.state_ = ObMysqlTransact::INTERNAL_ERROR;
LOG_WARN("need return last bound ss, but last bound ss is NULL", K(ret));
}
}
// each sm will be destroyed after it runs 5 secondes.
if (NULL != client_session_
&& (ObMysqlTransact::CMD_COMPLETE == trans_state_.current_.state_
......@@ -4865,8 +4890,10 @@ inline int ObMysqlSM::do_oceanbase_internal_observer_open(ObMysqlServerSession *
last_session = client_session_->get_server_session();
// if need_pl_lookup is false, we must use last server session
// allow no last server session when OB_MYSQL_COM_STMT_CLOSE and need_pl_lookup_ = false
if (!trans_state_.need_pl_lookup_ && OB_MYSQL_COM_STMT_CLOSE != trans_state_.trans_info_.sql_cmd_) {
// allow no last server session when OB_MYSQL_COM_STMT_CLOSE/OB_MYSQL_COM_STMT_FETCH and need_pl_lookup_ = false
if (!trans_state_.need_pl_lookup_
&& ((OB_MYSQL_COM_STMT_CLOSE != trans_state_.trans_info_.sql_cmd_ && OB_MYSQL_COM_STMT_FETCH != trans_state_.trans_info_.sql_cmd_)
|| !client_session_->is_need_return_last_bound_ss())) {
if (OB_ISNULL(last_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("last server session is NULL, disconnect", K_(sm_id), K(ret));
......@@ -6675,6 +6702,7 @@ int ObMysqlSM::setup_cmd_complete()
client_session_->set_first_handle_close_request(true);
client_session_->set_in_trans_for_close_request(false);
client_session_->set_sharding_select_log_plan(NULL);
client_session_->set_need_return_last_bound_ss(false);
if (OB_MYSQL_COM_HANDSHAKE == trans_state_.trans_info_.sql_cmd_) {
// set inactivity timeout to connect_timeout after proxy send handshake
......
......@@ -643,6 +643,7 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s)
client_session->attach_server_session(NULL);
last_session->do_io_read(client_session, 0, NULL);
client_session->set_last_bound_server_session(last_session);
client_session->set_need_return_last_bound_ss(true);
}
if (OB_SUCC(ret)) {
......@@ -653,22 +654,18 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s)
}
} else {
// move last_bound_ss to bound_ss
if (client_session->is_need_return_last_bound_ss()) {
if (NULL != last_bound_session) {
ObMysqlServerSession *last_session = client_session->get_server_session();
// if bound_ss not NULL, return to connection pool
if (NULL != last_session) {
last_session->release();
client_session->attach_server_session(NULL);
if (OB_FAIL(return_last_bound_server_session(client_session))) {
LOG_WARN("fail to return last bond server session", K(ret));
}
// need set cur_server_session, because will check session != cur_ss_ in attach_server_session func
client_session->set_cur_server_session(last_bound_session);
if (OB_FAIL(client_session->attach_server_session(last_bound_session))) {
LOG_WARN("client session failed to attach server session", K(ret));
} else {
client_session->set_last_bound_server_session(NULL);
LOG_WARN("[ObMysqlTransact::handle request] last bound session is NULL, we have to disconnect");
ret = OB_ERR_UNEXPECTED;
}
}
if (OB_SUCC(ret)) {
if (!client_session->is_in_trans_for_close_request()) {
s.sm_->trans_state_.current_.state_ = ObMysqlTransact::TRANSACTION_COMPLETE;
} else {
......@@ -677,6 +674,9 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s)
// if close all, move to internal request and clear session cache
TRANSACT_RETURN(SM_ACTION_INTERNAL_REQUEST, handle_internal_request);
} else {
TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL);
}
}
}
} else if (s.need_pl_lookup_) {
......@@ -764,8 +764,28 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s)
ObMysqlServerSession *last_session = s.sm_->client_session_->get_server_session();
if (OB_LIKELY(NULL != last_session)) {
if (obmysql::OB_MYSQL_COM_STMT_FETCH == s.trans_info_.sql_cmd_
|| obmysql::OB_MYSQL_COM_STMT_GET_PIECE_DATA == s.trans_info_.sql_cmd_) {
if (obmysql::OB_MYSQL_COM_STMT_FETCH == s.trans_info_.sql_cmd_) {
ObCursorIdAddr *cursor_id_addr = NULL;
if (OB_FAIL(s.sm_->get_client_session()->get_session_info().get_cursor_id_addr(cursor_id_addr))) {
LOG_WARN("fail to get client cursor id addr", K(ret));
if (OB_HASH_NOT_EXIST == ret) {
handle_fetch_request(s);
} else {
TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL);
}
} else {
if (OB_UNLIKELY(cursor_id_addr->get_addr() != last_session->get_netvc()->get_remote_addr())) {
ObMysqlClientSession *client_session = s.sm_->get_client_session();
client_session->attach_server_session(NULL);
last_session->do_io_read(client_session, 0, NULL);
client_session->set_last_bound_server_session(last_session);
client_session->set_need_return_last_bound_ss(true);
}
s.server_info_.set_addr(cursor_id_addr->get_addr());
s.pll_info_.lookup_success_ = true;
}
} else if (obmysql::COM_STMT_GET_PIECE_DATA == s.trans_info_.sql_cmd_) {
ObCursorIdAddr *cursor_id_addr = NULL;
if (OB_FAIL(s.sm_->get_client_session()->get_session_info().get_cursor_id_addr(cursor_id_addr))) {
LOG_WARN("fail to get client cursor id addr", K(ret));
......@@ -825,11 +845,17 @@ void ObMysqlTransact::handle_oceanbase_request(ObTransState &s)
s.current_.state_ = ObMysqlTransact::INTERNAL_ERROR;
TRANSACT_RETURN(SM_ACTION_INTERNAL_NOOP, NULL);
}
}
if (OB_SUCC(ret)) {
s.server_info_.set_addr(last_session->get_netvc()->get_remote_addr());
s.pll_info_.lookup_success_ = true;
}
} else {
s.server_info_.set_addr(last_session->get_netvc()->get_remote_addr());
s.pll_info_.lookup_success_ = true;
}
if (OB_SUCC(ret)) {
start_access_control(s);
}
} else {
......@@ -869,6 +895,32 @@ void ObMysqlTransact::handle_fetch_request(ObTransState &s)
TRANSACT_RETURN(SM_ACTION_SEND_ERROR_NOOP, NULL);
}
int ObMysqlTransact::return_last_bound_server_session(ObMysqlClientSession *client_session)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(client_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid param, client session is NULL", K(ret));
} else {
ObMysqlServerSession *last_session = client_session->get_server_session();
ObMysqlServerSession *last_bound_session = client_session->get_last_bound_server_session();
if (NULL != last_session) {
last_session->release();
client_session->attach_server_session(NULL);
}
client_session->set_cur_server_session(last_bound_session);
if (OB_FAIL(client_session->attach_server_session(last_bound_session))) {
LOG_WARN("fail to attach server session", K(ret));
} else {
client_session->set_last_bound_server_session(NULL);
client_session->set_need_return_last_bound_ss(false);
}
}
return ret;
}
void ObMysqlTransact::handle_request(ObTransState &s)
{
s.sm_->trans_stats_.client_requests_ += 1;
......
......@@ -827,6 +827,7 @@ public:
DISALLOW_COPY_AND_ASSIGN(ObTransState);
}; // End of State struct.
static int return_last_bound_server_session(ObMysqlClientSession *client_session);
static void modify_request(ObTransState &s);
static bool is_sequence_request(ObTransState &s);
static void handle_mysql_request(ObTransState &s);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册