提交 f5fa404b 编写于 作者: H Handora 提交者: wangzelin.wzl

[BUG] change from normal read to atomic read

上级 384903ce
......@@ -143,29 +143,39 @@ int ObMvccValueIterator::lock_for_read_inner_(const ObQueryFlag &flag,
// reader_tx_id.
const ObTransID &snapshot_tx_id = ctx_->snapshot_.tx_id_;
const ObTransID &reader_tx_id = ctx_->tx_id_;
const ObTransID &data_tx_id = iter->get_tx_id();
const int64_t data_seq_no = iter->get_seq_no();
const int64_t snapshot_seq_no = ctx_->snapshot_.scn_;
const int64_t snapshot_version = ctx_->snapshot_.version_;
const int64_t read_epoch = ctx_->get_tx_table_guard().epoch();
const bool read_latest = flag.is_read_latest();
ObTxTable *tx_table = ctx_->get_tx_table_guard().get_tx_table();
const bool read_latest = flag.is_read_latest();
const ObTransID &data_tx_id = iter->get_tx_id();
const int64_t data_seq_no = iter->get_seq_no();
// NB: We need to pay close attention to the order in which the different
// variables are read. Although we update the version before the state for the
// tnodes and read the state before the version, the compiled code or its
// execution may reorder these accesses and fail to obey the original logic
// (read the dependency definition in the ARM architecture book to understand
// it). So the synchronization primitives below are very important.
const bool is_committed = iter->is_committed();
const bool is_aborted = iter->is_aborted();
const bool is_elr = iter->is_elr();
const bool is_delayed_cleanout = iter->is_delayed_cleanout();
// Opt1: data is decided
if ((iter->is_committed() || iter->is_aborted() || iter->is_elr())
if ((is_committed || is_aborted || is_elr)
// Opt2: data is not decided while we do not need cleanout
|| (!iter->is_delayed_cleanout()
|| (!is_delayed_cleanout
&& (// Opt2.1: snapshot reads the data written by snapshot
data_tx_id == snapshot_tx_id ||
// Opt2.2: read reader's latest is matched
(read_latest && data_tx_id == reader_tx_id)))) {
// Case 1: Cleanout can be skipped
// because an inner tx read only cares whether the tx node is rolled back
if (iter->is_committed() || iter->is_elr()) {
if (is_committed || is_elr) {
// Case 2: Data is committed, so the state is decided
const int64_t data_version = iter->trans_version_;
const int64_t data_version = ATOMIC_LOAD(&iter->trans_version_);
if (snapshot_version >= data_version) {
// Case 2.1 Read the version if it is smaller than read version
version_iter_ = iter;
......@@ -173,7 +183,7 @@ int ObMvccValueIterator::lock_for_read_inner_(const ObQueryFlag &flag,
// Case 2.2: Otherwise, skip to the next version
iter = iter->prev_;
}
} else if (iter->is_aborted()) {
} else if (is_aborted) {
// Case 3: Data is aborted, so the state is decided. So we skip aborted data
// version
iter = iter->prev_;
......
......@@ -206,13 +206,13 @@ bool ObMvccTransNode::is_delayed_cleanout() const
// Records the transaction version on this node.
//
// trans_version_ is written with ATOMIC_STORE only, so the write pairs with
// readers that fetch the field through ATOMIC_LOAD (for example
// ObMvccValueIterator::lock_for_read_inner_). A plain assignment to the same
// field would be a redundant, non-atomic write racing with those readers.
//
// @param version  transaction version to store into trans_version_.
// @return always OB_SUCCESS.
int ObMvccTransNode::fill_trans_version(const int64_t version)
{
  ATOMIC_STORE(&trans_version_, version);
  return OB_SUCCESS;
}
// Records the log timestamp on this node.
//
// log_timestamp_ is written exactly once, through ATOMIC_STORE; no additional
// plain assignment is made to the same field.
// NOTE(review): readers of log_timestamp_ are presumably expected to use
// ATOMIC_LOAD as well -- confirm against the callers.
//
// @param log_timestamp  value to store into log_timestamp_.
// @return always OB_SUCCESS.
int ObMvccTransNode::fill_log_timestamp(const int64_t log_timestamp)
{
  ATOMIC_STORE(&log_timestamp_, log_timestamp);
  return OB_SUCCESS;
}
......@@ -228,7 +228,6 @@ void ObMvccTransNode::trans_abort(const int64_t tx_end_log_ts)
{
set_aborted();
set_tx_end_log_ts(tx_end_log_ts);
}
void ObMvccTransNode::remove_callback()
......@@ -695,8 +694,8 @@ int ObMvccRow::insert_trans_node(ObIMvccCtx &ctx,
// Tells whether a write taken at snapshot_version would hit a transaction set
// violation on this row.
//
// Returns true when either max_trans_version_ or max_elr_trans_version_ is
// greater than snapshot_version. Both fields are read with ATOMIC_LOAD, the
// same way ObMvccRow::mvcc_write evaluates this condition.
//
// @param snapshot_version  snapshot version of the caller.
// @return true if one of the two maximum versions exceeds snapshot_version.
bool ObMvccRow::is_transaction_set_violation(const int64_t snapshot_version)
{
  return ATOMIC_LOAD(&max_trans_version_) > snapshot_version
         || ATOMIC_LOAD(&max_elr_trans_version_) > snapshot_version;
}
int ObMvccRow::elr(const ObTransID &tx_id,
......@@ -921,6 +920,7 @@ int ObMvccRow::mvcc_write_(ObIMemtableCtx &ctx,
// on the lock state of the node even if the node is not delayed cleanout for
// read operation. (If you are interested in it, read ObMvccRow::mvcc_write)
ObTransID data_tx_id = iter->get_tx_id();
if (iter->is_delayed_cleanout()
&& !(iter->is_committed() || iter->is_aborted())
&& OB_FAIL(tx_table->cleanout_tx_node(data_tx_id,
......@@ -1055,7 +1055,8 @@ int ObMvccRow::mvcc_write(ObIMemtableCtx &ctx,
int ret = OB_SUCCESS;
lock_begin(ctx);
if (max_trans_version_ > snapshot_version || max_elr_trans_version_ > snapshot_version) {
if (ATOMIC_LOAD(&max_trans_version_) > snapshot_version
|| ATOMIC_LOAD(&max_elr_trans_version_) > snapshot_version) {
// Case 3. successfully locked while tsc
ret = OB_TRANSACTION_SET_VIOLATION;
TRANS_LOG(WARN, "transaction set violation", K(ret),
......@@ -1067,7 +1068,8 @@ int ObMvccRow::mvcc_write(ObIMemtableCtx &ctx,
// Case1: Cannot insert because of write-write conflict
ret = OB_TRY_LOCK_ROW_CONFLICT;
TRANS_LOG(WARN, "mvcc write conflict", K(ret), K(ctx), K(node), K(res), K(*this));
} else if (max_trans_version_ > snapshot_version || max_elr_trans_version_ > snapshot_version) {
} else if (ATOMIC_LOAD(&max_trans_version_) > snapshot_version
|| ATOMIC_LOAD(&max_elr_trans_version_) > snapshot_version) {
// Case 3. successfully locked while tsc
ret = OB_TRANSACTION_SET_VIOLATION;
TRANS_LOG(WARN, "transaction set violation", K(ret), K(ctx), K(node), K(*this));
......
......@@ -105,13 +105,12 @@ public:
// ===================== ObMvccTransNode Flag Interface =====================
void set_committed();
// True when the F_COMMITTED bit is set in flag_. flag_ is read with
// ATOMIC_LOAD; the accessor is defined exactly once.
bool is_committed() const { return ATOMIC_LOAD(&flag_) & F_COMMITTED; }
// True when the F_MUTEX bit is set in flag_. Uses the same atomic read of
// flag_ as the other flag accessors so all of them observe it consistently.
bool is_locked() const { return ATOMIC_LOAD(&flag_) & F_MUTEX; }
void set_elr();
// True when the F_ELR bit is set in flag_. flag_ is read with ATOMIC_LOAD;
// the accessor is defined exactly once.
bool is_elr() const { return ATOMIC_LOAD(&flag_) & F_ELR; }
void set_aborted();
void clear_aborted();
// True when the F_ABORTED bit is set in flag_. flag_ is read with ATOMIC_LOAD;
// the accessor is defined exactly once.
bool is_aborted() const { return ATOMIC_LOAD(&flag_) & F_ABORTED; }
void set_delayed_cleanout(const bool delayed_cleanout);
bool is_delayed_cleanout() const;
......@@ -128,10 +127,10 @@ public:
// Stores tx_end_log_ts into tx_end_log_ts_ with ATOMIC_STORE.
// INT64_MAX is ignored: in that case the current value is left untouched.
// The field is written only through the atomic store so the write pairs with
// the ATOMIC_LOAD in get_tx_end_log_ts().
void set_tx_end_log_ts(const int64_t tx_end_log_ts)
{
  if (INT64_MAX != tx_end_log_ts) {
    ATOMIC_STORE(&tx_end_log_ts_, tx_end_log_ts);
  }
}
// Returns tx_end_log_ts_, read with ATOMIC_LOAD. Defined exactly once.
int64_t get_tx_end_log_ts() { return ATOMIC_LOAD(&tx_end_log_ts_); }
private:
static const uint8_t F_INIT;
......
......@@ -265,7 +265,7 @@ int ObCtxTxData::set_state(int32_t state)
if (OB_FAIL(check_tx_data_writable_())) {
TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this));
} else {
tx_data_->state_ = state;
ATOMIC_STORE(&tx_data_->state_, state);
}
return ret;
......@@ -279,7 +279,7 @@ int ObCtxTxData::set_commit_version(int64_t commit_version)
if (OB_FAIL(check_tx_data_writable_())) {
TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this));
} else {
tx_data_->commit_version_ = commit_version;
ATOMIC_STORE(&tx_data_->commit_version_, commit_version);
}
return ret;
......@@ -294,7 +294,7 @@ int ObCtxTxData::set_start_log_ts(int64_t start_ts)
if (OB_FAIL(check_tx_data_writable_())) {
TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this));
} else {
tx_data_->start_log_ts_ = tmp_start_ts;
ATOMIC_STORE(&tx_data_->start_log_ts_, tmp_start_ts);
}
return ret;
......@@ -308,7 +308,7 @@ int ObCtxTxData::set_end_log_ts(int64_t end_ts)
if (OB_FAIL(check_tx_data_writable_())) {
TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this));
} else {
tx_data_->end_log_ts_ = end_ts;
ATOMIC_STORE(&tx_data_->end_log_ts_, end_ts);
}
return ret;
......@@ -317,19 +317,19 @@ int ObCtxTxData::set_end_log_ts(int64_t end_ts)
// Returns the transaction state.
//
// Takes an RLockGuard on lock_, then reads state_ from tx_data_ when that
// pointer is set and from tx_commit_data_ otherwise. The field is read with
// ATOMIC_LOAD, matching the ATOMIC_STORE performed by set_state(); a plain
// read placed before it would make the atomic read unreachable.
int32_t ObCtxTxData::get_state() const
{
  RLockGuard guard(lock_);
  return (NULL != tx_data_ ? ATOMIC_LOAD(&tx_data_->state_)
                           : ATOMIC_LOAD(&tx_commit_data_.state_));
}
// Returns the commit version.
//
// Takes an RLockGuard on lock_, then reads commit_version_ from tx_data_ when
// that pointer is set and from tx_commit_data_ otherwise. The field is read
// with ATOMIC_LOAD, matching the ATOMIC_STORE performed by
// set_commit_version(); a plain read placed before it would make the atomic
// read unreachable.
int64_t ObCtxTxData::get_commit_version() const
{
  RLockGuard guard(lock_);
  return (NULL != tx_data_ ? ATOMIC_LOAD(&tx_data_->commit_version_)
                           : ATOMIC_LOAD(&tx_commit_data_.commit_version_));
}
int64_t ObCtxTxData::get_start_log_ts() const
{
RLockGuard guard(lock_);
int64_t ctx_log_ts = (NULL != tx_data_ ? tx_data_->start_log_ts_ : tx_commit_data_.start_log_ts_);
int64_t ctx_log_ts = (NULL != tx_data_ ? ATOMIC_LOAD(&tx_data_->start_log_ts_) : ATOMIC_LOAD(&tx_commit_data_.start_log_ts_));
if (INT64_MAX == ctx_log_ts) {
ctx_log_ts = OB_INVALID_TIMESTAMP;
}
......@@ -339,7 +339,7 @@ int64_t ObCtxTxData::get_start_log_ts() const
int64_t ObCtxTxData::get_end_log_ts() const
{
RLockGuard guard(lock_);
int64_t ctx_log_ts = (NULL != tx_data_ ? tx_data_->end_log_ts_ : tx_commit_data_.end_log_ts_);
int64_t ctx_log_ts = (NULL != tx_data_ ? ATOMIC_LOAD(&tx_data_->end_log_ts_) : ATOMIC_LOAD(&tx_commit_data_.end_log_ts_));
if (INT64_MAX == ctx_log_ts) {
ctx_log_ts = OB_INVALID_TIMESTAMP;
}
......
......@@ -62,9 +62,10 @@ namespace storage
int CheckSqlSequenceCanReadFunctor::operator() (const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx) {
UNUSED(tx_cc_ctx);
int ret = OB_SUCCESS;
const int32_t state = ATOMIC_LOAD(&tx_data.state_);
// NB: The functor is only used during minor merge
if (ObTxData::ABORT == tx_data.state_) {
if (ObTxData::ABORT == state) {
// Case 1: data is aborted, so we do not need it during merge
can_read_ = false;
} else if (tx_data.undo_status_list_.is_contain(sql_sequence_)) {
......@@ -82,13 +83,15 @@ int CheckRowLockedFunctor::operator() (const ObTxData &tx_data, ObTxCCCtx *tx_cc
{
UNUSED(tx_cc_ctx);
int ret = OB_SUCCESS;
const int32_t state = ATOMIC_LOAD(&tx_data.state_);
const int64_t commit_version = ATOMIC_LOAD(&tx_data.commit_version_);
switch (tx_data.state_) {
switch (state) {
case ObTxData::COMMIT: {
// Case 1: data is committed, so the lock is locked by the data and we
// also need to return the commit version for tsc check
lock_state_.is_locked_ = false;
lock_state_.trans_version_ = tx_data.commit_version_;
lock_state_.trans_version_ = commit_version;
break;
}
case ObTxData::RUNNING: {
......@@ -132,26 +135,29 @@ int GetTxStateWithLogTSFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *t
{
UNUSED(tx_cc_ctx);
int ret = OB_SUCCESS;
const int32_t state = ATOMIC_LOAD(&tx_data.state_);
const int64_t commit_version = ATOMIC_LOAD(&tx_data.commit_version_);
const int64_t end_log_ts = ATOMIC_LOAD(&tx_data.end_log_ts_);
// return the transaction state_ according to the merge log ts.
// the detailed document is available as follows.
// https://yuque.antfin-inc.com/docs/share/a3160d5e-6e1a-4980-a12e-4af653c6cf57?#
if (ObTxData::RUNNING == tx_data.state_) {
if (ObTxData::RUNNING == state) {
// Case 1: data is during execution, so we return the running state with
// INT64_MAX as version
state_ = ObTxData::RUNNING;
trans_version_ = INT64_MAX;
} else if (log_ts_ < tx_data.end_log_ts_) {
} else if (log_ts_ < end_log_ts) {
// Case 2: data is decided while the required state is before the merge log
// ts, so we return the running state with INT64_MAX as txn version
state_ = ObTxData::RUNNING;
trans_version_ = INT64_MAX;
} else if (ObTxData::COMMIT == tx_data.state_) {
} else if (ObTxData::COMMIT == state) {
// Case 3: data is committed and the required state is after the merge log
// ts, so we return the commit state with commit version as txn version
state_ = ObTxData::COMMIT;
trans_version_ = tx_data.commit_version_;
} else if (ObTxData::ABORT == tx_data.state_) {
trans_version_ = commit_version;
} else if (ObTxData::ABORT == state) {
// Case 4: data is aborted and the required state is after the merge log
// ts, so we return the abort state with 0 as txn version
state_ = ObTxData::ABORT;
......@@ -168,31 +174,42 @@ int GetTxStateWithLogTSFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *t
int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx)
{
int ret = OB_SUCCESS;
const transaction::ObTxSnapshot &snapshot = lock_for_read_arg_.mvcc_acc_ctx_.snapshot_;
const int64_t snapshot_version = snapshot.version_;
const transaction::ObTransID snapshot_tx_id = snapshot.tx_id_;
const int64_t snapshot_sql_sequence = snapshot.scn_;
const transaction::ObTransID data_tx_id = lock_for_read_arg_.data_trans_id_;
const int64_t data_sql_sequence = lock_for_read_arg_.data_sql_sequence_;
const bool read_latest = lock_for_read_arg_.read_latest_;
const transaction::ObTransID reader_tx_id = lock_for_read_arg_.mvcc_acc_ctx_.tx_id_;
// NB: We need to pay close attention to the order in which the different
// variables are read. Although we update the version before the state for the
// tnodes and read the state before the version, the compiled code or its
// execution may reorder these accesses and fail to obey the original logic
// (read the dependency definition in the ARM architecture book to understand
// it). So the synchronization primitives below are very important.
const int32_t state = ATOMIC_LOAD(&tx_data.state_);
const int64_t commit_version = ATOMIC_LOAD(&tx_data.commit_version_);
can_read_ = false;
trans_version_ = OB_INVALID_VERSION;
is_determined_state_ = false;
auto &snapshot = lock_for_read_arg_.mvcc_acc_ctx_.snapshot_;
auto snapshot_version = snapshot.version_;
auto snapshot_tx_id = snapshot.tx_id_;
auto data_tx_id = lock_for_read_arg_.data_trans_id_;
auto snapshot_sql_sequence = snapshot.scn_;
auto data_sql_sequence = lock_for_read_arg_.data_sql_sequence_;
bool read_latest = lock_for_read_arg_.read_latest_;
auto reader_tx_id = lock_for_read_arg_.mvcc_acc_ctx_.tx_id_;
switch (tx_data.state_) {
switch (state) {
case ObTxData::COMMIT: {
// Case 1: data is committed, so the state is decided and whether we can read
// depends on whether undo status contains the data. Then we return the commit
// version as data version.
can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence);
trans_version_ = tx_data.commit_version_;
trans_version_ = commit_version;
is_determined_state_ = true;
break;
}
case ObTxData::ELR_COMMIT: {
can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence);
trans_version_ = tx_data.commit_version_;
trans_version_ = commit_version;
is_determined_state_ = false;
break;
}
......@@ -287,7 +304,9 @@ int LockForReadFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx
auto &acc_ctx = lock_for_read_arg_.mvcc_acc_ctx_;
auto lock_expire_ts = acc_ctx.eval_lock_expire_ts();
if (OB_ISNULL(tx_cc_ctx) && (ObTxData::RUNNING == tx_data.state_)) {
const int32_t state = ATOMIC_LOAD(&tx_data.state_);
if (OB_ISNULL(tx_cc_ctx) && (ObTxData::RUNNING == state)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "lock for read functor need prepare version.", KR(ret));
} else {
......@@ -348,8 +367,11 @@ bool ObReCheckNothingOperation::operator()()
int ObCleanoutTxNodeOperation::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx)
{
int ret = OB_SUCCESS;
const int32_t state = ATOMIC_LOAD(&tx_data.state_);
const int64_t commit_version = ATOMIC_LOAD(&tx_data.commit_version_);
const int64_t end_log_ts = ATOMIC_LOAD(&tx_data.end_log_ts_);
if (ObTxData::RUNNING == tx_data.state_
if (ObTxData::RUNNING == state
&& !tx_data.undo_status_list_.is_contain(tnode_.seq_no_)
// NB: we need pay attention to the choice condition when issuing the
// lock_for_read, we cannot only treat state in exec_info as judgement
......@@ -373,27 +395,27 @@ int ObCleanoutTxNodeOperation::operator()(const ObTxData &tx_data, ObTxCCCtx *tx
if (OB_FAIL(value_.unlink_trans_node(tnode_))) {
TRANS_LOG(WARN, "mvcc trans ctx trans commit error", K(ret), K(value_), K(tnode_));
} else {
(void)tnode_.trans_abort(tx_data.end_log_ts_);
(void)tnode_.trans_abort(end_log_ts);
}
} else if (ObTxData::RUNNING == tx_data.state_) {
} else if (ObTxData::RUNNING == state) {
if (INT64_MAX != tx_cc_ctx->prepare_version_) {
// Case 3: data is prepared, we also do not write back the prepare state
}
} else if (ObTxData::COMMIT == tx_data.state_) {
} else if (ObTxData::COMMIT == state) {
// Case 4: data is committed, so we should write back the commit state
if (OB_FAIL(value_.trans_commit(tx_data.commit_version_, tnode_))) {
if (OB_FAIL(value_.trans_commit(commit_version, tnode_))) {
TRANS_LOG(WARN, "mvcc trans ctx trans commit error", K(ret), K(value_), K(tnode_));
} else if (FALSE_IT(tnode_.trans_commit(tx_data.commit_version_, tx_data.end_log_ts_))) {
} else if (FALSE_IT(tnode_.trans_commit(commit_version, end_log_ts))) {
} else if (blocksstable::ObDmlFlag::DF_LOCK == tnode_.get_dml_flag()
&& OB_FAIL(value_.unlink_trans_node(tnode_))) {
TRANS_LOG(WARN, "unlink lock node failed", K(ret), K(value_), K(tnode_));
}
} else if (ObTxData::ABORT == tx_data.state_) {
} else if (ObTxData::ABORT == state) {
// Case 6: data is aborted, so we write back the abort state
if (OB_FAIL(value_.unlink_trans_node(tnode_))) {
TRANS_LOG(WARN, "mvcc trans ctx trans commit error", K(ret), K(value_), K(tnode_));
} else {
(void)tnode_.trans_abort(tx_data.end_log_ts_);
(void)tnode_.trans_abort(end_log_ts);
}
} else {
ret = OB_ERR_UNEXPECTED;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册