提交 c21b5fca 编写于 作者: O obdev 提交者: wangzelin.wzl

Fix migration tablet meta merge push high start_scn bug

上级 d687c3f8
......@@ -798,7 +798,8 @@ ObStartCompleteMigrationTask::ObStartCompleteMigrationTask()
is_inited_(false),
ls_handle_(),
ctx_(nullptr),
log_sync_scn_(0)
log_sync_scn_(0),
max_minor_end_scn_(0)
{
}
......@@ -853,6 +854,8 @@ int ObStartCompleteMigrationTask::process()
LOG_WARN("failed to check all tablet ready", K(ret), KPC(ctx_));
} else if (OB_FAIL(wait_trans_tablet_explain_data_())) {
LOG_WARN("failed to wait log replay sync", K(ret), KPC(ctx_));
} else if (OB_FAIL(wait_ls_checkpoint_ts_push_())) {
LOG_WARN("failed to wait ls checkpoint ts push", K(ret), KPC(ctx_));
} else if (OB_FAIL(update_ls_migration_status_hold_())) {
LOG_WARN("failed to update ls migration status hold", K(ret), KPC(ctx_));
} else if (OB_FAIL(change_member_list_())) {
......@@ -1256,6 +1259,12 @@ int ObStartCompleteMigrationTask::check_tablet_ready_(
LOG_WARN("tablet should not be NULL", K(ret), KP(tablet), K(tablet_handle), K(tablet_id));
} else if (tablet->get_tablet_meta().ha_status_.is_data_status_complete()
|| !tablet->get_tablet_meta().ha_status_.is_restore_status_full()) {
ObSSTableArray &minor_sstables = tablet->get_table_store().get_minor_sstables();
if (minor_sstables.empty()) {
max_minor_end_scn_ = MAX(max_minor_end_scn_, tablet->get_tablet_meta().clog_checkpoint_ts_);
} else {
max_minor_end_scn_ = MAX(max_minor_end_scn_, minor_sstables.array_[minor_sstables.count() - 1]->get_end_log_ts());
}
break;
} else {
const int64_t current_ts = ObTimeUtility::current_time();
......@@ -1286,6 +1295,103 @@ int ObStartCompleteMigrationTask::check_tablet_ready_(
return ret;
}
int ObStartCompleteMigrationTask::check_need_wait_checkpoint_ts_push_(
ObLS *ls,
bool &need_wait)
{
int ret = OB_SUCCESS;
need_wait = true;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("start complete migration task do not init", K(ret));
} else if (OB_ISNULL(ls)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("check need wait log sync get invalid argument", K(ret), KP(ls));
} else if (ObMigrationOpType::ADD_LS_OP == ctx_->arg_.type_
|| ObMigrationOpType::MIGRATE_LS_OP == ctx_->arg_.type_) {
need_wait = true;
ObLSRestoreStatus ls_restore_status;
if (OB_FAIL(ls->get_restore_status(ls_restore_status))) {
LOG_WARN("failed to get restore status", K(ret), KPC(ctx_));
} else if (!ls_restore_status.is_in_restore()) {
need_wait = true;
} else if (!ls_restore_status.can_restore_log()) {
need_wait = false;
}
}
return ret;
}
//TODO(muwei.ym) remove it later
int ObStartCompleteMigrationTask::wait_ls_checkpoint_ts_push_()
{
int ret = OB_SUCCESS;
ObLSHandle ls_handle;
ObLS *ls = nullptr;
checkpoint::ObCheckpointExecutor *checkpoint_executor = NULL;
int64_t checkpoint_ts = 0;
const int64_t MAX_WAIT_INTERVAL_BY_CHECKPOINT_BY_FLUSH = GCONF._advance_checkpoint_timeout;
const int64_t MAX_SLEEP_INTERVAL_MS = 1 * 1000 * 1000; //1s
bool is_cancel = false;
bool need_wait = true;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("start complete migration task do not init", K(ret));
} else if (OB_FAIL(ObStorageHADagUtils::get_ls(ctx_->arg_.ls_id_, ls_handle))) {
LOG_WARN("failed to get ls", K(ret), KPC(ctx_));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls should not be NULL", K(ret), KP(ls), KPC(ctx_));
} else if (OB_FAIL(check_need_wait_checkpoint_ts_push_(ls, need_wait))) {
LOG_WARN("failed to check need wait log sync", K(ret), KPC(ls), KPC(ctx_));
} else if (!need_wait) {
LOG_INFO("no need to wait ls checkpoint ts push", K(ret), KPC(ctx_));
} else if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("checkpoint executor should not be NULL", K(ret), KPC(ctx_), KP(checkpoint_executor));
} else {
const int64_t wait_checkpoint_push_start_ts = ObTimeUtility::current_time();
while (OB_SUCC(ret)) {
if (ctx_->is_failed()) {
ret = OB_CANCELED;
STORAGE_LOG(WARN, "ls migration task is failed, cancel wait ls check point ts push", K(ret));
} else if (OB_FAIL(SYS_TASK_STATUS_MGR.is_task_cancel(get_dag()->get_dag_id(), is_cancel))) {
STORAGE_LOG(ERROR, "failed to check is task canceled", K(ret), K(*this));
} else if (is_cancel) {
ret = OB_CANCELED;
STORAGE_LOG(WARN, "task is cancelled", K(ret), K(*this));
} else if (FALSE_IT(checkpoint_ts = ls->get_clog_checkpoint_ts())) {
} else if (checkpoint_ts >= max_minor_end_scn_) {
const int64_t cost_ts = ObTimeUtility::current_time() - wait_checkpoint_push_start_ts;
LOG_INFO("succeed wait clog checkpoint ts push", "cost", cost_ts, "ls_id", ctx_->arg_.ls_id_);
break;
} else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(max_minor_end_scn_))) {
if (OB_NO_NEED_UPDATE == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to advance checkpoint by flush", K(ret), KPC(ctx_));
}
}
if (OB_SUCC(ret)) {
const int64_t current_ts = ObTimeUtility::current_time();
if (current_ts - wait_checkpoint_push_start_ts >= MAX_WAIT_INTERVAL_BY_CHECKPOINT_BY_FLUSH) {
ret = OB_TIMEOUT;
LOG_WARN("wait ls checkpoint ts push time out",
"ls_checkpoint_ts", checkpoint_ts, "need_checkpoint_ts", max_minor_end_scn_, "ls_id", ctx_->arg_.ls_id_);
} else {
LOG_INFO("wait ls checkpoint ts push", "ls_checkpoint_ts", checkpoint_ts,
"need_checkpoint_ts", max_minor_end_scn_, "ls_id", ctx_->arg_.ls_id_);
ob_usleep(MAX_SLEEP_INTERVAL_MS);
}
}
}
}
return ret;
}
int ObStartCompleteMigrationTask::record_server_event_()
{
int ret = OB_SUCCESS;
......
......@@ -181,6 +181,10 @@ private:
int check_tablet_ready_(
const common::ObTabletID &tablet_id,
ObLS *ls);
int check_need_wait_checkpoint_ts_push_(
ObLS *ls,
bool &need_wait);
int wait_ls_checkpoint_ts_push_();
int record_server_event_();
private:
......@@ -188,6 +192,7 @@ private:
ObLSHandle ls_handle_;
ObLSCompleteMigrationCtx *ctx_;
int64_t log_sync_scn_;
int64_t max_minor_end_scn_;
DISALLOW_COPY_AND_ASSIGN(ObStartCompleteMigrationTask);
};
......
......@@ -1688,7 +1688,7 @@ ObTabletRestoreDag::~ObTabletRestoreDag()
if (OB_NOT_NULL(tablet_restore_ctx_.ha_table_info_mgr_)) {
if (OB_SUCCESS != (tmp_ret = tablet_restore_ctx_.ha_table_info_mgr_->remove_tablet_table_info(
tablet_restore_ctx_.tablet_id_))) {
LOG_ERROR("failed to remove tablet table info", K(tmp_ret), K(tablet_restore_ctx_));
LOG_WARN("failed to remove tablet table info", K(tmp_ret), K(tablet_restore_ctx_));
}
}
}
......
......@@ -1036,6 +1036,9 @@ int ObTabletTableStore::need_remove_old_table(
LOG_WARN("get invalid arguments", K(ret), K(multi_version_start));
} else if (minor_tables_.empty() || INT64_MAX == minor_tables_[0]->get_upper_trans_version()) {
// do nothing
} else if (minor_tables_[0]->get_end_log_ts() > tablet_ptr_->get_tablet_meta().clog_checkpoint_ts_) {
need_remove = false;
//TODO(muwei.ym) remove it later
} else if (minor_tables_[0]->get_upper_trans_version() <= major_tables_[0]->get_snapshot_version()) {
// at least one minor sstable is coverd by major sstable
// don't need to care about kept_multi_version_start here
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册