提交 63a91b18 编写于 作者: Y yy0 提交者: wangzelin.wzl

[replay engine] removed __enable_block_receiving_clog and increased replay_concurrency from 4 to 64

上级 aa15788e
...@@ -186,7 +186,7 @@ public: ...@@ -186,7 +186,7 @@ public:
virtual int get_ilog_file_id_range(file_id_t& min_file_id, file_id_t& max_file_id) = 0; virtual int get_ilog_file_id_range(file_id_t& min_file_id, file_id_t& max_file_id) = 0;
virtual int query_next_ilog_file_id(file_id_t& next_ilog_file_id) = 0; virtual int query_next_ilog_file_id(file_id_t& next_ilog_file_id) = 0;
virtual int get_index_info_block_map(const file_id_t file_id, IndexInfoBlockMap& index_info_block_map) = 0; virtual int get_index_info_block_map(const file_id_t file_id, IndexInfoBlockMap& index_info_block_map) = 0;
virtual int check_need_block_log(bool& is_need) const = 0; virtual int check_need_block_log(const file_id_t cur_file_id, bool &is_need) const = 0;
// want_size refers to the length in clog, which may be the length after compression, and the returned data is after // want_size refers to the length in clog, which may be the length after compression, and the returned data is after
// decompression // decompression
......
...@@ -2146,18 +2146,13 @@ int ObLogEngine::check_need_freeze_based_on_used_space_(bool& is_need) const ...@@ -2146,18 +2146,13 @@ int ObLogEngine::check_need_freeze_based_on_used_space_(bool& is_need) const
return ret; return ret;
} }
int ObLogEngine::check_need_block_log(bool& is_need) const int ObLogEngine::check_need_block_log(const file_id_t cur_file_id, bool &is_need) const
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
is_need = false; is_need = false;
const uint32_t clog_max_file_id = clog_env_.get_max_file_id(); uint32_t clog_min_using_file_id = get_clog_min_using_file_id();
const uint32_t clog_min_file_id = clog_env_.get_min_file_id(); if (cur_file_id == clog_min_using_file_id) {
const uint32_t clog_min_using_file_id = clog_env_.get_min_using_file_id(); ret = check_need_freeze_based_on_used_space_(is_need);
// clog disk has been used
if (clog_min_file_id > 1) {
is_need = (clog_max_file_id - clog_min_using_file_id) * 100LL >
(clog_max_file_id - clog_min_file_id) * RESERVED_DISK_USAGE_PERFERT;
} }
return ret; return ret;
} }
......
...@@ -522,7 +522,7 @@ public: ...@@ -522,7 +522,7 @@ public:
int ensure_log_continuous_in_file_id_cache( int ensure_log_continuous_in_file_id_cache(
const common::ObPartitionKey& partition_key, const uint64_t log_id) override; const common::ObPartitionKey& partition_key, const uint64_t log_id) override;
int get_index_info_block_map(const file_id_t file_id, IndexInfoBlockMap& index_info_block_map) override; int get_index_info_block_map(const file_id_t file_id, IndexInfoBlockMap& index_info_block_map) override;
int check_need_block_log(bool& is_need) const override; int check_need_block_log(const file_id_t cur_file_id, bool &is_need) const;
int delete_all_ilog_files(); int delete_all_ilog_files();
ObLogCache* get_ilog_log_cache() override ObLogCache* get_ilog_log_cache() override
{ {
......
...@@ -4384,7 +4384,7 @@ bool ObLogSlidingWindow::check_can_receive_larger_log(const uint64_t log_id) ...@@ -4384,7 +4384,7 @@ bool ObLogSlidingWindow::check_can_receive_larger_log(const uint64_t log_id)
const uint64_t start_log_id = sw_.get_start_id(); const uint64_t start_log_id = sw_.get_start_id();
state_mgr_->report_start_id_trace(start_log_id); state_mgr_->report_start_id_trace(start_log_id);
CLOG_LOG(WARN, CLOG_LOG(WARN,
"check_can_receive_log, now can not recieve larger log", "check_can_receive_log, now can not receive larger log because of sliding window is full",
K_(partition_key), K_(partition_key),
K(log_id), K(log_id),
"next_index_log_id", "next_index_log_id",
...@@ -4395,26 +4395,6 @@ bool ObLogSlidingWindow::check_can_receive_larger_log(const uint64_t log_id) ...@@ -4395,26 +4395,6 @@ bool ObLogSlidingWindow::check_can_receive_larger_log(const uint64_t log_id)
} }
} }
} }
if (bool_ret && GCONF.__enable_block_receiving_clog) {
int64_t pending_submit_task_count = INT64_MAX;
int ret = OB_SUCCESS;
if (OB_FAIL(replay_engine_->get_pending_submit_task_count(partition_key_, pending_submit_task_count))) {
bool_ret = false;
CLOG_LOG(WARN, "failed to get_pending_submit_task_count", KR(ret), K_(partition_key));
} else {
bool_ret = pending_submit_task_count < follower_max_unconfirmed_threshold;
if (!bool_ret) {
if (partition_reach_time_interval(5 * 60 * 1000 * 1000, check_can_receive_larger_log_warn_time_)) {
CLOG_LOG(WARN,
"check_can_receive_log, now can not recieve larger log because of pending too many task to submit",
K_(partition_key),
K(log_id),
K(pending_submit_task_count));
}
}
}
}
return bool_ret; return bool_ret;
} }
......
...@@ -453,7 +453,7 @@ const int64_t OB_STATUS_LENGTH = 64; ...@@ -453,7 +453,7 @@ const int64_t OB_STATUS_LENGTH = 64;
/////////////////////////// ///////////////////////////
//// used for replay //// used for replay
const int64_t REPLAY_TASK_QUEUE_SIZE = 4; const int64_t REPLAY_TASK_QUEUE_SIZE = 64;
inline int64_t& get_replay_queue_index() inline int64_t& get_replay_queue_index()
{ {
static __thread int64_t replay_queue_index = -1; static __thread int64_t replay_queue_index = -1;
......
...@@ -38,8 +38,8 @@ TG_DEF(PartSerSlogWr, PartSerSlogWr, "", TG_STATIC, QUEUE_THREAD, ...@@ -38,8 +38,8 @@ TG_DEF(PartSerSlogWr, PartSerSlogWr, "", TG_STATIC, QUEUE_THREAD,
TG_DEF(LogScan, LogScan, "", TG_STATIC, TIMER_GROUP, TG_DEF(LogScan, LogScan, "", TG_STATIC, TIMER_GROUP,
ThreadCountPair(clog::ObLogScanRunnable::MAX_THREAD_CNT, clog::ObLogScanRunnable::MINI_MODE_THREAD_CNT)) ThreadCountPair(clog::ObLogScanRunnable::MAX_THREAD_CNT, clog::ObLogScanRunnable::MINI_MODE_THREAD_CNT))
TG_DEF(ReplayEngine, ReplayEngine, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(sysconf(_SC_NPROCESSORS_ONLN), 2), TG_DEF(ReplayEngine, ReplayEngine, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(sysconf(_SC_NPROCESSORS_ONLN), 2),
!lib::is_mini_mode() ? common::REPLAY_TASK_QUEUE_SIZE * OB_MAX_PARTITION_NUM_PER_SERVER !lib::is_mini_mode() ? (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MAX_PARTITION_NUM_PER_SERVER
: common::REPLAY_TASK_QUEUE_SIZE * OB_MINI_MODE_MAX_PARTITION_NUM_PER_SERVER) : (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MINI_MODE_MAX_PARTITION_NUM_PER_SERVER)
TG_DEF(LogCb, LogCb, "", TG_STATIC, QUEUE_THREAD, TG_DEF(LogCb, LogCb, "", TG_STATIC, QUEUE_THREAD,
ThreadCountPair(clog::ObCLogMgr::CLOG_CB_THREAD_COUNT, clog::ObCLogMgr::MINI_MODE_CLOG_CB_THREAD_COUNT), ThreadCountPair(clog::ObCLogMgr::CLOG_CB_THREAD_COUNT, clog::ObCLogMgr::MINI_MODE_CLOG_CB_THREAD_COUNT),
clog::CLOG_CB_TASK_QUEUE_SIZE) clog::CLOG_CB_TASK_QUEUE_SIZE)
......
...@@ -766,10 +766,11 @@ DEF_TIME(_ob_trans_rpc_timeout, OB_CLUSTER_PARAMETER, "3s", "[0s, 3600s]", ...@@ -766,10 +766,11 @@ DEF_TIME(_ob_trans_rpc_timeout, OB_CLUSTER_PARAMETER, "3s", "[0s, 3600s]",
DEF_BOOL(enable_early_lock_release, OB_TENANT_PARAMETER, "False", "enable early lock release", DEF_BOOL(enable_early_lock_release, OB_TENANT_PARAMETER, "False", "enable early lock release",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(__enable_block_receiving_clog, OB_CLUSTER_PARAMETER, "True", //__enable_block_receiving_clog is obsolete
"If this option is set to true, block receiving clog for slave replicas when too much clog is waiting for beening " // DEF_BOOL(__enable_block_receiving_clog, OB_CLUSTER_PARAMETER, "True",
"submited to replaying. The default is true", // "If this option is set to true, block receiving clog for slave replicas when too much clog is waiting for
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); // beening submited to replaying. The default is true", ObParameterAttr(Section::TRANS, Source::DEFAULT,
// EditLevel::DYNAMIC_EFFECTIVE));
DEF_TIME(_trx_commit_retry_interval, OB_CLUSTER_PARAMETER, "200ms", "[100ms,)", DEF_TIME(_trx_commit_retry_interval, OB_CLUSTER_PARAMETER, "200ms", "[100ms,)",
"transaction commit retry interval. Range: [100ms,)", "transaction commit retry interval. Range: [100ms,)",
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
......
...@@ -759,7 +759,7 @@ private: ...@@ -759,7 +759,7 @@ private:
bool is_pending_; bool is_pending_;
// whether replay trans log or not: Log replica only needs replaying add_partition_to_pg and remove_partition_to_pg // whether replay trans log or not: Log replica only needs replaying add_partition_to_pg and remove_partition_to_pg
bool need_filter_trans_log_; bool need_filter_trans_log_;
bool can_receive_log_; bool can_receive_log_; // mainly consider whether clog disk space is enough
int64_t tenant_id_; int64_t tenant_id_;
uint64_t offline_partition_log_id_; uint64_t offline_partition_log_id_;
bool offline_partition_task_submitted_; bool offline_partition_task_submitted_;
......
...@@ -1821,23 +1821,24 @@ int ObLogReplayEngine::fetch_and_submit_single_log_(const ObPartitionKey& pkey, ...@@ -1821,23 +1821,24 @@ int ObLogReplayEngine::fetch_and_submit_single_log_(const ObPartitionKey& pkey,
CLOG_LOG(WARN, "log_engine get_cursor failed", K(ret), K(pkey), K(log_id)); CLOG_LOG(WARN, "log_engine get_cursor failed", K(ret), K(pkey), K(log_id));
} }
} else { } else {
uint32_t clog_min_using_file_id = log_engine->get_clog_min_using_file_id(); bool need_block_receive_log = false;
if (log_cursor_ext.get_file_id() <= clog_min_using_file_id) { if (OB_FAIL(log_engine->check_need_block_log(log_cursor_ext.get_file_id(), need_block_receive_log))) {
bool need_block_receive_log = false; REPLAY_LOG(WARN, "failed to check_need_block_log", K(ret), K(pkey), K(log_id), K(log_cursor_ext));
if (OB_FAIL(log_engine->check_need_block_log(need_block_receive_log))) { } else if (need_block_receive_log) {
REPLAY_LOG(WARN, "failed to check_need_block_log ", K(ret), K(pkey), K(log_id)); if (replay_status.can_receive_log()) {
} else if (need_block_receive_log) {
// mark replay status // mark replay status
replay_status.set_can_receive_log(false); replay_status.set_can_receive_log(false);
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) { REPLAY_LOG(ERROR, "can not receive log now", K(pkey), K(log_cursor_ext), K(log_id));
REPLAY_LOG(
WARN, "can not receive log now", K(log_cursor_ext), K(pkey), K(clog_min_using_file_id), K(log_id));
}
} else { } else {
replay_status.set_can_receive_log(true); if (REACH_TIME_INTERVAL(2 * 1000 * 1000L)) {
REPLAY_LOG(WARN, "receiving log has been blocked", K(pkey), K(log_cursor_ext), K(log_id));
}
} }
} else { } else {
replay_status.set_can_receive_log(true); if (!replay_status.can_receive_log()) {
replay_status.set_can_receive_log(true);
REPLAY_LOG(INFO, "can receive log now", K(pkey), K(log_cursor_ext), K(log_id));
}
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册