提交 b033cdfe 编写于 作者: W wgs13579 提交者: guangshu.wgs

support pieceInfo

上级 dee2edf6
......@@ -62,7 +62,8 @@ common_sources = \
ob_prepare_statement_struct.cpp \
ob_prepare_statement_struct.h \
ob_cursor_struct.cpp \
ob_cursor_struct.h
ob_cursor_struct.h \
ob_piece_info.h
libmysqlproxy_a_SOURCES = ${common_sources}
......
......@@ -32,4 +32,5 @@ obproxy/proxy/mysql/ob_mysql_vctable.h\
obproxy/proxy/mysql/ob_prepare_statement_struct.cpp\
obproxy/proxy/mysql/ob_prepare_statement_struct.h\
obproxy/proxy/mysql/ob_cursor_struct.cpp\
obproxy/proxy/mysql/ob_cursor_struct.h
obproxy/proxy/mysql/ob_cursor_struct.h\
obproxy/proxy/mysql/ob_piece_info.h
......@@ -1862,11 +1862,12 @@ void ObMysqlSM::analyze_mysql_request(ObMysqlAnalyzeStatus &status)
} else if (OB_FAIL(analyze_ps_prepare_request())) {
LOG_WARN("fail to analyze ps prepare request", K(ret));
}
} else if (OB_MYSQL_COM_STMT_EXECUTE == req_cmd) {
} else if (OB_MYSQL_COM_STMT_EXECUTE == req_cmd
|| OB_MYSQL_COM_STMT_SEND_PIECE_DATA == req_cmd) {
if (OB_FAIL(analyze_ps_execute_request())) {
LOG_WARN("fail to analyze ps execute request", K(ret));
LOG_WARN("fail to analyze ps execute request", K(ret), K(req_cmd));
}
} else if (OB_MYSQL_COM_STMT_FETCH == req_cmd) {
} else if (OB_MYSQL_COM_STMT_FETCH == req_cmd || OB_MYSQL_COM_STMT_GET_PIECE_DATA == req_cmd) {
if (OB_FAIL(analyze_fetch_request())) {
LOG_WARN("fail to analyze fetch request", K(ret));
}
......@@ -1883,7 +1884,8 @@ void ObMysqlSM::analyze_mysql_request(ObMysqlAnalyzeStatus &status)
}
} else if (ANALYZE_CONT == status) {
// large request means we have received enough packet(> request_buffer_len_)
if (OB_MYSQL_COM_STMT_EXECUTE == req_cmd && client_request.is_large_request()) {
if ((OB_MYSQL_COM_STMT_EXECUTE == req_cmd || OB_MYSQL_COM_STMT_SEND_PIECE_DATA == req_cmd)
&& client_request.is_large_request()) {
if (OB_FAIL(analyze_ps_execute_request())) {
LOG_WARN("fail to analyze ps execute request", K(ret));
}
......@@ -2002,8 +2004,8 @@ int ObMysqlSM::analyze_ps_execute_request()
LOG_WARN("com_stmt_execute packet is empty", K(ret));
} else {
const char *pos = data.ptr() + MYSQL_NET_META_LENGTH;
int32_t ps_id = 0;
ObMySQLUtil::get_int4(pos, ps_id);
uint32_t ps_id = 0;
ObMySQLUtil::get_uint4(pos, ps_id);
ObPsEntry *entry = NULL;
if (OB_ISNULL(entry = session_info.get_ps_entry(ps_id)) || !entry->is_valid()) {
ret = OB_ERR_UNEXPECTED;
......@@ -2165,8 +2167,8 @@ int ObMysqlSM::analyze_ps_prepare_execute_request()
LOG_WARN("com_stmt_execute packet is empty", K(ret));
} else {
const char *pos = data.ptr() + MYSQL_NET_META_LENGTH;
int32_t ps_id = 0;
ObMySQLUtil::get_int4(pos, ps_id);
uint32_t ps_id = 0;
ObMySQLUtil::get_uint4(pos, ps_id);
if (0 == ps_id) {
session_info.set_client_ps_id(client_session_->inc_and_get_ps_id());
} else {
......@@ -3028,6 +3030,7 @@ int ObMysqlSM::state_server_request_send(int event, void *data)
ss_info.remove_ps_id_pair(client_ps_id);
ss_info.remove_cursor_id_pair(client_ps_id);
cs_info.remove_cursor_id_addr(client_ps_id);
cs_info.remove_piece_info(client_ps_id);
if (NULL != ps_id_addrs) {
ps_id_addrs->remove_addr(server_session_->get_netvc()->get_remote_addr());
}
......@@ -5034,7 +5037,8 @@ inline int ObMysqlSM::do_internal_observer_open()
trans_state_.current_.send_action_ = ObMysqlTransact::SERVER_SEND_LAST_INSERT_ID;
} else if (trans_state_.is_hold_start_trans_) {
trans_state_.current_.send_action_ = ObMysqlTransact::SERVER_SEND_START_TRANS;
} else if ((OB_MYSQL_COM_STMT_EXECUTE == trans_state_.trans_info_.client_request_.get_packet_meta().cmd_)
} else if (((OB_MYSQL_COM_STMT_EXECUTE == trans_state_.trans_info_.client_request_.get_packet_meta().cmd_)
|| (OB_MYSQL_COM_STMT_SEND_PIECE_DATA == trans_state_.trans_info_.client_request_.get_packet_meta().cmd_))
&& client_info.need_do_prepare(server_info)) {
trans_state_.current_.send_action_ = ObMysqlTransact::SERVER_SEND_PREPARE;
} else if (client_info.is_text_ps_execute() && client_info.need_do_text_ps_prepare(server_info)) {
......@@ -6065,17 +6069,9 @@ int ObMysqlSM::setup_client_transfer(ObMysqlVCType to_vc_type)
K_(sm_id), K(ret));
} else if (OB_FAIL(trans_state_.alloc_internal_buffer(MYSQL_BUFFER_SIZE))) {
LOG_ERROR("fail to allocate internal buffer,", K_(sm_id), K(ret));
} else if (OB_FAIL(ObMysqlTransact::rewrite_stmt_id(trans_state_, client_buffer_reader_ ))) {
LOG_WARN("rewrite stmt id failed", K(ret));
} else {
// rewrite stmt id for ps execute
if (obmysql::OB_MYSQL_COM_STMT_EXECUTE == trans_state_.trans_info_.sql_cmd_) {
ObServerSessionInfo &ss_info = server_session_->get_session_info();
ObClientSessionInfo &cs_info = client_session_->get_session_info();
uint32_t client_ps_id = cs_info.get_client_ps_id();
// get server_ps_id by client_ps_id
uint32_t server_ps_id = ss_info.get_server_ps_id(client_ps_id);
client_buffer_reader_->replace(reinterpret_cast<const char*>(&server_ps_id), sizeof(server_ps_id), MYSQL_NET_META_LENGTH);
}
// Next order of business if copy the remaining data from the
// request buffer into new buffer
if (OB_FAIL(trans_state_.internal_buffer_->remove_append(client_buffer_reader_, written_bytes))) {
......
......@@ -839,6 +839,7 @@ public:
static int build_server_request(ObTransState &s, event::ObIOBufferReader *&reader,
int64_t &request_len);
static int rewrite_stmt_id(ObTransState &s, event::ObIOBufferReader *client_buffer_reader);
static int build_oceanbase_user_request(ObTransState &s, event::ObIOBufferReader *client_buffer_reader,
event::ObIOBufferReader *&reader, int64_t &request_len);
static int build_user_request(ObTransState &s, event::ObIOBufferReader *client_buffer_reader,
......
// Copyright (c) 2021-2026 Alibaba Inc. All Rights Reserved.
// Author:
// zhixin.lm@antgroup.com
//
#ifndef OBPROXY_PIECE_INFO_H
#define OBPROXY_PIECE_INFO_H
#include "lib/hash/ob_build_in_hashmap.h"
#include "iocore/net/ob_inet.h"
namespace oceanbase
{
namespace obproxy
{
namespace proxy
{
class ObPieceInfo
{
public:
ObPieceInfo() : ps_id_(0), addr_() {}
~ObPieceInfo() {}
bool is_valid() const { return 0 != ps_id_ && addr_.is_valid(); }
uint32_t get_ps_id() const { return ps_id_; }
net::ObIpEndpoint &get_addr() { return addr_; }
void set_ps_id(const uint32_t ps_id) { ps_id_ = ps_id; }
void set_addr(const struct sockaddr &addr) { addr_.assign(addr); }
int64_t to_string(char *buf, const int64_t buf_len) const
{
int64_t pos = 0;
J_OBJ_START();
J_KV(K_(ps_id),
K_(addr));
J_OBJ_END();
return pos;
}
public:
LINK(ObPieceInfo, piece_info_link_);
private:
uint32_t ps_id_;
net::ObIpEndpoint addr_;
};
struct ObPieceInfoHashing
{
typedef uint32_t Key;
typedef ObPieceInfo Value;
typedef ObDLList(ObPieceInfo, piece_info_link_) ListHead;
static uint64_t hash(Key key) { return key; }
static Key key(Value *value) { return value->get_ps_id(); }
static bool equal(Key lhs, Key rhs) { return lhs == rhs; }
};
typedef common::hash::ObBuildInHashMap<ObPieceInfoHashing, 64> ObPieceInfoMap;
} // end proxy
} // end obproxy
} // end oceanbase
#endif
......@@ -165,6 +165,9 @@ bool is_supported_mysql_cmd(const obmysql::ObMySQLCmd mysql_cmd)
case obmysql::OB_MYSQL_COM_STMT_RESET:
// Stored Procedures
case obmysql::OB_MYSQL_COM_STMT_FETCH:
// pieceinfo
case obmysql::COM_STMT_SEND_PIECE_DATA:
case obmysql::COM_STMT_GET_PIECE_DATA:
ret = true;
break;
case obmysql::OB_MYSQL_COM_CHANGE_USER:
......
......@@ -434,7 +434,8 @@ int ObMysqlCompressAnalyzer::do_analyze_last_compress_packet(ObMysqlResp &resp)
body_start = last_packet_buffer_ + MYSQL_NET_HEADER_LENGTH;
body_len = meta1.pkt_len_ - MYSQL_NET_HEADER_LENGTH;
// only one ok packet, we need rewrite later
if (OB_MYSQL_COM_FIELD_LIST == mysql_cmd_|| OB_MYSQL_COM_STMT_PREPARE == mysql_cmd_) {
if (OB_MYSQL_COM_FIELD_LIST == mysql_cmd_|| OB_MYSQL_COM_STMT_PREPARE == mysql_cmd_
|| OB_MYSQL_COM_STMT_GET_PIECE_DATA == mysql_cmd_) {
resp.get_analyze_result().ok_packet_action_type_ = OK_PACKET_ACTION_CONSUME;
} else {
resp.get_analyze_result().ok_packet_action_type_ = OK_PACKET_ACTION_REWRITE;
......
......@@ -492,7 +492,7 @@ int ObMysqlCompressOB20Analyzer::analyze_first_response(
resp.get_analyze_result().is_resultset_resp_ = ((OB_MYSQL_COM_QUERY == result_.get_cmd()
|| OB_MYSQL_COM_STMT_EXECUTE == result_.get_cmd()
|| OB_MYSQL_COM_STMT_FETCH == result_.get_cmd())
&& (OB_MYSQL_COM_STATISTICS != result_.get_cmd())
&& (COM_STATISTICS != result_.get_cmd())
&& (MYSQL_OK_PACKET_TYPE != mysql_result.meta_.pkt_type_)
&& (MYSQL_ERR_PACKET_TYPE != mysql_result.meta_.pkt_type_)
&& (MYSQL_EOF_PACKET_TYPE != mysql_result.meta_.pkt_type_)
......
......@@ -553,7 +553,9 @@ inline int ObMysqlRequestAnalyzer::do_analyze_request(
}
case OB_MYSQL_COM_STMT_FETCH:
case OB_MYSQL_COM_STMT_CLOSE:
case OB_MYSQL_COM_STMT_EXECUTE: {
case OB_MYSQL_COM_STMT_EXECUTE:
case OB_MYSQL_COM_STMT_SEND_PIECE_DATA:
case OB_MYSQL_COM_STMT_GET_PIECE_DATA: {
// add packet's buffer to mysql common request, for parsing later
if (OB_FAIL(client_request.add_request(ctx.reader_, ctx.request_buffer_length_))) {
LOG_WARN("fail to add com request", K(ret));
......@@ -607,7 +609,9 @@ inline int ObMysqlRequestAnalyzer::do_analyze_request(
}
}
case OB_MYSQL_COM_HANDSHAKE:
case OB_MYSQL_COM_QUIT:
case OB_MYSQL_COM_QUIT: {
break;
}
default: {
break;
}
......
......@@ -273,7 +273,9 @@ int ObRespResult::is_resp_finished(bool &finished, ObMysqlRespEndingType &ending
}
case OB_MYSQL_COM_STMT_FETCH:
case OB_MYSQL_COM_STMT_EXECUTE:
case OB_MYSQL_COM_QUERY : {
case OB_MYSQL_COM_QUERY:
case OB_MYSQL_COM_STMT_SEND_PIECE_DATA:
case OB_MYSQL_COM_STMT_GET_PIECE_DATA: {
if (RESULT_SET_RESP_TYPE == resp_type_ || OTHERS_RESP_TYPE == resp_type_) {
if (OB_UNLIKELY(is_mysql_mode())) {
if (2 == pkt_cnt_[EOF_PACKET_ENDING_TYPE]) {
......@@ -506,7 +508,9 @@ inline int ObMysqlRespAnalyzer::read_pkt_type(ObBufferReader &buf_reader, ObResp
|| OB_MYSQL_COM_STMT_PREPARE == result.get_cmd()
|| OB_MYSQL_COM_STMT_FETCH == result.get_cmd()
|| OB_MYSQL_COM_STMT_EXECUTE == result.get_cmd()
|| OB_MYSQL_COM_STMT_PREPARE_EXECUTE == result.get_cmd())
|| OB_MYSQL_COM_STMT_PREPARE_EXECUTE == result.get_cmd()
|| OB_MYSQL_COM_STMT_GET_PIECE_DATA == result.get_cmd()
|| OB_MYSQL_COM_STMT_SEND_PIECE_DATA == result.get_cmd())
&& OB_LIKELY(MAX_RESP_TYPE == result.get_resp_type())) {
ObMysqlRespEndingType type = meta_analyzer_.get_cur_type();
if (ERROR_PACKET_ENDING_TYPE == type || OK_PACKET_ENDING_TYPE == type) {
......@@ -642,6 +646,8 @@ inline int ObMysqlRespAnalyzer::analyze_resp_pkt(
} else {
ok_packet_action_type = OK_PACKET_ACTION_REWRITE;
}
} else if (COM_STMT_GET_PIECE_DATA == result.get_cmd()) {
ok_packet_action_type = OK_PACKET_ACTION_CONSUME;
} else if (1 == prepare_ok_pkt_cnt) {
//stmt_prepare extra ok atfter
ok_packet_action_type = OK_PACKET_ACTION_CONSUME;
......
......@@ -195,6 +195,12 @@ const char *ObProxyParserUtils::get_sql_cmd_name(const ObMySQLCmd cmd)
case OB_MYSQL_COM_MAX_NUM:
name = "OB_MYSQL_COM_MAX_NUM";
break;
case OB_MYSQL_COM_STMT_SEND_PIECE_DATA:
name ="OB_MYSQL_COM_STMT_SEND_PIECE_DATA";
break;
case OB_MYSQL_COM_STMT_GET_PIECE_DATA:
name = "OB_MYSQL_COM_STMT_GET_PIECE_DATA";
break;
default:
name = "unknown sql command";
......
......@@ -1124,6 +1124,7 @@ void ObClientSessionInfo::destroy()
destroy_ps_id_entry_map();
destroy_cursor_id_addr_map();
destroy_ps_id_addrs_map();
destroy_piece_info_map();
is_trans_specified_ = false;
is_global_vars_changed_ = false;
is_user_idc_name_set_ = false;
......@@ -1180,6 +1181,18 @@ void ObClientSessionInfo::destroy_cursor_id_addr_map()
cursor_id_addr_map_.reset();
}
void ObClientSessionInfo::destroy_piece_info_map()
{
ObPieceInfoMap::iterator last = piece_info_map_.end();
ObPieceInfoMap::iterator tmp_iter;
for (ObPieceInfoMap::iterator iter = piece_info_map_.begin(); iter != last;) {
tmp_iter = iter;
++iter;
op_free(&(*tmp_iter));
}
piece_info_map_.reset();
}
void ObClientSessionInfo::destroy_ps_id_addrs_map()
{
ObPsIdAddrsMap::iterator last = ps_id_addrs_map_.end();
......
......@@ -26,6 +26,7 @@
#include "proxy/route/ob_ldc_struct.h"
#include "proxy/mysql/ob_prepare_statement_struct.h"
#include "proxy/mysql/ob_cursor_struct.h"
#include "proxy/mysql/ob_piece_info.h"
#include "rpc/obmysql/packet/ompk_handshake.h"
#include "rpc/obmysql/packet/ompk_ssl_request.h"
#include "utils/ob_proxy_hot_upgrader.h"
......@@ -762,6 +763,30 @@ public:
}
void destroy_cursor_id_addr_map();
int add_piece_info(ObPieceInfo *info)
{
return piece_info_map_.unique_set(info);
}
int get_piece_info(ObPieceInfo* &info)
{
return piece_info_map_.get_refactored(ps_id_, info);
}
int get_piece_info(const uint32_t ps_id, ObPieceInfo* &info)
{
return piece_info_map_.get_refactored(ps_id, info);
}
void remove_piece_info(const uint32_t ps_id)
{
ObPieceInfo *info = piece_info_map_.remove(ps_id);
if (NULL != info) {
op_free(info);
info = NULL;
}
}
void destroy_piece_info_map();
int add_ps_id_addrs(ObPsIdAddrs *ps_id_addrs) {
return ps_id_addrs_map_.unique_set(ps_id_addrs);
}
......@@ -891,6 +916,7 @@ private:
ObCursorIdAddrMap cursor_id_addr_map_;
ObPsIdAddrsMap ps_id_addrs_map_;
ObPieceInfoMap piece_info_map_;
bool is_read_only_user_;
bool is_request_follower_user_;
......
......@@ -195,7 +195,9 @@ char const *get_mysql_cmd_str(ObMySQLCmd mysql_cmd)
"Binlog dump gtid", // OB_MYSQL_COM_BINLOG_DUMP_GTID,
// "Reset connection", // COM_RESET_CONNECTION,
"Prepare Execute" // OB_MYSQL_COM_STMT_PREPARE_EXECUTE,
"Prepare Execute", // OB_MYSQL_COM_STMT_PREPARE_EXECUTE,
"SEND PIECE DATA",
"GET PIECE DATA",
"End", // OB_MYSQL_COM_END,
"Delete session", // OB_MYSQL_COM_DELETE_SESSION
......
......@@ -66,10 +66,11 @@ enum ObMySQLCmd
// COM_RESET_CONNECTION,
OB_MYSQL_COM_STMT_PREPARE_EXECUTE = OBPROXY_NEW_MYSQL_CMD_START,
OB_MYSQL_COM_STMT_SEND_PIECE_DATA,
OB_MYSQL_COM_STMT_GET_PIECE_DATA,
OB_MYSQL_COM_END,
// for obproxy
// OB_MYSQL_COM_DELETE_SESSION is not a standard mysql package type. This is a package used to process delete session
// When the connection is disconnected, the session needs to be deleted, but at this time it may not be obtained in the callback function disconnect
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册