提交 8e5889cf 编写于 作者: A Alexey Milovidov

Improved merge selecting algorithm for Replicated tables [#METR-23615].

上级 36553dd6
......@@ -30,6 +30,11 @@ public:
*/
size_t getMaxPartsSizeForMerge();
/** For explicitly passed size of pool and number of used tasks.
* This method could be used to calculate threshold depending on number of tasks in replication queue.
*/
size_t getMaxPartsSizeForMerge(size_t pool_size, size_t pool_used);
/** Выбирает, какие куски слить. Использует кучу эвристик.
*
* can_merge - функция, определяющая, можно ли объединить пару соседних кусков.
......
......@@ -21,7 +21,7 @@ struct MergeTreeSettings
size_t max_bytes_to_merge_at_min_space_in_pool = 1024 * 1024;
/// How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue.
size_t max_replicated_merges_in_queue = 6;
size_t max_replicated_merges_in_queue = 16;
/// How many seconds to keep obsolete parts.
time_t old_parts_lifetime = 8 * 60;
......
......@@ -85,12 +85,20 @@ size_t MergeTreeDataMerger::getMaxPartsSizeForMerge()
{
size_t total_threads_in_pool = pool.getNumberOfThreads();
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
size_t free_threads_in_pool = 1 + total_threads_in_pool - busy_threads_in_pool; /// 1 is current thread
return getMaxPartsSizeForMerge(total_threads_in_pool, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread
}
size_t MergeTreeDataMerger::getMaxPartsSizeForMerge(size_t pool_size, size_t pool_used)
{
if (pool_used > pool_size)
throw Exception("Logical error: invalid arguments passed to getMaxPartsSizeForMerge: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
size_t max_size = interpolateExponential(
data.settings.max_bytes_to_merge_at_min_space_in_pool,
data.settings.max_bytes_to_merge_at_max_space_in_pool,
static_cast<double>(free_threads_in_pool) / total_threads_in_pool);
static_cast<double>(pool_size - pool_used) / pool_size);
return std::min(max_size, static_cast<size_t>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_SELECT));
}
......
......@@ -1696,6 +1696,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
/** If many merges is already queued, then will queue only small enough merges.
* Otherwise merge queue could be filled with only large merges,
* and in the same time, many small parts could be created and won't be merged.
*/
size_t merges_queued = queue.countMerges();
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
......@@ -1712,7 +1716,9 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
if (merger.selectPartsToMerge(
parts, merged_name, false, std::min(disk_space, data.settings.max_bytes_to_merge_at_max_space_in_pool), can_merge)
parts, merged_name, false,
merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued),
can_merge)
&& createLogEntryToMergeParts(parts, merged_name))
{
success = true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册