Commit f2c0ba3b authored by Alexey Milovidov

dbms: fixed error with merges [#METR-12767].

Parent commit: 8505c84c
......@@ -101,7 +101,7 @@ public:
}
int getNumberOfThreads() const
size_t getNumberOfThreads() const
{
return size;
}
......
......@@ -82,11 +82,14 @@ struct MergeTreeSettings
size_t max_parts_to_merge_at_once_if_small = 100;
/// Куски настолько большого размера объединять нельзя вообще.
size_t max_bytes_to_merge_parts = 25ul * 1024 * 1024 * 1024;
size_t max_bytes_to_merge_parts = 10ul * 1024 * 1024 * 1024;
/// Не больше половины потоков одновременно могут выполнять слияния, в которых участвует хоть один кусок хотя бы такого размера.
size_t max_bytes_to_merge_parts_small = 250 * 1024 * 1024;
/// Куски настолько большого размера в сумме, объединять нельзя вообще.
size_t max_sum_bytes_to_merge_parts = 25ul * 1024 * 1024 * 1024;
/// Во столько раз ночью увеличиваем коэффициент.
size_t merge_parts_at_night_inc = 10;
......
......@@ -41,6 +41,23 @@ static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.4;
bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & parts, String & merged_name, size_t available_disk_space,
bool merge_anything_for_old_months, bool aggressive, bool only_small, const AllowedMergingPredicate & can_merge_callback)
{
std::stringstream log_message;
log_message << "Selecting parts to merge. Available disk space: ";
if (available_disk_space == NO_LIMIT)
log_message << "no limit";
else
log_message << available_disk_space << " bytes";
log_message
<< ". Merge anything for old months: " << merge_anything_for_old_months
<< ". Aggressive: " << aggressive
<< ". Only small: " << only_small
<< ".";
LOG_TRACE(log, log_message.rdbuf());
MergeTreeData::DataParts data_parts = data.getDataParts();
DateLUT & date_lut = DateLUT::instance();
......@@ -60,14 +77,23 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
int max_count_from_left = 0;
size_t cur_max_bytes_to_merge_parts = data.settings.max_bytes_to_merge_parts;
size_t cur_max_sum_bytes_to_merge_parts = data.settings.max_sum_bytes_to_merge_parts;
/// Если ночь, можем мерджить сильно большие куски
if (now_hour >= 1 && now_hour <= 5)
bool tonight = now_hour >= 1 && now_hour <= 5;
if (tonight)
{
cur_max_bytes_to_merge_parts *= data.settings.merge_parts_at_night_inc;
cur_max_sum_bytes_to_merge_parts *= data.settings.merge_parts_at_night_inc;
}
if (only_small)
cur_max_bytes_to_merge_parts = data.settings.max_bytes_to_merge_parts_small;
LOG_TRACE(log, "Max bytes to merge parts: " << cur_max_bytes_to_merge_parts
<< " " << (only_small ? "(only small)" : (tonight ? "(tonight)" : "")) << ".");
/// Мемоизация для функции can_merge_callback. Результат вызова can_merge_callback для этого куска и предыдущего в data_parts.
std::map<MergeTreeData::DataPartPtr, bool> can_merge_with_previous;
auto can_merge = [&can_merge_with_previous, &can_merge_callback]
......@@ -168,6 +194,10 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
++cur_len;
cur_id = last_part->right;
if (cur_sum > cur_max_sum_bytes_to_merge_parts
&& !aggressive)
break;
int min_len = 2;
int cur_age_in_sec = time(0) - oldest_modification_time;
......@@ -262,6 +292,10 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name
<< (only_small ? " (only small)" : ""));
}
else
{
LOG_TRACE(log, "No parts selected for merge.");
}
return found;
}
......
......@@ -156,12 +156,13 @@ bool StorageMergeTree::merge(bool aggressive, BackgroundProcessingPool::Context
auto can_merge = std::bind(&StorageMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);
/// Если слияние запущено из пула потоков, и хотя бы половина потоков сливает большие куски,
/// не будем сливать большие куски.
int big_merges = background_pool.getCounter("big merges");
size_t big_merges = background_pool.getCounter("big merges");
bool only_small = pool_context && big_merges * 2 >= background_pool.getNumberOfThreads();
if (!merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge) &&
!merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge))
{
LOG_INFO(log, "No parts to merge");
return false;
}
......
#include <statdaemons/ext/range.hpp>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
......@@ -892,6 +893,14 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
}
else if (entry.type == LogEntry::MERGE_PARTS)
{
std::stringstream log_message;
log_message << "Executing log entry to merge parts ";
for (auto i : ext::range(0, entry.parts_to_merge.size()))
log_message << (i != 0 ? ", " : "") << entry.parts_to_merge[i];
log_message << " to " << entry.new_part_name;
LOG_TRACE(log, log_message.rdbuf());
MergeTreeData::DataPartsVector parts;
bool have_all_parts = true;
for (const String & name : entry.parts_to_merge)
......@@ -1359,11 +1368,17 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
need_pull = false;
}
/** Сколько в очереди или в фоновом потоке мерджей крупных кусков.
* Если их больше половины от размера пула потоков для мерджа, то можно мерджить только мелкие куски.
*/
auto & background_pool = context.getBackgroundPool();
size_t big_merges_current = background_pool.getCounter("replicated big merges");
size_t max_number_of_big_merges = background_pool.getNumberOfThreads() / 2;
size_t merges_queued = 0;
/// Есть ли в очереди или в фоновом потоке мердж крупных кусков.
bool has_big_merge = context.getBackgroundPool().getCounter("replicated big merges") > 0;
size_t big_merges_queued = 0;
if (!has_big_merge)
if (big_merges_current < max_number_of_big_merges)
{
std::unique_lock<std::mutex> lock(queue_mutex);
......@@ -1373,16 +1388,17 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
{
++merges_queued;
if (!has_big_merge)
if (big_merges_current + big_merges_queued < max_number_of_big_merges)
{
for (const String & name : entry->parts_to_merge)
{
MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name);
if (!part || part->name != name)
continue;
if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small)
{
has_big_merge = true;
++big_merges_queued;
break;
}
}
......@@ -1391,20 +1407,32 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
}
}
bool only_small = big_merges_current + big_merges_queued >= max_number_of_big_merges;
LOG_TRACE(log, "Currently executing big merges: " << big_merges_current
<< ". Queued big merges: " << big_merges_queued
<< ". All merges in queue: " << merges_queued
<< ". Max number of big merges: " << max_number_of_big_merges
<< (only_small ? ". So, will select only small parts to merge." : "."));
do
{
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges is greater than max_replicated_merges_in_queue, so won't select new parts to merge.");
break;
}
MergeTreeData::DataPartsVector parts;
String merged_name;
if (!merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT,
false, false, has_big_merge, can_merge) &&
!merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT,
true, false, has_big_merge, can_merge))
if ( !merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT, false, false, only_small, can_merge)
&& !merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT, true, false, only_small, can_merge))
{
LOG_INFO(log, "No parts to merge");
break;
}
bool all_in_zk = true;
for (const auto & part : parts)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
To comment, please register.