提交 3d48f9d3 编写于 作者: A Alexey Milovidov

MergeSelector: development [#METR-21840].

上级 e5539af6
#pragma once
#include <cmath>
/** Linear interpolation between 'min' and 'max'.
  * Returns 'min' for ratio = 0 and 'max' for ratio = 1.
  */
inline double interpolateLinear(double min, double max, double ratio)
{
    const double span = max - min;
    return min + span * ratio;
}
/** Exponential interpolation: linear interpolation performed in logarithmic coordinates.
  * It relates to linear interpolation exactly as the geometric mean relates to the arithmetic mean.
  * Returns 'min' for ratio = 0 and 'max' for ratio = 1.
  * 'min' must be greater than zero; 'ratio' must be in the range from 0 to 1.
  */
inline double interpolateExponential(double min, double max, double ratio)
{
    const double growth = max / min;
    return min * std::pow(growth, ratio);
}
......@@ -8,30 +8,54 @@
namespace DB
{
/** Interface of an algorithm that selects data parts to merge
  * (merging is also known as "compaction").
  * The following properties depend on it:
  *
  * 1. Number of data parts at some moment in time.
  *    If parts are merged frequently, data is represented by a lower number of parts on average,
  *    but at the cost of higher write amplification.
  *
  * 2. Write amplification ratio: how many times, on average, source data is written
  *    (during initial writes and subsequent merges).
  *
  * The number of parallel merges is controlled outside the scope of this interface.
  */
class IMergeSelector
{
public:
/// Information about a data part that is relevant to the merge selection strategy.
struct Part
{
/// Size of the data part in bytes.
size_t size;
/// Age of this data part in seconds.
time_t age;
/// Depth of the tree of merges by which this part was created. New parts have level zero.
unsigned level;
/// Opaque pointer to avoid dependencies (it is not possible to forward-declare a typedef).
const void * data;
};
/// Parts belong to partitions. Only parts within the same partition can be merged.
using PartsInPartition = std::vector<Part>;
/// Parts are in some specific order. Parts can be merged only in contiguous ranges.
using Partitions = std::vector<PartsInPartition>;
/// Predicate taking one part; select() receives it as 'can_merge_part'.
using CanMergePart = std::function<bool(const Part &)>;
/// Some parts are forbidden to merge at the moment, because they are already being merged or because there is some lock.
/// Predicate taking two parts; select() receives it as 'can_merge_adjacent'.
using CanMergeAdjacent = std::function<bool(const Part &, const Part &)>;
/** May be called at any frequency; it must decide whether any merge should be done at all.
  * If it is better not to merge anything, it returns an empty result.
  * NOTE(review): two endings of the parameter list appear below (one with 'aggressive_mode', one without);
  * this looks like removed and added diff lines shown together - confirm which signature is current.
  */
virtual PartsInPartition select(
const Partitions & partitions,
CanMergePart can_merge_part,
CanMergeAdjacent can_merge_adjacent,
const size_t max_total_size_to_merge,
bool aggressive_mode) = 0;
const size_t max_total_size_to_merge) = 0;
virtual ~IMergeSelector() {}
};
......
......@@ -13,33 +13,32 @@ class SimpleMergeSelector : public IMergeSelector
public:
/// Tunable parameters of SimpleMergeSelector; every field has an in-class default.
/// NOTE(review): 'max_parts_to_merge_at_once' is declared twice in this listing (near the top and at the end);
/// presumably one of them is a removed diff line - confirm against the real header.
struct Settings
{
/// Starting value of the minimum number of parts in a range; it may be reduced for old parts
/// (see the two 'lower_min_parts_to_merge_at_once_*' settings).
size_t min_parts_to_merge_at_once = 8;
size_t max_parts_to_merge_at_once = 100;
/** Minimum ratio of the total size of a set of parts to the size of its largest part (for usual cases).
  * For example, if all parts have equal size, it means that at least 'base' parts should be merged.
  * If parts have non-uniform sizes, the minimum number of parts to merge is effectively increased.
  * This behaviour balances the merge-tree workload.
  * It is called 'base' because merge-tree depth can be estimated as a logarithm with that base.
  */
double base = 8;
/// Upper bound for (largest part size * number of parts / total size) of a range being extended.
double max_nonuniformity_of_sizes_to_merge = 2;
/// If the age of the last part in a partition exceeds this value, the effective base becomes 1.
/// Compared against Part::age, so presumably in seconds - confirm.
time_t lower_base_after = 300;
/// Once the minimum age within a range reaches this value, the minimum number of parts to merge is reduced.
time_t lower_min_parts_to_merge_at_once_starting_at_time = 300;
/// Base of the logarithm used to compute that reduction from the ratio of age to the threshold above.
size_t lower_min_parts_to_merge_at_once_base_of_exponent = 4;
/// Zero means unlimited.
size_t max_parts_to_merge_at_once = 100;
};
/// Constructs a selector that keeps its own copy of 'settings'.
SimpleMergeSelector(const Settings & settings) : settings(settings) {}
/// Override of the base-class select(): chooses a range of parts to merge among the given partitions.
/// NOTE(review): two endings of the parameter list appear below (with and without 'aggressive_mode');
/// this looks like removed and added diff lines shown together - confirm which one is current.
PartsInPartition select(
const Partitions & partitions,
CanMergePart can_merge_part,
CanMergeAdjacent can_merge_adjacent,
const size_t max_total_size_to_merge,
bool aggressive_mode) override;
const size_t max_total_size_to_merge) override;
private:
const Settings settings;
/// Helper for select(): examines the parts of a single partition and reports candidate ranges to 'estimator'.
/// NOTE(review): the call sites in this listing pass differing argument lists; the parameter list shown here
/// may include lines removed by the diff - confirm against the real header.
void selectWithinPartition(
const PartsInPartition & parts,
CanMergePart can_merge_part,
CanMergeAdjacent can_merge_adjacent,
const size_t max_total_size_to_merge,
bool aggressive_mode,
Estimator & estimator);
};
......
......@@ -17,6 +17,7 @@
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Common/Increment.h>
#include <DB/Common/interpolate.h>
#include <cmath>
......@@ -85,18 +86,10 @@ size_t MergeTreeDataMerger::getMaxPartsSizeForMerge()
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
size_t free_threads_in_pool = total_threads_in_pool - busy_threads_in_pool;
/// Interpolate between 'max_bytes_to_merge_at_min_space_in_pool' and 'max_bytes_to_merge_at_max_space_in_pool' with exponential function.
double base_of_exponent =
static_cast<double>(data.settings.max_bytes_to_merge_at_min_space_in_pool)
/ data.settings.max_bytes_to_merge_at_min_space_in_pool;
/// from 0 to 1
double power_of_exponent =
static_cast<double>(free_threads_in_pool)
/ total_threads_in_pool;
size_t max_size = data.settings.max_bytes_to_merge_at_min_space_in_pool * std::pow(base_of_exponent, power_of_exponent);
size_t max_size = interpolateExponential(
data.settings.max_bytes_to_merge_at_min_space_in_pool,
data.settings.max_bytes_to_merge_at_max_space_in_pool,
static_cast<double>(free_threads_in_pool) / total_threads_in_pool);
return std::min(max_size, static_cast<size_t>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_SELECT));
}
......@@ -109,6 +102,8 @@ bool MergeTreeDataMerger::selectPartsToMerge(
size_t max_total_size_to_merge,
const AllowedMergingPredicate & can_merge_callback)
{
parts.clear();
MergeTreeData::DataParts data_parts = data.getDataParts();
if (data_parts.empty())
......@@ -150,46 +145,52 @@ bool MergeTreeDataMerger::selectPartsToMerge(
partitions.back().emplace_back(part_info);
}
SimpleMergeSelector merge_selector{SimpleMergeSelector::Settings()};
SimpleMergeSelector::Settings merge_settings;
if (aggressive)
{
merge_settings.base = 1;
merge_settings.lower_base_after = 0;
merge_settings.max_parts_to_merge_at_once = 0;
}
SimpleMergeSelector merge_selector(merge_settings);
IMergeSelector::PartsInPartition parts_to_merge = merge_selector.select(
partitions,
[] (const IMergeSelector::Part &) { return true; },
[&] (const IMergeSelector::Part & left, const IMergeSelector::Part & right)
{
return can_merge(
*static_cast<const MergeTreeData::DataPartPtr *>(left.data),
*static_cast<const MergeTreeData::DataPartPtr *>(right.data));
},
max_total_size_to_merge,
aggressive);
if (!parts_to_merge.empty())
{
parts.clear();
max_total_size_to_merge);
DayNum_t left_date = DayNum_t(std::numeric_limits<UInt16>::max());
DayNum_t right_date = DayNum_t(std::numeric_limits<UInt16>::min());
UInt32 level = 0;
if (parts_to_merge.empty())
return false;
for (IMergeSelector::Part & part_info : parts_to_merge)
{
const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
parts.reserve(parts_to_merge.size());
parts.push_back(part);
DayNum_t left_date = DayNum_t(std::numeric_limits<UInt16>::max());
DayNum_t right_date = DayNum_t(std::numeric_limits<UInt16>::min());
UInt32 level = 0;
level = std::max(level, part->level);
left_date = std::min(left_date, part->left_date);
right_date = std::max(right_date, part->right_date);
}
for (IMergeSelector::Part & part_info : parts_to_merge)
{
const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
merged_name = ActiveDataPartSet::getPartName(
left_date, right_date, parts.front()->left, parts.back()->right, level + 1);
parts.push_back(part);
LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
return true;
level = std::max(level, part->level);
left_date = std::min(left_date, part->left_date);
right_date = std::max(right_date, part->right_date);
}
return false;
merged_name = ActiveDataPartSet::getPartName(
left_date, right_date, parts.front()->left, parts.back()->right, level + 1);
LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
return true;
}
......
#include <DB/Storages/MergeTree/SimpleMergeSelector.h>
#include <cmath>
#include <iostream>
namespace DB
{
/** Chooses the best set of parts to merge among the alternatives passed to it.
*/
struct Estimator
{
/// Offers a candidate range of parts together with the sum of their sizes.
/// The candidate is moved into 'best' when its score is lower than the lowest score recorded so far,
/// or when no score has been recorded yet (min_score is still zero).
/// NOTE(review): both a 'min_size' and a 'min_score' variant of the comparison appear below;
/// this looks like removed and added diff lines shown together and is not compilable as written.
void consider(SimpleMergeSelector::PartsInPartition && parts, size_t sum_size)
{
if (!min_size || sum_size < min_size)
double current_score = score(parts.size(), sum_size);
if (!min_score || current_score < min_score)
{
min_size = sum_size;
min_score = current_score;
best = std::move(parts);
}
}
......@@ -22,22 +25,36 @@ struct Estimator
return std::move(best);
}
size_t min_size = 0;
/** Score of a candidate range of parts; among alternatives, the range with the lower score is preferred.
  *
  * Rationale: assume that the time to merge a range is proportional to the sum of sizes of its parts,
  * and that the cost of query execution is proportional to the total number of data parts at a moment of time.
  * The target is to minimize the average (over time) total number of data parts.
  * Comparing the integral of the total number of parts for two alternative ranges,
  * the integral is lower iff the value returned here is lower.
  *
  * 'count' is the number of parts in the range, 'sum_size' is the sum of their sizes.
  * The result is sum_size divided by (count - 1).
  */
static double score(double count, double sum_size)
{
    const double count_minus_one = count - 1;
    return sum_size / count_minus_one;
}
double min_score = 0;
SimpleMergeSelector::PartsInPartition best;
};
/// Runs selectWithinPartition() for every partition with one shared Estimator
/// and returns the best candidate the estimator has accumulated.
/// NOTE(review): two endings of the parameter list and two differing calls to selectWithinPartition()
/// appear below; this looks like removed and added diff lines shown together - confirm the current version.
SimpleMergeSelector::PartsInPartition SimpleMergeSelector::select(
const Partitions & partitions,
CanMergePart can_merge_part,
CanMergeAdjacent can_merge_adjacent,
const size_t max_total_size_to_merge,
bool aggressive_mode)
const size_t max_total_size_to_merge)
{
Estimator estimator;
for (const auto & partition : partitions)
selectWithinPartition(partition, can_merge_part, can_merge_adjacent, max_total_size_to_merge, aggressive_mode, estimator);
selectWithinPartition(partition, can_merge_adjacent, max_total_size_to_merge, estimator);
return estimator.getBest();
}
......@@ -45,81 +62,68 @@ SimpleMergeSelector::PartsInPartition SimpleMergeSelector::select(
/** Enumerates contiguous ranges of parts within one partition and offers acceptable ranges to 'estimator'.
  * For every starting part, the range is extended to the right until a limit or a predicate stops it.
  *
  * NOTE(review): this listing shows removed and added diff lines side by side
  * (for example 'count' vs 'candidate.size()', 'prev_right_it' vs 'right_it_minus_one'),
  * so it is not compilable as written. The comments below describe individual statements only;
  * confirm the actual control flow against the real file.
  */
void SimpleMergeSelector::selectWithinPartition(
const PartsInPartition & parts,
CanMergePart can_merge_part,
CanMergeAdjacent can_merge_adjacent,
const size_t max_total_size_to_merge,
bool aggressive_mode,
Estimator & estimator)
{
/// A single part (or none) cannot form a range to merge.
if (parts.size() <= 1)
return;
/// The base is lowered to 1 when the last part of the partition is older than 'lower_base_after'.
double actual_base = settings.base;
if (parts.back().age > settings.lower_base_after)
actual_base = 1;
/// Diagnostic output written to stderr.
std::cerr << "parts.size(): " << parts.size()
<< ", actual_base: " << actual_base
<< ", max_total_size_to_merge: " << max_total_size_to_merge
<< "\n";
auto prev_right_it = parts.begin();
for (auto left_it = parts.begin(); left_it != parts.end(); ++left_it)
{
if (!can_merge_part(*left_it))
continue;
auto right_it = left_it;
auto prev_right_it = right_it;
auto right_it_minus_one = right_it;
++right_it;
/// Running statistics of the range, initialised from its leftmost part.
size_t count = 1;
size_t sum_size = left_it->size;
size_t max_size = left_it->size;
time_t min_age = left_it->age;
PartsInPartition candidate;
candidate.push_back(*left_it);
for (; right_it != parts.end(); ++right_it)
{
++count;
sum_size += right_it->size;
if (right_it->size > max_size)
max_size = right_it->size;
if (right_it->age < min_age)
min_age = right_it->age;
double non_uniformity = static_cast<double>(max_size * count) / sum_size;
/// A zero 'max_total_size_to_merge' disables the total size limit.
if (max_total_size_to_merge && sum_size > max_total_size_to_merge)
break;
if (count > settings.max_parts_to_merge_at_once)
/// A zero 'max_parts_to_merge_at_once' disables the limit on the number of parts.
if (settings.max_parts_to_merge_at_once && candidate.size() >= settings.max_parts_to_merge_at_once)
break;
if (!aggressive_mode && non_uniformity > settings.max_nonuniformity_of_sizes_to_merge)
break;
if (!can_merge_part(*right_it))
break;
if (!can_merge_adjacent(*prev_right_it, *right_it))
if (!can_merge_adjacent(*right_it_minus_one, *right_it))
break;
candidate.push_back(*right_it);
prev_right_it = right_it;
right_it_minus_one = right_it;
}
if (count <= 1)
if (candidate.size() <= 1)
continue;
/// The required minimum number of parts is reduced by a logarithmic correction
/// once the youngest part of the range is old enough.
size_t min_parts_to_merge_at_once_with_correction_by_age = settings.min_parts_to_merge_at_once;
if (min_age >= settings.lower_min_parts_to_merge_at_once_starting_at_time)
{
size_t min_parts_to_merge_at_once_correction = 1
+ std::log(static_cast<double>(min_age) / settings.lower_min_parts_to_merge_at_once_starting_at_time)
/ std::log(settings.lower_min_parts_to_merge_at_once_base_of_exponent);
/// The correction is applied only if at least two parts would still be required afterwards.
if (min_parts_to_merge_at_once_with_correction_by_age >= min_parts_to_merge_at_once_correction + 2)
min_parts_to_merge_at_once_with_correction_by_age -= min_parts_to_merge_at_once_correction;
}
/// Skip the range when its total size relative to its largest part is below the base.
if (static_cast<double>(sum_size) / max_size < actual_base)
continue;
if (count < min_parts_to_merge_at_once_with_correction_by_age)
/// Do not select a subrange of a previously considered range.
if (right_it_minus_one <= prev_right_it)
continue;
estimator.consider(std::move(candidate), sum_size);
prev_right_it = right_it_minus_one;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册