提交 c5b33f71 编写于 作者: L LiuYoung00 提交者: wangzelin.wzl

support send long data protocol

上级 21c25a2b
...@@ -20,6 +20,8 @@ ob_set_subtarget(ob_server mysql ...@@ -20,6 +20,8 @@ ob_set_subtarget(ob_server mysql
mysql/obmp_stmt_close.cpp mysql/obmp_stmt_close.cpp
mysql/obmp_stmt_execute.cpp mysql/obmp_stmt_execute.cpp
mysql/obmp_stmt_prepare.cpp mysql/obmp_stmt_prepare.cpp
mysql/obmp_stmt_send_long_data.cpp
mysql/obmp_stmt_send_long_data.h
mysql/obmp_utils.cpp mysql/obmp_utils.cpp
mysql/obsm_handler.cpp mysql/obsm_handler.cpp
mysql/obsm_row.cpp mysql/obsm_row.cpp
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include "observer/mysql/ob_sync_cmd_driver.h" #include "observer/mysql/ob_sync_cmd_driver.h"
#include "observer/mysql/ob_async_cmd_driver.h" #include "observer/mysql/ob_async_cmd_driver.h"
#include "observer/mysql/ob_async_plan_driver.h" #include "observer/mysql/ob_async_plan_driver.h"
#include "observer/mysql/obmp_stmt_send_long_data.h"
#include "observer/ob_req_time_service.h" #include "observer/ob_req_time_service.h"
namespace oceanbase { namespace oceanbase {
...@@ -68,7 +69,8 @@ ObMPStmtExecute::ObMPStmtExecute(const ObGlobalContext& gctx) ...@@ -68,7 +69,8 @@ ObMPStmtExecute::ObMPStmtExecute(const ObGlobalContext& gctx)
is_cursor_readonly_(false), is_cursor_readonly_(false),
single_process_timestamp_(0), single_process_timestamp_(0),
exec_start_timestamp_(0), exec_start_timestamp_(0),
exec_end_timestamp_(0) exec_end_timestamp_(0),
params_num_(0)
{ {
ctx_.exec_type_ = MpQuery; ctx_.exec_type_ = MpQuery;
} }
...@@ -416,12 +418,12 @@ int ObMPStmtExecute::before_process() ...@@ -416,12 +418,12 @@ int ObMPStmtExecute::before_process()
ObSQLSessionInfo* old_sess_info = ctx_.session_info_; ObSQLSessionInfo* old_sess_info = ctx_.session_info_;
ctx_.schema_guard_ = &schema_guard; ctx_.schema_guard_ = &schema_guard;
ctx_.session_info_ = session; ctx_.session_info_ = session;
const int64_t num_of_params = ps_session_info->get_param_count(); const int64_t params_num_ = ps_session_info->get_param_count();
stmt_type_ = ps_session_info->get_stmt_type(); stmt_type_ = ps_session_info->get_stmt_type();
int8_t new_param_bound_flag = 0; int8_t new_param_bound_flag = 0;
if (num_of_params > 0) { if (params_num_ > 0) {
// Step1: handle bitmap // Step1: handle bitmap
int64_t bitmap_types = (num_of_params + 7) / 8; int64_t bitmap_types = (params_num_ + 7) / 8;
const char* bitmap = pos; const char* bitmap = pos;
pos += bitmap_types; pos += bitmap_types;
// Step2: get new_param_bound_flag // Step2: get new_param_bound_flag
...@@ -434,18 +436,18 @@ int ObMPStmtExecute::before_process() ...@@ -434,18 +436,18 @@ int ObMPStmtExecute::before_process()
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
// do nothing // do nothing
} else if (OB_FAIL(param_type_infos.prepare_allocate(num_of_params))) { } else if (OB_FAIL(param_type_infos.prepare_allocate(params_num_))) {
LOG_WARN("array prepare allocate failed", K(ret)); LOG_WARN("array prepare allocate failed", K(ret));
} else if (OB_FAIL(params_->prepare_allocate(num_of_params))) { } else if (OB_FAIL(params_->prepare_allocate(params_num_))) {
LOG_WARN("array prepare allocate failed", K(ret)); LOG_WARN("array prepare allocate failed", K(ret));
} else if (OB_FAIL(param_cast_infos.prepare_allocate(num_of_params))) { } else if (OB_FAIL(param_cast_infos.prepare_allocate(params_num_))) {
LOG_WARN("array prepare allocate failed", K(ret)); LOG_WARN("array prepare allocate failed", K(ret));
} else if (is_arraybinding_) { } else if (is_arraybinding_) {
CK(OB_NOT_NULL(arraybinding_params_)); CK(OB_NOT_NULL(arraybinding_params_));
OZ(arraybinding_params_->prepare_allocate(num_of_params)); OZ(arraybinding_params_->prepare_allocate(params_num_));
} }
// Step3: get type // Step3: get type
for (int i = 0; OB_SUCC(ret) && i < num_of_params; ++i) { for (int i = 0; OB_SUCC(ret) && i < params_num_; ++i) {
uint8_t type = 0; uint8_t type = 0;
int8_t flag = 0; int8_t flag = 0;
if (1 == new_param_bound_flag) { if (1 == new_param_bound_flag) {
...@@ -455,9 +457,9 @@ int ObMPStmtExecute::before_process() ...@@ -455,9 +457,9 @@ int ObMPStmtExecute::before_process()
LOG_WARN("push back field failed", K(ret)); LOG_WARN("push back field failed", K(ret));
} }
} else { } else {
if (num_of_params != param_types.count()) { if (params_num_ != param_types.count()) {
ret = OB_ERR_WRONG_DYNAMIC_PARAM; ret = OB_ERR_WRONG_DYNAMIC_PARAM;
LOG_USER_ERROR(OB_ERR_WRONG_DYNAMIC_PARAM, param_types.count(), num_of_params); LOG_USER_ERROR(OB_ERR_WRONG_DYNAMIC_PARAM, param_types.count(), params_num_);
} else { } else {
type = static_cast<uint8_t>(param_types.at(i)); type = static_cast<uint8_t>(param_types.at(i));
} }
...@@ -496,7 +498,7 @@ int ObMPStmtExecute::before_process() ...@@ -496,7 +498,7 @@ int ObMPStmtExecute::before_process()
} }
// Step5: decode value // Step5: decode value
const char* params = pos; const char* params = pos;
for (int64_t i = 0; OB_SUCC(ret) && i < num_of_params; ++i) { for (int64_t i = 0; OB_SUCC(ret) && i < params_num_; ++i) {
ObObjParam& param = is_arraybinding_ ? arraybinding_params_->at(i) : params_->at(i); ObObjParam& param = is_arraybinding_ ? arraybinding_params_->at(i) : params_->at(i);
ObObjType ob_type; ObObjType ob_type;
if (OB_FAIL(ObSMUtils::get_ob_type(ob_type, static_cast<EMySQLFieldType>(param_types.at(i))))) { if (OB_FAIL(ObSMUtils::get_ob_type(ob_type, static_cast<EMySQLFieldType>(param_types.at(i))))) {
...@@ -516,7 +518,8 @@ int ObMPStmtExecute::before_process() ...@@ -516,7 +518,8 @@ int ObMPStmtExecute::before_process()
session->get_timezone_info(), session->get_timezone_info(),
&(param_type_infos.at(i)), &(param_type_infos.at(i)),
param_cast_infos.at(i) ? &(dst_type_infos.at(i)) : NULL, param_cast_infos.at(i) ? &(dst_type_infos.at(i)) : NULL,
param))) { param,
i))) {
LOG_WARN("get param value failed", K(param), K(i)); LOG_WARN("get param value failed", K(param), K(i));
} else { } else {
LOG_TRACE("execute with param", K(param), K(i)); LOG_TRACE("execute with param", K(param), K(i));
...@@ -1065,6 +1068,24 @@ int ObMPStmtExecute::process() ...@@ -1065,6 +1068,24 @@ int ObMPStmtExecute::process()
} }
session.check_and_reset_retry_info(*cur_trace_id, THIS_WORKER.need_retry()); session.check_and_reset_retry_info(*cur_trace_id, THIS_WORKER.need_retry());
} }
// whether the previous error was reported, a cleanup is to be done here
if (NULL != sess) {
ObPieceCache *piece_cache = static_cast<ObPieceCache *>(sess->get_piece_cache());
if (OB_ISNULL(piece_cache)) {
// do nothing
// piece_cache not be null in piece data protocol
} else {
for (uint64_t i = 0; OB_SUCC(ret) && i < params_num_; i++) {
if (OB_FAIL(piece_cache->remove_piece(piece_cache->get_piece_key(stmt_id_, i), *sess))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("remove piece fail", K(stmt_id_), K(i), K(ret));
}
}
}
}
}
if (OB_FAIL(ret) && need_response_error && conn_valid_) { if (OB_FAIL(ret) && need_response_error && conn_valid_) {
send_error_packet(ret, NULL); send_error_packet(ret, NULL);
...@@ -1319,19 +1340,60 @@ int ObMPStmtExecute::parse_basic_param_value(ObIAllocator& allocator, const uint ...@@ -1319,19 +1340,60 @@ int ObMPStmtExecute::parse_basic_param_value(ObIAllocator& allocator, const uint
int ObMPStmtExecute::parse_param_value(ObIAllocator& allocator, const uint32_t type, const ObCharsetType charset, int ObMPStmtExecute::parse_param_value(ObIAllocator& allocator, const uint32_t type, const ObCharsetType charset,
const ObCollationType cs_type, const ObCollationType ncs_type, const char*& data, const ObCollationType cs_type, const ObCollationType ncs_type, const char*& data,
const common::ObTimeZoneInfo* tz_info, TypeInfo* type_info, TypeInfo* dst_type_info, ObObjParam& param) const common::ObTimeZoneInfo* tz_info, TypeInfo* type_info, TypeInfo* dst_type_info, ObObjParam& param, int16_t param_id)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
uint64_t length = 0;
common::ObFixedArray<ObSqlString, ObIAllocator>
str_buf(THIS_WORKER.get_sql_arena_allocator());
ObPieceCache *piece_cache = NULL == ctx_.session_info_
? NULL
: static_cast<ObPieceCache*>(ctx_.session_info_->get_piece_cache());
ObPiece *piece = NULL;
if (OB_UNLIKELY(MYSQL_TYPE_COMPLEX == type)) { if (OB_UNLIKELY(MYSQL_TYPE_COMPLEX == type)) {
ret = OB_NOT_SUPPORTED; ret = OB_NOT_SUPPORTED;
} else if (OB_UNLIKELY(MYSQL_TYPE_CURSOR == type)) { } else if (OB_UNLIKELY(MYSQL_TYPE_CURSOR == type)) {
ret = OB_NOT_SUPPORTED; ret = OB_NOT_SUPPORTED;
} else { } else if (OB_NOT_NULL(piece_cache) && OB_FAIL(piece_cache->get_piece(stmt_id_, param_id, piece))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get piece fail.", K(ret));
} else if (OB_ISNULL(piece_cache) || OB_ISNULL(piece)) {
// not (send long data) column
if (OB_FAIL(parse_basic_param_value(allocator, type, charset, cs_type, ncs_type, data, tz_info, param))) { if (OB_FAIL(parse_basic_param_value(allocator, type, charset, cs_type, ncs_type, data, tz_info, param))) {
LOG_WARN("failed to parse basic param value", K(ret), K(type_info), K(dst_type_info)); LOG_WARN("failed to parse basic param value", K(ret), K(type_info), K(dst_type_info));
} else { } else {
param.set_param_meta(); param.set_param_meta();
} }
} else if (!support_send_long_data(type)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("this type is not support send long data.", K(type), K(ret));
} else if (NULL == piece->get_allocator()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("piece allocator is null.", K(stmt_id_), K(param_id), K(ret));
} else {
ObSqlString str_buf;
if (OB_FAIL(piece_cache->get_buffer(stmt_id_,
param_id,
length,
str_buf))) {
LOG_WARN("piece get buffer fail.", K(ret), K(stmt_id_), K(param_id));
} else {
char *tmp = static_cast<char*>(piece->get_allocator()->alloc(length));
int64_t pos = 0;
MEMSET(tmp, 0, length);
if (OB_FAIL(ObMySQLUtil::store_obstr(tmp, length, str_buf.string(), pos))) {
LOG_WARN("store string fail.", K(ret), K(stmt_id_), K(param_id));
} else {
const char* src = tmp;
if (OB_FAIL(parse_basic_param_value(allocator, type, charset, cs_type, ncs_type,
src, tz_info, param))) {
LOG_WARN("failed to parse basic param value", K(ret));
} else {
param.set_param_meta();
}
}
piece->get_allocator()->free(tmp);
}
} }
return ret; return ret;
} }
......
...@@ -71,6 +71,43 @@ public: ...@@ -71,6 +71,43 @@ public:
{ {
return ObMPBase::flush_buffer(is_last); return ObMPBase::flush_buffer(is_last);
} }
inline bool support_send_long_data(const uint32_t type)
{
bool is_support = false;
switch (type) {
case obmysql::MYSQL_TYPE_OB_NVARCHAR2:
case obmysql::MYSQL_TYPE_OB_NCHAR:
case obmysql::MYSQL_TYPE_OB_RAW:
case obmysql::MYSQL_TYPE_TINY_BLOB:
case obmysql::MYSQL_TYPE_MEDIUM_BLOB:
case obmysql::MYSQL_TYPE_LONG_BLOB:
case obmysql::MYSQL_TYPE_BLOB:
case obmysql::MYSQL_TYPE_STRING:
case obmysql::MYSQL_TYPE_VARCHAR:
case obmysql::MYSQL_TYPE_VAR_STRING:
case obmysql::MYSQL_TYPE_OB_NUMBER_FLOAT:
case obmysql::MYSQL_TYPE_NEWDECIMAL:
case obmysql::MYSQL_TYPE_OB_UROWID:
case obmysql::MYSQL_TYPE_ORA_BLOB:
case obmysql::MYSQL_TYPE_ORA_CLOB:
is_support = true;
break;
case obmysql::MYSQL_TYPE_COMPLEX:
is_support = share::is_oracle_mode() ? true : false;
break;
default:
is_support = false;
}
return is_support;
}
inline int32_t get_param_num()
{
return params_num_;
}
inline void set_param_num(int32_t num)
{
params_num_ = num;
}
protected: protected:
virtual int deserialize() override virtual int deserialize() override
...@@ -150,7 +187,8 @@ private: ...@@ -150,7 +187,8 @@ private:
// in oracle: %cs_type is server collation whose charset may differ with %charset // in oracle: %cs_type is server collation whose charset may differ with %charset
int parse_param_value(ObIAllocator& allocator, const uint32_t type, const ObCharsetType charset, int parse_param_value(ObIAllocator& allocator, const uint32_t type, const ObCharsetType charset,
const ObCollationType cs_type, const ObCollationType ncs_type, const char*& data, const ObCollationType cs_type, const ObCollationType ncs_type, const char*& data,
const common::ObTimeZoneInfo* tz_info, sql::TypeInfo* type_info, sql::TypeInfo* dst_type_info, ObObjParam& param); const common::ObTimeZoneInfo* tz_info, sql::TypeInfo* type_info, sql::TypeInfo* dst_type_info,
ObObjParam& param, int16_t param_id);
int decode_type_info(const char*& buf, sql::TypeInfo& type_info); int decode_type_info(const char*& buf, sql::TypeInfo& type_info);
virtual int before_response() override virtual int before_response() override
...@@ -186,6 +224,7 @@ private: ...@@ -186,6 +224,7 @@ private:
int64_t single_process_timestamp_; int64_t single_process_timestamp_;
int64_t exec_start_timestamp_; int64_t exec_start_timestamp_;
int64_t exec_end_timestamp_; int64_t exec_end_timestamp_;
uint64_t params_num_;
private: private:
DISALLOW_COPY_AND_ASSIGN(ObMPStmtExecute); DISALLOW_COPY_AND_ASSIGN(ObMPStmtExecute);
......
此差异已折叠。
/*
* Copyright (c) 2021 Ant Group CO., Ltd.
* OceanBase is licensed under Mulan PubL v1.
* You can use this software according to the terms and conditions of the Mulan PubL v1.
* You may obtain a copy of Mulan PubL v1 at:
* http://license.coscl.org.cn/MulanPubL-1.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v1 for more details.
*
* Version: $Id: obmp_stmt_send_long_data.h 19/08/2021 13:22:22 PM
*
* Authors:
* hualong <adou.ly@alibaba-inc.com>
*/
#ifndef OCEANBASE_OBSERVER_MYSQL_OBMP_STMT_SEND_LONG_DATA_H_
#define OCEANBASE_OBSERVER_MYSQL_OBMP_STMT_SEND_LONG_DATA_H_
#include "sql/ob_sql_context.h"
#include "observer/mysql/obmp_base.h"
#include "observer/mysql/ob_query_retry_ctrl.h"
#include "lib/rc/context.h"
namespace oceanbase {
namespace sql {
class ObMultiStmtItem;
}
namespace observer {
struct ObGlobalContext;
class ObMPStmtSendLongData : public ObMPBase {
public:
static const obmysql::ObMySQLCmd COM = obmysql::OB_MYSQL_COM_STMT_SEND_LONG_DATA;
explicit ObMPStmtSendLongData(const ObGlobalContext &gctx);
virtual ~ObMPStmtSendLongData()
{}
int64_t get_single_process_timestamp() const
{
return single_process_timestamp_;
}
int64_t get_exec_start_timestamp() const
{
return exec_start_timestamp_;
}
int64_t get_exec_end_timestamp() const
{
return exec_end_timestamp_;
}
int64_t get_send_timestamp() const
{
return get_receive_timestamp();
}
protected:
virtual int deserialize()
{
return common::OB_SUCCESS;
}
virtual int before_process() override;
virtual int process();
virtual int send_error_packet(int err, const char *errmsg, bool is_partition_hit = true, void *extra_err_info = NULL)
{
return ObMPBase::send_error_packet(err, errmsg, is_partition_hit, extra_err_info);
}
virtual int send_ok_packet(sql::ObSQLSessionInfo &session, ObOKPParam &ok_param)
{
return ObMPBase::send_ok_packet(session, ok_param);
}
virtual int send_eof_packet(const sql::ObSQLSessionInfo &session, const ObMySQLResultSet &result)
{
return ObMPBase::send_eof_packet(session, result);
}
virtual int response_packet(obmysql::ObMySQLPacket &pkt)
{
return ObMPBase::response_packet(pkt);
}
virtual bool need_send_extra_ok_packet()
{
return OB_NOT_NULL(get_conn()) && get_conn()->need_send_extra_ok_packet();
}
private:
int do_process(sql::ObSQLSessionInfo &session);
int response_result(sql::ObSQLSessionInfo &session);
int process_send_long_data_stmt(sql::ObSQLSessionInfo &session);
int store_piece(sql::ObSQLSessionInfo &session);
private:
sql::ObSqlCtx ctx_;
int64_t single_process_timestamp_;
int64_t exec_start_timestamp_;
int64_t exec_end_timestamp_;
int32_t stmt_id_;
int16_t param_id_;
uint64_t buffer_len_;
common::ObString buffer_;
private:
DISALLOW_COPY_AND_ASSIGN(ObMPStmtSendLongData);
}; // end of class ObMPStmtSendLongData
enum ObPieceMode { ObInvalidPiece, ObFirstPiece, ObNextPiece, ObLastPiece };
class ObPieceBuffer {
public:
ObPieceBuffer() : mode_(ObInvalidPiece), is_null_(false), buffer_(), pos_(NULL), allocator_(NULL)
{}
ObPieceBuffer(ObIAllocator *allocator, ObPieceMode mode)
: mode_(mode), is_null_(false), buffer_(), pos_(NULL), allocator_(allocator)
{}
~ObPieceBuffer()
{
reset();
}
void reset()
{
mode_ = ObInvalidPiece;
if (NULL != allocator_) {
allocator_->free(&buffer_);
}
// free allocator by ObPiece
allocator_ = NULL;
}
void set_piece_mode(ObPieceMode mode)
{
mode_ = mode;
}
ObPieceMode get_piece_mode()
{
return mode_;
}
void set_null()
{
is_null_ = true;
}
bool is_null()
{
return is_null_;
}
bool is_last_piece()
{
return ObLastPiece == mode_;
}
int set_piece_buffer(ObString *buf)
{
int ret = OB_SUCCESS;
if (NULL != allocator_ && NULL != buf && NULL != buf->ptr()) {
if (OB_FAIL(ob_write_string(*allocator_, *buf, buffer_))) {
SQL_ENG_LOG(WARN, "failed to write piece buffer", K(ret), K(mode_));
} else {
pos_ = buffer_.ptr();
}
} else if (NULL == allocator_) {
ret = OB_ERR_UNEXPECTED;
SQL_ENG_LOG(WARN, "piece allocator is NULL", K(ret));
} else {
buffer_.assign(NULL, 0);
pos_ = NULL;
is_null_ = true;
}
SQL_ENG_LOG(DEBUG, "set_piece_buffer", K(ret), K(buffer_), K(NULL != buf ? *buf : NULL));
return ret;
}
ObString *get_piece_buffer()
{
return &buffer_;
}
char *&get_position()
{
return pos_;
}
int64_t to_string(char *buffer, int64_t length) const;
private:
ObPieceMode mode_;
bool is_null_;
ObString buffer_;
char *pos_;
ObIAllocator *allocator_;
};
#define OB_MAX_PIECE_COUNT 1024
typedef common::ObFixedArray<ObPieceBuffer, common::ObIAllocator> ObPieceBufferArray;
class ObPiece {
public:
ObPiece() : stmt_id_(0), param_id_(-1), pos_(0), buffer_array_(NULL), allocator_(NULL), is_null_map_()
{}
~ObPiece()
{
reset();
}
void reset()
{
if (NULL != buffer_array_) {
reset_buffer_array();
}
if (NULL != allocator_) {
allocator_->reset();
}
stmt_id_ = 0;
param_id_ = -1;
pos_ = 0;
}
void reset_buffer_array()
{
if (NULL != buffer_array_) {
for (uint64_t i = 0; i < buffer_array_->count(); i++) {
ObPieceBuffer piece_buffer = buffer_array_->at(i);
piece_buffer.~ObPieceBuffer();
allocator_->free(&piece_buffer);
}
}
}
void set_stmt_id(int32_t stmt_id)
{
stmt_id_ = stmt_id;
}
int32_t get_stmt_id()
{
return stmt_id_;
}
void set_param_id(int16_t param_id)
{
param_id_ = param_id;
}
int16_t get_param_id()
{
return param_id_;
}
void set_position(uint64_t pos)
{
pos_ = pos;
}
uint64_t get_position()
{
return pos_;
}
void add_position()
{
pos_++;
}
void set_allocator(ObIAllocator *alloc)
{
allocator_ = alloc;
}
ObIAllocator *get_allocator()
{
return allocator_;
}
common::ObBitSet<> &get_is_null_map()
{
return is_null_map_;
}
void get_is_null_map(char *map, int64_t count)
{
for (int64_t i = 0; i < count; i++) {
if (is_null_map_.has_member(i)) {
obmysql::ObMySQLUtil::update_null_bitmap(map, i);
}
}
}
ObPieceBufferArray *get_buffer_array()
{
return buffer_array_;
}
void set_buffer_array(ObPieceBufferArray *array)
{
buffer_array_ = array;
}
int piece_init(sql::ObSQLSessionInfo &session, int32_t stmt_id, int16_t param_id);
private:
int32_t stmt_id_;
int16_t param_id_;
uint64_t pos_;
ObPieceBufferArray *buffer_array_;
ObIAllocator *allocator_;
common::ObBitSet<> is_null_map_;
}; // end of class ObPiece
class ObPieceCache {
public:
ObPieceCache() : mem_context_(nullptr), piece_map_()
{}
virtual ~ObPieceCache()
{
NULL != mem_context_ ? DESTROY_CONTEXT(mem_context_) : (void)(NULL);
}
int init(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ROOT_CONTEXT->CREATE_CONTEXT(
mem_context_, lib::ContextParam().set_mem_attr(tenant_id, ObModIds::OB_PL_TEMP)))) {
SQL_ENG_LOG(WARN, "create memory entity failed");
} else if (OB_ISNULL(mem_context_)) {
ret = OB_ERR_UNEXPECTED;
SQL_ENG_LOG(WARN, "null memory entity returned");
} else if (!piece_map_.created() &&
OB_FAIL(piece_map_.create(
common::hash::cal_next_prime(32), ObModIds::OB_HASH_BUCKET, ObModIds::OB_HASH_NODE))) {
SQL_ENG_LOG(WARN, "create sequence current value map failed", K(ret));
} else { /*do nothing*/
}
return ret;
}
int close_all(sql::ObSQLSessionInfo &session);
inline bool is_inited() const
{
return NULL != mem_context_;
}
void reset()
{
piece_map_.reuse();
if (NULL != mem_context_) {
DESTROY_CONTEXT(mem_context_);
mem_context_ = NULL;
}
}
// piece
int make_piece(int32_t stmt_id, int16_t param_id, ObPiece *&piece, sql::ObSQLSessionInfo &session);
int remove_piece(int64_t key, sql::ObSQLSessionInfo &session);
int add_piece(ObPiece *piece);
int get_piece(int32_t stmt_id, int16_t param_id, ObPiece *&piece);
// merge
int get_buffer(int32_t stmt_id, int16_t param_id, uint64_t &length, ObSqlString &str_buf);
inline int64_t get_piece_key(int32_t stmt_id, int16_t param_id)
{
return (((static_cast<int64_t>(stmt_id)) << 32) | param_id);
}
int add_piece_buffer(ObPiece *piece, ObPieceMode piece_mode, ObString *buf);
/* merge ObPieceBuffer.buffer_ into buf , and move & free this ObPieceBuffer from buffer_array_
* when ObPieceBuffer.is_last_piece()
* merge this ObPieceBuffer and finish merge
*/
int make_piece_buffer(ObIAllocator *allocator, ObPieceBuffer *&piece_buffer, ObPieceMode mode, ObString *buf);
int init_piece_cache(sql::ObSQLSessionInfo &session);
void close_piece(ObPiece *&piece, sql::ObSQLSessionInfo &session);
inline uint64_t get_length_length(uint64_t length)
{
// store_length
uint64_t len = 0;
if (length < (uint64_t)251) {
len = 1;
} else if (length < (uint64_t)0X10000) {
len = 3;
} else if (length < (uint64_t)0X1000000) {
len = 4;
} else if (length < UINT64_MAX) {
len = 9;
} else if (length == UINT64_MAX) {
len = 1;
}
return len;
}
public:
lib::MemoryContext mem_context_;
typedef common::hash::ObHashMap<int64_t, ObPiece *, common::hash::NoPthreadDefendMode> PieceMap;
PieceMap piece_map_;
};
} // end of namespace observer
} // end of namespace oceanbase
#endif // OCEANBASE_OBSERVER_MYSQL_OBMP_STMT_SEND_LONG_DATA_H_
...@@ -51,6 +51,7 @@ ...@@ -51,6 +51,7 @@
#include "observer/mysql/obmp_stmt_prepare.h" #include "observer/mysql/obmp_stmt_prepare.h"
#include "observer/mysql/obmp_stmt_execute.h" #include "observer/mysql/obmp_stmt_execute.h"
#include "observer/mysql/obmp_stmt_close.h" #include "observer/mysql/obmp_stmt_close.h"
#include "observer/mysql/obmp_stmt_send_long_data.h"
using namespace oceanbase::observer; using namespace oceanbase::observer;
using namespace oceanbase::lib; using namespace oceanbase::lib;
...@@ -183,6 +184,7 @@ int ObSrvMySQLXlator::translate(rpc::ObRequest& req, ObReqProcessor*& processor) ...@@ -183,6 +184,7 @@ int ObSrvMySQLXlator::translate(rpc::ObRequest& req, ObReqProcessor*& processor)
MYSQL_PROCESSOR(ObMPStmtPrepare, gctx_); MYSQL_PROCESSOR(ObMPStmtPrepare, gctx_);
MYSQL_PROCESSOR(ObMPStmtExecute, gctx_); MYSQL_PROCESSOR(ObMPStmtExecute, gctx_);
MYSQL_PROCESSOR(ObMPStmtClose, gctx_); MYSQL_PROCESSOR(ObMPStmtClose, gctx_);
MYSQL_PROCESSOR(ObMPStmtSendLongData, gctx_);
case obmysql::OB_MYSQL_COM_FIELD_LIST: { case obmysql::OB_MYSQL_COM_FIELD_LIST: {
ObSMConnection* conn = reinterpret_cast<ObSMConnection*>(req.get_ez_req()->ms->c->user_data); ObSMConnection* conn = reinterpret_cast<ObSMConnection*>(req.get_ez_req()->ms->c->user_data);
if (OB_ISNULL(conn)) { if (OB_ISNULL(conn)) {
......
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
#include "sql/resolver/ddl/ob_drop_synonym_stmt.h" #include "sql/resolver/ddl/ob_drop_synonym_stmt.h"
#include "sql/engine/expr/ob_datum_cast.h" #include "sql/engine/expr/ob_datum_cast.h"
#include "lib/checksum/ob_crc64.h" #include "lib/checksum/ob_crc64.h"
#include "observer/mysql/obmp_stmt_send_long_data.h"
using namespace oceanbase::sql; using namespace oceanbase::sql;
using namespace oceanbase::common; using namespace oceanbase::common;
...@@ -144,7 +145,8 @@ ObSQLSessionInfo::ObSQLSessionInfo() ...@@ -144,7 +145,8 @@ ObSQLSessionInfo::ObSQLSessionInfo()
proxy_version_(0), proxy_version_(0),
min_proxy_version_ps_(0), min_proxy_version_ps_(0),
is_ignore_stmt_(false), is_ignore_stmt_(false),
got_conn_res_(false) got_conn_res_(false),
piece_cache_(NULL)
{} {}
ObSQLSessionInfo::~ObSQLSessionInfo() ObSQLSessionInfo::~ObSQLSessionInfo()
...@@ -333,6 +335,15 @@ void ObSQLSessionInfo::destroy(bool skip_sys_var) ...@@ -333,6 +335,15 @@ void ObSQLSessionInfo::destroy(bool skip_sys_var)
} }
} }
if (OB_SUCC(ret) && NULL != piece_cache_) {
if (OB_FAIL((static_cast<observer::ObPieceCache*>(piece_cache_))
->close_all(*this))) {
LOG_WARN("failed to close all piece", K(ret));
}
get_session_allocator().free(piece_cache_);
piece_cache_ = NULL;
}
reset(skip_sys_var); reset(skip_sys_var);
is_inited_ = false; is_inited_ = false;
} }
...@@ -1007,6 +1018,23 @@ int ObSQLSessionInfo::kill_query() ...@@ -1007,6 +1018,23 @@ int ObSQLSessionInfo::kill_query()
return OB_SUCCESS; return OB_SUCCESS;
} }
void* ObSQLSessionInfo::get_piece_cache(bool need_init) {
if (NULL == piece_cache_ && need_init) {
void *buf = get_session_allocator().alloc(sizeof(observer::ObPieceCache));
if (NULL != buf) {
MEMSET(buf, 0, sizeof(observer::ObPieceCache));
piece_cache_ = new (buf) observer::ObPieceCache();
if (OB_SUCCESS != (static_cast<observer::ObPieceCache*>(piece_cache_))->init(
get_effective_tenant_id())) {
get_session_allocator().free(piece_cache_);
piece_cache_ = NULL;
LOG_WARN("init piece cache fail");
}
}
}
return piece_cache_;
}
ObAuditRecordData& ObSQLSessionInfo::get_audit_record() ObAuditRecordData& ObSQLSessionInfo::get_audit_record()
{ {
audit_record_.try_cnt_++; audit_record_.try_cnt_++;
......
...@@ -656,6 +656,8 @@ public: ...@@ -656,6 +656,8 @@ public:
int on_user_connect(share::schema::ObSessionPrivInfo& priv_info, const share::schema::ObUserInfo* user_info); int on_user_connect(share::schema::ObSessionPrivInfo& priv_info, const share::schema::ObUserInfo* user_info);
int on_user_disconnect(); int on_user_disconnect();
void *get_piece_cache(bool need_init = false);
private: private:
int close_all_ps_stmt(); int close_all_ps_stmt();
...@@ -740,6 +742,7 @@ private: ...@@ -740,6 +742,7 @@ private:
// No matter whether apply for resource successfully, a session will call on_user_disconnect when disconnect. // No matter whether apply for resource successfully, a session will call on_user_disconnect when disconnect.
// While only session got connection resource can release connection resource and decrease connections count. // While only session got connection resource can release connection resource and decrease connections count.
bool got_conn_res_; bool got_conn_res_;
void *piece_cache_;
}; };
inline ObIExtraStatusCheck::Guard::Guard(ObSQLSessionInfo& session, ObIExtraStatusCheck& checker) inline ObIExtraStatusCheck::Guard::Guard(ObSQLSessionInfo& session, ObIExtraStatusCheck& checker)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册