diff --git a/src/sql/engine/cmd/ob_tcl_executor.cpp b/src/sql/engine/cmd/ob_tcl_executor.cpp index decb0baa6d60c9b2323904bcde58840880561f09..47ebee21932ccec423268dcc57bbad82c3101a48 100644 --- a/src/sql/engine/cmd/ob_tcl_executor.cpp +++ b/src/sql/engine/cmd/ob_tcl_executor.cpp @@ -41,14 +41,15 @@ int ObEndTransExecutor::end_trans(ObExecContext& ctx, ObEndTransStmt& stmt) ret = OB_ERR_UNEXPECTED; LOG_ERROR("session ptr is null", K(ret)); } else if (my_session->is_in_transaction() && my_session->get_trans_desc().is_xa_local_trans()) { - transaction::ObXATransID xid = my_session->get_trans_desc().get_xid(); + const transaction::ObTransDesc& trans_desc = my_session->get_trans_desc(); + transaction::ObXATransID xid = trans_desc.get_xid(); if (stmt.get_is_rollback()) { // Rollback can be executed in the xa transaction, // the role is to roll back all modifications, but does not end the xa transaction } else { // commit is prohibited in xa transaction ret = OB_TRANS_XA_ERR_COMMIT; - LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), K(xid)); + LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), K(xid), K(trans_desc)); } ctx.set_need_disconnect(false); } else if (OB_FAIL(ObSqlTransControl::explicit_end_trans(ctx, stmt.get_is_rollback()))) { diff --git a/src/sql/engine/cmd/ob_xa_executor.cpp b/src/sql/engine/cmd/ob_xa_executor.cpp index c497ac87a1df922c2382827cd499bd92058d5c76..8cba947e8cc94890413bd135653774f886880d9c 100644 --- a/src/sql/engine/cmd/ob_xa_executor.cpp +++ b/src/sql/engine/cmd/ob_xa_executor.cpp @@ -111,7 +111,7 @@ int ObPlXaStartExecutor::execute(ObExecContext& ctx, ObXaStartStmt& stmt) LOG_WARN("set xid error", K(ret), K(stmt)); } else if (my_session->get_in_transaction()) { ret = OB_TRANS_XA_OUTSIDE; - LOG_WARN("already start trans", K(ret), K(stmt.get_xa_string())); + LOG_WARN("already start trans", K(ret), K(stmt.get_xa_string()), K(trans_desc)); } else { transaction::ObStartTransParam& start_trans_param = plan_ctx->get_start_trans_param(); init_start_trans_param(my_session, task_exec_ctx, start_trans_param); diff --git a/src/storage/ob_partition_group.cpp b/src/storage/ob_partition_group.cpp index 2f569b7d54e844ef78677e3394f8b0a3d209cdec..f412c18f94623d5b852f65b5d2066ebe5c51d66f 100644 --- a/src/storage/ob_partition_group.cpp +++ b/src/storage/ob_partition_group.cpp @@ -2139,6 +2139,7 @@ int ObPartitionGroup::prepare_splitting( int ObPartitionGroup::check_cur_partition_split(bool& is_split_partition) { int ret = OB_SUCCESS; + const bool split_kill_trans = true; // SpinWLockGuard guard(split_lock_); if (OB_UNLIKELY(!is_inited_)) { @@ -2149,7 +2150,7 @@ int ObPartitionGroup::check_cur_partition_split(bool& is_split_partition) } else { is_split_partition = false; } - if (!(GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_3100)) { + if (split_kill_trans || !(GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_3100)) { is_split_partition = false; } diff --git a/src/storage/ob_partition_loop_worker.cpp b/src/storage/ob_partition_loop_worker.cpp index 154683c15d0894cae2b96fe637d74d0acb471986..084ede5f9850531faf7f035653ec14847f3197eb 100644 --- a/src/storage/ob_partition_loop_worker.cpp +++ b/src/storage/ob_partition_loop_worker.cpp @@ -562,7 +562,9 @@ int ObPartitionLoopWorker::set_replay_checkpoint(const int64_t checkpoint) const int64_t last_checkpoint = ATOMIC_LOAD(&last_checkpoint_); const int64_t NOTICE_THRESHOLD = 3 * 1000 * 1000; if (last_checkpoint > 0 && checkpoint - last_checkpoint > NOTICE_THRESHOLD) { - STORAGE_LOG(INFO, "replay checkpoint updated", K_(pkey), K(checkpoint), K(*this)); + if (EXECUTE_COUNT_PER_SEC(10)) { + STORAGE_LOG(INFO, "replay checkpoint updated", K_(pkey), K(checkpoint), K(*this)); + } } inc_update(&last_checkpoint_, checkpoint); } else { diff --git a/src/storage/transaction/ob_trans_ctx.cpp b/src/storage/transaction/ob_trans_ctx.cpp index 936e05163d7b4e66471ac6fe803a0fc4db261ddc..ddeae5e1fe975b7cb7f74e04e1e15b6296b16095 100644 --- a/src/storage/transaction/ob_trans_ctx.cpp +++ b/src/storage/transaction/ob_trans_ctx.cpp @@ -185,7 +185,22 @@ void ObTransCtx::destroy() int ObTransCtx::reset_trans_audit_record() { int ret = OB_SUCCESS; + CtxLockGuard guard(lock_); if (OB_NOT_NULL(trans_audit_record_)) { + (void)trans_audit_record_->set_trans_audit_data(tenant_id_, + addr_, + trans_id_, + self_, + session_id_, + proxy_session_id_, + trans_type_, + get_uref(), + ctx_create_time_, + trans_expired_time_, + trans_param_, + get_type(), + get_status_(), + is_for_replay()); ObTransAuditRecordMgr* record_mgr = NULL; if (OB_ISNULL(record_mgr = record_mgr_guard_.get_trans_audit_record_mgr())) { ret = OB_ERR_UNEXPECTED; @@ -196,6 +211,7 @@ int ObTransCtx::reset_trans_audit_record() trans_audit_record_ = NULL; tlog_ = NULL; } + record_mgr_guard_.destroy(); return ret; } diff --git a/src/storage/transaction/ob_trans_functor.h b/src/storage/transaction/ob_trans_functor.h index a8627c7f9e2ce10adb9b00fd94378237925e99b8..b47781fb63dcf60edbbb8142810c9f8e9c838ad3 100644 --- a/src/storage/transaction/ob_trans_functor.h +++ b/src/storage/transaction/ob_trans_functor.h @@ -186,7 +186,7 @@ public: TRANS_LOG(WARN, "invalid argument", K(trans_id), "ctx", OB_P(ctx_base)); tmp_ret = common::OB_INVALID_ARGUMENT; } else { - if (OB_FAIL(ctx_base->kill(arg_, cb_array_))) { + if (OB_SUCC(ctx_base->kill(arg_, cb_array_))) { TRANS_LOG(INFO, "kill transaction success", K(trans_id), K_(arg)); } else if (common::OB_TRANS_CANNOT_BE_KILLED == ret) { TRANS_LOG(INFO, "transaction can not be killed", K(trans_id), "context", *ctx_base); @@ -199,7 +199,7 @@ public: if (OB_SUCCESS != (tmp_ret = ctx_base->reset_trans_audit_record())) { TRANS_LOG(WARN, "reset trans audot record failed", KR(tmp_ret)); } - ctx_base->get_record_mgr_guard().destroy(); + // ctx_base->get_record_mgr_guard().destroy(); } } diff --git a/src/storage/transaction/ob_trans_part_ctx.cpp b/src/storage/transaction/ob_trans_part_ctx.cpp index 981b1096b0f47146a1b8f6a52c742dcd80fe1a14..141c6c8ad217b60ba031ecfb84137ed871c074d8 100644 --- a/src/storage/transaction/ob_trans_part_ctx.cpp +++ b/src/storage/transaction/ob_trans_part_ctx.cpp @@ -2861,12 +2861,14 @@ int ObPartTransCtx::check_schema_version_elapsed(const int64_t schema_version, c } else if (OB_UNLIKELY(schema_version <= 0) || OB_UNLIKELY(refreshed_schema_ts < 0)) { TRANS_LOG(WARN, "invalid argument", K(schema_version), "context", *this); ret = OB_INVALID_ARGUMENT; + } else if (is_exiting_) { + // do nothing } else if (for_replay_) { ret = OB_NOT_MASTER; if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) { TRANS_LOG(WARN, "current participant not master, need retry", K(ret), K(*this)); } - } else if (is_exiting_ || is_readonly_) { + } else if (is_readonly_) { // do nothing } else if (ctx_create_time_ <= refreshed_schema_ts) { ret = OB_EAGAIN; diff --git a/src/storage/transaction/ob_trans_sche_ctx.cpp b/src/storage/transaction/ob_trans_sche_ctx.cpp index 62d9d27bf938fc29a5b4d846eaec43dc2b2231ee..56cbda7732090e5ad4863d13a9f70ed99b97ffee 100644 --- a/src/storage/transaction/ob_trans_sche_ctx.cpp +++ b/src/storage/transaction/ob_trans_sche_ctx.cpp @@ -3075,12 +3075,27 @@ int ObScheTransCtx::xa_rollback_session_terminate() TRANS_LOG(WARN, "ObScheTransCtx not inited", K(ret)); } else if (is_terminated_) { TRANS_LOG(INFO, "transaction is terminating", K(ret), "context", *this); - } else if (ObXATransState::ACTIVE != xa_trans_state_) { + } + /* + else if (ObXATransState::ACTIVE != xa_trans_state_) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected state", K(ret), K(xa_trans_state_), K(*this)); - } else if (has_decided_()) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "unexpected, xa trans already decided", K(ret), K(*this)); + } + */ + else if (has_decided_()) { + if (!is_xa_one_phase_) { + // 已进入两阶段提交阶段的xa事务不再允许执行一阶段提交 + ret = OB_TRANS_XA_PROTO; + TRANS_LOG(WARN, "xa trans has entered into two phase", "context", *this); + } else if (!is_rollback_) { + ret = OB_TRANS_XA_PROTO; + TRANS_LOG(WARN, "invalid xa trans one phase request", "context", *this); + } else { + // 断连接转发超时,不需要重试 + ret = OB_SUCCESS; + } + // ret = OB_ERR_UNEXPECTED; + // TRANS_LOG(WARN, "unexpected, xa trans already decided", K(ret), K(*this)); } else if (OB_FAIL(xa_rollback_session_terminate_())) { TRANS_LOG(WARN, "terminate xa trans failed", K(ret), K(*this)); } @@ -3118,7 +3133,11 @@ int ObScheTransCtx::xa_one_phase_end_trans(const bool is_rollback) ret = OB_NOT_INIT; } else if (OB_UNLIKELY(is_exiting_)) { TRANS_LOG(WARN, "transaction is exiting", "context", *this); - ret = OB_TRANS_IS_EXITING; + if (is_rollback) { + ret = OB_SUCCESS; + } else { + ret = OB_TRANS_IS_EXITING; + } } else if (is_tightly_coupled_ && !is_rollback && 1 < get_unprepared_branch_count_()) { ret = OB_TRANS_XA_PROTO; TRANS_LOG( @@ -3134,10 +3153,13 @@ int ObScheTransCtx::xa_one_phase_end_trans(const bool is_rollback) } else { // one-phase proxy end_trans request timeout // if scheduler in end_trans phase, need to retry and wait ctx released - ret = OB_TRANS_XA_RETRY; + if (is_rollback) { + ret = OB_SUCCESS; + } else { + ret = OB_TRANS_XA_RETRY; + } } - } - if (OB_SUCC(ret)) { + } else { if (OB_FAIL(xa_one_phase_end_trans_(is_rollback))) { TRANS_LOG(WARN, "xa one phase end trans failed", K(ret), K(is_rollback), K(*this)); } diff --git a/src/storage/transaction/ob_xa_rpc.cpp b/src/storage/transaction/ob_xa_rpc.cpp index 1d2ea563934a98d569766b866d2c557db08ba629..9357e616dc9324d571fb3c76677ca486643e03fe 100644 --- a/src/storage/transaction/ob_xa_rpc.cpp +++ b/src/storage/transaction/ob_xa_rpc.cpp @@ -239,7 +239,7 @@ int ObXAEndTransP::process() break; } } - if (is_rollback) { + if (is_rollback && is_terminated) { // one phase rollback or terminate if (OB_FAIL(sche_ctx->xa_rollback_session_terminate())) { TRANS_LOG(WARN, "rollback xa trans failed", K(ret), K(xid), K(trans_id), K(is_rollback)); diff --git a/src/storage/transaction/ob_xa_trans_service.cpp b/src/storage/transaction/ob_xa_trans_service.cpp index 4e3b06fc06df12cfed81ea7dca58bc0dcbf67f08..bf8c6f10c9a2df2e73086105e435fc8455d6035f 100644 --- a/src/storage/transaction/ob_xa_trans_service.cpp +++ b/src/storage/transaction/ob_xa_trans_service.cpp @@ -1741,6 +1741,7 @@ int ObTransService::handle_terminate_for_xa_branch_(ObTransDesc& trans_desc) ObTransCond cond; // rely on timeout of cb, therefore timeout of cond is set to max const int64_t wait_time = (INT64_MAX - now) / 2; + req.set_terminated(); if (OB_FAIL(cb.init(&cond))) { TRANS_LOG(WARN, "ObXARPCCB init failed", KR(ret)); } else if (OB_FAIL(req.init(trans_id, xid, is_rollback /*true*/, is_terminated /*true*/))) {