diff --git a/dbms/include/DB/Storages/MergeTree/BackgroundProcessingPool.h b/dbms/include/DB/Storages/MergeTree/BackgroundProcessingPool.h index b846a1e037d37d85907581daa78346fca626574d..b282973f31908570fc3f57d3fd197cac6ef612e5 100644 --- a/dbms/include/DB/Storages/MergeTree/BackgroundProcessingPool.h +++ b/dbms/include/DB/Storages/MergeTree/BackgroundProcessingPool.h @@ -101,7 +101,7 @@ public: } - int getNumberOfThreads() const + size_t getNumberOfThreads() const { return size; } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index 79d06ac75c021e2e5e3a11593bb197d10fd928d1..a09592774cdc257c370996eae008cc72496d38cb 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -82,11 +82,14 @@ struct MergeTreeSettings size_t max_parts_to_merge_at_once_if_small = 100; /// Куски настолько большого размера объединять нельзя вообще. - size_t max_bytes_to_merge_parts = 25ul * 1024 * 1024 * 1024; + size_t max_bytes_to_merge_parts = 10ul * 1024 * 1024 * 1024; /// Не больше половины потоков одновременно могут выполнять слияния, в которых участвует хоть один кусок хотя бы такого размера. size_t max_bytes_to_merge_parts_small = 250 * 1024 * 1024; + /// Куски настолько большого размера в сумме, объединять нельзя вообще. + size_t max_sum_bytes_to_merge_parts = 25ul * 1024 * 1024 * 1024; + /// Во столько раз ночью увеличиваем коэффициент. size_t merge_parts_at_night_inc = 10; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 95f96788c251e585e5c700d4c1ac5827b4e2b639..744efdd6cceba63798a46bece8fbf907bb1ca2f3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -41,6 +41,23 @@ static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.4; bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & parts, String & merged_name, size_t available_disk_space, bool merge_anything_for_old_months, bool aggressive, bool only_small, const AllowedMergingPredicate & can_merge_callback) { + std::stringstream log_message; + + log_message << "Selecting parts to merge. Available disk space: "; + + if (available_disk_space == NO_LIMIT) + log_message << "no limit"; + else + log_message << available_disk_space << " bytes"; + + log_message + << ". Merge anything for old months: " << merge_anything_for_old_months + << ". Aggressive: " << aggressive + << ". Only small: " << only_small + << "."; + + LOG_TRACE(log, log_message.rdbuf()); + MergeTreeData::DataParts data_parts = data.getDataParts(); DateLUT & date_lut = DateLUT::instance(); @@ -60,14 +77,23 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa int max_count_from_left = 0; size_t cur_max_bytes_to_merge_parts = data.settings.max_bytes_to_merge_parts; + size_t cur_max_sum_bytes_to_merge_parts = data.settings.max_sum_bytes_to_merge_parts; /// Если ночь, можем мерджить сильно большие куски - if (now_hour >= 1 && now_hour <= 5) + bool tonight = now_hour >= 1 && now_hour <= 5; + + if (tonight) + { cur_max_bytes_to_merge_parts *= data.settings.merge_parts_at_night_inc; + cur_max_sum_bytes_to_merge_parts *= data.settings.merge_parts_at_night_inc; + } if (only_small) cur_max_bytes_to_merge_parts = data.settings.max_bytes_to_merge_parts_small; + LOG_TRACE(log, "Max bytes to merge parts: " << cur_max_bytes_to_merge_parts + << " " << (only_small ? "(only small)" : (tonight ? "(tonight)" : "")) << "."); + /// Мемоизация для функции can_merge_callback. Результат вызова can_merge_callback для этого куска и предыдущего в data_parts. std::map can_merge_with_previous; auto can_merge = [&can_merge_with_previous, &can_merge_callback] @@ -168,6 +194,10 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa ++cur_len; cur_id = last_part->right; + if (cur_sum > cur_max_sum_bytes_to_merge_parts + && !aggressive) + break; + int min_len = 2; int cur_age_in_sec = time(0) - oldest_modification_time; @@ -262,6 +292,10 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name << (only_small ? " (only small)" : "")); } + else + { + LOG_TRACE(log, "No parts selected for merge."); + } return found; } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 838f89a36664a4b1982eff7621e9b7dcb19b1e2c..94fb296ef6f3fff53c22e123d06f3a2bf0bce7b0 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -156,12 +156,13 @@ bool StorageMergeTree::merge(bool aggressive, BackgroundProcessingPool::Context auto can_merge = std::bind(&StorageMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2); /// Если слияние запущено из пула потоков, и хотя бы половина потоков сливает большие куски, /// не будем сливать большие куски. - int big_merges = background_pool.getCounter("big merges"); + size_t big_merges = background_pool.getCounter("big merges"); bool only_small = pool_context && big_merges * 2 >= background_pool.getNumberOfThreads(); if (!merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge) && !merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge)) { + LOG_INFO(log, "No parts to merge"); return false; } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 47668a88427db9122a0837e38c77c2245785c65f..238bd74ce714aa9eceff49acf55239deac86ab04 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -892,6 +893,14 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro } else if (entry.type == LogEntry::MERGE_PARTS) { + std::stringstream log_message; + log_message << "Executing log entry to merge parts "; + for (auto i : ext::range(0, entry.parts_to_merge.size())) + log_message << (i != 0 ? ", " : "") << entry.parts_to_merge[i]; + log_message << " to " << entry.new_part_name; + + LOG_TRACE(log, log_message.rdbuf()); + MergeTreeData::DataPartsVector parts; bool have_all_parts = true; for (const String & name : entry.parts_to_merge) @@ -1359,11 +1368,17 @@ void StorageReplicatedMergeTree::mergeSelectingThread() need_pull = false; } + /** Сколько в очереди или в фоновом потоке мерджей крупных кусков. + * Если их больше половины от размера пула потоков для мерджа, то можно мерджить только мелкие куски. + */ + auto & background_pool = context.getBackgroundPool(); + + size_t big_merges_current = background_pool.getCounter("replicated big merges"); + size_t max_number_of_big_merges = background_pool.getNumberOfThreads() / 2; size_t merges_queued = 0; - /// Есть ли в очереди или в фоновом потоке мердж крупных кусков. - bool has_big_merge = context.getBackgroundPool().getCounter("replicated big merges") > 0; + size_t big_merges_queued = 0; - if (!has_big_merge) + if (big_merges_current < max_number_of_big_merges) { std::unique_lock lock(queue_mutex); @@ -1373,16 +1388,17 @@ void StorageReplicatedMergeTree::mergeSelectingThread() { ++merges_queued; - if (!has_big_merge) + if (big_merges_current + big_merges_queued < max_number_of_big_merges) { for (const String & name : entry->parts_to_merge) { MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name); if (!part || part->name != name) continue; + if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small) { - has_big_merge = true; + ++big_merges_queued; break; } } @@ -1391,20 +1407,32 @@ void StorageReplicatedMergeTree::mergeSelectingThread() } } + bool only_small = big_merges_current + big_merges_queued >= max_number_of_big_merges; + + LOG_TRACE(log, "Currently executing big merges: " << big_merges_current + << ". Queued big merges: " << big_merges_queued + << ". All merges in queue: " << merges_queued + << ". Max number of big merges: " << max_number_of_big_merges + << (only_small ? ". So, will select only small parts to merge." : ".")); + do { if (merges_queued >= data.settings.max_replicated_merges_in_queue) + { + LOG_TRACE(log, "Number of queued merges is greater than max_replicated_merges_in_queue, so won't select new parts to merge."); break; + } MergeTreeData::DataPartsVector parts; String merged_name; - if (!merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT, - false, false, has_big_merge, can_merge) && - !merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT, - true, false, has_big_merge, can_merge)) + if ( !merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT, false, false, only_small, can_merge) + && !merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT, true, false, only_small, can_merge)) + { + LOG_INFO(log, "No parts to merge"); break; + } bool all_in_zk = true; for (const auto & part : parts)