提交 b4a318ec 编写于 作者: C chinaxing 提交者: ob-robot

[master] XA-tx clean sanity check false positive

上级 3f3afdb0
......@@ -343,7 +343,7 @@ protected:
share::SCN snapshot_version_; // snapshot for RR | SERIAL Isolation
int64_t snapshot_uncertain_bound_; // uncertain bound of @snapshot_version_
int64_t snapshot_scn_; // the time of acquire @snapshot_version_
uint32_t sess_id_; // sesssion id
uint32_t sess_id_; // sesssion id of txn start, for XA it is XA_START session id
uint32_t assoc_sess_id_; // the session which associated with
ObGlobalTxType global_tx_type_; // global trans type, i.e., xa or dblink
......@@ -591,6 +591,7 @@ public:
ObXACtx *get_xa_ctx() { return xa_ctx_; }
void set_xid(const ObXATransID &xid) { xid_ = xid; }
void set_sessid(const uint32_t session_id) { sess_id_ = session_id; }
void set_assoc_sessid(const uint32_t session_id) { assoc_sess_id_ = session_id; }
const ObXATransID &get_xid() const { return xid_; }
bool is_xa_trans() const { return !xid_.empty(); }
bool is_xa_tightly_couple() const { return xa_tightly_couple_; }
......
......@@ -33,11 +33,9 @@ using namespace common::serialization;
int64_t MAX_STATE_SIZE = 4 * 1024; // 4KB
#endif
#define TX_START_OR_RESUME_ADDR(tx) ((tx)->is_xa_trans() ? (tx)->xa_start_addr_ : (tx)->addr_)
#define TX_START_OR_RESUME_LOCAL(tx) (TX_START_OR_RESUME_ADDR(tx) == GCONF.self_addr_)
bool ObTxnFreeRouteCtx::is_temp(const ObTxDesc &tx) const
{
//return !TX_START_OR_RESUME_LOCAL(&tx);
UNUSED(tx);
return txn_addr_.is_valid() && txn_addr_ != GCONF.self_addr_;
}
......@@ -76,7 +74,7 @@ void ObTxnFreeRouteCtx::init_before_handle_request(ObTxDesc *tx)
#endif
}
int ObTransService::clean_txn_state_(ObTxDesc *&tx, const ObTransID &tx_id)
int ObTransService::clean_txn_state_(ObTxDesc *&tx, ObTxnFreeRouteCtx &ctx, const ObTransID &tx_id)
{
int ret = OB_SUCCESS;
// sanity : the tx not started on this session
......@@ -88,7 +86,7 @@ int ObTransService::clean_txn_state_(ObTxDesc *&tx, const ObTransID &tx_id)
bool release_ref = false, release = false;
{
ObSpinLockGuard guard(tx->lock_);
if (TX_START_OR_RESUME_LOCAL(tx)) {
if (ctx.txn_addr_ == self_) {
if (tx->tx_id_ == tx_id) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "try to clean txn state on txn start node", K(ret), KPC(tx));
......@@ -285,7 +283,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
} else if (flag.is_tx_terminated_) {
audit_record.upd_term_ = true;
audit_record.upd_clean_tx_ = OB_NOT_NULL(tx);
if (OB_NOT_NULL(tx) && OB_FAIL(clean_txn_state_(tx, tx_id))) {
if (OB_NOT_NULL(tx) && OB_FAIL(clean_txn_state_(tx, ctx, tx_id))) {
TRANS_LOG(WARN, "cleanup prev txn state fail", K(ret), K(tx_id), K(tx));
}
} else if (flag.is_fallback_) {
......
......@@ -12,7 +12,7 @@ int tx_free_route_check_alive(ObTxnFreeRouteCtx &ctx, const ObTxDesc &tx, const
int tx_free_route_handle_check_alive(const ObTxFreeRouteCheckAliveMsg &msg, const int retcode);
int tx_free_route_handle_push_state(const ObTxFreeRoutePushState &msg);
private:
int clean_txn_state_(ObTxDesc *&tx, const ObTransID &tx_id);
int clean_txn_state_(ObTxDesc *&tx, ObTxnFreeRouteCtx &ctx, const ObTransID &tx_id);
static int update_logic_clock_(const int64_t logic_clock);
bool need_fallback_(ObTxDesc &tx, int64_t &state_size);
int push_tx_state_to_remote_(ObTxDesc &tx, const ObAddr &txn_addr);
......
......@@ -1257,6 +1257,7 @@ int ObXAService::xa_start_(const ObXATransID &xid,
// xa_start on new session, adjust tx_desc.sess_id_
if (OB_SUCC(ret)) {
tx_desc->set_sessid(session_id);
tx_desc->set_assoc_sessid(session_id);
}
}
}
......@@ -1373,6 +1374,7 @@ int ObXAService::xa_start_join_(const ObXATransID &xid,
// xa_join/resume on new session, adjust tx_desc.sess_id_
if (OB_SUCC(ret)) {
tx_desc->set_sessid(session_id);
tx_desc->set_assoc_sessid(session_id);
}
return ret;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册