From 97b4595450086d66ef430de7fe011173e4dc3984 Mon Sep 17 00:00:00 2001 From: yy0 Date: Thu, 12 Aug 2021 16:05:28 +0800 Subject: [PATCH] not update schema version if replay clog while restore --- src/storage/memtable/mvcc/ob_mvcc_ctx.h | 11 +++++++++++ src/storage/memtable/ob_memtable.cpp | 13 ++++++++----- src/storage/ob_partition_service.cpp | 1 + src/storage/transaction/ob_trans_part_ctx.cpp | 12 ++++++++++++ src/storage/transaction/ob_trans_part_ctx.h | 1 + 5 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/storage/memtable/mvcc/ob_mvcc_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_ctx.h index 0d4d82e600..e1c17f1bcc 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_ctx.h @@ -49,6 +49,7 @@ public: lock_start_time_(0), row_purge_version_(0), redo_log_timestamp_(0), + redo_log_id_(0), trx_lock_timeout_(-1), lock_wait_start_ts_(0), multi_version_range_(), @@ -273,10 +274,18 @@ public: { redo_log_timestamp_ = redo_log_timestamp; } + inline void set_redo_log_id(const int64_t redo_log_id) + { + redo_log_id_ = redo_log_id; + } inline int64_t get_redo_log_timestamp() const { return redo_log_timestamp_; } + inline int64_t get_redo_log_id() const + { + return redo_log_id_; + } inline int64_t get_callback_list_length() const { return trans_mgr_.count(); @@ -348,6 +357,7 @@ public: lock_start_time_ = 0; row_purge_version_ = 0; redo_log_timestamp_ = 0; + redo_log_id_ = 0; multi_version_range_.reset(); multi_version_range_.base_version_ = common::ObVersionRange::MIN_VERSION; // TODO: remove it lock_wait_start_ts_ = 0; @@ -429,6 +439,7 @@ protected: int64_t lock_start_time_; int64_t row_purge_version_; int64_t redo_log_timestamp_; + int64_t redo_log_id_; int64_t trx_lock_timeout_; int64_t lock_wait_start_ts_; common::ObVersionRange multi_version_range_; diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 1d1445829f..999d7c2e55 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -1323,7 +1323,9 @@ int ObMemtable::replay(const ObStoreCtx& ctx, const char* data, const int64_t da const int64_t log_timestamp = mt_ctx->get_redo_log_timestamp(); // In principle, failure is not allowed here, but from implementation aspect, // the logic inside needs to deal with failures due to lack of memory - if (OB_UNLIKELY(0 >= log_timestamp) || OB_UNLIKELY(INT64_MAX == log_timestamp)) { + const int64_t log_id = mt_ctx->get_redo_log_id(); + if (OB_UNLIKELY(0 >= log_timestamp) || OB_UNLIKELY(INT64_MAX == log_timestamp) || OB_UNLIKELY(0 >= log_id) || + OB_UNLIKELY(INT64_MAX == log_id)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "unexpected log timestamp", K(ret), K(*mt_ctx)); } else if (OB_FAIL(mt_ctx->set_replay_host(this, for_replay))) { @@ -1380,9 +1382,8 @@ int ObMemtable::replay(const ObStoreCtx& ctx, const char* data, const int64_t da TRANS_LOG(WARN, "failed to check standby_cluster_schema_condition", K(ret), K(table_id), K(table_version)); } else { // FIXME. + transaction::ObPartTransCtx* part_ctx = static_cast(mt_ctx->get_trans_ctx()); if (0 != flag) { - transaction::ObPartTransCtx* part_ctx = - static_cast(mt_ctx->get_trans_ctx()); if (OB_FAIL(part_ctx->replay_rollback_to(sql_no, ctx.log_ts_))) { TRANS_LOG(WARN, "replay rollback savepoint failed", K(ret), K(*mt_ctx), K(sql_no)); } else { @@ -1414,8 +1415,10 @@ int ObMemtable::replay(const ObStoreCtx& ctx, const char* data, const int64_t da K(acc_checksum)); } } else { - ctx.mem_ctx_->set_table_version(table_version); - set_max_schema_version(table_version); + if (part_ctx->need_update_schema_version(log_id, log_timestamp)) { + ctx.mem_ctx_->set_table_version(table_version); + set_max_schema_version(table_version); + } } } } diff --git a/src/storage/ob_partition_service.cpp b/src/storage/ob_partition_service.cpp index 08b47823ec..54d0fd1271 100644 --- a/src/storage/ob_partition_service.cpp +++ b/src/storage/ob_partition_service.cpp @@ -4477,6 +4477,7 @@ int ObPartitionService::replay_redo_log(const common::ObPartitionKey& pkey, cons } else { // used to order trans nodes that have not determined the commit version during replay ctx.mem_ctx_->set_redo_log_timestamp(log_timestamp); + ctx.mem_ctx_->set_redo_log_id(log_id); if (OB_FAIL(get_partition(pkey, guard))) { STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret)); } else if (OB_ISNULL(guard.get_partition_group())) { diff --git a/src/storage/transaction/ob_trans_part_ctx.cpp b/src/storage/transaction/ob_trans_part_ctx.cpp index a94adebe8c..ec04d83b54 100644 --- a/src/storage/transaction/ob_trans_part_ctx.cpp +++ b/src/storage/transaction/ob_trans_part_ctx.cpp @@ -4132,6 +4132,18 @@ bool ObPartTransCtx::need_rollback_when_restore_(const int64_t commit_version) commit_version > restore_snapshot_version; } +bool ObPartTransCtx::need_update_schema_version(const int64_t log_id, const int64_t log_ts) +{ + const int64_t restore_snapshot_version = partition_mgr_->get_restore_snapshot_version(); + const int64_t last_restore_log_id = partition_mgr_->get_last_restore_log_id(); + bool need_update = true; + if (restore_snapshot_version > 0 && (last_restore_log_id == OB_INVALID_ID || log_id <= last_restore_log_id) && + (log_ts > restore_snapshot_version)) { + need_update = false; + } + return need_update; +} + int ObPartTransCtx::trans_replay_commit_(const int64_t commit_version, const int64_t checksum) { ObTimeGuard tg("trans_replay_commit", 50 * 1000); diff --git a/src/storage/transaction/ob_trans_part_ctx.h b/src/storage/transaction/ob_trans_part_ctx.h index 87a0ee21b2..2ae2876052 100644 --- a/src/storage/transaction/ob_trans_part_ctx.h +++ b/src/storage/transaction/ob_trans_part_ctx.h @@ -389,6 +389,7 @@ public: bool is_in_trans_table_state(); virtual int64_t get_part_trans_action() const override; int rollback_stmt(const int64_t from_sql_no, const int64_t to_sql_no); + bool need_update_schema_version(const int64_t log_id, const int64_t log_ts); public: INHERIT_TO_STRING_KV("ObDistTransCtx", ObDistTransCtx, K_(snapshot_version), K_(local_trans_version), -- GitLab