// REVIEW HEADER (annotation only; not part of the recorded commit).
//
// What this file is: a `git format-patch` export of commit c21b5fca ("Fix migration tablet
// meta merge push high start_scn bug"), not a C++ translation unit. Its line breaks appear to
// have been collapsed, so the five physical lines below break mid-statement. They are kept
// verbatim: every '+'/'-' line is counted by the '@@' hunk headers and the diffstat
// (4 files changed, 116 insertions, 2 deletions), so code fixes belong in the target
// .cpp/.h files, not in this text.
//
// What the patch changes, as shown by its hunks:
//  * ob_ls_complete_migration.{h,cpp}
//    - Adds the int64_t member max_minor_end_scn_, initialised to 0 in the constructor.
//    - check_tablet_ready_(): on the branch taken when the tablet's data status is complete or
//      its restore status is not full, raises max_minor_end_scn_ to the tablet meta's
//      clog_checkpoint_ts_ if the tablet has no minor sstables, otherwise to get_end_log_ts()
//      of the last entry in the minor sstable array.
//    - Adds check_need_wait_checkpoint_ts_push_(ls, need_wait): returns OB_NOT_INIT when the
//      task is not inited and OB_INVALID_ARGUMENT when ls is null. need_wait starts as true
//      and becomes false only when the op type is ADD_LS_OP or MIGRATE_LS_OP and the LS
//      restore status is_in_restore() but cannot can_restore_log().
//    - Adds wait_ls_checkpoint_ts_push_(), called from process() between
//      wait_trans_tablet_explain_data_() and update_ls_migration_status_hold_(). When waiting
//      is needed it loops:
//        - fails with OB_CANCELED if ctx_->is_failed() or the sys task is cancelled;
//        - finishes once ls->get_clog_checkpoint_ts() >= max_minor_end_scn_;
//        - otherwise calls advance_checkpoint_by_flush(max_minor_end_scn_), treating
//          OB_NO_NEED_UPDATE as success;
//        - fails with OB_TIMEOUT once the elapsed time reaches GCONF._advance_checkpoint_timeout;
//        - otherwise calls ob_usleep(MAX_SLEEP_INTERVAL_MS) and retries.
//  * ob_tablet_group_restore.cpp
//    - ~ObTabletRestoreDag(): a remove_tablet_table_info() failure is logged with LOG_WARN
//      instead of LOG_ERROR.
//  * ob_tablet_table_store.cpp
//    - need_remove_old_table(): sets need_remove = false when the first minor table's
//      get_end_log_ts() is greater than the tablet meta's clog_checkpoint_ts_.
//
// NOTE(review), to confirm against the real source files:
//  - MAX_SLEEP_INTERVAL_MS holds 1 * 1000 * 1000, is commented "//1s" and is passed to
//    ob_usleep(), so the value is presumably microseconds and the _MS suffix looks misleading.
//  - The log strings "check need wait log sync get invalid argument" and "failed to check need
//    wait log sync" sit in the checkpoint-ts functions; they look copy-pasted from a log-sync
//    helper.
//  - Both `need_wait = true` assignments inside check_need_wait_checkpoint_ts_push_() repeat
//    the initial value.
//  - get_dag(), minor_sstables.array_[count() - 1] and tablet_ptr_ are dereferenced with no
//    null check visible in these hunks; verify the surrounding code guarantees they are non-null.
//  - The "wait ls checkpoint ts push time out" LOG_WARN does not include K(ret).
From c21b5fcaaff2df9c3c341d9b6b572b4511884202 Mon Sep 17 00:00:00 2001 From: obdev Date: Tue, 1 Nov 2022 04:08:03 +0000 Subject: [PATCH] Fix migration tablet meta merge push high start_scn bug --- .../ob_ls_complete_migration.cpp | 108 +++++++++++++++++- .../ob_ls_complete_migration.h | 5 + .../ob_tablet_group_restore.cpp | 2 +- src/storage/tablet/ob_tablet_table_store.cpp | 3 + 4 files changed, 116 insertions(+), 2 deletions(-) diff --git a/src/storage/high_availability/ob_ls_complete_migration.cpp b/src/storage/high_availability/ob_ls_complete_migration.cpp index 0af1a6d06c..93552881ab 100644 --- a/src/storage/high_availability/ob_ls_complete_migration.cpp +++ b/src/storage/high_availability/ob_ls_complete_migration.cpp @@ -798,7 +798,8 @@ ObStartCompleteMigrationTask::ObStartCompleteMigrationTask() is_inited_(false), ls_handle_(), ctx_(nullptr), - log_sync_scn_(0) + log_sync_scn_(0), + max_minor_end_scn_(0) { } @@ -853,6 +854,8 @@ int ObStartCompleteMigrationTask::process() LOG_WARN("failed to check all tablet ready", K(ret), KPC(ctx_)); } else if (OB_FAIL(wait_trans_tablet_explain_data_())) { LOG_WARN("failed to wait log replay sync", K(ret), KPC(ctx_)); + } else if (OB_FAIL(wait_ls_checkpoint_ts_push_())) { + LOG_WARN("failed to wait ls checkpoint ts push", K(ret), KPC(ctx_)); } else if (OB_FAIL(update_ls_migration_status_hold_())) { LOG_WARN("failed to update ls migration status hold", K(ret), KPC(ctx_)); } else if (OB_FAIL(change_member_list_())) { @@ -1256,6 +1259,12 @@ int ObStartCompleteMigrationTask::check_tablet_ready_( LOG_WARN("tablet should not be NULL", K(ret), KP(tablet), K(tablet_handle), K(tablet_id)); } else if (tablet->get_tablet_meta().ha_status_.is_data_status_complete() || !tablet->get_tablet_meta().ha_status_.is_restore_status_full()) { + ObSSTableArray &minor_sstables = tablet->get_table_store().get_minor_sstables(); + if (minor_sstables.empty()) { + max_minor_end_scn_ = MAX(max_minor_end_scn_, 
tablet->get_tablet_meta().clog_checkpoint_ts_); + } else { + max_minor_end_scn_ = MAX(max_minor_end_scn_, minor_sstables.array_[minor_sstables.count() - 1]->get_end_log_ts()); + } break; } else { const int64_t current_ts = ObTimeUtility::current_time(); @@ -1286,6 +1295,103 @@ int ObStartCompleteMigrationTask::check_tablet_ready_( return ret; } +int ObStartCompleteMigrationTask::check_need_wait_checkpoint_ts_push_( + ObLS *ls, + bool &need_wait) +{ + int ret = OB_SUCCESS; + need_wait = true; + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("start complete migration task do not init", K(ret)); + } else if (OB_ISNULL(ls)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("check need wait log sync get invalid argument", K(ret), KP(ls)); + } else if (ObMigrationOpType::ADD_LS_OP == ctx_->arg_.type_ + || ObMigrationOpType::MIGRATE_LS_OP == ctx_->arg_.type_) { + need_wait = true; + ObLSRestoreStatus ls_restore_status; + if (OB_FAIL(ls->get_restore_status(ls_restore_status))) { + LOG_WARN("failed to get restore status", K(ret), KPC(ctx_)); + } else if (!ls_restore_status.is_in_restore()) { + need_wait = true; + } else if (!ls_restore_status.can_restore_log()) { + need_wait = false; + } + } + return ret; +} + +//TODO(muwei.ym) remove it later +int ObStartCompleteMigrationTask::wait_ls_checkpoint_ts_push_() +{ + int ret = OB_SUCCESS; + ObLSHandle ls_handle; + ObLS *ls = nullptr; + checkpoint::ObCheckpointExecutor *checkpoint_executor = NULL; + int64_t checkpoint_ts = 0; + const int64_t MAX_WAIT_INTERVAL_BY_CHECKPOINT_BY_FLUSH = GCONF._advance_checkpoint_timeout; + const int64_t MAX_SLEEP_INTERVAL_MS = 1 * 1000 * 1000; //1s + bool is_cancel = false; + bool need_wait = true; + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("start complete migration task do not init", K(ret)); + } else if (OB_FAIL(ObStorageHADagUtils::get_ls(ctx_->arg_.ls_id_, ls_handle))) { + LOG_WARN("failed to get ls", K(ret), KPC(ctx_)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + ret = 
OB_ERR_UNEXPECTED; + LOG_WARN("ls should not be NULL", K(ret), KP(ls), KPC(ctx_)); + } else if (OB_FAIL(check_need_wait_checkpoint_ts_push_(ls, need_wait))) { + LOG_WARN("failed to check need wait log sync", K(ret), KPC(ls), KPC(ctx_)); + } else if (!need_wait) { + LOG_INFO("no need to wait ls checkpoint ts push", K(ret), KPC(ctx_)); + } else if (OB_ISNULL(checkpoint_executor = ls->get_checkpoint_executor())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("checkpoint executor should not be NULL", K(ret), KPC(ctx_), KP(checkpoint_executor)); + } else { + const int64_t wait_checkpoint_push_start_ts = ObTimeUtility::current_time(); + while (OB_SUCC(ret)) { + if (ctx_->is_failed()) { + ret = OB_CANCELED; + STORAGE_LOG(WARN, "ls migration task is failed, cancel wait ls check point ts push", K(ret)); + } else if (OB_FAIL(SYS_TASK_STATUS_MGR.is_task_cancel(get_dag()->get_dag_id(), is_cancel))) { + STORAGE_LOG(ERROR, "failed to check is task canceled", K(ret), K(*this)); + } else if (is_cancel) { + ret = OB_CANCELED; + STORAGE_LOG(WARN, "task is cancelled", K(ret), K(*this)); + } else if (FALSE_IT(checkpoint_ts = ls->get_clog_checkpoint_ts())) { + } else if (checkpoint_ts >= max_minor_end_scn_) { + const int64_t cost_ts = ObTimeUtility::current_time() - wait_checkpoint_push_start_ts; + LOG_INFO("succeed wait clog checkpoint ts push", "cost", cost_ts, "ls_id", ctx_->arg_.ls_id_); + break; + } else if (OB_FAIL(checkpoint_executor->advance_checkpoint_by_flush(max_minor_end_scn_))) { + if (OB_NO_NEED_UPDATE == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to advance checkpoint by flush", K(ret), KPC(ctx_)); + } + } + + if (OB_SUCC(ret)) { + const int64_t current_ts = ObTimeUtility::current_time(); + if (current_ts - wait_checkpoint_push_start_ts >= MAX_WAIT_INTERVAL_BY_CHECKPOINT_BY_FLUSH) { + ret = OB_TIMEOUT; + LOG_WARN("wait ls checkpoint ts push time out", + "ls_checkpoint_ts", checkpoint_ts, "need_checkpoint_ts", max_minor_end_scn_, "ls_id", ctx_->arg_.ls_id_); + } 
else { + LOG_INFO("wait ls checkpoint ts push", "ls_checkpoint_ts", checkpoint_ts, + "need_checkpoint_ts", max_minor_end_scn_, "ls_id", ctx_->arg_.ls_id_); + ob_usleep(MAX_SLEEP_INTERVAL_MS); + } + } + } + } + return ret; +} + int ObStartCompleteMigrationTask::record_server_event_() { int ret = OB_SUCCESS; diff --git a/src/storage/high_availability/ob_ls_complete_migration.h b/src/storage/high_availability/ob_ls_complete_migration.h index cef40694bb..742995af0e 100644 --- a/src/storage/high_availability/ob_ls_complete_migration.h +++ b/src/storage/high_availability/ob_ls_complete_migration.h @@ -181,6 +181,10 @@ private: int check_tablet_ready_( const common::ObTabletID &tablet_id, ObLS *ls); + int check_need_wait_checkpoint_ts_push_( + ObLS *ls, + bool &need_wait); + int wait_ls_checkpoint_ts_push_(); int record_server_event_(); private: @@ -188,6 +192,7 @@ private: ObLSHandle ls_handle_; ObLSCompleteMigrationCtx *ctx_; int64_t log_sync_scn_; + int64_t max_minor_end_scn_; DISALLOW_COPY_AND_ASSIGN(ObStartCompleteMigrationTask); }; diff --git a/src/storage/high_availability/ob_tablet_group_restore.cpp b/src/storage/high_availability/ob_tablet_group_restore.cpp index d61bdc830f..834bb524c7 100644 --- a/src/storage/high_availability/ob_tablet_group_restore.cpp +++ b/src/storage/high_availability/ob_tablet_group_restore.cpp @@ -1688,7 +1688,7 @@ ObTabletRestoreDag::~ObTabletRestoreDag() if (OB_NOT_NULL(tablet_restore_ctx_.ha_table_info_mgr_)) { if (OB_SUCCESS != (tmp_ret = tablet_restore_ctx_.ha_table_info_mgr_->remove_tablet_table_info( tablet_restore_ctx_.tablet_id_))) { - LOG_ERROR("failed to remove tablet table info", K(tmp_ret), K(tablet_restore_ctx_)); + LOG_WARN("failed to remove tablet table info", K(tmp_ret), K(tablet_restore_ctx_)); } } } diff --git a/src/storage/tablet/ob_tablet_table_store.cpp b/src/storage/tablet/ob_tablet_table_store.cpp index f9f645644c..7f66cc2a1c 100644 --- a/src/storage/tablet/ob_tablet_table_store.cpp +++ 
b/src/storage/tablet/ob_tablet_table_store.cpp @@ -1036,6 +1036,9 @@ int ObTabletTableStore::need_remove_old_table( LOG_WARN("get invalid arguments", K(ret), K(multi_version_start)); } else if (minor_tables_.empty() || INT64_MAX == minor_tables_[0]->get_upper_trans_version()) { // do nothing + } else if (minor_tables_[0]->get_end_log_ts() > tablet_ptr_->get_tablet_meta().clog_checkpoint_ts_) { + need_remove = false; + //TODO(muwei.ym) remove it later } else if (minor_tables_[0]->get_upper_trans_version() <= major_tables_[0]->get_snapshot_version()) { // at least one minor sstable is coverd by major sstable // don't need to care about kept_multi_version_start here -- GitLab