提交 c9521097 编写于 作者: D dx0 提交者: wangzelin.wzl

patch code to open source branch

上级 8a519477
......@@ -41,14 +41,15 @@ int ObEndTransExecutor::end_trans(ObExecContext& ctx, ObEndTransStmt& stmt)
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("session ptr is null", K(ret));
} else if (my_session->is_in_transaction() && my_session->get_trans_desc().is_xa_local_trans()) {
transaction::ObXATransID xid = my_session->get_trans_desc().get_xid();
const transaction::ObTransDesc& trans_desc = my_session->get_trans_desc();
transaction::ObXATransID xid = trans_desc.get_xid();
if (stmt.get_is_rollback()) {
// Rollback can be executed in the xa transaction,
// the role is to roll back all modifications, but does not end the xa transaction
} else {
// commit is prohibited in xa transaction
ret = OB_TRANS_XA_ERR_COMMIT;
LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), K(xid));
LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), K(xid), K(trans_desc));
}
ctx.set_need_disconnect(false);
} else if (OB_FAIL(ObSqlTransControl::explicit_end_trans(ctx, stmt.get_is_rollback()))) {
......
......@@ -111,7 +111,7 @@ int ObPlXaStartExecutor::execute(ObExecContext& ctx, ObXaStartStmt& stmt)
LOG_WARN("set xid error", K(ret), K(stmt));
} else if (my_session->get_in_transaction()) {
ret = OB_TRANS_XA_OUTSIDE;
LOG_WARN("already start trans", K(ret), K(stmt.get_xa_string()));
LOG_WARN("already start trans", K(ret), K(stmt.get_xa_string()), K(trans_desc));
} else {
transaction::ObStartTransParam& start_trans_param = plan_ctx->get_start_trans_param();
init_start_trans_param(my_session, task_exec_ctx, start_trans_param);
......
......@@ -2139,6 +2139,7 @@ int ObPartitionGroup::prepare_splitting(
int ObPartitionGroup::check_cur_partition_split(bool& is_split_partition)
{
int ret = OB_SUCCESS;
const bool split_kill_trans = true;
// SpinWLockGuard guard(split_lock_);
if (OB_UNLIKELY(!is_inited_)) {
......@@ -2149,7 +2150,7 @@ int ObPartitionGroup::check_cur_partition_split(bool& is_split_partition)
} else {
is_split_partition = false;
}
if (!(GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_3100)) {
if (split_kill_trans || !(GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_3100)) {
is_split_partition = false;
}
......
......@@ -562,7 +562,9 @@ int ObPartitionLoopWorker::set_replay_checkpoint(const int64_t checkpoint)
const int64_t last_checkpoint = ATOMIC_LOAD(&last_checkpoint_);
const int64_t NOTICE_THRESHOLD = 3 * 1000 * 1000;
if (last_checkpoint > 0 && checkpoint - last_checkpoint > NOTICE_THRESHOLD) {
STORAGE_LOG(INFO, "replay checkpoint updated", K_(pkey), K(checkpoint), K(*this));
if (EXECUTE_COUNT_PER_SEC(10)) {
STORAGE_LOG(INFO, "replay checkpoint updated", K_(pkey), K(checkpoint), K(*this));
}
}
inc_update(&last_checkpoint_, checkpoint);
} else {
......
......@@ -185,7 +185,22 @@ void ObTransCtx::destroy()
int ObTransCtx::reset_trans_audit_record()
{
int ret = OB_SUCCESS;
CtxLockGuard guard(lock_);
if (OB_NOT_NULL(trans_audit_record_)) {
(void)trans_audit_record_->set_trans_audit_data(tenant_id_,
addr_,
trans_id_,
self_,
session_id_,
proxy_session_id_,
trans_type_,
get_uref(),
ctx_create_time_,
trans_expired_time_,
trans_param_,
get_type(),
get_status_(),
is_for_replay());
ObTransAuditRecordMgr* record_mgr = NULL;
if (OB_ISNULL(record_mgr = record_mgr_guard_.get_trans_audit_record_mgr())) {
ret = OB_ERR_UNEXPECTED;
......@@ -196,6 +211,7 @@ int ObTransCtx::reset_trans_audit_record()
trans_audit_record_ = NULL;
tlog_ = NULL;
}
record_mgr_guard_.destroy();
return ret;
}
......
......@@ -186,7 +186,7 @@ public:
TRANS_LOG(WARN, "invalid argument", K(trans_id), "ctx", OB_P(ctx_base));
tmp_ret = common::OB_INVALID_ARGUMENT;
} else {
if (OB_FAIL(ctx_base->kill(arg_, cb_array_))) {
if (OB_SUCC(ctx_base->kill(arg_, cb_array_))) {
TRANS_LOG(INFO, "kill transaction success", K(trans_id), K_(arg));
} else if (common::OB_TRANS_CANNOT_BE_KILLED == ret) {
TRANS_LOG(INFO, "transaction can not be killed", K(trans_id), "context", *ctx_base);
......@@ -199,7 +199,7 @@ public:
if (OB_SUCCESS != (tmp_ret = ctx_base->reset_trans_audit_record())) {
TRANS_LOG(WARN, "reset trans audot record failed", KR(tmp_ret));
}
ctx_base->get_record_mgr_guard().destroy();
// ctx_base->get_record_mgr_guard().destroy();
}
}
......
......@@ -2861,12 +2861,14 @@ int ObPartTransCtx::check_schema_version_elapsed(const int64_t schema_version, c
} else if (OB_UNLIKELY(schema_version <= 0) || OB_UNLIKELY(refreshed_schema_ts < 0)) {
TRANS_LOG(WARN, "invalid argument", K(schema_version), "context", *this);
ret = OB_INVALID_ARGUMENT;
} else if (is_exiting_) {
// do nothing
} else if (for_replay_) {
ret = OB_NOT_MASTER;
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
TRANS_LOG(WARN, "current participant not master, need retry", K(ret), K(*this));
}
} else if (is_exiting_ || is_readonly_) {
} else if (is_readonly_) {
// do nothing
} else if (ctx_create_time_ <= refreshed_schema_ts) {
ret = OB_EAGAIN;
......
......@@ -3075,12 +3075,27 @@ int ObScheTransCtx::xa_rollback_session_terminate()
TRANS_LOG(WARN, "ObScheTransCtx not inited", K(ret));
} else if (is_terminated_) {
TRANS_LOG(INFO, "transaction is terminating", K(ret), "context", *this);
} else if (ObXATransState::ACTIVE != xa_trans_state_) {
}
/*
else if (ObXATransState::ACTIVE != xa_trans_state_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected state", K(ret), K(xa_trans_state_), K(*this));
} else if (has_decided_()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected, xa trans already decided", K(ret), K(*this));
}
*/
else if (has_decided_()) {
if (!is_xa_one_phase_) {
// 已进入两阶段提交阶段的xa事务不再允许执行一阶段提交
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "xa trans has entered into two phase", "context", *this);
} else if (!is_rollback_) {
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "invalid xa trans one phase request", "context", *this);
} else {
// 断连接转发超时,不需要重试
ret = OB_SUCCESS;
}
// ret = OB_ERR_UNEXPECTED;
// TRANS_LOG(WARN, "unexpected, xa trans already decided", K(ret), K(*this));
} else if (OB_FAIL(xa_rollback_session_terminate_())) {
TRANS_LOG(WARN, "terminate xa trans failed", K(ret), K(*this));
}
......@@ -3118,7 +3133,11 @@ int ObScheTransCtx::xa_one_phase_end_trans(const bool is_rollback)
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(is_exiting_)) {
TRANS_LOG(WARN, "transaction is exiting", "context", *this);
ret = OB_TRANS_IS_EXITING;
if (is_rollback) {
ret = OB_SUCCESS;
} else {
ret = OB_TRANS_IS_EXITING;
}
} else if (is_tightly_coupled_ && !is_rollback && 1 < get_unprepared_branch_count_()) {
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(
......@@ -3134,10 +3153,13 @@ int ObScheTransCtx::xa_one_phase_end_trans(const bool is_rollback)
} else {
// one-phase proxy end_trans request timeout
// if scheduler in end_trans phase, need to retry and wait ctx released
ret = OB_TRANS_XA_RETRY;
if (is_rollback) {
ret = OB_SUCCESS;
} else {
ret = OB_TRANS_XA_RETRY;
}
}
}
if (OB_SUCC(ret)) {
} else {
if (OB_FAIL(xa_one_phase_end_trans_(is_rollback))) {
TRANS_LOG(WARN, "xa one phase end trans failed", K(ret), K(is_rollback), K(*this));
}
......
......@@ -239,7 +239,7 @@ int ObXAEndTransP::process()
break;
}
}
if (is_rollback) {
if (is_rollback && is_terminated) {
// one phase rollback or terminate
if (OB_FAIL(sche_ctx->xa_rollback_session_terminate())) {
TRANS_LOG(WARN, "rollback xa trans failed", K(ret), K(xid), K(trans_id), K(is_rollback));
......
......@@ -1741,6 +1741,7 @@ int ObTransService::handle_terminate_for_xa_branch_(ObTransDesc& trans_desc)
ObTransCond cond;
// rely on timeout of cb, therefore timeout of cond is set to max
const int64_t wait_time = (INT64_MAX - now) / 2;
req.set_terminated();
if (OB_FAIL(cb.init(&cond))) {
TRANS_LOG(WARN, "ObXARPCCB init failed", KR(ret));
} else if (OB_FAIL(req.init(trans_id, xid, is_rollback /*true*/, is_terminated /*true*/))) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册