diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index e1f7b670fe961961bed9dca9799011ab06e33272..b7c3a4979ef48d5515ef4093fe2613ce998e6973 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -366,13 +366,14 @@ int ObPartTransCtx::trans_kill_() int ret = OB_SUCCESS; TRANS_LOG(INFO, "trans killed", K(trans_id_)); + mt_ctx_.trans_kill(); + if (ctx_tx_data_.get_state() == ObTxData::RUNNING) { if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::ABORT))) { TRANS_LOG(WARN, "set abort state in ctx_tx_data_ failed", K(ret)); } } - mt_ctx_.trans_kill(); return ret; } @@ -1540,19 +1541,82 @@ int ObPartTransCtx::update_max_commit_version_() return ret; } +// Unified interface for normal transaction end(both commit and abort). We We +// want to integrate the following five things that all txn commits should do. +// +// 1.end_log_ts: We set end_log_ts during final log state is synced which must +// have been done, so we check the validation of end_log_ts here(Maybe set it in +// this function is better?) +// 2.commit_version: We set commit version after submit the commit log for local +// tx and during the do_prepare for dist tx which must have been done, so we +// check the validation of commit_version here(Maybe set it in this function is +// better?) +// 3.mt_ctx.tx_end: We need callback all txn ops for all data in txn after final +// state is synced. It must be called for all txns to clean and release its data +// resource. +// 4.set_state: We need set state after final state is synced. It tells others +// that all data for this txn is decided and visible. +// 5.insert_tx_data: We need insert into tx_data in order to cleanot data which +// need be delay cleanout +// +// NB: You need pay much attention to the order of the following steps +// TODO: Integrate trans_kill and trans_replay_end into the same function +int ObPartTransCtx::tx_end_(const bool commit) +{ + int ret = OB_SUCCESS; + + // NB: The order of the following steps is critical + // We need first set end_code_ for the s + int32_t state = commit ? ObTxData::COMMIT : ObTxData::ABORT; + int64_t commit_version = ctx_tx_data_.get_commit_version(); + int64_t end_log_ts = ctx_tx_data_.get_end_log_ts(); + + // STEP1: We need check whether the end_log_ts is valid before state is filled + // in here because it will be used to cleanout the tnode if state is decided. + // What's more the end_log_ts is also be used during mt_ctx_.trans_end to + // backfill normal tnode. + if (has_persisted_log_() && OB_INVALID_TIMESTAMP == end_log_ts) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "end log ts is invalid when tx end", K(ret), KPC(this)); + // STEP2: We need check whether the commi_version is valid before state is + // filled in with commit here because it will be used to cleanout the tnode or + // lock for read if state is decided. What's more the commit_version is also + // be used during mt_ctx_.trans_end to backfill normal tnode.. + } else if (commit && ObTransVersion::INVALID_TRANS_VERSION == commit_version) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "commit version is invalid when tx end", K(ret), KPC(this)); + // STEP3: We need invoke mt_ctx_.trans_end before state is filled in here + // because we relay on the end_code_ in mt_ctx_ to report the suicide before + // the tnode can be cleanout by concurrent read. + } else if (OB_FAIL(mt_ctx_.trans_end(commit, commit_version, end_log_ts))) { + TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), "context", *this); + // STEP4: We need set state in order to informing others of the final status + // of my txn. What you need pay attention to is that after this action, others + // can cleanout the unfinished txn state and see all your data. + // TODO: we can move set_state before mt_ctx_.trans_end for commit state in + // order to accelerate users to see the data state. + } else if (OB_FAIL(ctx_tx_data_.set_state(state))) { + TRANS_LOG(WARN, "set tx data state failed", K(ret), KPC(this)); + // STEP5: We need insert into the tx_data after all states are filled + } else if (has_persisted_log_() && OB_FAIL(ctx_tx_data_.insert_into_tx_table())) { + TRANS_LOG(WARN, "insert to tx table failed", KR(ret), KPC(this)); + } + + return ret; +} + int ObPartTransCtx::on_dist_end_(const bool commit) { int ret = OB_SUCCESS; int64_t start_us, end_us; start_us = end_us = 0; - const int64_t trans_version = commit ? ctx_tx_data_.get_commit_version() : -1; // Distributed transactions need to wait for the commit log majority successfully before // unlocking. If you want to know the reason, it is in the ::do_dist_commit - start_us = ObTimeUtility::fast_current_time(); - if (OB_FAIL(mt_ctx_.trans_end(commit, trans_version, ctx_tx_data_.get_end_log_ts()))) { - TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), K(trans_version), "context", *this); + + if (OB_FAIL(tx_end_(commit))) { + TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), "context", *this); } else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) { } else if (commit) { // reset the early lock release stat after the txn commits @@ -1758,17 +1822,10 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb) if (is_local_tx_()) { if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) { TRANS_LOG(WARN, "set end log ts failed", K(ret)); - } else if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::COMMIT))) { - TRANS_LOG(WARN, "set tx data state failed", K(ret)); } else { tg.click(); - if (OB_FAIL(ctx_tx_data_.insert_into_tx_table())) { - TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this)); - } else { - tg.click(); - if (OB_FAIL(on_local_commit_tx_())) { - TRANS_LOG(WARN, "on local commit failed", KR(ret), K(*this)); - } + if (OB_FAIL(on_local_commit_tx_())) { + TRANS_LOG(WARN, "on local commit failed", KR(ret), K(*this)); } } } else { @@ -1776,10 +1833,6 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb) const NotifyType type = NotifyType::ON_COMMIT; if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) { TRANS_LOG(WARN, "set end log ts failed", K(ret)); - } else if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::COMMIT))) { - TRANS_LOG(WARN, "set tx data state failed", K(ret)); - } else if (OB_FAIL(ctx_tx_data_.insert_into_tx_table())) { - TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this)); } else if (OB_FAIL(notify_data_source_(type, log_ts, false, exec_info_.multi_data_source_))) { TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this)); } @@ -1801,10 +1854,6 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb) if (is_local_tx_() || sub_state_.is_force_abort()) { if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) { TRANS_LOG(WARN, "set end log ts failed", K(ret)); - } else if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::ABORT))) { - TRANS_LOG(WARN, "set tx data state failed", K(ret)); - } else if (OB_FAIL(ctx_tx_data_.insert_into_tx_table())) { - TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this)); } else if (OB_FAIL(on_local_abort_tx_())) { TRANS_LOG(WARN, "on local abort failed", KR(ret), K(*this)); } @@ -1814,10 +1863,6 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb) tmp_array.reset(); if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) { TRANS_LOG(WARN, "set end log ts failed", K(ret)); - } else if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::ABORT))) { - TRANS_LOG(WARN, "set tx data state failed", K(ret)); - } else if (OB_FAIL(ctx_tx_data_.insert_into_tx_table())) { - TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this)); } else if (OB_FAIL(gen_total_mds_array_(tmp_array))) { TRANS_LOG(WARN, "gen total mds array failed", K(ret)); } else if (OB_FAIL(notify_data_source_(type, log_ts, false, tmp_array))) { @@ -3536,7 +3581,10 @@ int ObPartTransCtx::replay_update_tx_data_(const bool commit, const int64_t commit_version) { int ret = OB_SUCCESS; - if (commit) { + + if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) { + TRANS_LOG(WARN, "set end log ts failed", K(ret)); + } else if (commit) { if (TransType::SP_TRANS == exec_info_.trans_type_ && ObTransVersion::INVALID_TRANS_VERSION == commit_version) { if (OB_FAIL(ctx_tx_data_.set_commit_version(log_ts))) { @@ -3560,10 +3608,7 @@ int ObPartTransCtx::replay_update_tx_data_(const bool commit, TRANS_LOG(WARN, "set tx data state failed", K(ret)); } } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) { - TRANS_LOG(WARN, "set end log ts failed", K(ret)); - } + return ret; } @@ -5942,8 +5987,7 @@ int ObPartTransCtx::on_local_commit_tx_() TRANS_LOG(WARN, "wait gts elapse commit version failed", KR(ret), KPC(this)); } else if (FALSE_IT(tg.click())) { } else if (FALSE_IT(start_us = ObTimeUtility::fast_current_time())) { - } else if (OB_FAIL(mt_ctx_.trans_end(true, ctx_tx_data_.get_commit_version(), - ctx_tx_data_.get_end_log_ts()))) { + } else if (OB_FAIL(tx_end_(true /*commit*/))) { TRANS_LOG(WARN, "trans end error", KR(ret), "context", *this); } else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) { } else if (FALSE_IT(elr_handler_.reset_elr_state())) { @@ -5991,11 +6035,9 @@ int ObPartTransCtx::on_local_abort_tx_() ObTxBufferNodeArray tmp_array; - if (!has_persisted_log_() && OB_FAIL(ctx_tx_data_.set_state(ObTxData::ABORT))) { - TRANS_LOG(WARN, "set abort state failed", K(ret)); - } else if (OB_FALSE_IT(start_us = ObTimeUtility::fast_current_time())) { + if (OB_FALSE_IT(start_us = ObTimeUtility::fast_current_time())) { - } else if (OB_FAIL(mt_ctx_.trans_end(false, -1 /*unused*/, ctx_tx_data_.get_end_log_ts()))) { + } else if (OB_FAIL(tx_end_(false /*commit*/))) { TRANS_LOG(WARN, "trans end error", KR(ret), K(commit_version), "context", *this); } else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) { diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 039670946eb8e7ed507b49f123ed06cca7967f94..9a674aa7413fb08dec8faa8ee313cd076578f808 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -245,6 +245,7 @@ private: void set_prev_record_lsn_(const LogOffSet &prev_record_lsn); int trans_clear_(); int trans_kill_(); + int tx_end_(const bool commit); int trans_replay_commit_(const int64_t commit_version, const int64_t final_log_ts, const uint64_t log_cluster_version, diff --git a/src/storage/tx/ob_tx_2pc_ctx_impl.cpp b/src/storage/tx/ob_tx_2pc_ctx_impl.cpp index 0246d237ff5348f29e4ebf2daa7915244ef08f2e..6f4b445064879357695e9e039d5ad704ade181a4 100644 --- a/src/storage/tx/ob_tx_2pc_ctx_impl.cpp +++ b/src/storage/tx/ob_tx_2pc_ctx_impl.cpp @@ -246,8 +246,7 @@ int ObPartTransCtx::on_commit() if (is_local_tx_()) { // TODO: fill it for sp commit } else { - if (OB_FAIL(on_dist_end_(true /*commit*/ - ))) { + if (OB_FAIL(on_dist_end_(true /*commit*/))) { TRANS_LOG(WARN, "transaciton end error", KR(ret), "context", *this); } }