提交 b1fa3fab 编写于 作者: N Nikolai Kochetov

Fix prewhere with final. #2827

上级 a6d00948
......@@ -2450,16 +2450,23 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
return true;
}
bool ExpressionAnalyzer::appendPrewhere(ExpressionActionsChain & chain, bool only_types, const ASTPtr & sampling_expression)
bool ExpressionAnalyzer::appendPrewhere(ExpressionActionsChain & chain, bool only_types,
const ASTPtr & sampling_expression, const ASTPtr & primary_expression)
{
assertSelect();
if (!select_query->prewhere_expression)
return false;
Names required_sample_columns;
Names additional_required_mergetree_columns;
if (sampling_expression)
required_sample_columns = ExpressionAnalyzer(sampling_expression, context, storage).getRequiredSourceColumns();
additional_required_mergetree_columns = ExpressionAnalyzer(sampling_expression, context, storage).getRequiredSourceColumns();
if (primary_expression)
{
auto required_primary_columns = ExpressionAnalyzer(primary_expression, context, storage).getRequiredSourceColumns();
additional_required_mergetree_columns.insert(additional_required_mergetree_columns.end(),
required_primary_columns.begin(), required_primary_columns.end());
}
initChain(chain, source_columns);
auto & step = chain.getLastStep();
......@@ -2476,10 +2483,9 @@ bool ExpressionAnalyzer::appendPrewhere(ExpressionActionsChain & chain, bool onl
auto required_columns = tmp_actions->getRequiredColumns();
NameSet required_source_columns(required_columns.begin(), required_columns.end());
/// Add required columns for sample expression to required output in order not to remove them after
/// prewhere execution because sampling is executed after prewhere.
/// TODO: add sampling execution to common chain.
for (const auto & column : required_sample_columns)
/// Add required columns to required output in order not to remove them after prewhere execution.
/// TODO: add sampling and final execution to common chain.
for (const auto & column : additional_required_mergetree_columns)
{
if (required_source_columns.count(column))
{
......
......@@ -142,8 +142,9 @@ public:
bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types);
bool appendJoin(ExpressionActionsChain & chain, bool only_types);
/// remove_filter is set in ExpressionActionsChain::finalize();
/// sampling_expression is needed when sampling is used, in order not to remove the columns that are used in it.
bool appendPrewhere(ExpressionActionsChain & chain, bool only_types, const ASTPtr & sampling_expression);
/// sampling_expression and primary_expression are needed in order not to remove the columns that are used in them.
bool appendPrewhere(ExpressionActionsChain & chain, bool only_types,
const ASTPtr & sampling_expression, const ASTPtr & primary_expression);
bool appendWhere(ExpressionActionsChain & chain, bool only_types);
bool appendGroupBy(ExpressionActionsChain & chain, bool only_types);
void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types);
......
......@@ -308,21 +308,21 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
const ExpressionActionsChain::Step & step = chain.steps.at(0);
res.prewhere_info->remove_prewhere_column = step.can_remove_required_output.at(0);
Names columns_to_remove_after_sampling;
Names columns_to_remove;
for (size_t i = 1; i < step.required_output.size(); ++i)
{
if (step.can_remove_required_output[i])
columns_to_remove_after_sampling.push_back(step.required_output[i]);
columns_to_remove.push_back(step.required_output[i]);
}
if (!columns_to_remove_after_sampling.empty())
if (!columns_to_remove.empty())
{
auto columns = res.prewhere_info->prewhere_actions->getSampleBlock().getNamesAndTypesList();
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(columns, context);
for (const auto & column : columns_to_remove_after_sampling)
for (const auto & column : columns_to_remove)
actions->add(ExpressionAction::removeColumn(column));
res.prewhere_info->after_sampling_actions = std::move(actions);
res.prewhere_info->remove_columns_actions = std::move(actions);
}
}
if (has_where)
......@@ -336,8 +336,9 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
{
ExpressionActionsChain chain(context);
ASTPtr sampling_expression = storage && query.sample_size() ? storage->getSamplingExpression() : nullptr;
if (query_analyzer->appendPrewhere(chain, !res.first_stage, sampling_expression))
ASTPtr sampling_expression = (storage && query.sample_size()) ? storage->getSamplingExpression() : nullptr;
ASTPtr primary_expression = (storage && query.final()) ? storage->getPrimaryExpression() : nullptr;
if (query_analyzer->appendPrewhere(chain, !res.first_stage, sampling_expression, primary_expression))
{
has_prewhere = true;
......
......@@ -346,6 +346,9 @@ public:
/// Returns the sampling expression for the storage, or nullptr if there is none.
virtual ASTPtr getSamplingExpression() const { return nullptr; }
/// Returns the primary expression for the storage, or nullptr if there is none.
virtual ASTPtr getPrimaryExpression() const { return nullptr; }
using ITableDeclaration::ITableDeclaration;
using std::enable_shared_from_this<IStorage>::shared_from_this;
......
......@@ -597,9 +597,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
stream = std::make_shared<AddingConstColumnBlockInputStream<Float64>>(
stream, std::make_shared<DataTypeFloat64>(), used_sample_factor, "_sample_factor");
if (query_info.prewhere_info && query_info.prewhere_info->after_sampling_actions)
if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
for (auto & stream : res)
stream = std::make_shared<ExpressionBlockInputStream>(stream, query_info.prewhere_info->after_sampling_actions);
stream = std::make_shared<ExpressionBlockInputStream>(stream, query_info.prewhere_info->remove_columns_actions);
return res;
}
......
......@@ -430,10 +430,20 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
}
else
{
size_t num_rows = read_result.block.rows();
if (!read_result.block)
{
if (auto * filter = read_result.getFilter())
num_rows = countBytesInFilter(filter->getData()); /// All columns were removed and filter is not always true.
else if (read_result.totalRowsPerGranule())
num_rows = read_result.numReadRows(); /// All columns were removed and filter is always true.
/// else filter is always false.
}
/// If block is empty, we still may need to add missing columns.
/// In that case use number of rows in result block and don't filter block.
merge_tree_reader->fillMissingColumns(block, should_reorder, should_evaluate_missing_defaults,
read_result.numReadRows());
if (num_rows)
merge_tree_reader->fillMissingColumns(block, should_reorder, should_evaluate_missing_defaults, num_rows);
}
for (auto i : ext::range(0, block.columns()))
......
......@@ -25,8 +25,8 @@ struct PrewhereInfo
ExpressionActionsPtr alias_actions;
/// Actions which are executed on block in order to get filter column for prewhere step.
ExpressionActionsPtr prewhere_actions;
/// Actions which are executed after sampling in order to remove unused columns.
ExpressionActionsPtr after_sampling_actions;
/// Actions which are executed after reading from storage in order to remove unused columns.
ExpressionActionsPtr remove_columns_actions;
String prewhere_column_name;
bool remove_prewhere_column = false;
......
......@@ -96,6 +96,8 @@ public:
ASTPtr getSamplingExpression() const override { return data.sampling_expression; }
ASTPtr getPrimaryExpression() const override { return data.primary_expr_ast; }
private:
String path;
String database_name;
......
......@@ -195,6 +195,8 @@ public:
ASTPtr getSamplingExpression() const override { return data.sampling_expression; }
ASTPtr getPrimaryExpression() const override { return data.primary_expr_ast; }
private:
/// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册