提交 12e31511 编写于 作者: Z zh0 提交者: wangzelin.wzl

解决事务消息重构引入的状态机无法结束的问题

上级 5005ca27
......@@ -651,7 +651,8 @@ int ObDistTransCtx::init(const uint64_t tenant_id, const ObTransID& trans_id, co
TRANS_LOG(WARN, "ObTransCtx inited error", KR(ret));
} else {
set_state_(Ob2PCState::INIT);
trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval;
trans_2pc_timeout_ =
std::min((int64_t)ObServerConfig::get_instance().trx_2pc_retry_interval, (int64_t)MAX_TRANS_2PC_TIMEOUT_US);
}
return ret;
......
......@@ -491,9 +491,8 @@ public:
// then the total size of trans_ctx preallocated by resource pool is 200B * 100 * 1000 = 20MB.
// Taking the concurrency num into consideration, obviously, it is appropriate.
static const int64_t RP_TOTAL_NUM = 100 * 1000;
protected:
static const int64_t MAX_TRANS_2PC_TIMEOUT_US = 3 * 1000 * 1000; // 3s
protected:
// a warning is required once 600 seconds have passed after the trans timeout
static const int64_t OB_TRANS_WARN_USE_TIME = 600 * 1000 * 1000;
// 0x0078746365657266 means freectx
......
......@@ -1238,6 +1238,7 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg)
} else {
set_stc_by_now_();
}
const int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval;
// TODO do not set scheduler/coordinator/participants if state is init
if (OB_FAIL(set_app_trace_info_(msg.get_app_trace_info()))) {
TRANS_LOG(WARN, "set app trace info error", K(ret), K(msg), K(*this));
......@@ -1252,9 +1253,9 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg)
} else if (OB_FAIL(handle_2pc_prepare_request_(msg))) {
TRANS_LOG(WARN, "handle 2pc preprare request error", KR(ret), "context", *this, K(msg));
} else if (OB_FAIL(unregister_timeout_task_())) {
TRANS_LOG(WARN, "unregister timeout handler error", KR(ret), "context", *this);
} else if (OB_FAIL(register_timeout_task_(ObServerConfig::get_instance().trx_2pc_retry_interval))) {
TRANS_LOG(WARN, "register timeout handler error", KR(ret), "context", *this);
TRANS_LOG(WARN, "unregister timeout handler error", K(ret), "context", *this);
} else if (OB_FAIL(register_timeout_task_(std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US)))) {
TRANS_LOG(WARN, "register timeout handler error", K(ret), "context", *this);
} else {
// do nothing
}
......@@ -2531,19 +2532,20 @@ int ObPartTransCtx::leader_active(const LeaderActiveArg& arg)
// The request_id_ should be initialized so that the 2pc can still be
// driven if all participants are transferring the leader
generate_request_id_();
trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval;
if (Ob2PCState::INIT == get_state_()) {
const int64_t left_time = trans_expired_time_ - ObClockGenerator::getRealClock();
if (left_time > 0) {
trans_2pc_timeout_ = left_time;
} else {
trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval;
trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US);
}
// The XA txn has replayed the last redo log
if (is_xa_local_trans() && is_redo_prepared_) {
trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval;
trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US);
}
} else {
trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval;
trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US);
}
// do not post transaction message to avoid deadlock, bug#8257026
// just register timeout task
......@@ -2739,14 +2741,15 @@ int ObPartTransCtx::commit(const bool is_rollback, sql::ObIEndTransCallback* cb,
// - If the txn should not keep the dependency with the predecessors, roll back the txn immediately
// - If the dependency is necessary, write the OB_LOG_SP_ELR_TRANS_COMMIT
// - The current implementation writes the OB_LOG_SP_ELR_TRANS_COMMIT if the predecessors exist
const int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval;
if (OB_FAIL(set_app_trace_info_(app_trace_info))) {
TRANS_LOG(WARN, "set app trace info error", K(ret), K(app_trace_info), K(*this));
} else if (OB_FAIL(generate_sp_commit_log_type_(log_type))) {
TRANS_LOG(WARN, "generate sp commit log type error", K(ret), "context", *this);
} else if (OB_FAIL(unregister_timeout_task_())) {
TRANS_LOG(WARN, "unregister timeout handler error", KR(ret), "context", *this);
TRANS_LOG(WARN, "unregister timeout handler error", K(ret), "context", *this);
} else if (OB_FAIL(register_timeout_task_(
ObServerConfig::get_instance().trx_2pc_retry_interval + trans_id_.hash() % usec_per_sec))) {
std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US) + trans_id_.hash() % usec_per_sec))) {
TRANS_LOG(WARN, "register timeout handler error", KR(ret), "context", *this);
} else if (OB_FAIL(submit_log_async_(log_type, has_redo_log))) {
TRANS_LOG(WARN, "submit sp log error", KR(ret), "context", *this);
......@@ -6144,10 +6147,11 @@ int ObPartTransCtx::generate_redo_prepare_log_info(char* buf, const int64_t size
// Record the commit version of the txn. If you can elr later, you
// need to record it in the trans_result_info_mgr
global_trans_version_ = commit_version;
int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval;
if (OB_FAIL(unregister_timeout_task_())) {
TRANS_LOG(WARN, "unregister timeout handler error", KR(ret), "context", *this);
} else if (OB_FAIL(register_timeout_task_(ObServerConfig::get_instance().trx_2pc_retry_interval))) {
TRANS_LOG(WARN, "register timeout handler error", KR(ret), "context", *this);
TRANS_LOG(WARN, "unregister timeout handler error", K(ret), "context", *this);
} else if (OB_FAIL(register_timeout_task_(std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US)))) {
TRANS_LOG(WARN, "register timeout handler error", K(ret), "context", *this);
} else {
// do nothing
}
......@@ -8363,9 +8367,10 @@ int ObPartTransCtx::handle_2pc_request(const ObTrxMsgBase& msg, const int64_t ms
}
}
if (OB_SUCC(ret)) {
const int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval;
if (OB_FAIL(unregister_timeout_task_())) {
TRANS_LOG(WARN, "unregister timeout handler error", K(ret), K(*this));
} else if (OB_FAIL(register_timeout_task_(ObServerConfig::get_instance().trx_2pc_retry_interval))) {
} else if (OB_FAIL(register_timeout_task_(std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US)))) {
TRANS_LOG(WARN, "register timeout handler error", K(ret), K(*this));
} else {
// do nothing
......@@ -8766,6 +8771,7 @@ int ObPartTransCtx::on_prepare_(const bool batch_committed, const int64_t timest
set_state_(Ob2PCState::PREPARE);
is_redo_prepared_ = true;
trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval;
trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US);
if (batch_committed) {
// Set up end_log_ts_ for 1pc
end_log_ts_ = timestamp;
......
......@@ -3451,6 +3451,9 @@ int ObTransService::check_partition_status(const ObPartitionKey& partition)
TRANS_LOG(WARN, "get participant status error", K(ret), K(partition));
} else if (OB_SUCCESS != clog_status) {
ret = clog_status;
if (REACH_TIME_INTERVAL(ObTransCtx::MAX_TRANS_2PC_TIMEOUT_US)) {
(void)refresh_location_cache(partition, false);
}
} else {
// do nothing
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册