diff --git a/mittest/mtlenv/mock_tenant_module_env.h b/mittest/mtlenv/mock_tenant_module_env.h index a4fc78b0259681b8598012002a55c5bef5ff0186..76425b4670852c205bb374967d09cdd9e9c210d7 100644 --- a/mittest/mtlenv/mock_tenant_module_env.h +++ b/mittest/mtlenv/mock_tenant_module_env.h @@ -657,8 +657,8 @@ int MockTenantModuleEnv::init() MTL_BIND2(mtl_new_default, ObTenantTabletScheduler::mtl_init, nullptr, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, share::ObTenantDagScheduler::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObTenantCheckpointSlogHandler::mtl_init, nullptr, mtl_stop_default, mtl_wait_default, mtl_destroy_default); - MTL_BIND2(mtl_new_default, coordinator::ObLeaderCoordinator::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); - MTL_BIND2(mtl_new_default, coordinator::ObFailureDetector::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); + MTL_BIND2(mtl_new_default, coordinator::ObLeaderCoordinator::mtl_init, coordinator::ObLeaderCoordinator::mtl_start, coordinator::ObLeaderCoordinator::mtl_stop, coordinator::ObLeaderCoordinator::mtl_wait, coordinator::ObLeaderCoordinator::mtl_destroy); + MTL_BIND2(mtl_new_default, coordinator::ObFailureDetector::mtl_init, coordinator::ObFailureDetector::mtl_start, coordinator::ObFailureDetector::mtl_stop, coordinator::ObFailureDetector::mtl_wait, coordinator::ObFailureDetector::mtl_destroy); MTL_BIND2(ObLobManager::mtl_new, mtl_init_default, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, share::detector::ObDeadLockDetectorMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); MTL_BIND2(mtl_new_default, storage::ObTenantTabletStatMgr::mtl_init, nullptr, mtl_stop_default, mtl_wait_default, mtl_destroy_default) diff --git a/src/logservice/leader_coordinator/failure_event.h b/src/logservice/leader_coordinator/failure_event.h index 8100880ea642dacbdd650bc033aa8e6c8e544210..deef055993df22bcbad4defaa3dab6f392678f2d 100644 --- a/src/logservice/leader_coordinator/failure_event.h +++ b/src/logservice/leader_coordinator/failure_event.h @@ -139,20 +139,7 @@ public: bool operator==(const FailureEvent &rhs) { bool ret = false; if (type_ == rhs.type_ && module_ == rhs.module_ && level_ == rhs.level_) { - const char * p1 = info_.get_ob_string().ptr(); - const char * p2 = rhs.info_.get_ob_string().ptr(); - if (p1 == p2) {// 包含p1 == p2 == nullptr - ret = true; - } else if (p1 == nullptr || p2 == nullptr) { - ret = false; - } else {// p1和p2均不为空 - while (*p1 != '\0' && *p2 != '\0' && (*(p1++) == *(p2++))); - if (*p1 == '\0' || *p2 == '\0') { - ret = true; - } else { - ret = false; - } - } + ret = (0 == info_.get_ob_string().case_compare(rhs.info_.get_ob_string())); } return ret; } diff --git a/src/logservice/leader_coordinator/ob_failure_detector.cpp b/src/logservice/leader_coordinator/ob_failure_detector.cpp index f9c773d44f33b6a4742a5e9bf5c158712b5fa9cd..9a468c1b6441991fd6db264c28094d5091b50247 100644 --- a/src/logservice/leader_coordinator/ob_failure_detector.cpp +++ b/src/logservice/leader_coordinator/ob_failure_detector.cpp @@ -41,47 +41,90 @@ namespace coordinator using namespace common; ObFailureDetector::ObFailureDetector() - : coordinator_(nullptr), + : is_running_(false), + coordinator_(nullptr), has_add_clog_hang_event_(false), has_add_slog_hang_event_(false), has_add_sstable_hang_event_(false), has_add_clog_full_event_(false), lock_(common::ObLatchIds::ELECTION_LOCK) -{} +{ + COORDINATOR_LOG(INFO, "ObFailureDetector constructed"); +} constexpr int VALUE_BUFFER_SIZE = 512; -int ObFailureDetector::init(ObLeaderCoordinator *coordinator) +int ObFailureDetector::mtl_init(ObFailureDetector *&p_failure_detector) { - LC_TIME_GUARD(1_s); int ret = OB_SUCCESS; - event_.reset(); - recover_detect_operation_.reset(); - if (OB_NOT_NULL(coordinator_)) { - ret = OB_INIT_TWICE; - COORDINATOR_LOG(ERROR, "has been inited", KR(ret), K(MTL_ID()), KP_(coordinator)); - } else if (OB_ISNULL(coordinator)) { + ObLeaderCoordinator *coordinator = MTL(ObLeaderCoordinator *); + if (OB_ISNULL(coordinator)) { ret = OB_INVALID_ARGUMENT; - COORDINATOR_LOG(ERROR, "coordinator is nullptr", KR(ret), K(MTL_ID())); + COORDINATOR_LOG(ERROR, "coordinator is nullptr", KR(ret)); } else { - coordinator_ = coordinator; - if (CLICK_FAIL(coordinator_->failure_detect_timer_.schedule_task_repeat(failure_task_handle_, 100_ms, [this]() { - detect_failure(); - return false; - }))) { - COORDINATOR_LOG(ERROR, "fail to schedule failure detect task", KR(ret), K(MTL_ID())); - } else if (CLICK_FAIL(coordinator_->recovery_detect_timer_.schedule_task_repeat(recovery_task_handle_, 1_s, [this]() { - detect_recover(); - return false; - }))) { - COORDINATOR_LOG(ERROR, "fail to schedule recovery detect task", KR(ret), K(MTL_ID())); - } else { - COORDINATOR_LOG(INFO, "failure detector init success", KR(ret), K(MTL_ID()), K(lbt())); - } + p_failure_detector->coordinator_ = coordinator; + COORDINATOR_LOG(INFO, "ObFailureDetector mtl init"); + } + return ret; +} + +int ObFailureDetector::mtl_start(ObFailureDetector *&p_failure_detector) +{ + LC_TIME_GUARD(1_s); + int ret = OB_SUCCESS; + p_failure_detector->events_with_ops_.reset(); + + if (OB_ISNULL(p_failure_detector->coordinator_)) { + ret = OB_NOT_INIT; + COORDINATOR_LOG(ERROR, "not init yet", KR(ret), KP_(p_failure_detector->coordinator)); + } else if (CLICK_FAIL(p_failure_detector->coordinator_->failure_detect_timer_.schedule_task_repeat(p_failure_detector->failure_task_handle_, 100_ms, [p_failure_detector]() { + p_failure_detector->detect_failure(); + return false; + }))) { + COORDINATOR_LOG(ERROR, "fail to schedule failure detect task", KR(ret)); + } else if (CLICK_FAIL(p_failure_detector->coordinator_->recovery_detect_timer_.schedule_task_repeat(p_failure_detector->recovery_task_handle_, 1_s, [p_failure_detector]() { + p_failure_detector->detect_recover(); + return false; + }))) { + COORDINATOR_LOG(ERROR, "fail to schedule recovery detect task", KR(ret)); + } else { + p_failure_detector->is_running_ = true; + COORDINATOR_LOG(INFO, "ObFailureDetector mtl start"); } return ret; } +void ObFailureDetector::mtl_stop(ObFailureDetector *&p_failure_detector) +{ + if (OB_ISNULL(p_failure_detector)) { + COORDINATOR_LOG(WARN, "p_failure_detector is NULL"); + } else { + p_failure_detector->failure_task_handle_.stop(); + p_failure_detector->recovery_task_handle_.stop(); + COORDINATOR_LOG(INFO, "ObFailureDetector mtl init"); + } +} + +void ObFailureDetector::mtl_wait(ObFailureDetector *&p_failure_detector) +{ + if (OB_ISNULL(p_failure_detector)) { + COORDINATOR_LOG(WARN, "p_failure_detector is NULL"); + } else { + p_failure_detector->failure_task_handle_.wait(); + p_failure_detector->recovery_task_handle_.wait(); + COORDINATOR_LOG(INFO, "ObFailureDetector mtl init"); + } +} + +void ObFailureDetector::mtl_destroy(ObFailureDetector *&p_failure_detector) +{ + if (OB_ISNULL(p_failure_detector)) { + COORDINATOR_LOG(WARN, "p_failure_detector is NULL"); + } else { + COORDINATOR_LOG(INFO, "ObFailureDetector mtl destroy"); + } +} + void ObFailureDetector::destroy() { LC_TIME_GUARD(1_s); @@ -91,64 +134,31 @@ void ObFailureDetector::destroy() has_add_slog_hang_event_ = false; has_add_sstable_hang_event_ = false; has_add_clog_full_event_ = false; - COORDINATOR_LOG(INFO, "failure detector destroyed", K(MTL_ID()), K(lbt())); -} - -int ObFailureDetector::mtl_init(ObFailureDetector *&p_failure_detector) -{ - LC_TIME_GUARD(1_s); - int ret = OB_SUCCESS; - ObLeaderCoordinator *p_coordinator = MTL(ObLeaderCoordinator *); - if (OB_ISNULL(p_coordinator)) { - ret = OB_INVALID_ARGUMENT; - COORDINATOR_LOG(ERROR, "invalid argument", KR(ret), K(MTL_ID())); - } else if (CLICK_FAIL(p_failure_detector->init(p_coordinator))) { - COORDINATOR_LOG(ERROR, "init failure detector failed", KR(ret), K(MTL_ID())); - } - return ret; + COORDINATOR_LOG(INFO, "failure detector destroyed", K(lbt())); } void ObFailureDetector::detect_recover() { LC_TIME_GUARD(1_s); int ret = OB_SUCCESS; + ObArray temp_events_with_ops; ObSpinLockGuard guard(lock_); - ObArray temp_event_; - ObArray> temp_recover_detect_operation_; - if (!event_.empty()) { - COORDINATOR_LOG(INFO, "doing detect revocer operation", K(MTL_ID()), K_(event), K_(recover_detect_operation)); + if (!events_with_ops_.empty()) { + COORDINATOR_LOG(INFO, "doing detect recover operation", K_(events_with_ops)); } - for (int64_t idx = 0; idx < event_.count() && OB_SUCC(ret); ++idx) { - if (recover_detect_operation_[idx].is_valid()) { + for (int64_t idx = 0; idx < events_with_ops_.count() && OB_SUCC(ret); ++idx) { + if (events_with_ops_[idx].recover_detect_operation_.is_valid()) { LC_TIME_GUARD(10_ms); - bool recover_flag = recover_detect_operation_[idx](); + bool recover_flag = events_with_ops_[idx].recover_detect_operation_(); if (recover_flag) { - COORDINATOR_LOG(INFO, "revocer event detected", K(MTL_ID()), K(event_[idx])); - (void) insert_event_to_table_(event_[idx], recover_detect_operation_[idx], "DETECT REVOCER"); - } else { - if (CLICK_FAIL(temp_event_.push_back(event_[idx]))) { - COORDINATOR_LOG(WARN, "fail to push event to temp_event_", KR(ret), K(MTL_ID()), K(event_[idx])); - } else if (CLICK_FAIL(temp_recover_detect_operation_.push_back(recover_detect_operation_[idx]))) { - temp_event_.pop_back(); - COORDINATOR_LOG(WARN, "fail to push detect operation to temp_recover_detect_operation_", KR(ret), K(MTL_ID()), K(event_[idx])); + COORDINATOR_LOG(INFO, "revocer event detected", K(events_with_ops_[idx])); + if (CLICK_FAIL(events_with_ops_.remove(idx))) { + COORDINATOR_LOG(WARN, "fail to remove", K(events_with_ops_[idx])); + } else { + (void) insert_event_to_table_(events_with_ops_[idx].event_, events_with_ops_[idx].recover_detect_operation_, "DETECT REVOCER"); + --idx;// next loop is still access to idx } } - } else { - if (CLICK_FAIL(temp_event_.push_back(event_[idx]))) { - COORDINATOR_LOG(WARN, "fail to push event to temp_event_", KR(ret), K(MTL_ID()), K(event_[idx])); - } else if (CLICK_FAIL(temp_recover_detect_operation_.push_back(recover_detect_operation_[idx]))) { - temp_event_.pop_back(); - COORDINATOR_LOG(WARN, "fail to push detect operation to temp_recover_detect_operation_", KR(ret), K(MTL_ID()), K(event_[idx])); - } - } - } - if (OB_SUCC(ret)) { - if (temp_event_.count() != event_.count()) { - if (CLICK_FAIL(event_.assign(temp_event_))) { - COORDINATOR_LOG(WARN, "replace event array failed", KR(ret), K(MTL_ID())); - } else if (CLICK_FAIL(recover_detect_operation_.assign(temp_recover_detect_operation_))) { - COORDINATOR_LOG(WARN, "replace recover detect operation array failed", KR(ret), K(MTL_ID())); - } } } } @@ -171,23 +181,28 @@ int ObFailureDetector::add_failure_event(const FailureEvent &event) LC_TIME_GUARD(1_s); int ret = OB_SUCCESS; int64_t idx = 0; + FailureEventWithRecoverOp event_with_op; ObSpinLockGuard guard(lock_); - for (; idx < event_.count(); ++idx) { - if (event_[idx] == event) { - break; - } - } - if (idx != event_.count()) { - ret = OB_ENTRY_EXIST; - COORDINATOR_LOG(WARN, "this failure event has been exist", KR(ret), K(MTL_ID()), K(event), K(event_)); - } else if (CLICK_FAIL(event_.push_back(event))) { - COORDINATOR_LOG(WARN, "fail to push event to failure detector", KR(ret), K(MTL_ID()), K(event), K(event_)); - } else if (CLICK_FAIL(recover_detect_operation_.push_back(ObFunction()))) { - COORDINATOR_LOG(WARN, "fail to push default recover operation to failure detector", KR(ret), K(MTL_ID()), K(event), K(event_)); - event_.pop_back(); + if (!check_is_running_()) { + ret = OB_NOT_RUNNING; + COORDINATOR_LOG(WARN, "not running", KR(ret), K(event), K(events_with_ops_)); } else { - COORDINATOR_LOG(INFO, "success report a failure event without recover detect operation", KR(ret), K(MTL_ID()), K(event), K(event_)); - (void) insert_event_to_table_(event, ObFunction(), event.info_.get_ob_string()); + for (; idx < events_with_ops_.count(); ++idx) { + if (events_with_ops_[idx].event_ == event) { + break; + } + } + if (idx != events_with_ops_.count()) { + ret = OB_ENTRY_EXIST; + COORDINATOR_LOG(WARN, "this failure event has been exist", KR(ret), K(event), K(events_with_ops_)); + } else if (CLICK_FAIL(event_with_op.init(event, ObFunction()))) { + COORDINATOR_LOG(WARN, "fail to init event with op", KR(ret), K(event), K(events_with_ops_)); + } else if (CLICK_FAIL(events_with_ops_.push_back(event_with_op))) { + COORDINATOR_LOG(WARN, "fail to push", KR(ret), K(event), K(events_with_ops_)); + } else { + COORDINATOR_LOG(INFO, "success report a failure event without recover detect operation", KR(ret), K(event), K(events_with_ops_)); + (void) insert_event_to_table_(event, ObFunction(), event.info_.get_ob_string()); + } } return ret; } @@ -197,23 +212,28 @@ int ObFailureDetector::add_failure_event(const FailureEvent &event, const ObFunc LC_TIME_GUARD(1_s); int ret = OB_SUCCESS; int64_t idx = 0; + FailureEventWithRecoverOp event_with_op; ObSpinLockGuard guard(lock_); - for (; idx < event_.count(); ++idx) { - if (event_[idx] == event) { - break; - } - } - if (idx != event_.count()) { - ret = OB_ENTRY_EXIST; - COORDINATOR_LOG(WARN, "this failure event has been exist", KR(ret), K(MTL_ID()), K(event), K(event_)); - } else if (CLICK_FAIL(event_.push_back(event))) { - COORDINATOR_LOG(WARN, "fail to push event to failure detector", KR(ret), K(MTL_ID()), K(event), K(event_)); - } else if (CLICK_FAIL(recover_detect_operation_.push_back(recover_detect_operation))) { - COORDINATOR_LOG(INFO, "fail to push recover operation to failure detector", KR(ret), K(MTL_ID()), K(event), K(event_)); - event_.pop_back(); + if (!check_is_running_()) { + ret = OB_NOT_RUNNING; + COORDINATOR_LOG(WARN, "not running", KR(ret), K(event), K(events_with_ops_)); } else { - COORDINATOR_LOG(INFO, "success report a failure event with recover detect operation", KR(ret), K(MTL_ID()), K(event), K(event_)); - (void) insert_event_to_table_(event, recover_detect_operation, event.info_.get_ob_string()); + for (; idx < events_with_ops_.count(); ++idx) { + if (events_with_ops_[idx].event_ == event) { + break; + } + } + if (idx != events_with_ops_.count()) { + ret = OB_ENTRY_EXIST; + COORDINATOR_LOG(WARN, "this failure event has been exist", KR(ret), K(event), K(events_with_ops_)); + } else if (CLICK_FAIL(event_with_op.init(event, recover_detect_operation))) { + COORDINATOR_LOG(WARN, "fail to init event with op", KR(ret), K(event), K(events_with_ops_)); + } else if (CLICK_FAIL(events_with_ops_.push_back(event_with_op))) { + COORDINATOR_LOG(WARN, "fail to push", KR(ret), K(event), K(events_with_ops_)); + } else { + COORDINATOR_LOG(INFO, "success report a failure event with recover detect operation", KR(ret), K(event), K(events_with_ops_)); + (void) insert_event_to_table_(event, recover_detect_operation, event.info_.get_ob_string()); + } } return ret; } @@ -224,64 +244,74 @@ int ObFailureDetector::remove_failure_event(const FailureEvent &event) int ret = OB_SUCCESS; int64_t idx = 0; ObSpinLockGuard guard(lock_); - for (; idx < event_.count(); ++idx) { - if (event_[idx] == event) { - break; - } - } - if (idx == event_.count()) { - ret = OB_ENTRY_NOT_EXIST; - COORDINATOR_LOG(WARN, "this failure event not exist", KR(ret), K(MTL_ID()), K(event), K(event_)); + if (!check_is_running_()) { + ret = OB_NOT_RUNNING; + COORDINATOR_LOG(WARN, "not running", KR(ret), K(event), K(events_with_ops_)); } else { - (void) insert_event_to_table_(event_[idx], recover_detect_operation_[idx], "REMOVE FAILURE"); - if (CLICK_FAIL(event_.remove(idx))) { - COORDINATOR_LOG(WARN, "remove event failed", KR(ret), K(MTL_ID()), K(event), K(event_)); - } else if (CLICK_FAIL(recover_detect_operation_.remove(idx))) { - COORDINATOR_LOG(WARN, "remove operation failed", KR(ret), K(MTL_ID()), K(event), K(event_)); + for (; idx < events_with_ops_.count(); ++idx) { + if (events_with_ops_[idx].event_ == event) { + break; + } + } + if (idx == events_with_ops_.count()) { + ret = OB_ENTRY_NOT_EXIST; + COORDINATOR_LOG(WARN, "this failure event not exist", KR(ret), K(event), K(events_with_ops_)); } else { - COORDINATOR_LOG(INFO, "user remove failure event success", KR(ret), K(MTL_ID()), K(event), K(event_)); + (void) insert_event_to_table_(events_with_ops_[idx].event_, events_with_ops_[idx].recover_detect_operation_, "REMOVE FAILURE"); + if (CLICK_FAIL(events_with_ops_.remove(idx))) { + COORDINATOR_LOG(WARN, "remove event failed", KR(ret), K(event), K(events_with_ops_)); + } else if (CLICK_FAIL(events_with_ops_.remove(idx))) { + COORDINATOR_LOG(ERROR, "remove event failed", KR(ret), K(event), K(events_with_ops_)); + } else { + COORDINATOR_LOG(INFO, "user remove failure event success", KR(ret), K(event), K(events_with_ops_)); + } } } return ret; } -int ObFailureDetector::insert_event_to_table_(const FailureEvent &event, const ObFunction &recover_operation, ObString info) +int ObFailureDetector::get_specified_level_event(FailureLevel level, ObIArray &results) { LC_TIME_GUARD(1_s); - #define PRINT_WRAPPER KR(ret), K(MTL_ID()), K(event), K(recover_operation) + #define PRINT_WRAPPER KR(ret), K(level), K(results) int ret = OB_SUCCESS; - if (CLICK_FAIL(SERVER_EVENT_ADD("FAILURE_DETECTOR", - common::to_cstring(info), - "FAILURE_MODULE", - obj_to_cstring(event.module_), - "FAILURE_TYPE", - obj_to_cstring(event.type_), - "AUTO_RECOVER", - common::to_cstring(recover_operation.is_valid())))) { - COORDINATOR_LOG_(WARN, "insert into __all_server_event_history failed"); + ObSpinLockGuard guard(lock_); + if (!check_is_running_()) { + ret = OB_NOT_RUNNING; + COORDINATOR_LOG(WARN, "not running", KR(ret), K(events_with_ops_)); } else { - COORDINATOR_LOG_(INFO, "insert into __all_server_event_history success"); + results.reset(); + for (int64_t idx = 0; idx < events_with_ops_.count(); ++idx) { + if (events_with_ops_.at(idx).event_.get_failure_level() == level) { + if (CLICK_FAIL(results.push_back(events_with_ops_.at(idx).event_))) { + COORDINATOR_LOG_(WARN, "fail to push back event to results"); + } + } + } + if (CLICK_FAIL(ret)) { + COORDINATOR_LOG_(WARN, "fail to get specified level failure event"); + } } return ret; #undef PRINT_WRAPPER } -int ObFailureDetector::get_specified_level_event(FailureLevel level, ObIArray &results) +int ObFailureDetector::insert_event_to_table_(const FailureEvent &event, const ObFunction &recover_operation, ObString info) { LC_TIME_GUARD(1_s); - #define PRINT_WRAPPER KR(ret), K(MTL_ID()), K(level), K(results) + #define PRINT_WRAPPER KR(ret), K(event), K(recover_operation) int ret = OB_SUCCESS; - ObSpinLockGuard guard(lock_); - results.reset(); - for (int64_t idx = 0; idx < event_.count(); ++idx) { - if (event_.at(idx).get_failure_level() == level) { - if (CLICK_FAIL(results.push_back(event_.at(idx)))) { - COORDINATOR_LOG_(WARN, "fail to push back event to results"); - } - } - } - if (CLICK_FAIL(ret)) { - COORDINATOR_LOG_(WARN, "fail to get specified level failure event"); + if (CLICK_FAIL(SERVER_EVENT_ADD("FAILURE_DETECTOR", + common::to_cstring(info), + "FAILURE_MODULE", + obj_to_cstring(event.module_), + "FAILURE_TYPE", + obj_to_cstring(event.type_), + "AUTO_RECOVER", + common::to_cstring(recover_operation.is_valid())))) { + COORDINATOR_LOG_(WARN, "insert into __all_server_event_history failed"); + } else { + COORDINATOR_LOG_(INFO, "insert into __all_server_event_history success"); } return ret; #undef PRINT_WRAPPER @@ -442,6 +472,24 @@ void ObFailureDetector::detect_palf_disk_full_() } } +int ObFailureDetector::FailureEventWithRecoverOp::init(const FailureEvent &event, + const ObFunction &recover_detect_operation) +{ + LC_TIME_GUARD(1_s); + int ret = OB_SUCCESS; + if (CLICK_FAIL(event_.assign(event))) { + COORDINATOR_LOG(WARN, "fail to assign event", K(ret)); + } else if (CLICK_FAIL(recover_detect_operation_.assign(recover_detect_operation))) { + COORDINATOR_LOG(WARN, "fail to assign op", K(ret)); + } + return ret; +} + +int ObFailureDetector::FailureEventWithRecoverOp::assign(const FailureEventWithRecoverOp &rhs) +{ + return init(rhs.event_, rhs.recover_detect_operation_); +} + } } } diff --git a/src/logservice/leader_coordinator/ob_failure_detector.h b/src/logservice/leader_coordinator/ob_failure_detector.h index fcbe32c4dcc9fa044c543ae8a8e1b94105331da8..d2daa297885a30bbd2a6441212194ea5c9771603 100644 --- a/src/logservice/leader_coordinator/ob_failure_detector.h +++ b/src/logservice/leader_coordinator/ob_failure_detector.h @@ -57,6 +57,10 @@ public: int init(ObLeaderCoordinator *coordinator); void destroy(); static int mtl_init(ObFailureDetector *&p_failure_detector); + static int mtl_start(ObFailureDetector *&p_failure_detector); + static void mtl_stop(ObFailureDetector *&p_failure_detector); + static void mtl_wait(ObFailureDetector *&p_failure_detector); + static void mtl_destroy(ObFailureDetector *&p_failure_detector); /** * @description: 设置一个不可自动恢复的failure,需要由注册的模块手动调用remove_failure_event()接口恢复failure,否则将持续存在 * @param {FailureEvent} event failure事件,定义在failure_event.h中 @@ -94,27 +98,28 @@ public: * @return {*} */ void detect_failure(); - /** - * @description: 定期探测与租户下的其他副本的网络连接是否正常 - * @param {*} - * @return {*} - * @Date: 2022-01-04 21:12:16 - */ - void detect_connection_status(); bool is_clog_disk_has_fatal_error(); bool is_data_disk_has_fatal_error(); private: + bool check_is_running_() const { return is_running_; } int insert_event_to_table_(const FailureEvent &event, const ObFunction &recover_operation, ObString info); void detect_palf_hang_failure_(); void detect_slog_writter_hang_failure_(); void detect_sstable_io_failure_(); void detect_palf_disk_full_(); private: + struct FailureEventWithRecoverOp { + int init(const FailureEvent &event, const ObFunction &recover_detect_operation); + int assign(const FailureEventWithRecoverOp &); + FailureEvent event_; + ObFunction recover_detect_operation_; + TO_STRING_KV(K_(event)); + }; // default time threshold for clog disk io hang detection is 5s // TODO: this value should be a configuration static const int64_t IO_HANG_TIME_THRESHOLD_US = 5 * 1000 * 1000; - common::ObArray event_; - common::ObArray> recover_detect_operation_; + bool is_running_; + common::ObArray events_with_ops_; common::ObArray tenant_server_list_; common::ObOccamTimerTaskRAIIHandle failure_task_handle_; common::ObOccamTimerTaskRAIIHandle recovery_task_handle_; diff --git a/src/logservice/leader_coordinator/ob_leader_coordinator.cpp b/src/logservice/leader_coordinator/ob_leader_coordinator.cpp index 0f5b3f8998cdba28662a11b9251035222a66a163..b6fb440c161992dd2f34a9d8c94a5b65f0396d93 100644 --- a/src/logservice/leader_coordinator/ob_leader_coordinator.cpp +++ b/src/logservice/leader_coordinator/ob_leader_coordinator.cpp @@ -32,16 +32,10 @@ using namespace share; ObLeaderCoordinator::ObLeaderCoordinator() : all_ls_election_reference_info_(nullptr), - is_inited_(false), + is_running_(false), lock_(common::ObLatchIds::ELECTION_LOCK) {} -int ObLeaderCoordinator::mtl_init(ObLeaderCoordinator *&p_coordinator) -{ - LC_TIME_GUARD(1_s); - return p_coordinator->init_and_start(); -} - struct AllLsElectionReferenceInfoFactory { static ObArray *create_new() @@ -49,7 +43,7 @@ struct AllLsElectionReferenceInfoFactory LC_TIME_GUARD(1_s); ObArray *new_all_ls_election_reference_info = nullptr; if (nullptr == (new_all_ls_election_reference_info = (ObArray*)mtl_malloc(sizeof(ObArray), "Coordinator"))) { - COORDINATOR_LOG(ERROR, "alloc memory failed", K(MTL_ID())); + COORDINATOR_LOG(ERROR, "alloc memory failed"); } else { new(new_all_ls_election_reference_info) ObArray(); } @@ -64,43 +58,79 @@ struct AllLsElectionReferenceInfoFactory } }; -int ObLeaderCoordinator::init_and_start() +int ObLeaderCoordinator::mtl_init(ObLeaderCoordinator *&p_coordinator)// init timer { LC_TIME_GUARD(1_s); int ret = OB_SUCCESS; - ObSpinLockGuard guard(lock_); - if(is_inited_) { + ObSpinLockGuard guard(p_coordinator->lock_); + if(p_coordinator->is_running_) { ret = OB_INIT_TWICE; - COORDINATOR_LOG(ERROR, "has been inited alread]y", KR(ret), K(MTL_ID())); - } else if (CLICK_FAIL(recovery_detect_timer_.init_and_start(1, 1_ms, "CoordTR"))) { - COORDINATOR_LOG(ERROR, "fail to init and start recovery_detect_timer", KR(ret), K(MTL_ID())); - } else if (CLICK_FAIL(failure_detect_timer_.init_and_start(1, 1_ms, "CoordTF"))) { - COORDINATOR_LOG(ERROR, "fail to init and start failure_detect_timer", KR(ret), K(MTL_ID())); - } else if (nullptr == (all_ls_election_reference_info_ = AllLsElectionReferenceInfoFactory::create_new())) { + COORDINATOR_LOG(ERROR, "has been inited alread]y", KR(ret)); + } else if (CLICK_FAIL(p_coordinator->recovery_detect_timer_.init_and_start(1, 1_ms, "CoordTR"))) { + COORDINATOR_LOG(ERROR, "fail to init and start recovery_detect_timer", KR(ret)); + } else if (CLICK_FAIL(p_coordinator->failure_detect_timer_.init_and_start(1, 1_ms, "CoordTF"))) { + COORDINATOR_LOG(ERROR, "fail to init and start failure_detect_timer", KR(ret)); + } else { + COORDINATOR_LOG(INFO, "ObLeaderCoordinator mtl init success", KR(ret)); + } + return ret; +} + +int ObLeaderCoordinator::mtl_start(ObLeaderCoordinator *&p_coordinator)// start run timer task +{ + LC_TIME_GUARD(1_s); + int ret = OB_SUCCESS; + if (nullptr == (p_coordinator->all_ls_election_reference_info_ = AllLsElectionReferenceInfoFactory::create_new())) { ret = OB_ALLOCATE_MEMORY_FAILED; - COORDINATOR_LOG(ERROR, "fail to new all_ls_election_reference_info_", KR(ret), K(MTL_ID())); + COORDINATOR_LOG(ERROR, "fail to new all_ls_election_reference_info_", KR(ret)); } else { - new(all_ls_election_reference_info_) ObArray(); - if (CLICK_FAIL(recovery_detect_timer_.schedule_task_ignore_handle_repeat(500_ms, - [this](){ refresh(); return false; }))) { - COORDINATOR_LOG(ERROR, "schedule repeat task failed", KR(ret), K(MTL_ID())); + new(p_coordinator->all_ls_election_reference_info_) ObArray(); + if (CLICK_FAIL(p_coordinator->recovery_detect_timer_.schedule_task_repeat( + p_coordinator->refresh_priority_task_handle_, + 500_ms, + [p_coordinator](){ p_coordinator->refresh(); return false; }))) { + COORDINATOR_LOG(ERROR, "schedule repeat task failed", KR(ret)); } else { - is_inited_ = true; - COORDINATOR_LOG(INFO, "init leader coordinator success", KR(ret), K(MTL_ID()), K(lbt())); + p_coordinator->is_running_ = true; + COORDINATOR_LOG(INFO, "ObLeaderCoordinator mtl start success", KR(ret)); } } return ret; } -void ObLeaderCoordinator::stop_and_wait() +void ObLeaderCoordinator::mtl_stop(ObLeaderCoordinator *&p_coordinator)// stop timer task +{ + if (OB_ISNULL(p_coordinator)) { + COORDINATOR_LOG(WARN, "p_coordinator is NULL"); + } else { + p_coordinator->is_running_ = false; + p_coordinator->refresh_priority_task_handle_.stop(); + COORDINATOR_LOG(INFO, "ObLeaderCoordinator mtl stop"); + } +} + +void ObLeaderCoordinator::mtl_wait(ObLeaderCoordinator *&p_coordinator)// wait timer task +{ + if (OB_ISNULL(p_coordinator)) { + COORDINATOR_LOG(WARN, "p_coordinator is NULL"); + } else { + p_coordinator->refresh_priority_task_handle_.wait(); + COORDINATOR_LOG(INFO, "ObLeaderCoordinator mtl wait"); + } +} + +void ObLeaderCoordinator::mtl_destroy(ObLeaderCoordinator *&p_coordinator)// destroy timer { LC_TIME_GUARD(1_s); - recovery_detect_timer_.stop_and_wait(); - failure_detect_timer_.stop_and_wait(); - ObSpinLockGuard guard(lock_); - AllLsElectionReferenceInfoFactory::delete_obj(all_ls_election_reference_info_); - all_ls_election_reference_info_ = NULL; - is_inited_ = false; + if (OB_ISNULL(p_coordinator)) { + COORDINATOR_LOG(WARN, "p_coordinator is NULL"); + } else { + p_coordinator->recovery_detect_timer_.stop_and_wait(); + p_coordinator->failure_detect_timer_.stop_and_wait(); + AllLsElectionReferenceInfoFactory::delete_obj(p_coordinator->all_ls_election_reference_info_); + p_coordinator->all_ls_election_reference_info_ = NULL; + COORDINATOR_LOG(INFO, "ObLeaderCoordinator mtl destroy"); + } } void ObLeaderCoordinator::refresh() @@ -110,11 +140,11 @@ void ObLeaderCoordinator::refresh() ObArray *new_all_ls_election_reference_info = AllLsElectionReferenceInfoFactory::create_new(); if (nullptr == new_all_ls_election_reference_info) { ret = OB_ALLOCATE_MEMORY_FAILED; - COORDINATOR_LOG(WARN, "alloc new_all_ls_election_reference_info failed", KR(ret), K(MTL_ID())); + COORDINATOR_LOG(WARN, "alloc new_all_ls_election_reference_info failed", KR(ret)); } else if (CLICK_FAIL(TableAccessor::get_all_ls_election_reference_info(*new_all_ls_election_reference_info))) { - COORDINATOR_LOG(WARN, "get all ls election reference info failed", KR(ret), K(MTL_ID())); + COORDINATOR_LOG(WARN, "get all ls election reference info failed", KR(ret)); } else { - COORDINATOR_LOG(INFO, "refresh __all_ls_election_reference_info success", KR(ret), K(MTL_ID()), K(*new_all_ls_election_reference_info)); + COORDINATOR_LOG(INFO, "refresh __all_ls_election_reference_info success", KR(ret), K(*new_all_ls_election_reference_info)); ObSpinLockGuard guard(lock_); std::swap(new_all_ls_election_reference_info, all_ls_election_reference_info_); } @@ -126,16 +156,16 @@ int ObLeaderCoordinator::get_ls_election_reference_info(const share::ObLSID &ls_ LC_TIME_GUARD(1_s); ObSpinLockGuard guard(lock_); int ret = OB_SUCCESS; - if (!is_inited_) { - ret = OB_NOT_INIT; - COORDINATOR_LOG(ERROR, "call before init", KR(ret)); + if (!is_running_) { + ret = OB_NOT_RUNNING; + COORDINATOR_LOG(WARN, "not running", KR(ret)); } else { int64_t idx = 0; for (; idx < all_ls_election_reference_info_->count(); ++idx) { if (all_ls_election_reference_info_->at(idx).element<0>() == ls_id.id()) { if (CLICK_FAIL(reference_info.assign(all_ls_election_reference_info_->at(idx)))) { COORDINATOR_LOG(WARN, "fail to assign reference info", - KR(ret), K(MTL_ID()), KPC_(all_ls_election_reference_info)); + KR(ret), KPC_(all_ls_election_reference_info)); } break; } @@ -143,7 +173,7 @@ int ObLeaderCoordinator::get_ls_election_reference_info(const share::ObLSID &ls_ if (idx == all_ls_election_reference_info_->count()) { ret = OB_ENTRY_NOT_EXIST; COORDINATOR_LOG(WARN, "can not find this ls_id in all_ls_election_reference_info_", - KR(ret), K(MTL_ID()), K(ls_id), KPC_(all_ls_election_reference_info)); + KR(ret), K(ls_id), KPC_(all_ls_election_reference_info)); } } return ret; diff --git a/src/logservice/leader_coordinator/ob_leader_coordinator.h b/src/logservice/leader_coordinator/ob_leader_coordinator.h index 9968eaf4e917012e01076bad24cf37606b0a5a06..6dd57ab40ca64da9b01cd492246029aa82dad2dc 100644 --- a/src/logservice/leader_coordinator/ob_leader_coordinator.h +++ b/src/logservice/leader_coordinator/ob_leader_coordinator.h @@ -17,6 +17,7 @@ #include "lib/container/ob_array.h" #include "share/ob_delegate.h" #include "ob_failure_detector.h" +#include "share/ob_occam_timer.h" #include "share/ob_table_access_helper.h" namespace oceanbase @@ -48,25 +49,14 @@ class ObLeaderCoordinator friend class unittest::TestElectionPriority; public: ObLeaderCoordinator(); - ~ObLeaderCoordinator() { destroy(); } + ~ObLeaderCoordinator() { } ObLeaderCoordinator(const ObLeaderCoordinator &rhs) = delete; ObLeaderCoordinator& operator=(const ObLeaderCoordinator &rhs) = delete; static int mtl_init(ObLeaderCoordinator *&p_coordinator); - void destroy() { stop_and_wait(); } - /** - * @description: 对内部结构进行初始化 - * @param {*} - * @return {*} - * @Date: 2022-01-04 11:18:48 - */ - int init_and_start(); - /** - * @description: 停止线程池和timer - * @param {*} - * @return {*} - * @Date: 2022-01-04 18:05:17 - */ - void stop_and_wait(); + static int mtl_start(ObLeaderCoordinator *&p_coordinator); + static void mtl_stop(ObLeaderCoordinator *&p_coordinator); + static void mtl_wait(ObLeaderCoordinator *&p_coordinator); + static void mtl_destroy(ObLeaderCoordinator *&p_coordinator); /** * @description: 当内部表更新的时候,可以通过该接口主动触发LeaderCoordinator的刷新流程,以便切主动作可以尽快完成 * @param {*} @@ -81,7 +71,8 @@ private: common::ObOccamTimer recovery_detect_timer_; // detect whether failure has occured common::ObOccamTimer failure_detect_timer_; - bool is_inited_; + common::ObOccamTimerTaskRAIIHandle refresh_priority_task_handle_; + bool is_running_; mutable ObSpinLock lock_; }; diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index b7aa48267e01b1d1a24bc15aa65b33c63cebda41..62b2134bc5ad2f97452d5d4027c63ef359fdd15c 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -326,8 +326,8 @@ int ObMultiTenant::init(ObAddr myaddr, MTL_BIND2(mtl_new_default, rootserver::ObPrimaryLSService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); MTL_BIND2(mtl_new_default, rootserver::ObRecoveryLSService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); MTL_BIND2(mtl_new_default, rootserver::ObRestoreService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); - MTL_BIND2(mtl_new_default, coordinator::ObLeaderCoordinator::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); - MTL_BIND2(mtl_new_default, coordinator::ObFailureDetector::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); + MTL_BIND2(mtl_new_default, coordinator::ObLeaderCoordinator::mtl_init, coordinator::ObLeaderCoordinator::mtl_start, coordinator::ObLeaderCoordinator::mtl_stop, coordinator::ObLeaderCoordinator::mtl_wait, coordinator::ObLeaderCoordinator::mtl_destroy); + MTL_BIND2(mtl_new_default, coordinator::ObFailureDetector::mtl_init, coordinator::ObFailureDetector::mtl_start, coordinator::ObFailureDetector::mtl_stop, coordinator::ObFailureDetector::mtl_wait, coordinator::ObFailureDetector::mtl_destroy); MTL_BIND2(ObLobManager::mtl_new, mtl_init_default, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObStorageHAService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObGlobalAutoIncService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);