提交 784e4487 编写于 作者: F felix-w15 提交者: ob-robot

submit log interface to nonblcok

上级 3418e7ad
......@@ -745,7 +745,12 @@ int SimpleServerHelper::write(sqlclient::ObISQLConnection *conn, const char *sql
return conn->execute_write(OB_SYS_TENANT_ID, sql, affected_rows);
}
int InjectTxFaultHelper::submit_log(const char *buf, const int64_t size, const share::SCN &base_ts, ObTxBaseLogCb *cb, const bool need_nonblock)
int InjectTxFaultHelper::submit_log(const char *buf,
const int64_t size,
const share::SCN &base_ts,
ObTxBaseLogCb *cb,
const bool need_nonblock,
const int64_t retry_timeout_us)
{
int ret = OB_SUCCESS;
......
......@@ -95,7 +95,8 @@ public:
const int64_t size,
const share::SCN &base_ts,
ObTxBaseLogCb *cb,
const bool need_nonblock) override;
const bool need_nonblock,
const int64_t retry_timeout_us = 1000) override;
private:
transaction::ObLSTxCtxMgr *mgr_;
hash::ObHashMap<ObTransID, ObTxLogType> tx_injects_;
......
......@@ -646,11 +646,29 @@ int ObPartTransCtx::handle_timeout(const int64_t delay)
if (!is_follower_() && is_committing_()) {
if (is_local_tx_()) {
try_submit_next_log_();
} else if (ObTxState::PREPARE > get_upstream_state() ) {
ObTxState next_state = (is_sub2pc() || exec_info_.is_dup_tx_) ?
ObTxState::REDO_COMPLETE :
ObTxState::PREPARE;
if (OB_FAIL(drive_self_2pc_phase(next_state))) {
TRANS_LOG(WARN, "drive to next phase failed", K(ret), K(*this), K(next_state));
} else if (OB_FAIL(ObTxCycleTwoPhaseCommitter::handle_timeout())) {
TRANS_LOG(WARN, "handle 2pc timeout failed", KR(ret), KPC(this));
}
} else if (OB_FAIL(ObTxCycleTwoPhaseCommitter::handle_timeout())) {
TRANS_LOG(WARN, "handle 2pc timeout failed", KR(ret), KPC(this));
}
}
// retry submit abort log
if (!is_follower_()
&& get_upstream_state() == ObTxState::ABORT
&& get_upstream_state() != get_downstream_state()) {
if (OB_FAIL(compensate_abort_log_())) {
TRANS_LOG(WARN, "compensate abort log failed", KR(ret), KPC(this));
}
}
// if not committing, abort txn if it was expired
if (!is_follower_() && !is_committing_() && tx_expired) {
if (OB_FAIL(abort_(OB_TRANS_TIMEOUT))) {
......@@ -2042,6 +2060,11 @@ int ObPartTransCtx::compensate_abort_log_()
} else if(OB_FALSE_IT(sub_state_.set_force_abort())) {
} else if (OB_FAIL(submit_log_impl_(ObTxLogType::TX_ABORT_LOG))) {
int tmp_ret = OB_SUCCESS;
if (OB_EAGAIN == ret && OB_TMP_FAIL(restart_2pc_trans_timer_())) {
TRANS_LOG(WARN, "restart_2pc_trans_timer_ for submit abort log fail",
KR(ret), KR(tmp_ret), KPC(this));
}
TRANS_LOG(WARN, "submit abort log failed", KR(ret), K(*this));
} else {
}
......@@ -3564,6 +3587,7 @@ int ObPartTransCtx::submit_commit_log_()
int ObPartTransCtx::submit_abort_log_()
{
int ret = OB_SUCCESS;
set_upstream_state(ObTxState::ABORT);
ObTxLogCb *log_cb = NULL;
ObTxLogBlock log_block;
const int64_t replay_hint = trans_id_.get_id();
......@@ -3621,7 +3645,7 @@ int ObPartTransCtx::submit_abort_log_()
}
} else if (OB_FAIL(acquire_ctx_ref_())) {
TRANS_LOG(ERROR, "acquire ctx ref failed", KR(ret), K(*this));
} else if (OB_FAIL(submit_log_block_out_(log_block, SCN::min_scn(), log_cb, replay_hint, barrier))) {
} else if (OB_FAIL(submit_log_block_out_(log_block, SCN::min_scn(), log_cb, replay_hint, barrier, 50 * 1000))) {
TRANS_LOG(WARN, "submit log to clog adapter failed", KR(ret), K(*this));
return_log_cb_(log_cb);
log_cb = NULL;
......@@ -3876,7 +3900,8 @@ int ObPartTransCtx::submit_log_block_out_(ObTxLogBlock &log_block,
const share::SCN &base_scn,
ObTxLogCb *&log_cb,
const int64_t replay_hint,
const logservice::ObReplayBarrierType barrier)
const logservice::ObReplayBarrierType barrier,
const int64_t retry_timeout_us)
{
int ret = OB_SUCCESS;
bool is_2pc_state_log = false;
......@@ -3899,7 +3924,8 @@ int ObPartTransCtx::submit_log_block_out_(ObTxLogBlock &log_block,
log_block.get_size(),
base_scn,
log_cb,
false))) {
true,
retry_timeout_us))) {
busy_cbs_.add_last(log_cb);
}
}
......@@ -3933,8 +3959,19 @@ int ObPartTransCtx::submit_log_impl_(const ObTxLogType log_type)
case ObTxLogType::TX_PREPARE_LOG: {
// try generate prepare verison
ret = generate_prepare_version_();
if (OB_SUCC(ret) && mt_ctx_.is_prepared()) {
if (get_upstream_state() < ObTxState::PREPARE) {
if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
TRANS_LOG(WARN, "drive 2pc prepare phase failed", K(ret), K(*this));
}
} else if (get_upstream_state() > ObTxState::PREPARE ||
get_downstream_state() >= ObTxState::PREPARE) {
TRANS_LOG(INFO, "we need not submit prepare log after the prepare state", K(*this));
}
if (OB_SUCC(ret) &&
mt_ctx_.is_prepared() &&
get_upstream_state() == ObTxState::PREPARE &&
get_downstream_state() < ObTxState::PREPARE &&
!is_2pc_logging()) {
ret = submit_prepare_log_();
}
break;
......@@ -7145,7 +7182,7 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou
!= (tmp_ret = submit_log_impl_(ObTxLogType::TX_MULTI_DATA_SOURCE_LOG))) {
if (tmp_ret == OB_NOT_MASTER) {
ret = OB_TRANS_NEED_ROLLBACK;
} else if (tmp_ret == OB_TX_NOLOGCB) {
} else if (tmp_ret == OB_TX_NOLOGCB || tmp_ret == OB_EAGAIN) {
ret = OB_SUCCESS;
if (register_flag.need_flush_redo_instantly_) {
mds_cache_.set_need_retry_submit_mds(true);
......@@ -8426,8 +8463,13 @@ int ObPartTransCtx::do_local_commit_tx_()
}
} else if (OB_FAIL(submit_log_impl_(ObTxLogType::TX_COMMIT_LOG))) {
// log submitting will retry in handle_timeout
TRANS_LOG(WARN, "submit commit log fail, will retry later", KR(ret), KPC(this));
ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(restart_2pc_trans_timer_())) {
TRANS_LOG(WARN, "restart_2pc_trans_timer_ error", KR(ret), KR(tmp_ret), KPC(this));
ret = OB_EAGAIN;
} else {
ret = OB_SUCCESS;
}
}
return ret;
......
......@@ -497,7 +497,8 @@ private:
const share::SCN &base_scn,
ObTxLogCb *&log_cb,
const int64_t replay_hint = 0,
const ObReplayBarrierType barrier = ObReplayBarrierType::NO_NEED_BARRIER);
const ObReplayBarrierType barrier = ObReplayBarrierType::NO_NEED_BARRIER,
const int64_t retry_timeout_us = 1000);
int after_submit_log_(ObTxLogBlock &log_block,
ObTxLogCb *log_cb,
memtable::ObRedoLogSubmitHelper *redo_helper);
......
......@@ -1214,13 +1214,8 @@ int ObTxCycleTwoPhaseCommitter::decide_2pc_log_type_(bool &need_submit,
break;
}
case ObTxState::REDO_COMPLETE: {
if (is_sub2pc()) {
need_submit = true;
log_type = ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT_INFO;
} else {
ret = OB_TRANS_INVALID_STATE;
TRANS_LOG(ERROR, "invalid 2pc state", KR(ret), KPC(this));
}
need_submit = true;
log_type = ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT_INFO;
break;
}
case ObTxState::PREPARE: {
......
......@@ -1299,7 +1299,7 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
bool blockable = expire_ts > 0;
do {
ret = part_ctx->rollback_to_savepoint(op_sn, specified_from_scn, savepoint, tx_seq_base, downstream_parts);
if (OB_NEED_RETRY == ret && blockable) {
if ((OB_NEED_RETRY == ret || OB_EAGAIN == ret) && blockable) {
if (ObTimeUtility::current_time() >= expire_ts) {
ret = OB_TIMEOUT;
TRANS_LOG(WARN, "can not retry rollback_to because of timeout", K(ret), K(retry_cnt));
......@@ -1311,7 +1311,7 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
ob_usleep(50 * 1000);
}
}
} while (OB_NEED_RETRY == ret && blockable && !part_ctx->is_transfer_deleted());
} while ((OB_NEED_RETRY == ret || OB_EAGAIN == ret) && blockable && !part_ctx->is_transfer_deleted());
#ifndef NDEBUG
TRANS_LOG(INFO, "rollback to savepoint sync", K(ret),
K(part_ctx->get_trans_id()), K(part_ctx->get_ls_id()), K(retry_cnt),
......
......@@ -120,37 +120,59 @@ int ObLSTxLogAdapter::submit_log(const char *buf,
const int64_t size,
const SCN &base_scn,
ObTxBaseLogCb *cb,
const bool need_nonblock)
const bool need_nonblock,
const int64_t retry_timeout_us)
{
int ret = OB_SUCCESS;
palf::LSN lsn;
SCN scn;
int64_t cur_ts = ObTimeUtility::current_time();
int64_t retry_cnt = 0;
const bool is_big_log = (size > palf::MAX_NORMAL_LOG_BODY_SIZE);
const bool allow_compression = true;
int64_t cur_ts = ObClockGenerator::getClock();
if (NULL == buf || 0 >= size || OB_ISNULL(cb) || !base_scn.is_valid() || size > palf::MAX_LOG_BODY_SIZE ||
if (NULL == buf || 0 >= size || OB_ISNULL(cb) || !base_scn.is_valid()
|| retry_timeout_us < 0 || size > palf::MAX_LOG_BODY_SIZE ||
base_scn.convert_to_ts() > cur_ts + 86400000000L) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), KP(buf), K(size), K(base_scn), KP(cb));
} else if (OB_ISNULL(log_handler_) || !log_handler_->is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), KP(log_handler_));
} else if (is_big_log && OB_FAIL(log_handler_->append_big_log(buf, size, base_scn, need_nonblock,
allow_compression, cb, lsn, scn))) {
TRANS_LOG(WARN, "append big log to palf failed", K(ret), KP(log_handler_), KP(buf), K(size), K(base_scn),
} else {
static const int64_t MAX_SLEEP_US = 100;
int64_t retry_cnt = 0;
int64_t sleep_us = 0;
const int64_t expire_us = cur_ts + retry_timeout_us;
do {
if (is_big_log && OB_FAIL(log_handler_->append_big_log(buf, size, base_scn, need_nonblock,
allow_compression, cb, lsn, scn))) {
TRANS_LOG(WARN, "append big log to palf failed", K(ret), KP(log_handler_), KP(buf), K(size), K(base_scn),
K(need_nonblock), K(is_big_log));
} else if (!is_big_log && OB_FAIL(log_handler_->append(buf, size, base_scn, need_nonblock,
} else if (!is_big_log && OB_FAIL(log_handler_->append(buf, size, base_scn, need_nonblock,
allow_compression, cb, lsn, scn))) {
TRANS_LOG(WARN, "append log to palf failed", K(ret), KP(log_handler_), KP(buf), K(size), K(base_scn),
TRANS_LOG(WARN, "append log to palf failed", K(ret), KP(log_handler_), KP(buf), K(size), K(base_scn),
K(need_nonblock));
} else {
cb->set_base_ts(base_scn);
cb->set_lsn(lsn);
cb->set_log_ts(scn);
cb->set_submit_ts(cur_ts);
ObTransStatistic::get_instance().add_clog_submit_count(MTL_ID(), 1);
ObTransStatistic::get_instance().add_trans_log_total_size(MTL_ID(), size);
} else {
cb->set_base_ts(base_scn);
cb->set_lsn(lsn);
cb->set_log_ts(scn);
cb->set_submit_ts(cur_ts);
ObTransStatistic::get_instance().add_clog_submit_count(MTL_ID(), 1);
ObTransStatistic::get_instance().add_trans_log_total_size(MTL_ID(), size);
}
if (!need_nonblock) {
// retries are not needed in block mode.
break;
} else if (OB_EAGAIN == ret) {
retry_cnt++;
sleep_us = retry_cnt * 10;
sleep_us = sleep_us > MAX_SLEEP_US ? MAX_SLEEP_US : sleep_us;
ob_usleep(sleep_us);
cur_ts = ObTimeUtility::current_time();
}
} while (OB_EAGAIN == ret && cur_ts < expire_us);
}
TRANS_LOG(DEBUG, "ObLSTxLogAdapter::submit_ls_log", KR(ret), KP(cb));
......
......@@ -63,7 +63,8 @@ public:
const int64_t size,
const share::SCN &base_ts,
ObTxBaseLogCb *cb,
const bool need_nonblock) = 0;
const bool need_nonblock,
const int64_t retry_timeout_us = 1000) = 0;
virtual int get_role(bool &is_leader, int64_t &epoch) = 0;
virtual int get_max_decided_scn(share::SCN &scn) = 0;
......@@ -108,7 +109,8 @@ public:
const int64_t size,
const share::SCN &base_ts,
ObTxBaseLogCb *cb,
const bool need_nonblock);
const bool need_nonblock,
const int64_t retry_timeout_us = 1000);
int get_role(bool &is_leader, int64_t &epoch);
int get_max_decided_scn(share::SCN &scn);
int get_palf_committed_max_scn(share::SCN &scn) const;
......
......@@ -497,7 +497,8 @@ public:
const int64_t size,
const share::SCN &base_ts,
ObTxBaseLogCb *cb,
const bool need_nonblock)
const bool need_nonblock,
const int64_t retry_timeout_us)
{
int ret = OB_SUCCESS;
logservice::ObLogBaseHeader base_header;
......
......@@ -108,7 +108,8 @@ int MockTxLogAdapter::submit_log(const char *buf,
const int64_t size,
const share::SCN &base_ts,
ObTxBaseLogCb *cb,
const bool need_nonblock)
const bool need_nonblock,
const int64_t retry_timeout_us)
{
int ret = OB_SUCCESS;
int64_t ts = 0;
......
......@@ -68,7 +68,8 @@ public:
const int64_t size,
const share::SCN &base_ts,
ObTxBaseLogCb *cb,
const bool need_block);
const bool need_block,
const int64_t retry_timeout_us = 1000);
int get_role(bool &is_leader, int64_t &epoch);
int get_max_decided_scn(share::SCN &scn)
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册