提交 5ac1a95b 编写于 作者: O obdev 提交者: wangzelin.wzl

[Election] optimize election bahavior

上级 e1f4909c
......@@ -42,62 +42,65 @@ public:
};
}
#define DEFAULT_ALLOCATOR value_sematic_string::DefaultAllocator::get_instance()
class ObStringHolder
{
static constexpr int64_t TINY_STR_SIZE = 16;// no need count '\0'
public:
ObStringHolder() : ObStringHolder(DEFAULT_ALLOCATOR) {};
ObStringHolder(const ObStringHolder &) = delete;
ObStringHolder(ObStringHolder &&rhs) : ObStringHolder(DEFAULT_ALLOCATOR)
{
std::swap(buffer_, rhs.buffer_);
std::swap(len_, rhs.len_);
}
ObStringHolder &operator=(const ObStringHolder &) = delete;
ObStringHolder &operator=(ObStringHolder &&rhs)
{
std::swap(buffer_, rhs.buffer_);
std::swap(len_, rhs.len_);
return *this;
}
ObStringHolder(ObIAllocator &alloc) :
buffer_(nullptr), len_(0), allocator_(alloc) {}
ObStringHolder() : buffer_(nullptr), len_(0) {}
~ObStringHolder() { reset(); }
void reset() {
if (OB_NOT_NULL(buffer_)) {
allocator_.free(buffer_);
if (buffer_ == local_buffer_for_tiny_str_) {// tiny str
buffer_ = nullptr;
len_ = 0;
} else if (OB_ISNULL(buffer_)) {// empty str
len_ = 0;
} else {// big str
value_sematic_string::DefaultAllocator::get_instance().free(buffer_);
buffer_ = nullptr;
len_ = 0;
}
}
ObString get_ob_string() const { return ObString(len_, buffer_); }
int assign(const ObStringHolder &rhs) {
int ret = OB_SUCCESS;
if (!rhs.empty()) {
reset();
if (OB_ISNULL(buffer_ = (char *)allocator_.alloc(rhs.len_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
len_ = rhs.len_;
memcpy(buffer_, rhs.buffer_, len_);
}
// move sematic
ObStringHolder(ObStringHolder &&rhs) : ObStringHolder() { *this = std::move(rhs); }
ObStringHolder &operator=(ObStringHolder &&rhs) {
reset();
if (rhs.buffer_ == rhs.local_buffer_for_tiny_str_) {// tiny str
copy_from_tiny_ob_str_(rhs.get_ob_string());
} else {// big str
std::swap(buffer_, rhs.buffer_);
std::swap(len_, rhs.len_);
}
return ret;
return *this;
}
// not allow copy construction and copy assignment
ObStringHolder(const ObStringHolder &) = delete;
ObStringHolder &operator=(const ObStringHolder &) = delete;
// copy from assign
int assign(const ObStringHolder &rhs) { return assign(rhs.get_ob_string()); }
int assign(const ObString &str) {
int ret = OB_SUCCESS;
reset();
if (OB_LIKELY(!str.empty())) {
int64_t len = str.length();
if (OB_ISNULL(buffer_ = (char *)allocator_.alloc(len))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
len_ = len;
memcpy(buffer_, str.ptr(), len_);
if (str.length() <= TINY_STR_SIZE) {// tiny str
copy_from_tiny_ob_str_(str);
} else {// big str
int64_t len = str.length();
char *temp_buffer = nullptr;
if (OB_ISNULL(temp_buffer = (char *)value_sematic_string::DefaultAllocator::get_instance().alloc(len))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
reset();
buffer_ = temp_buffer;
len_ = len;
memcpy(buffer_, str.ptr(), len_);
}
}
} else {
reset();
}
return ret;
}
// use ObString method to serialize and print
ObString get_ob_string() const { return ObString(len_, buffer_); }
bool empty() const {
return OB_ISNULL(buffer_) && len_ == 0;
}
......@@ -123,12 +126,19 @@ public:
}
return ret;
}
private:
void copy_from_tiny_ob_str_(const ObString &tiny_str) {
reset();
OB_ASSERT(tiny_str.length() <= TINY_STR_SIZE);
memcpy(local_buffer_for_tiny_str_, tiny_str.ptr(), tiny_str.length());
buffer_ = local_buffer_for_tiny_str_;
len_ = tiny_str.length();
}
private:
char *buffer_;
int64_t len_;
ObIAllocator &allocator_;
char local_buffer_for_tiny_str_[TINY_STR_SIZE];
};
#undef DEFAULT_ALLOCATOR
}
}
......
......@@ -24,7 +24,7 @@ namespace palf
namespace election
{
int64_t MAX_TST = 750_ms;
int64_t MAX_TST = 1_s;
int64_t INIT_TS = -1;
ObOccamTimer GLOBAL_REPORT_TIMER;
......@@ -242,9 +242,9 @@ int ElectionImpl::handle_message(const ElectionAcceptRequestMsg &msg)
acceptor_.on_accept_request(msg, &us_to_expired);
}
if (OB_LIKELY(us_to_expired > 0)) {
if (us_to_expired - 2 * MAX_TST < 0) {
if (us_to_expired - CALCULATE_TRIGGER_ELECT_WATER_MARK() < 0) {
LOG_NONE(WARN, "reschedule devote task in invalid us", K(us_to_expired - 2 * MAX_TST));
} else if (CLICK_FAIL(proposer_.reschedule_or_register_prepare_task_after_(us_to_expired - 2 * MAX_TST))) {
} else if (CLICK_FAIL(proposer_.reschedule_or_register_prepare_task_after_(us_to_expired - CALCULATE_TRIGGER_ELECT_WATER_MARK()))) {
LOG_NONE(ERROR, "register devote task failed");
} else {
LOG_NONE(DEBUG, "reschedule devote task after", K(us_to_expired - 2 * MAX_TST));
......
......@@ -12,6 +12,7 @@
#include "logservice/palf/election/message/election_message.h"
#include "ob_role.h"
#include "share/ob_occam_time_guard.h"
#include "election_proposer.h"
#include "common/ob_clock_generator.h"
......@@ -275,9 +276,17 @@ int ElectionProposer::reschedule_or_register_prepare_task_after_(const int64_t d
} else if (CLICK_FAIL(p_election_->timer_->schedule_task_repeat_spcifiy_first_delay(devote_task_handle_,
delay_us,
CALCULATE_MAX_ELECT_COST_TIME(),
[this]() {
[this, delay_us]() {
int ret = OB_SUCCESS;
LockGuard lock_guard(p_election_->lock_);
this->prepare(ObRole::FOLLOWER);
if (check_leader()) {// Leader不应该靠定时任务主动做Prepare,只能被动触发Prepare
LOG_RENEW_LEASE(INFO, "leader not allow do prepare in timer task before lease expired, this log may printed when message delay too large");
} else {
if (role_ == ObRole::LEADER) {
role_ = ObRole::FOLLOWER;
}
this->prepare(role_);// 只有Follower可以走到这里
}
return false;
}))) {
LOG_INIT(ERROR, "first time register devote task failed");
......@@ -369,34 +378,35 @@ void ElectionProposer::on_prepare_request(const ElectionPrepareRequestMsg &prepa
ELECT_TIME_GUARD(500_ms);
#define PRINT_WRAPPER KR(ret), K(prepare_req), K(*this)
int ret = OB_SUCCESS;
// 1. 忽略leader prepare消息,不触发一呼百应
if (static_cast<ObRole>(prepare_req.get_role()) == ObRole::LEADER) {// leader prepare不触发一呼百应
} else if (static_cast<ObRole>(prepare_req.get_role()) != ObRole::FOLLOWER) {
// 非candidate prepare是非预期的
LOG_ELECT_LEADER(ERROR, "unexpected code path");
// 2. 尝试一呼百应
} else if (memberlist_with_states_.get_member_list().get_addr_list().empty()) {
LOG_ELECT_LEADER(INFO, "memberlist is empty, give up do prepare this time");
} else {
// 2.1 拒绝旧消息
if (prepare_req.get_ballot_number() <= ballot_number_) {
// 注意这里是<=,若本轮已经发过一呼百应,则不会再重试,否则将无限循环
if (prepare_req.get_ballot_number() < ballot_number_) {
ElectionPrepareResponseMsg prepare_res_reject(p_election_->get_self_addr(),
prepare_req);
prepare_res_reject.set_rejected(ballot_number_);
if (CLICK_FAIL(p_election_->send_(prepare_res_reject))) {
LOG_ELECT_LEADER(ERROR, "create prepare request failed");
}
// 0. 拒绝旧消息、过滤本轮次消息、根据新消息推大轮次
if (prepare_req.get_ballot_number() <= ballot_number_) {
if (prepare_req.get_ballot_number() < ballot_number_) {// 对于旧消息发送拒绝响应
ElectionPrepareResponseMsg prepare_res_reject(p_election_->get_self_addr(),
prepare_req);
prepare_res_reject.set_rejected(ballot_number_);
if (CLICK_FAIL(p_election_->send_(prepare_res_reject))) {
LOG_ELECT_LEADER(ERROR, "create prepare request failed");
} else {
LOG_ELECT_LEADER(INFO, "has been send prepare request in this ballot, give up this time");
LOG_ELECT_LEADER(INFO, "send reject response cause prepare message ballot too small");
}
// 2.2 一呼百应
} else {// 对于本轮次消息,需要过滤,否则无限循环
LOG_ELECT_LEADER(INFO, "has been send prepare request in this ballot, give up this time");
}
} else {// 对于新的消息,推大本机选举轮次
LOG_ELECT_LEADER(INFO, "receive bigger ballot prepare request");
(void) advance_ballot_number_and_reset_related_states_(prepare_req.get_ballot_number(),
"receive bigger ballot prepare request");
// 1. 忽略leader prepare消息,不触发一呼百应
if (static_cast<ObRole>(prepare_req.get_role()) == ObRole::LEADER) {
LOG_ELECT_LEADER(INFO, "proposer ignore leader prepare");
} else if (static_cast<ObRole>(prepare_req.get_role()) != ObRole::FOLLOWER) {
// 非candidate prepare是非预期的
LOG_ELECT_LEADER(ERROR, "unexpected code path");
// 2. 尝试一呼百应
} else if (memberlist_with_states_.get_member_list().get_addr_list().empty()) {
LOG_ELECT_LEADER(INFO, "memberlist is empty, give up do prepare this time");
} else {
LOG_ELECT_LEADER(INFO, "receive bigger ballot prepare request");
(void) p_election_->refresh_priority_();
(void) advance_ballot_number_and_reset_related_states_(prepare_req.get_ballot_number(),
"receive bigger ballot prepare request");
ElectionPrepareRequestMsg prepare_followed_req(p_election_->id_,
p_election_->get_self_addr(),
restart_counter_,
......@@ -409,7 +419,7 @@ void ElectionProposer::on_prepare_request(const ElectionPrepareRequestMsg &prepa
LOG_ELECT_LEADER(INFO, "self is not in memberlist, give up do prepare");
} else if (CLICK_FAIL(p_election_->broadcast_(prepare_followed_req,
memberlist_with_states_.get_member_list()
.get_addr_list()))) {
.get_addr_list()))) {
LOG_ELECT_LEADER(ERROR, "broadcast prepare request failed");
} else {
last_do_prepare_ts_ = ObClockGenerator::getCurrentTime();
......
......@@ -133,7 +133,7 @@ ElectionMsgDebugTs ElectionMsgBase::get_debug_ts() const { return debug_ts_; }
void ElectionMsgBase::set_process_ts()
{
debug_ts_.dest_process_ts_ = ObClockGenerator::getRealClock();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
}
int64_t ElectionMsgBase::get_id() const { return id_; }
......@@ -198,7 +198,7 @@ ElectionMsgBase(request.get_id(),
accepted_(false) {
set_receiver(request.get_sender());
request_debug_ts_ = request.get_debug_ts();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
}
void ElectionPrepareResponseMsgMiddle::set_accepted(const int64_t ballot_number, const Lease lease) {
......
......@@ -137,13 +137,13 @@ public:
int deserialize(const char* buf, const int64_t data_len, int64_t& pos) {
int ret = ElectionPrepareRequestMsgMiddle::deserialize(buf, data_len, pos);
debug_ts_.dest_deserialize_ts_ = ObClockGenerator::getRealClock();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
return ret;
}
int64_t get_serialize_size() const {
if (debug_ts_.src_serialize_ts_ == 0) {// cause get_serialize_size maybe call more than once
const_cast<int64_t&>(debug_ts_.src_serialize_ts_) = ObClockGenerator::getRealClock();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
}
return ElectionPrepareRequestMsgMiddle::get_serialize_size();
}
......@@ -183,13 +183,13 @@ public:
int deserialize(const char* buf, const int64_t data_len, int64_t& pos) {
int ret = ElectionPrepareResponseMsgMiddle::deserialize(buf, data_len, pos);
debug_ts_.dest_deserialize_ts_ = ObClockGenerator::getRealClock();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
return ret;
}
int64_t get_serialize_size() const {
if (debug_ts_.src_serialize_ts_ == 0) {// cause get_serialize_size maybe call more than once
const_cast<int64_t&>(debug_ts_.src_serialize_ts_) = ObClockGenerator::getRealClock();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
}
return ElectionPrepareResponseMsgMiddle::get_serialize_size();
}
......@@ -238,13 +238,13 @@ public:
int deserialize(const char* buf, const int64_t data_len, int64_t& pos) {
int ret = ElectionAcceptRequestMsgMiddle::deserialize(buf, data_len, pos);
debug_ts_.dest_deserialize_ts_ = ObClockGenerator::getRealClock();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
return ret;
}
int64_t get_serialize_size() const {
if (debug_ts_.src_serialize_ts_ == 0) {// cause get_serialize_size maybe call more than once
const_cast<int64_t&>(debug_ts_.src_serialize_ts_) = ObClockGenerator::getRealClock();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
}
return ElectionAcceptRequestMsgMiddle::get_serialize_size();
}
......@@ -302,13 +302,13 @@ public:
int deserialize(const char* buf, const int64_t data_len, int64_t& pos) {
int ret = ElectionAcceptResponseMsgMiddle::deserialize(buf, data_len, pos);
debug_ts_.dest_deserialize_ts_ = ObClockGenerator::getRealClock();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
return ret;
}
int64_t get_serialize_size() const {
if (debug_ts_.src_serialize_ts_ == 0) {// cause get_serialize_size maybe call more than once
const_cast<int64_t&>(debug_ts_.src_serialize_ts_) = ObClockGenerator::getRealClock();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
}
return ElectionAcceptResponseMsgMiddle::get_serialize_size();
}
......@@ -353,13 +353,13 @@ public:
int deserialize(const char* buf, const int64_t data_len, int64_t& pos) {
int ret = ElectionChangeLeaderMsgMiddle::deserialize(buf, data_len, pos);
debug_ts_.dest_deserialize_ts_ = ObClockGenerator::getRealClock();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
return ret;
}
int64_t get_serialize_size() const {
if (debug_ts_.src_serialize_ts_ == 0) {// cause get_serialize_size maybe call more than once
const_cast<int64_t&>(debug_ts_.src_serialize_ts_) = ObClockGenerator::getRealClock();
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESTHOLD);
print_debug_ts_if_reach_warn_threshold(*this, MSG_DELAY_WARN_THRESHOLD);
}
return ElectionChangeLeaderMsgMiddle::get_serialize_size();
}
......
......@@ -17,6 +17,7 @@
// or may cause MACRO pollution
#include "lib/oblog/ob_log_module.h"
#include "share/ob_occam_time_guard.h"
#include <algorithm>
#define LOG_PHASE(level, phase, info, args...) \
do {\
......@@ -83,15 +84,16 @@ enum class LogPhase
SET_MEMBER = 7,
};
constexpr int64_t MSG_DELAY_WARN_THRESTHOLD = 200_ms;
constexpr int64_t MSG_DELAY_WARN_THRESHOLD = 200_ms;
constexpr int64_t MAX_LEASE_TIME = 10_s;
constexpr int64_t PRIORITY_BUFFER_SIZE = 512;
constexpr int64_t INVALID_VALUE = -1;// 所有int64_t变量的初始默认无效值
extern int64_t MAX_TST; // 最大单程消息延迟,暂设为750ms,在单测中会将其调低,日后可改为配置项,现阶段先用全局变量代替
inline int64_t CALCULATE_RENEW_LEASE_INTERVAL() { return 0.5 * MAX_TST; }// 续约的周期,目前是325ms,在暂时没有切主流程优化的情况下,设置的间隔短一些,为了及时切主
inline int64_t CALCULATE_TIME_WINDOW_SPAN_TS() { return 2 * MAX_TST; }// 时间窗口的长度,为两个最大单程消息延迟
inline int64_t CALCULATE_MAX_ELECT_COST_TIME() { return 10 * MAX_TST; }// 一次选举可能出现的最大耗时设置,设置为10s
inline int64_t CALCULATE_LEASE_INTERVAL() { return 4 * MAX_TST; }// 4个消息延迟是3s
inline int64_t CALCULATE_RENEW_LEASE_INTERVAL() { return std::min<int64_t>(0.5 * MAX_TST, 250_ms); }// 续约周期固定为消息延迟的一半,最大不超过250ms
inline int64_t CALCULATE_TIME_WINDOW_SPAN_TS() { return 2 * MAX_TST; }// 时间窗口的长度,为两个最大单程消息延迟, 默认为2s
inline int64_t CALCULATE_MAX_ELECT_COST_TIME() { return 10 * MAX_TST; }// 一次选举可能出现的最大耗时设置,默认为10s
inline int64_t CALCULATE_LEASE_INTERVAL() { return 4 * MAX_TST; }// 4个消息延迟,默认是4s
inline int64_t CALCULATE_TRIGGER_ELECT_WATER_MARK() { return std::min<int64_t>(MAX_TST, 1_s); }// 触发无主选举的Lease剩余水位线,1个最大消息延迟,最大不超过1s
}// namespace election
}// namespace palf
......
......@@ -12,6 +12,7 @@
#include "log_request_handler.h"
#include "log_req.h"
#include "share/ob_occam_time_guard.h"
namespace oceanbase
{
......@@ -383,15 +384,16 @@ int LogRequestHandler::handle_request<MsgType>(\
const ObAddr &server,\
const MsgType &req)\
{\
TIMEGUARD_INIT(ELECT, 50_ms, 10_s);\
int ret = common::OB_SUCCESS;\
if (false == is_valid_palf_id(palf_id) || false == req.is_valid()) {\
ret = OB_INVALID_ARGUMENT;\
PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(req));\
} else {\
PalfHandleImplGuard guard;\
if (OB_FAIL(palf_env_impl_->get_palf_handle_impl(palf_id, guard))) {\
if (CLICK_FAIL(palf_env_impl_->get_palf_handle_impl(palf_id, guard))) {\
PALF_LOG(WARN, "ObLogMgr get_log_service failed", K(ret), K(palf_id), KP(palf_env_impl_));\
} else if (OB_FAIL(guard.get_palf_handle_impl()->handle_election_message(req))) {\
} else if (CLICK_FAIL(guard.get_palf_handle_impl()->handle_election_message(req))) {\
PALF_LOG(WARN, "handle message failed", K(ret), K(palf_id), K(server), K(req));\
} else {\
PALF_LOG(DEBUG, "handle message success", K(ret), K(palf_id), K(server), K(req));\
......
......@@ -13,14 +13,9 @@
#ifndef OCEANBASE_LOGSERVICE_LOG_RPC_MACROS_
#define OCEANBASE_LOGSERVICE_LOG_RPC_MACROS_
#define DEFINE_RPC_PROCESSOR(CLASS, PROXY, REQTYPE, PCODE) \
class CLASS : public obrpc::ObRpcProcessor<PROXY::ObRpc<PCODE>> \
{ \
public: \
CLASS() : palf_env_impl_(NULL), filter_(NULL) {} \
virtual ~CLASS() {} \
int process() \
{ \
#include "share/ob_occam_time_guard.h"
#define __RPC_PROCESS_CODE__(REQTYPE) \
int ret = OB_SUCCESS; \
LogRpcPacketImpl<REQTYPE> &rpc_packet = arg_; \
const REQTYPE &req = rpc_packet.req_; \
......@@ -36,6 +31,24 @@
PALF_LOG(TRACE, "Processor handle_request success", K(ret), K(palf_id), K(req), KP(filter_)); \
} \
return ret; \
#define __DEFINE_RPC_PROCESSOR__(CLASS, PROXY, REQTYPE, PCODE, ELECTION_MSG) \
class CLASS : public obrpc::ObRpcProcessor<PROXY::ObRpc<PCODE>> \
{ \
public: \
CLASS() : palf_env_impl_(NULL), filter_(NULL) {} \
virtual ~CLASS() {} \
int process() { return process_impl_(); } \
template <bool FLAG = ELECTION_MSG, typename std::enable_if<FLAG, bool>::type = true> \
int process_impl_() \
{ \
TIMEGUARD_INIT(ELECT, 50_ms, 10_s); \
__RPC_PROCESS_CODE__(REQTYPE) \
} \
template <bool FLAG = ELECTION_MSG, typename std::enable_if<!FLAG, bool>::type = true> \
int process_impl_() \
{ \
__RPC_PROCESS_CODE__(REQTYPE) \
} \
void set_palf_env_impl(void *palf_env_impl, void *filter) \
{ \
......@@ -48,6 +61,9 @@
ObFunction<bool(const ObAddr &src)> *filter_; \
}
#define DEFINE_RPC_PROCESSOR(CLASS, PROXY, REQTYPE, PCODE) __DEFINE_RPC_PROCESSOR__(CLASS, PROXY, REQTYPE, PCODE, false)
#define DEFINE_ELECTION_RPC_PROCESSOR(CLASS, PROXY, REQTYPE, PCODE) __DEFINE_RPC_PROCESSOR__(CLASS, PROXY, REQTYPE, PCODE, true)
#define DECLARE_RPC_PROXY_POST_FUNCTION(PRIO, REQTYPE, PCODE) \
RPC_AP(PRIO post_packet, PCODE, (palf::LogRpcPacketImpl<palf::REQTYPE>)); \
int post_packet(const common::ObAddr &dst, const palf::LogRpcPacketImpl<palf::REQTYPE> &pkt, const int64_t tenant_id)
......@@ -72,6 +88,7 @@
int LogRpcProxyV2::post_packet(const common::ObAddr &dst, const palf::LogRpcPacketImpl<palf::REQTYPE> &pkt, \
const int64_t tenant_id) \
{ \
TIMEGUARD_INIT(ELECT, 50_ms, 10_s); \
int ret = common::OB_SUCCESS; \
static obrpc::LogRpcCB<obrpc::PCODE> cb; \
ret = this->to(dst) \
......@@ -79,7 +96,7 @@
.trace_time(true) \
.max_process_handler_time(100 * 1000) \
.by(tenant_id) \
.group_id(share::OBCG_ELECTION) \
.group_id(share::OBCG_ELECTION) \
.post_packet(pkt, &cb); \
return ret; \
}
......@@ -113,6 +130,7 @@
virtual ~CLASS() {} \
int process() \
{ \
TIMEGUARD_INIT(PALF, 100_ms, 10_s); \
int ret = OB_SUCCESS; \
LogRpcPacketImpl<REQTYPE> &rpc_packet = arg_; \
const REQTYPE &req = rpc_packet.req_; \
......@@ -120,11 +138,12 @@
int64_t palf_id = rpc_packet.palf_id_; \
RESPTYPE &resp = result_.req_; \
result_.palf_id_ = palf_id; \
if (OB_ISNULL(palf_env_impl_) && OB_FAIL(__get_palf_env_impl(rpc_pkt_->get_tenant_id(), palf_env_impl_))) { \
if (OB_ISNULL(palf_env_impl_) && CLICK_FAIL(__get_palf_env_impl(rpc_pkt_->get_tenant_id(), palf_env_impl_))) { \
PALF_LOG(WARN, "__get_palf_env_impl failed", K(ret), KPC(rpc_pkt_)); \
} else if (NULL != filter_ && true == (*filter_)(server)) { \
} else if (CLICK() && NULL != filter_ && true == (*filter_)(server)) { \
PALF_LOG(INFO, "need filter this packet", K(rpc_packet)); \
} else { \
CLICK(); \
LogRequestHandler handler(palf_env_impl_); \
ret = handler.handle_sync_request(palf_id, server, req, resp); \
} \
......
......@@ -97,30 +97,30 @@ DEFINE_RPC_PROCESSOR(LogRegisterParentRespP,
LogRegisterParentResp,
obrpc::OB_LOG_REGISTER_PARENT_RESP);
DEFINE_RPC_PROCESSOR(ElectionPrepareRequestMsgP,
obrpc::LogRpcProxyV2,
election::ElectionPrepareRequestMsg,
obrpc::OB_LOG_ELECTION_PREPARE_REQUEST);
DEFINE_RPC_PROCESSOR(ElectionPrepareResponseMsgP,
obrpc::LogRpcProxyV2,
election::ElectionPrepareResponseMsg,
obrpc::OB_LOG_ELECTION_PREPARE_RESPONSE);
DEFINE_RPC_PROCESSOR(ElectionAcceptRequestMsgP,
obrpc::LogRpcProxyV2,
election::ElectionAcceptRequestMsg,
obrpc::OB_LOG_ELECTION_ACCEPT_REQUEST);
DEFINE_RPC_PROCESSOR(ElectionAcceptResponseMsgP,
obrpc::LogRpcProxyV2,
election::ElectionAcceptResponseMsg,
obrpc::OB_LOG_ELECTION_ACCEPT_RESPONSE);
DEFINE_RPC_PROCESSOR(ElectionChangeLeaderMsgP,
obrpc::LogRpcProxyV2,
election::ElectionChangeLeaderMsg,
obrpc::OB_LOG_ELECTION_CHANGE_LEADER_REQUEST);
DEFINE_ELECTION_RPC_PROCESSOR(ElectionPrepareRequestMsgP,
obrpc::LogRpcProxyV2,
election::ElectionPrepareRequestMsg,
obrpc::OB_LOG_ELECTION_PREPARE_REQUEST);
DEFINE_ELECTION_RPC_PROCESSOR(ElectionPrepareResponseMsgP,
obrpc::LogRpcProxyV2,
election::ElectionPrepareResponseMsg,
obrpc::OB_LOG_ELECTION_PREPARE_RESPONSE);
DEFINE_ELECTION_RPC_PROCESSOR(ElectionAcceptRequestMsgP,
obrpc::LogRpcProxyV2,
election::ElectionAcceptRequestMsg,
obrpc::OB_LOG_ELECTION_ACCEPT_REQUEST);
DEFINE_ELECTION_RPC_PROCESSOR(ElectionAcceptResponseMsgP,
obrpc::LogRpcProxyV2,
election::ElectionAcceptResponseMsg,
obrpc::OB_LOG_ELECTION_ACCEPT_RESPONSE);
DEFINE_ELECTION_RPC_PROCESSOR(ElectionChangeLeaderMsgP,
obrpc::LogRpcProxyV2,
election::ElectionChangeLeaderMsg,
obrpc::OB_LOG_ELECTION_CHANGE_LEADER_REQUEST);
DEFINE_RPC_PROCESSOR(CommittedInfoP,
obrpc::LogRpcProxyV2,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册