提交 58bf3c6c 编写于 作者: H Handora 提交者: wangzelin.wzl

[BUG] read my data even aborted

上级 14898f12
......@@ -366,13 +366,14 @@ int ObPartTransCtx::trans_kill_()
int ret = OB_SUCCESS;
TRANS_LOG(INFO, "trans killed", K(trans_id_));
mt_ctx_.trans_kill();
if (ctx_tx_data_.get_state() == ObTxData::RUNNING) {
if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::ABORT))) {
TRANS_LOG(WARN, "set abort state in ctx_tx_data_ failed", K(ret));
}
}
mt_ctx_.trans_kill();
return ret;
}
......@@ -1540,19 +1541,82 @@ int ObPartTransCtx::update_max_commit_version_()
return ret;
}
// Unified interface for normal transaction end(both commit and abort). We We
// want to integrate the following five things that all txn commits should do.
//
// 1.end_log_ts: We set end_log_ts during final log state is synced which must
// have been done, so we check the validation of end_log_ts here(Maybe set it in
// this function is better?)
// 2.commit_version: We set commit version after submit the commit log for local
// tx and during the do_prepare for dist tx which must have been done, so we
// check the validation of commit_version here(Maybe set it in this function is
// better?)
// 3.mt_ctx.tx_end: We need callback all txn ops for all data in txn after final
// state is synced. It must be called for all txns to clean and release its data
// resource.
// 4.set_state: We need set state after final state is synced. It tells others
// that all data for this txn is decided and visible.
// 5.insert_tx_data: We need insert into tx_data in order to cleanot data which
// need be delay cleanout
//
// NB: You need pay much attention to the order of the following steps
// TODO: Integrate trans_kill and trans_replay_end into the same function
int ObPartTransCtx::tx_end_(const bool commit)
{
int ret = OB_SUCCESS;
// NB: The order of the following steps is critical
// We need first set end_code_ for the s
int32_t state = commit ? ObTxData::COMMIT : ObTxData::ABORT;
int64_t commit_version = ctx_tx_data_.get_commit_version();
int64_t end_log_ts = ctx_tx_data_.get_end_log_ts();
// STEP1: We need check whether the end_log_ts is valid before state is filled
// in here because it will be used to cleanout the tnode if state is decided.
// What's more the end_log_ts is also be used during mt_ctx_.trans_end to
// backfill normal tnode.
if (has_persisted_log_() && OB_INVALID_TIMESTAMP == end_log_ts) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "end log ts is invalid when tx end", K(ret), KPC(this));
// STEP2: We need check whether the commi_version is valid before state is
// filled in with commit here because it will be used to cleanout the tnode or
// lock for read if state is decided. What's more the commit_version is also
// be used during mt_ctx_.trans_end to backfill normal tnode..
} else if (commit && ObTransVersion::INVALID_TRANS_VERSION == commit_version) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "commit version is invalid when tx end", K(ret), KPC(this));
// STEP3: We need invoke mt_ctx_.trans_end before state is filled in here
// because we relay on the end_code_ in mt_ctx_ to report the suicide before
// the tnode can be cleanout by concurrent read.
} else if (OB_FAIL(mt_ctx_.trans_end(commit, commit_version, end_log_ts))) {
TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), "context", *this);
// STEP4: We need set state in order to informing others of the final status
// of my txn. What you need pay attention to is that after this action, others
// can cleanout the unfinished txn state and see all your data.
// TODO: we can move set_state before mt_ctx_.trans_end for commit state in
// order to accelerate users to see the data state.
} else if (OB_FAIL(ctx_tx_data_.set_state(state))) {
TRANS_LOG(WARN, "set tx data state failed", K(ret), KPC(this));
// STEP5: We need insert into the tx_data after all states are filled
} else if (has_persisted_log_() && OB_FAIL(ctx_tx_data_.insert_into_tx_table())) {
TRANS_LOG(WARN, "insert to tx table failed", KR(ret), KPC(this));
}
return ret;
}
int ObPartTransCtx::on_dist_end_(const bool commit)
{
int ret = OB_SUCCESS;
int64_t start_us, end_us;
start_us = end_us = 0;
const int64_t trans_version = commit ? ctx_tx_data_.get_commit_version() : -1;
// Distributed transactions need to wait for the commit log majority successfully before
// unlocking. If you want to know the reason, it is in the ::do_dist_commit
start_us = ObTimeUtility::fast_current_time();
if (OB_FAIL(mt_ctx_.trans_end(commit, trans_version, ctx_tx_data_.get_end_log_ts()))) {
TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), K(trans_version), "context", *this);
if (OB_FAIL(tx_end_(commit))) {
TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), "context", *this);
} else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) {
} else if (commit) {
// reset the early lock release stat after the txn commits
......@@ -1758,17 +1822,10 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
if (is_local_tx_()) {
if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) {
TRANS_LOG(WARN, "set end log ts failed", K(ret));
} else if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::COMMIT))) {
TRANS_LOG(WARN, "set tx data state failed", K(ret));
} else {
tg.click();
if (OB_FAIL(ctx_tx_data_.insert_into_tx_table())) {
TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
} else {
tg.click();
if (OB_FAIL(on_local_commit_tx_())) {
TRANS_LOG(WARN, "on local commit failed", KR(ret), K(*this));
}
if (OB_FAIL(on_local_commit_tx_())) {
TRANS_LOG(WARN, "on local commit failed", KR(ret), K(*this));
}
}
} else {
......@@ -1776,10 +1833,6 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
const NotifyType type = NotifyType::ON_COMMIT;
if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) {
TRANS_LOG(WARN, "set end log ts failed", K(ret));
} else if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::COMMIT))) {
TRANS_LOG(WARN, "set tx data state failed", K(ret));
} else if (OB_FAIL(ctx_tx_data_.insert_into_tx_table())) {
TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
} else if (OB_FAIL(notify_data_source_(type, log_ts, false, exec_info_.multi_data_source_))) {
TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this));
}
......@@ -1801,10 +1854,6 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
if (is_local_tx_() || sub_state_.is_force_abort()) {
if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) {
TRANS_LOG(WARN, "set end log ts failed", K(ret));
} else if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::ABORT))) {
TRANS_LOG(WARN, "set tx data state failed", K(ret));
} else if (OB_FAIL(ctx_tx_data_.insert_into_tx_table())) {
TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
} else if (OB_FAIL(on_local_abort_tx_())) {
TRANS_LOG(WARN, "on local abort failed", KR(ret), K(*this));
}
......@@ -1814,10 +1863,6 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
tmp_array.reset();
if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) {
TRANS_LOG(WARN, "set end log ts failed", K(ret));
} else if (OB_FAIL(ctx_tx_data_.set_state(ObTxData::ABORT))) {
TRANS_LOG(WARN, "set tx data state failed", K(ret));
} else if (OB_FAIL(ctx_tx_data_.insert_into_tx_table())) {
TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
} else if (OB_FAIL(gen_total_mds_array_(tmp_array))) {
TRANS_LOG(WARN, "gen total mds array failed", K(ret));
} else if (OB_FAIL(notify_data_source_(type, log_ts, false, tmp_array))) {
......@@ -3536,7 +3581,10 @@ int ObPartTransCtx::replay_update_tx_data_(const bool commit,
const int64_t commit_version)
{
int ret = OB_SUCCESS;
if (commit) {
if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) {
TRANS_LOG(WARN, "set end log ts failed", K(ret));
} else if (commit) {
if (TransType::SP_TRANS == exec_info_.trans_type_
&& ObTransVersion::INVALID_TRANS_VERSION == commit_version) {
if (OB_FAIL(ctx_tx_data_.set_commit_version(log_ts))) {
......@@ -3560,10 +3608,7 @@ int ObPartTransCtx::replay_update_tx_data_(const bool commit,
TRANS_LOG(WARN, "set tx data state failed", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) {
TRANS_LOG(WARN, "set end log ts failed", K(ret));
}
return ret;
}
......@@ -5942,8 +5987,7 @@ int ObPartTransCtx::on_local_commit_tx_()
TRANS_LOG(WARN, "wait gts elapse commit version failed", KR(ret), KPC(this));
} else if (FALSE_IT(tg.click())) {
} else if (FALSE_IT(start_us = ObTimeUtility::fast_current_time())) {
} else if (OB_FAIL(mt_ctx_.trans_end(true, ctx_tx_data_.get_commit_version(),
ctx_tx_data_.get_end_log_ts()))) {
} else if (OB_FAIL(tx_end_(true /*commit*/))) {
TRANS_LOG(WARN, "trans end error", KR(ret), "context", *this);
} else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) {
} else if (FALSE_IT(elr_handler_.reset_elr_state())) {
......@@ -5991,11 +6035,9 @@ int ObPartTransCtx::on_local_abort_tx_()
ObTxBufferNodeArray tmp_array;
if (!has_persisted_log_() && OB_FAIL(ctx_tx_data_.set_state(ObTxData::ABORT))) {
TRANS_LOG(WARN, "set abort state failed", K(ret));
} else if (OB_FALSE_IT(start_us = ObTimeUtility::fast_current_time())) {
if (OB_FALSE_IT(start_us = ObTimeUtility::fast_current_time())) {
} else if (OB_FAIL(mt_ctx_.trans_end(false, -1 /*unused*/, ctx_tx_data_.get_end_log_ts()))) {
} else if (OB_FAIL(tx_end_(false /*commit*/))) {
TRANS_LOG(WARN, "trans end error", KR(ret), K(commit_version), "context", *this);
} else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) {
......
......@@ -245,6 +245,7 @@ private:
void set_prev_record_lsn_(const LogOffSet &prev_record_lsn);
int trans_clear_();
int trans_kill_();
int tx_end_(const bool commit);
int trans_replay_commit_(const int64_t commit_version,
const int64_t final_log_ts,
const uint64_t log_cluster_version,
......
......@@ -246,8 +246,7 @@ int ObPartTransCtx::on_commit()
if (is_local_tx_()) {
// TODO: fill it for sp commit
} else {
if (OB_FAIL(on_dist_end_(true /*commit*/
))) {
if (OB_FAIL(on_dist_end_(true /*commit*/))) {
TRANS_LOG(WARN, "transaciton end error", KR(ret), "context", *this);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册