提交 7c4d50e3 编写于 作者: O obdev 提交者: wangzelin.wzl

Set limitation for large transaction && Add some obtest for large transaction testing

上级 53a562dc
...@@ -17,6 +17,12 @@ if(ENABLE_DEBUG_LOG) ...@@ -17,6 +17,12 @@ if(ENABLE_DEBUG_LOG)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_DEBUG_LOG") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_DEBUG_LOG")
endif() endif()
if(OB_ERRSIM)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DERRSIM")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DERRSIM")
set(OB_TRANS_ERRSIM ON)
endif()
if(WITH_OSS) if(WITH_OSS)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_WITH_OSS") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_WITH_OSS")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_WITH_OSS") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_WITH_OSS")
......
...@@ -121,6 +121,12 @@ function build ...@@ -121,6 +121,12 @@ function build
xdebug_no_unity) xdebug_no_unity)
do_build "$@" -DCMAKE_BUILD_TYPE=Debug -DOB_ENABLE_UNITY=OFF -DOB_ENABLE_PCH=OFF do_build "$@" -DCMAKE_BUILD_TYPE=Debug -DOB_ENABLE_UNITY=OFF -DOB_ENABLE_PCH=OFF
;; ;;
xerrsim_debug)
do_build "$@" -DCMAKE_BUILD_TYPE=Debug -DOB_ERRSIM=ON -DOB_USE_LLD=$LLD_OPTION
;;
xerrsim)
do_build "$@" -DCMAKE_BUILD_TYPE=RelWithDebInfo -DOB_ERRSIM=ON -DOB_USE_LLD=$LLD_OPTION
;;
xrpm) xrpm)
do_build "$@" -DOB_BUILD_RPM=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo -DOB_USE_CCACHE=OFF -DOB_COMPRESS_DEBUG_SECTIONS=ON -DOB_STATIC_LINK_LGPL_DEPS=OFF do_build "$@" -DOB_BUILD_RPM=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo -DOB_USE_CCACHE=OFF -DOB_COMPRESS_DEBUG_SECTIONS=ON -DOB_STATIC_LINK_LGPL_DEPS=OFF
;; ;;
......
...@@ -466,6 +466,7 @@ public: ...@@ -466,6 +466,7 @@ public:
EN_INVALID_ADDR_WEAK_READ_FAILED = 215, EN_INVALID_ADDR_WEAK_READ_FAILED = 215,
EN_STACK_OVERFLOW_CHECK_EXPR_STACK_SIZE = 216, EN_STACK_OVERFLOW_CHECK_EXPR_STACK_SIZE = 216,
EN_ENABLE_PDML_ALL_FEATURE = 217, EN_ENABLE_PDML_ALL_FEATURE = 217,
EN_FAST_MIGRATE_ADD_MEMBER_FAIL = 220,
EN_MIGRATE_ADD_PARTITION_FAILED = 224, EN_MIGRATE_ADD_PARTITION_FAILED = 224,
EN_PRINT_QUERY_SQL = 231, EN_PRINT_QUERY_SQL = 231,
...@@ -501,11 +502,17 @@ public: ...@@ -501,11 +502,17 @@ public:
EN_CLOG_DUMP_ILOG_MEMSTORE_RENAME_FAILURE = 267, EN_CLOG_DUMP_ILOG_MEMSTORE_RENAME_FAILURE = 267,
EN_CLOG_ILOG_MEMSTORE_ALLOC_MEMORY_FAILURE = 268, EN_CLOG_ILOG_MEMSTORE_ALLOC_MEMORY_FAILURE = 268,
EN_PARTICIPANTS_SIZE_OVERFLOW = 275,
EN_UNDO_ACTIONS_SIZE_OVERFLOW = 276,
EN_PART_PLUS_UNDO_OVERFLOW = 277,
EN_PREVENT_SYNC_REPORT = 360, EN_PREVENT_SYNC_REPORT = 360,
EN_PREVENT_ASYNC_REPORT = 361, EN_PREVENT_ASYNC_REPORT = 361,
EN_LOG_IDS_COUNT_ERROR = 363, EN_LOG_IDS_COUNT_ERROR = 363,
// DDL related 500-550 // DDL related 500-550
EN_SUBMIT_INDEX_TASK_ERROR_BEFORE_STAT_RECORD = 503, EN_SUBMIT_INDEX_TASK_ERROR_BEFORE_STAT_RECORD = 503,
EN_SUBMIT_INDEX_TASK_ERROR_AFTER_STAT_RECORD = 504, EN_SUBMIT_INDEX_TASK_ERROR_AFTER_STAT_RECORD = 504,
......
...@@ -1200,7 +1200,7 @@ int ObGarbageCollector::gc_check_log_archive_( ...@@ -1200,7 +1200,7 @@ int ObGarbageCollector::gc_check_log_archive_(
int64_t delay_interval = LOG_ARCHIVE_DROP_DELAY; int64_t delay_interval = LOG_ARCHIVE_DROP_DELAY;
#ifdef ERRSIM #ifdef ERRSIM
delay_interval = std::min(LOG_ARCHIVE_DROP_DELAY, (int64_t)ObServerConfig::get_instance().schema_drop_gc_delay_time); // delay_interval = std::min(LOG_ARCHIVE_DROP_DELAY, (int64_t)ObServerConfig::get_instance().schema_drop_gc_delay_time);
#endif #endif
if (enable_log_archive && !is_restore && !is_sys_tenant) { if (enable_log_archive && !is_restore && !is_sys_tenant) {
......
...@@ -1419,6 +1419,38 @@ int ObTransDesc::merge_participants(const common::ObPartitionArray& participants ...@@ -1419,6 +1419,38 @@ int ObTransDesc::merge_participants(const common::ObPartitionArray& participants
return ret; return ret;
} }
int ObTransDesc::check_participants_size()
{
int ret = OB_SUCCESS;
#ifdef ERRSIM
// test if this function can handle participants size overflow successfully
if (OB_FAIL(E(EventTable::EN_PARTICIPANTS_SIZE_OVERFLOW) OB_SUCCESS)) {
OB_MAX_TRANS_SERIALIZE_SIZE = 1500;
OB_MIN_REDO_LOG_SERIALIZE_SIZE = 500;
TRANS_LOG(INFO, "ERRSIM modify trans ctx serialize size for case 1 ",
K(OB_MAX_TRANS_SERIALIZE_SIZE), K(OB_MIN_REDO_LOG_SERIALIZE_SIZE));
} else if (OB_FAIL(E(EventTable::EN_PART_PLUS_UNDO_OVERFLOW) OB_SUCCESS)) {
OB_MAX_TRANS_SERIALIZE_SIZE = 7500;
OB_MIN_REDO_LOG_SERIALIZE_SIZE = 500;
TRANS_LOG(INFO, "ERRSIM modify trans ctx serialize size for case 2 ",
K(OB_MAX_TRANS_SERIALIZE_SIZE), K(OB_MIN_REDO_LOG_SERIALIZE_SIZE));
}
OB_MAX_UNDO_ACTION_SERIALIZE_SIZE = OB_MAX_TRANS_SERIALIZE_SIZE - OB_MIN_REDO_LOG_SERIALIZE_SIZE;
ret = OB_SUCCESS;
#endif
int64_t participants_size = participants_.get_serialize_size();
if (participants_size > OB_MAX_TRANS_SERIALIZE_SIZE) {
ret = OB_SIZE_OVERFLOW;
TRANS_LOG(WARN, "Participants are too large which may make dump trans state table failed.",
KR(ret), K(participants_size), K(participants_.count()),
K(OB_MAX_TRANS_SERIALIZE_SIZE), K(participants_));
}
return ret;
}
int ObTransDesc::set_cur_stmt_desc(const ObStmtDesc& stmt_desc) int ObTransDesc::set_cur_stmt_desc(const ObStmtDesc& stmt_desc)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
......
...@@ -69,8 +69,15 @@ class AggreLogTask; ...@@ -69,8 +69,15 @@ class AggreLogTask;
class ObPartTransCtxMgr; class ObPartTransCtxMgr;
class ObPartitionTransCtxMgr; class ObPartitionTransCtxMgr;
// Reserve 50KB to store the fields in trans ctx except undo_status, participants and redo_log // The redo log id can use at least 128KB storage space
static const int64_t OB_MAX_TRANS_SERIALIZE_SIZE = common::OB_MAX_USER_ROW_LENGTH - 51200; static int64_t OB_MIN_REDO_LOG_SERIALIZE_SIZE = 131072;
// redo_log + participants + undo_actions can use MAX_VARCHAR_LENGTH-10KB storage space
static int64_t OB_MAX_TRANS_SERIALIZE_SIZE = common::OB_MAX_VARCHAR_LENGTH - 10 * 1024;
// The participants and undo actions share the last storage space
static int64_t OB_MAX_UNDO_ACTION_SERIALIZE_SIZE
= OB_MAX_TRANS_SERIALIZE_SIZE - OB_MIN_REDO_LOG_SERIALIZE_SIZE;
class ObTransErrsim { class ObTransErrsim {
public: public:
...@@ -1672,6 +1679,7 @@ public: ...@@ -1672,6 +1679,7 @@ public:
int merge_participants_pla(); int merge_participants_pla();
int merge_participants(const common::ObPartitionArray& participants); int merge_participants(const common::ObPartitionArray& participants);
int merge_participants_pla(const common::ObPartitionLeaderArray& participant_pla); int merge_participants_pla(const common::ObPartitionLeaderArray& participant_pla);
int check_participants_size();
const common::ObPartitionArray& get_participants() const const common::ObPartitionArray& get_participants() const
{ {
return participants_; return participants_;
......
...@@ -175,8 +175,9 @@ int ObPartTransCtx::init(const uint64_t tenant_id, const ObTransID& trans_id, co ...@@ -175,8 +175,9 @@ int ObPartTransCtx::init(const uint64_t tenant_id, const ObTransID& trans_id, co
} }
is_listener_ = false; is_listener_ = false;
listener_handler_ = NULL; listener_handler_ = NULL;
ctx_serialize_size_ = undo_status_.get_serialize_size() + partition_log_info_arr_.get_serialize_size() + redo_log_id_serialize_size_ = prev_redo_log_ids_.get_serialize_size();
prev_redo_log_ids_.get_serialize_size(); participants_serialize_size_ = partition_log_info_arr_.get_serialize_size();
undo_serialize_size_ = undo_status_.get_serialize_size();
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
if (NULL != redo_sync_task_) { if (NULL != redo_sync_task_) {
...@@ -390,7 +391,9 @@ void ObPartTransCtx::reset() ...@@ -390,7 +391,9 @@ void ObPartTransCtx::reset()
last_redo_log_mutator_size_ = 0; last_redo_log_mutator_size_ = 0;
has_write_or_replay_mutator_redo_log_ = false; has_write_or_replay_mutator_redo_log_ = false;
is_in_redo_with_prepare_ = false; is_in_redo_with_prepare_ = false;
ctx_serialize_size_ = 0; redo_log_id_serialize_size_ = 0;
participants_serialize_size_ = 0;
undo_serialize_size_ = 0;
prev_checkpoint_id_ = 0; prev_checkpoint_id_ = 0;
} }
...@@ -1650,6 +1653,7 @@ bool ObPartTransCtx::not_need_write_next_log_(const int64_t log_type) ...@@ -1650,6 +1653,7 @@ bool ObPartTransCtx::not_need_write_next_log_(const int64_t log_type)
void ObPartTransCtx::reset_prev_redo_log_ids() void ObPartTransCtx::reset_prev_redo_log_ids()
{ {
prev_redo_log_ids_.reset(); prev_redo_log_ids_.reset();
redo_log_id_serialize_size_ = 0;
} }
// when submit log success // when submit log success
...@@ -4841,20 +4845,29 @@ bool ObPartTransCtx::has_logged_() const ...@@ -4841,20 +4845,29 @@ bool ObPartTransCtx::has_logged_() const
bool ObPartTransCtx::need_record_log() const bool ObPartTransCtx::need_record_log() const
{ {
// Record Log will be generated if the number of log ids // There are three variables can be very large : participants array, redo log array and undo
// is no less than the max size of prev_redo_log_ids_ // actions array. In order to avoiding the serialize size of ObPartTransCtx is too large, which
uint64_t prev_log_ids_count = MAX_PREV_LOG_IDS_COUNT; // may larger than 1MB, we have to limit the size of these three variables. We set the
// following principles:
//
// 1. The redo_log can use at least 128KB
// 2. All the three variables can use 1014KB(MAX_VARCHAR_LENGTH-10KB)
//
// In this function, we sum the serialize size of these three fileds to decide if this transaction
// need write record log. So this function return true implicates the serialize size of these
// three variables is larger than 1014KB and we need write record log to make sure the trans state
// table can be dumped successfully.
bool bool_ret = false;
int total_size = participants_serialize_size_ + undo_serialize_size_ + redo_log_id_serialize_size_;
#ifdef ERRSIM if (total_size > OB_MAX_TRANS_SERIALIZE_SIZE) {
// Error injection test, used for changing prev_log_ids_count for test bool_ret = true;
int tmp_ret = E(EventTable::EN_LOG_IDS_COUNT_ERROR) OB_SUCCESS; TRANS_LOG(INFO, "need flush record log.", K(participants_serialize_size_),
if (tmp_ret != OB_SUCCESS) { K(undo_serialize_size_), K(redo_log_id_serialize_size_), K(total_size),
prev_log_ids_count = 2; K(OB_MAX_TRANS_SERIALIZE_SIZE), KPC(this));
TRANS_LOG(INFO, "need_record_log: ", K(prev_log_ids_count));
} }
#endif
return prev_redo_log_ids_.count() >= prev_log_ids_count; return bool_ret;
} }
int ObPartTransCtx::reserve_log_header_(char *buf, const int64_t size, int64_t &pos) int ObPartTransCtx::reserve_log_header_(char *buf, const int64_t size, int64_t &pos)
...@@ -12155,56 +12168,214 @@ void ObPartTransCtx::DEBUG_SYNC_slow_txn_during_2pc_prepare_phase_for_physical_b ...@@ -12155,56 +12168,214 @@ void ObPartTransCtx::DEBUG_SYNC_slow_txn_during_2pc_prepare_phase_for_physical_b
} }
} }
// for more explanation of this function, see ObPartTransCtx::need_record_log()
int ObPartTransCtx::calc_serialize_size_and_set_redo_log_(const int64_t log_id) int ObPartTransCtx::calc_serialize_size_and_set_redo_log_(const int64_t log_id)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if ((ctx_serialize_size_ += serialization::encoded_length_vi64(log_id)) > OB_MAX_TRANS_SERIALIZE_SIZE) { redo_log_id_serialize_size_ += serialization::encoded_length_vi64(log_id);
ret = OB_SIZE_OVERFLOW; if (OB_FAIL(prev_redo_log_ids_.push_back(log_id))) {
TRANS_LOG(WARN, "size overflow when set redo log.", KR(ret), K(ctx_serialize_size_), K(log_id)); redo_log_id_serialize_size_ -= serialization::encoded_length_vi64(log_id);
} else if (OB_FAIL(prev_redo_log_ids_.push_back(log_id))) { TRANS_LOG(WARN, "prev redo log id push back error", KR(ret), K(log_id), KPC(this));
ctx_serialize_size_ -= serialization::encoded_length_vi64(log_id);
TRANS_LOG(WARN, "sp redo log id push back error", KR(ret), "context", *this, K(log_id));
} else { } else {
// push back redo log success // push back redo log success
} }
return ret; return ret;
} }
/*
* There are three kinds of cases when set participants. We calculate serialize size and do
* something to avoid dumping trans state table fail.
*
* ┌─────────────────────────────────────────────────┐
* CASE 1 : │ participants │
* └─────────────────────────────────────────────────┘
*
* │ OB_MAX_TRANS_SERIALIZE_SIZE │
* └────────────────────────────────────────────┘
* Participants are too large. This situation should be handled by upper layer
*
* ┌────────────────────────────┬────────────────────┐
* CASE 2 : │ participants │ undo status │
* └────────────────────────────┴────────────────────┘
*
* │ OB_MAX_TRANS_SERIALIZE_SIZE │
* └────────────────────────────────────────────┘
* Participants plus undo status are too large. Trans state table can not be dumped by flushing
* record log.Rollback it without returning error code because the follower cannot response to the
* leader if return error code here.
*
* ┌─────────────────┬───────────────┬────────────────┐
* CASE 3 : │ participants │ undo status │ redo log │
* └─────────────────┴───────────────┴────────────────┘
*
* │ OB_MAX_TRANS_SERIALIZE_SIZE │
* └────────────────────────────────────────────┘
* Flush record log can make trans state table be successfully dumped.
*/
int ObPartTransCtx::calc_serialize_size_and_set_participants_(const ObPartitionArray &participants) int ObPartTransCtx::calc_serialize_size_and_set_participants_(const ObPartitionArray &participants)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if ((ctx_serialize_size_ += participants.get_serialize_size()) > OB_MAX_TRANS_SERIALIZE_SIZE) { int64_t tmp_participants_size = participants_serialize_size_ + participants.get_serialize_size();
bool has_redo_log = false;
bool need_submit_record_log = false;
#ifdef ERRSIM
// test if this function can handle participants size overflow successfully
if (OB_FAIL(E(EventTable::EN_PARTICIPANTS_SIZE_OVERFLOW) OB_SUCCESS)) {
OB_MAX_TRANS_SERIALIZE_SIZE = 1500;
OB_MIN_REDO_LOG_SERIALIZE_SIZE = 500;
} else if (OB_FAIL(E(EventTable::EN_PART_PLUS_UNDO_OVERFLOW) OB_SUCCESS)) {
OB_MAX_TRANS_SERIALIZE_SIZE = 7500;
OB_MIN_REDO_LOG_SERIALIZE_SIZE = 500;
}
OB_MAX_UNDO_ACTION_SERIALIZE_SIZE = OB_MAX_TRANS_SERIALIZE_SIZE - OB_MIN_REDO_LOG_SERIALIZE_SIZE;
TRANS_LOG(INFO, "ERRSIM modify trans ctx serialize size ", K(OB_MAX_TRANS_SERIALIZE_SIZE),
K(OB_MIN_REDO_LOG_SERIALIZE_SIZE), K(OB_MAX_UNDO_ACTION_SERIALIZE_SIZE));
ret = OB_SUCCESS;
#endif
if (OB_UNLIKELY(tmp_participants_size > OB_MAX_TRANS_SERIALIZE_SIZE)) {
// case 1
ret = OB_ERR_UNEXPECTED;
int64_t participants_count = participants.count();
TRANS_LOG(ERROR, "participants is unexpected too large.", KR(ret), K(participants_count),
K(tmp_participants_size), KPC(this));
} else if (OB_UNLIKELY(tmp_participants_size + undo_serialize_size_
> OB_MAX_TRANS_SERIALIZE_SIZE)) {
// case 2
// undo_serialize_size_ is an estimate value. Here we update undo serialize size before checking
// because a large range undo action can remove some little range undo actions.
undo_serialize_size_ = undo_status_.get_serialize_size();
if (tmp_participants_size + undo_serialize_size_ > OB_MAX_TRANS_SERIALIZE_SIZE) {
TRANS_LOG(WARN, "transaction is too large. flush record log can not handle it",
K(tmp_participants_size), K(participants_serialize_size_), K(undo_serialize_size_),
K(OB_MAX_TRANS_SERIALIZE_SIZE), K(trans_id_), KPC(this));
set_status_(OB_TRANS_NEED_ROLLBACK); set_status_(OB_TRANS_NEED_ROLLBACK);
ret = OB_SIZE_OVERFLOW;
TRANS_LOG(WARN, // Reset undo status to make trans state table can be dumped.
"size overflow when set participants.", undo_status_.reset();
KR(ret), undo_serialize_size_ = 0;
K(ctx_serialize_size_), }
K(participants.get_serialize_size())); } else {
// normal case, set participants directly
}
if (OB_SUCC(ret)
&& OB_UNLIKELY(tmp_participants_size + undo_serialize_size_ + redo_log_id_serialize_size_
> OB_MAX_TRANS_SERIALIZE_SIZE)) {
// case 3
need_submit_record_log = true;
int64_t participants_size = tmp_participants_size;
int64_t real_undo_size = undo_status_.get_serialize_size();
int64_t total_size = participants_size + redo_log_id_serialize_size_ + undo_serialize_size_;
TRANS_LOG(INFO, "flush record log to reserve space for participants", K(participants_size),
K(undo_serialize_size_), K(real_undo_size), K(redo_log_id_serialize_size_),
K(total_size), K(OB_MAX_TRANS_SERIALIZE_SIZE), KPC(this));
}
if (OB_FAIL(ret)) {
// participants is too large, this function can not handle it
} else if (OB_UNLIKELY(need_submit_record_log)
&& OB_FAIL(submit_log_async_(OB_LOG_TRANS_RECORD, has_redo_log))) {
TRANS_LOG(WARN, "submit record log failed", KR(ret), KPC(this));
} else if (OB_FAIL(set_participants_(participants))) { } else if (OB_FAIL(set_participants_(participants))) {
ctx_serialize_size_ -= participants.get_serialize_size(); TRANS_LOG(WARN, "set participants error", KR(ret), KPC(this), K(participants));
TRANS_LOG(WARN, "set participants error", KR(ret), K(participants)); } else {
participants_serialize_size_ = tmp_participants_size;
} }
return ret; return ret;
} }
int ObPartTransCtx::calc_serialize_size_and_set_undo_(const int64_t undo_to, const int64_t undo_from) /*
* There are two cases when set undo status. We calculate serialize size and do something to avoid
* dumping trans state table fail.
*
* We increase undo_serialize_size_ every time we set undo. But if the undo_serialize_size is too
* large and the transaction need rollback, we update this variable to get a real size.
*
* ┌────────────────────────────────────────┐
* CASE 4 : │ undo status │
* └────────────────────────────────────────┘
*
* │ OB_MAX_UNDO_ACTION_SERIALIZE_SIZE │
* └────────────────────────────────────┘
*
* │ OB_MAX_TRANS_SERIALIZE_SIZE │
* └────────────────────────────────────────────┘
* Undo status are too large, rollback this transaction.
*
* ┌───────────────────────────┬───────────────────────┐
* CASE 5 : │ undo status │ redo log │
* └───────────────────────────┴───────────────────────┘
*
* │ OB_MAX_UNDO_ACTION_SERIALIZE_SIZE │
* └────────────────────────────────────┘
*
* │ OB_MAX_TRANS_SERIALIZE_SIZE │
* └────────────────────────────────────────────┘
* Flush record log can make trans state table be successfully dumped.
*/
int ObPartTransCtx::calc_serialize_size_and_set_undo_(const int64_t undo_to,
const int64_t undo_from)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObUndoAction undo_action(undo_to, undo_from); ObUndoAction undo_action(undo_to, undo_from);
if ((ctx_serialize_size_ += undo_action.get_serialize_size()) > OB_MAX_TRANS_SERIALIZE_SIZE) { undo_serialize_size_ += undo_action.get_serialize_size();
bool has_redo_log = false;
bool updated_size = false;
#ifdef ERRSIM
// test if this function can handle participants size overflow successfully
if (OB_FAIL(E(EventTable::EN_UNDO_ACTIONS_SIZE_OVERFLOW) OB_SUCCESS)) {
OB_MAX_TRANS_SERIALIZE_SIZE = 4100;
OB_MIN_REDO_LOG_SERIALIZE_SIZE = 1500;
TRANS_LOG(INFO, "ERRSIM modify trans ctx serialize size for case 4", K(OB_MAX_TRANS_SERIALIZE_SIZE),
K(OB_MIN_REDO_LOG_SERIALIZE_SIZE));
}
OB_MAX_UNDO_ACTION_SERIALIZE_SIZE = OB_MAX_TRANS_SERIALIZE_SIZE - OB_MIN_REDO_LOG_SERIALIZE_SIZE;
ret = OB_SUCCESS;
#endif
if (OB_UNLIKELY(undo_serialize_size_ > OB_MAX_UNDO_ACTION_SERIALIZE_SIZE)) {
// undo_serialize_size_ is an estimate value. Here we update undo serialize size before checking
// because a large range undo action can remove some little range undo actions.
undo_serialize_size_ = undo_status_.get_serialize_size();
updated_size = true;
}
if (undo_serialize_size_ > OB_MAX_UNDO_ACTION_SERIALIZE_SIZE) {
// case 4
set_status_(OB_TRANS_NEED_ROLLBACK); set_status_(OB_TRANS_NEED_ROLLBACK);
ret = OB_SIZE_OVERFLOW; ret = OB_SIZE_OVERFLOW;
TRANS_LOG(WARN, TRANS_LOG(WARN, "size overflow when set undo action", KR(ret), K(undo_serialize_size_),
"size overflow when set undo action", K(redo_log_id_serialize_size_), K(participants_serialize_size_),
KR(ret), K(OB_MAX_UNDO_ACTION_SERIALIZE_SIZE), KPC(this));
K(ctx_serialize_size_),
K(ctx_serialize_size_), } else if (OB_UNLIKELY(undo_serialize_size_ + redo_log_id_serialize_size_
K(OB_MAX_TRANS_SERIALIZE_SIZE)); > OB_MAX_TRANS_SERIALIZE_SIZE)) {
// case 5
TRANS_LOG(INFO, "flush record log to reserve space for undo", K(undo_serialize_size_),
K(redo_log_id_serialize_size_), K(OB_MAX_TRANS_SERIALIZE_SIZE));
if (OB_FAIL(submit_log_async_(OB_LOG_TRANS_RECORD, has_redo_log))) {
TRANS_LOG(WARN, "submit record log failed", KR(ret), K(undo_serialize_size_),
K(redo_log_id_serialize_size_), K(OB_MAX_TRANS_SERIALIZE_SIZE), KPC(this));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(undo_status_.undo(undo_to, undo_from))) { } else if (OB_FAIL(undo_status_.undo(undo_to, undo_from))) {
ctx_serialize_size_ -= undo_action.get_serialize_size(); TRANS_LOG(WARN, "record rollback action failed", KR(ret), K(undo_action), KPC(this));
TRANS_LOG(WARN, "record rollback action failed", K(ret), K(undo_to), K(undo_from)); }
if (OB_FAIL(ret) && !updated_size) {
undo_serialize_size_ -= undo_action.get_serialize_size();
} }
return ret; return ret;
} }
......
...@@ -405,7 +405,8 @@ public: ...@@ -405,7 +405,8 @@ public:
K(mt_ctx_.get_checksum_log_ts()), K_(is_changing_leader), K_(has_trans_state_log), K(mt_ctx_.get_checksum_log_ts()), K_(is_changing_leader), K_(has_trans_state_log),
K_(is_trans_state_sync_finished), K_(status), K_(same_leader_batch_partitions_count), K_(is_hazardous_ctx), K_(is_trans_state_sync_finished), K_(status), K_(same_leader_batch_partitions_count), K_(is_hazardous_ctx),
K(mt_ctx_.get_callback_count()), K_(in_xa_prepare_state), K_(is_listener), K_(last_replayed_redo_log_id), K(mt_ctx_.get_callback_count()), K_(in_xa_prepare_state), K_(is_listener), K_(last_replayed_redo_log_id),
K_(status), K_(is_xa_trans_prepared), K_(ctx_serialize_size)); K_(status), K_(is_xa_trans_prepared), K_(redo_log_id_serialize_size), K_(participants_serialize_size),
K_(undo_serialize_size));
public: public:
static const int64_t OP_LOCAL_NUM = 16; static const int64_t OP_LOCAL_NUM = 16;
...@@ -752,7 +753,9 @@ ObTransSubmitLogCb submit_log_cb_; ...@@ -752,7 +753,9 @@ ObTransSubmitLogCb submit_log_cb_;
bool is_xa_trans_prepared_; bool is_xa_trans_prepared_;
bool has_write_or_replay_mutator_redo_log_; bool has_write_or_replay_mutator_redo_log_;
bool is_in_redo_with_prepare_; bool is_in_redo_with_prepare_;
int64_t ctx_serialize_size_; int64_t redo_log_id_serialize_size_;
int64_t participants_serialize_size_;
int64_t undo_serialize_size_;
// the log id of prev checkpoint log // the log id of prev checkpoint log
uint64_t prev_checkpoint_id_; uint64_t prev_checkpoint_id_;
}; };
......
...@@ -2500,6 +2500,10 @@ int ObTransService::end_stmt(bool is_rollback, bool is_incomplete, const ObParti ...@@ -2500,6 +2500,10 @@ int ObTransService::end_stmt(bool is_rollback, bool is_incomplete, const ObParti
} }
} }
if (OB_SUCC(ret) && OB_FAIL(trans_desc.check_participants_size())) {
trans_desc.set_need_rollback();
TRANS_LOG(WARN, "check participants size failed.", KR(ret));
}
REC_TRACE_EXT(tlog, end_stmt, Y(ret)); REC_TRACE_EXT(tlog, end_stmt, Y(ret));
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
trans_desc.set_need_print_trace_log(); trans_desc.set_need_print_trace_log();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册