提交 6b4f10f4 编写于 作者: C Charles0429 提交者: LINGuanRen

fix wait transation bug

上级 a496e671
......@@ -9247,7 +9247,6 @@ int ObPartitionService::check_schema_version_elapsed(const ObPartitionKey& targe
K(latest_schema_version),
K(schema_version));
} else {
refreshed_schema_ts = ObTimeUtility::current_time();
const int64_t timeout = 10 * 1000 * 1000;
if (OB_FAIL(guard.get_partition_group()->get_pg_storage().replay_partition_schema_version_change_log(
schema_version))) {
......@@ -9266,7 +9265,7 @@ int ObPartitionService::check_schema_version_elapsed(const ObPartitionKey& targe
// override ret
ret = OB_EAGAIN;
} else if (OB_FAIL(pg_partition->update_build_index_schema_info(
schema_version, refreshed_schema_ts, log_id, log_ts))) {
schema_version, log_id, log_ts, refreshed_schema_ts))) {
STORAGE_LOG(WARN, "update build index schema info error", K(ret), K(target_pkey), K(pg_key), K(schema_version));
} else {
STORAGE_LOG(INFO,
......
......@@ -214,33 +214,25 @@ int ObPGPartition::get_refreshed_schema_info(int64_t& schema_version, int64_t& r
// There will be no concurrency here, so there is no need to lock
int ObPGPartition::update_build_index_schema_info(
const int64_t schema_version, const int64_t schema_refreshed_ts, const uint64_t log_id, const int64_t log_ts)
const int64_t schema_version, const uint64_t log_id, const int64_t log_ts, int64_t &schema_refreshed_ts)
{
int ret = OB_SUCCESS;
ObSpinLockGuard guard(lock_);
if (schema_version < 0 || schema_refreshed_ts < 0 || log_id <= 0 || log_ts <= 0) {
if (schema_version < 0 || log_id <= 0 || log_ts <= 0) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(schema_version), K(schema_refreshed_ts), K(log_id), K(log_ts));
TRANS_LOG(WARN, "invalid argument", K(schema_version), K(log_id), K(log_ts));
} else if (build_index_schema_version_ < schema_version) {
// First record
if (INT64_MAX == build_index_schema_version_refreshed_ts_) {
// Ensure that the schema version is updated at the end
build_index_schema_version_refreshed_ts_ = schema_refreshed_ts;
build_index_schema_version_refreshed_ts_ = ObTimeUtility::current_time();
schema_version_change_log_id_ = log_id;
schema_version_change_log_ts_ = log_ts;
ATOMIC_STORE(&build_index_schema_version_, schema_version);
} else if (build_index_schema_version_refreshed_ts_ > schema_refreshed_ts) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR,
"unexpected schema version refreshed ts",
K(ret),
K_(pkey),
K_(build_index_schema_version_refreshed_ts),
K(schema_version),
K(schema_refreshed_ts));
} else {
// Ensure that the schema version is updated at the end
build_index_schema_version_refreshed_ts_ = schema_refreshed_ts;
build_index_schema_version_refreshed_ts_ =
std::max(ObTimeUtility::current_time(), build_index_schema_version_refreshed_ts_);
schema_version_change_log_id_ = log_id;
schema_version_change_log_ts_ = log_ts;
ATOMIC_STORE(&build_index_schema_version_, schema_version);
......@@ -248,6 +240,9 @@ int ObPGPartition::update_build_index_schema_info(
} else {
// do nothing
}
if (OB_SUCC(ret)) {
schema_refreshed_ts = build_index_schema_version_refreshed_ts_;
}
return ret;
}
......
......@@ -116,7 +116,7 @@ public:
// During the indexing process, after the new version of the schema version is refreshed,
// a clog needs to be persisted
int update_build_index_schema_info(
const int64_t schema_version, const int64_t schema_refreshed_ts, const uint64_t log_id, const int64_t log_ts);
const int64_t schema_version, const uint64_t log_id, const int64_t log_ts, int64_t &schema_refreshed_ts);
int get_refreshed_schema_info(
int64_t& schema_version, int64_t& refreshed_schema_ts, uint64_t& log_id, int64_t& log_ts);
TO_STRING_KV(K_(pkey), KP_(cp_fty), KP_(storage));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册