提交 6779defa 编写于 作者: O obdev 提交者: ob-robot

Fix try_retire_task error

上级 f2479fe6
......@@ -117,6 +117,25 @@ void ObLogRestoreHandler::destroy()
}
}
// To support log restore in parallel, the FetchLogTask is introduced and each task covers a range of logs, as [Min_LSN, Max_LSN),
// which means logs wiil be pulled from the log restore source and submitted to Palf with this task.
//
// Two scenarios should be handled:
// 1) Log restore is done within the leader of restore handler, which changes with the Palf
// 2) The restore end_scn maybe set to control the log pull and restore.
//
// For follower, log restore should terminate while all tasks should be freed and the restore context can be reset.
// The same as restore to_end.
//
// To generate nonoverlapping and continuous FetchLogTask, a max_submit_lsn_ in context is hold, which is the Max_LSN of the previous task
// and the Min_LSN of the current task.
//
// When role change of the restore handler, the context is reset but which is not reset when restore to_end is set.
//
// The role change is easy to distinguish with the proposal_id while restore_scn change is not,
// so reset restore context and advance issue_version, to reset start_fetch_log_lsn if restore_scn is advanced
// and free issued tasks before restore to_end(paralleled)
//
void ObLogRestoreHandler::switch_role(const common::ObRole &role, const int64_t proposal_id)
{
WLockGuard guard(lock_);
......@@ -315,7 +334,12 @@ int ObLogRestoreHandler::raw_write(const int64_t proposal_id,
context_.max_fetch_lsn_ = lsn + buf_size;
context_.max_fetch_scn_ = scn;
context_.last_fetch_ts_ = ObTimeUtility::fast_current_time();
parent_->set_to_end(scn);
if (parent_->set_to_end(scn)) {
// To stop and clear all restore log tasks and restore context, reset context and advance issue version
CLOG_LOG(INFO, "restore log to_end succ", KPC(this), KPC(parent_));
context_.reset();
context_.set_issue_version();
}
}
}
} while (0);
......@@ -388,7 +412,13 @@ int ObLogRestoreHandler::try_retire_task(ObFetchLogTask &task, bool &done)
|| task.version_ != context_.issue_version_)) {
done = true;
CLOG_LOG(INFO, "stale task, just skip it", K(task), KPC(this));
} else if (context_.max_fetch_lsn_ >= task.end_lsn_ || parent_->to_end()) {
} else if (context_.max_fetch_lsn_.is_valid() && context_.max_fetch_lsn_ >= task.end_lsn_) {
CLOG_LOG(INFO, "restore max_lsn bigger than task end_lsn, just skip it", K(task), KPC(this));
done = true;
context_.issue_task_num_--;
} else if (parent_->to_end()) {
// when restore is set to_end, issue_version_ is advanced, and all issued tasks before are stale tasks as stale issue_version_
CLOG_LOG(ERROR, "error unexpected, log restored to_end, just skip it", K(task), KPC(this), K(parent_));
done = true;
context_.issue_task_num_--;
} else if (context_.max_fetch_lsn_ >= task.start_lsn_) {
......
......@@ -238,6 +238,8 @@ int ObRemoteFetchLogImpl::get_fetch_log_base_lsn_(ObLS &ls,
|| last_fetch_ts < ObTimeUtility::current_time() - 5 * 1000 * 1000L;
if (OB_FAIL(get_palf_base_lsn_scn_(ls, end_lsn, heuristic_scn))) {
LOG_WARN("get palf base lsn failed", K(ret), K(ls));
} else if (! max_fetch_lsn.is_valid()) {
lsn = end_lsn;
} else {
lsn = ignore_restore ? end_lsn : max_fetch_lsn;
}
......
......@@ -52,13 +52,14 @@ const char *ObRemoteLogParent::get_source_type_str(const ObLogRestoreSourceType
return share::ObLogRestoreSourceItem::get_source_type_str(type);
}
void ObRemoteLogParent::set_to_end(const SCN &scn)
bool ObRemoteLogParent::set_to_end(const SCN &scn)
{
if (scn >= upper_limit_scn_) {
to_end_ = true;
end_fetch_scn_ = scn;
CLOG_LOG(INFO, "set_to_end succ", KPC(this));
}
return to_end_;
}
void ObRemoteLogParent::base_copy_to_(ObRemoteLogParent &other)
......
......@@ -46,7 +46,7 @@ public:
virtual int64_t to_string(char *buf, const int64_t buf_len) const = 0;
virtual int update_locate_info(ObRemoteLogParent &source) = 0;
bool to_end() const { return to_end_; }
void set_to_end(const share::SCN &scn);
bool set_to_end(const share::SCN &scn);
void get_end_scn(share::SCN &scn) const { scn = end_fetch_scn_;}
void get_upper_limit_scn(share::SCN &scn) const { scn = upper_limit_scn_; }
ObLogRestoreSourceType get_source_type() const { return type_; }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册