提交 e0798edc 编写于 作者: A Alexey Milovidov

Better UNION ALL: development #1947

上级 cfe900c9
......@@ -210,6 +210,12 @@ ExpressionAnalyzer::ExpressionAnalyzer(
/// Common subexpression elimination. Rewrite rules.
normalizeTree();
/// Remove unneeded columns according to 'required_source_columns'.
/// Leave all selected columns in case of DISTINCT; columns that contain arrayJoin function inside.
/// Must be done after 'normalizeTree' (i.e. after aliases are expanded, so that aliases do not get lost)
/// and before 'executeScalarSubqueries', 'analyzeAggregation', etc. to avoid excessive calculations.
removeUnneededColumnsFromSelectClause();
/// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries();
......@@ -228,9 +234,6 @@ ExpressionAnalyzer::ExpressionAnalyzer(
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns();
/// All selected columns in case of DISTINCT; columns that contain arrayJoin function inside.
calculateRequiredColumnsBeforeProjection();
/// Delete unneeded columns from the `source_columns` list. Create `unknown_required_source_columns`. Form `columns_added_by_join`.
collectUsedColumns();
......@@ -2498,8 +2501,7 @@ void ExpressionAnalyzer::appendSelect(ExpressionActionsChain & chain, bool only_
getRootActions(select_query->select_expression_list, only_types, false, step.actions);
for (const auto & child : select_query->select_expression_list->children)
if (required_columns_before_projection.count(child->getColumnName()))
step.required_output.push_back(child->getColumnName());
step.required_output.push_back(child->getColumnName());
}
bool ExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, bool only_types)
......@@ -2667,7 +2669,7 @@ void ExpressionAnalyzer::collectUsedColumns()
collectJoinedColumns(available_joined_columns, columns_added_by_join);
NameSet required_joined_columns;
getRequiredSourceColumnsInSelectImpl(available_columns, required, ignored, available_joined_columns, required_joined_columns);
getRequiredSourceColumnsImpl(ast, available_columns, required, ignored, available_joined_columns, required_joined_columns);
for (NamesAndTypesList::iterator it = columns_added_by_join.begin(); it != columns_added_by_join.end();)
{
......@@ -2786,29 +2788,6 @@ Names ExpressionAnalyzer::getRequiredSourceColumns() const
}
/** Collects required source columns by delegating to getRequiredSourceColumnsImpl.
  * When there is no SELECT query, the whole `ast` is walked.
  * Otherwise, an element of the SELECT expression list is walked only if its column name
  *  is present in `required_columns_before_projection`; every other child of the SELECT query
  *  is walked unconditionally.
  * All five arguments are forwarded to getRequiredSourceColumnsImpl unchanged.
  */
void ExpressionAnalyzer::getRequiredSourceColumnsInSelectImpl(
    const NameSet & available_columns, NameSet & required_source_columns, NameSet & ignored_names,
    const NameSet & available_joined_columns, NameSet & required_joined_columns)
{
    /// Single place that forwards a subtree together with all accumulator arguments.
    auto collect_from = [&](const auto & node)
    {
        getRequiredSourceColumnsImpl(node, available_columns, required_source_columns,
            ignored_names, available_joined_columns, required_joined_columns);
    };

    if (!select_query)
    {
        collect_from(ast);
        return;
    }

    /// Elements of the SELECT list: only those whose column name was marked as required before projection.
    for (const auto & element : select_query->select_expression_list->children)
    {
        if (!required_columns_before_projection.count(element->getColumnName()))
            continue;

        collect_from(element);
    }

    /// All remaining parts of the query; the SELECT list itself was handled above.
    for (const auto & part : select_query->children)
    {
        if (part == select_query->select_expression_list)
            continue;

        collect_from(part);
    }
}
void ExpressionAnalyzer::getRequiredSourceColumnsImpl(const ASTPtr & ast,
const NameSet & available_columns, NameSet & required_source_columns, NameSet & ignored_names,
const NameSet & available_joined_columns, NameSet & required_joined_columns)
......@@ -2910,17 +2889,20 @@ static bool hasArrayJoin(const ASTPtr & ast)
}
/// NOTE(review): this span carries two function signatures and two bodies back to back.
/// It looks like the previous version (calculateRequiredColumnsBeforeProjection) and the new version
/// (removeUnneededColumnsFromSelectClause) of the same function, interleaved with the diff markers lost - confirm.
void ExpressionAnalyzer::calculateRequiredColumnsBeforeProjection()
void ExpressionAnalyzer::removeUnneededColumnsFromSelectClause()
{
/// Nothing to do when there is no SELECT query.
if (!select_query)
return;
/// Presumably the previous body: records in `required_columns_before_projection` the column name of every
/// SELECT element that must be kept - all of them if `required_result_columns` is empty or the query is DISTINCT,
/// otherwise those that contain arrayJoin or whose alias-or-column name is in `required_result_columns`.
for (const auto & child : select_query->select_expression_list->children)
if (required_result_columns.empty()
|| select_query->distinct
|| hasArrayJoin(child)
|| required_result_columns.count(child->getAliasOrColumnName()))
required_columns_before_projection.insert(child->getColumnName());
/// Presumably the new body: with no restriction on result columns, or with DISTINCT, the SELECT list is left as is.
if (required_result_columns.empty() || select_query->distinct)
return;
/// Erase, in place, every SELECT element whose alias-or-column name is not in `required_result_columns`
/// and that does not contain arrayJoin. The relative order of the remaining elements is preserved.
ASTs & elements = select_query->select_expression_list->children;
elements.erase(std::remove_if(elements.begin(), elements.end(), [this](const auto & node)
{
return !required_result_columns.count(node->getAliasOrColumnName()) && !hasArrayJoin(node);
}), elements.end());
}
}
......@@ -296,12 +296,6 @@ private:
const NameSet & available_columns, NameSet & required_source_columns, NameSet & ignored_names,
const NameSet & available_joined_columns, NameSet & required_joined_columns);
/** Same as above but skip unnecessary elements in SELECT according to 'required_result_columns'.
*/
void getRequiredSourceColumnsInSelectImpl(
const NameSet & available_columns, NameSet & required_source_columns, NameSet & ignored_names,
const NameSet & available_joined_columns, NameSet & required_joined_columns);
/// columns - the columns that are present before the transformations begin.
void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const;
......@@ -331,8 +325,7 @@ private:
/** Sometimes we have to calculate more columns in SELECT clause than will be returned from query.
* This is the case when we have DISTINCT or arrayJoin: we require more columns in SELECT even if we need less columns in result.
*/
NameSet required_columns_before_projection;
void calculateRequiredColumnsBeforeProjection();
void removeUnneededColumnsFromSelectClause();
};
}
......@@ -327,7 +327,8 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns(pipeline, dry_run);
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
if (!dry_run)
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
AnalysisResult expressions = analyzeExpressions(from_stage);
......@@ -451,27 +452,24 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
}
if (need_merge_streams)
{
executeUnion(pipeline);
/** If there was more than one stream,
* then DISTINCT needs to be performed once again after merging all streams.
*/
if (need_second_distinct_pass)
executeDistinct(pipeline, false, Names());
/** If there was more than one stream,
* then DISTINCT needs to be performed once again after merging all streams.
*/
if (need_second_distinct_pass)
executeDistinct(pipeline, false, Names());
/** We must do projection after DISTINCT because projection may remove some columns.
*/
executeProjection(pipeline, expressions.final_projection);
executeExtremes(pipeline);
executeLimitBy(pipeline);
executeLimit(pipeline);
}
else
{
executeProjection(pipeline, expressions.final_projection);
executeExtremes(pipeline);
}
/** We must do projection after DISTINCT because projection may remove some columns.
*/
executeLimitBy(pipeline);
executeProjection(pipeline, expressions.final_projection);
/** Extremes are calculated before LIMIT, but after LIMIT BY. This is Ok.
*/
executeExtremes(pipeline);
executeLimit(pipeline);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册