From 63a91b18184f01d53402023463149af1623795fc Mon Sep 17 00:00:00 2001 From: yy0 Date: Fri, 20 Aug 2021 13:54:03 +0800 Subject: [PATCH] [replay engine] removed __enable_block_receiving_clog and increased replay_concurrency from 4 to 64 --- src/clog/ob_i_log_engine.h | 2 +- src/clog/ob_log_engine.cpp | 13 +++------- src/clog/ob_log_engine.h | 2 +- src/clog/ob_log_sliding_window.cpp | 22 +--------------- src/share/ob_define.h | 2 +- src/share/ob_thread_define.h | 4 +-- src/share/parameter/ob_parameter_seed.ipp | 9 ++++--- src/storage/ob_replay_status.h | 2 +- .../replayengine/ob_log_replay_engine.cpp | 25 ++++++++++--------- 9 files changed, 29 insertions(+), 52 deletions(-) diff --git a/src/clog/ob_i_log_engine.h b/src/clog/ob_i_log_engine.h index 4938a85a4c..9efe11a11d 100644 --- a/src/clog/ob_i_log_engine.h +++ b/src/clog/ob_i_log_engine.h @@ -186,7 +186,7 @@ public: virtual int get_ilog_file_id_range(file_id_t& min_file_id, file_id_t& max_file_id) = 0; virtual int query_next_ilog_file_id(file_id_t& next_ilog_file_id) = 0; virtual int get_index_info_block_map(const file_id_t file_id, IndexInfoBlockMap& index_info_block_map) = 0; - virtual int check_need_block_log(bool& is_need) const = 0; + virtual int check_need_block_log(const file_id_t cur_file_id, bool &is_need) const = 0; // want_size refers to the length in clog, which may be the length after compression, and the returned data is after // decompression diff --git a/src/clog/ob_log_engine.cpp b/src/clog/ob_log_engine.cpp index 153194cef3..00bee17718 100644 --- a/src/clog/ob_log_engine.cpp +++ b/src/clog/ob_log_engine.cpp @@ -2146,18 +2146,13 @@ int ObLogEngine::check_need_freeze_based_on_used_space_(bool& is_need) const return ret; } -int ObLogEngine::check_need_block_log(bool& is_need) const +int ObLogEngine::check_need_block_log(const file_id_t cur_file_id, bool &is_need) const { int ret = OB_SUCCESS; is_need = false; - const uint32_t clog_max_file_id = clog_env_.get_max_file_id(); - const uint32_t clog_min_file_id = clog_env_.get_min_file_id(); - const uint32_t clog_min_using_file_id = clog_env_.get_min_using_file_id(); - - // clog disk has been used - if (clog_min_file_id > 1) { - is_need = (clog_max_file_id - clog_min_using_file_id) * 100LL > - (clog_max_file_id - clog_min_file_id) * RESERVED_DISK_USAGE_PERFERT; + uint32_t clog_min_using_file_id = get_clog_min_using_file_id(); + if (cur_file_id == clog_min_using_file_id) { + ret = check_need_freeze_based_on_used_space_(is_need); } return ret; } diff --git a/src/clog/ob_log_engine.h b/src/clog/ob_log_engine.h index 49edfcb7cb..1e966c67f2 100644 --- a/src/clog/ob_log_engine.h +++ b/src/clog/ob_log_engine.h @@ -522,7 +522,7 @@ public: int ensure_log_continuous_in_file_id_cache( const common::ObPartitionKey& partition_key, const uint64_t log_id) override; int get_index_info_block_map(const file_id_t file_id, IndexInfoBlockMap& index_info_block_map) override; - int check_need_block_log(bool& is_need) const override; + int check_need_block_log(const file_id_t cur_file_id, bool &is_need) const; int delete_all_ilog_files(); ObLogCache* get_ilog_log_cache() override { diff --git a/src/clog/ob_log_sliding_window.cpp b/src/clog/ob_log_sliding_window.cpp index 22798e8049..76e3ca09bf 100644 --- a/src/clog/ob_log_sliding_window.cpp +++ b/src/clog/ob_log_sliding_window.cpp @@ -4384,7 +4384,7 @@ bool ObLogSlidingWindow::check_can_receive_larger_log(const uint64_t log_id) const uint64_t start_log_id = sw_.get_start_id(); state_mgr_->report_start_id_trace(start_log_id); CLOG_LOG(WARN, - "check_can_receive_log, now can not recieve larger log", + "check_can_receive_log, now can not receive larger log because of sliding window is full", K_(partition_key), K(log_id), "next_index_log_id", @@ -4395,26 +4395,6 @@ bool ObLogSlidingWindow::check_can_receive_larger_log(const uint64_t log_id) } } } - - if (bool_ret && GCONF.__enable_block_receiving_clog) { - int64_t pending_submit_task_count = INT64_MAX; - int ret = OB_SUCCESS; - if (OB_FAIL(replay_engine_->get_pending_submit_task_count(partition_key_, pending_submit_task_count))) { - bool_ret = false; - CLOG_LOG(WARN, "failed to get_pending_submit_task_count", KR(ret), K_(partition_key)); - } else { - bool_ret = pending_submit_task_count < follower_max_unconfirmed_threshold; - if (!bool_ret) { - if (partition_reach_time_interval(5 * 60 * 1000 * 1000, check_can_receive_larger_log_warn_time_)) { - CLOG_LOG(WARN, - "check_can_receive_log, now can not recieve larger log because of pending too many task to submit", - K_(partition_key), - K(log_id), - K(pending_submit_task_count)); - } - } - } - } return bool_ret; } diff --git a/src/share/ob_define.h b/src/share/ob_define.h index fad93e04c0..b0dada12de 100644 --- a/src/share/ob_define.h +++ b/src/share/ob_define.h @@ -453,7 +453,7 @@ const int64_t OB_STATUS_LENGTH = 64; /////////////////////////// //// used for replay -const int64_t REPLAY_TASK_QUEUE_SIZE = 4; +const int64_t REPLAY_TASK_QUEUE_SIZE = 64; inline int64_t& get_replay_queue_index() { static __thread int64_t replay_queue_index = -1; diff --git a/src/share/ob_thread_define.h b/src/share/ob_thread_define.h index 73dbc3b574..31a59339d3 100644 --- a/src/share/ob_thread_define.h +++ b/src/share/ob_thread_define.h @@ -38,8 +38,8 @@ TG_DEF(PartSerSlogWr, PartSerSlogWr, "", TG_STATIC, QUEUE_THREAD, TG_DEF(LogScan, LogScan, "", TG_STATIC, TIMER_GROUP, ThreadCountPair(clog::ObLogScanRunnable::MAX_THREAD_CNT, clog::ObLogScanRunnable::MINI_MODE_THREAD_CNT)) TG_DEF(ReplayEngine, ReplayEngine, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(sysconf(_SC_NPROCESSORS_ONLN), 2), - !lib::is_mini_mode() ? common::REPLAY_TASK_QUEUE_SIZE * OB_MAX_PARTITION_NUM_PER_SERVER - : common::REPLAY_TASK_QUEUE_SIZE * OB_MINI_MODE_MAX_PARTITION_NUM_PER_SERVER) + !lib::is_mini_mode() ? (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MAX_PARTITION_NUM_PER_SERVER + : (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MINI_MODE_MAX_PARTITION_NUM_PER_SERVER) TG_DEF(LogCb, LogCb, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(clog::ObCLogMgr::CLOG_CB_THREAD_COUNT, clog::ObCLogMgr::MINI_MODE_CLOG_CB_THREAD_COUNT), clog::CLOG_CB_TASK_QUEUE_SIZE) diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index a92bb1c84e..65fe8bba36 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -766,10 +766,11 @@ DEF_TIME(_ob_trans_rpc_timeout, OB_CLUSTER_PARAMETER, "3s", "[0s, 3600s]", DEF_BOOL(enable_early_lock_release, OB_TENANT_PARAMETER, "False", "enable early lock release", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); -DEF_BOOL(__enable_block_receiving_clog, OB_CLUSTER_PARAMETER, "True", - "If this option is set to true, block receiving clog for slave replicas when too much clog is waiting for beening " - "submited to replaying. The default is true", - ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +//__enable_block_receiving_clog is obsolete +// DEF_BOOL(__enable_block_receiving_clog, OB_CLUSTER_PARAMETER, "True", +// "If this option is set to true, block receiving clog for slave replicas when too much clog is waiting for +// beening submited to replaying. The default is true", ObParameterAttr(Section::TRANS, Source::DEFAULT, +// EditLevel::DYNAMIC_EFFECTIVE)); DEF_TIME(_trx_commit_retry_interval, OB_CLUSTER_PARAMETER, "200ms", "[100ms,)", "transaction commit retry interval. Range: [100ms,)", ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); diff --git a/src/storage/ob_replay_status.h b/src/storage/ob_replay_status.h index a6f6e3fbf4..7e3588da70 100644 --- a/src/storage/ob_replay_status.h +++ b/src/storage/ob_replay_status.h @@ -759,7 +759,7 @@ private: bool is_pending_; // whether replay trans log or not: Log replica only needs replaying add_partition_to_pg and remove_partition_to_pg bool need_filter_trans_log_; - bool can_receive_log_; + bool can_receive_log_; // mainly consider whether clog disk space is enough int64_t tenant_id_; uint64_t offline_partition_log_id_; bool offline_partition_task_submitted_; diff --git a/src/storage/replayengine/ob_log_replay_engine.cpp b/src/storage/replayengine/ob_log_replay_engine.cpp index 64db5f8225..be46696a96 100644 --- a/src/storage/replayengine/ob_log_replay_engine.cpp +++ b/src/storage/replayengine/ob_log_replay_engine.cpp @@ -1821,23 +1821,24 @@ int ObLogReplayEngine::fetch_and_submit_single_log_(const ObPartitionKey& pkey, CLOG_LOG(WARN, "log_engine get_cursor failed", K(ret), K(pkey), K(log_id)); } } else { - uint32_t clog_min_using_file_id = log_engine->get_clog_min_using_file_id(); - if (log_cursor_ext.get_file_id() <= clog_min_using_file_id) { - bool need_block_receive_log = false; - if (OB_FAIL(log_engine->check_need_block_log(need_block_receive_log))) { - REPLAY_LOG(WARN, "failed to check_need_block_log ", K(ret), K(pkey), K(log_id)); - } else if (need_block_receive_log) { + bool need_block_receive_log = false; + if (OB_FAIL(log_engine->check_need_block_log(log_cursor_ext.get_file_id(), need_block_receive_log))) { + REPLAY_LOG(WARN, "failed to check_need_block_log", K(ret), K(pkey), K(log_id), K(log_cursor_ext)); + } else if (need_block_receive_log) { + if (replay_status.can_receive_log()) { // mark replay status replay_status.set_can_receive_log(false); - if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) { - REPLAY_LOG( - WARN, "can not receive log now", K(log_cursor_ext), K(pkey), K(clog_min_using_file_id), K(log_id)); - } + REPLAY_LOG(ERROR, "can not receive log now", K(pkey), K(log_cursor_ext), K(log_id)); } else { - replay_status.set_can_receive_log(true); + if (REACH_TIME_INTERVAL(2 * 1000 * 1000L)) { + REPLAY_LOG(WARN, "receiving log has been blocked", K(pkey), K(log_cursor_ext), K(log_id)); + } } } else { - replay_status.set_can_receive_log(true); + if (!replay_status.can_receive_log()) { + replay_status.set_can_receive_log(true); + REPLAY_LOG(INFO, "can receive log now", K(pkey), K(log_cursor_ext), K(log_id)); + } } if (OB_SUCC(ret)) { -- GitLab