diff --git a/src/storage/transaction/ob_trans_ctx.cpp b/src/storage/transaction/ob_trans_ctx.cpp index 4cc05ac060de51356c2bdb04bfbba68cc2904d48..936e05163d7b4e66471ac6fe803a0fc4db261ddc 100644 --- a/src/storage/transaction/ob_trans_ctx.cpp +++ b/src/storage/transaction/ob_trans_ctx.cpp @@ -651,7 +651,8 @@ int ObDistTransCtx::init(const uint64_t tenant_id, const ObTransID& trans_id, co TRANS_LOG(WARN, "ObTransCtx inited error", KR(ret)); } else { set_state_(Ob2PCState::INIT); - trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; + trans_2pc_timeout_ = + std::min((int64_t)ObServerConfig::get_instance().trx_2pc_retry_interval, (int64_t)MAX_TRANS_2PC_TIMEOUT_US); } return ret; diff --git a/src/storage/transaction/ob_trans_ctx.h b/src/storage/transaction/ob_trans_ctx.h index 57d442a607b88e3806504c9a8decf1ee55514094..d7e216de2dfe6d89650128906c23023f1dae7eff 100644 --- a/src/storage/transaction/ob_trans_ctx.h +++ b/src/storage/transaction/ob_trans_ctx.h @@ -491,9 +491,8 @@ public: // then the total size of trans_ctx preallocated by resource pool is 200B * 100 * 1000 = 20MB. // Taking the concurrency num into consideration, obviously, it is appropriate. static const int64_t RP_TOTAL_NUM = 100 * 1000; - -protected: static const int64_t MAX_TRANS_2PC_TIMEOUT_US = 3 * 1000 * 1000; // 3s +protected: // if 600 seconds after trans timeout, warn is required static const int64_t OB_TRANS_WARN_USE_TIME = 600 * 1000 * 1000; // 0x0078746365657266 means freectx diff --git a/src/storage/transaction/ob_trans_part_ctx.cpp b/src/storage/transaction/ob_trans_part_ctx.cpp index 9978ce9bf0920169a59118678dd87e111c812a82..981b1096b0f47146a1b8f6a52c742dcd80fe1a14 100644 --- a/src/storage/transaction/ob_trans_part_ctx.cpp +++ b/src/storage/transaction/ob_trans_part_ctx.cpp @@ -1238,6 +1238,7 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg) } else { set_stc_by_now_(); } + const int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval; // TODO do not set scheduler/coordinator/participants if state is init if (OB_FAIL(set_app_trace_info_(msg.get_app_trace_info()))) { TRANS_LOG(WARN, "set app trace info error", K(ret), K(msg), K(*this)); @@ -1252,9 +1253,9 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg) } else if (OB_FAIL(handle_2pc_prepare_request_(msg))) { TRANS_LOG(WARN, "handle 2pc preprare request error", KR(ret), "context", *this, K(msg)); } else if (OB_FAIL(unregister_timeout_task_())) { - TRANS_LOG(WARN, "unregister timeout handler error", KR(ret), "context", *this); - } else if (OB_FAIL(register_timeout_task_(ObServerConfig::get_instance().trx_2pc_retry_interval))) { - TRANS_LOG(WARN, "register timeout handler error", KR(ret), "context", *this); + TRANS_LOG(WARN, "unregister timeout handler error", K(ret), "context", *this); + } else if (OB_FAIL(register_timeout_task_(std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US)))) { + TRANS_LOG(WARN, "register timeout handler error", K(ret), "context", *this); } else { // do nothing } @@ -2531,19 +2532,20 @@ int ObPartTransCtx::leader_active(const LeaderActiveArg& arg) // The request_id_ should be initialized to prevent the 2pc cannot be // driven if all participants transferring the leader generate_request_id_(); + trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; if (Ob2PCState::INIT == get_state_()) { const int64_t left_time = trans_expired_time_ - ObClockGenerator::getRealClock(); if (left_time > 0) { trans_2pc_timeout_ = left_time; } else { - trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; + trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US); } // The XA txn has replayed the last redo log if (is_xa_local_trans() && is_redo_prepared_) { - trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; + trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US); } } else { - trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; + trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US); } // do not post transaction message to avoid deadlock, bug#8257026 // just register timeout task @@ -2739,14 +2741,15 @@ int ObPartTransCtx::commit(const bool is_rollback, sql::ObIEndTransCallback* cb, // - If the txn shouldnot keep the dependency with the predecessors, rollback the txn immediately // - If the dependecy is necessary, write the OB_LOG_SP_ELR_TRANS_COMMIT // - Current implementation write the OB_LOG_SP_ELR_TRANS_COMMIT if the predecessors exit + const int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval; if (OB_FAIL(set_app_trace_info_(app_trace_info))) { TRANS_LOG(WARN, "set app trace info error", K(ret), K(app_trace_info), K(*this)); } else if (OB_FAIL(generate_sp_commit_log_type_(log_type))) { TRANS_LOG(WARN, "generate sp commit log type error", K(ret), "context", *this); } else if (OB_FAIL(unregister_timeout_task_())) { - TRANS_LOG(WARN, "unregister timeout handler error", KR(ret), "context", *this); + TRANS_LOG(WARN, "unregister timeout handler error", K(ret), "context", *this); } else if (OB_FAIL(register_timeout_task_( - ObServerConfig::get_instance().trx_2pc_retry_interval + trans_id_.hash() % usec_per_sec))) { + std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US) + trans_id_.hash() % usec_per_sec))) { TRANS_LOG(WARN, "register timeout handler error", KR(ret), "context", *this); } else if (OB_FAIL(submit_log_async_(log_type, has_redo_log))) { TRANS_LOG(WARN, "submit sp log error", KR(ret), "context", *this); @@ -6144,10 +6147,11 @@ int ObPartTransCtx::generate_redo_prepare_log_info(char* buf, const int64_t size // Record the commit version of the txn. If you can elr later, you // need to record it in the trans_result_info_mgr global_trans_version_ = commit_version; + int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval; if (OB_FAIL(unregister_timeout_task_())) { - TRANS_LOG(WARN, "unregister timeout handler error", KR(ret), "context", *this); - } else if (OB_FAIL(register_timeout_task_(ObServerConfig::get_instance().trx_2pc_retry_interval))) { - TRANS_LOG(WARN, "register timeout handler error", KR(ret), "context", *this); + TRANS_LOG(WARN, "unregister timeout handler error", K(ret), "context", *this); + } else if (OB_FAIL(register_timeout_task_(std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US)))) { + TRANS_LOG(WARN, "register timeout handler error", K(ret), "context", *this); } else { // do nothing } @@ -8363,9 +8367,10 @@ int ObPartTransCtx::handle_2pc_request(const ObTrxMsgBase& msg, const int64_t ms } } if (OB_SUCC(ret)) { + const int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval; if (OB_FAIL(unregister_timeout_task_())) { TRANS_LOG(WARN, "unregister timeout handler error", K(ret), K(*this)); - } else if (OB_FAIL(register_timeout_task_(ObServerConfig::get_instance().trx_2pc_retry_interval))) { + } else if (OB_FAIL(register_timeout_task_(std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US)))) { TRANS_LOG(WARN, "register timeout handler error", K(ret), K(*this)); } else { // do nothing @@ -8766,6 +8771,7 @@ int ObPartTransCtx::on_prepare_(const bool batch_committed, const int64_t timest set_state_(Ob2PCState::PREPARE); is_redo_prepared_ = true; trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; + trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US); if (batch_committed) { // Set up end_log_ts_ for 1pc end_log_ts_ = timestamp; diff --git a/src/storage/transaction/ob_trans_service.cpp b/src/storage/transaction/ob_trans_service.cpp index 01efb8fa7878696cc534f1e426a7dc9d1a881d85..13e09ab8509a4fd30112ee8d3e7cc812f5029955 100644 --- a/src/storage/transaction/ob_trans_service.cpp +++ b/src/storage/transaction/ob_trans_service.cpp @@ -3451,6 +3451,9 @@ int ObTransService::check_partition_status(const ObPartitionKey& partition) TRANS_LOG(WARN, "get participant status error", K(ret), K(partition)); } else if (OB_SUCCESS != clog_status) { ret = clog_status; + if (REACH_TIME_INTERVAL(ObTransCtx::MAX_TRANS_2PC_TIMEOUT_US)) { + (void)refresh_location_cache(partition, false); + } } else { // do nothing }