提交 f0a009bd 编写于 作者: A Alexey Milovidov

dbms: attempt to fix an error [#METR-19399].

上级 061306d0
......@@ -752,6 +752,15 @@ public:
*/
size_t getMaxPartsCountForMonth() const;
/** Минимальный номер блока в указанном месяце.
* Возвращает также bool - есть ли хоть один кусок.
*/
std::pair<Int64, bool> getMinBlockNumberForMonth(DayNum_t month) const;
/** Есть ли указанный номер блока в каком-нибудь куске указанного месяца.
*/
bool hasBlockNumberInMonth(Int64 block_number, DayNum_t month) const;
/** Если в таблице слишком много активных кусков, спит некоторое время, чтобы дать им возможность смерджиться.
* Если передано until - проснуться раньше, если наступило событие.
*/
......@@ -878,6 +887,8 @@ public:
static String getMonthName(DayNum_t month);
static DayNum_t getMonthDayNum(const Field & partition);
static DayNum_t getMonthFromName(const String & month_name);
/// Получить месяц из имени куска или достаточной его части.
static DayNum_t getMonthFromPartPrefix(const String & part_prefix);
Context & context;
const String date_column_name;
......
......@@ -117,8 +117,10 @@ MergeTreeData::MergeTreeData(
Int64 MergeTreeData::getMaxDataPartIndex()
{
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
Int64 max_part_id = 0;
for (const auto & part : data_parts)
for (const auto & part : all_data_parts)
max_part_id = std::max(max_part_id, part->right);
return max_part_id;
......@@ -967,6 +969,36 @@ size_t MergeTreeData::getMaxPartsCountForMonth() const
return res;
}
std::pair<Int64, bool> MergeTreeData::getMinBlockNumberForMonth(DayNum_t month) const
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
for (const auto & part : data_parts) /// Поиск можно сделать лучше.
if (part->month == month)
return { part->left, true }; /// Блоки в data_parts упорядочены по month и left.
return { 0, false };
}
bool MergeTreeData::hasBlockNumberInMonth(Int64 block_number, DayNum_t month) const
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
for (const auto & part : data_parts) /// Поиск можно сделать лучше.
{
if (part->month == month && part->left <= block_number && part->right >= block_number)
return true;
if (part->month > month)
break;
}
return false;
}
void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
{
size_t parts_count = getMaxPartsCountForMonth();
......@@ -1419,4 +1451,9 @@ DayNum_t MergeTreeData::getMonthFromName(const String & month_name)
return date;
}
DayNum_t MergeTreeData::getMonthFromPartPrefix(const String & part_prefix)
{
return getMonthFromName(part_prefix.substr(0, strlen("YYYYMM")));
}
}
......@@ -88,6 +88,27 @@ const auto WAIT_FOR_REPLICA_QUEUE_MS = 10 * 1000;
const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
/** Добавляемым блокам данных присваиваются некоторые номера - целые числа.
* Для добавляемых обычным способом (INSERT) блоков, номера выделяются по возрастанию.
* Слияния делаются для диапазонов номеров блоков на числовой прямой:
* если в слиянии участвуют номера блоков x, z, и есть блок с номером y, что x < y < z, то блок с номером y тоже участвует в слиянии.
* Это требуется для сохранения свойств некоторых операций, которые могут производиться при слиянии - например, в CollapsingMergeTree.
* В частности, это позволяет во время слияния знать, что в одном куске все данные были добавлены раньше, чем все данные в другом куске.
*
* Изредка возникает необходимость добавить в таблицу какой-то заведомо старый кусок данных,
* чтобы он воспринимался как старый в логике работы CollapsingMergeTree.
* Такой кусок данных можно добавить с помощью специального запроса ATTACH.
* И в этом случае, мы должны выделить этому куску номера меньшие, чем номера всех остальных кусков.
* В связи с этим, номера обычных кусков, добавляемых INSERT-ом, начинаются не с нуля, а с большего числа,
* а меньшие номера считаются "зарезервированными".
*
* Почему это число равно 200?
* Дело в том, что раньше не поддерживались отрицательные номера блоков.
* А также, слияние сделано так, что при увеличении количества кусков, вставка новых кусков специально замедляется,
* пока слияния не успеют уменьшить число кусков; и это было рассчитано примерно для 200 кусков.
* А значит, что при вставке в таблицу всех кусков из другой таблицы, 200 номеров наверняка достаточно.
* В свою очередь, это число выбрано почти наугад.
*/
const Int64 RESERVED_BLOCK_NUMBERS = 200;
......@@ -1455,15 +1476,30 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
}
/// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам.
/// Номера до RESERVED_BLOCK_NUMBERS всегда не соответствуют никаким блокам.
for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, left->right + 1); number <= right->left - 1; ++number)
{
String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number);
for (Int64 number = left->right + 1; number <= right->left - 1; ++number)
{
/** Для номеров до RESERVED_BLOCK_NUMBERS не используется AbandonableLock
* - такие номера не могут быть "заброшены" - то есть, не использованными для кусков.
* Это номера кусков, которые были добавлены с помощью ALTER ... ATTACH.
* Они должны идти без пропусков (для каждого номера должен быть кусок).
* Проверяем, что для всех таких номеров есть куски,
* иначе, через "дыры" - отсутствующие куски, нельзя мерджить.
*/
if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
return false;
if (number < RESERVED_BLOCK_NUMBERS)
{
if (!data.hasBlockNumberInMonth(number, left->month))
return false;
}
else
{
String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number);
if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
return false;
}
}
memoized_parts_that_could_be_merged.insert(key);
......@@ -2821,15 +2857,11 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
/// Выделим добавляемым кускам максимальные свободные номера, меньшие RESERVED_BLOCK_NUMBERS.
/// NOTE: Проверка свободности номеров никак не синхронизируется. Выполнять несколько запросов ATTACH/DETACH/DROP одновременно нельзя.
Int64 min_used_number = RESERVED_BLOCK_NUMBERS;
DayNum_t month = DateLUT::instance().makeDayNum(parse<UInt16>(partition.substr(0, 4)), parse<UInt8>(partition.substr(4, 2)), 1);
DayNum_t month = MergeTreeData::getMonthFromPartPrefix(partition);
{
/// Немного неоптимально.
auto existing_parts = data.getAllDataParts();
for (const auto & part : existing_parts)
if (part->month == month)
min_used_number = std::min(min_used_number, part->left);
}
auto num_and_exists = data.getMinBlockNumberForMonth(month);
if (num_and_exists.second)
min_used_number = num_and_exists.first;
/// Добавим записи в лог.
std::reverse(parts.begin(), parts.end());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册