提交 51134ae4 编写于 作者: C chinaxing 提交者: ob-robot

[master][tx-route] fix proxy reopen session concurrency

上级 2c6ee70b
......@@ -216,6 +216,7 @@ ObTxDesc::ObTxDesc()
snapshot_uncertain_bound_(0),
snapshot_scn_(0),
sess_id_(0),
assoc_sess_id_(0),
global_tx_type_(ObGlobalTxType::PLAIN),
op_sn_(0), // default is from 0
state_(State::INVL),
......
......@@ -344,6 +344,7 @@ protected:
int64_t snapshot_uncertain_bound_; // uncertain bound of @snapshot_version_
int64_t snapshot_scn_; // the time of acquire @snapshot_version_
uint32_t sess_id_; // sesssion id
uint32_t assoc_sess_id_; // the session which associated with
ObGlobalTxType global_tx_type_; // global trans type, i.e., xa or dblink
uint64_t op_sn_; // Tx level operation sequence No
......@@ -494,6 +495,7 @@ public:
K_(addr),
K_(tenant_id),
"session_id", sess_id_,
"assoc_session_id", assoc_sess_id_,
"xid", PC((!xid_.empty() ? &xid_ : (ObXATransID*)nullptr)),
"xa_mode", xid_.empty() ? "" : (xa_tightly_couple_ ? "tightly" : "loosely"),
K_(xa_start_addr),
......
......@@ -64,6 +64,7 @@ inline void ObTransService::init_tx_(ObTxDesc &tx, const uint32_t session_id)
tx.tenant_id_ = tenant_id_;
tx.addr_ = self_;
tx.sess_id_ = session_id;
tx.assoc_sess_id_ = session_id;
tx.alloc_ts_ = ObClockGenerator::getClock();
tx.expire_ts_ = INT64_MAX;
tx.op_sn_ = 1;
......@@ -94,17 +95,19 @@ int ObTransService::finalize_tx_(ObTxDesc &tx)
{
int ret = OB_SUCCESS;
ObSpinLockGuard guard(tx.lock_);
tx.flags_.RELEASED_ = true;
if (tx.is_tx_active() && !tx.flags_.REPLICA_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "release tx when tx is active", K(ret), KPC(this), K(tx));
tx.print_trace_();
} else if (tx.is_committing()) {
TRANS_LOG(WARN, "release tx when tx is committing", KPC(this), K(tx));
}
tx.cancel_commit_cb();
if (tx.tx_id_.is_valid()) {
tx_desc_mgr_.remove(tx);
if (!tx.flags_.RELEASED_) {
tx.flags_.RELEASED_ = true;
if (tx.is_tx_active() && !tx.flags_.REPLICA_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "release tx when tx is active", K(ret), KPC(this), K(tx));
tx.print_trace_();
} else if (tx.is_committing()) {
TRANS_LOG(WARN, "release tx when tx is committing", KPC(this), K(tx));
}
tx.cancel_commit_cb();
if (tx.tx_id_.is_valid()) {
tx_desc_mgr_.remove(tx);
}
}
return ret;
}
......
......@@ -202,6 +202,65 @@ inline int ObTransService::txn_state_update_verify_by_version_(const ObTxnFreeRo
+ encoded_length_i64(ctx.global_version_) \
+ encoded_length_i8(ctx.flag_.v_)
int ObTransService::txn_free_route__kill_session_(const uint32_t session_id)
{
int ret = OB_SUCCESS;
sql::ObSQLSessionInfo *session = NULL;
sql::ObSessionGetterGuard guard(*GCTX.session_mgr_, session_id);
if (OB_FAIL(guard.get_session(session))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
TRANS_LOG(WARN, "get session fail", K(ret), K(session_id));
}
} else if (OB_FAIL(GCTX.session_mgr_->kill_session(*session))) {
TRANS_LOG(WARN, "kill session failed", K(ret), K(session_id));
}
return ret;
}
int ObTransService::txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObTxnFreeRouteAuditRecord &audit_record, ObTxDesc *&tx)
{
int ret = OB_SUCCESS;
ObTxDesc *tmp_tx = NULL;
tx = NULL;
if (OB_FAIL(tx_desc_mgr_.get(tx_id, tmp_tx))) {
if (OB_ENTRY_NOT_EXIST != ret) {
TRANS_LOG(WARN, "get tx fail", K(ret), K(tx_id));
} else { ret = OB_SUCCESS; }
} else if (OB_ISNULL(tmp_tx)) {
} else if (!tmp_tx->is_xa_trans()) {
// some session hold this txn already, close the session and release this txn
// then continue with retry
auto assoc_sess_id = tmp_tx->assoc_sess_id_;
TRANS_LOG(WARN, "tx found associate with other session, will kill the session",
K(session_id), K(assoc_sess_id), K(tx_id));
if (OB_FAIL(txn_free_route__kill_session_(assoc_sess_id))) {
TRANS_LOG(WARN, "kill old session failed", K(ret), K(assoc_sess_id));
} else if (OB_FAIL(release_tx(*tmp_tx))) {
TRANS_LOG(WARN, "release tx failed", K(ret), K(assoc_sess_id), K(tx_id));
} else {
int tmp_ret = tx_desc_mgr_.get(tx_id, tmp_tx);
if (OB_ENTRY_NOT_EXIST != tmp_ret) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "release tx while tx is exist", K(ret), K(tx_id), K(tmp_ret));
}
if (OB_NOT_NULL(tmp_tx)) {
tx_desc_mgr_.revert(*tmp_tx);
}
}
} else if (tmp_tx->addr_ != self_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "XA-tx found but not on the orignal", K(ret), K_(self), K(tx_id), K_(tmp_tx->addr));
tx_desc_mgr_.revert(*tmp_tx);
} else {
tx = tmp_tx;
audit_record.assoc_xa_orig_ = true;
TRANS_LOG(INFO, "found XA-tx on its original, ref acquried", K(tx_id));
}
return ret;
}
int ObTransService::txn_free_route__update_static_state(const uint32_t session_id,
ObTxDesc *&tx,
ObTxnFreeRouteCtx &ctx,
......@@ -231,36 +290,14 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
ret = txn_free_route__sanity_check_fallback_(tx, ctx);
} else {
if (OB_ISNULL(tx)) {
ObTxDesc *tmp_tx = NULL;
if (OB_FAIL(tx_desc_mgr_.get(tx_id, tmp_tx)) && OB_ENTRY_NOT_EXIST != ret) {
TRANS_LOG(WARN, "get tx fail", K(ret), K(tx_id));
} else if (OB_ISNULL(tmp_tx)) {
if (OB_FAIL(txn_free_route__handle_tx_exist_(tx_id, audit_record, tx))) {
TRANS_LOG(WARN, "handle tx exist fail", K(ret), K(tx_id));
} else if (OB_ISNULL(tx)) {
audit_record.alloc_tx_ = true;
if (OB_FAIL(acquire_tx(tmp_tx, session_id))) {
// if acquire tx failed, it may retryable:
// alloc-memory failed
if (OB_FAIL(acquire_tx(tx, session_id))) {
// if acquire tx failed, it may retryable: alloc-memory failed
TRANS_LOG(WARN, "acquire tx for decode failed", K(ret));
} else { need_add_tx = true; }
} else if (!tmp_tx->is_xa_trans()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tx found but not associated with this session", K(ret), K(session_id), K(tx_id));
} else if (tmp_tx->addr_ != self_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "XA-tx found but not on the orignal", K(ret), K_(self), K_(tmp_tx->addr));
} else {
audit_record.assoc_xa_orig_ = true;
TRANS_LOG(INFO, "found XA-tx on its original, ref acquried", K(tx_id));
}
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(tmp_tx)) {
{
ObSpinLockGuard guard(tmp_tx->lock_);
TRANS_LOG(WARN, "acquire target tx or prepare place holder for it failed", K(ret), KPC(tmp_tx));
}
tx_desc_mgr_.revert(*tmp_tx);
}
} else {
tx = tmp_tx;
}
} else if (!tx->tx_id_.is_valid()) {
// reuse, overwrite
......
......@@ -18,3 +18,5 @@ bool need_fallback_(ObTxDesc &tx, int64_t &state_size);
int push_tx_state_to_remote_(ObTxDesc &tx, const ObAddr &txn_addr);
int txn_free_route__sanity_check_fallback_(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx);
int txn_state_update_verify_by_version_(const ObTxnFreeRouteCtx &ctx, const int64_t version);
int txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObTxnFreeRouteAuditRecord &audit_record, ObTxDesc *&tx);
int txn_free_route__kill_session_(const uint32_t session_id);
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册