提交 e3867470 编写于 作者: B BinChenn 提交者: ob-robot

PALF degrades log stream in more failure scenarios

上级 a1b98f97
......@@ -132,6 +132,7 @@ int ObSimpleLogServer::simple_init(
}
tenant_base_->init();
tenant_base_->set(&log_service_);
tenant_base_->set(&detector_);
ObTenantEnv::set_tenant(tenant_base_);
assert(&log_service_ == MTL(logservice::ObLogService*));
guard.click("init tenant_base");
......
......@@ -45,6 +45,7 @@
#include "share/ob_occam_timer.h"
#include "share/resource_manager/ob_cgroup_ctrl.h"
#include "logservice/ob_net_keepalive_adapter.h"
#include "logservice/leader_coordinator/ob_failure_detector.h"
#include <memory>
#include <map>
......@@ -283,6 +284,7 @@ private:
common::ObMySQLProxy sql_proxy_;
MockNetKeepAliveAdapter *net_keepalive_;
ObSrvRpcProxy srv_proxy_;
logservice::coordinator::ObFailureDetector detector_;
};
} // end unittest
......
......@@ -42,6 +42,34 @@ class TestObSimpleLogClusterArbService : public ObSimpleLogClusterTestEnv
public:
TestObSimpleLogClusterArbService() : ObSimpleLogClusterTestEnv()
{}
bool is_degraded(const PalfHandleImplGuard &leader,
const int64_t degraded_server_idx)
{
bool has_degraded = false;
while (!has_degraded) {
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_degraded = degraded_learner_list.contains(get_cluster()[degraded_server_idx]->get_addr());
sleep(1);
PALF_LOG(INFO, "wait degrade");
}
return has_degraded;
}
bool is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id)
{
bool has_upgraded = false;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id));
while (!has_upgraded) {
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id));
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_upgraded = (0 == degraded_learner_list.get_member_number());
sleep(1);
PALF_LOG(INFO, "wait upgrade");
}
return has_upgraded;
}
};
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3;
......@@ -68,37 +96,49 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_degrade_upgrade)
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
const int64_t another_f_idx = (leader_idx+1)%3;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id));
sleep(2);
// 为备副本设置location cb,用于备副本找leader
palf_list[another_f_idx]->get_palf_handle_impl()->set_location_cache_cb(&loc_cb);
block_net(leader_idx, another_f_idx);
// do not check OB_SUCCESS, may return OB_NOT_MASTER during degrading member
submit_log(leader, 100, id);
bool has_degraded = false;
while (!has_degraded) {
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_degraded = degraded_learner_list.contains(palf_list[another_f_idx]->palf_handle_impl_->self_);
sleep(1);
PALF_LOG(INFO, "wait degrade");
}
EXPECT_TRUE(has_degraded);
// 等待lease过期,验证location cb是否可以找到leader
sleep(5);
EXPECT_TRUE(is_degraded(leader, another_f_idx));
loc_cb.leader_ = leader.palf_handle_impl_->self_;
unblock_net(leader_idx, another_f_idx);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id));
EXPECT_TRUE(is_upgraded(leader, id));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id));
// set clog disk error
ObTenantEnv::set_tenant(get_cluster()[leader_idx+1]->get_tenant_base());
logservice::coordinator::ObFailureDetector *detector = MTL(logservice::coordinator::ObFailureDetector *);
if (NULL != detector) {
detector->has_add_clog_full_event_ = true;
}
EXPECT_TRUE(is_degraded(leader, another_f_idx));
bool has_upgraded = false;
while (!has_upgraded) {
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_upgraded = (0 == degraded_learner_list.get_member_number());
sleep(1);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id));
PALF_LOG(INFO, "wait upgrade");
if (NULL != detector) {
detector->has_add_clog_full_event_ = false;
}
EXPECT_TRUE(is_upgraded(leader, id));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id));
// test disable sync
palf_list[another_f_idx]->palf_handle_impl_->disable_sync();
EXPECT_TRUE(is_degraded(leader, another_f_idx));
palf_list[another_f_idx]->palf_handle_impl_->enable_sync();
EXPECT_TRUE(is_upgraded(leader, id));
// test disbale vote
palf_list[another_f_idx]->palf_handle_impl_->disable_vote();
EXPECT_TRUE(is_degraded(leader, another_f_idx));
palf_list[another_f_idx]->palf_handle_impl_->enable_vote();
EXPECT_TRUE(is_upgraded(leader, id));
revert_cluster_palf_handle_guard(palf_list);
leader.reset();
delete_paxos_group(id);
......@@ -133,30 +173,19 @@ TEST_F(TestObSimpleLogClusterArbService, test_4f1a_degrade_upgrade)
block_all_net(another_f1_idx);
block_all_net(another_f2_idx);
bool has_degraded = false;
while (!has_degraded) {
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_degraded = degraded_learner_list.contains(get_cluster()[another_f1_idx]->get_addr());
has_degraded = has_degraded && degraded_learner_list.contains(get_cluster()[another_f2_idx]->get_addr());
sleep(1);
}
EXPECT_TRUE(has_degraded);
EXPECT_TRUE(is_degraded(leader, another_f1_idx));
EXPECT_TRUE(is_degraded(leader, another_f2_idx));
// 确保lease过期,验证loc_cb是否可以找到leader拉日志
sleep(5);
unblock_all_net(another_f1_idx);
unblock_all_net(another_f2_idx);
loc_cb.leader_ = leader.palf_handle_impl_->self_;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id));
EXPECT_TRUE(is_upgraded(leader, id));
bool has_upgraded = false;
while (!has_upgraded) {
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_upgraded = (0 == degraded_learner_list.get_member_number());
sleep(1);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id));
}
revert_cluster_palf_handle_guard(palf_list);
leader.reset();
delete_paxos_group(id);
......@@ -191,16 +220,8 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_reconfirm_degrade_upgrade)
submit_log(leader, 20, id);
// submit some logs which will be truncated
bool has_degraded = false;
while (!has_degraded) {
common::GlobalLearnerList degraded_learner_list;
palf_list[another_f_idx]->palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
ObAddr old_leader_addr = palf_list[leader_idx]->palf_handle_impl_->self_;
has_degraded = degraded_learner_list.contains(old_leader_addr);
PALF_LOG(INFO, "check has_degraded", K(has_degraded), K(degraded_learner_list), K(old_leader_addr));
sleep(1);
}
EXPECT_TRUE(has_degraded);
EXPECT_TRUE(is_degraded(*palf_list[another_f_idx], leader_idx));
int64_t new_leader_idx = -1;
PalfHandleImplGuard new_leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, new_leader_idx));
......@@ -210,15 +231,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_reconfirm_degrade_upgrade)
unblock_net(leader_idx, arb_replica_idx);
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, id));
bool has_upgraded = false;
while (!has_upgraded) {
common::GlobalLearnerList degraded_learner_list;
new_leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_upgraded = (0 == degraded_learner_list.get_member_number());
PALF_LOG(INFO, "wait upgrade", K(degraded_learner_list));
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id));
sleep(1);
}
EXPECT_TRUE(is_upgraded(new_leader, id));
revert_cluster_palf_handle_guard(palf_list);
leader.reset();
new_leader.reset();
......@@ -267,16 +280,8 @@ TEST_F(TestObSimpleLogClusterArbService, test_4f1a_reconfirm_degrade_upgrade)
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, new_leader_idx));
}
bool has_degraded = false;
while (!has_degraded) {
common::GlobalLearnerList degraded_learner_list;
new_leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_degraded = degraded_learner_list.contains(cluster[another_f1_idx]->get_addr());
has_degraded = has_degraded && degraded_learner_list.contains(cluster[leader_idx]->get_addr());
sleep(1);
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id));
}
EXPECT_TRUE(has_degraded);
EXPECT_TRUE(is_degraded(new_leader, another_f1_idx));
EXPECT_TRUE(is_degraded(new_leader, leader_idx));
sleep(5);
loc_cb.leader_ = new_leader.palf_handle_impl_->self_;
......@@ -286,15 +291,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_4f1a_reconfirm_degrade_upgrade)
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, id));
bool has_upgraded = false;
while (!has_upgraded) {
common::GlobalLearnerList degraded_learner_list;
new_leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_upgraded = (0 == degraded_learner_list.get_member_number());
PALF_LOG(INFO, "wait upgrade", K(ret), K(degraded_learner_list));
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id));
sleep(1);
}
EXPECT_TRUE(is_upgraded(new_leader, id));
leader.reset();
new_leader.reset();
revert_cluster_palf_handle_guard(palf_list);
......@@ -479,6 +476,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_defensive)
int64_t mode_version;
EXPECT_EQ(OB_SUCCESS, get_middle_scn(50, leader, flashback_scn, header_origin));
switch_append_to_flashback(leader, mode_version);
sleep(1);
EXPECT_EQ(OB_SUCCESS, palf_list[another_f_idx]->palf_handle_impl_->flashback(mode_version, flashback_scn, CONFIG_CHANGE_TIMEOUT));
// remove another follower
......
......@@ -248,17 +248,23 @@ OB_SERIALIZE_MEMBER(LogGetPalfStatResp, palf_stat_);
// ============= LogServerProbeMsg start =============
LogServerProbeMsg::LogServerProbeMsg()
: src_(),
palf_id_(-1),
req_id_(-1),
msg_type_(PROBE_REQ),
req_ts_(OB_INVALID_TIMESTAMP) { }
server_status_(-1) { }
LogServerProbeMsg::LogServerProbeMsg(
const common::ObAddr &src,
const int64_t palf_id,
const int64_t req_id,
const LogServerProbeType msg_type,
const int64_t req_ts)
const int64_t status)
: src_(src),
palf_id_(palf_id),
req_id_(req_id),
msg_type_(msg_type),
req_ts_(req_ts) { }
server_status_(status) { }
LogServerProbeMsg::~LogServerProbeMsg()
{
......@@ -267,17 +273,19 @@ LogServerProbeMsg::~LogServerProbeMsg()
bool LogServerProbeMsg::is_valid() const
{
return src_.is_valid() && req_ts_ != OB_INVALID_TIMESTAMP;
return src_.is_valid() && -1 != palf_id_ && req_id_ != -1 && server_status_ != -1;
}
void LogServerProbeMsg::reset()
{
src_.reset();
palf_id_ = -1;
req_id_ = -1;
msg_type_ = PROBE_REQ;
req_ts_ = OB_INVALID_TIMESTAMP;
server_status_ = -1;
}
OB_SERIALIZE_MEMBER(LogServerProbeMsg, src_, msg_type_, req_ts_);
OB_SERIALIZE_MEMBER(LogServerProbeMsg, src_, palf_id_, req_id_, msg_type_, server_status_);
// ============= LogServerProbeMsg end =============
// ============= LogChangeAccessModeCmd start =============
......
......@@ -170,14 +170,20 @@ struct LogServerProbeMsg {
OB_UNIS_VERSION(1);
public:
LogServerProbeMsg();
LogServerProbeMsg(const common::ObAddr &src, const LogServerProbeType msg_type, const int64_t req_ts);
LogServerProbeMsg(const common::ObAddr &src,
const int64_t palf_id,
const int64_t req_id,
const LogServerProbeType msg_type,
const int64_t status);
~LogServerProbeMsg();
bool is_valid() const;
void reset();
TO_STRING_KV(K_(src), K_(msg_type), K_(req_ts));
TO_STRING_KV(K_(src), K_(palf_id), K_(req_id), K_(msg_type), K_(server_status));
common::ObAddr src_;
int64_t palf_id_;
int64_t req_id_;
LogServerProbeType msg_type_;
int64_t req_ts_;
int64_t server_status_;
};
struct LogChangeAccessModeCmd {
......
// Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
#ifndef OCEANBASE_LOGSERVICE_OB_ARBITRATION_SERVICE_H_
#define OCEANBASE_LOGSERVICE_OB_ARBITRATION_SERVICE_H_
#include "share/ob_thread_pool.h"
#include "share/ob_occam_timer.h"
#include "lib/hash/ob_linear_hash_map.h" // ObLinearHashMap
#include "lib/lock/ob_tc_rwlock.h" // RWLock
#include "common/ob_member_list.h" // common::ObMemberList
#include "logrpc/ob_log_rpc_req.h" // ProbeMsg
namespace oceanbase
{
namespace palf
{
class PalfEnv;
class LogMemberAckInfo;
}
namespace obrpc
{
class ObLogServiceRpcProxy;
}
namespace logservice
{
class IObNetKeepAliveAdapter;
enum ServerAliveStatus
{
UNKNOWN = 0,
ALIVE,
DEAD,
};
inline const char *server_status_to_string(ServerAliveStatus status)
{
#define SERVER_ALIVE_STATUS_TO_STR(x) case(ServerAliveStatus::x): return #x
switch(status)
{
SERVER_ALIVE_STATUS_TO_STR(ALIVE);
SERVER_ALIVE_STATUS_TO_STR(DEAD);
default:
return "UNKNOWN";
}
#undef SERVER_ALIVE_STATUS_TO_STR
}
struct ServerProbeCtx
{
ServerProbeCtx()
: last_req_ts_(OB_INVALID_TIMESTAMP),
last_resp_ts_(OB_INVALID_TIMESTAMP),
alive_status_(UNKNOWN)
{ }
void reset_probe_ts()
{
last_req_ts_ = OB_INVALID_TIMESTAMP;
last_resp_ts_ = OB_INVALID_TIMESTAMP;
}
// ts are valid only in current status
int64_t last_req_ts_;
int64_t last_resp_ts_;
ServerAliveStatus alive_status_;
TO_STRING_KV(K_(last_req_ts), K_(last_resp_ts), "alive_status", server_status_to_string(alive_status_));
};
class ServerProbeService
{
public:
ServerProbeService()
: self_(),
probing_servers_(),
rpc_proxy_(NULL),
timer_(),
is_running_(false),
is_inited_(false)
{ }
~ServerProbeService() { destroy(); }
int init(const common::ObAddr &self, obrpc::ObLogServiceRpcProxy *rpc_proxy);
void destroy();
int start_probe_server(const common::ObAddr &server);
int stop_probe_server(const common::ObAddr &server);
int get_server_alive_status(const common::ObAddr &server, ServerAliveStatus &status) const;
void run_probe_once();
int handle_server_probe_msg(const common::ObAddr &sender, const LogServerProbeMsg &req);
private:
void run_probe_once_();
private:
typedef RWLock::RLockGuard RLockGuard;
typedef RWLock::WLockGuard WLockGuard;
static constexpr int64_t DEAD_SERVER_TIMEOUT = 2 * 1000 * 1000; // 2s
typedef common::ObLinearHashMap<common::ObAddr, ServerProbeCtx> ServerProbeMap;
private:
mutable RWLock lock_;
common::ObAddr self_;
ServerProbeMap probing_servers_;
obrpc::ObLogServiceRpcProxy *rpc_proxy_;
ObOccamTimer timer_;
bool is_running_;
bool is_inited_;
};
class ObArbitrationService : public share::ObThreadPool
{
public:
ObArbitrationService();
virtual ~ObArbitrationService();
int init(const common::ObAddr &self,
palf::PalfEnv *palf_env,
obrpc::ObLogServiceRpcProxy *rpc_proxy,
IObNetKeepAliveAdapter *net_keepalive);
int start();
void destroy();
void run1();
int handle_server_probe_msg(const common::ObAddr &sender, const LogServerProbeMsg &req);
private:
static constexpr int64_t MIN_LOOP_INTERVAL_US = 10 * 1000; // 10ms
static constexpr int64_t DEGRADE_ACTION_TIMEOUT_US = 10 * 1000 * 1000L; // 10s
static constexpr int64_t UPGRADE_ACTION_TIMEOUT_US = 10 * 1000 * 1000L; // 10s
static constexpr int64_t MAX_PALF_COUNT = 200;
typedef common::ObLinearHashMap<palf::LSKey, palf::LogMemberAckInfoList> PalfAckInfoMap;
private:
class DoDegradeFunctor
{
public:
explicit DoDegradeFunctor(const common::ObAddr &addr,
palf::PalfEnv *palf_env,
ServerProbeService *probe_srv,
IObNetKeepAliveAdapter *net_keepalive)
: self_(addr),
palf_env_(palf_env),
probe_srv_(probe_srv),
net_keepalive_(net_keepalive)
{ }
bool operator()(const palf::LSKey &ls_key, const palf::LogMemberAckInfoList &degrade_servers);
public:
common::ObSEArray<int64_t, MAX_PALF_COUNT> need_removed_palfs_;
bool is_valid()
{
return self_.is_valid() && OB_NOT_NULL(palf_env_) &&
OB_NOT_NULL(probe_srv_) && OB_NOT_NULL(net_keepalive_);
}
private:
bool is_all_other_servers_alive_(
const common::ObMemberList &all_servers,
const common::GlobalLearnerList &degraded_list,
const palf::LogMemberAckInfoList &excepted_servers);
bool is_allow_degrade_(
const common::ObMemberList &expected_member_list,
const int64_t replica_num,
const common::GlobalLearnerList &degraded_list,
const palf::LogMemberAckInfoList &may_be_degraded_servers);
common::ObAddr self_;
palf::PalfEnv *palf_env_;
ServerProbeService *probe_srv_;
IObNetKeepAliveAdapter *net_keepalive_;
};
class DoUpgradeFunctor
{
public:
explicit DoUpgradeFunctor(palf::PalfEnv *palf_env, ServerProbeService *probe_srv, IObNetKeepAliveAdapter *net_keepalive)
: palf_env_(palf_env),
probe_srv_(probe_srv),
net_keepalive_(net_keepalive)
{ }
bool operator()(const palf::LSKey &ls_key, const palf::LogMemberAckInfoList &upgrade_servers);
public:
common::ObSEArray<int64_t, MAX_PALF_COUNT> need_removed_palfs_;
private:
palf::PalfEnv *palf_env_;
ServerProbeService *probe_srv_;
IObNetKeepAliveAdapter *net_keepalive_;
};
private:
int start_probe_servers_(const palf::LogMemberAckInfoList &servers);
int start_probe_servers_(const common::ObMemberList &servers);
void start_probe_server_(const common::ObAddr &server) const;
int follower_probe_others_(const int64_t palf_id, const common::ObMemberList &paxos_member_list);
int get_server_sync_info_();
void run_loop_();
void update_arb_timeout_();
int update_server_map_(PalfAckInfoMap &palf_map,
const palf::LSKey &key,
const palf::LogMemberAckInfoList &val,
bool &is_updated)
{
int ret = OB_SUCCESS;
palf::LogMemberAckInfoList existed_val;
is_updated = false;
if (OB_FAIL(palf_map.get(key, existed_val))) {
if (OB_ENTRY_NOT_EXIST == ret) {
if (OB_FAIL(palf_map.insert(key, val))) {
CLOG_LOG(WARN, "palf_map insert failed", K(key), K(val));
} else {
is_updated = true;
}
}
} else if (palf::ack_info_list_addr_equal(existed_val, val)) {
// pass, do not insert
} else if (OB_FAIL(palf_map.insert_or_update(key, val))) {
CLOG_LOG(WARN, "palf_map insert_or_update failed", K(key), K(val));
} else {
is_updated = true;
CLOG_LOG(TRACE, "update_server_map_ success", KR(ret), K(key), K(val), K(is_updated));
}
CLOG_LOG(TRACE, "update_server_map_ finish", KR(ret), K(key), K(val), K(is_updated));
return ret;
}
private:
common::ObAddr self_;
ServerProbeService probe_srv_;
PalfAckInfoMap may_be_degraded_palfs_;
PalfAckInfoMap may_be_upgraded_palfs_;
int64_t arb_timeout_us_;
int64_t follower_last_probe_time_us_;
palf::PalfEnv *palf_env_;
obrpc::ObLogServiceRpcProxy *rpc_proxy_;
IObNetKeepAliveAdapter *net_keepalive_;
bool is_inited_;
};
} // logservice
} // oceanbase
#endif
......@@ -82,7 +82,8 @@ struct LogMemberAckInfo
typedef common::ObSEArray<LogMemberAckInfo, common::OB_MAX_MEMBER_NUMBER> LogMemberAckInfoList;
inline int64_t ack_info_list_get_index(const LogMemberAckInfoList &list_a,
template<typename T = LogMemberAckInfo>
inline int64_t ack_info_list_get_index(const common::ObSEArray<T, common::OB_MAX_MEMBER_NUMBER> &list_a,
const common::ObAddr &addr)
{
int64_t index = -1;
......@@ -95,8 +96,9 @@ inline int64_t ack_info_list_get_index(const LogMemberAckInfoList &list_a,
return index;
}
template<typename T = LogMemberAckInfo>
inline bool ack_info_list_addr_equal(const common::GlobalLearnerList &list_a,
const LogMemberAckInfoList &list_b)
const common::ObSEArray<T, common::OB_MAX_MEMBER_NUMBER> &list_b)
{
bool bool_ret = true;
if (list_a.get_member_number() != list_b.count()) {
......@@ -112,8 +114,9 @@ inline bool ack_info_list_addr_equal(const common::GlobalLearnerList &list_a,
return bool_ret;
}
inline bool ack_info_list_addr_equal(const LogMemberAckInfoList &list_a,
const LogMemberAckInfoList &list_b)
template<typename T = LogMemberAckInfo>
inline bool ack_info_list_addr_equal(const common::ObSEArray<T, common::OB_MAX_MEMBER_NUMBER> &list_a,
const common::ObSEArray<T, common::OB_MAX_MEMBER_NUMBER> &list_b)
{
bool bool_ret = true;
if (list_a.count() != list_b.count()) {
......
......@@ -412,6 +412,15 @@ int PalfHandle::disable_vote()
return ret;
}
bool PalfHandle::is_vote_enabled() const
{
int ret = OB_SUCCESS;
bool bool_ret = false;
CHECK_VALID;
bool_ret = palf_handle_impl_->is_vote_enabled();
return bool_ret;
}
int PalfHandle::enable_vote()
{
int ret = OB_SUCCESS;
......
......@@ -318,6 +318,11 @@ public:
// OB_SUCCESS
int get_access_mode(int64_t &mode_version, AccessMode &access_mode) const;
int get_access_mode(AccessMode &access_mode) const;
// @brief: check whether the palf instance is allowed to vote for logs
// By default, return true;
// After calling disable_vote(), return false.
bool is_vote_enabled() const;
// @brief: store a persistent flag which means this paxos replica
// can not reply ack when receiving logs.
// By default, paxos replica can reply ack.
......
......@@ -1207,6 +1207,16 @@ int PalfHandleImpl::disable_sync()
return ret;
}
bool PalfHandleImpl::is_vote_enabled() const
{
bool bool_ret = false;
if (IS_NOT_INIT) {
} else {
bool_ret = state_mgr_.is_allow_vote();
}
return bool_ret;
}
int PalfHandleImpl::disable_vote()
{
int ret = OB_SUCCESS;
......
......@@ -558,6 +558,11 @@ public:
const int64_t prev_log_id,
const int64_t &prev_log_proposal_id,
const LSN &committed_end_lsn) = 0;
// @brief: check whether the palf instance is allowed to vote for logs
// By default, return true;
// After calling disable_vote(), return false.
virtual bool is_vote_enabled() const = 0;
// @brief: store a persistent flag which means this paxos replica
// can not reply ack when receiving logs.
// By default, paxos replica can reply ack.
......@@ -695,6 +700,7 @@ public:
int advance_base_info(const PalfBaseInfo &palf_base_info, const bool is_rebuild) override final;
int locate_by_scn_coarsely(const share::SCN &scn, LSN &result_lsn) override final;
int locate_by_lsn_coarsely(const LSN &lsn, share::SCN &result_scn) override final;
bool is_vote_enabled() const override final;
int disable_vote() override final;
int enable_vote() override final;
public:
......
......@@ -60,65 +60,65 @@ TEST_F(TestObArbitrationService, locality_allow_degrade_test)
{
// 2F, degrade 1, allow
MockNetKeepAliveAdapter net_keepalive;
ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL, &net_keepalive);
ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL);
const int64_t palf_id = 1;
const int64_t replica_num = 2;
paxos_list.add_server(addr1);
paxos_list.add_server(addr2);
palf::LogMemberAckInfoList dead_servers;
LogMemberStatusList dead_servers;
common::GlobalLearnerList degraded_servers;
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000))));
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000)))));
EXPECT_TRUE(do_degrade_func.is_allow_degrade_(paxos_list, replica_num, degraded_servers, dead_servers));
}
{
// 4F, degrade 3, not allow
MockNetKeepAliveAdapter net_keepalive;
ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL, &net_keepalive);
ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL);
const int64_t palf_id = 1;
paxos_list.add_server(addr3);
paxos_list.add_server(addr4);
const int64_t replica_num = 4;
palf::LogMemberAckInfoList dead_servers;
LogMemberStatusList dead_servers;
common::GlobalLearnerList degraded_servers;
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000))));
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr2, 1), 1, LSN(1000))));
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000))));
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000)))));
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr2, 1), 1, LSN(1000)))));
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000)))));
EXPECT_FALSE(do_degrade_func.is_allow_degrade_(paxos_list, replica_num, degraded_servers, dead_servers));
}
{
// 4F, degrade 1, not allow
MockNetKeepAliveAdapter net_keepalive;
ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL, &net_keepalive);
ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL);
const int64_t palf_id = 1;
const int64_t replica_num = 4;
palf::LogMemberAckInfoList dead_servers;
LogMemberStatusList dead_servers;
common::GlobalLearnerList degraded_servers;
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000))));
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000)))));
EXPECT_FALSE(do_degrade_func.is_allow_degrade_(paxos_list, replica_num, degraded_servers, dead_servers));
}
{
// 3F1A, degrade 1, not allow
MockNetKeepAliveAdapter net_keepalive;
ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL, &net_keepalive);
ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL);
const int64_t palf_id = 1;
paxos_list.remove_server(addr4);
const int64_t replica_num = 3;
palf::LogMemberAckInfoList dead_servers;
LogMemberStatusList dead_servers;
common::GlobalLearnerList degraded_servers;
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000))));
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000)))));
EXPECT_FALSE(do_degrade_func.is_allow_degrade_(paxos_list, replica_num, degraded_servers, dead_servers));
}
{
// 4F1A, degrade 2(addr2, addr3), allow
MockNetKeepAliveAdapter net_keepalive;
ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL, &net_keepalive);
ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL);
const int64_t palf_id = 1;
paxos_list.add_server(addr4);
const int64_t replica_num = 4;
palf::LogMemberAckInfoList dead_servers;
LogMemberStatusList dead_servers;
common::GlobalLearnerList degraded_servers;
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr2, 1), 1, LSN(1000))));
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000))));
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr2, 1), 1, LSN(1000)))));
EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000)))));
EXPECT_TRUE(do_degrade_func.is_allow_degrade_(paxos_list, replica_num, degraded_servers, dead_servers));
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册