提交 fbc39031 编写于 作者: O obdev 提交者: ob-robot

try to notify abort in forcing kill tx

上级 5b942b3e
......@@ -266,13 +266,13 @@ int ObCtxTxData::set_commit_version(const SCN &commit_version)
int ObCtxTxData::set_start_log_ts(const SCN &start_ts)
{
int ret = OB_SUCCESS;
const SCN tmp_start_ts = (start_ts.is_valid() ? start_ts : SCN::max_scn());
// const SCN tmp_start_ts = (start_ts.is_valid() ? start_ts : SCN::max_scn());
RLockGuard guard(lock_);
if (OB_FAIL(check_tx_data_writable_())) {
TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this));
} else {
tx_data_guard_.tx_data()->start_scn_.atomic_store(tmp_start_ts);
tx_data_guard_.tx_data()->start_scn_.atomic_store(start_ts);
}
return ret;
......@@ -311,9 +311,9 @@ const SCN ObCtxTxData::get_start_log_ts() const
RLockGuard guard(lock_);
const ObTxData *tx_data = tx_data_guard_.tx_data();
SCN ctx_scn = (NULL != tx_data ? tx_data->start_scn_.atomic_load() : tx_commit_data_.start_scn_.atomic_load());
if (ctx_scn.is_max()) {
ctx_scn.reset();
}
// if (ctx_scn.is_max()) {
// ctx_scn.reset();
// }
return ctx_scn;
}
......@@ -322,9 +322,9 @@ const SCN ObCtxTxData::get_end_log_ts() const
RLockGuard guard(lock_);
const ObTxData *tx_data = tx_data_guard_.tx_data();
SCN ctx_scn = (NULL != tx_data ? tx_data->end_scn_.atomic_load() : tx_commit_data_.end_scn_.atomic_load());
if (ctx_scn.is_max()) {
ctx_scn.reset();
}
// if (ctx_scn.is_max()) {
// ctx_scn.reset();
// }
return ctx_scn;
}
......
......@@ -1683,13 +1683,31 @@ struct ObMulSourceDataNotifyArg
bool redo_submitted_;
bool redo_synced_;
// force kill trans without abort scn
bool is_force_kill_;
ObMulSourceDataNotifyArg() { reset(); }
void reset()
{
tx_id_.reset();
scn_.reset();
trans_version_.reset();
for_replay_ = false;
notify_type_ = NotifyType::ON_ABORT;
redo_submitted_ = false;
redo_synced_ = false;
is_force_kill_ = false;
}
TO_STRING_KV(K_(tx_id),
K_(scn),
K_(trans_version),
K_(for_replay),
K_(notify_type),
K_(redo_submitted),
K_(redo_synced));
K_(redo_synced),
K_(is_force_kill));
// The redo log of current buf_node has been submitted;
bool is_redo_submitted() const;
......
......@@ -5647,7 +5647,8 @@ int ObPartTransCtx::prepare_mul_data_source_tx_end_(bool is_commit)
int ObPartTransCtx::notify_data_source_(const NotifyType notify_type,
const SCN &log_ts,
const bool for_replay,
const ObTxBufferNodeArray &notify_array)
const ObTxBufferNodeArray &notify_array,
const bool is_force_kill)
{
int ret = OB_SUCCESS;
ObMulSourceDataNotifyArg arg;
......@@ -5656,6 +5657,7 @@ int ObPartTransCtx::notify_data_source_(const NotifyType notify_type,
arg.trans_version_ = ctx_tx_data_.get_commit_version();
arg.for_replay_ = for_replay;
arg.notify_type_ = notify_type;
arg.is_force_kill_ = is_force_kill;
int64_t total_time = 0;
......@@ -6432,18 +6434,27 @@ int ObPartTransCtx::do_force_kill_tx_()
{
int ret = OB_SUCCESS;
trans_kill_();
// Force kill cannot guarantee the consistency, so we just set end_log_ts
// to zero
end_log_ts_.set_min();
(void)trans_clear_();
if (OB_FAIL(unregister_timeout_task_())) {
TRANS_LOG(WARN, "unregister timer task error", KR(ret), "context", *this);
}
// Ignore ret
set_exiting_();
TRANS_LOG(INFO, "transaction killed success", "context", *this);
ObTxBufferNodeArray tmp_array;
if (OB_FAIL(gen_total_mds_array_(tmp_array))) {
TRANS_LOG(WARN, "gen total mds array failed", KR(ret), K(*this));
// } else if (OB_FAIL(notify_data_source_(NotifyType::ON_ABORT,
// ctx_tx_data_.get_end_log_ts() /*invalid_scn*/, false,
// tmp_array, true /*is_force_kill*/))) {
// TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this));
} else {
trans_kill_();
// Force kill cannot guarantee the consistency, so we just set end_log_ts
// to zero
end_log_ts_.set_min();
(void)trans_clear_();
if (OB_FAIL(unregister_timeout_task_())) {
TRANS_LOG(WARN, "unregister timer task error", KR(ret), "context", *this);
}
// Ignore ret
set_exiting_();
TRANS_LOG(INFO, "transaction killed success", "context", *this);
}
return ret;
}
......
......@@ -497,7 +497,8 @@ private:
int notify_data_source_(const NotifyType type,
const share::SCN &log_ts,
const bool for_replay,
const ObTxBufferNodeArray &notify_array);
const ObTxBufferNodeArray &notify_array,
const bool is_force_kill = false);
int gen_final_mds_array_(ObTxBufferNodeArray &array, bool is_committing = true) const;
int gen_total_mds_array_(ObTxBufferNodeArray &mds_array) const;
int deep_copy_mds_array(const ObTxBufferNodeArray &mds_array, bool need_replace = false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册