From c5b33f71c2a466131c7b665c9eba84049d165226 Mon Sep 17 00:00:00 2001 From: LiuYoung00 Date: Mon, 30 Aug 2021 22:38:11 +0800 Subject: [PATCH] support send long data protocol --- src/observer/CMakeLists.txt | 2 + src/observer/mysql/obmp_stmt_execute.cpp | 92 ++- src/observer/mysql/obmp_stmt_execute.h | 41 +- .../mysql/obmp_stmt_send_long_data.cpp | 555 ++++++++++++++++++ src/observer/mysql/obmp_stmt_send_long_data.h | 375 ++++++++++++ src/observer/ob_srv_xlator.cpp | 2 + src/sql/session/ob_sql_session_info.cpp | 30 +- src/sql/session/ob_sql_session_info.h | 3 + 8 files changed, 1083 insertions(+), 17 deletions(-) create mode 100644 src/observer/mysql/obmp_stmt_send_long_data.cpp create mode 100644 src/observer/mysql/obmp_stmt_send_long_data.h diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index 6f7e45cfff..beba5f8584 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -20,6 +20,8 @@ ob_set_subtarget(ob_server mysql mysql/obmp_stmt_close.cpp mysql/obmp_stmt_execute.cpp mysql/obmp_stmt_prepare.cpp + mysql/obmp_stmt_send_long_data.cpp + mysql/obmp_stmt_send_long_data.h mysql/obmp_utils.cpp mysql/obsm_handler.cpp mysql/obsm_row.cpp diff --git a/src/observer/mysql/obmp_stmt_execute.cpp b/src/observer/mysql/obmp_stmt_execute.cpp index da2afef249..4748ca01b6 100644 --- a/src/observer/mysql/obmp_stmt_execute.cpp +++ b/src/observer/mysql/obmp_stmt_execute.cpp @@ -41,6 +41,7 @@ #include "observer/mysql/ob_sync_cmd_driver.h" #include "observer/mysql/ob_async_cmd_driver.h" #include "observer/mysql/ob_async_plan_driver.h" +#include "observer/mysql/obmp_stmt_send_long_data.h" #include "observer/ob_req_time_service.h" namespace oceanbase { @@ -68,7 +69,8 @@ ObMPStmtExecute::ObMPStmtExecute(const ObGlobalContext& gctx) is_cursor_readonly_(false), single_process_timestamp_(0), exec_start_timestamp_(0), - exec_end_timestamp_(0) + exec_end_timestamp_(0), + params_num_(0) { ctx_.exec_type_ = MpQuery; } @@ -416,12 +418,12 @@ int ObMPStmtExecute::before_process() ObSQLSessionInfo* old_sess_info = ctx_.session_info_; ctx_.schema_guard_ = &schema_guard; ctx_.session_info_ = session; - const int64_t num_of_params = ps_session_info->get_param_count(); + const int64_t params_num_ = ps_session_info->get_param_count(); stmt_type_ = ps_session_info->get_stmt_type(); int8_t new_param_bound_flag = 0; - if (num_of_params > 0) { + if (params_num_ > 0) { // Step1: handle bitmap - int64_t bitmap_types = (num_of_params + 7) / 8; + int64_t bitmap_types = (params_num_ + 7) / 8; const char* bitmap = pos; pos += bitmap_types; // Step2: get new_param_bound_flag @@ -434,18 +436,18 @@ int ObMPStmtExecute::before_process() } if (OB_FAIL(ret)) { // do nothing - } else if (OB_FAIL(param_type_infos.prepare_allocate(num_of_params))) { + } else if (OB_FAIL(param_type_infos.prepare_allocate(params_num_))) { LOG_WARN("array prepare allocate failed", K(ret)); - } else if (OB_FAIL(params_->prepare_allocate(num_of_params))) { + } else if (OB_FAIL(params_->prepare_allocate(params_num_))) { LOG_WARN("array prepare allocate failed", K(ret)); - } else if (OB_FAIL(param_cast_infos.prepare_allocate(num_of_params))) { + } else if (OB_FAIL(param_cast_infos.prepare_allocate(params_num_))) { LOG_WARN("array prepare allocate failed", K(ret)); } else if (is_arraybinding_) { CK(OB_NOT_NULL(arraybinding_params_)); - OZ(arraybinding_params_->prepare_allocate(num_of_params)); + OZ(arraybinding_params_->prepare_allocate(params_num_)); } // Step3: get type - for (int i = 0; OB_SUCC(ret) && i < num_of_params; ++i) { + for (int i = 0; OB_SUCC(ret) && i < params_num_; ++i) { uint8_t type = 0; int8_t flag = 0; if (1 == new_param_bound_flag) { @@ -455,9 +457,9 @@ int ObMPStmtExecute::before_process() LOG_WARN("push back field failed", K(ret)); } } else { - if (num_of_params != param_types.count()) { + if (params_num_ != param_types.count()) { ret = OB_ERR_WRONG_DYNAMIC_PARAM; - LOG_USER_ERROR(OB_ERR_WRONG_DYNAMIC_PARAM, param_types.count(), num_of_params); + LOG_USER_ERROR(OB_ERR_WRONG_DYNAMIC_PARAM, param_types.count(), params_num_); } else { type = static_cast(param_types.at(i)); } @@ -496,7 +498,7 @@ int ObMPStmtExecute::before_process() } // Step5: decode value const char* params = pos; - for (int64_t i = 0; OB_SUCC(ret) && i < num_of_params; ++i) { + for (int64_t i = 0; OB_SUCC(ret) && i < params_num_; ++i) { ObObjParam& param = is_arraybinding_ ? arraybinding_params_->at(i) : params_->at(i); ObObjType ob_type; if (OB_FAIL(ObSMUtils::get_ob_type(ob_type, static_cast(param_types.at(i))))) { @@ -516,7 +518,8 @@ int ObMPStmtExecute::before_process() session->get_timezone_info(), &(param_type_infos.at(i)), param_cast_infos.at(i) ? &(dst_type_infos.at(i)) : NULL, - param))) { + param, + i))) { LOG_WARN("get param value failed", K(param), K(i)); } else { LOG_TRACE("execute with param", K(param), K(i)); @@ -1065,6 +1068,24 @@ int ObMPStmtExecute::process() } session.check_and_reset_retry_info(*cur_trace_id, THIS_WORKER.need_retry()); } + // whether the previous error was reported, a cleanup is to be done here + if (NULL != sess) { + ObPieceCache *piece_cache = static_cast(sess->get_piece_cache()); + if (OB_ISNULL(piece_cache)) { + // do nothing + // piece_cache not be null in piece data protocol + } else { + for (uint64_t i = 0; OB_SUCC(ret) && i < params_num_; i++) { + if (OB_FAIL(piece_cache->remove_piece(piece_cache->get_piece_key(stmt_id_, i), *sess))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("remove piece fail", K(stmt_id_), K(i), K(ret)); + } + } + } + } + } if (OB_FAIL(ret) && need_response_error && conn_valid_) { send_error_packet(ret, NULL); @@ -1319,19 +1340,60 @@ int ObMPStmtExecute::parse_basic_param_value(ObIAllocator& allocator, const uint int ObMPStmtExecute::parse_param_value(ObIAllocator& allocator, const uint32_t type, const ObCharsetType charset, const ObCollationType cs_type, const ObCollationType ncs_type, const char*& data, - const common::ObTimeZoneInfo* tz_info, TypeInfo* type_info, TypeInfo* dst_type_info, ObObjParam& param) + const common::ObTimeZoneInfo* tz_info, TypeInfo* type_info, TypeInfo* dst_type_info, ObObjParam& param, int16_t param_id) { int ret = OB_SUCCESS; + uint64_t length = 0; + common::ObFixedArray + str_buf(THIS_WORKER.get_sql_arena_allocator()); + ObPieceCache *piece_cache = NULL == ctx_.session_info_ + ? NULL + : static_cast(ctx_.session_info_->get_piece_cache()); + ObPiece *piece = NULL; if (OB_UNLIKELY(MYSQL_TYPE_COMPLEX == type)) { ret = OB_NOT_SUPPORTED; } else if (OB_UNLIKELY(MYSQL_TYPE_CURSOR == type)) { ret = OB_NOT_SUPPORTED; - } else { + } else if (OB_NOT_NULL(piece_cache) && OB_FAIL(piece_cache->get_piece(stmt_id_, param_id, piece))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get piece fail.", K(ret)); + } else if (OB_ISNULL(piece_cache) || OB_ISNULL(piece)) { + // not (send long data) column if (OB_FAIL(parse_basic_param_value(allocator, type, charset, cs_type, ncs_type, data, tz_info, param))) { LOG_WARN("failed to parse basic param value", K(ret), K(type_info), K(dst_type_info)); } else { param.set_param_meta(); } + } else if (!support_send_long_data(type)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("this type is not support send long data.", K(type), K(ret)); + } else if (NULL == piece->get_allocator()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("piece allocator is null.", K(stmt_id_), K(param_id), K(ret)); + } else { + ObSqlString str_buf; + if (OB_FAIL(piece_cache->get_buffer(stmt_id_, + param_id, + length, + str_buf))) { + LOG_WARN("piece get buffer fail.", K(ret), K(stmt_id_), K(param_id)); + } else { + char *tmp = static_cast(piece->get_allocator()->alloc(length)); + int64_t pos = 0; + MEMSET(tmp, 0, length); + if (OB_FAIL(ObMySQLUtil::store_obstr(tmp, length, str_buf.string(), pos))) { + LOG_WARN("store string fail.", K(ret), K(stmt_id_), K(param_id)); + } else { + const char* src = tmp; + if (OB_FAIL(parse_basic_param_value(allocator, type, charset, cs_type, ncs_type, + src, tz_info, param))) { + LOG_WARN("failed to parse basic param value", K(ret)); + } else { + param.set_param_meta(); + } + } + piece->get_allocator()->free(tmp); + } } return ret; } diff --git a/src/observer/mysql/obmp_stmt_execute.h b/src/observer/mysql/obmp_stmt_execute.h index 42b321c37c..de86d0fd7c 100644 --- a/src/observer/mysql/obmp_stmt_execute.h +++ b/src/observer/mysql/obmp_stmt_execute.h @@ -71,6 +71,43 @@ public: { return ObMPBase::flush_buffer(is_last); } + inline bool support_send_long_data(const uint32_t type) + { + bool is_support = false; + switch (type) { + case obmysql::MYSQL_TYPE_OB_NVARCHAR2: + case obmysql::MYSQL_TYPE_OB_NCHAR: + case obmysql::MYSQL_TYPE_OB_RAW: + case obmysql::MYSQL_TYPE_TINY_BLOB: + case obmysql::MYSQL_TYPE_MEDIUM_BLOB: + case obmysql::MYSQL_TYPE_LONG_BLOB: + case obmysql::MYSQL_TYPE_BLOB: + case obmysql::MYSQL_TYPE_STRING: + case obmysql::MYSQL_TYPE_VARCHAR: + case obmysql::MYSQL_TYPE_VAR_STRING: + case obmysql::MYSQL_TYPE_OB_NUMBER_FLOAT: + case obmysql::MYSQL_TYPE_NEWDECIMAL: + case obmysql::MYSQL_TYPE_OB_UROWID: + case obmysql::MYSQL_TYPE_ORA_BLOB: + case obmysql::MYSQL_TYPE_ORA_CLOB: + is_support = true; + break; + case obmysql::MYSQL_TYPE_COMPLEX: + is_support = share::is_oracle_mode() ? true : false; + break; + default: + is_support = false; + } + return is_support; + } + inline int32_t get_param_num() + { + return params_num_; + } + inline void set_param_num(int32_t num) + { + params_num_ = num; + } protected: virtual int deserialize() override @@ -150,7 +187,8 @@ private: // in oracle: %cs_type is server collation whose charset may differ with %charset int parse_param_value(ObIAllocator& allocator, const uint32_t type, const ObCharsetType charset, const ObCollationType cs_type, const ObCollationType ncs_type, const char*& data, - const common::ObTimeZoneInfo* tz_info, sql::TypeInfo* type_info, sql::TypeInfo* dst_type_info, ObObjParam& param); + const common::ObTimeZoneInfo* tz_info, sql::TypeInfo* type_info, sql::TypeInfo* dst_type_info, + ObObjParam& param, int16_t param_id); int decode_type_info(const char*& buf, sql::TypeInfo& type_info); virtual int before_response() override @@ -186,6 +224,7 @@ private: int64_t single_process_timestamp_; int64_t exec_start_timestamp_; int64_t exec_end_timestamp_; + uint64_t params_num_; private: DISALLOW_COPY_AND_ASSIGN(ObMPStmtExecute); diff --git a/src/observer/mysql/obmp_stmt_send_long_data.cpp b/src/observer/mysql/obmp_stmt_send_long_data.cpp new file mode 100644 index 0000000000..ed67769a70 --- /dev/null +++ b/src/observer/mysql/obmp_stmt_send_long_data.cpp @@ -0,0 +1,555 @@ +/* + * Copyright (c) 2021 Ant Group CO., Ltd. + * OceanBase is licensed under Mulan PubL v1. + * You can use this software according to the terms and conditions of the Mulan PubL v1. + * You may obtain a copy of Mulan PubL v1 at: + * http://license.coscl.org.cn/MulanPubL-1.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v1 for more details. + * + * Version: $Id: obmp_stmt_send_long_data.cpp 01/03/2021 13:22:22 PM + * + * Authors: + * hualong + */ + +#define USING_LOG_PREFIX SERVER + +#include "observer/mysql/obmp_stmt_send_long_data.h" + +#include "share/ob_worker.h" +#include "lib/oblog/ob_log.h" +#include "lib/stat/ob_session_stat.h" +#include "rpc/ob_request.h" +#include "share/schema/ob_schema_getter_guard.h" +#include "sql/ob_sql_context.h" +#include "sql/session/ob_sql_session_info.h" +#include "sql/ob_sql.h" +#include "observer/ob_req_time_service.h" +#include "observer/omt/ob_tenant.h" +#include "observer/mysql/obsm_utils.h" + +namespace oceanbase { + +using namespace rpc; +using namespace common; +using namespace share; +using namespace obmysql; +using namespace sql; + +namespace observer { + +ObMPStmtSendLongData::ObMPStmtSendLongData(const ObGlobalContext &gctx) + : ObMPBase(gctx), + single_process_timestamp_(0), + exec_start_timestamp_(0), + exec_end_timestamp_(0), + stmt_id_(0), + param_id_(-1), + buffer_len_(0), + buffer_() +{ + ctx_.exec_type_ = MpQuery; +} + +/* + * request packet: + * 1 COM_STMT_SEND_LONG_DATA + * 4 stmt_id + * 2 param_id + * n data + */ +int ObMPStmtSendLongData::before_process() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ObMPBase::before_process())) { + LOG_WARN("failed to pre processing packet", K(ret)); + } else { + const ObMySQLRawPacket &pkt = reinterpret_cast(req_->get_packet()); + const char *pos = pkt.get_cdata(); + // stmt_id + ObMySQLUtil::get_int4(pos, stmt_id_); + ObMySQLUtil::get_int2(pos, param_id_); + if (stmt_id_ < 1 || param_id_ < 0) { + ret = OB_ERR_PARAM_INVALID; + LOG_WARN("send long data get error info.", K(stmt_id_), K(param_id_)); + } else { + buffer_len_ = pkt.get_clen() - 7; + buffer_.assign_ptr(pos, static_cast(buffer_len_)); + LOG_INFO("get info success in send long data protocol.", K(stmt_id_), K(param_id_), K(buffer_len_), K(buffer_)); + } + LOG_INFO("send long data get param", K(stmt_id_), K(param_id_), K(buffer_len_), K(buffer_.length()), K(buffer_)); + } + return ret; +} + +int ObMPStmtSendLongData::process() +{ + int ret = OB_SUCCESS; + ObSQLSessionInfo *sess = NULL; + bool need_response_error = true; + bool async_resp_used = false; // 由事务提交线程异步回复客户端 + int64_t query_timeout = 0; + ObSMConnection *conn = get_conn(); + + if (share::is_oracle_mode()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("send long data not support oracle mode. use send_piece_data instead.", K(ret)); + } else if (OB_ISNULL(req_) || OB_ISNULL(conn)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("req or conn is null", K_(req), K(conn), K(ret)); + } else if (OB_UNLIKELY(!conn->is_in_authed_phase())) { + ret = OB_ERR_NO_PRIVILEGE; + LOG_WARN("receive sql without session", K_(stmt_id), K_(param_id), K(ret)); + } else if (OB_ISNULL(conn->tenant_)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid tenant", K_(stmt_id), K_(param_id), K(conn->tenant_), K(ret)); + } else if (OB_FAIL(get_session(sess))) { + LOG_WARN("get session fail", K_(stmt_id), K_(param_id), K(ret)); + } else if (OB_ISNULL(sess)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is NULL or invalid", K_(stmt_id), K_(param_id), K(sess), K(ret)); + } else if (OB_FAIL(update_transmission_checksum_flag(*sess))) { + LOG_WARN("update transmisson checksum flag failed", K(ret)); + } else { + ObSQLSessionInfo &session = *sess; + ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock()); + session.set_use_static_typing_engine(false); + int64_t tenant_version = 0; + int64_t sys_version = 0; + const ObMySQLRawPacket &pkt = reinterpret_cast(req_->get_packet()); + int64_t packet_len = pkt.get_clen(); + if (OB_UNLIKELY(!session.is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid session", K_(stmt_id), K_(param_id), K(ret)); + } else if (OB_UNLIKELY(session.is_zombie())) { + ret = OB_ERR_SESSION_INTERRUPTED; + LOG_WARN("session has been killed", + K(session.get_session_state()), + K_(stmt_id), + K_(param_id), + K(session.get_sessid()), + "proxy_sessid", + session.get_proxy_sessid(), + K(ret)); + } else if (OB_UNLIKELY(packet_len > session.get_max_packet_size())) { + ret = OB_ERR_NET_PACKET_TOO_LARGE; + LOG_WARN("packet too large than allowd for the session", K_(stmt_id), K_(param_id), K(ret)); + } else if (OB_FAIL(session.get_query_timeout(query_timeout))) { + LOG_WARN("fail to get query timeout", K_(stmt_id), K_(param_id), K(ret)); + } else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version( + session.get_effective_tenant_id(), tenant_version))) { + LOG_WARN("fail get tenant broadcast version", K(ret)); + } else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(OB_SYS_TENANT_ID, sys_version))) { + LOG_WARN("fail get tenant broadcast version", K(ret)); + } else if (pkt.exist_trace_info() && + OB_FAIL(session.update_sys_variable(SYS_VAR_OB_TRACE_INFO, pkt.get_trace_info()))) { + LOG_WARN("fail to update trace info", K(ret)); + } else { + THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout); + session.partition_hit().reset(); + if (OB_FAIL(process_send_long_data_stmt(session))) { + LOG_WARN("execute sql failed", K_(stmt_id), K_(param_id), K(ret)); + } + } + + if (OB_FAIL(ret)) { + send_error_packet(ret, NULL); + disconnect(); + LOG_WARN("disconnect connection when send long data", K(ret)); + } + + if (sess != NULL) { + revert_session(sess); // current ignore revert session ret + } + } + return ret; +} + +int ObMPStmtSendLongData::process_send_long_data_stmt(ObSQLSessionInfo &session) +{ + int ret = OB_SUCCESS; + bool need_response_error = true; + bool use_sess_trace = false; + int64_t tenant_version = 0; + int64_t sys_version = 0; + setup_wb(session); + + ObVirtualTableIteratorFactory vt_iter_factory(*gctx_.vt_iter_creator_); + ObSessionStatEstGuard stat_est_guard(get_conn()->tenant_->id(), session.get_sessid()); + const bool enable_trace_log = lib::is_trace_log_enabled(); + if (enable_trace_log) { + ObThreadLogLevelUtils::init(session.get_log_id_level_map()); + } + ret = do_process(session); + if (enable_trace_log) { + ObThreadLogLevelUtils::clear(); + } + + //对于tracelog的处理,不影响正常逻辑,错误码无须赋值给ret + int tmp_ret = OB_SUCCESS; + //清空WARNING BUFFER + tmp_ret = do_after_process(session, use_sess_trace, ctx_, false); + UNUSED(tmp_ret); + return ret; +} + +int ObMPStmtSendLongData::do_process(ObSQLSessionInfo &session) +{ + int ret = OB_SUCCESS; + ObAuditRecordData &audit_record = session.get_audit_record(); + const bool enable_perf_event = lib::is_diagnose_info_enabled(); + const bool enable_sql_audit = GCONF.enable_sql_audit && session.get_local_ob_enable_sql_audit(); + single_process_timestamp_ = ObTimeUtility::current_time(); + bool is_diagnostics_stmt = false; + + ObWaitEventStat total_wait_desc; + ObDiagnoseSessionInfo *di = ObDiagnoseSessionInfo::get_local_diagnose_info(); + { + ObMaxWaitGuard max_wait_guard(enable_perf_event ? &audit_record.exec_record_.max_wait_event_ : NULL, di); + ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL, di); + if (enable_sql_audit) { + audit_record.exec_record_.record_start(di); + } + int64_t execution_id = 0; + ObString sql = "send long data"; + if (FALSE_IT(execution_id = gctx_.sql_engine_->get_execution_id())) { + // nothing to do + } else if (OB_FAIL(set_session_active( + sql, session, ObTimeUtil::current_time(), obmysql::ObMySQLCmd::OB_MYSQL_COM_STMT_SEND_LONG_DATA))) { + LOG_WARN("fail to set session active", K(ret)); + } else if (OB_FAIL(store_piece(session))) { + exec_start_timestamp_ = ObTimeUtility::current_time(); + } else { + //监控项统计开始 + if (enable_perf_event) { + exec_start_timestamp_ = ObTimeUtility::current_time(); + } + session.set_current_execution_id(execution_id); + session.set_last_trace_id(ObCurTraceId::get_trace_id()); + if (enable_perf_event) { + exec_end_timestamp_ = ObTimeUtility::current_time(); + if (lib::is_diagnose_info_enabled()) { + const int64_t time_cost = exec_end_timestamp_ - get_receive_timestamp(); + EVENT_INC(SQL_PS_PREPARE_COUNT); + EVENT_ADD(SQL_PS_PREPARE_TIME, time_cost); + } + } + if (enable_sql_audit) { + audit_record.exec_record_.record_end(di); + bool first_record = (0 == audit_record.try_cnt_); + ObExecStatUtils::record_exec_timestamp(*this, first_record, audit_record.exec_timestamp_); + } + } + } // diagnose end + + // store the warning message from the most recent statement in the current session + if (OB_SUCC(ret) && is_diagnostics_stmt) { + // if diagnostic stmt execute successfully, it dosen't clear the warning message + } else { + session.set_show_warnings_buf(ret); // TODO: 挪个地方性能会更好,减少部分wb拷贝 + } + + // set read_only + if (OB_SUCC(ret)) { + session.set_has_exec_write_stmt(false); + } else { + bool is_partition_hit = session.partition_hit().get_bool(); + int err = send_error_packet(ret, NULL, is_partition_hit); + if (OB_SUCCESS != err) { // 发送error包 + LOG_WARN("send error packet failed", K(ret), K(err)); + } + } + if (enable_sql_audit) { + audit_record.status_ = ret; + audit_record.client_addr_ = session.get_peer_addr(); + audit_record.user_client_addr_ = session.get_user_client_addr(); + audit_record.user_group_ = THIS_WORKER.get_group_id(); + audit_record.exec_record_.wait_time_end_ = total_wait_desc.time_waited_; + audit_record.exec_record_.wait_count_end_ = total_wait_desc.total_waits_; + audit_record.ps_stmt_id_ = stmt_id_; + audit_record.update_stage_stat(); + // TODO: 可以这么做么? + // ObSQLUtils::handle_audit_record(false, EXECUTE_PS_EXECUTE, + // session, ctx_); + } + + clear_wb_content(session); + return ret; +} + +int ObMPStmtSendLongData::store_piece(ObSQLSessionInfo &session) +{ + int ret = OB_SUCCESS; + ObPieceCache *piece_cache = static_cast(session.get_piece_cache(true)); + if (OB_ISNULL(piece_cache)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("piece cache is null.", K(ret), K(stmt_id_)); + } else { + ObPiece *piece = NULL; + if (OB_FAIL(piece_cache->get_piece(stmt_id_, param_id_, piece))) { + LOG_WARN("get piece fail", K(stmt_id_), K(param_id_), K(ret)); + } else if (NULL == piece) { + if (OB_FAIL(piece_cache->make_piece(stmt_id_, param_id_, piece, session))) { + LOG_WARN("make piece fail.", K(ret), K(stmt_id_)); + } + } + if (OB_SUCC(ret)) { + if (NULL == piece) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("piece is null.", K(ret), K(stmt_id_), K(param_id_)); + } else if (OB_FAIL(piece_cache->add_piece_buffer(piece, ObPieceMode::ObInvalidPiece, &buffer_))) { + LOG_WARN("add piece buffer fail.", K(ret), K(stmt_id_)); + } else { + // send long data do not response. + } + } + } + return ret; +} + + +int64_t ObPieceBuffer::to_string(char *buffer, int64_t len) const +{ + int64_t pos = 0; + databuff_printf(buffer, + len, + pos, + "piece_mode:%d", + //"buf:%.*s", + mode_ //, + /*buffer_->length(), buffer_->ptr()*/); + return pos; +} + +int ObPiece::piece_init(ObSQLSessionInfo &session, int32_t stmt_id, int16_t param_id) +{ + int ret = OB_SUCCESS; + set_stmt_id(stmt_id); + set_param_id(param_id); + lib::MemoryContext entity = NULL; + lib::ContextParam param; + param.set_mem_attr(session.get_effective_tenant_id(), ObModIds::OB_PL_TEMP, ObCtxIds::DEFAULT_CTX_ID); + param.set_page_size(OB_MALLOC_BIG_BLOCK_SIZE); + if (OB_FAIL((static_cast(session.get_piece_cache()))->mem_context_->CREATE_CONTEXT(entity, param))) { + LOG_WARN("failed to create ref cursor entity", K(ret)); + } else if (OB_ISNULL(entity)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to alloc ref cursor entity", K(ret)); + } else { + void *buf = NULL; + ObPieceBufferArray *buf_array = NULL; + ObIAllocator *alloc = &entity->get_arena_allocator(); + OV(OB_NOT_NULL(buf = alloc->alloc(sizeof(ObPieceBufferArray))), + OB_ALLOCATE_MEMORY_FAILED, + sizeof(ObPieceBufferArray)); + OX(MEMSET(buf, 0, sizeof(ObPieceBufferArray))); + OV(OB_NOT_NULL(buf_array = new (buf) ObPieceBufferArray(alloc))); + OZ(buf_array->reserve(OB_MAX_PIECE_COUNT)); + if (OB_SUCC(ret)) { + set_allocator(alloc); + set_buffer_array(buf_array); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("alloc buffer array fail.", K(ret), K(stmt_id), K(param_id)); + } + } + LOG_DEBUG("piece init.", K(ret), K(stmt_id), K(param_id)); + // The failure is handed over to the upper layer to release the memory space + return ret; +} + +int ObPieceCache::init_piece_cache(ObSQLSessionInfo &session) +{ + int ret = OB_SUCCESS; + if (!is_inited()) { + if (OB_FAIL(init(session.get_effective_tenant_id()))) { + LOG_WARN("piece_cache init fail", K(ret)); + } + } + LOG_DEBUG("init piece cache. ", K(session.get_effective_tenant_id())); + return ret; +} + +int ObPieceCache::make_piece(int32_t stmt_id, int16_t param_id, ObPiece *&piece, ObSQLSessionInfo &session) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(init_piece_cache(session))) { + LOG_WARN("piece_cache init fail", K(ret)); + } else { + void *buf = NULL; + OV(OB_NOT_NULL(buf = session.get_session_allocator().alloc(sizeof(ObPiece))), + OB_ALLOCATE_MEMORY_FAILED, + sizeof(ObPiece)); + OX(MEMSET(buf, 0, sizeof(ObPiece))); + OV(OB_NOT_NULL(piece = new (buf) ObPiece())); + if (OB_SUCC(ret)) { + if (OB_FAIL(piece->piece_init(session, stmt_id, param_id))) { + LOG_WARN("piece init fail.", K(ret), K(stmt_id), K(param_id)); + } else if (OB_FAIL(add_piece(piece))) { + LOG_WARN("add piece fail.", K(ret), K(stmt_id), K(param_id)); + } + if (OB_SUCCESS != ret) { + // clean up memory when failed. + piece->~ObPiece(); + session.get_session_allocator().free(piece); + piece = NULL; + } + } + } + LOG_DEBUG("make piece: ", K(ret), K(stmt_id), K(param_id), K(session.get_effective_tenant_id())); + return ret; +} + +int ObPieceCache::add_piece(ObPiece *piece) +{ + int ret = OB_SUCCESS; + int64_t key = get_piece_key(piece->get_stmt_id(), piece->get_param_id()); + if (OB_INVALID_ID == key) { + ret = OB_ERR_PARAM_INVALID; + LOG_WARN("piece key is invalid.", K(ret), K(key)); + } else if (OB_FAIL(piece_map_.set_refactored(key, piece))) { + LOG_WARN("fail insert ps id to hash map", K(key), K(ret)); + } + LOG_DEBUG("add piece: ", K(ret), K(key), K(piece->get_stmt_id()), K(piece->get_param_id())); + return ret; +} + +int ObPieceCache::remove_piece(int64_t key, ObSQLSessionInfo &session) +{ + int ret = OB_SUCCESS; + ObPiece *piece = NULL; + if (OB_FAIL(piece_map_.erase_refactored(key, &piece))) { + LOG_WARN("cursor info not exist", K(key)); + } else if (OB_ISNULL(piece)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session_info is null", K(ret)); + } else { + close_piece(piece, session); + LOG_DEBUG("remove piece success.", K(key)); + } + return ret; +} + +void ObPieceCache::close_piece(ObPiece *&piece, ObSQLSessionInfo &session) +{ + if (NULL != piece) { + LOG_DEBUG("remove piece", K(piece->get_stmt_id()), K(piece->get_param_id())); + piece->~ObPiece(); + session.get_session_allocator().free(piece); + piece = NULL; + } +} + +int ObPieceCache::close_all(ObSQLSessionInfo &session) +{ + int ret = OB_SUCCESS; + if (is_inited()) { + for (PieceMap::iterator iter = piece_map_.begin(); // ignore ret + iter != piece_map_.end(); + ++iter) { + ObPiece *piece = iter->second; + int64_t key = get_piece_key(piece->get_stmt_id(), piece->get_param_id()); + if (OB_FAIL(remove_piece(key, session))) { + LOG_WARN("remove piece fail.", K(piece->get_stmt_id()), K(piece->get_param_id()), K(ret)); + } + } + } + return ret; +} + +int ObPieceCache::get_piece(int32_t stmt_id, int16_t param_id, ObPiece *&piece) +{ + int ret = OB_SUCCESS; + piece = NULL; + if (!is_inited()) { + LOG_DEBUG("piece_cache_ is not init.", K(stmt_id), K(param_id)); + // do nothing, do not init piece_cache_ here + } else { + if (OB_FAIL(piece_map_.get_refactored(get_piece_key(stmt_id, param_id), piece))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("get piece info failed", K(stmt_id), K(param_id)); + } + } + } + return ret; +} + +int ObPieceCache::get_buffer(int32_t stmt_id, int16_t param_id, uint64_t &length, ObSqlString &str_buf) +{ + int ret = OB_SUCCESS; + ObPiece *piece = NULL; + length = 0; + str_buf.reset(); + if (OB_FAIL(get_piece(stmt_id, param_id, piece))) { + LOG_WARN("get piece fail", K(stmt_id), K(param_id), K(ret)); + } else if (NULL == piece) { + ret = OB_ERR_PARAM_INVALID; + LOG_WARN("piece is null", K(stmt_id), K(ret)); + } else { + ObPieceBufferArray *buffer_array = piece->get_buffer_array(); + for (int64_t i = 0; OB_SUCC(ret) && i < buffer_array->count(); i++) { + ObPieceBuffer *piece_buffer = &buffer_array->at(i); + if (NULL != piece_buffer->get_piece_buffer()) { + const ObString buffer = *(piece_buffer->get_piece_buffer()); + if (OB_FAIL(str_buf.append(buffer))) { + LOG_WARN("append long data fail.", K(ret)); + } else { + } + } + } + length += get_length_length(str_buf.length()); + length += str_buf.length(); + } + LOG_DEBUG("get buffer.", K(ret), K(stmt_id), K(param_id), K(length)); + return ret; +} + +int ObPieceCache::make_piece_buffer( + ObIAllocator *allocator, ObPieceBuffer *&piece_buffer, ObPieceMode mode, ObString *buf) +{ + int ret = OB_SUCCESS; + void *piece_mem = NULL; + OV(OB_NOT_NULL(piece_mem = allocator->alloc(sizeof(ObPieceBuffer))), + OB_ALLOCATE_MEMORY_FAILED, + sizeof(ObPieceBuffer)); + OX(MEMSET(piece_mem, 0, sizeof(ObPieceBuffer))); + OV(OB_NOT_NULL(piece_buffer = new (piece_mem) ObPieceBuffer(allocator, mode))); + CK(OB_NOT_NULL(piece_buffer)); + OX(piece_buffer->set_piece_buffer(buf)); + LOG_DEBUG("make piece buffer.", K(ret), K(mode), K(buf->length())); + return ret; +} + +int ObPieceCache::add_piece_buffer(ObPiece *piece, ObPieceMode piece_mode, ObString *buf) +{ + int ret = OB_SUCCESS; + ObPieceBuffer *piece_buffer = NULL; + + if (OB_ISNULL(piece) || OB_ISNULL(piece->get_allocator())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("piece is null.", K(ret)); + } else if (OB_FAIL(make_piece_buffer(piece->get_allocator(), piece_buffer, piece_mode, buf))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("piece or piece_buffer is null when add piece buffer", K(ret), K(piece), K(piece_buffer)); + } else if (NULL == piece->get_buffer_array()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("buffer array is null.", K(ret), K(piece->get_stmt_id()), K(piece->get_param_id())); + } else { /* do nothing */ + } + if (OB_SUCC(ret) && OB_NOT_NULL(piece->get_buffer_array())) { + ObPieceBufferArray *buffer_array = piece->get_buffer_array(); + if (OB_FAIL(buffer_array->push_back(*piece_buffer))) { + LOG_WARN("push buffer array fail.", K(ret)); + } else { /* mysql do nothing */ } + } + LOG_DEBUG("add piece buffer.", K(ret), K(piece_mode)); + return ret; +} + +} // end of namespace observer +} // end of namespace oceanbase diff --git a/src/observer/mysql/obmp_stmt_send_long_data.h b/src/observer/mysql/obmp_stmt_send_long_data.h new file mode 100644 index 0000000000..eb76494c6e --- /dev/null +++ b/src/observer/mysql/obmp_stmt_send_long_data.h @@ -0,0 +1,375 @@ +/* + * Copyright (c) 2021 Ant Group CO., Ltd. + * OceanBase is licensed under Mulan PubL v1. + * You can use this software according to the terms and conditions of the Mulan PubL v1. + * You may obtain a copy of Mulan PubL v1 at: + * http://license.coscl.org.cn/MulanPubL-1.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v1 for more details. + * + * Version: $Id: obmp_stmt_send_long_data.h 19/08/2021 13:22:22 PM + * + * Authors: + * hualong + */ + +#ifndef OCEANBASE_OBSERVER_MYSQL_OBMP_STMT_SEND_LONG_DATA_H_ +#define OCEANBASE_OBSERVER_MYSQL_OBMP_STMT_SEND_LONG_DATA_H_ + +#include "sql/ob_sql_context.h" +#include "observer/mysql/obmp_base.h" +#include "observer/mysql/ob_query_retry_ctrl.h" +#include "lib/rc/context.h" + +namespace oceanbase { +namespace sql { +class ObMultiStmtItem; +} +namespace observer { + +struct ObGlobalContext; + +class ObMPStmtSendLongData : public ObMPBase { +public: + static const obmysql::ObMySQLCmd COM = obmysql::OB_MYSQL_COM_STMT_SEND_LONG_DATA; + + explicit ObMPStmtSendLongData(const ObGlobalContext &gctx); + virtual ~ObMPStmtSendLongData() + {} + int64_t get_single_process_timestamp() const + { + return single_process_timestamp_; + } + int64_t get_exec_start_timestamp() const + { + return exec_start_timestamp_; + } + int64_t get_exec_end_timestamp() const + { + return exec_end_timestamp_; + } + int64_t get_send_timestamp() const + { + return get_receive_timestamp(); + } + +protected: + virtual int deserialize() + { + return common::OB_SUCCESS; + } + virtual int before_process() override; + virtual int process(); + virtual int send_error_packet(int err, const char *errmsg, bool is_partition_hit = true, void *extra_err_info = NULL) + { + return ObMPBase::send_error_packet(err, errmsg, is_partition_hit, extra_err_info); + } + virtual int send_ok_packet(sql::ObSQLSessionInfo &session, ObOKPParam &ok_param) + { + return ObMPBase::send_ok_packet(session, ok_param); + } + virtual int send_eof_packet(const sql::ObSQLSessionInfo &session, const ObMySQLResultSet &result) + { + return ObMPBase::send_eof_packet(session, result); + } + virtual int response_packet(obmysql::ObMySQLPacket &pkt) + { + return ObMPBase::response_packet(pkt); + } + virtual bool need_send_extra_ok_packet() + { + return OB_NOT_NULL(get_conn()) && get_conn()->need_send_extra_ok_packet(); + } + +private: + int do_process(sql::ObSQLSessionInfo &session); + int response_result(sql::ObSQLSessionInfo &session); + + int process_send_long_data_stmt(sql::ObSQLSessionInfo &session); + int store_piece(sql::ObSQLSessionInfo &session); + +private: + sql::ObSqlCtx ctx_; + int64_t single_process_timestamp_; + int64_t exec_start_timestamp_; + int64_t exec_end_timestamp_; + int32_t stmt_id_; + int16_t param_id_; + uint64_t buffer_len_; + common::ObString buffer_; + +private: + DISALLOW_COPY_AND_ASSIGN(ObMPStmtSendLongData); + +}; // end of class ObMPStmtSendLongData + + +enum ObPieceMode { ObInvalidPiece, ObFirstPiece, ObNextPiece, ObLastPiece }; + +class ObPieceBuffer { +public: + ObPieceBuffer() : mode_(ObInvalidPiece), is_null_(false), buffer_(), pos_(NULL), allocator_(NULL) + {} + ObPieceBuffer(ObIAllocator *allocator, ObPieceMode mode) + : mode_(mode), is_null_(false), buffer_(), pos_(NULL), allocator_(allocator) + {} + ~ObPieceBuffer() + { + reset(); + } + + void reset() + { + mode_ = ObInvalidPiece; + if (NULL != allocator_) { + allocator_->free(&buffer_); + } + // free allocator by ObPiece + allocator_ = NULL; + } + void set_piece_mode(ObPieceMode mode) + { + mode_ = mode; + } + ObPieceMode get_piece_mode() + { + return mode_; + } + void set_null() + { + is_null_ = true; + } + bool is_null() + { + return is_null_; + } + bool is_last_piece() + { + return ObLastPiece == mode_; + } + int set_piece_buffer(ObString *buf) + { + int ret = OB_SUCCESS; + if (NULL != allocator_ && NULL != buf && NULL != buf->ptr()) { + if (OB_FAIL(ob_write_string(*allocator_, *buf, buffer_))) { + SQL_ENG_LOG(WARN, "failed to write piece buffer", K(ret), K(mode_)); + } else { + pos_ = buffer_.ptr(); + } + } else if (NULL == allocator_) { + ret = OB_ERR_UNEXPECTED; + SQL_ENG_LOG(WARN, "piece allocator is NULL", K(ret)); + } else { + buffer_.assign(NULL, 0); + pos_ = NULL; + is_null_ = true; + } + SQL_ENG_LOG(DEBUG, "set_piece_buffer", K(ret), K(buffer_), K(NULL != buf ? *buf : NULL)); + return ret; + } + ObString *get_piece_buffer() + { + return &buffer_; + } + char *&get_position() + { + return pos_; + } + int64_t to_string(char *buffer, int64_t length) const; + +private: + ObPieceMode mode_; + bool is_null_; + ObString buffer_; + char *pos_; + ObIAllocator *allocator_; +}; + +#define OB_MAX_PIECE_COUNT 1024 +typedef common::ObFixedArray ObPieceBufferArray; + +class ObPiece { +public: + ObPiece() : stmt_id_(0), param_id_(-1), pos_(0), buffer_array_(NULL), allocator_(NULL), is_null_map_() + {} + ~ObPiece() + { + reset(); + } + void reset() + { + if (NULL != buffer_array_) { + reset_buffer_array(); + } + if (NULL != allocator_) { + allocator_->reset(); + } + stmt_id_ = 0; + param_id_ = -1; + pos_ = 0; + } + void reset_buffer_array() + { + if (NULL != buffer_array_) { + for (uint64_t i = 0; i < buffer_array_->count(); i++) { + ObPieceBuffer piece_buffer = buffer_array_->at(i); + piece_buffer.~ObPieceBuffer(); + allocator_->free(&piece_buffer); + } + } + } + void set_stmt_id(int32_t stmt_id) + { + stmt_id_ = stmt_id; + } + int32_t get_stmt_id() + { + return stmt_id_; + } + void set_param_id(int16_t param_id) + { + param_id_ = param_id; + } + int16_t get_param_id() + { + return param_id_; + } + void set_position(uint64_t pos) + { + pos_ = pos; + } + uint64_t get_position() + { + return pos_; + } + void add_position() + { + pos_++; + } + void set_allocator(ObIAllocator *alloc) + { + allocator_ = alloc; + } + ObIAllocator *get_allocator() + { + return allocator_; + } + common::ObBitSet<> &get_is_null_map() + { + return is_null_map_; + } + void get_is_null_map(char *map, int64_t count) + { + for (int64_t i = 0; i < count; i++) { + if (is_null_map_.has_member(i)) { + obmysql::ObMySQLUtil::update_null_bitmap(map, i); + } + } + } + ObPieceBufferArray *get_buffer_array() + { + return buffer_array_; + } + void set_buffer_array(ObPieceBufferArray *array) + { + buffer_array_ = array; + } + int piece_init(sql::ObSQLSessionInfo &session, int32_t stmt_id, int16_t param_id); + +private: + int32_t stmt_id_; + int16_t param_id_; + uint64_t pos_; + ObPieceBufferArray *buffer_array_; + ObIAllocator *allocator_; + common::ObBitSet<> is_null_map_; +}; // end of class ObPiece + +class ObPieceCache { +public: + ObPieceCache() : mem_context_(nullptr), piece_map_() + {} + virtual ~ObPieceCache() + { + NULL != mem_context_ ? DESTROY_CONTEXT(mem_context_) : (void)(NULL); + } + int init(uint64_t tenant_id) + { + int ret = OB_SUCCESS; + if (OB_FAIL(ROOT_CONTEXT->CREATE_CONTEXT( + mem_context_, lib::ContextParam().set_mem_attr(tenant_id, ObModIds::OB_PL_TEMP)))) { + SQL_ENG_LOG(WARN, "create memory entity failed"); + } else if (OB_ISNULL(mem_context_)) { + ret = OB_ERR_UNEXPECTED; + SQL_ENG_LOG(WARN, "null memory entity returned"); + } else if (!piece_map_.created() && + OB_FAIL(piece_map_.create( + common::hash::cal_next_prime(32), ObModIds::OB_HASH_BUCKET, ObModIds::OB_HASH_NODE))) { + SQL_ENG_LOG(WARN, "create sequence current value map failed", K(ret)); + } else { /*do nothing*/ + } + return ret; + } + int close_all(sql::ObSQLSessionInfo &session); + inline bool is_inited() const + { + return NULL != mem_context_; + } + void reset() + { + piece_map_.reuse(); + if (NULL != mem_context_) { + DESTROY_CONTEXT(mem_context_); + mem_context_ = NULL; + } + } + // piece + int make_piece(int32_t stmt_id, int16_t param_id, ObPiece *&piece, sql::ObSQLSessionInfo &session); + int remove_piece(int64_t key, sql::ObSQLSessionInfo &session); + int add_piece(ObPiece *piece); + int get_piece(int32_t stmt_id, int16_t param_id, ObPiece *&piece); + // merge + int get_buffer(int32_t stmt_id, int16_t param_id, uint64_t &length, ObSqlString &str_buf); + inline int64_t get_piece_key(int32_t stmt_id, int16_t param_id) + { + return (((static_cast(stmt_id)) << 32) | param_id); + } + int add_piece_buffer(ObPiece *piece, ObPieceMode piece_mode, ObString *buf); + /* merge ObPieceBuffer.buffer_ into buf , and move & free this ObPieceBuffer from buffer_array_ + * when ObPieceBuffer.is_last_piece() + * merge this ObPieceBuffer and finish merge + */ + int make_piece_buffer(ObIAllocator *allocator, ObPieceBuffer *&piece_buffer, ObPieceMode mode, ObString *buf); + int init_piece_cache(sql::ObSQLSessionInfo &session); + void close_piece(ObPiece *&piece, sql::ObSQLSessionInfo &session); + inline uint64_t get_length_length(uint64_t length) + { + // store_length + uint64_t len = 0; + if (length < (uint64_t)251) { + len = 1; + } else if (length < (uint64_t)0X10000) { + len = 3; + } else if (length < (uint64_t)0X1000000) { + len = 4; + } else if (length < UINT64_MAX) { + len = 9; + } else if (length == UINT64_MAX) { + len = 1; + } + return len; + } + +public: + lib::MemoryContext mem_context_; + typedef common::hash::ObHashMap PieceMap; + PieceMap piece_map_; +}; + + +} // end of namespace observer +} // end of namespace oceanbase + +#endif // OCEANBASE_OBSERVER_MYSQL_OBMP_STMT_SEND_LONG_DATA_H_ diff --git a/src/observer/ob_srv_xlator.cpp b/src/observer/ob_srv_xlator.cpp index 13f6284b60..6befd2b543 100644 --- a/src/observer/ob_srv_xlator.cpp +++ b/src/observer/ob_srv_xlator.cpp @@ -51,6 +51,7 @@ #include "observer/mysql/obmp_stmt_prepare.h" #include "observer/mysql/obmp_stmt_execute.h" #include "observer/mysql/obmp_stmt_close.h" +#include "observer/mysql/obmp_stmt_send_long_data.h" using namespace oceanbase::observer; using namespace oceanbase::lib; @@ -183,6 +184,7 @@ int ObSrvMySQLXlator::translate(rpc::ObRequest& req, ObReqProcessor*& processor) MYSQL_PROCESSOR(ObMPStmtPrepare, gctx_); MYSQL_PROCESSOR(ObMPStmtExecute, gctx_); MYSQL_PROCESSOR(ObMPStmtClose, gctx_); + MYSQL_PROCESSOR(ObMPStmtSendLongData, gctx_); case obmysql::OB_MYSQL_COM_FIELD_LIST: { ObSMConnection* conn = reinterpret_cast(req.get_ez_req()->ms->c->user_data); if (OB_ISNULL(conn)) { diff --git a/src/sql/session/ob_sql_session_info.cpp b/src/sql/session/ob_sql_session_info.cpp index aa423b548f..d0da14e761 100644 --- a/src/sql/session/ob_sql_session_info.cpp +++ b/src/sql/session/ob_sql_session_info.cpp @@ -36,6 +36,7 @@ #include "sql/resolver/ddl/ob_drop_synonym_stmt.h" #include "sql/engine/expr/ob_datum_cast.h" #include "lib/checksum/ob_crc64.h" +#include "observer/mysql/obmp_stmt_send_long_data.h" using namespace oceanbase::sql; using namespace oceanbase::common; @@ -144,7 +145,8 @@ ObSQLSessionInfo::ObSQLSessionInfo() proxy_version_(0), min_proxy_version_ps_(0), is_ignore_stmt_(false), - got_conn_res_(false) + got_conn_res_(false), + piece_cache_(NULL) {} ObSQLSessionInfo::~ObSQLSessionInfo() @@ -333,6 +335,15 @@ void ObSQLSessionInfo::destroy(bool skip_sys_var) } } + if (OB_SUCC(ret) && NULL != piece_cache_) { + if (OB_FAIL((static_cast(piece_cache_)) + ->close_all(*this))) { + LOG_WARN("failed to close all piece", K(ret)); + } + get_session_allocator().free(piece_cache_); + piece_cache_ = NULL; + } + reset(skip_sys_var); is_inited_ = false; } @@ -1007,6 +1018,23 @@ int ObSQLSessionInfo::kill_query() return OB_SUCCESS; } +void* ObSQLSessionInfo::get_piece_cache(bool need_init) { + if (NULL == piece_cache_ && need_init) { + void *buf = get_session_allocator().alloc(sizeof(observer::ObPieceCache)); + if (NULL != buf) { + MEMSET(buf, 0, sizeof(observer::ObPieceCache)); + piece_cache_ = new (buf) observer::ObPieceCache(); + if (OB_SUCCESS != (static_cast(piece_cache_))->init( + get_effective_tenant_id())) { + get_session_allocator().free(piece_cache_); + piece_cache_ = NULL; + LOG_WARN("init piece cache fail"); + } + } + } + return piece_cache_; +} + ObAuditRecordData& ObSQLSessionInfo::get_audit_record() { audit_record_.try_cnt_++; diff --git a/src/sql/session/ob_sql_session_info.h b/src/sql/session/ob_sql_session_info.h index 417cdc3c88..6cfb08cbad 100644 --- a/src/sql/session/ob_sql_session_info.h +++ b/src/sql/session/ob_sql_session_info.h @@ -656,6 +656,8 @@ public: int on_user_connect(share::schema::ObSessionPrivInfo& priv_info, const share::schema::ObUserInfo* user_info); int on_user_disconnect(); + void *get_piece_cache(bool need_init = false); + private: int close_all_ps_stmt(); @@ -740,6 +742,7 @@ private: // No matter whether apply for resource successfully, a session will call on_user_disconnect when disconnect. // While only session got connection resource can release connection resource and decrease connections count. bool got_conn_res_; + void *piece_cache_; }; inline ObIExtraStatusCheck::Guard::Guard(ObSQLSessionInfo& session, ObIExtraStatusCheck& checker) -- GitLab