提交 68f54889 编写于 作者: H handora 提交者: wangzelin.wzl

[BUG] adapt redo complete

上级 2dd62c82
......@@ -439,6 +439,34 @@ int ObPartTransCtx::handle_timeout(const int64_t delay)
}
}
// go to preapre state when recover from redo complete
if (!is_follower_() && !exec_info_.is_dup_tx_ && !is_sub2pc()) {
if (ObTxState::REDO_COMPLETE == get_downstream_state()) {
if (is_local_tx_()) {
if (!is_logging_()) {
if (OB_FAIL(one_phase_commit_())) {
TRANS_LOG(WARN, "two phase commit failed", KR(ret), KPC(this));
} else {
part_trans_action_ = ObPartTransAction::COMMIT;
}
}
} else {
if (ObTxState::PREPARE > get_upstream_state()) {
bool unused = false;
if (is_2pc_logging()) {
TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this));
} else if (OB_FAIL(do_prepare(unused))) {
TRANS_LOG(WARN, "do prepare failed", K(ret), K(*this));
} else {
set_upstream_state(ObTxState::PREPARE);
collected_.reset();
part_trans_action_ = ObPartTransAction::COMMIT;
}
}
}
}
}
// retry commiting for every node
if (!is_follower_() && is_committing_()) {
if (is_local_tx_()) {
......@@ -1234,6 +1262,9 @@ int ObPartTransCtx::recover_tx_ctx_table_info(const ObTxCtxTableInfo &ctx_info)
trans_id_ = ctx_info.tx_id_;
ls_id_ = ctx_info.ls_id_;
exec_info_ = ctx_info.exec_info_;
if (ObTxState::REDO_COMPLETE == get_downstream_state()) {
sub_state_.set_info_log_submitted();
}
exec_info_.multi_data_source_.reset();
if (OB_FAIL(ret)) {
// do nothing
......
......@@ -85,7 +85,6 @@ const static int64_t OB_TX_MAX_LOG_CBS = 32;
// participant transaction context
class ObPartTransCtx : public ObTransCtx,
public ObTsCbTask,
public ObTxOnePhaseCommitter,
public ObTxCycleTwoPhaseCommitter
{
friend class ObTransService;
......
......@@ -340,7 +340,7 @@ private:
// exception.
//
// NB: We should take both upstream and downstream into consideration.
int decide_downstream_msg_type_(ObTwoPhaseCommitMsgType &msg_type);
int decide_downstream_msg_type_(bool &need_submit, ObTwoPhaseCommitMsgType &msg_type);
int retransmit_downstream_msg_();
int retransmit_downstream_msg_(const uint8_t participant);
int retransmit_upstream_msg_(const ObTxState state);
......
......@@ -260,7 +260,7 @@ int ObTxCycleTwoPhaseCommitter::retransmit_upstream_msg_(const ObTxState state)
// if xa trans, prepare redo response is required
msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_RESP;
} else {
// do nothing
need_respond = false;
}
}
case ObTxState::PREPARE: {
......@@ -754,15 +754,19 @@ int ObTxCycleTwoPhaseCommitter::apply_abort_log()
const ObTxState state = get_downstream_state();
const ObTxState upstream_state = get_upstream_state();
if (ObTxState::ABORT != upstream_state) {
TRANS_LOG(WARN, "meet tx whose upstrean state is not abort", K(ret), KPC(this));
}
if (ObTxState::INIT != state
&& ObTxState::REDO_COMPLETE != state
&& ObTxState::PREPARE != state) {
// We will never apply abort under commit and clear state
ret = OB_TRANS_INVALID_STATE;
TRANS_LOG(ERROR, "apply abort with wrong state", K(state));
} else if (ObTxState::ABORT != upstream_state) {
ret = OB_TRANS_INVALID_STATE;
TRANS_LOG(ERROR, "apply invalid log", K(ret), K(*this), K(upstream_state));
// } else if (ObTxState::ABORT != upstream_state) {
// ret = OB_TRANS_INVALID_STATE;
// TRANS_LOG(ERROR, "apply invalid log", K(ret), K(*this), K(upstream_state));
} else if (OB_FAIL(on_abort())) {
TRANS_LOG(ERROR, "on abort failed", K(ret), K(*this), K(state));
} else if (OB_FAIL(set_downstream_state(ObTxState::ABORT))) {
......
......@@ -35,7 +35,7 @@ int ObTxCycleTwoPhaseCommitter::two_phase_commit()
bool no_need_submit_log = false;
//start 2pc from root
if (ObTxState::INIT != get_upstream_state()) {
if (ObTxState::PREPARE <= get_upstream_state()) {
TRANS_LOG(INFO, "already enter two phase commit", K(ret), K(*this));
} else if (is_2pc_logging()) {
TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this));
......@@ -185,12 +185,13 @@ int ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_()
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObTwoPhaseCommitMsgType msg_type;
bool need_submit = true;
if (!is_leaf()) {
int this_part_id = get_participant_id();
if (OB_FAIL(decide_downstream_msg_type_(msg_type))) {
if (OB_FAIL(decide_downstream_msg_type_(need_submit, msg_type))) {
TRANS_LOG(WARN, "deecide downstream msg_type fail", K(ret), KPC(this));
} else {
} else if (need_submit) {
for (int64_t i = 0; i < get_participants_size(); ++i) {
if (!collected_.has_member(i) && this_part_id != i) {
TRANS_LOG(INFO, "unresponded participant", K(i), K(*this));
......@@ -204,38 +205,48 @@ int ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_()
return ret;
}
int ObTxCycleTwoPhaseCommitter::decide_downstream_msg_type_(ObTwoPhaseCommitMsgType &msg_type)
int ObTxCycleTwoPhaseCommitter::decide_downstream_msg_type_(bool &need_submit,
ObTwoPhaseCommitMsgType &msg_type)
{
int ret = OB_SUCCESS;
msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_UNKNOWN;
need_submit = true;
switch (get_upstream_state())
{
case ObTxState::REDO_COMPLETE: {
if (is_sub2pc()) {
need_submit = true;
msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ;
} else {
ret = OB_TRANS_INVALID_STATE;
TRANS_LOG(WARN, "invalid coord state", KR(ret), K(get_upstream_state()));
need_submit = false;
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
TRANS_LOG(WARN, "handle timeout when redo complete", KR(ret), KPC(this));
}
}
break;
}
case ObTxState::PREPARE: {
need_submit = true;
msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ;
break;
}
case ObTxState::PRE_COMMIT: {
need_submit = true;
msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_REQ;
break;
}
case ObTxState::COMMIT: {
need_submit = true;
msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_REQ;
break;
}
case ObTxState::ABORT: {
need_submit = true;
msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ;
break;
}
case ObTxState::CLEAR: {
need_submit = true;
msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ;
break;
}
......@@ -251,11 +262,12 @@ int ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_(const uint8_t partici
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool need_submit = true;
ObTwoPhaseCommitMsgType msg_type;
if (is_leaf()) {
} else if (OB_FAIL(decide_downstream_msg_type_(msg_type))) {
} else if (OB_FAIL(decide_downstream_msg_type_(need_submit, msg_type))) {
TRANS_LOG(WARN, "decide downstream msg type fail", K(ret), KPC(this));
} else if (OB_TMP_FAIL(post_msg(msg_type, participant))) {
} else if (need_submit && OB_TMP_FAIL(post_msg(msg_type, participant))) {
TRANS_LOG(WARN, "post prepare msg failed", KR(tmp_ret), KPC(this));
}
......
......@@ -54,7 +54,6 @@ public:
// transaction committer, ObTxCycleTwoPhaseCommitter and ObTxOnePhaseCommitter based on
// participants number
class MockOb2pcCtx : public ObTxCycleTwoPhaseCommitter,
public ObTxOnePhaseCommitter,
public ObMailHandler<ObTwoPhaseCommitMsgType>
{
public:
......
......@@ -12,6 +12,8 @@
#include <gtest/gtest.h>
#include <vector>
#define private public
#define protected public
#include "storage/tx/ob_mock_tx_ctx.h"
namespace oceanbase
......@@ -105,31 +107,49 @@ TEST_F(TestMockObTxCtx, test_simple_tx_ctx1)
EXPECT_EQ(OB_SUCCESS, build_scheduler_mailbox());
ObTxCommitMsg tx_commit_msg;
MockObTxCtx::mock_tx_commit_msg(trans_id1,
ls_id1,
participants,
tx_commit_msg);
ObMail<ObTxMsg> tx_commit_mail;
EXPECT_EQ(OB_SUCCESS, tx_commit_mail.init(scheduler_addr_ /*from*/,
ctx1.get_mailbox_addr() /*to*/,
sizeof(ObTxCommitMsg),
tx_commit_msg));
ctx1.addr_memo_[ls_id2] = ctx2.addr_;
ctx1.ls_memo_[ctx2.addr_] = ls_id2;
ctx1.set_trans_type_(TransType::DIST_TRANS);
ctx1.upstream_state_ = ObTxState::INIT;
ctx1.set_downstream_state(ObTxState::REDO_COMPLETE);
ctx1.exec_info_.participants_.push_back(ls_id1);
ctx1.exec_info_.participants_.push_back(ls_id2);
ctx2.addr_memo_[ls_id1] = ctx1.addr_;
ctx2.ls_memo_[ctx1.addr_] = ls_id1;
ctx2.set_trans_type_(TransType::DIST_TRANS);
ctx2.upstream_state_ = ObTxState::PREPARE;
ctx2.exec_info_.upstream_ = ls_id1;
ctx2.log_queue_.push_back(ObTwoPhaseCommitLogType::OB_LOG_TX_PREPARE);
bool unused;
EXPECT_EQ(OB_SUCCESS, ctx2.do_prepare(unused));
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
EXPECT_EQ(OB_SUCCESS, ctx2.handle_timeout(100000));
// ========== Two Phase Commit prepare Phase ==========
// ctx1 start to commit
mailbox_mgr_.send_to_head(tx_commit_mail, tx_commit_mail.to_);
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
EXPECT_EQ(OB_SUCCESS, ctx1.two_phase_commit());
// ctx2 handle prepare request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx2 handle prepare request
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx1 handle prepare response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx1 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
EXPECT_NE(INT64_MAX, ctx1.mt_ctx_.trans_version_);
// ========== Two Phase Commit prepare Phase ==========
// ctx1 start to commit
// mailbox_mgr_.send_to_head(tx_commit_mail, tx_commit_mail.to_);
// EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// // ctx2 handle prepare request
// EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// // ctx2 handle prepare request
// EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// // ctx1 handle prepare response
// EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// // ctx1 apply prepare log
// EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// TODO shanyan.g
/*
// ========== Two Phase Commit pre commit Phase ======
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册