提交 16062de3 编写于 作者: G gl0 提交者: LINGuanRen

Limit the size of part trans ctx

上级 d6fa1441
......@@ -69,6 +69,9 @@ class AggreLogTask;
class ObPartTransCtxMgr;
class ObPartitionTransCtxMgr;
// Reserve 50KB to store the fields in trans ctx except undo_status, participants and redo_log
static const int64_t OB_MAX_TRANS_SERIALIZE_SIZE = common::OB_MAX_USER_ROW_LENGTH - 51200;
class ObTransErrsim {
public:
static inline bool is_memory_errsim()
......
......@@ -175,6 +175,8 @@ int ObPartTransCtx::init(const uint64_t tenant_id, const ObTransID& trans_id, co
}
is_listener_ = false;
listener_handler_ = NULL;
ctx_serialize_size_ = undo_status_.get_serialize_size() + partition_log_info_arr_.get_serialize_size() +
prev_redo_log_ids_.get_serialize_size();
}
if (OB_FAIL(ret)) {
if (NULL != redo_sync_task_) {
......@@ -388,6 +390,7 @@ void ObPartTransCtx::reset()
last_redo_log_mutator_size_ = 0;
has_write_or_replay_mutator_redo_log_ = false;
is_in_redo_with_prepare_ = false;
ctx_serialize_size_ = 0;
}
int ObPartTransCtx::construct_context(const ObTransMsg& msg)
......@@ -1209,7 +1212,7 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg)
case OB_TRANS_CLEAR_REQUEST: {
if (OB_FAIL(set_scheduler_(msg.get_scheduler()))) {
TRANS_LOG(WARN, "set scheduler error", KR(ret), K(msg));
} else if (OB_FAIL(set_participants_(msg.get_participants()))) {
} else if (OB_FAIL(calc_serialize_size_and_set_participants_(msg.get_participants()))) {
TRANS_LOG(WARN, "set participants error", KR(ret), K(msg));
} else if (OB_FAIL(handle_trans_clear_request_(msg))) {
TRANS_LOG(WARN, "handle trans clear request error", KR(ret), K(msg));
......@@ -1248,7 +1251,7 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg)
TRANS_LOG(WARN, "set scheduler error", KR(ret), K(msg));
} else if (OB_FAIL(set_coordinator_(msg.get_coordinator()))) {
TRANS_LOG(WARN, "set coordinator error", KR(ret), K(msg));
} else if (OB_FAIL(set_participants_(msg.get_participants()))) {
} else if (OB_FAIL(calc_serialize_size_and_set_participants_(msg.get_participants()))) {
TRANS_LOG(WARN, "set participants error", KR(ret), K(msg));
} else if (OB_FAIL(set_xid_(msg.get_xid()))) {
TRANS_LOG(WARN, "set xid error", KR(ret), K(msg));
......@@ -1296,7 +1299,7 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg)
TRANS_LOG(WARN, "set scheduler error", KR(ret), K(msg));
} else if (OB_FAIL(set_coordinator_(msg.get_coordinator()))) {
TRANS_LOG(WARN, "set coordinator error", KR(ret), K(msg));
} else if (OB_FAIL(set_participants_(msg.get_participants()))) {
} else if (OB_FAIL(calc_serialize_size_and_set_participants_(msg.get_participants()))) {
TRANS_LOG(WARN, "set participants error", KR(ret), K(msg));
} else {
// do nothing
......@@ -1680,7 +1683,7 @@ int ObPartTransCtx::on_sync_log_success(
// The log is completed, we need verify the txn checksum
need_checksum_ = true;
}
if (OB_FAIL(prev_redo_log_ids_.push_back(log_id))) {
if (OB_FAIL(calc_serialize_size_and_set_redo_log_(log_id))) {
TRANS_LOG(WARN, "sp redo log id push back error", KR(ret), "context", *this, K(log_id));
} else if (!not_need_write_next_log_(log_type) &&
OB_FAIL(submit_log_task_(OB_LOG_SP_TRANS_COMMIT, has_redo_log))) {
......@@ -1788,7 +1791,7 @@ int ObPartTransCtx::on_sync_log_success(
if (OB_LOG_TRANS_REDO == log_type) {
start_us = ObTimeUtility::fast_current_time();
// record the redo log id
if (!is_xa_last_empty_redo_log_() && OB_FAIL(prev_redo_log_ids_.push_back(log_id))) {
if (!is_xa_last_empty_redo_log_() && OB_FAIL(calc_serialize_size_and_set_redo_log_(log_id))) {
TRANS_LOG(WARN, "redo log id push back error", KR(ret), "context", *this, K(log_id));
} else if (not_need_write_next_log_(log_type)) {
// No need to write log for dup table in order to prevent the leader
......@@ -1950,7 +1953,7 @@ int ObPartTransCtx::on_sync_log_success(
need_checksum_ = true;
}
start_us = ObTimeUtility::fast_current_time();
if (OB_FAIL(prev_redo_log_ids_.push_back(log_id))) {
if (OB_FAIL(calc_serialize_size_and_set_redo_log_(log_id))) {
TRANS_LOG(WARN, "redo log id push back error", KR(ret), "context", *this, K(log_id));
} else if ((OB_LOG_TRANS_STATE & log_type) != 0) {
// do nothing
......@@ -7116,7 +7119,7 @@ int ObPartTransCtx::post_stmt_response_(
status,
request_id_))) {
TRANS_LOG(WARN, "message init error", K(ret), K_(scheduler), K_(tmp_scheduler), K(msg_type));
// 将request的发送时间戳记录到response中,用于scheduler对消息超时的校验
// record request timestamp into response for checking timeout in scheduler
} else if (OB_FAIL(msg.set_msg_timeout(request_timeout))) {
TRANS_LOG(INFO,
"set message start timestamp error",
......@@ -7856,7 +7859,7 @@ int ObPartTransCtx::handle_2pc_local_prepare_request(const int64_t request_id, c
TRANS_LOG(WARN, "set scheduler error", K(ret), K(scheduler), "context", *this);
} else if (OB_FAIL(set_coordinator_(coordinator))) {
TRANS_LOG(WARN, "set coordinator error", K(ret), K(coordinator), "context", *this);
} else if (OB_FAIL(set_participants_(participants))) {
} else if (OB_FAIL(calc_serialize_size_and_set_participants_(participants))) {
TRANS_LOG(WARN, "set participants error", K(ret), K(participants), "context", *this);
} else if (Ob2PCState::INIT != get_state_()) {
ret = OB_EAGAIN;
......@@ -8036,7 +8039,7 @@ int ObPartTransCtx::handle_2pc_pre_prepare_request(const int64_t prepare_version
TRANS_LOG(WARN, "set scheduler error", KR(ret), K(scheduler), "context", *this);
} else if (OB_FAIL(set_coordinator_(coordinator))) {
TRANS_LOG(WARN, "set coordinator error", KR(ret), K(coordinator), "context", *this);
} else if (OB_FAIL(set_participants_(participants))) {
} else if (OB_FAIL(calc_serialize_size_and_set_participants_(participants))) {
TRANS_LOG(WARN, "set participants error", KR(ret), K(participants), "context", *this);
} else if (Ob2PCState::INIT != get_state_()) {
ret = OB_EAGAIN;
......@@ -8373,7 +8376,7 @@ int ObPartTransCtx::handle_2pc_request(const ObTrxMsgBase& msg, const int64_t ms
TRANS_LOG(WARN, "set scheduler error", K(ret), K(*req));
} else if (OB_FAIL(set_coordinator_(req->coordinator_))) {
TRANS_LOG(WARN, "set coordinator error", K(ret), K(*req));
} else if (OB_FAIL(set_participants_(req->participants_))) {
} else if (OB_FAIL(calc_serialize_size_and_set_participants_(req->participants_))) {
TRANS_LOG(WARN, "set participants error", K(ret), K(*req));
} else {
// do nothing
......@@ -8427,7 +8430,7 @@ int ObPartTransCtx::handle_2pc_request(const ObTrxMsgBase& msg, const int64_t ms
// TRANS_LOG(WARN, "set scheduler error", K(ret), K(*req));
if (OB_FAIL(set_coordinator_(req->coordinator_))) {
TRANS_LOG(WARN, "set coordinator error", K(ret), K(*req));
} else if (OB_FAIL(set_participants_(req->participants_))) {
} else if (OB_FAIL(calc_serialize_size_and_set_participants_(req->participants_))) {
TRANS_LOG(WARN, "set participants error", K(ret), K(*req));
} else if (OB_FAIL(set_xid_(req->xid_))) {
TRANS_LOG(WARN, "set xid error", K(ret), K(*this), K(*req));
......@@ -10917,7 +10920,7 @@ int ObPartTransCtx::rollback_to_(const int32_t sql_no)
}
if (OB_SUCC(ret)) {
if (OB_FAIL(undo_status_.undo(sql_no, curr_sql_no))) {
if (OB_FAIL(calc_serialize_size_and_set_undo_(sql_no, curr_sql_no))) {
TRANS_LOG(WARN, "record rollback action failed", K(ret), K(sql_no), K(curr_sql_no));
}
}
......@@ -12079,5 +12082,58 @@ void ObPartTransCtx::DEBUG_SYNC_slow_txn_during_2pc_prepare_phase_for_physical_b
}
}
// Record a redo log id while keeping the running estimate of the ctx
// serialize size under OB_MAX_TRANS_SERIALIZE_SIZE.
// Returns OB_SIZE_OVERFLOW (without recording the id) when the limit
// would be exceeded; otherwise forwards the push_back result.
int ObPartTransCtx::calc_serialize_size_and_set_redo_log_(const int64_t log_id)
{
  int ret = OB_SUCCESS;
  // Compute the encoded length once so the check, the rollback-free
  // failure path and the accounting all use the same value.
  const int64_t log_id_size = serialization::encoded_length_vi64(log_id);
  if (ctx_serialize_size_ + log_id_size > OB_MAX_TRANS_SERIALIZE_SIZE) {
    ret = OB_SIZE_OVERFLOW;
    TRANS_LOG(WARN, "size overflow when set redo log.", KR(ret), K(ctx_serialize_size_), K(log_id));
  } else if (OB_FAIL(prev_redo_log_ids_.push_back(log_id))) {
    TRANS_LOG(WARN, "sp redo log id push back error", KR(ret), "context", *this, K(log_id));
  } else {
    // Account for the redo log id only after it was actually recorded,
    // so the counter never drifts from the real content on failure.
    ctx_serialize_size_ += log_id_size;
  }
  return ret;
}
// Record the participant list while keeping the running estimate of the
// ctx serialize size under OB_MAX_TRANS_SERIALIZE_SIZE.
// On overflow the transaction can no longer be persisted within the
// limit, so it is marked for rollback and OB_SIZE_OVERFLOW is returned.
int ObPartTransCtx::calc_serialize_size_and_set_participants_(const ObPartitionArray &participants)
{
  int ret = OB_SUCCESS;
  // Compute the serialized size once so the check, the log and the
  // accounting all use the same value.
  const int64_t participants_size = participants.get_serialize_size();
  if (ctx_serialize_size_ + participants_size > OB_MAX_TRANS_SERIALIZE_SIZE) {
    // Too large to ever serialize; force the txn to roll back.
    set_status_(OB_TRANS_NEED_ROLLBACK);
    ret = OB_SIZE_OVERFLOW;
    TRANS_LOG(WARN,
        "size overflow when set participants.",
        KR(ret),
        K(ctx_serialize_size_),
        K(participants_size));
  } else if (OB_FAIL(set_participants_(participants))) {
    TRANS_LOG(WARN, "set participants error", KR(ret), K(participants));
  } else {
    // Account for the participants only after they were actually set,
    // so the counter never drifts from the real content on failure.
    ctx_serialize_size_ += participants_size;
  }
  return ret;
}
// Record an undo action [undo_to, undo_from] while keeping the running
// estimate of the ctx serialize size under OB_MAX_TRANS_SERIALIZE_SIZE.
// On overflow the transaction is marked for rollback and
// OB_SIZE_OVERFLOW is returned without recording the action.
int ObPartTransCtx::calc_serialize_size_and_set_undo_(const int64_t undo_to, const int64_t undo_from)
{
  int ret = OB_SUCCESS;
  ObUndoAction undo_action(undo_to, undo_from);
  // Compute the serialized size once so the check, the log and the
  // accounting all use the same value.
  const int64_t undo_size = undo_action.get_serialize_size();
  if (ctx_serialize_size_ + undo_size > OB_MAX_TRANS_SERIALIZE_SIZE) {
    // Too large to ever serialize; force the txn to roll back.
    set_status_(OB_TRANS_NEED_ROLLBACK);
    ret = OB_SIZE_OVERFLOW;
    // NOTE: the original log printed K(ctx_serialize_size_) twice;
    // log the undo-action size instead of the duplicated field.
    TRANS_LOG(WARN,
        "size overflow when set undo action",
        KR(ret),
        K(ctx_serialize_size_),
        K(undo_size),
        K(OB_MAX_TRANS_SERIALIZE_SIZE));
  } else if (OB_FAIL(undo_status_.undo(undo_to, undo_from))) {
    TRANS_LOG(WARN, "record rollback action failed", K(ret), K(undo_to), K(undo_from));
  } else {
    // Account for the undo action only after it was actually recorded,
    // so the counter never drifts from the real content on failure.
    ctx_serialize_size_ += undo_size;
  }
  return ret;
}
} // namespace transaction
} // namespace oceanbase
......@@ -402,7 +402,7 @@ public:
K(mt_ctx_.get_checksum_log_ts()), K_(is_changing_leader), K_(has_trans_state_log),
K_(is_trans_state_sync_finished), K_(status), K_(same_leader_batch_partitions_count), K_(is_hazardous_ctx),
K(mt_ctx_.get_callback_count()), K_(in_xa_prepare_state), K_(is_listener), K_(last_replayed_redo_log_id),
K_(status), K_(is_xa_trans_prepared));
K_(status), K_(is_xa_trans_prepared), K_(ctx_serialize_size));
public:
static const int64_t OP_LOCAL_NUM = 16;
......@@ -608,6 +608,9 @@ private:
bool is_xa_last_empty_redo_log_() const;
int fake_kill_(const int64_t terminate_log_ts);
int kill_v2_(const int64_t terminate_log_ts);
int calc_serialize_size_and_set_redo_log_(const int64_t log_id);
int calc_serialize_size_and_set_participants_(const ObPartitionArray &participants);
int calc_serialize_size_and_set_undo_(const int64_t undo_to, const int64_t undo_from);
private:
DISALLOW_COPY_AND_ASSIGN(ObPartTransCtx);
......@@ -623,7 +626,7 @@ private:
private:
bool is_inited_;
ObIClogAdapter* clog_adapter_;
ObTransSubmitLogCb submit_log_cb_;
ObTransSubmitLogCb submit_log_cb_;
memtable::ObMemtableCtx mt_ctx_;
memtable::ObIMemtableCtxFactory* mt_ctx_factory_;
ObTransTaskWorker* big_trans_worker_;
......@@ -743,6 +746,7 @@ private:
bool is_xa_trans_prepared_;
bool has_write_or_replay_mutator_redo_log_;
bool is_in_redo_with_prepare_;
int64_t ctx_serialize_size_;
};
#if defined(__x86_64__)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册