提交 a5bfc709 编写于 作者: A Andrey Mironov

Merge

上级 85d2dbcd
......@@ -20,16 +20,15 @@ struct MergeTreeReadTask
const NamesAndTypesList & columns;
const NamesAndTypesList & pre_columns;
const bool remove_prewhere_column;
const MarkRanges & all_ranges;
const bool should_reorder;
MergeTreeReadTask(const MergeTreeData::DataPartPtr & data_part, const MarkRanges & ranges,
const std::size_t part_index_in_query, const Names & ordered_names,
const NameSet & column_name_set, const NamesAndTypesList & columns,
const NamesAndTypesList & pre_columns, const bool remove_prewhere_column,
const MarkRanges & all_ranges)
MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part, const MarkRanges & ranges, const std::size_t part_index_in_query,
const Names & ordered_names, const NameSet & column_name_set, const NamesAndTypesList & columns,
const NamesAndTypesList & pre_columns, const bool remove_prewhere_column, const bool should_reorder)
: data_part{data_part}, mark_ranges{ranges}, part_index_in_query{part_index_in_query},
ordered_names{ordered_names}, column_name_set{column_name_set}, columns{columns}, pre_columns{pre_columns},
remove_prewhere_column{remove_prewhere_column}, all_ranges{all_ranges}
remove_prewhere_column{remove_prewhere_column}, should_reorder{should_reorder}
{}
};
......@@ -38,12 +37,12 @@ using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
class MergeTreeReadPool
{
public:
MergeTreeReadPool(const RangesInDataParts & parts, const std::vector<std::size_t> & per_part_sum_marks,
const std::size_t sum_marks, MergeTreeData & data, const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column_name, const bool check_columns, const Names & column_names)
: parts{parts}, per_part_sum_marks{per_part_sum_marks}, sum_marks{sum_marks}, data{data}
MergeTreeReadPool(
const RangesInDataParts & parts, MergeTreeData & data, const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column_name, const bool check_columns, const Names & column_names)
: parts{parts}, data{data}, column_names{column_names}
{
fillPerPartInfo(column_names, prewhere_actions, prewhere_column_name, check_columns);
fillPerPartInfo(prewhere_actions, prewhere_column_name, check_columns);
}
MergeTreeReadPool(const MergeTreeReadPool &) = delete;
......@@ -53,18 +52,13 @@ public:
{
const std::lock_guard<std::mutex> lock{mutex};
if (0 == sum_marks)
if (remaining_part_indices.empty())
return nullptr;
/// @todo use map to speedup lookup
/// find a part which has marks remaining
std::size_t part_id = 0;
for (; part_id < parts.size(); ++part_id)
if (0 != per_part_sum_marks[part_id])
break;
const auto part_id = remaining_part_indices.back();
auto & part = parts[part_id];
const auto & ordered_names = per_part_ordered_names[part_id];
const auto & column_name_set = per_part_column_name_set[part_id];
const auto & columns = per_part_columns[part_id];
const auto & pre_columns = per_part_pre_columns[part_id];
......@@ -92,7 +86,8 @@ public:
ranges_to_get_from_part = part.ranges;
marks_in_part -= marks_to_get_from_range;
sum_marks -= marks_to_get_from_range;
remaining_part_indices.pop_back();
}
else
{
......@@ -111,22 +106,39 @@ public:
marks_in_part -= marks_to_get_from_range;
need_marks -= marks_to_get_from_range;
sum_marks -= marks_to_get_from_range;
}
if (0 == marks_in_part)
remaining_part_indices.pop_back();
}
return std::make_unique<MergeTreeReadTask>(
part.data_part, ranges_to_get_from_part, part.part_index_in_query, ordered_names, column_name_set, columns,
pre_columns, remove_prewhere_column, per_part_all_ranges[part_id]);
part.data_part, ranges_to_get_from_part, part.part_index_in_query, column_names, column_name_set, columns,
pre_columns, remove_prewhere_column, per_part_should_reorder[part_id]);
}
public:
void fillPerPartInfo(const Names & column_names, const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column_name, const bool check_columns)
void fillPerPartInfo(
const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name, const bool check_columns)
{
for (const auto & part : parts)
remaining_part_indices.reserve(parts.size());
for (const auto i : ext::range(0, parts.size()))
{
per_part_all_ranges.push_back(part.ranges);
auto & part = parts[i];
/// Посчитаем засечки для каждого куска.
size_t sum_marks = 0;
/// Пусть отрезки будут перечислены справа налево, чтобы можно было выбрасывать самый левый отрезок с помощью pop_back().
std::reverse(std::begin(part.ranges), std::end(part.ranges));
for (const auto & range : part.ranges)
sum_marks += range.end - range.begin;
per_part_sum_marks.push_back(sum_marks);
if (0 != sum_marks)
remaining_part_indices.push_back(i);
per_part_columns_lock.push_back(std::make_unique<Poco::ScopedReadRWLock>(
part.data_part->columns_lock));
......@@ -135,18 +147,13 @@ public:
auto required_column_names = column_names;
const auto injected_columns = injectRequiredColumns(part.data_part, required_column_names);
/// insert injected columns into ordered columns list to avoid exception about different block structures
auto ordered_names = column_names;
ordered_names.insert(std::end(ordered_names), std::begin(injected_columns), std::end(injected_columns));
per_part_ordered_names.emplace_back(ordered_names);
auto should_reoder = !injected_columns.empty();
Names required_pre_column_names;
if (prewhere_actions)
{
/// collect columns required for PREWHERE evaluation
/// @todo minimum size column may be added here due to const condition, thus invalidating ordered_names
required_pre_column_names = prewhere_actions->getRequiredColumns();
/// there must be at least one column required for PREWHERE
......@@ -154,7 +161,9 @@ public:
required_pre_column_names.push_back(required_column_names[0]);
/// PREWHERE columns may require some additional columns for DEFAULT evaluation
(void) injectRequiredColumns(part.data_part, required_pre_column_names);
const auto injected_pre_columns = injectRequiredColumns(part.data_part, required_pre_column_names);
if (!injected_pre_columns.empty())
should_reoder = true;
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const NameSet pre_name_set{
......@@ -193,6 +202,8 @@ public:
per_part_pre_columns.push_back(part.data_part->columns.addTypes(required_pre_column_names));
per_part_columns.push_back(part.data_part->columns.addTypes(required_column_names));
}
per_part_should_reorder.push_back(should_reoder);
}
}
......@@ -205,13 +216,18 @@ public:
NameSet required_columns{std::begin(columns), std::end(columns)};
NameSet injected_columns;
auto all_column_files_missing = true;
for (size_t i = 0; i < columns.size(); ++i)
{
const auto & column_name = columns[i];
/// column has files and hence does not require evaluation
if (part->hasColumnFiles(column_name))
{
all_column_files_missing = false;
continue;
}
const auto default_it = data.column_defaults.find(column_name);
/// columns has no explicit default expression
......@@ -237,21 +253,70 @@ public:
}
}
if (all_column_files_missing)
{
addMinimumSizeColumn(part, columns);
/// correctly report added column
injected_columns.insert(columns.back());
}
return injected_columns;
}
/** Add a column of minimum on-disk size.
* Used when none of the requested columns is actually needed, but we still have
* to read at least one column to know the number of rows.
* Appends the chosen column's name to `columns`.
*/
void addMinimumSizeColumn(const MergeTreeData::DataPartPtr & part, Names & columns) const
{
/// On-disk size of a column = its data file (.bin) plus its marks file (.mrk).
/// NOTE(review): assumes both checksum entries exist for any column that
/// passed hasColumnFiles() below — find() results are dereferenced unchecked.
const auto get_column_size = [this, &part] (const String & name) {
const auto & files = part->checksums.files;
const auto escaped_name = escapeForFileName(name);
const auto bin_file_name = escaped_name + ".bin";
const auto mrk_file_name = escaped_name + ".mrk";
return files.find(bin_file_name)->second.file_size + files.find(mrk_file_name)->second.file_size;
};
/// Scan all columns of the table, keeping the smallest one that this part
/// actually has files for (old parts may lack columns added later).
const auto & storage_columns = data.getColumnsList();
const NameAndTypePair * minimum_size_column = nullptr;
auto minimum_size = std::numeric_limits<size_t>::max();
for (const auto & column : storage_columns)
{
if (!part->hasColumnFiles(column.name))
continue;
const auto size = get_column_size(column.name);
if (size < minimum_size)
{
minimum_size = size;
minimum_size_column = &column;
}
}
/// Every part must contain at least one physical column; anything else is a logic error.
if (!minimum_size_column)
throw Exception{
"Could not find a column of minimum size in MergeTree",
ErrorCodes::LOGICAL_ERROR
};
columns.push_back(minimum_size_column->name);
}
std::vector<std::unique_ptr<Poco::ScopedReadRWLock>> per_part_columns_lock;
RangesInDataParts parts;
std::vector<MarkRanges> per_part_all_ranges;
std::vector<std::size_t> per_part_sum_marks;
std::size_t sum_marks;
std::vector<std::size_t> remaining_part_indices;
MergeTreeData & data;
std::vector<Names> per_part_ordered_names;
Names column_names;
std::vector<NameSet> per_part_column_name_set;
std::vector<NamesAndTypesList> per_part_columns;
std::vector<NamesAndTypesList> per_part_pre_columns;
/// @todo actually all of these values are either true or false for the whole query, thus no vector required
std::vector<bool> per_part_remove_prewhere_column;
std::vector<bool> per_part_should_reorder;
mutable std::mutex mutex;
};
......
......@@ -62,20 +62,14 @@ public:
{
size_t max_rows_to_read = (to_mark - from_mark) * storage.index_granularity;
/** Для некоторых столбцов файлы с данными могут отсутствовать.
* Это бывает для старых кусков, после добавления новых столбцов в структуру таблицы.
*/
auto has_missing_columns = false;
/// Указатели на столбцы смещений, общие для столбцов из вложенных структур данных
/// Если append, все значения nullptr, и offset_columns используется только для проверки, что столбец смещений уже прочитан.
OffsetColumns offset_columns;
const auto read_column = [&] (const NameAndTypePair & it) {
for (const NameAndTypePair & it : columns)
{
if (streams.end() == streams.find(it.name))
{
has_missing_columns = true;
return;
}
continue;
/// Все столбцы уже есть в блоке. Будем добавлять значения в конец.
bool append = res.has(it.name);
......@@ -108,24 +102,12 @@ public:
if (!append && column.column->size())
res.insert(column);
};
for (const NameAndTypePair & it : columns)
read_column(it);
if (has_missing_columns && !res)
{
addMinimumSizeColumn();
/// minimum size column is necessarily at list's front
read_column(columns.front());
}
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
{
storage.reportBrokenPart(part_name);
}
/// Более хорошая диагностика.
throw Exception(e.message() + "\n(while reading from part " + path + " from mark " + toString(from_mark) + " to "
......@@ -139,60 +121,13 @@ public:
}
}
/** Add a column of minimum on-disk size.
* Used when none of the requested columns is needed, but we still have to know
* at least the number of rows.
* Inserts the column at the front of `columns`.
*/
void addMinimumSizeColumn()
{
/// On-disk size of a column = its data file (.bin) plus its marks file (.mrk).
/// NOTE(review): assumes both checksum entries exist for any column that
/// passed hasColumnFiles() below — find() results are dereferenced unchecked.
const auto get_column_size = [this] (const String & name) {
const auto & files = data_part->checksums.files;
const auto escaped_name = escapeForFileName(name);
const auto bin_file_name = escaped_name + ".bin";
const auto mrk_file_name = escaped_name + ".mrk";
return files.find(bin_file_name)->second.file_size + files.find(mrk_file_name)->second.file_size;
};
/// Scan all columns of the table, keeping the smallest one that this part
/// actually has files for (old parts may lack columns added later).
const auto & storage_columns = storage.getColumnsList();
const NameAndTypePair * minimum_size_column = nullptr;
auto minimum_size = std::numeric_limits<size_t>::max();
for (const auto & column : storage_columns)
{
if (!data_part->hasColumnFiles(column.name))
continue;
const auto size = get_column_size(column.name);
if (size < minimum_size)
{
minimum_size = size;
minimum_size_column = &column;
}
}
/// Every part must contain at least one physical column; anything else is a logic error.
if (!minimum_size_column)
throw Exception{
"could not find a column of minimum size in MergeTree",
ErrorCodes::LOGICAL_ERROR
};
/// Open the read stream for the chosen column and put it first in the list;
/// remember it so it can be stripped from the result block later.
addStream(minimum_size_column->name, *minimum_size_column->type, all_mark_ranges);
columns.emplace(std::begin(columns), *minimum_size_column);
added_minimum_size_column = &columns.front();
}
/** Добавляет в блок недостающие столбцы из ordered_names, состоящие из значений по-умолчанию.
* Недостающие столбцы добавляются в позиции, такие же как в ordered_names.
* Если был добавлен хотя бы один столбец - то все столбцы в блоке переупорядочиваются как в ordered_names.
*/
void fillMissingColumns(Block & res, const Names & ordered_names)
void fillMissingColumns(Block & res, const Names & ordered_names, const bool always_reorder = false)
{
fillMissingColumnsImpl(res, ordered_names, false);
fillMissingColumnsImpl(res, ordered_names, always_reorder);
}
/** То же самое, но всегда переупорядочивает столбцы в блоке, как в ordered_names
......@@ -345,9 +280,8 @@ private:
String part_name;
FileStreams streams;
/// Запрашиваемые столбцы. Возможно, с добавлением minimum_size_column.
/// Запрашиваемые столбцы.
NamesAndTypesList columns;
const NameAndTypePair * added_minimum_size_column = nullptr;
UncompressedCache * uncompressed_cache;
MarkCache * mark_cache;
......@@ -538,15 +472,6 @@ private:
if (should_evaluate_defaults)
evaluateMissingDefaults(res, columns, storage.column_defaults, storage.context);
/// remove added column to ensure same content among all blocks
if (added_minimum_size_column)
{
res.erase(0);
streams.erase(added_minimum_size_column->name);
columns.erase(std::begin(columns));
added_minimum_size_column = nullptr;
}
/// sort columns to ensure consistent order among all blocks
if (should_sort)
{
......@@ -556,12 +481,6 @@ private:
if (res.has(name))
ordered_block.insert(res.getByName(name));
if (res.columns() != ordered_block.columns())
throw Exception{
"Ordered block has different number of columns than original one:\n" +
ordered_block.dumpNames() + "\nvs.\n" + res.dumpNames(),
ErrorCodes::LOGICAL_ERROR};
std::swap(res, ordered_block);
}
}
......
......@@ -56,72 +56,15 @@ protected:
{
Block res;
while (true)
while (!res)
{
if (!task)
{
task = pool->getTask(min_marks_to_read);
if (!task)
break;
if (!initialized)
{
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.context.getUncompressedCache();
owned_mark_cache = storage.context.getMarkCache();
initialized = true;
}
const auto path = storage.getFullPath() + task->data_part->name + '/';
if (!reader || reader->getDataPart() != task->data_part)
{
reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(),
storage, task->all_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
if (prewhere_actions)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), storage, task->all_ranges, min_bytes_to_use_direct_io,
max_read_buffer_size);
}
}
if (!task && !getNewTask())
break;
res = readFromPart();
if (res)
{
const auto rows = res.rowsInFirstColumn();
/// add virtual columns
if (!virt_column_names.empty())
{
for (const auto & virt_column_name : virt_column_names)
{
if (virt_column_name == "_part")
{
res.insert(ColumnWithNameAndType{
ColumnConst<String>{rows, task->data_part->name}.convertToFullColumn(),
new DataTypeString,
virt_column_name
});
}
else if (virt_column_name == "_part_index")
{
res.insert(ColumnWithNameAndType{
ColumnConst<UInt64>{rows, task->part_index_in_query}.convertToFullColumn(),
new DataTypeUInt64,
virt_column_name
});
}
}
}
}
injectVirtualColumns(res);
if (task->mark_ranges.empty())
{
......@@ -132,15 +75,44 @@ protected:
reader = {};
pre_reader = {};
}
if (res)
break;
}
return res;
}
private:
/// Fetch the next read task from the shared pool and (re)create the readers for it.
/// Returns false when the pool is exhausted and there is nothing left to read.
bool getNewTask()
{
task = pool->getTask(min_marks_to_read);
if (!task)
return false;
/// Lazily acquire the caches on the first task only, so streams that never
/// read anything do not touch them.
if (!initialized)
{
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.context.getUncompressedCache();
owned_mark_cache = storage.context.getMarkCache();
initialized = true;
}
const auto path = storage.getFullPath() + task->data_part->name + '/';
/// A fresh reader per task, restricted to the task's own mark ranges.
reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(),
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
/// A separate reader for the PREWHERE column subset, when PREWHERE is present.
if (prewhere_actions)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), storage, task->mark_ranges, min_bytes_to_use_direct_io,
max_read_buffer_size);
return true;
}
Block readFromPart()
{
Block res;
......@@ -166,7 +138,7 @@ private:
task->mark_ranges.pop_back();
}
progressImpl({ res.rowsInFirstColumn(), res.bytes() });
pre_reader->fillMissingColumns(res, task->ordered_names);
pre_reader->fillMissingColumns(res, task->ordered_names, task->should_reorder);
/// Вычислим выражение в PREWHERE.
prewhere_actions->execute(res);
......@@ -296,12 +268,41 @@ private:
progressImpl({ res.rowsInFirstColumn(), res.bytes() });
reader->fillMissingColumns(res, task->ordered_names);
reader->fillMissingColumns(res, task->ordered_names, task->should_reorder);
}
return res;
}
/// Append the requested virtual columns (_part, _part_index) to the block.
/// Each is materialized as a full column with one value per row, taken from
/// the current task's data part.
void injectVirtualColumns(Block & block)
{
const auto rows = block.rowsInFirstColumn();
/// add virtual columns
if (!virt_column_names.empty())
{
for (const auto & virt_column_name : virt_column_names)
{
if (virt_column_name == "_part")
{
/// Name of the data part the rows came from.
block.insert(ColumnWithNameAndType{
ColumnConst<String>{rows, task->data_part->name}.convertToFullColumn(),
new DataTypeString,
virt_column_name
});
}
else if (virt_column_name == "_part_index")
{
/// Ordinal of the part within this query's part list.
block.insert(ColumnWithNameAndType{
ColumnConst<UInt64>{rows, task->part_index_in_query}.convertToFullColumn(),
new DataTypeUInt64,
virt_column_name
});
}
}
}
}
MergeTreeReadPoolPtr pool;
const std::size_t min_marks_to_read;
const std::size_t block_size;
......@@ -317,7 +318,7 @@ private:
using MergeTreeReaderPtr = std::unique_ptr<MergeTreeReader>;
bool initialized;
bool initialized{false};
UncompressedCachePtr owned_uncompressed_cache;
MarkCachePtr owned_mark_cache;
......
......@@ -366,7 +366,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
use_uncompressed_cache = false;
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
parts, sum_marks_in_parts, sum_marks, data, prewhere_actions, prewhere_column, true, column_names);
parts, data, prewhere_actions, prewhere_column, true, column_names);
BlockInputStreams res;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册