From 765bddadcaf341c028e9a0478bb0636db4e220f4 Mon Sep 17 00:00:00 2001 From: obdev Date: Tue, 16 Aug 2022 20:34:00 +0800 Subject: [PATCH] [CP] [archive] fix invalid piece id when archive is stopping --- src/archive/ob_archive_entry_iterator.cpp | 5 +++ src/archive/ob_archive_mgr.cpp | 50 ++++++++++++++++------- src/archive/ob_archive_mgr.h | 2 + src/archive/ob_archive_restore_engine.cpp | 7 +++- src/archive/ob_archive_round_mgr.cpp | 18 +++++--- src/archive/ob_archive_round_mgr.h | 2 +- src/archive/ob_archive_sender.cpp | 12 ++++++ src/archive/ob_start_archive_helper.cpp | 15 +++++-- src/clog/ob_log_define.cpp | 4 ++ 9 files changed, 88 insertions(+), 27 deletions(-) diff --git a/src/archive/ob_archive_entry_iterator.cpp b/src/archive/ob_archive_entry_iterator.cpp index 364290df2e..fefa9e138b 100644 --- a/src/archive/ob_archive_entry_iterator.cpp +++ b/src/archive/ob_archive_entry_iterator.cpp @@ -169,6 +169,11 @@ int ObArchiveEntryIterator::next_entry(clog::ObLogEntry& entry, bool& is_accum_c // retry is normal ARCHIVE_LOG(TRACE, "buffer not enough or need retry", KR(ret), K(file_id_), K(cur_offset_)); } + if (OB_EAGAIN == ret) { + // overwrite OB_EAGAIN when encrypt meta has not been ready + sleep(1); // sleep 1s to control cpu load + ret = OB_SUCCESS; + } } } else { done = true; diff --git a/src/archive/ob_archive_mgr.cpp b/src/archive/ob_archive_mgr.cpp index 7f4e76e9e7..c04a674196 100644 --- a/src/archive/ob_archive_mgr.cpp +++ b/src/archive/ob_archive_mgr.cpp @@ -674,12 +674,8 @@ void ObArchiveMgr::do_check_switch_archive_() has_encount_fatal_error = true; } else if (need_start) { // start a new archive round - if (OB_UNLIKELY(!non_frozen_piece_info.is_valid())) { - ret = OB_ERR_UNEXPECTED; - has_encount_fatal_error = true; - ARCHIVE_LOG(ERROR, "non_piece_info is not valid", K(non_frozen_piece_info), KR(ret)); - } else if (OB_FAIL(non_frozen_piece_info.get_backup_piece_info(cur_piece_id, cur_piece_create_date))) { - ARCHIVE_LOG(ERROR, "failed to get_backup_piece_info", K(backup_info), K(non_frozen_piece_info), KR(ret)); + if (OB_FAIL(extract_cur_piece_info_(non_frozen_piece_info, cur_piece_id, cur_piece_create_date))) { + ARCHIVE_LOG(ERROR, "failed to extract_cur_piece_info", K(backup_info), K(non_frozen_piece_info), KR(ret)); has_encount_fatal_error = true; } else if (OB_FAIL(start_archive_(backup_info, cur_piece_id, cur_piece_create_date))) { ARCHIVE_LOG(WARN, @@ -697,8 +693,12 @@ void ObArchiveMgr::do_check_switch_archive_() } } else if (need_force_stop) { // force stop archive - archive_round_mgr_.set_archive_force_stop(backup_info.status_.incarnation_, backup_info.status_.round_); - ARCHIVE_LOG(INFO, "force set log_archive_status STOPPED"); + if (OB_FAIL(archive_round_mgr_.set_archive_force_stop( + backup_info.status_.incarnation_, backup_info.status_.round_, backup_info.status_.backup_piece_id_))) { + ARCHIVE_LOG(ERROR, "failed to force set log_archive_status STOPPED", K(backup_info), K(ret)); + } else { + ARCHIVE_LOG(INFO, "force set log_archive_status STOPPED", K(backup_info)); + } } else if (need_switch_piece) { #ifdef ERRSIM ret = E(EventTable::EN_LOG_ARCHIVE_BLOCK_SWITCH_PIECE) OB_SUCCESS; @@ -884,20 +884,27 @@ int ObArchiveMgr::check_if_need_switch_log_archive_(const ObLogArchiveBackupInfo ARCHIVE_LOG(INFO, "need_force_stop", K(backup_info), K(rs_piece_id), K(is_in_stop_status), K(diff_ir)); } else if (ObLogArchiveStatus::STATUS::DOING == status && is_in_archive_doing_status() && !diff_ir && 0 != cur_piece_id) { - if (OB_UNLIKELY(!non_frozen_piece_info.is_valid())) { - ret = OB_ERR_UNEXPECTED; - ARCHIVE_LOG(ERROR, "non_piece_info is not valid", K(non_frozen_piece_info), KR(ret)); - } else if (OB_FAIL(non_frozen_piece_info.get_backup_piece_info(rs_piece_id, rs_piece_create_date))) { - ARCHIVE_LOG(ERROR, "failed to get_backup_piece_info", K(backup_info), K(non_frozen_piece_info), KR(ret)); + if (OB_FAIL(extract_cur_piece_info_(non_frozen_piece_info, rs_piece_id, rs_piece_create_date))) { + ARCHIVE_LOG(ERROR, "failed to extract_cur_piece_info", K(backup_info), K(non_frozen_piece_info), KR(ret)); } else if (rs_piece_id == cur_piece_id) { // do nothing } else if (rs_piece_id == cur_piece_id + 1) { need_switch_piece = true; ARCHIVE_LOG(INFO, "need switch piece", K(backup_info), K(rs_piece_id), K(cur_piece_id)); - } else { - ARCHIVE_LOG(ERROR, "invalid rs_piece_id", K(backup_info), K(rs_piece_id), K(cur_piece_id)); + } else if (rs_piece_id < cur_piece_id) { + ret = OB_ERR_UNEXPECTED; ObPartitionKey unused_pkey; mark_encounter_fatal_err(unused_pkey, incarnation, round); + ARCHIVE_LOG(ERROR, "invalid rs_piece_id", KR(ret), K(backup_info), K(rs_piece_id), K(cur_piece_id)); + } else { + // rs_piece_id > cur_piece_id + 1 + // if only follower replica in local observer, refreshing of backup info is stucked for long time + need_switch_piece = true; + ARCHIVE_LOG(WARN, + "Attention: local backup_info may has not been refreshed for long time", + K(backup_info), + K(rs_piece_id), + K(cur_piece_id)); } } else { /*do nothing*/ } @@ -933,5 +940,18 @@ bool ObArchiveMgr::check_ob_ready_() return 0 < GCTX.server_id_ && NULL != partition_service_ && partition_service_->is_scan_disk_finished(); } +int ObArchiveMgr::extract_cur_piece_info_( + const ObNonFrozenBackupPieceInfo &non_frozen_piece_info, int64_t &rs_piece_id, int64_t &rs_piece_create_ts) const +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!non_frozen_piece_info.is_valid())) { + ret = OB_ERR_UNEXPECTED; + ARCHIVE_LOG(ERROR, "non_piece_info is not valid", K(non_frozen_piece_info), KR(ret)); + } else if (OB_FAIL(non_frozen_piece_info.get_backup_piece_info(rs_piece_id, rs_piece_create_ts))) { + ARCHIVE_LOG(ERROR, "failed to get_backup_piece_info", K(non_frozen_piece_info), KR(ret)); + } else { /*do nothing*/ + } + return ret; +} } // namespace archive } // namespace oceanbase diff --git a/src/archive/ob_archive_mgr.h b/src/archive/ob_archive_mgr.h index 1eda20089a..dbf8935e86 100644 --- a/src/archive/ob_archive_mgr.h +++ b/src/archive/ob_archive_mgr.h @@ -174,6 +174,8 @@ private: int check_and_set_start_archive_ts_( const int64_t incarnation, const int64_t round, const bool is_oss, int64_t& start_ts); bool check_ob_ready_(); + int extract_cur_piece_info_( + const share::ObNonFrozenBackupPieceInfo &non_piece_info, int64_t &rs_piece_id, int64_t &rs_piece_create_ts) const; private: bool inited_; diff --git a/src/archive/ob_archive_restore_engine.cpp b/src/archive/ob_archive_restore_engine.cpp index cab67ef092..88deb1b6b4 100644 --- a/src/archive/ob_archive_restore_engine.cpp +++ b/src/archive/ob_archive_restore_engine.cpp @@ -1260,6 +1260,7 @@ int ObTenantPhysicalRestoreMeta::locate_log(const ObPGKey& restore_pg_key, const { int ret = OB_SUCCESS; uint64_t last_piece_base_log_id = OB_INVALID_ID; + const int64_t start_time = ObTimeUtility::current_time(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("tenant restore meta is not inited", K(ret), K(archive_pg_key), K(archive_pg_key), K(start_log_id)); @@ -1284,10 +1285,12 @@ int ObTenantPhysicalRestoreMeta::locate_log(const ObPGKey& restore_pg_key, const end_file_id))) { if (OB_ITER_END != ret) { LOG_WARN( - "failed to locate_start_file_id_in_piece_", K(ret), K(archive_pg_key), K(archive_pg_key), K(start_log_id)); + "failed to locate_start_file_id_in_piece_", K(ret), K(restore_pg_key), K(archive_pg_key), K(start_log_id)); } } else { /*do nothing*/ } + const int64_t cost = ObTimeUtility::current_time() - start_time; + LOG_INFO("locate_log cost", K(cost), K(restore_pg_key), K(archive_pg_key), K(start_log_id)); return ret; } @@ -1443,6 +1446,7 @@ int ObTenantPhysicalRestoreMeta::locate_piece_(const ObPGKey& restore_pg_key, co // situation with multi_piece bool has_data_in_prev_piece = false; // assign value to start_piece_idx + common::ObTimeGuard time_guard("[ARCHIVE_RESTORE] locate_piece", 1 * 1000 * 1000LL); for (int64_t idx = 0; OB_SUCC(ret) && 0 > start_piece_idx && idx < total_piece_cnt; ++idx) { ObArchiveLogFileStore* file_store = file_store_array_.at(idx); ObArchiveKeyContent archive_key_content; @@ -1611,6 +1615,7 @@ int ObTenantPhysicalRestoreMeta::locate_start_file_id_in_piece_(const ObPGKey& r uint64_t min_file_id = 0; uint64_t max_file_id = 0; bool archive_file_exists = true; + common::ObTimeGuard time_guard("[ARCHIVE_RESTORE] locate_start_file_id", 1000 * 1000LL); if (OB_ISNULL(file_store)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("file_store is NULL", KR(ret), K(restore_pg_key), K(archive_pg_key), K(start_piece_idx)); diff --git a/src/archive/ob_archive_round_mgr.cpp b/src/archive/ob_archive_round_mgr.cpp index 82aeb55d6c..8a6b3d1a31 100644 --- a/src/archive/ob_archive_round_mgr.cpp +++ b/src/archive/ob_archive_round_mgr.cpp @@ -18,6 +18,7 @@ namespace oceanbase { namespace archive { using namespace oceanbase::common; +using namespace oceanbase::share; ObArchiveRoundMgr::ObArchiveRoundMgr() : add_pg_finish_(false), @@ -25,7 +26,7 @@ ObArchiveRoundMgr::ObArchiveRoundMgr() started_pg_count_(0), incarnation_(-1), current_archive_round_(-1), - cur_piece_id_(0), + cur_piece_id_(OB_BACKUP_INVALID_PIECE_ID), cur_piece_create_date_(OB_INVALID_TIMESTAMP), compatible_(false), is_oss_(false), @@ -55,7 +56,7 @@ void ObArchiveRoundMgr::destroy() started_pg_count_ = 0; incarnation_ = -1; current_archive_round_ = -1; - cur_piece_id_ = 0; + cur_piece_id_ = OB_BACKUP_INVALID_PIECE_ID; cur_piece_create_date_ = OB_INVALID_TIMESTAMP; compatible_ = false; is_oss_ = false; @@ -160,17 +161,22 @@ int ObArchiveRoundMgr::set_archive_start(const int64_t incarnation, const int64_ return ret; } -void ObArchiveRoundMgr::set_archive_force_stop(const int64_t incarnation, const int64_t archive_round) +int ObArchiveRoundMgr::set_archive_force_stop( + const int64_t incarnation, const int64_t archive_round, const int64_t piece_id) { + int ret = OB_SUCCESS; WLockGuard guard(rwlock_); - if (OB_UNLIKELY(0 >= incarnation || 0 > archive_round)) { - ARCHIVE_LOG(WARN, "invalid arguments", K(incarnation), K(archive_round)); + if (OB_UNLIKELY(0 >= incarnation || 0 > archive_round || piece_id < 0)) { + ret = OB_INVALID_ARGUMENT; + ARCHIVE_LOG(WARN, "invalid arguments", K(incarnation), K(archive_round), K(piece_id), K(ret)); } else { incarnation_ = incarnation; current_archive_round_ = archive_round; + cur_piece_id_ = piece_id; log_archive_status_ = LOG_ARCHIVE_STOPPED; } + return ret; } int ObArchiveRoundMgr::update_cur_piece_info(const int64_t incarnation, const int64_t archive_round, @@ -179,7 +185,7 @@ int ObArchiveRoundMgr::update_cur_piece_info(const int64_t incarnation, const in int ret = OB_SUCCESS; WLockGuard guard(rwlock_); if (OB_UNLIKELY(incarnation != incarnation_ || current_archive_round_ != archive_round || 0 == cur_piece_id_ || - new_piece_id != cur_piece_id_ + 1)) { + new_piece_id <= cur_piece_id_)) { ret = OB_ERR_UNEXPECTED; ARCHIVE_LOG(WARN, "invalid arguments", diff --git a/src/archive/ob_archive_round_mgr.h b/src/archive/ob_archive_round_mgr.h index 62e92caea9..be1b08b471 100644 --- a/src/archive/ob_archive_round_mgr.h +++ b/src/archive/ob_archive_round_mgr.h @@ -84,7 +84,7 @@ public: bool has_encounter_fatal_error(const int64_t incarnation, const int64_t archive_round); int set_archive_start(const int64_t incarnation, const int64_t archive_round, const int64_t piece_id, const int64_t piece_create_date, const bool is_oss, const share::ObTenantLogArchiveStatus::COMPATIBLE compatible); - void set_archive_force_stop(const int64_t incarnation, const int64_t archive_round); + int set_archive_force_stop(const int64_t incarnation, const int64_t archive_round, const int64_t piece_id); int update_cur_piece_info(const int64_t incarnation, const int64_t archive_round, const int64_t new_piece_id, const int64_t new_piece_create_date); void get_archive_round_info(int64_t& incarnation, int64_t& archive_round, int64_t& cur_piece_id, diff --git a/src/archive/ob_archive_sender.cpp b/src/archive/ob_archive_sender.cpp index 4e569e64ca..aabc1baba9 100644 --- a/src/archive/ob_archive_sender.cpp +++ b/src/archive/ob_archive_sender.cpp @@ -640,6 +640,18 @@ int ObArchiveSender::check_need_switch_piece_(const ObPGKey& pg_key, const int64 // server_piece_id is not new enough in case of concurrent happen of switch_piece and // switch_leader need_switch_piece = false; + } else if (server_piece_id > pg_piece_id + 1) { + ret = OB_LOG_ARCHIVE_LEADER_CHANGED; + ARCHIVE_LOG(WARN, + "pg piece_id is too old", + K(pg_key), + K(server_incarnation), + K(server_round), + K(round), + K(incarnation), + K(server_piece_id), + K(pg_piece_id), + K(ret)); } else { ret = OB_ERR_UNEXPECTED; ARCHIVE_LOG(WARN, diff --git a/src/archive/ob_start_archive_helper.cpp b/src/archive/ob_start_archive_helper.cpp index fc67421bd5..6f4388755f 100644 --- a/src/archive/ob_start_archive_helper.cpp +++ b/src/archive/ob_start_archive_helper.cpp @@ -187,18 +187,25 @@ int StartArchiveHelper::get_tenant_archive_status_(ObLogArchiveSimpleInfo& info) // get archive progress with pg_key, takeover_ts_ if (OB_FAIL( ObLogArchiveInfoMgr::get_instance().get_log_archive_status(pg_key_.get_tenant_id(), takeover_ts_, info))) { - ARCHIVE_LOG(WARN, "get_log_archive_status fail", KR(ret)); + ARCHIVE_LOG(WARN, "get_log_archive_status fail", K(pg_key_), KR(ret)); } else if (OB_UNLIKELY(!info.is_valid())) { ret = OB_ERR_UNEXPECTED; ARCHIVE_LOG(ERROR, "ObLogArchiveSimpleInfo is not valid", KR(ret), K(info)); } else if (share::ObLogArchiveStatus::STATUS::BEGINNING != info.status_ && share::ObLogArchiveStatus::STATUS::DOING != info.status_) { ret = OB_EAGAIN; - ARCHIVE_LOG(WARN, "no doing archive status, retry later", KR(ret), K(info)); + ARCHIVE_LOG(WARN, "no doing archive status, retry later", KR(ret), K(pg_key_), K(info)); } else { - tenant_archive_checkpoint_ts_ = info.checkpoint_ts_; + int64_t piece_id = archive_mgr_.get_archive_round_mgr().get_cur_piece_id(); + if (OB_UNLIKELY(piece_id != info.cur_piece_id_)) { + ret = OB_EAGAIN; + if (REACH_TIME_INTERVAL(1 * 1000 * 1000L)) { + ARCHIVE_LOG(WARN, "piece_id is not consistent", KR(ret), K(pg_key_), K(info), K(piece_id)); + } + } else { + tenant_archive_checkpoint_ts_ = info.checkpoint_ts_; + } } - return ret; } diff --git a/src/clog/ob_log_define.cpp b/src/clog/ob_log_define.cpp index 803e825c31..6c0615b342 100644 --- a/src/clog/ob_log_define.cpp +++ b/src/clog/ob_log_define.cpp @@ -125,6 +125,10 @@ bool ObPGLogArchiveStatus::operator<(const ObPGLogArchiveStatus& other) b_ret = true; } else if (log_archive_round_ > other.log_archive_round_) { b_ret = false; + } else if (cur_piece_id_ < other.cur_piece_id_) { + b_ret = true; + } else if (cur_piece_id_ > other.cur_piece_id_) { + b_ret = false; } else if (is_stopped()) { b_ret = false; } else if (other.is_stopped()) { -- GitLab