提交 90dc182f 编写于 作者: M Michael Kolupaev

Merge

上级 6c4b3e33
......@@ -72,11 +72,11 @@ struct MergeTreeSettings
/// Трудоемкость выбора кусков O(N * max_parts_to_merge_at_once).
size_t max_parts_to_merge_at_once = 10;
/// Куски настолько большого размера в основном потоке объединять нельзя вообще.
size_t max_rows_to_merge_parts = 100 * 1024 * 1024;
/// Куски настолько большого размера объединять нельзя вообще.
size_t max_bytes_to_merge_parts = 25ul * 1024 * 1024 * 1024;
/// Куски настолько большого размера во втором потоке объединять нельзя вообще.
size_t max_rows_to_merge_parts_second = 1024 * 1024;
/// Не больше половины потоков одновременно могут выполнять слияния, в которых участвует хоть один кусок хотя бы такого размера.
size_t max_bytes_to_merge_parts_small = 250 * 1024 * 1024;
/// Во столько раз ночью увеличиваем коэффициент.
size_t merge_parts_at_night_inc = 10;
......@@ -222,8 +222,8 @@ public:
MergeTreeData & storage;
size_t size; /// в количестве засечек.
std::atomic<size_t> size_in_bytes; /// размер в байтах, 0 - если не посчитано;
/// atomic, чтобы можно было не заботиться о блокировках при ALTER.
volatile size_t size_in_bytes; /// размер в байтах, 0 - если не посчитано;
/// используется из нескольких потоков без блокировок (изменяется при ALTER).
time_t modification_time;
mutable time_t remove_time = std::numeric_limits<time_t>::max(); /// Когда кусок убрали из рабочего набора.
......
......@@ -43,6 +43,9 @@ public:
{
try
{
if (!Poco::File(path).exists())
throw Exception("Part " + path + " is missing", ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
for (const NameAndTypePair & column : columns)
addStream(column.name, *column.type, all_mark_ranges);
}
......
......@@ -21,9 +21,9 @@ static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.4;
/// Выбираем отрезок из не более чем max_parts_to_merge_at_once кусков так, чтобы максимальный размер был меньше чем в max_size_ratio_to_merge_parts раз больше суммы остальных.
/// Это обеспечивает в худшем случае время O(n log n) на все слияния, независимо от выбора сливаемых кусков, порядка слияния и добавления.
/// При max_parts_to_merge_at_once >= log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts),
/// При max_parts_to_merge_at_once >= log(max_bytes_to_merge_parts)/log(max_size_ratio_to_merge_parts),
/// несложно доказать, что всегда будет что сливать, пока количество кусков больше
/// log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts)*(количество кусков размером больше max_rows_to_merge_parts).
/// log(max_bytes_to_merge_parts)/log(max_size_ratio_to_merge_parts)*(количество кусков размером больше max_bytes_to_merge_parts).
/// Дальше эвристики.
/// Будем выбирать максимальный по включению подходящий отрезок.
/// Из всех таких выбираем отрезок с минимальным максимумом размера.
......@@ -57,19 +57,19 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
/// Нужно для определения максимальности по включению.
int max_count_from_left = 0;
size_t cur_max_rows_to_merge_parts = data.settings.max_rows_to_merge_parts;
size_t cur_max_bytes_to_merge_parts = data.settings.max_bytes_to_merge_parts;
/// Если ночь, можем мерджить сильно большие куски
if (now_hour >= 1 && now_hour <= 5)
cur_max_rows_to_merge_parts *= data.settings.merge_parts_at_night_inc;
cur_max_bytes_to_merge_parts *= data.settings.merge_parts_at_night_inc;
if (only_small)
cur_max_rows_to_merge_parts = data.settings.max_rows_to_merge_parts_second;
cur_max_bytes_to_merge_parts = data.settings.max_bytes_to_merge_parts_small;
/// Найдем суммарный размер еще не пройденных кусков (то есть всех).
size_t size_of_remaining_parts = 0;
size_t size_in_bytes_of_remaining_parts = 0;
for (const auto & part : data_parts)
size_of_remaining_parts += part->size;
size_in_bytes_of_remaining_parts += part->size_in_bytes;
/// Левый конец отрезка.
for (MergeTreeData::DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
......@@ -77,10 +77,10 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
const MergeTreeData::DataPartPtr & first_part = *it;
max_count_from_left = std::max(0, max_count_from_left - 1);
size_of_remaining_parts -= first_part->size;
size_in_bytes_of_remaining_parts -= first_part->size_in_bytes;
/// Кусок достаточно мал или слияние "агрессивное".
if (first_part->size * data.index_granularity > cur_max_rows_to_merge_parts
if (first_part->size_in_bytes > cur_max_bytes_to_merge_parts
&& !aggressive)
{
continue;
......@@ -99,10 +99,9 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
int cur_longest_len = 0;
/// Текущий отрезок, не обязательно валидный.
size_t cur_max = first_part->size;
size_t cur_min = first_part->size;
size_t cur_sum = first_part->size;
size_t cur_total_size = first_part->size_in_bytes;
size_t cur_max = first_part->size_in_bytes;
size_t cur_min = first_part->size_in_bytes;
size_t cur_sum = first_part->size_in_bytes;
int cur_len = 1;
DayNum_t month = first_part->left_month;
......@@ -134,7 +133,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
}
/// Кусок достаточно мал или слияние "агрессивное".
if (last_part->size * data.index_granularity > cur_max_rows_to_merge_parts
if (last_part->size_in_bytes > cur_max_bytes_to_merge_parts
&& !aggressive)
break;
......@@ -146,22 +145,21 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
}
oldest_modification_time = std::max(oldest_modification_time, last_part->modification_time);
cur_max = std::max(cur_max, last_part->size);
cur_min = std::min(cur_min, last_part->size);
cur_sum += last_part->size;
cur_total_size += last_part->size_in_bytes;
cur_max = std::max(cur_max, static_cast<size_t>(last_part->size_in_bytes));
cur_min = std::min(cur_min, static_cast<size_t>(last_part->size_in_bytes));
cur_sum += last_part->size_in_bytes;
++cur_len;
cur_id = last_part->right;
int min_len = 2;
int cur_age_in_sec = time(0) - oldest_modification_time;
/// Если куски примерно больше 1 Gb и образовались меньше 6 часов назад, то мерджить не меньше чем по 3.
if (cur_max * data.index_granularity * 150 > 1024*1024*1024 && cur_age_in_sec < 6*3600)
/// Если куски больше 1 Gb и образовались меньше 6 часов назад, то мерджить не меньше чем по 3.
if (cur_max > 1024 * 1024 * 1024 && cur_age_in_sec < 6 * 3600)
min_len = 3;
/// Размер кусков после текущих, делить на максимальный из текущих кусков. Чем меньше, тем новее текущие куски.
size_t oldness_coef = (size_of_remaining_parts + first_part->size - cur_sum + 0.0) / cur_max;
size_t oldness_coef = (size_in_bytes_of_remaining_parts + first_part->size_in_bytes - cur_sum + 0.0) / cur_max;
/// Эвристика: если после этой группы кусков еще накопилось мало строк, не будем соглашаться на плохо
/// сбалансированные слияния, расчитывая, что после будущих вставок данных появятся более привлекательные слияния.
......@@ -171,12 +169,12 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
if (cur_len >= min_len
&& (static_cast<double>(cur_max) / (cur_sum - cur_max) < ratio
/// За старый месяц объединяем что угодно, если разрешено и если этому куску хотя бы 5 дней
|| (is_old_month && merge_anything_for_old_months && cur_age_in_sec > 3600*24*5)
|| (is_old_month && merge_anything_for_old_months && cur_age_in_sec > 3600 * 24 * 5)
/// Если слияние "агрессивное", то сливаем что угодно
|| aggressive))
{
/// Достаточно места на диске, чтобы покрыть новый мердж с запасом.
if (available_disk_space > cur_total_size * DISK_USAGE_COEFFICIENT_TO_SELECT)
if (available_disk_space > cur_sum * DISK_USAGE_COEFFICIENT_TO_SELECT)
{
cur_longest_max = cur_max;
cur_longest_min = cur_min;
......@@ -190,7 +188,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
disk_space_warning_time = now;
LOG_WARNING(log, "Won't merge parts from " << first_part->name << " to " << last_part->name
<< " because not enough free space: " << available_disk_space << " free and unreserved, "
<< cur_total_size << " required now (+" << static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100)
<< cur_sum << " required now (+" << static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100)
<< "% on overhead); suppressing similar warnings for the next hour");
}
break;
......
......@@ -171,7 +171,7 @@ bool StorageMergeTree::merge(bool aggressive, BackgroundProcessingPool::Context
{
for (const auto & part : parts)
{
if (part->size * data.index_granularity > 25 * 1024 * 1024)
if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small)
{
pool_context->incrementCounter("big merges");
break;
......
......@@ -798,7 +798,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
/// Если собираемся сливать большие куски, увеличим счетчик потоков, сливающих большие куски.
for (const auto & part : parts)
{
if (part->size * data.index_granularity > 25 * 1024 * 1024)
if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small)
{
pool_context.incrementCounter("big merges");
pool_context.incrementCounter("replicated big merges");
......@@ -1041,7 +1041,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
MergeTreeData::DataPartPtr part = data.getContainingPart(name);
if (!part || part->name != name)
continue;
if (part->size * data.index_granularity > 25 * 1024 * 1024)
if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small)
{
has_big_merge = true;
break;
......@@ -1351,7 +1351,11 @@ void StorageReplicatedMergeTree::partCheckThread()
* Для кусков, полученных в результате слияния такая проверка была бы некорректной,
* потому что слитого куска может еще ни у кого не быть.
*/
if (part_info.left == part_info.right)
if (part_info.left != part_info.right)
{
LOG_WARNING(log, "Not checking if part " << part_name << " is lost because it is a result of a merge.");
}
else
{
LOG_WARNING(log, "Checking if anyone has part covering " << part_name << ".");
......
#include <DB/Storages/StorageMergeTree.h>
using namespace DB;
using namespace std;
struct DataPart
{
int time; ///левая граница куска
size_t size; /// количество строк в куске
int currently_merging; /// 0 если не мерджится, иначе номер "потока"
int modification_time;
DataPart () : time(0), size(0), currently_merging(0), modification_time(0) {};
DataPart (int time, size_t val) : time(time), size(val), currently_merging(0), modification_time(time) {};
DataPart (int time, size_t val, int currently_merging) : time(time), size(val), currently_merging(currently_merging), modification_time(time) {};
DataPart (int time, size_t val, int currently_merging, int modification_time) : time(time), size(val), currently_merging(currently_merging), modification_time(modification_time) {};
};
bool operator < (const DataPart &a, const DataPart &b)
{
if (a.time != b.time)
return a.time < b.time;
return a.size < b.size;
}
typedef Poco::SharedPtr<DataPart> DataPtr;
struct DataPtrLess { bool operator() (const DataPtr & lhs, const DataPtr & rhs) const { return *lhs < *rhs; } };
typedef std::set<DataPtr, DataPtrLess> DataParts;
DataParts copy(const DataParts &a)
{
DataParts res;
for (DataParts::iterator it = a.begin(); it != a.end(); it ++)
res.insert(new DataPart((*it)->time, (*it)->size, (*it)->currently_merging));
return res;
}
const int RowsPerSec = 100000;
MergeTreeSettings settings;
size_t index_granularity = 1;
/// Time, Type, Value
multiset<pair<int, pair<int, int> > > events;
/// Текущие части в merge tree
DataParts data_parts;
/// Первый свободный номер потока для мерджа
int uniqId = 1;
/// Разные статистики
long long totalMergeTime = 0, totalSize = 0;
DataParts maxCount, maxMerging, maxThreads;
int maxCountMoment, maxMergingMoment, maxThreadsMoment, maxScheduledThreadsMoment;
int maxScheduledThreads;
double averageNumberOfParts = 0.0;
int mergeScheduled = 0;
int cur_time = 0;
int genRand(int l, int r)
{
return l + rand() % (r - l + 1);
}
bool selectPartsToMerge(std::vector<DataPtr> & parts)
{
// LOG_DEBUG(log, "Selecting parts to merge");
size_t min_max = -1U;
size_t min_min = -1U;
int max_len = 0;
DataParts::iterator best_begin;
bool found = false;
/// Сколько кусков, начиная с текущего, можно включить в валидный отрезок, начинающийся левее текущего куска.
/// Нужно для определения максимальности по включению.
int max_count_from_left = 0;
size_t cur_max_rows_to_merge_parts = settings.max_rows_to_merge_parts;
/// Если ночь, можем мерджить сильно большие куски
// if (now_hour >= 1 && now_hour <= 5)
// cur_max_rows_to_merge_parts *= settings.merge_parts_at_night_inc;
/// Если есть активный мердж крупных кусков, то ограничаемся мерджем только маленьких частей.
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
if ((*it)->currently_merging && (*it)->size * index_granularity > 25 * 1024 * 1024)
cur_max_rows_to_merge_parts = settings.max_rows_to_merge_parts_second;
/// Найдем суммарный размер еще не пройденных кусков (то есть всех).
size_t size_of_remaining_parts = 0;
for (const auto & part : data_parts)
size_of_remaining_parts += part->size;
/// Левый конец отрезка.
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
{
const DataPtr & first_part = *it;
max_count_from_left = std::max(0, max_count_from_left - 1);
size_of_remaining_parts -= first_part->size;
/// Кусок не занят и достаточно мал.
if (first_part->currently_merging ||
first_part->size * index_granularity > cur_max_rows_to_merge_parts)
continue;
/// Самый длинный валидный отрезок, начинающийся здесь.
size_t cur_longest_max = -1U;
size_t cur_longest_min = -1U;
int cur_longest_len = 0;
/// Текущий отрезок, не обязательно валидный.
size_t cur_max = first_part->size;
size_t cur_min = first_part->size;
size_t cur_sum = first_part->size;
int cur_len = 1;
int oldest_modification_time = first_part->modification_time;
/// Правый конец отрезка.
DataParts::iterator jt = it;
for (++jt; jt != data_parts.end() && cur_len < static_cast<int>(settings.max_parts_to_merge_at_once); ++jt)
{
const DataPtr & last_part = *jt;
/// Кусок не занят, достаточно мал и в одном правильном месяце.
if (last_part->currently_merging ||
last_part->size * index_granularity > cur_max_rows_to_merge_parts)
break;
oldest_modification_time = std::max(oldest_modification_time, last_part->modification_time);
cur_max = std::max(cur_max, last_part->size);
cur_min = std::min(cur_min, last_part->size);
cur_sum += last_part->size;
++cur_len;
int min_len = 2;
int cur_age_in_sec = cur_time - oldest_modification_time;
/// Если куски примерно больше 1 Gb и образовались меньше 6 часов назад, то мерджить не меньше чем по 3.
if (cur_max * index_granularity * 150 > 1024*1024*1024 && cur_age_in_sec < 6*3600)
min_len = 3;
/// Размер кусков после текущих, делить на максимальный из текущих кусков. Чем меньше, тем новее текущие куски.
size_t oldness_coef = (size_of_remaining_parts + first_part->size - cur_sum + 0.0) / cur_max;
/// Эвристика: если после этой группы кусков еще накопилось мало строк, не будем соглашаться на плохо
/// сбалансированные слияния, расчитывая, что после будущих вставок данных появятся более привлекательные слияния.
double ratio = (oldness_coef + 1) * settings.size_ratio_coefficient_to_merge_parts;
/// Если отрезок валидный, то он самый длинный валидный, начинающийся тут.
if (cur_len >= min_len &&
(static_cast<double>(cur_max) / (cur_sum - cur_max) < ratio))
{
cur_longest_max = cur_max;
cur_longest_min = cur_min;
cur_longest_len = cur_len;
}
}
/// Это максимальный по включению валидный отрезок.
if (cur_longest_len > max_count_from_left)
{
max_count_from_left = cur_longest_len;
if (!found ||
std::make_pair(std::make_pair(cur_longest_max, cur_longest_min), -cur_longest_len) <
std::make_pair(std::make_pair(min_max, min_min), -max_len))
{
found = true;
min_max = cur_longest_max;
min_min = cur_longest_min;
max_len = cur_longest_len;
best_begin = it;
}
}
}
if (found)
{
parts.clear();
DataParts::iterator it = best_begin;
for (int i = 0; i < max_len; ++i)
{
parts.push_back(*it);
parts.back()->currently_merging = true;
++it;
}
}
return found;
}
size_t getMergeSize(const DataParts &a)
{
size_t res = 0;
for (DataParts::iterator it = a.begin(); it != a.end(); ++it)
if ((*it)->currently_merging)
res += (*it)->size;
return res;
}
size_t getThreads(const DataParts &a)
{
set<int> res;
for (DataParts::iterator it = a.begin(); it != a.end(); ++it)
res.insert((*it)->currently_merging);
res.erase(0);
return res.size();
}
void writeParts(const DataParts &a)
{
for (DataParts::iterator it = a.begin(); it != a.end(); ++it)
printf("(%d, %d) ", (int)(*it)->size, (int)(*it)->currently_merging);
puts("\n");
}
void updateStat(int time)
{
if (maxCount.size() < data_parts.size())
{
maxCount = copy(data_parts);
maxCountMoment = time;
}
if (getMergeSize(maxMerging) < getMergeSize(data_parts))
{
maxMerging = copy(data_parts);
maxMergingMoment = time;
}
if (getThreads(maxThreads) < getThreads(data_parts))
{
maxThreads = copy(data_parts);
maxThreadsMoment = time;
}
if (maxScheduledThreads < mergeScheduled)
{
maxScheduledThreads = mergeScheduled;
maxScheduledThreadsMoment = time;
}
}
/// выбрать кого мерджить, оценить время и добавить событие об окончании
bool makeMerge(int cur_time) {
if (getThreads(data_parts) >= DBMS_DEFAULT_BACKGROUND_POOL_SIZE) return 0;
if (mergeScheduled == 0) return 0;
mergeScheduled --;
std::vector<DataPtr> e;
if (!selectPartsToMerge(e)) return 1;
int curId = ++uniqId;
size_t size = 0;
for (size_t i = 0; i < e.size(); ++i)
{
e[i]->currently_merging = curId;
size += e[i]->size;
}
size_t need_time = (size + RowsPerSec - 1) / RowsPerSec;
totalMergeTime += need_time;
events.insert(make_pair(cur_time + need_time, make_pair(2, curId)));
return 1;
}
/// Запустить потоки мерджа
void merge(int cur_time, int cnt)
{
mergeScheduled += cnt;
}
/// Обработать событие
void process(pair<int, pair<int, int> > ev)
{
int cur_time = ev.first;
int type = ev.second.first;
int val = ev.second.second;
/// insert
if (type == 1)
{
data_parts.insert(new DataPart(cur_time, val));
totalSize += val;
merge(cur_time, 2);
} else if (type == 2) /// merge done
{
size_t size = 0;
int st = (int)1e9;
DataParts newData;
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end();)
{
if ((*it)->currently_merging == val)
{
size += (*it)->size;
st = min(st, (*it)->time);
DataParts::iterator nxt = it;
++nxt;
data_parts.erase(it);
it = nxt;
} else
++it;
}
data_parts.insert(new DataPart(st, size, 0, cur_time));
} else if (type == 3) /// do merge
{
merge(cur_time, val);
}
while (makeMerge(cur_time));
}
int main()
{
srand(time(0));
int delay = 12;
int total_time = 60*60*30;
for (int i = 0; i < total_time/delay; ++i)
{
int cur_time = genRand(0, total_time);
if (rand() & 7)
events.insert(make_pair(cur_time, make_pair(1, genRand(70000, 80000))));
else {
events.insert(make_pair(cur_time, make_pair(1, genRand(10000, 30000))));
}
}
int iter = 0;
maxCount = data_parts;
puts("________________________________________________________________________________________________________");
puts("Couple moments from the process log:");
while (events.size() > 0)
{
int last_time = cur_time;
cur_time = events.begin()->first;
averageNumberOfParts += 1.0 * (cur_time - last_time) * data_parts.size();
if (cur_time > total_time) break;
updateStat(cur_time);
++iter;
if (iter % 3000 == 0)
{
printf("Current time: %d\n", cur_time);
printf("Current parts:");
writeParts(data_parts);
}
process(*events.begin());
events.erase(*events.begin());
}
total_time = cur_time;
averageNumberOfParts /= cur_time;
puts("________________________________________________________________________________________________________");
puts("During whole process: ");
printf("Max number of alive parts was at %d second with %d parts\n", maxCountMoment, (int) maxCount.size());
writeParts(maxCount);
printf("Max total size of merging parts was at %d second with %d rows in merge\n", maxMergingMoment, (int)getMergeSize(maxMerging));
writeParts(maxMerging);
printf("Max number of running threads was at %d second with %d threads\n", maxThreadsMoment, (int)getThreads(maxThreads));
writeParts(maxThreads);
printf("Max number of scheduled threads was at %d second with %d threads\n", maxScheduledThreadsMoment, maxScheduledThreads);
printf("Total merge time %lld sec\n", totalMergeTime);
printf("Total time %d sec\n", total_time);
printf("Total parts size %lld\n", totalSize);
printf("Total merged Rows / total rows %0.5lf \n", 1.0 * totalMergeTime * RowsPerSec / totalSize);
printf("Average number of data parts is %0.5lf\n", averageNumberOfParts);
puts("________________________________________________________________________________________________________");
puts("Result configuration:\n");
writeParts(data_parts);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册