提交 e48f7fae 编写于 作者: N Nikolai Kochetov

Fix MergeTreeRangeReader.

Fix MergeTreeReader.

Fix MergeTreeBaseSelectProcessor.

Better exception message for TreeExecutor.

Added header_without_virtual_columns to MergeTreeBaseSelectProcessor.

Fix MergeTreeReverseSelectProcessor.

Fix MergeTreeDataSelectExecutor.
上级 54d32da5
......@@ -83,11 +83,24 @@ void TreeExecutor::execute()
std::stack<IProcessor *> stack;
stack.push(root);
auto prepare_processor = [](IProcessor * processor)
{
try
{
return processor->prepare();
}
catch (Exception & exception)
{
exception.addMessage(" While executing processor " + processor->getName());
throw;
}
};
while (!stack.empty())
{
IProcessor * node = stack.top();
auto status = node->prepare();
auto status = prepare_processor(node);
switch (status)
{
......
......@@ -44,6 +44,11 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
save_marks_in_cache(save_marks_in_cache_),
virt_column_names(virt_column_names_)
{
header_without_virtual_columns = getPort().getHeader();
for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it)
if (header_without_virtual_columns.has(*it))
header_without_virtual_columns.erase(*it);
}
......@@ -161,7 +166,7 @@ Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
if (read_result.num_rows == 0)
read_result.columns.clear();
auto & sample_block = getPort().getHeader();
auto & sample_block = task->range_reader.getSampleBlock();
if (read_result.num_rows != 0 && sample_block.columns() != read_result.columns.size())
throw Exception("Inconsistent number of columns got from MergeTreeRangeReader. "
"Have " + toString(sample_block.columns()) + " in sample block "
......@@ -184,15 +189,13 @@ Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
if (read_result.num_rows == 0)
return {};
auto & header = getPort().getHeader();
Columns ordered_columns;
size_t num_virtual_columns = virt_column_names.size();
ordered_columns.reserve(header.columns() - num_virtual_columns);
ordered_columns.reserve(header_without_virtual_columns.columns());
/// Reorder columns. TODO: maybe skip for default case.
for (size_t ps = 0; ps + num_virtual_columns < header.columns(); ++ps)
for (size_t ps = 0; ps < header_without_virtual_columns.columns(); ++ps)
{
auto pos_in_sample_block = sample_block.getPositionByName(header.getByPosition(ps).name);
auto pos_in_sample_block = sample_block.getPositionByName(header_without_virtual_columns.getByPosition(ps).name);
ordered_columns.emplace_back(std::move(read_result.columns[pos_in_sample_block]));
}
......
......@@ -70,6 +70,7 @@ protected:
bool save_marks_in_cache;
Names virt_column_names;
Block header_without_virtual_columns;
std::unique_ptr<MergeTreeReadTask> task;
......
......@@ -658,6 +658,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
auto & output = pipe.back()->getOutputs().front();
pipe.emplace_back(std::make_shared<ExpressionTransform>(
output.getHeader(), query_info.prewhere_info->remove_columns_actions));
connect(output, pipe.back()->getInputs().front());
}
}
......
......@@ -530,15 +530,13 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
merge_tree_reader->fillMissingColumns(columns, should_evaluate_missing_defaults, num_rows);
}
if (!columns.empty() && should_evaluate_missing_defaults)
merge_tree_reader->evaluateMissingDefaults(
prev_reader->getSampleBlock().cloneWithColumns(read_result.columns), columns);
read_result.columns.reserve(read_result.columns.size() + columns.size());
for (auto & column : columns)
read_result.columns.emplace_back(std::move(column));
if (!read_result.columns.empty())
{
if (should_evaluate_missing_defaults)
merge_tree_reader->evaluateMissingDefaults(read_result.columns);
}
}
else
{
......@@ -552,7 +550,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
read_result.num_rows);
if (should_evaluate_missing_defaults)
merge_tree_reader->evaluateMissingDefaults(read_result.columns);
merge_tree_reader->evaluateMissingDefaults({}, read_result.columns);
}
else
read_result.columns.clear();
......@@ -691,8 +689,18 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
{
/// Restore block from columns list.
Block block;
auto name_and_type = header.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
size_t pos = 0;
if (prev_reader)
{
for (auto & col : prev_reader->getSampleBlock())
{
block.insert({result.columns[pos], col.type, col.name});
++pos;
}
}
for (auto name_and_type = header.begin(); pos < num_columns; ++pos, ++name_and_type)
block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
if (alias_actions)
......@@ -703,7 +711,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
prewhere_column_pos = block.getPositionByName(*prewhere_column_name);
result.columns.clear();
result.columns.resize(block.columns());
result.columns.reserve(block.columns());
for (auto & col : block)
result.columns.emplace_back(std::move(col.column));
......@@ -761,10 +769,21 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
{
filterColumns(result.columns, *filter_description.data);
if (result.columns.empty())
/// Get num rows after filtration.
bool has_column = false;
for (auto & column : result.columns)
{
if (column)
{
has_column = true;
result.num_rows = column->size();
break;
}
}
if (!has_column)
result.num_rows = getNumBytesInFilter();
else
result.num_rows = result.columns[0]->size();
}
}
......
......@@ -324,7 +324,7 @@ void MergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_ev
if (res_columns[i] && arrayHasNoElementsRead(*res_columns[i]))
res_columns[i] = nullptr;
if (res_columns[i])
if (res_columns[i] == nullptr)
{
if (storage.getColumns().hasDefault(name))
{
......@@ -362,7 +362,7 @@ void MergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_ev
}
}
void MergeTreeReader::evaluateMissingDefaults(Columns & res_columns)
void MergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns & res_columns)
{
try
{
......@@ -375,22 +375,21 @@ void MergeTreeReader::evaluateMissingDefaults(Columns & res_columns)
/// Convert columns list to block.
/// TODO: rewrite with columns interface. It wll be possible after changes in ExpressionActions.
Block block;
auto name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
if (res_columns[pos] == nullptr)
continue;
block.insert({res_columns[pos], name_and_type->type, name_and_type->name});
additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
}
DB::evaluateMissingDefaults(block, columns, storage.getColumns().getDefaults(), storage.global_context);
DB::evaluateMissingDefaults(additional_columns, columns, storage.getColumns().getDefaults(), storage.global_context);
/// Move columns from block.
name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
res_columns[pos] = std::move(block.getByName(name_and_type->name).column);
res_columns[pos] = std::move(additional_columns.getByName(name_and_type->name).column);
}
catch (Exception & e)
{
......
......@@ -42,7 +42,7 @@ public:
/// num_rows is needed in case if all res_columns are nullptr.
void fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows);
/// Evaluate defaulted columns if necessary.
void evaluateMissingDefaults(Columns & res_columns);
void evaluateMissingDefaults(Block additional_columns, Columns & res_columns);
const NamesAndTypesList & getColumns() const { return columns; }
......
......@@ -51,7 +51,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
bool quiet)
:
MergeTreeBaseSelectProcessor{
replaceTypes(storage_.getSampleBlockForColumns(required_columns), owned_data_part_),
replaceTypes(storage_.getSampleBlockForColumns(required_columns_), owned_data_part_),
storage_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
......@@ -79,9 +79,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
/// TODO
/// addTotalRowsApprox(total_rows);
ordered_names = getPort().getHeader().getNames();
/// Remove virtual columns.
ordered_names.resize(ordered_names.size() - virt_column_names.size());
ordered_names = header_without_virtual_columns.getNames();
task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns);
......
......@@ -51,7 +51,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
bool quiet)
:
MergeTreeBaseSelectProcessor{
replaceTypes(storage_.getSampleBlockForColumns(required_columns), owned_data_part_),
replaceTypes(storage_.getSampleBlockForColumns(required_columns_), owned_data_part_),
storage_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
......@@ -80,9 +80,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
/// TODO
/// addTotalRowsApprox(total_rows);
ordered_names = getPort().getHeader().getNames();
/// Remove virtual columns.
ordered_names.resize(ordered_names.size() - virt_column_names.size());
ordered_names = header_without_virtual_columns.getNames();
}
......
......@@ -105,7 +105,7 @@ try
reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_readed);
if (should_evaluate_missing_defaults)
reader->evaluateMissingDefaults(columns);
reader->evaluateMissingDefaults({}, columns);
/// Reorder columns and fill result block.
size_t num_columns = sample.size();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册