提交 c377bfd2 编写于 作者: C chinaxing 提交者: ob-robot

[master][tx-route] fix and refine tx module stop-related logic

上级 a336d376
......@@ -1463,14 +1463,22 @@ int ObMultiTenant::remove_tenant(const uint64_t tenant_id, bool &try_clock_succ)
} else if (OB_ISNULL(GCTX.session_mgr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected condition", K(ret));
} else if (OB_FAIL(GCTX.session_mgr_->kill_tenant(tenant_id))) {
LOG_ERROR("fail to kill tenant session", K(ret), K(tenant_id));
} else {
LOG_INFO("removed_tenant begin to stop", K(tenant_id));
{
SpinWLockGuard guard(lock_); //add a lock when set tenant stop, omt will check tenant has stop before calling timeup()
removed_tenant->stop();
}
LOG_INFO("removed_tenant begin to kill tenant session", K(tenant_id));
if (OB_FAIL(GCTX.session_mgr_->kill_tenant(tenant_id))) {
LOG_ERROR("fail to kill tenant session", K(ret), K(tenant_id));
{
SpinWLockGuard guard(lock_);
removed_tenant->start();
}
}
}
if (OB_SUCC(ret) && OB_NOT_NULL(removed_tenant)) {
LOG_INFO("removed_tenant begin to wait", K(tenant_id));
removed_tenant->wait();
LOG_INFO("removed_tenant begin to try wlock", K(tenant_id));
......
......@@ -400,6 +400,7 @@ public:
int init_ctx();
int init(const ObTenantMeta &meta);
// Sets the stopped_ flag: atomically stores true into stopped_.
void stop()
{
  ATOMIC_STORE(&stopped_, true);
}
// Clears the stopped_ flag: atomically stores false into stopped_,
// i.e. the inverse of stop().
void start()
{
  ATOMIC_STORE(&stopped_, false);
}
void wait();
void destroy();
// Returns the current value of stopped_, read atomically.
bool has_stopped() const
{
  return ATOMIC_LOAD(&stopped_);
}
......
......@@ -701,6 +701,7 @@ public:
int set_block_frozen_memtable(memtable::ObMemtable *memtable);
void clear_block_frozen_memtable();
bool is_logging_blocked();
// Returns true when exec_info_.xid_ is non-empty.
// NOTE(review): presumably a non-empty xid marks an XA transaction - confirm.
bool is_xa_trans() const
{
  const bool has_xid = !exec_info_.xid_.empty();
  return has_xid;
}
private:
int check_status_();
int tx_keepalive_response_(const int64_t status);
......
......@@ -204,7 +204,8 @@ int ObTransService::do_commit_tx_(ObTxDesc &tx,
expire_ts,
tx.trace_info_.get_app_trace_info(),
tx.op_sn_,
commit_version))
commit_version,
self_))
|| !commit_need_retry_(ret))) {
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "local ls commit tx fail", K(ret), K_(tx.coord_id), K(tx));
......@@ -1699,7 +1700,8 @@ int ObTransService::handle_trans_commit_request(ObTxCommitMsg &msg,
msg.expire_ts_,
msg.app_trace_info_,
msg.request_id_,
commit_version))) {
commit_version,
msg.sender_addr_))) {
TRANS_LOG(WARN, "handle tx commit request fail", K(ret), K(msg));
}
result.reset();
......@@ -1721,7 +1723,8 @@ int ObTransService::local_ls_commit_tx_(const ObTransID &tx_id,
const int64_t &expire_ts,
const common::ObString &app_trace_info,
const int64_t &request_id,
SCN &commit_version)
SCN &commit_version,
const common::ObAddr &caller)
{
int ret = OB_SUCCESS;
MonotonicTs commit_time = get_req_receive_mts_();
......@@ -1750,10 +1753,11 @@ int ObTransService::local_ls_commit_tx_(const ObTransID &tx_id,
}
}
}
} else {
if (OB_FAIL(ctx->commit(parts, commit_time, expire_ts, app_trace_info, request_id))) {
TRANS_LOG(WARN, "commit fail", K(ret), K(coord), K(tx_id));
}
} else if (ctx->get_scheduler() != caller) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "receive commit from not scheduler", K(ret), K(caller), K(ctx->get_scheduler()));
} else if (OB_FAIL(ctx->commit(parts, commit_time, expire_ts, app_trace_info, request_id))) {
TRANS_LOG(WARN, "commit fail", K(ret), K(coord), K(tx_id));
}
if (OB_NOT_NULL(ctx)) {
revert_tx_ctx_(ctx);
......@@ -1794,10 +1798,13 @@ int ObTransService::handle_trans_abort_request(ObTxAbortMsg &abort_req, ObTransR
// We do not respond with an abort response, because we assume the abort
// always eventually succeeds if we have never sent the commit request
TRANS_LOG(WARN, "get transaction context error", KR(ret), K(abort_req.get_trans_id()));
} else {
if (OB_FAIL(ctx->abort(abort_req.reason_))) {
TRANS_LOG(WARN, "trans rollback error", KR(ret), K(abort_req));
}
} else if (ctx->get_scheduler() != abort_req.sender_addr_ && ctx->get_scheduler().is_valid()
// the xa tmp scheduler sends an abort when its session is broken
&& !ctx->is_xa_trans()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "receive abort request not from scheduler.", K(ret), K(ctx->get_scheduler()));
} else if (OB_FAIL(ctx->abort(abort_req.reason_))) {
TRANS_LOG(WARN, "trans rollback error", KR(ret), K(abort_req));
}
if (OB_NOT_NULL(ctx)) {
revert_tx_ctx_(ctx);
......
......@@ -264,7 +264,8 @@ int local_ls_commit_tx_(const ObTransID &tx_id,
const int64_t &expire_ts,
const common::ObString &app_trace_info,
const int64_t &request_id,
share::SCN &commit_version);
share::SCN &commit_version,
const common::ObAddr &caller);
int get_tx_state_from_tx_table_(const share::ObLSID &lsid,
const ObTransID &tx_id,
int &state,
......
......@@ -210,17 +210,23 @@ int ObTransService::stop_tx(ObTxDesc &tx)
tx.lock_.lock();
TRANS_LOG(INFO, "stop_tx, print its trace as following", K(tx));
tx.print_trace_();
if (tx.state_ < ObTxDesc::State::IN_TERMINATE) {
abort_tx_(tx, ObTxAbortCause::STOP, true);
} else if (!tx.is_terminated()) {
unregister_commit_retry_task_(tx);
// arm callback arguments
tx.commit_out_ = OB_TRANS_UNKNOWN;
tx.state_ = ObTxDesc::State::COMMIT_UNKNOWN;
}
tx.lock_.unlock();
// run callback after unlock
tx.execute_commit_cb();
if (tx.addr_ != self_) {
  // This node is not where the txn started: it is either a txn temp node
  // or an xa temp node, and depends on session cleanup to quit.
  // tx.lock_ was acquired at the top of stop_tx and nothing after this
  // branch releases it, so it must be unlocked here; otherwise the
  // function returns with the lock still held.
  tx.lock_.unlock();
  TRANS_LOG(INFO, "this is not txn start node.");
} else {
  if (tx.state_ < ObTxDesc::State::IN_TERMINATE) {
    // state is before IN_TERMINATE: abort with STOP as the cause
    abort_tx_(tx, ObTxAbortCause::STOP, true);
  } else if (!tx.is_terminated()) {
    // already terminating but not yet terminated: drop the commit retry
    // task and report the commit outcome as unknown
    unregister_commit_retry_task_(tx);
    // arm callback arguments
    tx.commit_out_ = OB_TRANS_UNKNOWN;
    tx.state_ = ObTxDesc::State::COMMIT_UNKNOWN;
  }
  tx.lock_.unlock();
  // run callback after unlock
  tx.execute_commit_cb();
}
return ret;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册