提交 97b45954 编写于 作者: Y yy0 提交者: wangzelin.wzl

not update schema version if replay clog while restore

上级 ab743ff0
......@@ -49,6 +49,7 @@ public:
lock_start_time_(0),
row_purge_version_(0),
redo_log_timestamp_(0),
redo_log_id_(0),
trx_lock_timeout_(-1),
lock_wait_start_ts_(0),
multi_version_range_(),
......@@ -273,10 +274,18 @@ public:
{
redo_log_timestamp_ = redo_log_timestamp;
}
inline void set_redo_log_id(const int64_t redo_log_id)
{
redo_log_id_ = redo_log_id;
}
inline int64_t get_redo_log_timestamp() const
{
return redo_log_timestamp_;
}
inline int64_t get_redo_log_id() const
{
return redo_log_id_;
}
inline int64_t get_callback_list_length() const
{
return trans_mgr_.count();
......@@ -348,6 +357,7 @@ public:
lock_start_time_ = 0;
row_purge_version_ = 0;
redo_log_timestamp_ = 0;
redo_log_id_ = 0;
multi_version_range_.reset();
multi_version_range_.base_version_ = common::ObVersionRange::MIN_VERSION; // TODO: remove it
lock_wait_start_ts_ = 0;
......@@ -429,6 +439,7 @@ protected:
int64_t lock_start_time_;
int64_t row_purge_version_;
int64_t redo_log_timestamp_;
int64_t redo_log_id_;
int64_t trx_lock_timeout_;
int64_t lock_wait_start_ts_;
common::ObVersionRange multi_version_range_;
......
......@@ -1323,7 +1323,9 @@ int ObMemtable::replay(const ObStoreCtx& ctx, const char* data, const int64_t da
const int64_t log_timestamp = mt_ctx->get_redo_log_timestamp();
// In principle, failure is not allowed here, but from implementation aspect,
// the logic inside needs to deal with failures due to lack of memory
if (OB_UNLIKELY(0 >= log_timestamp) || OB_UNLIKELY(INT64_MAX == log_timestamp)) {
const int64_t log_id = mt_ctx->get_redo_log_id();
if (OB_UNLIKELY(0 >= log_timestamp) || OB_UNLIKELY(INT64_MAX == log_timestamp) || OB_UNLIKELY(0 >= log_id) ||
OB_UNLIKELY(INT64_MAX == log_id)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected log timestamp", K(ret), K(*mt_ctx));
} else if (OB_FAIL(mt_ctx->set_replay_host(this, for_replay))) {
......@@ -1380,9 +1382,8 @@ int ObMemtable::replay(const ObStoreCtx& ctx, const char* data, const int64_t da
TRANS_LOG(WARN, "failed to check standby_cluster_schema_condition", K(ret), K(table_id), K(table_version));
} else {
// FIXME.
transaction::ObPartTransCtx* part_ctx = static_cast<transaction::ObPartTransCtx*>(mt_ctx->get_trans_ctx());
if (0 != flag) {
transaction::ObPartTransCtx* part_ctx =
static_cast<transaction::ObPartTransCtx*>(mt_ctx->get_trans_ctx());
if (OB_FAIL(part_ctx->replay_rollback_to(sql_no, ctx.log_ts_))) {
TRANS_LOG(WARN, "replay rollback savepoint failed", K(ret), K(*mt_ctx), K(sql_no));
} else {
......@@ -1414,8 +1415,10 @@ int ObMemtable::replay(const ObStoreCtx& ctx, const char* data, const int64_t da
K(acc_checksum));
}
} else {
ctx.mem_ctx_->set_table_version(table_version);
set_max_schema_version(table_version);
if (part_ctx->need_update_schema_version(log_id, log_timestamp)) {
ctx.mem_ctx_->set_table_version(table_version);
set_max_schema_version(table_version);
}
}
}
}
......
......@@ -4477,6 +4477,7 @@ int ObPartitionService::replay_redo_log(const common::ObPartitionKey& pkey, cons
} else {
// used to order trans nodes that have not determined the commit version during replay
ctx.mem_ctx_->set_redo_log_timestamp(log_timestamp);
ctx.mem_ctx_->set_redo_log_id(log_id);
if (OB_FAIL(get_partition(pkey, guard))) {
STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret));
} else if (OB_ISNULL(guard.get_partition_group())) {
......
......@@ -4132,6 +4132,18 @@ bool ObPartTransCtx::need_rollback_when_restore_(const int64_t commit_version)
commit_version > restore_snapshot_version;
}
bool ObPartTransCtx::need_update_schema_version(const int64_t log_id, const int64_t log_ts)
{
const int64_t restore_snapshot_version = partition_mgr_->get_restore_snapshot_version();
const int64_t last_restore_log_id = partition_mgr_->get_last_restore_log_id();
bool need_update = true;
if (restore_snapshot_version > 0 && (last_restore_log_id == OB_INVALID_ID || log_id <= last_restore_log_id) &&
(log_ts > restore_snapshot_version)) {
need_update = false;
}
return need_update;
}
int ObPartTransCtx::trans_replay_commit_(const int64_t commit_version, const int64_t checksum)
{
ObTimeGuard tg("trans_replay_commit", 50 * 1000);
......
......@@ -389,6 +389,7 @@ public:
bool is_in_trans_table_state();
virtual int64_t get_part_trans_action() const override;
int rollback_stmt(const int64_t from_sql_no, const int64_t to_sql_no);
bool need_update_schema_version(const int64_t log_id, const int64_t log_ts);
public:
INHERIT_TO_STRING_KV("ObDistTransCtx", ObDistTransCtx, K_(snapshot_version), K_(local_trans_version),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册