提交 765bddad 编写于 作者: O obdev 提交者: wangzelin.wzl

[CP] [archive] fix invalid piece id when archive is stopping

上级 4a717404
......@@ -169,6 +169,11 @@ int ObArchiveEntryIterator::next_entry(clog::ObLogEntry& entry, bool& is_accum_c
// retry is normal
ARCHIVE_LOG(TRACE, "buffer not enough or need retry", KR(ret), K(file_id_), K(cur_offset_));
}
if (OB_EAGAIN == ret) {
// overwrite OB_EAGAIN when the encrypt meta is not ready yet
sleep(1); // sleep 1s to control cpu load
ret = OB_SUCCESS;
}
}
} else {
done = true;
......
......@@ -674,12 +674,8 @@ void ObArchiveMgr::do_check_switch_archive_()
has_encount_fatal_error = true;
} else if (need_start) {
// start a new archive round
if (OB_UNLIKELY(!non_frozen_piece_info.is_valid())) {
ret = OB_ERR_UNEXPECTED;
has_encount_fatal_error = true;
ARCHIVE_LOG(ERROR, "non_piece_info is not valid", K(non_frozen_piece_info), KR(ret));
} else if (OB_FAIL(non_frozen_piece_info.get_backup_piece_info(cur_piece_id, cur_piece_create_date))) {
ARCHIVE_LOG(ERROR, "failed to get_backup_piece_info", K(backup_info), K(non_frozen_piece_info), KR(ret));
if (OB_FAIL(extract_cur_piece_info_(non_frozen_piece_info, cur_piece_id, cur_piece_create_date))) {
ARCHIVE_LOG(ERROR, "failed to extract_cur_piece_info", K(backup_info), K(non_frozen_piece_info), KR(ret));
has_encount_fatal_error = true;
} else if (OB_FAIL(start_archive_(backup_info, cur_piece_id, cur_piece_create_date))) {
ARCHIVE_LOG(WARN,
......@@ -697,8 +693,12 @@ void ObArchiveMgr::do_check_switch_archive_()
}
} else if (need_force_stop) {
// force stop archive
archive_round_mgr_.set_archive_force_stop(backup_info.status_.incarnation_, backup_info.status_.round_);
ARCHIVE_LOG(INFO, "force set log_archive_status STOPPED");
if (OB_FAIL(archive_round_mgr_.set_archive_force_stop(
backup_info.status_.incarnation_, backup_info.status_.round_, backup_info.status_.backup_piece_id_))) {
ARCHIVE_LOG(ERROR, "failed to force set log_archive_status STOPPED", K(backup_info), K(ret));
} else {
ARCHIVE_LOG(INFO, "force set log_archive_status STOPPED", K(backup_info));
}
} else if (need_switch_piece) {
#ifdef ERRSIM
ret = E(EventTable::EN_LOG_ARCHIVE_BLOCK_SWITCH_PIECE) OB_SUCCESS;
......@@ -884,20 +884,27 @@ int ObArchiveMgr::check_if_need_switch_log_archive_(const ObLogArchiveBackupInfo
ARCHIVE_LOG(INFO, "need_force_stop", K(backup_info), K(rs_piece_id), K(is_in_stop_status), K(diff_ir));
} else if (ObLogArchiveStatus::STATUS::DOING == status && is_in_archive_doing_status() && !diff_ir &&
0 != cur_piece_id) {
if (OB_UNLIKELY(!non_frozen_piece_info.is_valid())) {
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(ERROR, "non_piece_info is not valid", K(non_frozen_piece_info), KR(ret));
} else if (OB_FAIL(non_frozen_piece_info.get_backup_piece_info(rs_piece_id, rs_piece_create_date))) {
ARCHIVE_LOG(ERROR, "failed to get_backup_piece_info", K(backup_info), K(non_frozen_piece_info), KR(ret));
if (OB_FAIL(extract_cur_piece_info_(non_frozen_piece_info, rs_piece_id, rs_piece_create_date))) {
ARCHIVE_LOG(ERROR, "failed to extract_cur_piece_info", K(backup_info), K(non_frozen_piece_info), KR(ret));
} else if (rs_piece_id == cur_piece_id) {
// do nothing
} else if (rs_piece_id == cur_piece_id + 1) {
need_switch_piece = true;
ARCHIVE_LOG(INFO, "need switch piece", K(backup_info), K(rs_piece_id), K(cur_piece_id));
} else {
ARCHIVE_LOG(ERROR, "invalid rs_piece_id", K(backup_info), K(rs_piece_id), K(cur_piece_id));
} else if (rs_piece_id < cur_piece_id) {
ret = OB_ERR_UNEXPECTED;
ObPartitionKey unused_pkey;
mark_encounter_fatal_err(unused_pkey, incarnation, round);
ARCHIVE_LOG(ERROR, "invalid rs_piece_id", KR(ret), K(backup_info), K(rs_piece_id), K(cur_piece_id));
} else {
// rs_piece_id > cur_piece_id + 1
// if the local observer only holds follower replicas, refreshing of backup info may be stuck for a long time
need_switch_piece = true;
ARCHIVE_LOG(WARN,
"Attention: local backup_info may has not been refreshed for long time",
K(backup_info),
K(rs_piece_id),
K(cur_piece_id));
}
} else { /*do nothing*/
}
......@@ -933,5 +940,18 @@ bool ObArchiveMgr::check_ob_ready_()
return 0 < GCTX.server_id_ && NULL != partition_service_ && partition_service_->is_scan_disk_finished();
}
// Extracts the piece id and piece create timestamp from a non-frozen backup piece info.
//
// @param [in]  non_frozen_piece_info  source piece info; checked with is_valid() before use
// @param [out] rs_piece_id            handed to get_backup_piece_info() to be filled in
// @param [out] rs_piece_create_ts     handed to get_backup_piece_info() to be filled in
// @return OB_SUCCESS on success;
//         OB_ERR_UNEXPECTED when non_frozen_piece_info is not valid;
//         otherwise the error code returned by get_backup_piece_info()
int ObArchiveMgr::extract_cur_piece_info_(
    const ObNonFrozenBackupPieceInfo &non_frozen_piece_info, int64_t &rs_piece_id, int64_t &rs_piece_create_ts) const
{
  int ret = OB_SUCCESS;
  const bool is_piece_info_valid = non_frozen_piece_info.is_valid();

  if (OB_UNLIKELY(!is_piece_info_valid)) {
    // an invalid piece info is treated as an unexpected state; the outputs are not touched here
    ret = OB_ERR_UNEXPECTED;
    ARCHIVE_LOG(ERROR, "non_piece_info is not valid", K(non_frozen_piece_info), KR(ret));
  } else {
    // delegate the actual extraction; on failure only log, ret already carries the error
    if (OB_FAIL(non_frozen_piece_info.get_backup_piece_info(rs_piece_id, rs_piece_create_ts))) {
      ARCHIVE_LOG(ERROR, "failed to get_backup_piece_info", K(non_frozen_piece_info), KR(ret));
    }
  }

  return ret;
}
} // namespace archive
} // namespace oceanbase
......@@ -174,6 +174,8 @@ private:
int check_and_set_start_archive_ts_(
const int64_t incarnation, const int64_t round, const bool is_oss, int64_t& start_ts);
bool check_ob_ready_();
int extract_cur_piece_info_(
const share::ObNonFrozenBackupPieceInfo &non_piece_info, int64_t &rs_piece_id, int64_t &rs_piece_create_ts) const;
private:
bool inited_;
......
......@@ -1260,6 +1260,7 @@ int ObTenantPhysicalRestoreMeta::locate_log(const ObPGKey& restore_pg_key, const
{
int ret = OB_SUCCESS;
uint64_t last_piece_base_log_id = OB_INVALID_ID;
const int64_t start_time = ObTimeUtility::current_time();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("tenant restore meta is not inited", K(ret), K(archive_pg_key), K(archive_pg_key), K(start_log_id));
......@@ -1284,10 +1285,12 @@ int ObTenantPhysicalRestoreMeta::locate_log(const ObPGKey& restore_pg_key, const
end_file_id))) {
if (OB_ITER_END != ret) {
LOG_WARN(
"failed to locate_start_file_id_in_piece_", K(ret), K(archive_pg_key), K(archive_pg_key), K(start_log_id));
"failed to locate_start_file_id_in_piece_", K(ret), K(restore_pg_key), K(archive_pg_key), K(start_log_id));
}
} else { /*do nothing*/
}
const int64_t cost = ObTimeUtility::current_time() - start_time;
LOG_INFO("locate_log cost", K(cost), K(restore_pg_key), K(archive_pg_key), K(start_log_id));
return ret;
}
......@@ -1443,6 +1446,7 @@ int ObTenantPhysicalRestoreMeta::locate_piece_(const ObPGKey& restore_pg_key, co
// situation with multi_piece
bool has_data_in_prev_piece = false;
// assign value to start_piece_idx
common::ObTimeGuard time_guard("[ARCHIVE_RESTORE] locate_piece", 1 * 1000 * 1000LL);
for (int64_t idx = 0; OB_SUCC(ret) && 0 > start_piece_idx && idx < total_piece_cnt; ++idx) {
ObArchiveLogFileStore* file_store = file_store_array_.at(idx);
ObArchiveKeyContent archive_key_content;
......@@ -1611,6 +1615,7 @@ int ObTenantPhysicalRestoreMeta::locate_start_file_id_in_piece_(const ObPGKey& r
uint64_t min_file_id = 0;
uint64_t max_file_id = 0;
bool archive_file_exists = true;
common::ObTimeGuard time_guard("[ARCHIVE_RESTORE] locate_start_file_id", 1000 * 1000LL);
if (OB_ISNULL(file_store)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("file_store is NULL", KR(ret), K(restore_pg_key), K(archive_pg_key), K(start_piece_idx));
......
......@@ -18,6 +18,7 @@
namespace oceanbase {
namespace archive {
using namespace oceanbase::common;
using namespace oceanbase::share;
ObArchiveRoundMgr::ObArchiveRoundMgr()
: add_pg_finish_(false),
......@@ -25,7 +26,7 @@ ObArchiveRoundMgr::ObArchiveRoundMgr()
started_pg_count_(0),
incarnation_(-1),
current_archive_round_(-1),
cur_piece_id_(0),
cur_piece_id_(OB_BACKUP_INVALID_PIECE_ID),
cur_piece_create_date_(OB_INVALID_TIMESTAMP),
compatible_(false),
is_oss_(false),
......@@ -55,7 +56,7 @@ void ObArchiveRoundMgr::destroy()
started_pg_count_ = 0;
incarnation_ = -1;
current_archive_round_ = -1;
cur_piece_id_ = 0;
cur_piece_id_ = OB_BACKUP_INVALID_PIECE_ID;
cur_piece_create_date_ = OB_INVALID_TIMESTAMP;
compatible_ = false;
is_oss_ = false;
......@@ -160,17 +161,22 @@ int ObArchiveRoundMgr::set_archive_start(const int64_t incarnation, const int64_
return ret;
}
void ObArchiveRoundMgr::set_archive_force_stop(const int64_t incarnation, const int64_t archive_round)
int ObArchiveRoundMgr::set_archive_force_stop(
const int64_t incarnation, const int64_t archive_round, const int64_t piece_id)
{
int ret = OB_SUCCESS;
WLockGuard guard(rwlock_);
if (OB_UNLIKELY(0 >= incarnation || 0 > archive_round)) {
ARCHIVE_LOG(WARN, "invalid arguments", K(incarnation), K(archive_round));
if (OB_UNLIKELY(0 >= incarnation || 0 > archive_round || piece_id < 0)) {
ret = OB_INVALID_ARGUMENT;
ARCHIVE_LOG(WARN, "invalid arguments", K(incarnation), K(archive_round), K(piece_id), K(ret));
} else {
incarnation_ = incarnation;
current_archive_round_ = archive_round;
cur_piece_id_ = piece_id;
log_archive_status_ = LOG_ARCHIVE_STOPPED;
}
return ret;
}
int ObArchiveRoundMgr::update_cur_piece_info(const int64_t incarnation, const int64_t archive_round,
......@@ -179,7 +185,7 @@ int ObArchiveRoundMgr::update_cur_piece_info(const int64_t incarnation, const in
int ret = OB_SUCCESS;
WLockGuard guard(rwlock_);
if (OB_UNLIKELY(incarnation != incarnation_ || current_archive_round_ != archive_round || 0 == cur_piece_id_ ||
new_piece_id != cur_piece_id_ + 1)) {
new_piece_id <= cur_piece_id_)) {
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(WARN,
"invalid arguments",
......
......@@ -84,7 +84,7 @@ public:
bool has_encounter_fatal_error(const int64_t incarnation, const int64_t archive_round);
int set_archive_start(const int64_t incarnation, const int64_t archive_round, const int64_t piece_id,
const int64_t piece_create_date, const bool is_oss, const share::ObTenantLogArchiveStatus::COMPATIBLE compatible);
void set_archive_force_stop(const int64_t incarnation, const int64_t archive_round);
int set_archive_force_stop(const int64_t incarnation, const int64_t archive_round, const int64_t piece_id);
int update_cur_piece_info(const int64_t incarnation, const int64_t archive_round, const int64_t new_piece_id,
const int64_t new_piece_create_date);
void get_archive_round_info(int64_t& incarnation, int64_t& archive_round, int64_t& cur_piece_id,
......
......@@ -640,6 +640,18 @@ int ObArchiveSender::check_need_switch_piece_(const ObPGKey& pg_key, const int64
// server_piece_id may not be new enough when switch_piece and switch_leader
// happen concurrently
need_switch_piece = false;
} else if (server_piece_id > pg_piece_id + 1) {
ret = OB_LOG_ARCHIVE_LEADER_CHANGED;
ARCHIVE_LOG(WARN,
"pg piece_id is too old",
K(pg_key),
K(server_incarnation),
K(server_round),
K(round),
K(incarnation),
K(server_piece_id),
K(pg_piece_id),
K(ret));
} else {
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(WARN,
......
......@@ -187,18 +187,25 @@ int StartArchiveHelper::get_tenant_archive_status_(ObLogArchiveSimpleInfo& info)
// get archive progress with pg_key, takeover_ts_
if (OB_FAIL(
ObLogArchiveInfoMgr::get_instance().get_log_archive_status(pg_key_.get_tenant_id(), takeover_ts_, info))) {
ARCHIVE_LOG(WARN, "get_log_archive_status fail", KR(ret));
ARCHIVE_LOG(WARN, "get_log_archive_status fail", K(pg_key_), KR(ret));
} else if (OB_UNLIKELY(!info.is_valid())) {
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(ERROR, "ObLogArchiveSimpleInfo is not valid", KR(ret), K(info));
} else if (share::ObLogArchiveStatus::STATUS::BEGINNING != info.status_ &&
share::ObLogArchiveStatus::STATUS::DOING != info.status_) {
ret = OB_EAGAIN;
ARCHIVE_LOG(WARN, "no doing archive status, retry later", KR(ret), K(info));
ARCHIVE_LOG(WARN, "no doing archive status, retry later", KR(ret), K(pg_key_), K(info));
} else {
tenant_archive_checkpoint_ts_ = info.checkpoint_ts_;
int64_t piece_id = archive_mgr_.get_archive_round_mgr().get_cur_piece_id();
if (OB_UNLIKELY(piece_id != info.cur_piece_id_)) {
ret = OB_EAGAIN;
if (REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
ARCHIVE_LOG(WARN, "piece_id is not consistent", KR(ret), K(pg_key_), K(info), K(piece_id));
}
} else {
tenant_archive_checkpoint_ts_ = info.checkpoint_ts_;
}
}
return ret;
}
......
......@@ -125,6 +125,10 @@ bool ObPGLogArchiveStatus::operator<(const ObPGLogArchiveStatus& other)
b_ret = true;
} else if (log_archive_round_ > other.log_archive_round_) {
b_ret = false;
} else if (cur_piece_id_ < other.cur_piece_id_) {
b_ret = true;
} else if (cur_piece_id_ > other.cur_piece_id_) {
b_ret = false;
} else if (is_stopped()) {
b_ret = false;
} else if (other.is_stopped()) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册