diff --git a/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_processor.cpp b/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_processor.cpp index a0d56e8c77ddfd2c77a482bcde7a42b3d7dff821..c9438537295f1e824df41f352d98cfa5e53553b2 100644 --- a/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_processor.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_processor.cpp @@ -397,7 +397,19 @@ inline int Ob20ProtocolProcessor::process_ob20_packet(ObProto20PktContext& conte mysql_data_size, ipacket, need_decode_more))) { LOG_ERROR("fail to process fragment mysql packet", KP(mysql_data_start), K(mysql_data_size), K(need_decode_more), K(ret)); - } + } else if (!context.extra_info_.exist_extra_info() + && pkt20->get_extra_info().exist_extra_info()) { + char* tmp_buffer = NULL; + int64_t total_len = pkt20->get_extra_info().get_total_len(); + if (OB_ISNULL(tmp_buffer = reinterpret_cast(context.arena_.alloc(total_len)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_ERROR("no memory available", "alloc_size", total_len, K(ret)); + } else if (OB_FAIL(context.extra_info_.assign(pkt20->get_extra_info(), tmp_buffer, total_len))) { + LOG_ERROR("failed to deep copy extra info", K(ret)); + } + } else { + // do nothing + } if (OB_FAIL(ret)) { // do nothing @@ -415,7 +427,16 @@ inline int Ob20ProtocolProcessor::process_ob20_packet(ObProto20PktContext& conte ObMySQLRawPacket *input_packet = reinterpret_cast(ipacket); input_packet->set_can_reroute_pkt(pkt20->get_flags().is_proxy_reroute()); input_packet->set_is_weak_read(pkt20->get_flags().is_weak_read()); - input_packet->set_extra_info(pkt20->get_extra_info()); + + const int64_t t_len = context.extra_info_.get_total_len(); + char *t_buffer = NULL; + if (OB_ISNULL(t_buffer = reinterpret_cast(pool.alloc(t_len)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_ERROR("no memory available", "alloc_size", t_len, K(ret)); + } else if (OB_FAIL(input_packet->extra_info_.assign(context.extra_info_, t_buffer, t_len))) { + LOG_ERROR("failed to assign extra info", K(ret)); + } + input_packet->set_txn_free_route(pkt20->get_flags().txn_free_route()); context.reset(); // set again for sending response diff --git a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.cpp b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.cpp index 32b4ffb81db624f60028d207c57faa5ba48ead63..cb81506487b970174443b2988b1887b0662bcba7 100644 --- a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.cpp @@ -177,6 +177,30 @@ int ObMySQLRawPacket::serialize(char *buf, const int64_t length, int64_t &pos) c return ret; } +int Ob20ExtraInfo::assign(const Ob20ExtraInfo &other, char* buf, int64_t buf_len) +{ + int ret = OB_SUCCESS; + uint64_t total_len = other.get_total_len(); + if (total_len > buf_len) { + ret = OB_ERR_UNEXPECTED; + SERVER_LOG(ERROR, "invalid alloc size", K(total_len), K(ret)); + } else { + uint64_t len = 0; + MEMCPY(buf+len, other.trace_info_.ptr(), other.trace_info_.length()); + trace_info_.assign_ptr(buf+len, other.trace_info_.length()); + len += other.trace_info_.length(); + + MEMCPY(buf+len, other.sync_sess_info_.ptr(), other.sync_sess_info_.length()); + sync_sess_info_.assign_ptr(buf+len, other.sync_sess_info_.length()); + len += other.sync_sess_info_.length(); + + MEMCPY(buf+len, other.full_link_trace_.ptr(), other.full_link_trace_.length()); + full_link_trace_.assign_ptr(buf+len, other.full_link_trace_.length()); + len += other.full_link_trace_.length(); + } + return ret; +} + char const *get_info_func_name(const ObInformationFunctions func) { const char *str = NULL; diff --git a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h index 3b8a94b8ebb68eb7cb33656d1b16d176eba05b47..b854f03a95b521e58d632f4bf9965e3d84ab7ec4 100644 --- a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h +++ b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h @@ -360,6 +360,11 @@ public: bool exist_full_link_trace() const { return !full_link_trace_.empty(); } const ObString& get_sync_sess_info() const { return sync_sess_info_; } const ObString& get_full_link_trace() const { return full_link_trace_; } + bool exist_extra_info() {return !sync_sess_info_.empty() || !full_link_trace_.empty() || exist_trace_info_;} + bool exist_extra_info() const {return !sync_sess_info_.empty() || !full_link_trace_.empty() || exist_trace_info_;} + int assign(const Ob20ExtraInfo &other, char* buf, int64_t len); + int64_t get_total_len() {return trace_info_.length() + sync_sess_info_.length() + full_link_trace_.length();} + int64_t get_total_len() const {return trace_info_.length() + sync_sess_info_.length() + full_link_trace_.length();} TO_STRING_KV(K_(extra_len), K_(exist_trace_info), K_(trace_info), K_(sync_sess_info), K_(full_link_trace)); }; @@ -511,9 +516,9 @@ public: inline void set_txn_free_route(const bool txn_free_route); inline bool txn_free_route() const; - inline void set_extra_info(const Ob20ExtraInfo &extra_info) { extra_info_ = extra_info; } inline const Ob20ExtraInfo &get_extra_info() const { return extra_info_; } bool exist_trace_info() const { return extra_info_.exist_trace_info_; } + bool exist_extra_info() const { return extra_info_.exist_extra_info(); } const common::ObString &get_trace_info() const { return extra_info_.trace_info_; } virtual int64_t get_serialize_size() const; @@ -547,6 +552,7 @@ private: bool can_reroute_pkt_; bool is_weak_read_; bool txn_free_route_; +public: Ob20ExtraInfo extra_info_; }; diff --git a/deps/oblib/src/rpc/obmysql/ob_mysql_request_utils.h b/deps/oblib/src/rpc/obmysql/ob_mysql_request_utils.h index 75664155b7859c144e2bd78630b00355a9e6fb93..237f24636f7c8e82079c60210e992eadbcb4674e 100644 --- a/deps/oblib/src/rpc/obmysql/ob_mysql_request_utils.h +++ b/deps/oblib/src/rpc/obmysql/ob_mysql_request_utils.h @@ -137,7 +137,7 @@ private: class ObProto20PktContext { public: - ObProto20PktContext() { reset(); } + ObProto20PktContext() : arena_(common::ObModIds::LIB_MULTI_PACKETS){ reset(); } ~ObProto20PktContext() { } void reset() { @@ -145,18 +145,25 @@ public: is_multi_pkt_ = false; proto20_last_request_id_ = 0; proto20_last_pkt_seq_ = 0; + extra_info_.reset(); + arena_.reset(); //fast free memory } TO_STRING_KV(K_(comp_last_pkt_seq), K_(is_multi_pkt), K_(proto20_last_request_id), - K_(proto20_last_pkt_seq)); + K_(proto20_last_pkt_seq), + K_(extra_info), + "used", arena_.used(), + "total", arena_.total()); public: uint8_t comp_last_pkt_seq_; bool is_multi_pkt_; uint32_t proto20_last_request_id_; uint8_t proto20_last_pkt_seq_; + Ob20ExtraInfo extra_info_; + common::ObArenaAllocator arena_; private: DISALLOW_COPY_AND_ASSIGN(ObProto20PktContext);