From 5ac1a95bc952dac49c869fe01050236ea6466bd6 Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 7 Nov 2022 03:05:47 +0000 Subject: [PATCH] [Election] optimize election bahavior --- deps/oblib/src/lib/string/ob_string_holder.h | 90 ++++++++++--------- .../palf/election/algorithm/election_impl.cpp | 6 +- .../election/algorithm/election_proposer.cpp | 64 +++++++------ .../election/message/election_message.cpp | 4 +- .../palf/election/message/election_message.h | 20 ++--- .../election/utils/election_common_define.h | 12 +-- src/logservice/palf/log_request_handler.cpp | 6 +- src/logservice/palf/log_rpc_macros.h | 41 ++++++--- src/logservice/palf/log_rpc_processor.h | 48 +++++----- 9 files changed, 167 insertions(+), 124 deletions(-) diff --git a/deps/oblib/src/lib/string/ob_string_holder.h b/deps/oblib/src/lib/string/ob_string_holder.h index f8f987e9f..67dc69c32 100644 --- a/deps/oblib/src/lib/string/ob_string_holder.h +++ b/deps/oblib/src/lib/string/ob_string_holder.h @@ -42,62 +42,65 @@ public: }; } -#define DEFAULT_ALLOCATOR value_sematic_string::DefaultAllocator::get_instance() class ObStringHolder { + static constexpr int64_t TINY_STR_SIZE = 16;// no need count '\0' public: - ObStringHolder() : ObStringHolder(DEFAULT_ALLOCATOR) {}; - ObStringHolder(const ObStringHolder &) = delete; - ObStringHolder(ObStringHolder &&rhs) : ObStringHolder(DEFAULT_ALLOCATOR) - { - std::swap(buffer_, rhs.buffer_); - std::swap(len_, rhs.len_); - } - ObStringHolder &operator=(const ObStringHolder &) = delete; - ObStringHolder &operator=(ObStringHolder &&rhs) - { - std::swap(buffer_, rhs.buffer_); - std::swap(len_, rhs.len_); - return *this; - } - ObStringHolder(ObIAllocator &alloc) : - buffer_(nullptr), len_(0), allocator_(alloc) {} + ObStringHolder() : buffer_(nullptr), len_(0) {} ~ObStringHolder() { reset(); } void reset() { - if (OB_NOT_NULL(buffer_)) { - allocator_.free(buffer_); + if (buffer_ == local_buffer_for_tiny_str_) {// tiny str + buffer_ = nullptr; + len_ = 0; + } else if (OB_ISNULL(buffer_)) {// empty str + len_ = 0; + } else {// big str + value_sematic_string::DefaultAllocator::get_instance().free(buffer_); buffer_ = nullptr; len_ = 0; } } - ObString get_ob_string() const { return ObString(len_, buffer_); } - int assign(const ObStringHolder &rhs) { - int ret = OB_SUCCESS; - if (!rhs.empty()) { - reset(); - if (OB_ISNULL(buffer_ = (char *)allocator_.alloc(rhs.len_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - } else { - len_ = rhs.len_; - memcpy(buffer_, rhs.buffer_, len_); - } + // move sematic + ObStringHolder(ObStringHolder &&rhs) : ObStringHolder() { *this = std::move(rhs); } + ObStringHolder &operator=(ObStringHolder &&rhs) { + reset(); + if (rhs.buffer_ == rhs.local_buffer_for_tiny_str_) {// tiny str + copy_from_tiny_ob_str_(rhs.get_ob_string()); + } else {// big str + std::swap(buffer_, rhs.buffer_); + std::swap(len_, rhs.len_); } - return ret; + return *this; } + // not allow copy construction and copy assignment + ObStringHolder(const ObStringHolder &) = delete; + ObStringHolder &operator=(const ObStringHolder &) = delete; + // copy from assign + int assign(const ObStringHolder &rhs) { return assign(rhs.get_ob_string()); } int assign(const ObString &str) { int ret = OB_SUCCESS; - reset(); if (OB_LIKELY(!str.empty())) { - int64_t len = str.length(); - if (OB_ISNULL(buffer_ = (char *)allocator_.alloc(len))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - } else { - len_ = len; - memcpy(buffer_, str.ptr(), len_); + if (str.length() <= TINY_STR_SIZE) {// tiny str + copy_from_tiny_ob_str_(str); + } else {// big str + int64_t len = str.length(); + char *temp_buffer = nullptr; + if (OB_ISNULL(temp_buffer = (char *)value_sematic_string::DefaultAllocator::get_instance().alloc(len))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + } else { + reset(); + buffer_ = temp_buffer; + len_ = len; + memcpy(buffer_, str.ptr(), len_); + } } + } else { + reset(); } return ret; } + // use ObString method to serialize and print + ObString get_ob_string() const { return ObString(len_, buffer_); } bool empty() const { return OB_ISNULL(buffer_) && len_ == 0; } @@ -123,12 +126,19 @@ public: } return ret; } +private: + void copy_from_tiny_ob_str_(const ObString &tiny_str) { + reset(); + OB_ASSERT(tiny_str.length() <= TINY_STR_SIZE); + memcpy(local_buffer_for_tiny_str_, tiny_str.ptr(), tiny_str.length()); + buffer_ = local_buffer_for_tiny_str_; + len_ = tiny_str.length(); + } private: char *buffer_; int64_t len_; - ObIAllocator &allocator_; + char local_buffer_for_tiny_str_[TINY_STR_SIZE]; }; -#undef DEFAULT_ALLOCATOR } } diff --git a/src/logservice/palf/election/algorithm/election_impl.cpp b/src/logservice/palf/election/algorithm/election_impl.cpp index 6f9d2db22..e77ace9c7 100644 --- a/src/logservice/palf/election/algorithm/election_impl.cpp +++ b/src/logservice/palf/election/algorithm/election_impl.cpp @@ -24,7 +24,7 @@ namespace palf namespace election { -int64_t MAX_TST = 750_ms; +int64_t MAX_TST = 1_s; int64_t INIT_TS = -1; ObOccamTimer GLOBAL_REPORT_TIMER; @@ -242,9 +242,9 @@ int ElectionImpl::handle_message(const ElectionAcceptRequestMsg &msg) acceptor_.on_accept_request(msg, &us_to_expired); } if (OB_LIKELY(us_to_expired > 0)) { - if (us_to_expired - 2 * MAX_TST < 0) { + if (us_to_expired - CALCULATE_TRIGGER_ELECT_WATER_MARK() < 0) { LOG_NONE(WARN, "reschedule devote task in invalid us", K(us_to_expired - 2 * MAX_TST)); - } else if (CLICK_FAIL(proposer_.reschedule_or_register_prepare_task_after_(us_to_expired - 2 * MAX_TST))) { + } else if (CLICK_FAIL(proposer_.reschedule_or_register_prepare_task_after_(us_to_expired - CALCULATE_TRIGGER_ELECT_WATER_MARK()))) { LOG_NONE(ERROR, "register devote task failed"); } else { LOG_NONE(DEBUG, "reschedule devote task after", K(us_to_expired - 2 * MAX_TST)); diff --git a/src/logservice/palf/election/algorithm/election_proposer.cpp b/src/logservice/palf/election/algorithm/election_proposer.cpp index 9cfeb8afe..51db7ad2c 100644 --- a/src/logservice/palf/election/algorithm/election_proposer.cpp +++ b/src/logservice/palf/election/algorithm/election_proposer.cpp @@ -12,6 +12,7 @@ #include "logservice/palf/election/message/election_message.h" +#include "ob_role.h" #include "share/ob_occam_time_guard.h" #include "election_proposer.h" #include "common/ob_clock_generator.h" @@ -275,9 +276,17 @@ int ElectionProposer::reschedule_or_register_prepare_task_after_(const int64_t d } else if (CLICK_FAIL(p_election_->timer_->schedule_task_repeat_spcifiy_first_delay(devote_task_handle_, delay_us, CALCULATE_MAX_ELECT_COST_TIME(), - [this]() { + [this, delay_us]() { + int ret = OB_SUCCESS; LockGuard lock_guard(p_election_->lock_); - this->prepare(ObRole::FOLLOWER); + if (check_leader()) {// Leader不应该靠定时任务主动做Prepare,只能被动触发Prepare + LOG_RENEW_LEASE(INFO, "leader not allow do prepare in timer task before lease expired, this log may printed when message delay too large"); + } else { + if (role_ == ObRole::LEADER) { + role_ = ObRole::FOLLOWER; + } + this->prepare(role_);// 只有Follower可以走到这里 + } return false; }))) { LOG_INIT(ERROR, "first time register devote task failed"); @@ -369,34 +378,35 @@ void ElectionProposer::on_prepare_request(const ElectionPrepareRequestMsg &prepa ELECT_TIME_GUARD(500_ms); #define PRINT_WRAPPER KR(ret), K(prepare_req), K(*this) int ret = OB_SUCCESS; - // 1. 忽略leader prepare消息,不触发一呼百应 - if (static_cast(prepare_req.get_role()) == ObRole::LEADER) {// leader prepare不触发一呼百应 - } else if (static_cast(prepare_req.get_role()) != ObRole::FOLLOWER) { - // 非candidate prepare是非预期的 - LOG_ELECT_LEADER(ERROR, "unexpected code path"); - // 2. 尝试一呼百应 - } else if (memberlist_with_states_.get_member_list().get_addr_list().empty()) { - LOG_ELECT_LEADER(INFO, "memberlist is empty, give up do prepare this time"); - } else { - // 2.1 拒绝旧消息 - if (prepare_req.get_ballot_number() <= ballot_number_) { - // 注意这里是<=,若本轮已经发过一呼百应,则不会再重试,否则将无限循环 - if (prepare_req.get_ballot_number() < ballot_number_) { - ElectionPrepareResponseMsg prepare_res_reject(p_election_->get_self_addr(), - prepare_req); - prepare_res_reject.set_rejected(ballot_number_); - if (CLICK_FAIL(p_election_->send_(prepare_res_reject))) { - LOG_ELECT_LEADER(ERROR, "create prepare request failed"); - } + // 0. 拒绝旧消息、过滤本轮次消息、根据新消息推大轮次 + if (prepare_req.get_ballot_number() <= ballot_number_) { + if (prepare_req.get_ballot_number() < ballot_number_) {// 对于旧消息发送拒绝响应 + ElectionPrepareResponseMsg prepare_res_reject(p_election_->get_self_addr(), + prepare_req); + prepare_res_reject.set_rejected(ballot_number_); + if (CLICK_FAIL(p_election_->send_(prepare_res_reject))) { + LOG_ELECT_LEADER(ERROR, "create prepare request failed"); } else { - LOG_ELECT_LEADER(INFO, "has been send prepare request in this ballot, give up this time"); + LOG_ELECT_LEADER(INFO, "send reject response cause prepare message ballot too small"); } - // 2.2 一呼百应 + } else {// 对于本轮次消息,需要过滤,否则无限循环 + LOG_ELECT_LEADER(INFO, "has been send prepare request in this ballot, give up this time"); + } + } else {// 对于新的消息,推大本机选举轮次 + LOG_ELECT_LEADER(INFO, "receive bigger ballot prepare request"); + (void) advance_ballot_number_and_reset_related_states_(prepare_req.get_ballot_number(), + "receive bigger ballot prepare request"); + // 1. 忽略leader prepare消息,不触发一呼百应 + if (static_cast(prepare_req.get_role()) == ObRole::LEADER) { + LOG_ELECT_LEADER(INFO, "proposer ignore leader prepare"); + } else if (static_cast(prepare_req.get_role()) != ObRole::FOLLOWER) { + // 非candidate prepare是非预期的 + LOG_ELECT_LEADER(ERROR, "unexpected code path"); + // 2. 尝试一呼百应 + } else if (memberlist_with_states_.get_member_list().get_addr_list().empty()) { + LOG_ELECT_LEADER(INFO, "memberlist is empty, give up do prepare this time"); } else { - LOG_ELECT_LEADER(INFO, "receive bigger ballot prepare request"); (void) p_election_->refresh_priority_(); - (void) advance_ballot_number_and_reset_related_states_(prepare_req.get_ballot_number(), - "receive bigger ballot prepare request"); ElectionPrepareRequestMsg prepare_followed_req(p_election_->id_, p_election_->get_self_addr(), restart_counter_, @@ -409,7 +419,7 @@ void ElectionProposer::on_prepare_request(const ElectionPrepareRequestMsg &prepa LOG_ELECT_LEADER(INFO, "self is not in memberlist, give up do prepare"); } else if (CLICK_FAIL(p_election_->broadcast_(prepare_followed_req, memberlist_with_states_.get_member_list() - .get_addr_list()))) { + .get_addr_list()))) { LOG_ELECT_LEADER(ERROR, "broadcast prepare request failed"); } else { last_do_prepare_ts_ = ObClockGenerator::getCurrentTime(); diff --git a/src/logservice/palf/election/message/election_message.cpp b/src/logservice/palf/election/message/election_message.cpp index 9341c9f95..82d3ecac4 100644 --- a/src/logservice/palf/election/message/election_message.cpp +++ b/src/logservice/palf/election/message/election_message.cpp @@ -133,7 +133,7 @@ ElectionMsgDebugTs ElectionMsgBase::get_debug_ts() const { return debug_ts_; } void ElectionMsgBase::set_process_ts() { debug_ts_.dest_process_ts_ = ObClockGenerator::getRealClock(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); } int64_t ElectionMsgBase::get_id() const { return id_; } @@ -198,7 +198,7 @@ ElectionMsgBase(request.get_id(), accepted_(false) { set_receiver(request.get_sender()); request_debug_ts_ = request.get_debug_ts(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); } void ElectionPrepareResponseMsgMiddle::set_accepted(const int64_t ballot_number, const Lease lease) { diff --git a/src/logservice/palf/election/message/election_message.h b/src/logservice/palf/election/message/election_message.h index a60f25694..adf3d820d 100644 --- a/src/logservice/palf/election/message/election_message.h +++ b/src/logservice/palf/election/message/election_message.h @@ -137,13 +137,13 @@ public: int deserialize(const char* buf, const int64_t data_len, int64_t& pos) { int ret = ElectionPrepareRequestMsgMiddle::deserialize(buf, data_len, pos); debug_ts_.dest_deserialize_ts_ = ObClockGenerator::getRealClock(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); return ret; } int64_t get_serialize_size() const { if (debug_ts_.src_serialize_ts_ == 0) {// cause get_serialize_size maybe call more than once const_cast(debug_ts_.src_serialize_ts_) = ObClockGenerator::getRealClock(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); } return ElectionPrepareRequestMsgMiddle::get_serialize_size(); } @@ -183,13 +183,13 @@ public: int deserialize(const char* buf, const int64_t data_len, int64_t& pos) { int ret = ElectionPrepareResponseMsgMiddle::deserialize(buf, data_len, pos); debug_ts_.dest_deserialize_ts_ = ObClockGenerator::getRealClock(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); return ret; } int64_t get_serialize_size() const { if (debug_ts_.src_serialize_ts_ == 0) {// cause get_serialize_size maybe call more than once const_cast(debug_ts_.src_serialize_ts_) = ObClockGenerator::getRealClock(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); } return ElectionPrepareResponseMsgMiddle::get_serialize_size(); } @@ -238,13 +238,13 @@ public: int deserialize(const char* buf, const int64_t data_len, int64_t& pos) { int ret = ElectionAcceptRequestMsgMiddle::deserialize(buf, data_len, pos); debug_ts_.dest_deserialize_ts_ = ObClockGenerator::getRealClock(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); return ret; } int64_t get_serialize_size() const { if (debug_ts_.src_serialize_ts_ == 0) {// cause get_serialize_size maybe call more than once const_cast(debug_ts_.src_serialize_ts_) = ObClockGenerator::getRealClock(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); } return ElectionAcceptRequestMsgMiddle::get_serialize_size(); } @@ -302,13 +302,13 @@ public: int deserialize(const char* buf, const int64_t data_len, int64_t& pos) { int ret = ElectionAcceptResponseMsgMiddle::deserialize(buf, data_len, pos); debug_ts_.dest_deserialize_ts_ = ObClockGenerator::getRealClock(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); return ret; } int64_t get_serialize_size() const { if (debug_ts_.src_serialize_ts_ == 0) {// cause get_serialize_size maybe call more than once const_cast(debug_ts_.src_serialize_ts_) = ObClockGenerator::getRealClock(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); } return ElectionAcceptResponseMsgMiddle::get_serialize_size(); } @@ -353,13 +353,13 @@ public: int deserialize(const char* buf, const int64_t data_len, int64_t& pos) { int ret = ElectionChangeLeaderMsgMiddle::deserialize(buf, data_len, pos); debug_ts_.dest_deserialize_ts_ = ObClockGenerator::getRealClock(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); return ret; } int64_t get_serialize_size() const { if (debug_ts_.src_serialize_ts_ == 0) {// cause get_serialize_size maybe call more than once const_cast(debug_ts_.src_serialize_ts_) = ObClockGenerator::getRealClock(); - print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD); + print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD); } return ElectionChangeLeaderMsgMiddle::get_serialize_size(); } diff --git a/src/logservice/palf/election/utils/election_common_define.h b/src/logservice/palf/election/utils/election_common_define.h index c6ebc7717..2e0bc2f85 100644 --- a/src/logservice/palf/election/utils/election_common_define.h +++ b/src/logservice/palf/election/utils/election_common_define.h @@ -17,6 +17,7 @@ // or may cause MACRO pollution #include "lib/oblog/ob_log_module.h" #include "share/ob_occam_time_guard.h" +#include #define LOG_PHASE(level, phase, info, args...) \ do {\ @@ -83,15 +84,16 @@ enum class LogPhase SET_MEMBER = 7, }; -constexpr int64_t MSG_DELAY_WARN_THRESTHOLD = 200_ms; +constexpr int64_t MSG_DELAY_WARN_THRESHOLD = 200_ms; constexpr int64_t MAX_LEASE_TIME = 10_s; constexpr int64_t PRIORITY_BUFFER_SIZE = 512; constexpr int64_t INVALID_VALUE = -1;// 所有int64_t变量的初始默认无效值 extern int64_t MAX_TST; // 最大单程消息延迟,暂设为750ms,在单测中会将其调低,日后可改为配置项,现阶段先用全局变量代替 -inline int64_t CALCULATE_RENEW_LEASE_INTERVAL() { return 0.5 * MAX_TST; }// 续约的周期,目前是325ms,在暂时没有切主流程优化的情况下,设置的间隔短一些,为了及时切主 -inline int64_t CALCULATE_TIME_WINDOW_SPAN_TS() { return 2 * MAX_TST; }// 时间窗口的长度,为两个最大单程消息延迟 -inline int64_t CALCULATE_MAX_ELECT_COST_TIME() { return 10 * MAX_TST; }// 一次选举可能出现的最大耗时设置,设置为10s -inline int64_t CALCULATE_LEASE_INTERVAL() { return 4 * MAX_TST; }// 4个消息延迟是3s +inline int64_t CALCULATE_RENEW_LEASE_INTERVAL() { return std::min(0.5 * MAX_TST, 250_ms); }// 续约周期固定为消息延迟的一半,最大不超过250ms +inline int64_t CALCULATE_TIME_WINDOW_SPAN_TS() { return 2 * MAX_TST; }// 时间窗口的长度,为两个最大单程消息延迟, 默认为2s +inline int64_t CALCULATE_MAX_ELECT_COST_TIME() { return 10 * MAX_TST; }// 一次选举可能出现的最大耗时设置,默认为10s +inline int64_t CALCULATE_LEASE_INTERVAL() { return 4 * MAX_TST; }// 4个消息延迟,默认是4s +inline int64_t CALCULATE_TRIGGER_ELECT_WATER_MARK() { return std::min(MAX_TST, 1_s); }// 触发无主选举的Lease剩余水位线,1个最大消息延迟,最大不超过1s }// namespace election }// namespace palf diff --git a/src/logservice/palf/log_request_handler.cpp b/src/logservice/palf/log_request_handler.cpp index df748baa7..a1effb635 100644 --- a/src/logservice/palf/log_request_handler.cpp +++ b/src/logservice/palf/log_request_handler.cpp @@ -12,6 +12,7 @@ #include "log_request_handler.h" #include "log_req.h" +#include "share/ob_occam_time_guard.h" namespace oceanbase { @@ -383,15 +384,16 @@ int LogRequestHandler::handle_request(\ const ObAddr &server,\ const MsgType &req)\ {\ + TIMEGUARD_INIT(ELECT, 50_ms, 10_s);\ int ret = common::OB_SUCCESS;\ if (false == is_valid_palf_id(palf_id) || false == req.is_valid()) {\ ret = OB_INVALID_ARGUMENT;\ PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(req));\ } else {\ PalfHandleImplGuard guard;\ - if (OB_FAIL(palf_env_impl_->get_palf_handle_impl(palf_id, guard))) {\ + if (CLICK_FAIL(palf_env_impl_->get_palf_handle_impl(palf_id, guard))) {\ PALF_LOG(WARN, "ObLogMgr get_log_service failed", K(ret), K(palf_id), KP(palf_env_impl_));\ - } else if (OB_FAIL(guard.get_palf_handle_impl()->handle_election_message(req))) {\ + } else if (CLICK_FAIL(guard.get_palf_handle_impl()->handle_election_message(req))) {\ PALF_LOG(WARN, "handle message failed", K(ret), K(palf_id), K(server), K(req));\ } else {\ PALF_LOG(DEBUG, "handle message success", K(ret), K(palf_id), K(server), K(req));\ diff --git a/src/logservice/palf/log_rpc_macros.h b/src/logservice/palf/log_rpc_macros.h index 76d728be1..7cfd54599 100644 --- a/src/logservice/palf/log_rpc_macros.h +++ b/src/logservice/palf/log_rpc_macros.h @@ -13,14 +13,9 @@ #ifndef OCEANBASE_LOGSERVICE_LOG_RPC_MACROS_ #define OCEANBASE_LOGSERVICE_LOG_RPC_MACROS_ -#define DEFINE_RPC_PROCESSOR(CLASS, PROXY, REQTYPE, PCODE) \ - class CLASS : public obrpc::ObRpcProcessor> \ - { \ - public: \ - CLASS() : palf_env_impl_(NULL), filter_(NULL) {} \ - virtual ~CLASS() {} \ - int process() \ - { \ +#include "share/ob_occam_time_guard.h" + +#define __RPC_PROCESS_CODE__(REQTYPE) \ int ret = OB_SUCCESS; \ LogRpcPacketImpl &rpc_packet = arg_; \ const REQTYPE &req = rpc_packet.req_; \ @@ -36,6 +31,24 @@ PALF_LOG(TRACE, "Processor handle_request success", K(ret), K(palf_id), K(req), KP(filter_)); \ } \ return ret; \ + +#define __DEFINE_RPC_PROCESSOR__(CLASS, PROXY, REQTYPE, PCODE, ELECTION_MSG) \ + class CLASS : public obrpc::ObRpcProcessor> \ + { \ + public: \ + CLASS() : palf_env_impl_(NULL), filter_(NULL) {} \ + virtual ~CLASS() {} \ + int process() { return process_impl_(); } \ + template ::type = true> \ + int process_impl_() \ + { \ + TIMEGUARD_INIT(ELECT, 50_ms, 10_s); \ + __RPC_PROCESS_CODE__(REQTYPE) \ + } \ + template ::type = true> \ + int process_impl_() \ + { \ + __RPC_PROCESS_CODE__(REQTYPE) \ } \ void set_palf_env_impl(void *palf_env_impl, void *filter) \ { \ @@ -48,6 +61,9 @@ ObFunction *filter_; \ } +#define DEFINE_RPC_PROCESSOR(CLASS, PROXY, REQTYPE, PCODE) __DEFINE_RPC_PROCESSOR__(CLASS, PROXY, REQTYPE, PCODE, false) +#define DEFINE_ELECTION_RPC_PROCESSOR(CLASS, PROXY, REQTYPE, PCODE) __DEFINE_RPC_PROCESSOR__(CLASS, PROXY, REQTYPE, PCODE, true) + #define DECLARE_RPC_PROXY_POST_FUNCTION(PRIO, REQTYPE, PCODE) \ RPC_AP(PRIO post_packet, PCODE, (palf::LogRpcPacketImpl)); \ int post_packet(const common::ObAddr &dst, const palf::LogRpcPacketImpl &pkt, const int64_t tenant_id) @@ -72,6 +88,7 @@ int LogRpcProxyV2::post_packet(const common::ObAddr &dst, const palf::LogRpcPacketImpl &pkt, \ const int64_t tenant_id) \ { \ + TIMEGUARD_INIT(ELECT, 50_ms, 10_s); \ int ret = common::OB_SUCCESS; \ static obrpc::LogRpcCB cb; \ ret = this->to(dst) \ @@ -79,7 +96,7 @@ .trace_time(true) \ .max_process_handler_time(100 * 1000) \ .by(tenant_id) \ - .group_id(share::OBCG_ELECTION) \ + .group_id(share::OBCG_ELECTION) \ .post_packet(pkt, &cb); \ return ret; \ } @@ -113,6 +130,7 @@ virtual ~CLASS() {} \ int process() \ { \ + TIMEGUARD_INIT(PALF, 100_ms, 10_s); \ int ret = OB_SUCCESS; \ LogRpcPacketImpl &rpc_packet = arg_; \ const REQTYPE &req = rpc_packet.req_; \ @@ -120,11 +138,12 @@ int64_t palf_id = rpc_packet.palf_id_; \ RESPTYPE &resp = result_.req_; \ result_.palf_id_ = palf_id; \ - if (OB_ISNULL(palf_env_impl_) && OB_FAIL(__get_palf_env_impl(rpc_pkt_->get_tenant_id(), palf_env_impl_))) { \ + if (OB_ISNULL(palf_env_impl_) && CLICK_FAIL(__get_palf_env_impl(rpc_pkt_->get_tenant_id(), palf_env_impl_))) { \ PALF_LOG(WARN, "__get_palf_env_impl failed", K(ret), KPC(rpc_pkt_)); \ - } else if (NULL != filter_ && true == (*filter_)(server)) { \ + } else if (CLICK() && NULL != filter_ && true == (*filter_)(server)) { \ PALF_LOG(INFO, "need filter this packet", K(rpc_packet)); \ } else { \ + CLICK(); \ LogRequestHandler handler(palf_env_impl_); \ ret = handler.handle_sync_request(palf_id, server, req, resp); \ } \ diff --git a/src/logservice/palf/log_rpc_processor.h b/src/logservice/palf/log_rpc_processor.h index 75e8a05e8..6127bf24a 100644 --- a/src/logservice/palf/log_rpc_processor.h +++ b/src/logservice/palf/log_rpc_processor.h @@ -97,30 +97,30 @@ DEFINE_RPC_PROCESSOR(LogRegisterParentRespP, LogRegisterParentResp, obrpc::OB_LOG_REGISTER_PARENT_RESP); -DEFINE_RPC_PROCESSOR(ElectionPrepareRequestMsgP, - obrpc::LogRpcProxyV2, - election::ElectionPrepareRequestMsg, - obrpc::OB_LOG_ELECTION_PREPARE_REQUEST); - -DEFINE_RPC_PROCESSOR(ElectionPrepareResponseMsgP, - obrpc::LogRpcProxyV2, - election::ElectionPrepareResponseMsg, - obrpc::OB_LOG_ELECTION_PREPARE_RESPONSE); - -DEFINE_RPC_PROCESSOR(ElectionAcceptRequestMsgP, - obrpc::LogRpcProxyV2, - election::ElectionAcceptRequestMsg, - obrpc::OB_LOG_ELECTION_ACCEPT_REQUEST); - -DEFINE_RPC_PROCESSOR(ElectionAcceptResponseMsgP, - obrpc::LogRpcProxyV2, - election::ElectionAcceptResponseMsg, - obrpc::OB_LOG_ELECTION_ACCEPT_RESPONSE); - -DEFINE_RPC_PROCESSOR(ElectionChangeLeaderMsgP, - obrpc::LogRpcProxyV2, - election::ElectionChangeLeaderMsg, - obrpc::OB_LOG_ELECTION_CHANGE_LEADER_REQUEST); +DEFINE_ELECTION_RPC_PROCESSOR(ElectionPrepareRequestMsgP, + obrpc::LogRpcProxyV2, + election::ElectionPrepareRequestMsg, + obrpc::OB_LOG_ELECTION_PREPARE_REQUEST); + +DEFINE_ELECTION_RPC_PROCESSOR(ElectionPrepareResponseMsgP, + obrpc::LogRpcProxyV2, + election::ElectionPrepareResponseMsg, + obrpc::OB_LOG_ELECTION_PREPARE_RESPONSE); + +DEFINE_ELECTION_RPC_PROCESSOR(ElectionAcceptRequestMsgP, + obrpc::LogRpcProxyV2, + election::ElectionAcceptRequestMsg, + obrpc::OB_LOG_ELECTION_ACCEPT_REQUEST); + +DEFINE_ELECTION_RPC_PROCESSOR(ElectionAcceptResponseMsgP, + obrpc::LogRpcProxyV2, + election::ElectionAcceptResponseMsg, + obrpc::OB_LOG_ELECTION_ACCEPT_RESPONSE); + +DEFINE_ELECTION_RPC_PROCESSOR(ElectionChangeLeaderMsgP, + obrpc::LogRpcProxyV2, + election::ElectionChangeLeaderMsg, + obrpc::OB_LOG_ELECTION_CHANGE_LEADER_REQUEST); DEFINE_RPC_PROCESSOR(CommittedInfoP, obrpc::LogRpcProxyV2, -- GitLab