提交 3fc39b53 编写于 作者: O obdev 提交者: wangzelin.wzl

[4.0] fix savepoint rollback logic

上级 54cdbeee
......@@ -177,7 +177,7 @@ int ObCtxTxData::deep_copy_tx_data_out(ObTxData *&tmp_tx_data)
TRANS_LOG(WARN, "deep copy tx data failed", K(ret), KPC(tmp_tx_data), K(*this));
} else if (OB_ISNULL(tmp_tx_data)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "copied tmp tx data is null", K(ret), K(*this));
TRANS_LOG(ERROR, "copied tmp tx data is null", KR(ret), K(*this));
}
}
......@@ -352,39 +352,67 @@ ObTransID ObCtxTxData::get_tx_id() const
return (NULL != tx_data_ ? tx_data_->tx_id_ : tx_commit_data_.tx_id_);
}
int ObCtxTxData::prep_add_undo_action(ObUndoAction &undo_action,
ObTxData *&tmp_ctx_tx_data,
ObTxData *&tmp_tx_data_table_tx_data) {
int ObCtxTxData::prepare_add_undo_action(ObUndoAction &undo_action,
storage::ObTxData *&tmp_tx_data,
storage::ObUndoStatusNode *&tmp_undo_status)
{
int ret = OB_SUCCESS;
RLockGuard guard(lock_);
/*
* alloc undo_status_node used on commit stage
* alloc tx_data and add undo_action to it, which will be inserted
* into tx_data_table after RollbackSavepoint log sync success
*/
if (OB_FAIL(check_tx_data_writable_())) {
TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this));
} else {
ObTxTable *tx_table = nullptr;
GET_TX_TABLE_(tx_table);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(tx_table->deep_copy_tx_data(tx_data_, tmp_ctx_tx_data))) {
TRANS_LOG(WARN, "copy tx data fail", K(ret), KPC(this));
} else if (OB_ISNULL(tmp_ctx_tx_data)) {
} else if (OB_FAIL(tx_table->get_tx_data_table()->alloc_undo_status_node(tmp_undo_status))) {
TRANS_LOG(WARN, "alloc undo status fail", K(ret), KPC(this));
} else if (OB_ISNULL(tmp_undo_status)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected copy null", KR(ret), KPC(this));
} else if (OB_FAIL(tmp_ctx_tx_data->add_undo_action(tx_table, undo_action))) {
TRANS_LOG(WARN, "add undo action failed", K(ret), K(undo_action), K(*this));
} else if (OB_FAIL(tx_table->deep_copy_tx_data(tmp_ctx_tx_data, tmp_tx_data_table_tx_data))) {
TRANS_LOG(ERROR, "undo status is null", KR(ret), KPC(this));
} else if (OB_FAIL(tx_table->deep_copy_tx_data(tx_data_, tmp_tx_data))) {
TRANS_LOG(WARN, "copy tx data fail", K(ret), KPC(this));
} else if (OB_ISNULL(tmp_tx_data_table_tx_data)) {
} else if (OB_ISNULL(tmp_tx_data)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected copy null", KR(ret), KPC(this));
TRANS_LOG(ERROR, "copied tx_data is null", KR(ret), KPC(this));
} else if (OB_FAIL(tmp_tx_data->add_undo_action(tx_table, undo_action))) {
TRANS_LOG(WARN, "add undo action fail", K(ret), KPC(this));
}
if (OB_FAIL(ret)) {
if (tmp_ctx_tx_data) { tx_table->free_tx_data(tmp_ctx_tx_data); }
if (tmp_tx_data_table_tx_data) { tx_table->free_tx_data(tmp_tx_data_table_tx_data); }
if (tmp_undo_status) {
tx_table->get_tx_data_table()->free_undo_status_node(tmp_undo_status);
}
if (tmp_tx_data) {
tx_table->free_tx_data(tmp_tx_data);
}
}
}
return ret;
}
int ObCtxTxData::add_undo_action(ObUndoAction &undo_action)
int ObCtxTxData::cancel_add_undo_action(storage::ObTxData *tmp_tx_data, storage::ObUndoStatusNode *tmp_undo_status)
{
int ret = OB_SUCCESS;
ObTxTable *tx_table = nullptr;
GET_TX_TABLE_(tx_table);
if (OB_SUCC(ret)) {
tx_table->free_tx_data(tmp_tx_data);
ret = tx_table->get_tx_data_table()->free_undo_status_node(tmp_undo_status);
}
return ret;
}
int ObCtxTxData::commit_add_undo_action(ObUndoAction &undo_action, storage::ObUndoStatusNode &tmp_undo_status)
{
return add_undo_action(undo_action, &tmp_undo_status);
}
int ObCtxTxData::add_undo_action(ObUndoAction &undo_action, storage::ObUndoStatusNode *tmp_undo_status)
{
int ret = OB_SUCCESS;
RLockGuard guard(lock_);
......@@ -396,8 +424,8 @@ int ObCtxTxData::add_undo_action(ObUndoAction &undo_action)
GET_TX_TABLE_(tx_table);
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(tx_data_->add_undo_action(tx_table, undo_action))) {
TRANS_LOG(WARN, "add undo action failed", K(ret), K(undo_action), K(*this));
} else if (OB_FAIL(tx_data_->add_undo_action(tx_table, undo_action, tmp_undo_status))) {
TRANS_LOG(WARN, "add undo action failed", K(ret), K(undo_action), KP(tmp_undo_status), K(*this));
};
}
......
......@@ -57,10 +57,12 @@ public:
ObTransID get_tx_id() const;
int prep_add_undo_action(ObUndoAction &undo_action,
storage::ObTxData *&tmp_ctx_tx_data,
storage::ObTxData *&tmp_tx_data_table_tx_data);
int add_undo_action(ObUndoAction &undo_action);
int prepare_add_undo_action(ObUndoAction &undo_action,
storage::ObTxData *&tmp_tx_data,
storage::ObUndoStatusNode *&tmp_undo_status);
int cancel_add_undo_action(storage::ObTxData *tmp_tx_data, storage::ObUndoStatusNode *tmp_undo_status);
int commit_add_undo_action(ObUndoAction &undo_action, storage::ObUndoStatusNode &tmp_undo_status);
int add_undo_action(ObUndoAction &undo_action, storage::ObUndoStatusNode *tmp_undo_status = NULL);
int get_tx_commit_data(const storage::ObTxCommitData *&tx_commit_data) const;
......
......@@ -5590,7 +5590,6 @@ int ObPartTransCtx::rollback_to_savepoint_(const int64_t from_scn,
int ret = OB_SUCCESS;
// step 1: persistent 'UNDO' (if required)
/*
* Follower:
* 1. add UndoAction into tx_ctx's tx_data
......@@ -5607,38 +5606,28 @@ int ObPartTransCtx::rollback_to_savepoint_(const int64_t from_scn,
TRANS_LOG(WARN, "recrod undo info fail", K(ret), K(from_scn), K(to_scn), KPC(this));
} else if (OB_FAIL(ctx_tx_data_.deep_copy_tx_data_out(tmp_tx_data))) {
TRANS_LOG(WARN, "deep copy tx data failed", KR(ret), K(*this));
} else {
tmp_tx_data->end_log_ts_ = exec_info_.max_applying_log_ts_;
if (OB_FAIL(ctx_tx_data_.insert_tmp_tx_data(tmp_tx_data))) {
TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
} else {
tmp_tx_data = nullptr;
} else if (FALSE_IT(tmp_tx_data->end_log_ts_ = exec_info_.max_applying_log_ts_)) {
} else if (OB_FAIL(ctx_tx_data_.insert_tmp_tx_data(tmp_tx_data))) {
TRANS_LOG(WARN, "insert to tx table failed", KR(ret), K(*this));
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(ctx_tx_data_.free_tmp_tx_data(tmp_tx_data))) {
TRANS_LOG(WARN, "free tmp tx data failed", KR(tmp_ret), KPC(this));
}
}
} else if (OB_UNLIKELY(exec_info_.max_submitted_seq_no_ > to_scn)) { /* Leader */
ObUndoAction undo_action(from_scn, to_scn);
//
// In order to ensure UndoAction can be added to tx_ctx and tx_data_table
// successfully post submit log, prepare two temporary tx_data before
// submitting log
//
ObTxData *tmp_ctx_tx_data = nullptr, *tmp_tx_data_table_tx_data = nullptr;
if (OB_FAIL(ctx_tx_data_.prep_add_undo_action(undo_action, tmp_ctx_tx_data, tmp_tx_data_table_tx_data))) {
ObUndoStatusNode *undo_status = NULL;
ObTxData *tmp_tx_data = NULL;
if (ctx_tx_data_.prepare_add_undo_action(undo_action, tmp_tx_data, undo_status)) {
TRANS_LOG(WARN, "prepare add undo action fail", K(ret), KPC(this));
} else if (OB_FAIL(submit_rollback_to_log_(from_scn, to_scn, tmp_tx_data_table_tx_data))) {
// submit log
} else if (OB_FAIL(submit_rollback_to_log_(from_scn, to_scn, tmp_tx_data))) {
TRANS_LOG(WARN, "submit undo redolog fail", K(ret), K(from_scn), K(to_scn), KPC(this));
// release tmp resource
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(ctx_tx_data_.free_tmp_tx_data(tmp_ctx_tx_data))) {
TRANS_LOG(WARN, "free tmp tx data failed", KR(tmp_ret), KPC(this));
}
if (OB_TMP_FAIL(ctx_tx_data_.free_tmp_tx_data(tmp_tx_data_table_tx_data))) {
TRANS_LOG(WARN, "free tmp tx data failed", KR(tmp_ret), KPC(this));
if (OB_TMP_FAIL(ctx_tx_data_.cancel_add_undo_action(tmp_tx_data, undo_status))) {
TRANS_LOG(ERROR, "cancel add undo action failed", KR(tmp_ret), KPC(this));
}
} else if (OB_FAIL(ctx_tx_data_.replace_tx_data(tmp_ctx_tx_data))) {
// update tx_ctx's tx_data
TRANS_LOG(ERROR, "oops! replace tx data fail", KR(ret), KPC(this));
} else if (OB_FAIL(ctx_tx_data_.commit_add_undo_action(undo_action, *undo_status))) {
TRANS_LOG(ERROR, "oops, commit add undo action fail", K(ret), KPC(this));
ob_abort();
}
}
......
......@@ -159,6 +159,7 @@ int64_t ObUndoStatusList::get_serialize_size_() const
bool ObUndoStatusList::is_contain(const int64_t seq_no) const
{
bool bool_ret = false;
SpinRLockGuard guard(lock_);
ObUndoStatusNode *node_ptr = head_;
while (OB_NOT_NULL(node_ptr)) {
for (int i = 0; i < node_ptr->size_; i++) {
......@@ -423,11 +424,11 @@ bool ObTxData::is_valid_in_tx_data_table() const
return bool_ret;
}
int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &new_undo_action)
int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &new_undo_action, ObUndoStatusNode *undo_node)
{
// STORAGE_LOG(DEBUG, "do add_undo_action");
int ret = OB_SUCCESS;
ObByteLockGuard guard(undo_status_list_.lock_);
SpinWLockGuard guard(undo_status_list_.lock_);
ObTxDataTable *tx_data_table = nullptr;
ObUndoStatusNode *node = undo_status_list_.head_;
if (OB_ISNULL(tx_table)) {
......@@ -442,9 +443,13 @@ int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &ne
if (OB_ISNULL(node) || node->size_ >= TX_DATA_UNDO_ACT_MAX_NUM_PER_NODE) {
// STORAGE_LOG(DEBUG, "generate new undo status node");
ObUndoStatusNode *new_node = nullptr;
if (OB_FAIL(tx_data_table->alloc_undo_status_node(new_node))) {
if (OB_NOT_NULL(undo_node)) {
new_node = undo_node;
undo_node = NULL;
} else if (OB_FAIL(tx_data_table->alloc_undo_status_node(new_node))) {
STORAGE_LOG(WARN, "alloc_undo_status_node() fail", KR(ret));
} else {
}
if (OB_SUCC(ret)) {
new_node->next_ = node;
undo_status_list_.head_ = new_node;
node = new_node;
......@@ -460,6 +465,10 @@ int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &ne
}
}
if (OB_NOT_NULL(undo_node)) {
tx_data_table->free_undo_status_node(undo_node);
}
return ret;
}
......
......@@ -177,7 +177,6 @@ public:
{
head_ = nullptr;
undo_node_cnt_ = 0;
lock_.unlock();
}
private:
......@@ -188,7 +187,7 @@ private:
public:
ObUndoStatusNode *head_;
int32_t undo_node_cnt_;
common::ObByteLock lock_;
common::SpinRWLock lock_;
};
// TODO: Redefine it
......@@ -239,7 +238,6 @@ class ObTxData : public ObTxCommitData, public TxDataHashValue
friend TxDataHashMapAllocHandle;
private:
const static int64_t UNIS_VERSION = 1;
public:
ObTxData() { reset(); }
ObTxData(const ObTxData &rhs);
......@@ -253,8 +251,9 @@ public:
*
* @param[in] tx_table, the tx table contains this tx data
* @param[in & out] undo_action, the undo action which is waiting to be added. If this undo action contains exsiting undo actions, the existing undo actions will be deleted and this undo action will be modified to contain all the deleted undo actions.
* @param[in] undo_node, the undo status node can be used to extend undo status list if required, otherwise it will be released
*/
int add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &undo_action);
int add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &undo_action, ObUndoStatusNode *undo_node = nullptr);
/**
* @brief Check if this tx data is valid
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册