提交 665e7984 编写于 作者: O obdev 提交者: ob-robot

[Election] fix tenant coordinator and failure detector core-risk bug

上级 5b039c45
......@@ -657,8 +657,8 @@ int MockTenantModuleEnv::init()
MTL_BIND2(mtl_new_default, ObTenantTabletScheduler::mtl_init, nullptr, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(mtl_new_default, share::ObTenantDagScheduler::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, ObTenantCheckpointSlogHandler::mtl_init, nullptr, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(mtl_new_default, coordinator::ObLeaderCoordinator::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, coordinator::ObFailureDetector::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, coordinator::ObLeaderCoordinator::mtl_init, coordinator::ObLeaderCoordinator::mtl_start, coordinator::ObLeaderCoordinator::mtl_stop, coordinator::ObLeaderCoordinator::mtl_wait, coordinator::ObLeaderCoordinator::mtl_destroy);
MTL_BIND2(mtl_new_default, coordinator::ObFailureDetector::mtl_init, coordinator::ObFailureDetector::mtl_start, coordinator::ObFailureDetector::mtl_stop, coordinator::ObFailureDetector::mtl_wait, coordinator::ObFailureDetector::mtl_destroy);
MTL_BIND2(ObLobManager::mtl_new, mtl_init_default, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(mtl_new_default, share::detector::ObDeadLockDetectorMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, storage::ObTenantTabletStatMgr::mtl_init, nullptr, mtl_stop_default, mtl_wait_default, mtl_destroy_default)
......
......@@ -139,20 +139,7 @@ public:
bool operator==(const FailureEvent &rhs) {
bool ret = false;
if (type_ == rhs.type_ && module_ == rhs.module_ && level_ == rhs.level_) {
const char * p1 = info_.get_ob_string().ptr();
const char * p2 = rhs.info_.get_ob_string().ptr();
if (p1 == p2) {// 包含p1 == p2 == nullptr
ret = true;
} else if (p1 == nullptr || p2 == nullptr) {
ret = false;
} else {// p1和p2均不为空
while (*p1 != '\0' && *p2 != '\0' && (*(p1++) == *(p2++)));
if (*p1 == '\0' || *p2 == '\0') {
ret = true;
} else {
ret = false;
}
}
ret = (0 == info_.get_ob_string().case_compare(rhs.info_.get_ob_string()));
}
return ret;
}
......
......@@ -41,47 +41,90 @@ namespace coordinator
using namespace common;
ObFailureDetector::ObFailureDetector()
: coordinator_(nullptr),
: is_running_(false),
coordinator_(nullptr),
has_add_clog_hang_event_(false),
has_add_slog_hang_event_(false),
has_add_sstable_hang_event_(false),
has_add_clog_full_event_(false),
lock_(common::ObLatchIds::ELECTION_LOCK)
{}
{
COORDINATOR_LOG(INFO, "ObFailureDetector constructed");
}
constexpr int VALUE_BUFFER_SIZE = 512;
int ObFailureDetector::init(ObLeaderCoordinator *coordinator)
int ObFailureDetector::mtl_init(ObFailureDetector *&p_failure_detector)
{
LC_TIME_GUARD(1_s);
int ret = OB_SUCCESS;
event_.reset();
recover_detect_operation_.reset();
if (OB_NOT_NULL(coordinator_)) {
ret = OB_INIT_TWICE;
COORDINATOR_LOG(ERROR, "has been inited", KR(ret), K(MTL_ID()), KP_(coordinator));
} else if (OB_ISNULL(coordinator)) {
ObLeaderCoordinator *coordinator = MTL(ObLeaderCoordinator *);
if (OB_ISNULL(coordinator)) {
ret = OB_INVALID_ARGUMENT;
COORDINATOR_LOG(ERROR, "coordinator is nullptr", KR(ret), K(MTL_ID()));
COORDINATOR_LOG(ERROR, "coordinator is nullptr", KR(ret));
} else {
coordinator_ = coordinator;
if (CLICK_FAIL(coordinator_->failure_detect_timer_.schedule_task_repeat(failure_task_handle_, 100_ms, [this]() {
detect_failure();
return false;
}))) {
COORDINATOR_LOG(ERROR, "fail to schedule failure detect task", KR(ret), K(MTL_ID()));
} else if (CLICK_FAIL(coordinator_->recovery_detect_timer_.schedule_task_repeat(recovery_task_handle_, 1_s, [this]() {
detect_recover();
return false;
}))) {
COORDINATOR_LOG(ERROR, "fail to schedule recovery detect task", KR(ret), K(MTL_ID()));
} else {
COORDINATOR_LOG(INFO, "failure detector init success", KR(ret), K(MTL_ID()), K(lbt()));
}
p_failure_detector->coordinator_ = coordinator;
COORDINATOR_LOG(INFO, "ObFailureDetector mtl init");
}
return ret;
}
/**
 * MTL start hook: registers the periodic failure / recovery detection tasks.
 *
 * Clears events_with_ops_, then schedules detect_failure() on the coordinator's
 * failure_detect_timer_ (100_ms interval) and detect_recover() on its
 * recovery_detect_timer_ (1_s interval). is_running_ is set to true only after
 * both registrations succeed.
 *
 * @param p_failure_detector detector instance to start
 * @return OB_SUCCESS on success;
 *         OB_INVALID_ARGUMENT if p_failure_detector is null;
 *         OB_NOT_INIT if coordinator_ has not been set;
 *         otherwise the error code returned by schedule_task_repeat().
 */
int ObFailureDetector::mtl_start(ObFailureDetector *&p_failure_detector)
{
  LC_TIME_GUARD(1_s);
  int ret = OB_SUCCESS;
  if (OB_ISNULL(p_failure_detector)) {
    // The stop/wait/destroy hooks tolerate a null instance; reject it here
    // instead of dereferencing it.
    ret = OB_INVALID_ARGUMENT;
    COORDINATOR_LOG(ERROR, "p_failure_detector is NULL", KR(ret));
  } else {
    p_failure_detector->events_with_ops_.reset();
    if (OB_ISNULL(p_failure_detector->coordinator_)) {
      ret = OB_NOT_INIT;
      COORDINATOR_LOG(ERROR, "not init yet", KR(ret), KP_(p_failure_detector->coordinator));
    } else if (CLICK_FAIL(p_failure_detector->coordinator_->failure_detect_timer_.schedule_task_repeat(p_failure_detector->failure_task_handle_, 100_ms, [p_failure_detector]() {
      // The pointer is captured by value, so the task does not depend on the
      // caller's reference staying alive.
      p_failure_detector->detect_failure();
      return false;// NOTE(review): presumably "do not stop repeating" - confirm against ObOccamTimer
    }))) {
      COORDINATOR_LOG(ERROR, "fail to schedule failure detect task", KR(ret));
    } else if (CLICK_FAIL(p_failure_detector->coordinator_->recovery_detect_timer_.schedule_task_repeat(p_failure_detector->recovery_task_handle_, 1_s, [p_failure_detector]() {
      p_failure_detector->detect_recover();
      return false;
    }))) {
      COORDINATOR_LOG(ERROR, "fail to schedule recovery detect task", KR(ret));
    } else {
      p_failure_detector->is_running_ = true;
      COORDINATOR_LOG(INFO, "ObFailureDetector mtl start");
    }
  }
  return ret;
}
/**
 * MTL stop hook: marks the detector as not running and stops both timer task
 * handles (failure_task_handle_ and recovery_task_handle_).
 *
 * A null instance is tolerated and only logged.
 *
 * @param p_failure_detector detector instance to stop (may be null)
 */
void ObFailureDetector::mtl_stop(ObFailureDetector *&p_failure_detector)
{
  if (OB_ISNULL(p_failure_detector)) {
    COORDINATOR_LOG(WARN, "p_failure_detector is NULL");
  } else {
    // Clear the flag so check_is_running_() reports false once stop has been
    // requested; previously it stayed true after stop.
    p_failure_detector->is_running_ = false;
    p_failure_detector->failure_task_handle_.stop();
    p_failure_detector->recovery_task_handle_.stop();
    COORDINATOR_LOG(INFO, "ObFailureDetector mtl stop");
  }
}
/**
 * MTL wait hook: calls wait() on both timer task handles
 * (failure_task_handle_ and recovery_task_handle_).
 *
 * A null instance is tolerated and only logged.
 *
 * @param p_failure_detector detector instance to wait on (may be null)
 */
void ObFailureDetector::mtl_wait(ObFailureDetector *&p_failure_detector)
{
  if (OB_ISNULL(p_failure_detector)) {
    COORDINATOR_LOG(WARN, "p_failure_detector is NULL");
  } else {
    p_failure_detector->failure_task_handle_.wait();
    p_failure_detector->recovery_task_handle_.wait();
    // Log the actual phase; the message used to say "mtl init".
    COORDINATOR_LOG(INFO, "ObFailureDetector mtl wait");
  }
}
/**
 * MTL destroy hook. Only logs: a warning when the instance pointer is null,
 * an info line otherwise. No member of the detector is touched here.
 *
 * @param p_failure_detector detector instance being destroyed (may be null)
 */
void ObFailureDetector::mtl_destroy(ObFailureDetector *&p_failure_detector)
{
  const bool has_instance = !OB_ISNULL(p_failure_detector);
  if (has_instance) {
    COORDINATOR_LOG(INFO, "ObFailureDetector mtl destroy");
  } else {
    COORDINATOR_LOG(WARN, "p_failure_detector is NULL");
  }
}
void ObFailureDetector::destroy()
{
LC_TIME_GUARD(1_s);
......@@ -91,64 +134,31 @@ void ObFailureDetector::destroy()
has_add_slog_hang_event_ = false;
has_add_sstable_hang_event_ = false;
has_add_clog_full_event_ = false;
COORDINATOR_LOG(INFO, "failure detector destroyed", K(MTL_ID()), K(lbt()));
}
int ObFailureDetector::mtl_init(ObFailureDetector *&p_failure_detector)
{
LC_TIME_GUARD(1_s);
int ret = OB_SUCCESS;
ObLeaderCoordinator *p_coordinator = MTL(ObLeaderCoordinator *);
if (OB_ISNULL(p_coordinator)) {
ret = OB_INVALID_ARGUMENT;
COORDINATOR_LOG(ERROR, "invalid argument", KR(ret), K(MTL_ID()));
} else if (CLICK_FAIL(p_failure_detector->init(p_coordinator))) {
COORDINATOR_LOG(ERROR, "init failure detector failed", KR(ret), K(MTL_ID()));
}
return ret;
COORDINATOR_LOG(INFO, "failure detector destroyed", K(lbt()));
}
void ObFailureDetector::detect_recover()
{
LC_TIME_GUARD(1_s);
int ret = OB_SUCCESS;
ObArray<FailureEventWithRecoverOp> temp_events_with_ops;
ObSpinLockGuard guard(lock_);
ObArray<FailureEvent> temp_event_;
ObArray<ObFunction<bool()>> temp_recover_detect_operation_;
if (!event_.empty()) {
COORDINATOR_LOG(INFO, "doing detect revocer operation", K(MTL_ID()), K_(event), K_(recover_detect_operation));
if (!events_with_ops_.empty()) {
COORDINATOR_LOG(INFO, "doing detect recover operation", K_(events_with_ops));
}
for (int64_t idx = 0; idx < event_.count() && OB_SUCC(ret); ++idx) {
if (recover_detect_operation_[idx].is_valid()) {
for (int64_t idx = 0; idx < events_with_ops_.count() && OB_SUCC(ret); ++idx) {
if (events_with_ops_[idx].recover_detect_operation_.is_valid()) {
LC_TIME_GUARD(10_ms);
bool recover_flag = recover_detect_operation_[idx]();
bool recover_flag = events_with_ops_[idx].recover_detect_operation_();
if (recover_flag) {
COORDINATOR_LOG(INFO, "revocer event detected", K(MTL_ID()), K(event_[idx]));
(void) insert_event_to_table_(event_[idx], recover_detect_operation_[idx], "DETECT REVOCER");
} else {
if (CLICK_FAIL(temp_event_.push_back(event_[idx]))) {
COORDINATOR_LOG(WARN, "fail to push event to temp_event_", KR(ret), K(MTL_ID()), K(event_[idx]));
} else if (CLICK_FAIL(temp_recover_detect_operation_.push_back(recover_detect_operation_[idx]))) {
temp_event_.pop_back();
COORDINATOR_LOG(WARN, "fail to push detect operation to temp_recover_detect_operation_", KR(ret), K(MTL_ID()), K(event_[idx]));
COORDINATOR_LOG(INFO, "revocer event detected", K(events_with_ops_[idx]));
if (CLICK_FAIL(events_with_ops_.remove(idx))) {
COORDINATOR_LOG(WARN, "fail to remove", K(events_with_ops_[idx]));
} else {
(void) insert_event_to_table_(events_with_ops_[idx].event_, events_with_ops_[idx].recover_detect_operation_, "DETECT REVOCER");
--idx;// next loop is still access to idx
}
}
} else {
if (CLICK_FAIL(temp_event_.push_back(event_[idx]))) {
COORDINATOR_LOG(WARN, "fail to push event to temp_event_", KR(ret), K(MTL_ID()), K(event_[idx]));
} else if (CLICK_FAIL(temp_recover_detect_operation_.push_back(recover_detect_operation_[idx]))) {
temp_event_.pop_back();
COORDINATOR_LOG(WARN, "fail to push detect operation to temp_recover_detect_operation_", KR(ret), K(MTL_ID()), K(event_[idx]));
}
}
}
if (OB_SUCC(ret)) {
if (temp_event_.count() != event_.count()) {
if (CLICK_FAIL(event_.assign(temp_event_))) {
COORDINATOR_LOG(WARN, "replace event array failed", KR(ret), K(MTL_ID()));
} else if (CLICK_FAIL(recover_detect_operation_.assign(temp_recover_detect_operation_))) {
COORDINATOR_LOG(WARN, "replace recover detect operation array failed", KR(ret), K(MTL_ID()));
}
}
}
}
......@@ -171,23 +181,28 @@ int ObFailureDetector::add_failure_event(const FailureEvent &event)
LC_TIME_GUARD(1_s);
int ret = OB_SUCCESS;
int64_t idx = 0;
FailureEventWithRecoverOp event_with_op;
ObSpinLockGuard guard(lock_);
for (; idx < event_.count(); ++idx) {
if (event_[idx] == event) {
break;
}
}
if (idx != event_.count()) {
ret = OB_ENTRY_EXIST;
COORDINATOR_LOG(WARN, "this failure event has been exist", KR(ret), K(MTL_ID()), K(event), K(event_));
} else if (CLICK_FAIL(event_.push_back(event))) {
COORDINATOR_LOG(WARN, "fail to push event to failure detector", KR(ret), K(MTL_ID()), K(event), K(event_));
} else if (CLICK_FAIL(recover_detect_operation_.push_back(ObFunction<bool()>()))) {
COORDINATOR_LOG(WARN, "fail to push default recover operation to failure detector", KR(ret), K(MTL_ID()), K(event), K(event_));
event_.pop_back();
if (!check_is_running_()) {
ret = OB_NOT_RUNNING;
COORDINATOR_LOG(WARN, "not running", KR(ret), K(event), K(events_with_ops_));
} else {
COORDINATOR_LOG(INFO, "success report a failure event without recover detect operation", KR(ret), K(MTL_ID()), K(event), K(event_));
(void) insert_event_to_table_(event, ObFunction<bool()>(), event.info_.get_ob_string());
for (; idx < events_with_ops_.count(); ++idx) {
if (events_with_ops_[idx].event_ == event) {
break;
}
}
if (idx != events_with_ops_.count()) {
ret = OB_ENTRY_EXIST;
COORDINATOR_LOG(WARN, "this failure event has been exist", KR(ret), K(event), K(events_with_ops_));
} else if (CLICK_FAIL(event_with_op.init(event, ObFunction<bool()>()))) {
COORDINATOR_LOG(WARN, "fail to init event with op", KR(ret), K(event), K(events_with_ops_));
} else if (CLICK_FAIL(events_with_ops_.push_back(event_with_op))) {
COORDINATOR_LOG(WARN, "fail to push", KR(ret), K(event), K(events_with_ops_));
} else {
COORDINATOR_LOG(INFO, "success report a failure event without recover detect operation", KR(ret), K(event), K(events_with_ops_));
(void) insert_event_to_table_(event, ObFunction<bool()>(), event.info_.get_ob_string());
}
}
return ret;
}
......@@ -197,23 +212,28 @@ int ObFailureDetector::add_failure_event(const FailureEvent &event, const ObFunc
LC_TIME_GUARD(1_s);
int ret = OB_SUCCESS;
int64_t idx = 0;
FailureEventWithRecoverOp event_with_op;
ObSpinLockGuard guard(lock_);
for (; idx < event_.count(); ++idx) {
if (event_[idx] == event) {
break;
}
}
if (idx != event_.count()) {
ret = OB_ENTRY_EXIST;
COORDINATOR_LOG(WARN, "this failure event has been exist", KR(ret), K(MTL_ID()), K(event), K(event_));
} else if (CLICK_FAIL(event_.push_back(event))) {
COORDINATOR_LOG(WARN, "fail to push event to failure detector", KR(ret), K(MTL_ID()), K(event), K(event_));
} else if (CLICK_FAIL(recover_detect_operation_.push_back(recover_detect_operation))) {
COORDINATOR_LOG(INFO, "fail to push recover operation to failure detector", KR(ret), K(MTL_ID()), K(event), K(event_));
event_.pop_back();
if (!check_is_running_()) {
ret = OB_NOT_RUNNING;
COORDINATOR_LOG(WARN, "not running", KR(ret), K(event), K(events_with_ops_));
} else {
COORDINATOR_LOG(INFO, "success report a failure event with recover detect operation", KR(ret), K(MTL_ID()), K(event), K(event_));
(void) insert_event_to_table_(event, recover_detect_operation, event.info_.get_ob_string());
for (; idx < events_with_ops_.count(); ++idx) {
if (events_with_ops_[idx].event_ == event) {
break;
}
}
if (idx != events_with_ops_.count()) {
ret = OB_ENTRY_EXIST;
COORDINATOR_LOG(WARN, "this failure event has been exist", KR(ret), K(event), K(events_with_ops_));
} else if (CLICK_FAIL(event_with_op.init(event, recover_detect_operation))) {
COORDINATOR_LOG(WARN, "fail to init event with op", KR(ret), K(event), K(events_with_ops_));
} else if (CLICK_FAIL(events_with_ops_.push_back(event_with_op))) {
COORDINATOR_LOG(WARN, "fail to push", KR(ret), K(event), K(events_with_ops_));
} else {
COORDINATOR_LOG(INFO, "success report a failure event with recover detect operation", KR(ret), K(event), K(events_with_ops_));
(void) insert_event_to_table_(event, recover_detect_operation, event.info_.get_ob_string());
}
}
return ret;
}
......@@ -224,64 +244,74 @@ int ObFailureDetector::remove_failure_event(const FailureEvent &event)
int ret = OB_SUCCESS;
int64_t idx = 0;
ObSpinLockGuard guard(lock_);
for (; idx < event_.count(); ++idx) {
if (event_[idx] == event) {
break;
}
}
if (idx == event_.count()) {
ret = OB_ENTRY_NOT_EXIST;
COORDINATOR_LOG(WARN, "this failure event not exist", KR(ret), K(MTL_ID()), K(event), K(event_));
if (!check_is_running_()) {
ret = OB_NOT_RUNNING;
COORDINATOR_LOG(WARN, "not running", KR(ret), K(event), K(events_with_ops_));
} else {
(void) insert_event_to_table_(event_[idx], recover_detect_operation_[idx], "REMOVE FAILURE");
if (CLICK_FAIL(event_.remove(idx))) {
COORDINATOR_LOG(WARN, "remove event failed", KR(ret), K(MTL_ID()), K(event), K(event_));
} else if (CLICK_FAIL(recover_detect_operation_.remove(idx))) {
COORDINATOR_LOG(WARN, "remove operation failed", KR(ret), K(MTL_ID()), K(event), K(event_));
for (; idx < events_with_ops_.count(); ++idx) {
if (events_with_ops_[idx].event_ == event) {
break;
}
}
if (idx == events_with_ops_.count()) {
ret = OB_ENTRY_NOT_EXIST;
COORDINATOR_LOG(WARN, "this failure event not exist", KR(ret), K(event), K(events_with_ops_));
} else {
COORDINATOR_LOG(INFO, "user remove failure event success", KR(ret), K(MTL_ID()), K(event), K(event_));
(void) insert_event_to_table_(events_with_ops_[idx].event_, events_with_ops_[idx].recover_detect_operation_, "REMOVE FAILURE");
if (CLICK_FAIL(events_with_ops_.remove(idx))) {
COORDINATOR_LOG(WARN, "remove event failed", KR(ret), K(event), K(events_with_ops_));
} else if (CLICK_FAIL(events_with_ops_.remove(idx))) {
COORDINATOR_LOG(ERROR, "remove event failed", KR(ret), K(event), K(events_with_ops_));
} else {
COORDINATOR_LOG(INFO, "user remove failure event success", KR(ret), K(event), K(events_with_ops_));
}
}
}
return ret;
}
int ObFailureDetector::insert_event_to_table_(const FailureEvent &event, const ObFunction<bool()> &recover_operation, ObString info)
int ObFailureDetector::get_specified_level_event(FailureLevel level, ObIArray<FailureEvent> &results)
{
LC_TIME_GUARD(1_s);
#define PRINT_WRAPPER KR(ret), K(MTL_ID()), K(event), K(recover_operation)
#define PRINT_WRAPPER KR(ret), K(level), K(results)
int ret = OB_SUCCESS;
if (CLICK_FAIL(SERVER_EVENT_ADD("FAILURE_DETECTOR",
common::to_cstring(info),
"FAILURE_MODULE",
obj_to_cstring(event.module_),
"FAILURE_TYPE",
obj_to_cstring(event.type_),
"AUTO_RECOVER",
common::to_cstring(recover_operation.is_valid())))) {
COORDINATOR_LOG_(WARN, "insert into __all_server_event_history failed");
ObSpinLockGuard guard(lock_);
if (!check_is_running_()) {
ret = OB_NOT_RUNNING;
COORDINATOR_LOG(WARN, "not running", KR(ret), K(events_with_ops_));
} else {
COORDINATOR_LOG_(INFO, "insert into __all_server_event_history success");
results.reset();
for (int64_t idx = 0; idx < events_with_ops_.count(); ++idx) {
if (events_with_ops_.at(idx).event_.get_failure_level() == level) {
if (CLICK_FAIL(results.push_back(events_with_ops_.at(idx).event_))) {
COORDINATOR_LOG_(WARN, "fail to push back event to results");
}
}
}
if (CLICK_FAIL(ret)) {
COORDINATOR_LOG_(WARN, "fail to get specified level failure event");
}
}
return ret;
#undef PRINT_WRAPPER
}
int ObFailureDetector::get_specified_level_event(FailureLevel level, ObIArray<FailureEvent> &results)
int ObFailureDetector::insert_event_to_table_(const FailureEvent &event, const ObFunction<bool()> &recover_operation, ObString info)
{
LC_TIME_GUARD(1_s);
#define PRINT_WRAPPER KR(ret), K(MTL_ID()), K(level), K(results)
#define PRINT_WRAPPER KR(ret), K(event), K(recover_operation)
int ret = OB_SUCCESS;
ObSpinLockGuard guard(lock_);
results.reset();
for (int64_t idx = 0; idx < event_.count(); ++idx) {
if (event_.at(idx).get_failure_level() == level) {
if (CLICK_FAIL(results.push_back(event_.at(idx)))) {
COORDINATOR_LOG_(WARN, "fail to push back event to results");
}
}
}
if (CLICK_FAIL(ret)) {
COORDINATOR_LOG_(WARN, "fail to get specified level failure event");
if (CLICK_FAIL(SERVER_EVENT_ADD("FAILURE_DETECTOR",
common::to_cstring(info),
"FAILURE_MODULE",
obj_to_cstring(event.module_),
"FAILURE_TYPE",
obj_to_cstring(event.type_),
"AUTO_RECOVER",
common::to_cstring(recover_operation.is_valid())))) {
COORDINATOR_LOG_(WARN, "insert into __all_server_event_history failed");
} else {
COORDINATOR_LOG_(INFO, "insert into __all_server_event_history success");
}
return ret;
#undef PRINT_WRAPPER
......@@ -442,6 +472,24 @@ void ObFailureDetector::detect_palf_disk_full_()
}
}
int ObFailureDetector::FailureEventWithRecoverOp::init(const FailureEvent &event,
const ObFunction<bool()> &recover_detect_operation)
{
LC_TIME_GUARD(1_s);
int ret = OB_SUCCESS;
if (CLICK_FAIL(event_.assign(event))) {
COORDINATOR_LOG(WARN, "fail to assign event", K(ret));
} else if (CLICK_FAIL(recover_detect_operation_.assign(recover_detect_operation))) {
COORDINATOR_LOG(WARN, "fail to assign op", K(ret));
}
return ret;
}
int ObFailureDetector::FailureEventWithRecoverOp::assign(const FailureEventWithRecoverOp &rhs)
{
return init(rhs.event_, rhs.recover_detect_operation_);
}
}
}
}
......@@ -57,6 +57,10 @@ public:
int init(ObLeaderCoordinator *coordinator);
void destroy();
static int mtl_init(ObFailureDetector *&p_failure_detector);
static int mtl_start(ObFailureDetector *&p_failure_detector);
static void mtl_stop(ObFailureDetector *&p_failure_detector);
static void mtl_wait(ObFailureDetector *&p_failure_detector);
static void mtl_destroy(ObFailureDetector *&p_failure_detector);
/**
* @description: 设置一个不可自动恢复的failure,需要由注册的模块手动调用remove_failure_event()接口恢复failure,否则将持续存在
* @param {FailureEvent} event failure事件,定义在failure_event.h中
......@@ -94,27 +98,28 @@ public:
* @return {*}
*/
void detect_failure();
/**
* @description: 定期探测与租户下的其他副本的网络连接是否正常
* @param {*}
* @return {*}
* @Date: 2022-01-04 21:12:16
*/
void detect_connection_status();
bool is_clog_disk_has_fatal_error();
bool is_data_disk_has_fatal_error();
private:
bool check_is_running_() const { return is_running_; }
int insert_event_to_table_(const FailureEvent &event, const ObFunction<bool()> &recover_operation, ObString info);
void detect_palf_hang_failure_();
void detect_slog_writter_hang_failure_();
void detect_sstable_io_failure_();
void detect_palf_disk_full_();
private:
// Pairs a failure event with the callable registered to detect its recovery,
// so both can be kept in a single array entry.
struct FailureEventWithRecoverOp {
  // The recorded failure event.
  FailureEvent event_;
  // Recover-detect callable registered together with event_.
  ObFunction<bool()> recover_detect_operation_;
  // Fills both members from the given arguments.
  int init(const FailureEvent &event, const ObFunction<bool()> &recover_detect_operation);
  // Copies both members from rhs.
  int assign(const FailureEventWithRecoverOp &rhs);
  // Only event_ is included in the string representation.
  TO_STRING_KV(K_(event));
};
// default time threshold for clog disk io hang detection is 5s
// TODO: this value should be a configuration
static const int64_t IO_HANG_TIME_THRESHOLD_US = 5 * 1000 * 1000;
common::ObArray<FailureEvent> event_;
common::ObArray<ObFunction<bool()>> recover_detect_operation_;
bool is_running_;
common::ObArray<FailureEventWithRecoverOp> events_with_ops_;
common::ObArray<common::ObAddr> tenant_server_list_;
common::ObOccamTimerTaskRAIIHandle failure_task_handle_;
common::ObOccamTimerTaskRAIIHandle recovery_task_handle_;
......
......@@ -32,16 +32,10 @@ using namespace share;
ObLeaderCoordinator::ObLeaderCoordinator()
: all_ls_election_reference_info_(nullptr),
is_inited_(false),
is_running_(false),
lock_(common::ObLatchIds::ELECTION_LOCK)
{}
int ObLeaderCoordinator::mtl_init(ObLeaderCoordinator *&p_coordinator)
{
LC_TIME_GUARD(1_s);
return p_coordinator->init_and_start();
}
struct AllLsElectionReferenceInfoFactory
{
static ObArray<LsElectionReferenceInfo> *create_new()
......@@ -49,7 +43,7 @@ struct AllLsElectionReferenceInfoFactory
LC_TIME_GUARD(1_s);
ObArray<LsElectionReferenceInfo> *new_all_ls_election_reference_info = nullptr;
if (nullptr == (new_all_ls_election_reference_info = (ObArray<LsElectionReferenceInfo>*)mtl_malloc(sizeof(ObArray<LsElectionReferenceInfo>), "Coordinator"))) {
COORDINATOR_LOG(ERROR, "alloc memory failed", K(MTL_ID()));
COORDINATOR_LOG(ERROR, "alloc memory failed");
} else {
new(new_all_ls_election_reference_info) ObArray<LsElectionReferenceInfo>();
}
......@@ -64,43 +58,79 @@ struct AllLsElectionReferenceInfoFactory
}
};
int ObLeaderCoordinator::init_and_start()
int ObLeaderCoordinator::mtl_init(ObLeaderCoordinator *&p_coordinator)// init timer
{
LC_TIME_GUARD(1_s);
int ret = OB_SUCCESS;
ObSpinLockGuard guard(lock_);
if(is_inited_) {
ObSpinLockGuard guard(p_coordinator->lock_);
if(p_coordinator->is_running_) {
ret = OB_INIT_TWICE;
COORDINATOR_LOG(ERROR, "has been inited alread]y", KR(ret), K(MTL_ID()));
} else if (CLICK_FAIL(recovery_detect_timer_.init_and_start(1, 1_ms, "CoordTR"))) {
COORDINATOR_LOG(ERROR, "fail to init and start recovery_detect_timer", KR(ret), K(MTL_ID()));
} else if (CLICK_FAIL(failure_detect_timer_.init_and_start(1, 1_ms, "CoordTF"))) {
COORDINATOR_LOG(ERROR, "fail to init and start failure_detect_timer", KR(ret), K(MTL_ID()));
} else if (nullptr == (all_ls_election_reference_info_ = AllLsElectionReferenceInfoFactory::create_new())) {
COORDINATOR_LOG(ERROR, "has been inited alread]y", KR(ret));
} else if (CLICK_FAIL(p_coordinator->recovery_detect_timer_.init_and_start(1, 1_ms, "CoordTR"))) {
COORDINATOR_LOG(ERROR, "fail to init and start recovery_detect_timer", KR(ret));
} else if (CLICK_FAIL(p_coordinator->failure_detect_timer_.init_and_start(1, 1_ms, "CoordTF"))) {
COORDINATOR_LOG(ERROR, "fail to init and start failure_detect_timer", KR(ret));
} else {
COORDINATOR_LOG(INFO, "ObLeaderCoordinator mtl init success", KR(ret));
}
return ret;
}
int ObLeaderCoordinator::mtl_start(ObLeaderCoordinator *&p_coordinator)// start run timer task
{
LC_TIME_GUARD(1_s);
int ret = OB_SUCCESS;
if (nullptr == (p_coordinator->all_ls_election_reference_info_ = AllLsElectionReferenceInfoFactory::create_new())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
COORDINATOR_LOG(ERROR, "fail to new all_ls_election_reference_info_", KR(ret), K(MTL_ID()));
COORDINATOR_LOG(ERROR, "fail to new all_ls_election_reference_info_", KR(ret));
} else {
new(all_ls_election_reference_info_) ObArray<LsElectionReferenceInfo>();
if (CLICK_FAIL(recovery_detect_timer_.schedule_task_ignore_handle_repeat(500_ms,
[this](){ refresh(); return false; }))) {
COORDINATOR_LOG(ERROR, "schedule repeat task failed", KR(ret), K(MTL_ID()));
new(p_coordinator->all_ls_election_reference_info_) ObArray<LsElectionReferenceInfo>();
if (CLICK_FAIL(p_coordinator->recovery_detect_timer_.schedule_task_repeat(
p_coordinator->refresh_priority_task_handle_,
500_ms,
[p_coordinator](){ p_coordinator->refresh(); return false; }))) {
COORDINATOR_LOG(ERROR, "schedule repeat task failed", KR(ret));
} else {
is_inited_ = true;
COORDINATOR_LOG(INFO, "init leader coordinator success", KR(ret), K(MTL_ID()), K(lbt()));
p_coordinator->is_running_ = true;
COORDINATOR_LOG(INFO, "ObLeaderCoordinator mtl start success", KR(ret));
}
}
return ret;
}
void ObLeaderCoordinator::stop_and_wait()
/**
 * MTL stop hook: clears is_running_ and stops refresh_priority_task_handle_.
 * A null instance is tolerated and only logged.
 *
 * @param p_coordinator coordinator instance to stop (may be null)
 */
void ObLeaderCoordinator::mtl_stop(ObLeaderCoordinator *&p_coordinator)// stop timer task
{
  if (OB_ISNULL(p_coordinator)) {
    COORDINATOR_LOG(WARN, "p_coordinator is NULL");
    return;
  }
  ObLeaderCoordinator &coordinator = *p_coordinator;
  coordinator.is_running_ = false;
  coordinator.refresh_priority_task_handle_.stop();
  COORDINATOR_LOG(INFO, "ObLeaderCoordinator mtl stop");
}
/**
 * MTL wait hook: calls wait() on refresh_priority_task_handle_.
 * A null instance is tolerated and only logged.
 *
 * @param p_coordinator coordinator instance to wait on (may be null)
 */
void ObLeaderCoordinator::mtl_wait(ObLeaderCoordinator *&p_coordinator)// wait timer task
{
  if (OB_ISNULL(p_coordinator)) {
    COORDINATOR_LOG(WARN, "p_coordinator is NULL");
    return;
  }
  ObLeaderCoordinator &coordinator = *p_coordinator;
  coordinator.refresh_priority_task_handle_.wait();
  COORDINATOR_LOG(INFO, "ObLeaderCoordinator mtl wait");
}
void ObLeaderCoordinator::mtl_destroy(ObLeaderCoordinator *&p_coordinator)// destroy timer
{
LC_TIME_GUARD(1_s);
recovery_detect_timer_.stop_and_wait();
failure_detect_timer_.stop_and_wait();
ObSpinLockGuard guard(lock_);
AllLsElectionReferenceInfoFactory::delete_obj(all_ls_election_reference_info_);
all_ls_election_reference_info_ = NULL;
is_inited_ = false;
if (OB_ISNULL(p_coordinator)) {
COORDINATOR_LOG(WARN, "p_coordinator is NULL");
} else {
p_coordinator->recovery_detect_timer_.stop_and_wait();
p_coordinator->failure_detect_timer_.stop_and_wait();
AllLsElectionReferenceInfoFactory::delete_obj(p_coordinator->all_ls_election_reference_info_);
p_coordinator->all_ls_election_reference_info_ = NULL;
COORDINATOR_LOG(INFO, "ObLeaderCoordinator mtl destroy");
}
}
void ObLeaderCoordinator::refresh()
......@@ -110,11 +140,11 @@ void ObLeaderCoordinator::refresh()
ObArray<LsElectionReferenceInfo> *new_all_ls_election_reference_info = AllLsElectionReferenceInfoFactory::create_new();
if (nullptr == new_all_ls_election_reference_info) {
ret = OB_ALLOCATE_MEMORY_FAILED;
COORDINATOR_LOG(WARN, "alloc new_all_ls_election_reference_info failed", KR(ret), K(MTL_ID()));
COORDINATOR_LOG(WARN, "alloc new_all_ls_election_reference_info failed", KR(ret));
} else if (CLICK_FAIL(TableAccessor::get_all_ls_election_reference_info(*new_all_ls_election_reference_info))) {
COORDINATOR_LOG(WARN, "get all ls election reference info failed", KR(ret), K(MTL_ID()));
COORDINATOR_LOG(WARN, "get all ls election reference info failed", KR(ret));
} else {
COORDINATOR_LOG(INFO, "refresh __all_ls_election_reference_info success", KR(ret), K(MTL_ID()), K(*new_all_ls_election_reference_info));
COORDINATOR_LOG(INFO, "refresh __all_ls_election_reference_info success", KR(ret), K(*new_all_ls_election_reference_info));
ObSpinLockGuard guard(lock_);
std::swap(new_all_ls_election_reference_info, all_ls_election_reference_info_);
}
......@@ -126,16 +156,16 @@ int ObLeaderCoordinator::get_ls_election_reference_info(const share::ObLSID &ls_
LC_TIME_GUARD(1_s);
ObSpinLockGuard guard(lock_);
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
COORDINATOR_LOG(ERROR, "call before init", KR(ret));
if (!is_running_) {
ret = OB_NOT_RUNNING;
COORDINATOR_LOG(WARN, "not running", KR(ret));
} else {
int64_t idx = 0;
for (; idx < all_ls_election_reference_info_->count(); ++idx) {
if (all_ls_election_reference_info_->at(idx).element<0>() == ls_id.id()) {
if (CLICK_FAIL(reference_info.assign(all_ls_election_reference_info_->at(idx)))) {
COORDINATOR_LOG(WARN, "fail to assign reference info",
KR(ret), K(MTL_ID()), KPC_(all_ls_election_reference_info));
KR(ret), KPC_(all_ls_election_reference_info));
}
break;
}
......@@ -143,7 +173,7 @@ int ObLeaderCoordinator::get_ls_election_reference_info(const share::ObLSID &ls_
if (idx == all_ls_election_reference_info_->count()) {
ret = OB_ENTRY_NOT_EXIST;
COORDINATOR_LOG(WARN, "can not find this ls_id in all_ls_election_reference_info_",
KR(ret), K(MTL_ID()), K(ls_id), KPC_(all_ls_election_reference_info));
KR(ret), K(ls_id), KPC_(all_ls_election_reference_info));
}
}
return ret;
......
......@@ -17,6 +17,7 @@
#include "lib/container/ob_array.h"
#include "share/ob_delegate.h"
#include "ob_failure_detector.h"
#include "share/ob_occam_timer.h"
#include "share/ob_table_access_helper.h"
namespace oceanbase
......@@ -48,25 +49,14 @@ class ObLeaderCoordinator
friend class unittest::TestElectionPriority;
public:
ObLeaderCoordinator();
~ObLeaderCoordinator() { destroy(); }
~ObLeaderCoordinator() { }
ObLeaderCoordinator(const ObLeaderCoordinator &rhs) = delete;
ObLeaderCoordinator& operator=(const ObLeaderCoordinator &rhs) = delete;
static int mtl_init(ObLeaderCoordinator *&p_coordinator);
void destroy() { stop_and_wait(); }
/**
* @description: 对内部结构进行初始化
* @param {*}
* @return {*}
* @Date: 2022-01-04 11:18:48
*/
int init_and_start();
/**
* @description: 停止线程池和timer
* @param {*}
* @return {*}
* @Date: 2022-01-04 18:05:17
*/
void stop_and_wait();
static int mtl_start(ObLeaderCoordinator *&p_coordinator);
static void mtl_stop(ObLeaderCoordinator *&p_coordinator);
static void mtl_wait(ObLeaderCoordinator *&p_coordinator);
static void mtl_destroy(ObLeaderCoordinator *&p_coordinator);
/**
* @description: 当内部表更新的时候,可以通过该接口主动触发LeaderCoordinator的刷新流程,以便切主动作可以尽快完成
* @param {*}
......@@ -81,7 +71,8 @@ private:
common::ObOccamTimer recovery_detect_timer_;
// detect whether failure has occured
common::ObOccamTimer failure_detect_timer_;
bool is_inited_;
common::ObOccamTimerTaskRAIIHandle refresh_priority_task_handle_;
bool is_running_;
mutable ObSpinLock lock_;
};
......
......@@ -326,8 +326,8 @@ int ObMultiTenant::init(ObAddr myaddr,
MTL_BIND2(mtl_new_default, rootserver::ObPrimaryLSService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObRecoveryLSService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObRestoreService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, coordinator::ObLeaderCoordinator::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, coordinator::ObFailureDetector::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, coordinator::ObLeaderCoordinator::mtl_init, coordinator::ObLeaderCoordinator::mtl_start, coordinator::ObLeaderCoordinator::mtl_stop, coordinator::ObLeaderCoordinator::mtl_wait, coordinator::ObLeaderCoordinator::mtl_destroy);
MTL_BIND2(mtl_new_default, coordinator::ObFailureDetector::mtl_init, coordinator::ObFailureDetector::mtl_start, coordinator::ObFailureDetector::mtl_stop, coordinator::ObFailureDetector::mtl_wait, coordinator::ObFailureDetector::mtl_destroy);
MTL_BIND2(ObLobManager::mtl_new, mtl_init_default, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(mtl_new_default, ObStorageHAService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(mtl_new_default, ObGlobalAutoIncService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册