diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 0db4beb876721c1c8b0cd3f4cfc0ebb6b22884ec..6a25ef5863f5b58004a1f737548c47171b4d726f 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -4587,9 +4587,7 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts) } else { TRANS_LOG(WARN, "txn data incomplete, will be aborted", K(contain_table_lock), KPC(this)); if (has_persisted_log_()) { - if (OB_FAIL(do_local_tx_end_(TxEndAction::ABORT_TX))) { - TRANS_LOG(WARN, "abort tx failed", KR(ret), K(*this)); - } + sub_state_.set_force_abort(); } else { TRANS_LOG(ERROR, "unexpected trx which has not persisted log", K(*this)); } @@ -4762,7 +4760,9 @@ int ObPartTransCtx::switch_to_follower_gracefully(ObIArray & } else if (is_exiting_) { // do nothing } else if (sub_state_.is_force_abort()) { - // is aborting, skip + if (exec_info_.state_ == ObTxState::INIT && OB_FAIL(mt_ctx_.commit_to_replay())) { + TRANS_LOG(WARN, "commit to replay error", KR(ret), "context", *this); + } } else if (is_follower_()) { TRANS_LOG(INFO, "current tx already follower", K(*this)); } else if (OB_FAIL(state_helper.switch_state(TxCtxOps::SWITCH_GRACEFUL))) { @@ -5567,7 +5567,10 @@ int ObPartTransCtx::sub_prepare(const ObLSArray &parts, exec_info_.trans_type_ = TransType::DIST_TRANS; exec_info_.xid_ = xid; // (void)set_sub2pc_coord_state(Ob2PCPrepareState::REDO_PREPARING); - if (OB_FAIL(prepare_redo())) { + if (sub_state_.is_force_abort()) { + TRANS_LOG(WARN, "tx was marked force abort"); + ret = OB_TRANS_KILLED; + } else if (OB_FAIL(prepare_redo())) { TRANS_LOG(WARN, "fail to execute sub prepare", K(ret), KPC(this)); } else { part_trans_action_ = ObPartTransAction::COMMIT; @@ -5799,7 +5802,7 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn, ret = OB_TRANS_SQL_SEQUENCE_ILLEGAL; } else if (op_sn > last_op_sn_ && last_scn_ <= to_scn) { last_op_sn_ = op_sn; - TRANS_LOG(INFO, "rollback succeed trivially", K(op_sn), K(to_scn), K_(last_scn)); + TRANS_LOG(INFO, "rollback succeed trivially", K(op_sn), K(to_scn), K_(last_scn), KP(this), K_(ls_id)); } else if (op_sn > last_op_sn_ && pending_write_ > 0) { ret = OB_NEED_RETRY; TRANS_LOG(WARN, "has pending write, rollback blocked", @@ -6034,7 +6037,13 @@ int ObPartTransCtx::do_local_tx_end_(TxEndAction tx_end_action) switch (tx_end_action) { case TxEndAction::COMMIT_TX: { - ret = do_local_commit_tx_(); + if (sub_state_.is_force_abort()) { + if (OB_SUCC(do_local_abort_tx_())) { + ret = OB_TRANS_KILLED; + } + } else { + ret = do_local_commit_tx_(); + } // part_trans_action_ will be set as commit in ObPartTransCtx::commit function break; } diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index e19b2d7940a1d0064c9c54837aae38e3bbf36571..36063f218b8709812c45472abc1fc031f117a821 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -619,6 +619,7 @@ protected: // Caller need ensuere the participants array has already been set and the // size of the participants array is larger or equal than one. + virtual int do_prepare_redo(); virtual int do_prepare(bool &no_need_submit_log) override; virtual int on_prepare() override; virtual int do_pre_commit(bool &need_wait) override; diff --git a/src/storage/tx/ob_two_phase_committer.h b/src/storage/tx/ob_two_phase_committer.h index 489872aa3a5187d9d87021c0eb52a1fa2d390715..820b2eda5fc8143e4b8380eccb049db9b19ef6a3 100644 --- a/src/storage/tx/ob_two_phase_committer.h +++ b/src/storage/tx/ob_two_phase_committer.h @@ -197,6 +197,7 @@ public: // // NB: The implementation need guarantee the method is failure atomic, So the // method should never report an error. + virtual int do_prepare_redo() = 0; virtual int do_prepare(bool &no_need_submit_log) = 0; virtual int do_pre_commit(bool& need_wait) = 0; virtual int do_commit() = 0; diff --git a/src/storage/tx/ob_two_phase_committer_xa.cpp b/src/storage/tx/ob_two_phase_committer_xa.cpp index 8d5a9a43b12f8154d1d85bd8db11aa4a2c6fde4f..bcc2b860fe32e8bc35e30f024962cdcd89fae7e1 100644 --- a/src/storage/tx/ob_two_phase_committer_xa.cpp +++ b/src/storage/tx/ob_two_phase_committer_xa.cpp @@ -107,6 +107,17 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_request_impl_() TRANS_LOG(WARN, "unexpected operation", K(ret), K(*this)); } else if (is_2pc_logging()) { TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this)); + } else if (OB_FAIL(do_prepare_redo())) { + TRANS_LOG(WARN, "do prepare redo fail", K(ret)); + if (OB_FAIL(drive_self_2pc_phase(ObTxState::ABORT))) { + TRANS_LOG(WARN, "drive self abort fail", KR(tmp_ret), KPC(this)); + } + if (is_internal() && OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) { + TRANS_LOG(WARN, "post downstream abort msg failed", KR(tmp_ret), KPC(this)); + } + if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP, OB_C2PC_UPSTREAM_ID))) { + TRANS_LOG(WARN, "post upstream abort resp msg failed", KR(tmp_ret), KPC(this)); + } } else if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT_INFO))) { if (OB_BLOCK_FROZEN == tmp_ret) { // memtable is freezing, can not submit log right now. diff --git a/src/storage/tx/ob_two_phase_downstream_committer.cpp b/src/storage/tx/ob_two_phase_downstream_committer.cpp index af9f269ad847484ba94d70d528c37d53557bf55c..32311f534e4c82ed7f206b382c791cca37aadcf8 100644 --- a/src/storage/tx/ob_two_phase_downstream_committer.cpp +++ b/src/storage/tx/ob_two_phase_downstream_committer.cpp @@ -337,7 +337,16 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_request_impl_() { TRANS_LOG(WARN, "apply msg failed", K(ret), KPC(this)); } else if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) { TRANS_LOG(WARN, "do prepare failed", K(ret), K(*this)); - } else { + if (OB_TRANS_NEED_ROLLBACK == ret) { + if (OB_FAIL(drive_self_2pc_phase(ObTxState::ABORT))) { + TRANS_LOG(WARN, "drive abort failed", K(ret), K(*this)); + } else if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP, OB_C2PC_UPSTREAM_ID))) { + TRANS_LOG(WARN, "post abort resp msg failed", K(tmp_ret), K(*this)); + } + } + } + + if (OB_SUCC(ret)) { switch (get_2pc_role()) { case Ob2PCRole::ROOT: { ret = OB_ERR_UNEXPECTED; @@ -345,9 +354,8 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_request_impl_() { break; } case Ob2PCRole::INTERNAL: { - - if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ))) { - TRANS_LOG(WARN, "post prepare msg failed", KR(ret)); + if (OB_TMP_FAIL(retransmit_downstream_msg_())) { + TRANS_LOG(WARN, "post downstream msg failed", KR(ret)); } break; } diff --git a/src/storage/tx/ob_two_phase_upstream_committer.cpp b/src/storage/tx/ob_two_phase_upstream_committer.cpp index 51b5d65804dcc7501248764640eacd09f72c5e16..3b189e47271fa198fb37e4a407df889f03f8a3a9 100644 --- a/src/storage/tx/ob_two_phase_upstream_committer.cpp +++ b/src/storage/tx/ob_two_phase_upstream_committer.cpp @@ -40,6 +40,14 @@ int ObTxCycleTwoPhaseCommitter::two_phase_commit() TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this)); } else if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) { TRANS_LOG(WARN, "enter prepare phase failed", K(ret), K(*this)); + if (OB_TRANS_NEED_ROLLBACK == ret) { + if (OB_FAIL(drive_self_2pc_phase(ObTxState::ABORT))) { + TRANS_LOG(WARN, "enter abort phase failed", K(ret), K(*this)); + } else if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) { + TRANS_LOG(WARN, "post prepare requests failed", K(tmp_ret)); + } + ret = OB_TRANS_KILLED; + } } else { if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ))) { TRANS_LOG(WARN, "post prepare requests failed", K(tmp_ret)); diff --git a/src/storage/tx/ob_tx_2pc_ctx_impl.cpp b/src/storage/tx/ob_tx_2pc_ctx_impl.cpp index 2668d2e2e63318c95068272ca380885ec89e9f27..0d913f3a348acca778f70e256e5750c63ba958d7 100644 --- a/src/storage/tx/ob_tx_2pc_ctx_impl.cpp +++ b/src/storage/tx/ob_tx_2pc_ctx_impl.cpp @@ -65,6 +65,16 @@ int ObPartTransCtx::restart_2pc_trans_timer_() return ret; } +int ObPartTransCtx::do_prepare_redo() +{ + int ret = OB_SUCCESS; + if (sub_state_.is_force_abort()) { + ret = OB_TRANS_NEED_ROLLBACK; + TRANS_LOG(WARN, "tx was marked force abort", K(ret)); + } + return ret; +} + /* * If no_need_submit_log is true, it will not submit prepare log after do_prepare. * XA and dup_table will submit commit info log in do_prepare and drive to submit prepare log after the conditions are met. @@ -74,8 +84,11 @@ int ObPartTransCtx::do_prepare(bool &no_need_submit_log) { int ret = OB_SUCCESS; no_need_submit_log = false; - - if (exec_info_.is_dup_tx_ || OB_SUCC(search_unsubmitted_dup_table_redo_())) { + if (sub_state_.is_force_abort()) { + // txn has marked force_abort, prepare should fail + ret = OB_TRANS_NEED_ROLLBACK; + TRANS_LOG(WARN, "tx was marked force_abort", K(ret)); + } else if (exec_info_.is_dup_tx_ || OB_SUCC(search_unsubmitted_dup_table_redo_())) { no_need_submit_log = true; if (OB_FAIL(dup_table_tx_redo_sync_())) { TRANS_LOG(WARN, "dup table tx redo sync failed", K(ret)); diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index 56f7eb2c056594288cd8ece6a9353ddd4eb21891..796f106517d10828af0088cef05628de3ccbe041 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -1411,7 +1411,7 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx, ARRAY_FOREACH_NORET(parts, i) { if (parts[i].epoch_ <= 0) { int64_t len = tx.get_serialize_size() + sizeof(ObTxDesc); - char *buf = (char*)ob_malloc(len); + char *buf = (char*)ob_malloc(len, "TxRollbackSP"); int64_t pos = sizeof(ObTxDesc); if (OB_FAIL(tx.serialize(buf, len, pos))) { TRANS_LOG(WARN, "serialize tx fail", KR(ret), K(tx)); diff --git a/src/storage/tx/ob_tx_msg.cpp b/src/storage/tx/ob_tx_msg.cpp index 79a877b84bbb6fa636d4f48f1ff18a94efc571b2..309999ba1b8c5b1ee011b8665a4f2d9516500486 100644 --- a/src/storage/tx/ob_tx_msg.cpp +++ b/src/storage/tx/ob_tx_msg.cpp @@ -91,7 +91,7 @@ OB_DEF_DESERIALIZE(ObTxRollbackSPMsg) bool has_tx_ptr = false; OB_UNIS_DECODE(has_tx_ptr); if (has_tx_ptr) { - void *buffer = ob_malloc(sizeof(ObTxDesc)); + void *buffer = ob_malloc(sizeof(ObTxDesc), "TxRollbackSP"); if (OB_ISNULL(buffer)) { ret = OB_ALLOCATE_MEMORY_FAILED; } else { diff --git a/unittest/storage/tx/ob_mock_2pc_ctx.cpp b/unittest/storage/tx/ob_mock_2pc_ctx.cpp index a17a4d8df4a8082b21d3a3250a9268bed8538b5c..6a43ae5270466e08f38cecfb1d3a464d2055a913 100644 --- a/unittest/storage/tx/ob_mock_2pc_ctx.cpp +++ b/unittest/storage/tx/ob_mock_2pc_ctx.cpp @@ -74,7 +74,10 @@ int MockOb2pcCtx::commit(const MockObParticipants& participants) participants_.assign(participants.begin(), participants.end()); return two_phase_commit(); } - +int MockOb2pcCtx::do_prepare_redo() +{ + return OB_SUCCESS; +} int MockOb2pcCtx::do_prepare(bool &no_need_submit_log) { no_need_submit_log = false; diff --git a/unittest/storage/tx/ob_mock_2pc_ctx.h b/unittest/storage/tx/ob_mock_2pc_ctx.h index 8a8bfec2d7c35a274d12c41702c0fa822ad874ca..93fa405c9b9af31d43316a71fc4362f00beadd98 100644 --- a/unittest/storage/tx/ob_mock_2pc_ctx.h +++ b/unittest/storage/tx/ob_mock_2pc_ctx.h @@ -94,6 +94,7 @@ protected: // decide final transaction state. In Oceanbase's optimized, do_pre_commit is used to // optimize single machine read latency and do/on_clear is used to maintain the state // to recovery + virtual int do_prepare_redo() override; virtual int do_prepare(bool &no_need_submit_log) override; virtual int on_prepare() override; virtual int do_pre_commit(bool& need_wait) override;