提交 4a58022b 编写于 作者: H handora 提交者: wangzelin.wzl

cherrypick to 3.1_opensource_release

上级 a6d5591d
......@@ -1527,6 +1527,7 @@ const int64_t MAX_SSTABLE_CNT_IN_STORAGE = 64;
const int64_t RESERVED_STORE_CNT_IN_STORAGE =
8; // Avoid mistakenly triggering minor or major freeze to cause the problem of unsuccessful merge.
const int64_t MAX_FROZEN_MEMSTORE_CNT_IN_STORAGE = 7;
const int64_t MAX_MEMSTORE_CNT = 16;
// some frozen memstores and one active memstore
// Only limited to minor freeze, major freeze is not subject to this restriction
const int64_t MAX_MEMSTORE_CNT_IN_STORAGE = MAX_FROZEN_MEMSTORE_CNT_IN_STORAGE + 1;
......
......@@ -211,21 +211,6 @@ void ObLogSlidingWindow::destroy_aggre_buffer()
}
}
int ObLogSlidingWindow::leader_takeover()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(ERROR, "ObPartitionLogService is not inited", K(ret), K(partition_key_));
} else {
uint64_t max_log_id = OB_INVALID_ID;
int64_t max_log_ts = OB_INVALID_TIMESTAMP;
get_max_log_id_info(max_log_id, max_log_ts);
try_update_max_majority_log(max_log_id, max_log_ts);
}
return ret;
}
int ObLogSlidingWindow::leader_active()
{
int ret = OB_SUCCESS;
......
......@@ -100,7 +100,6 @@ public:
virtual int get_next_replay_log_timestamp(int64_t& next_replay_log_timestamp) const = 0;
virtual uint64_t get_next_index_log_id() const = 0;
virtual int leader_active() = 0;
virtual int leader_takeover() = 0;
virtual int leader_revoke() = 0;
virtual void get_next_replay_log_id_info(uint64_t& next_log_id, int64_t& next_log_ts) const = 0;
virtual bool is_fake_info_need_revoke(const uint64_t log_id, const int64_t current_time) = 0;
......@@ -490,7 +489,6 @@ public:
return common::OB_SUCCESS;
}
int leader_active() override;
int leader_takeover() override;
int leader_revoke() override;
int get_replica_replay_type(ObReplicaReplayType& replay_type) const;
// is_meta_log: log type that need been replayed by D replica and log replica
......
......@@ -1678,8 +1678,6 @@ int ObLogStateMgr::reconfirm_to_taking_over_()
revoke_leader_(revoke_type);
} else if (OB_FAIL(on_leader_takeover_())) {
CLOG_LOG(WARN, "on_leader_takeover_ failed, try again", K_(partition_key), K(ret));
} else if (OB_FAIL(sw_->leader_takeover())) {
CLOG_LOG(ERROR, "sw leader_active failed", K(ret), K(partition_key_));
} else {
reconfirm_->reset();
// role_ = LEADER;
......
......@@ -14,6 +14,7 @@
#include "storage/memtable/ob_memtable_key.h"
#include "storage/memtable/ob_memtable_data.h"
#include "storage/memtable/ob_memtable.h"
#include "storage/transaction/ob_trans_part_ctx.h"
namespace oceanbase {
using namespace common;
......@@ -83,7 +84,8 @@ int ObIMvccCtx::register_row_commit_cb(const ObMemtableKey* key, ObMvccRow* valu
TRANS_LOG(WARN, "alloc row callback failed", K(ret));
} else {
tg.click();
// count up memory size of current transaction node
(void)check_row_callback_registration_between_stmt_();
tg.click();
add_trans_mem_total_size(data_size);
node->set_mvcc_row_cb(cb);
cb->set(key, node, data_size, old_row, is_replay, need_fill_redo, sql_no);
......@@ -374,5 +376,14 @@ int64_t ObIMvccCtx::get_query_abs_lock_wait_timeout(const int64_t lock_wait_star
return abs_timeout;
}
void ObIMvccCtx::check_row_callback_registration_between_stmt_()
{
ObIMemtableCtx* i_mem_ctx = (ObIMemtableCtx*)(this);
transaction::ObPartTransCtx* trans_ctx = (transaction::ObPartTransCtx*)(i_mem_ctx->get_trans_ctx());
if (NULL != trans_ctx && trans_ctx->is_task_match()) {
TRANS_LOG(ERROR, "register commit not match expection", K(*trans_ctx));
}
}
} // namespace memtable
} // namespace oceanbase
......@@ -408,6 +408,9 @@ public:
ObMvccTransNode* alloc_trans_node();
int append_callback(ObITransCallback* cb);
private:
void check_row_callback_registration_between_stmt_();
protected:
DISALLOW_COPY_AND_ASSIGN(ObIMvccCtx);
int alloc_type_;
......
......@@ -257,6 +257,15 @@ int ObTransCallbackMgr::remove_callback_for_uncommited_txn(ObMemtable* memtable,
return ret;
}
int ObTransCallbackMgr::clean_dirty_callbacks()
{
int ret = OB_SUCCESS;
UNUSED(fifo_callback(guard(), TCB_CLEAN_DIRTY_CB));
return ret;
}
int ObTransCallbackMgr::calc_checksum_before_log_ts(const int64_t log_ts)
{
int ret = OB_SUCCESS;
......@@ -524,7 +533,9 @@ int ObTransCallbackList::remove_callback_fifo_callback(const ObITransCallback* s
}
length_--;
cnt++;
callback_mgr_.get_ctx().callback_free(iter);
if (!iter->is_savepoint()) {
callback_mgr_.get_ctx().callback_free(iter);
}
iter = next;
same_mem_cb_cnt++;
} else {
......@@ -788,6 +799,8 @@ int ObMvccRowCallback::callback(
}
} else if (TCB_ELR_TRANS_PREPARING == type) {
ret = elr_trans_preparing();
} else if (TCB_CLEAN_DIRTY_CB == type) {
ret = clean_dirty_cb();
} else if (TCB_PRINT_CALLBACK == type) {
ret = print_callback();
} else {
......@@ -910,6 +923,7 @@ int ObMvccRowCallback::row_pending()
} else {
if (NULL != tnode_) {
if (INT64_MAX == ctx_.get_trans_version()) {
TRANS_LOG(ERROR, "It should never go here", K(*this), K_(ctx));
unlink_trans_node();
} else if (OB_FAIL(tnode_->fill_trans_version(ctx_.get_trans_version()))) {
TRANS_LOG(WARN, "fill trans version failed", K(ret), K_(ctx));
......@@ -1444,5 +1458,19 @@ uint32_t ObMvccRowCallback::get_ctx_descriptor() const
return ctx_.get_ctx_descriptor();
}
int ObMvccRowCallback::clean_dirty_cb()
{
int ret = OB_SUCCESS;
if (marked_for_logging_) {
unlink_trans_node();
dec_pending_cb_count();
marked_for_logging_ = false;
need_fill_redo_ = false;
}
return ret;
}
}; // namespace memtable
}; // end namespace oceanbase
......@@ -46,6 +46,7 @@ enum TransCallbackType {
TCB_REMOVE_CALLBACK = 17,
TCB_SUB_STMT_ABORT = 18,
TCB_CRC4 = 19,
TCB_CLEAN_DIRTY_CB = 20,
// TEST_ONLY, used for print callback
TCB_PRINT_CALLBACK = 100
......@@ -322,6 +323,7 @@ public:
int mark_frozen_data(
const ObMemtable* const frozen_memtable, const ObMemtable* const active_memtable, bool& marked, int64_t& cb_cnt);
int calc_checksum_before_log_ts(const ObITransCallback* start, const ObITransCallback* end, const int64_t log_ts);
int clean_dirty_callbacks();
int fetch_rollback_data_size(const ObITransCallback* start, const ObITransCallback* end, int64_t& rollback_size);
private:
......@@ -507,6 +509,7 @@ private:
public:
bool is_rowlocks_released() const;
int calc_checksum_before_log_ts(const int64_t log_ts);
int clean_dirty_callbacks();
int fetch_rollback_data_size(const ObITransCallback* point, int64_t& rollback_size);
void inc_pending_log_size(const int64_t size);
void inc_flushed_log_size(const int64_t size)
......@@ -767,6 +770,7 @@ private:
}
int dec_pending_cb_count();
void mark_tnode_overflow(const int64_t log_ts);
int clean_dirty_cb();
private:
ObIMvccCtx& ctx_;
......
......@@ -40,8 +40,8 @@ public:
};
bool is_reach_max_memtable_cnt()
{
return get_count_() >= 16;
};
return get_count_() >= common::MAX_MEMSTORE_CNT;
}
bool is_contain_this_memtable(ObMemtable* memtable);
int check_memtable_count(int64_t& count);
......
......@@ -844,8 +844,15 @@ int ObMemtableCtx::trans_replay_end(const bool commit, const int64_t trans_versi
int ret = OB_SUCCESS;
int cs_ret = OB_SUCCESS;
if (commit && 0 != checksum && !ObServerConfig::get_instance().ignore_replay_checksum_error) {
const uint64_t checksum4 = calc_checksum4();
// We must calculate the checksum and generate the checksum_log_ts even when
// the checksum verification is unnecessary. This because the trans table
// merge may be triggered after clear state in which the callback has already
const uint64_t checksum4 = calc_checksum4();
if (commit
&& 0 != checksum
&& GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_3100
&& !ObServerConfig::get_instance().ignore_replay_checksum_error) {
if (checksum != checksum4) {
cs_ret = OB_CHECKSUM_ERROR;
TRANS_LOG(ERROR, "MT_CTX: replay checksum error", K(ret), K(*this), K(commit), K(checksum), K(checksum4));
......@@ -1572,6 +1579,49 @@ int ObMemtableCtx::remove_callback_for_uncommited_txn(ObMemtable* mt, int64_t& c
return ret;
}
// If leader switches to follower and marks some callbacks which have not been
// submmitted to sliding window. We need clean the dirty callbacks because
// follower can not submit the log and decrement the pending_cb_count_ and the
// mini merge will not be allowed before txn finishes. Which add the dependency
// from minor merge to txn termination.
//
// NB: Because we remove the trans node before txn termination, we should guarantee
// the txn should finally abort. Except the case 3.2(TODO: handora.qc): generalize
// the process of clean dirty callbacks
//
// We solve the problem in the following way:
// - When leader is revoking and no non-2pc logs of txn has already been
// submitted to sliding window and no on-the-fly log:
// - Case 1: We rollback the txn immediately, so no dirty callbacks need to be
// removed
// - When leader is revoking and some non-2pc logs of txn has already been
// submitted to sliding window:
// - Case 2.1: We only solve the case with no on-the-fly logs(because we have no idea
// whether the on-the-fly log is paxos-choosen or not)
// - If the state is not synced successfully(txn need abort), so we remove all
// marked trans node
// - When replaying start working:
// - Case 3.1: If txn has a on-the-fly log, it means some logs are not paxos-choosen
// successfully(txn need abort), so we remove all marked trans node
// - Case 3.2: If txn has no on-the-fly log and no trans state is synced by the leader
// transfer(txn may need abort, while we donot have the information whether the
// original leader successfully synced the log), and we also remove all marked trans node.
//
int ObMemtableCtx::clean_dirty_callbacks()
{
int ret = OB_SUCCESS;
ObByteLockGuard guard(lock_);
if (OB_FAIL(trans_mgr_.clean_dirty_callbacks())) {
TRANS_LOG(WARN, "fail to dirty callbacks", K(ret));
} else {
TRANS_LOG(INFO, "clean dirty callbacks successfully", K(*this));
}
return ret;
}
int ObMemtableCtx::mark_frozen_data(
const ObMemtable* const frozen_memtable, const ObMemtable* const active_memtable, bool& marked, int64_t& cb_cnt)
{
......
......@@ -489,6 +489,7 @@ public:
return trans_mgr_.count();
}
void dec_pending_elr_count();
int clean_dirty_callbacks();
public:
void on_tsc_retry(const ObMemtableKey& key) override;
......
......@@ -3409,7 +3409,8 @@ int ObPartitionGroup::get_freeze_cut_(ObMemtable& frozen_memtable, const bool is
K(*this));
}
} else {
// 2. The freeze_id of follower is the right boundary of replay queue.
// 2. The freeze_id of follower is the the maximum log id of the right
// boundary of replay queue and the max majoritied log id
// The follower will block the replay, wait it to be empty and then get the freeze_id.
if (OB_FAIL(wait_follower_no_pending_task_())) {
STORAGE_LOG(WARN, "wait follower no pending task failed", K(is_leader), K(freeze_id), K(*this));
......@@ -3421,6 +3422,53 @@ int ObPartitionGroup::get_freeze_cut_(ObMemtable& frozen_memtable, const bool is
K(freeze_id),
K(freeze_ts),
K(*this));
} else {
// The logic below is sophistic:
//
// If you remember the semantic of end_log_ts and max_log_ts belong to
// the memstore, you will know that all data belong to the log before
// end_log_ts is within the memstore, and the data may or maynot exist
// in the memstore if the log creates the data is between end_log_ts and
// max_log_ts
//
// In terms of the minor freeze, follower needs to wait until replaying
// to a continuous log point and fetch the freeze point. While follower
// cannot use the min replayed log ts both as the end_log_ts and
// max_log_ts.
//
// To see why the more sophistic max_log_ts calculation is required,
// consider the following example:
// 1. Leader submits the log 5,6,7 and only log 7 is in quorum using
// paxos and its data is already filled in the memstore
// 2. Leader switches to the follower and the min replayed log ts is
// smaller than the log 5's log_ts
// 3. If we just use the min replayed log ts as both the end_log_ts and
// max_log_ts the semantic specified above is broken
//
// So we need maintain the max_log_ts using the log 7's timestamp, in
// terms of the implementation, we use the max_majority_log_ts which is
// updated after each log's synchronization of leader.
//
// What's more, we need mark all data whose log is between end_log_ts to
// max_log_ts as overflow(the requirement from the storage layer). while
// the data may already synced and we have no chance to mark the data
// except traversing all data in the memtable. So we choose to mark the
// end_log_ts as the max_majority_log_ts as well. The detailed issue can
// be found in https://work.aone.alibaba-inc.com/issue/33865988
//
// NB: we never maintain the max_mjority_log_ts for follower, so we just
// use the variable for the corner case of leader transfer.
uint64_t max_majority_log_id = OB_INVALID_ID;
int64_t max_majority_log_ts = OB_INVALID_TIMESTAMP;
(void)pls_->get_max_majority_log(max_majority_log_id, max_majority_log_ts);
if (max_majority_log_ts > freeze_ts) {
TRANS_LOG(WARN,
"max majority log ts is larger than freeze timestamp",
K(max_majority_log_ts),
K(freeze_ts),
K(*this));
ret = OB_EAGAIN;
}
}
}
if (OB_FAIL(ret)) {
......@@ -3616,7 +3664,7 @@ int ObPartitionGroup::wait_follower_no_pending_task_()
int64_t cnt = 0;
int64_t task_cnt = replay_status_->get_pending_task_count();
while (replay_status_->has_pending_task(pkey_) && OB_SUCC(ret)) {
while (replay_status_->has_pending_task(pkey_) && !replay_status_->has_encount_fatal_error() && OB_SUCC(ret)) {
usleep(FREEZE_WAIT_RETRY_SLEEP_TS);
cnt++;
......@@ -3633,6 +3681,11 @@ int ObPartitionGroup::wait_follower_no_pending_task_()
}
}
if (replay_status_->has_encount_fatal_error()) {
TRANS_LOG(ERROR, "encounter fatal error", K(*replay_status_), K(ret), K(pkey_));
ret = OB_ERR_UNEXPECTED;
}
return ret;
}
......@@ -3661,7 +3714,6 @@ int ObPartitionGroup::check_range_changed_(ObTableHandle& handle, const bool is_
base_version = mt->get_base_version();
if (tmp_freeze_ts < start_log_ts || tmp_snapshot_version < base_version) {
ret = OB_EAGAIN;
STORAGE_LOG(INFO,
"skip freeze, maybe in the process of restarting",
K(ret),
......@@ -3818,7 +3870,7 @@ int ObPartitionGroup::freeze_log_and_data_v2_(const bool emergency, const bool f
if (OB_STATE_NOT_MATCH == ret) {
STORAGE_LOG(INFO, "skip freeze due to clog state", K(ret), K(pkey_));
ret = OB_SUCCESS;
} else if (OB_EAGAIN != ret) {
} else {
STORAGE_LOG(WARN, "failed to check log_id or version range changed", K(ret), K(old_handle));
}
} else if (!changed) {
......@@ -3837,6 +3889,10 @@ int ObPartitionGroup::freeze_log_and_data_v2_(const bool emergency, const bool f
}
}
if (OB_FAIL(ret) || !effected) {
freeze_record_.clear();
}
return ret;
}
......
......@@ -192,7 +192,7 @@ int ObPGLockWithPendingReplayGuard::wait_follower_no_pending_task_()
int64_t cnt = 0;
int64_t task_cnt = replay_status_.get_pending_task_count();
while (replay_status_.has_pending_task(pkey_) && OB_SUCC(ret)) {
while (replay_status_.has_pending_task(pkey_) && !replay_status_.has_encount_fatal_error() && OB_SUCC(ret)) {
usleep(SLEEP_FOR_PENDING_REPLAY);
cnt++;
......@@ -209,6 +209,13 @@ int ObPGLockWithPendingReplayGuard::wait_follower_no_pending_task_()
}
}
if (replay_status_.has_encount_fatal_error()) {
// We just return the success because there will be no pending replay task.
// While we report the ERROR to notify the user
TRANS_LOG(ERROR, "encounter fatal error", K(replay_status_), K(ret), K(pkey_));
ret = OB_SUCCESS;
}
return ret;
}
......
......@@ -111,8 +111,6 @@ private:
private:
static const int64_t PRINT_READABLE_INFO_DURATION_US = 1000 * 1000 * 60 * 10L; // 10min
static const int64_t MAX_MEMSTORE_CNT = 16;
private:
int64_t memtable_head_;
int64_t memtable_tail_;
......
......@@ -3750,7 +3750,7 @@ int ObPartTransCtxMgr::remove_partition(const ObPartitionKey& partition, const b
if (0 == ctx_mgr->get_active_read_write_count() && 0 == ctx_mgr->get_read_only_count()) {
if (0 != ctx_mgr->get_ctx_count()) {
TRANS_LOG(
ERROR, "maybe some context memory not free, please attention", K(partition), K(*ctx_mgr));
WARN, "maybe some context memory not free, please attention", K(partition), K(*ctx_mgr));
}
need_retry = false;
// OB_SUCCESS is not returned here.
......
......@@ -885,8 +885,10 @@ public:
if (OB_ISNULL(part_ctx = dynamic_cast<ObPartTransCtx*>(ctx_base))) {
ret = OB_ERR_UNEXPECTED;
} else if (!part_ctx->is_dirty_trans()) {
// do nothing
} else if (!part_ctx->is_dirty_trans() || !part_ctx->has_synced_log()) {
if (part_ctx->is_dirty_trans() && !part_ctx->has_synced_log()) {
TRANS_LOG(INFO, "We donot dump the dirty trans with no synced log", K(*part_ctx));
}
clean_trx_cnt_++;
} else if (OB_FAIL(part_ctx->get_trans_sstable_durable_ctx_info(end_log_ts_, ctx_info))) {
TRANS_LOG(WARN, "failed to get trans table status info", K(ret));
......@@ -897,7 +899,7 @@ public:
allocator_.reuse();
if (OB_ISNULL(tmp_buf_ = static_cast<char*>(allocator_.alloc(serialize_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to allocate memory", K(serialize_size));
TRANS_LOG(WARN, "failed to allocate memory", K(serialize_size));
}
} else {
tmp_buf_ = buf_;
......
......@@ -1097,6 +1097,8 @@ int ObPartTransCtx::end_stmt_(
need_response = false;
ret = OB_NOT_MASTER;
TRANS_LOG(WARN, "transaction is preparing changing leader", KR(ret), "context", *this);
} else if (OB_FAIL(get_status_())) {
TRANS_LOG(WARN, "transaction is not healthy when end_stmt_", KR(ret), "context", *this);
} else if (!can_rollback_stmt_) {
need_response = true;
if (Ob2PCState::INIT == get_state_()) {
......@@ -1530,6 +1532,9 @@ int ObPartTransCtx::kill(const KillTransArg& arg, ObEndTransCallbackArray& cb_ar
}
if (OB_SUCC(ret)) {
trans_kill_();
// Force kill cannot guarantee the consistency, so we just set end_log_ts
// to zero
end_log_ts_ = 0;
(void)trans_clear_();
if (OB_FAIL(unregister_timeout_task_())) {
TRANS_LOG(WARN, "unregister timer task error", KR(ret), "context", *this);
......@@ -1775,6 +1780,10 @@ int ObPartTransCtx::on_sync_log_success(
if (0 == redo_log_no_++) {
TRANS_LOG(DEBUG, "participant enter into 2pc", "context", *this, K(log_type), K(timestamp));
}
if (redo_log_no_ == 1) {
// The log is completed, we need verify the txn checksum
need_checksum_ = true;
}
// need submit redo_prepare log when log_type equal OB_LOG_TRANS_REDO
if (OB_LOG_TRANS_REDO == log_type) {
start_us = ObTimeUtility::fast_current_time();
......@@ -2528,6 +2537,7 @@ int ObPartTransCtx::leader_active(const LeaderActiveArg& arg)
is_trans_state_sync_finished_ = false;
is_changing_leader_ = false;
prepare_changing_leader_state_ = CHANGING_LEADER_STATE::NO_CHANGING_LEADER;
update_max_submitted_log_timestamp_(max_durable_log_ts_);
if (need_register_timer_task) {
// The request_id_ should be initialized to prevent the 2pc cannot be
// driven if all participants transferring the leader
......@@ -3065,6 +3075,8 @@ int ObPartTransCtx::leader_revoke(const bool first_check, bool& need_release, Ob
(void)unregister_timeout_task_();
if (!has_logged_() && !is_in_2pc_() && 0 == submit_log_count_) {
trans_kill_();
// Because of no logs, we can free the dirty trans instantly
end_log_ts_ = 0;
(void)trans_clear_();
set_exiting_();
if (!is_logging_()) {
......@@ -3084,6 +3096,12 @@ int ObPartTransCtx::leader_revoke(const bool first_check, bool& need_release, Ob
if (!is_trans_state_sync_finished_) {
TRANS_LOG(INFO, "transaction is killed", "context", *this);
}
} else if (has_logged_() && !is_in_2pc_() && !is_trans_state_sync_finished_ && 0 == submit_log_count_) {
// - When leader is revoking and some non-2pc logs of txn has already been
// submitted to sliding window:
// - If no on-the-fly log and state log is not synced successfully, remove all
// marked_log_cnts
(void)mt_ctx_.clean_dirty_callbacks();
} else if (OB_FAIL(mt_ctx_.commit_to_replay())) {
TRANS_LOG(WARN, "commit to replay error", KR(ret), "context", *this);
} else {
......@@ -3899,7 +3917,10 @@ int ObPartTransCtx::replay_prepare_log(const ObTransPrepareLog& log, const int64
} else {
batch_commit_trans_ = false;
}
if (log.get_redo_log_ids().count() == 0) {
if (0 == log.get_redo_log_ids().count() && 0 == redo_log_no_) {
// We only enable the checksum check if prev_redo_log_ids' count is zero
// and redo_log_no is zero. The later check is used to filter the txn
// REDO_WITH_PREPARE log which donot include itself inth prev_redo_log_id.
need_checksum_ = true;
}
/*
......@@ -4409,7 +4430,6 @@ int ObPartTransCtx::replay_trans_state_log(const ObTransStateLog& log, const int
TRANS_LOG(WARN, "different can elr state", K(log), K(*this));
}
can_elr_ = log.is_can_elr();
update_durable_log_id_ts_(OB_LOG_TRANS_STATE, log_id, timestamp);
log_type_ = log.get_log_type();
scheduler_ = log.get_scheduler();
is_readonly_ = log.is_readonly();
......@@ -4438,9 +4458,12 @@ int ObPartTransCtx::replay_trans_state_log(const ObTransStateLog& log, const int
has_trans_state_log_ = true;
TRANS_LOG(INFO, "replay trans state log success", "context", *this, K(log), K(log_id));
}
if (OB_FAIL(ret)) {
if (OB_SUCC(ret)) {
update_durable_log_id_ts_(OB_LOG_TRANS_STATE, log_id, timestamp);
} else {
TRANS_LOG(WARN, "replay trans state log error", KR(ret), "context", *this, K(log), K(log_id));
}
REC_TRANS_TRACE_EXT(tlog_,
replay_trans_state,
Y(ret),
......@@ -4668,11 +4691,11 @@ int ObPartTransCtx::replay_start_working_log(const int64_t timestamp, const uint
CtxTransTableLockGuard guard(lock_, trans_table_seqlock_);
if (submit_log_count_ <= 0) {
// do nothing
} else if (IS_NOT_INIT) {
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "ObPartTransCtx not inited");
ret = OB_NOT_INIT;
} else if (is_exiting_) {
// do nothing
} else if (OB_UNLIKELY(!for_replay_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "invalid state, transaction is not replaying", KR(ret), "context", *this);
......@@ -4681,6 +4704,16 @@ int ObPartTransCtx::replay_start_working_log(const int64_t timestamp, const uint
TRANS_LOG(WARN, "trans is not valid", K(*this), K(log_id), K(timestamp), K(log), K(timestamp));
ret = OB_TRANS_INVALID_STATE;
need_print_trace_log_ = true;
} else if (0 == submit_log_count_) {
if (has_logged_() && !is_in_2pc_() && !is_trans_state_sync_finished_ && is_changing_leader_) {
// - When replaying start working:
// - Case 3.2: If txn has no on-the-fly log and no trans state is synced by the leader
// transfer(txn may need abort, while we donot have the information whether the
// original leader successfully synced the log), and we also remove all marked trans node.
(void)mt_ctx_.clean_dirty_callbacks();
TRANS_LOG(INFO, "clean dirty callbacks when replay start working", K(*this));
}
} else {
need_print_trace_log_ = true;
if (!has_logged_() && !is_in_2pc_() && !is_hazardous_ctx_) {
......@@ -4695,11 +4728,17 @@ int ObPartTransCtx::replay_start_working_log(const int64_t timestamp, const uint
// majority(TODO: need provement)
is_dirty_ = false;
set_exiting_();
} else {
// because current log is not majoritied, and some logs have been
} else if (has_logged_() && !is_in_2pc_() && submit_log_count_ > 0) {
// - When replaying start working:
// - Case 3.1: If txn has a on-the-fly log, it means some logs are not paxos-choosen
// successfully(txn need abort), so we remove all marked trans node
(void)mt_ctx_.clean_dirty_callbacks();
// Because current log is not majoritied, and some logs have been
// majoritied, we need wait for abort log by new leader
TRANS_LOG(INFO, "no need to kill trans when replay start working log", K(*this));
}
submit_log_count_ = 0;
TRANS_STAT_ABORT_TRANS_INC(tenant_id_);
}
......@@ -11911,6 +11950,7 @@ int ObPartTransCtx::fake_kill_(const int64_t terminate_log_ts)
// fake kill interface
end_log_ts_ = terminate_log_ts;
// TODO(): the interface is currently not necessary, remove it
set_state_(Ob2PCState::CLEAR);
(void)trans_clear_();
set_exiting_();
}
......@@ -11931,6 +11971,7 @@ int ObPartTransCtx::kill_v2_(const int64_t terminate_log_ts)
} else {
end_log_ts_ = terminate_log_ts;
// TODO(): the interface is currently not necessary, remove it
set_state_(Ob2PCState::CLEAR);
(void)trans_clear_();
set_exiting_();
}
......
......@@ -311,6 +311,10 @@ public:
{
return is_dirty_;
}
bool has_synced_log() const
{
return 0 != max_durable_log_ts_;
}
int64_t get_forbidden_sql_no() const
{
return ATOMIC_LOAD(&forbidden_sql_no_);
......@@ -375,6 +379,10 @@ public:
{
return enable_new_1pc_;
}
bool is_task_match()
{
return stmt_info_.is_task_match();
}
void remove_trans_table();
int clear_trans_after_restore(
const int64_t restore_version, const uint64_t last_restore_log_id, const int64_t fake_terminate_log_ts);
......@@ -391,8 +399,8 @@ public:
K_(is_dup_table_prepare), K_(dup_table_syncing_log_id), K_(is_prepare_leader_revoke), K_(is_local_trans),
K_(forbidden_sql_no), K(is_dirty_), K_(undo_status), K_(max_durable_sql_no), K_(max_durable_log_ts),
K(mt_ctx_.get_checksum_log_ts()), K_(is_changing_leader), K_(has_trans_state_log),
K_(same_leader_batch_partitions_count), K_(is_hazardous_ctx), K(mt_ctx_.get_callback_count()),
K_(in_xa_prepare_state), K_(is_listener), K_(last_replayed_redo_log_id),
K_(is_trans_state_sync_finished), K_(status), K_(same_leader_batch_partitions_count), K_(is_hazardous_ctx),
K(mt_ctx_.get_callback_count()), K_(in_xa_prepare_state), K_(is_listener), K_(last_replayed_redo_log_id),
K_(is_xa_trans_prepared));
public:
......@@ -687,6 +695,18 @@ private:
bool is_prepared_;
bool is_gts_waiting_;
bool batch_commit_trans_;
// Whether there exists a trans state log for the current leader transfer
//
// It is implemented as follow:
// - For the New Leader:
// - we set the value to true when we replay the trans state log
// if the new leader is me
// - we reset the value when leader is active
// - For the original Leader:
// - we reset the value before each leader transfer
// - we set the value to true when we synced the trans state log
// - we reset the value when leader is revoked if no on-the-fly log
// exist
bool is_trans_state_sync_finished_;
bool is_changing_leader_;
bool can_rollback_stmt_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册