Unverified · Commit bd6d9a42 authored by Artem Zuikov, committed by GitHub

Some InterpreterSelectQuery refactoring (#9035)

move ExpressionActionsChain logic out of InterpreterSelectQuery
Parent 17de1b75
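The gist of #9035, as a hedged sketch assembled from the hunks below (not compilable on its own): the static InterpreterSelectQuery::analyzeExpressions helper and its private AnalysisResult struct are replaced by ExpressionAnalysisResult, declared next to SelectQueryExpressionAnalyzer, whose constructor now builds the ExpressionActionsChain. The trailing constructor arguments shown here are inferred from the new declaration, since the call site in the diff is truncated.

/// Sketch of the new call site in InterpreterSelectQuery::getSampleBlockImpl().
/// Previously: analysis_result = analyzeExpressions(query, *query_analyzer,
///                                                  from_stage, options.to_stage, ...);
/// Now the interpreter computes the stage flags itself and hands them to the
/// ExpressionAnalysisResult constructor, which owns the chain-building logic.
bool first_stage = from_stage < QueryProcessingStage::WithMergeableState
    && options.to_stage >= QueryProcessingStage::WithMergeableState;
bool second_stage = from_stage <= QueryProcessingStage::WithMergeableState
    && options.to_stage > QueryProcessingStage::WithMergeableState;

analysis_result = ExpressionAnalysisResult(
    getSelectQuery(), *query_analyzer,
    first_stage, second_stage,
    *context, storage, options.only_analyze,
    filter_info, source_header);   /// trailing arguments inferred from the constructor declaration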
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
#include <Parsers/formatAST.h>
#include <Interpreters/ExpressionActions.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>
......
......@@ -35,7 +35,6 @@
#include <Columns/ColumnsCommon.h>
#include <Common/FieldVisitors.h>
#include <Common/assert_cast.h>
#include <Interpreters/ExpressionActions.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Functions/FunctionHelpers.h>
......
......@@ -2,7 +2,6 @@
#include <Parsers/IAST.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/SubqueryForSet.h>
#include <Interpreters/InDepthNodeVisitor.h>
......@@ -13,6 +12,9 @@ namespace DB
class Context;
class ASTFunction;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/// The case of an explicit enumeration of values.
SetPtr makeExplicitSet(
const ASTFunction * node, const Block & sample_block, bool create_ordered_set,
......
......@@ -70,9 +70,51 @@ using LogAST = DebugASTLog<false>; /// set to true to enable logs
namespace ErrorCodes
{
extern const int UNKNOWN_IDENTIFIER;
extern const int ILLEGAL_PREWHERE;
extern const int LOGICAL_ERROR;
}
namespace
{
/// Check if there is an ignore function. It's used for disabling constant folding in query
/// predicates because some performance tests use the ignore function as a non-optimize guard.
bool allowEarlyConstantFolding(const ExpressionActions & actions, const Context & context)
{
if (!context.getSettingsRef().enable_early_constant_folding)
return false;
for (auto & action : actions.getActions())
{
if (action.type == action.APPLY_FUNCTION && action.function_base)
{
auto name = action.function_base->getName();
if (name == "ignore")
return false;
}
}
return true;
}
}
bool sanitizeBlock(Block & block)
{
for (auto & col : block)
{
if (!col.column)
{
if (isNotCreatable(col.type->getTypeId()))
return false;
col.column = col.type->createColumn();
}
else if (isColumnConst(*col.column) && !col.column->empty())
col.column = col.column->cloneEmpty();
}
return true;
}
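A minimal usage sketch of sanitizeBlock(), mirroring the PREWHERE branch in the ExpressionAnalysisResult constructor below (names abbreviated, not compilable on its own): missing columns are materialized and non-empty constants are emptied, so the filter actions can be executed on a header-only sample block purely to find out whether the filter folds to a constant.

/// Sketch: early constant folding over a sample block (see the PREWHERE/WHERE branches below).
Block sample = source_header;            /// header only: columns may carry no data
if (sanitizeBlock(sample))               /// create missing columns, shrink constants to 0 rows
{
    prewhere_actions->execute(sample);   /// safe now: every column exists and is creatable
    const auto & filter = sample.getByName(prewhere_column_name);
    if (filter.column)                   /// a constant filter column can be short-circuited later
        prewhere_constant_filter_description = ConstantFilterDescription(*filter.column);
}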
ExpressionAnalyzer::ExpressionAnalyzer(
const ASTPtr & query_,
const SyntaxAnalyzerResultPtr & syntax_analyzer_result_,
......@@ -884,6 +926,13 @@ ExpressionActionsPtr ExpressionAnalyzer::getConstActions()
return actions;
}
ExpressionActionsPtr SelectQueryExpressionAnalyzer::simpleSelectActions()
{
ExpressionActionsChain new_chain(context);
appendSelect(new_chain, false);
return new_chain.getLastActions();
}
void SelectQueryExpressionAnalyzer::getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates) const
{
for (const auto & name_and_type : aggregation_keys)
......@@ -892,4 +941,231 @@ void SelectQueryExpressionAnalyzer::getAggregateInfo(Names & key_names, Aggregat
aggregates = aggregate_descriptions;
}
ExpressionAnalysisResult::ExpressionAnalysisResult(const ASTSelectQuery & query,
SelectQueryExpressionAnalyzer & query_analyzer,
bool first_stage_,
bool second_stage_,
const Context & context,
const StoragePtr & storage,
bool only_types,
const FilterInfoPtr & filter_info_,
const Block & source_header)
: first_stage(first_stage_)
, second_stage(second_stage_)
{
/// first_stage: Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
/// second_stage: Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
/** First we compose a chain of actions and remember the necessary steps from it.
* Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and
* throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries.
*/
bool finalized = false;
size_t where_step_num = 0;
auto finalizeChain = [&](ExpressionActionsChain & chain)
{
if (!finalized)
{
chain.finalize();
finalize(chain, context, where_step_num);
chain.clear();
}
finalized = true;
};
{
ExpressionActionsChain chain(context);
Names additional_required_columns_after_prewhere;
if (storage && (query.sample_size() || context.getSettingsRef().parallel_replicas_count > 1))
{
Names columns_for_sampling = storage->getColumnsRequiredForSampling();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
columns_for_sampling.begin(), columns_for_sampling.end());
}
if (storage && query.final())
{
Names columns_for_final = storage->getColumnsRequiredForFinal();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
columns_for_final.begin(), columns_for_final.end());
}
if (storage && filter_info_)
{
filter_info = filter_info_;
query_analyzer.appendPreliminaryFilter(chain, filter_info->actions, filter_info->column_name);
}
if (query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere))
{
prewhere_info = std::make_shared<PrewhereInfo>(
chain.steps.front().actions, query.prewhere()->getColumnName());
if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, context))
{
Block before_prewhere_sample = source_header;
if (sanitizeBlock(before_prewhere_sample))
{
prewhere_info->prewhere_actions->execute(before_prewhere_sample);
auto & column_elem = before_prewhere_sample.getByName(query.prewhere()->getColumnName());
/// If the filter column is a constant, record it.
if (column_elem.column)
prewhere_constant_filter_description = ConstantFilterDescription(*column_elem.column);
}
}
chain.addStep();
}
need_aggregate = query_analyzer.hasAggregation();
query_analyzer.appendArrayJoin(chain, only_types || !first_stage);
if (query_analyzer.appendJoin(chain, only_types || !first_stage))
{
before_join = chain.getLastActions();
if (!hasJoin())
throw Exception("No expected JOIN", ErrorCodes::LOGICAL_ERROR);
chain.addStep();
}
if (query_analyzer.appendWhere(chain, only_types || !first_stage))
{
where_step_num = chain.steps.size() - 1;
before_where = chain.getLastActions();
if (allowEarlyConstantFolding(*before_where, context))
{
Block before_where_sample;
if (chain.steps.size() > 1)
before_where_sample = chain.steps[chain.steps.size() - 2].actions->getSampleBlock();
else
before_where_sample = source_header;
if (sanitizeBlock(before_where_sample))
{
before_where->execute(before_where_sample);
auto & column_elem = before_where_sample.getByName(query.where()->getColumnName());
/// If the filter column is a constant, record it.
if (column_elem.column)
where_constant_filter_description = ConstantFilterDescription(*column_elem.column);
}
}
chain.addStep();
}
if (need_aggregate)
{
query_analyzer.appendGroupBy(chain, only_types || !first_stage);
query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage);
before_aggregation = chain.getLastActions();
finalizeChain(chain);
if (query_analyzer.appendHaving(chain, only_types || !second_stage))
{
before_having = chain.getLastActions();
chain.addStep();
}
}
bool has_stream_with_non_joined_rows = (before_join && before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
optimize_read_in_order =
context.getSettingsRef().optimize_read_in_order
&& storage && query.orderBy()
&& !query_analyzer.hasAggregation()
&& !query.final()
&& !has_stream_with_non_joined_rows;
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));
selected_columns = chain.getLastStep().required_output;
has_order_by = query_analyzer.appendOrderBy(chain, only_types || (need_aggregate ? !second_stage : !first_stage), optimize_read_in_order);
before_order_and_select = chain.getLastActions();
chain.addStep();
if (query_analyzer.appendLimitBy(chain, only_types || !second_stage))
{
before_limit_by = chain.getLastActions();
chain.addStep();
}
query_analyzer.appendProjectResult(chain);
final_projection = chain.getLastActions();
finalizeChain(chain);
}
/// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
removeExtraColumns();
subqueries_for_sets = query_analyzer.getSubqueriesForSets();
checkActions();
}
void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, const Context & context_, size_t where_step_num)
{
if (hasPrewhere())
{
const ExpressionActionsChain::Step & step = chain.steps.at(0);
prewhere_info->remove_prewhere_column = step.can_remove_required_output.at(0);
Names columns_to_remove;
for (size_t i = 1; i < step.required_output.size(); ++i)
{
if (step.can_remove_required_output[i])
columns_to_remove.push_back(step.required_output[i]);
}
if (!columns_to_remove.empty())
{
auto columns = prewhere_info->prewhere_actions->getSampleBlock().getNamesAndTypesList();
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(columns, context_);
for (const auto & column : columns_to_remove)
actions->add(ExpressionAction::removeColumn(column));
prewhere_info->remove_columns_actions = std::move(actions);
}
columns_to_remove_after_prewhere = std::move(columns_to_remove);
}
else if (hasFilter())
{
/// Can't have prewhere and filter set simultaneously
filter_info->do_remove_column = chain.steps.at(0).can_remove_required_output.at(0);
}
if (hasWhere())
remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0);
}
void ExpressionAnalysisResult::removeExtraColumns()
{
if (hasFilter())
filter_info->actions->prependProjectInput();
if (hasWhere())
before_where->prependProjectInput();
if (hasHaving())
before_having->prependProjectInput();
}
void ExpressionAnalysisResult::checkActions()
{
/// Check that PREWHERE doesn't contain unusual actions. Unusual actions are those that can change the number of rows.
if (hasPrewhere())
{
auto check_actions = [](const ExpressionActionsPtr & actions)
{
if (actions)
for (const auto & action : actions->getActions())
if (action.type == ExpressionAction::Type::JOIN || action.type == ExpressionAction::Type::ARRAY_JOIN)
throw Exception("PREWHERE cannot contain ARRAY JOIN or JOIN action", ErrorCodes::ILLEGAL_PREWHERE);
};
check_actions(prewhere_info->prewhere_actions);
check_actions(prewhere_info->alias_actions);
check_actions(prewhere_info->remove_columns_actions);
}
}
}
......@@ -2,11 +2,13 @@
#include <Core/Settings.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Columns/FilterDescription.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/SubqueryForSet.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
......@@ -29,6 +31,9 @@ class ASTExpressionList;
class ASTSelectQuery;
struct ASTTablesInSelectQueryElement;
/// Create columns in block or return false if not possible
bool sanitizeBlock(Block & block);
/// ExpressionAnalyzer sources, intermediates and results. It splits data and logic, allowing them to be tested separately.
struct ExpressionAnalyzerData
{
......@@ -156,10 +161,73 @@ protected:
bool isRemoteStorage() const;
};
class SelectQueryExpressionAnalyzer;
/// Result of SelectQueryExpressionAnalyzer: expressions for InterpreterSelectQuery
struct ExpressionAnalysisResult
{
bool need_aggregate = false;
bool has_order_by = false;
bool remove_where_filter = false;
bool optimize_read_in_order = false;
ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
ExpressionActionsPtr before_having;
ExpressionActionsPtr before_order_and_select;
ExpressionActionsPtr before_limit_by;
ExpressionActionsPtr final_projection;
/// Columns from the SELECT list, before renaming them to aliases.
Names selected_columns;
/// Columns that will be removed after the PREWHERE actions are executed.
Names columns_to_remove_after_prewhere;
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = false;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
bool second_stage = false;
SubqueriesForSets subqueries_for_sets;
PrewhereInfoPtr prewhere_info;
FilterInfoPtr filter_info;
ConstantFilterDescription prewhere_constant_filter_description;
ConstantFilterDescription where_constant_filter_description;
ExpressionAnalysisResult() = default;
ExpressionAnalysisResult(
const ASTSelectQuery & query,
SelectQueryExpressionAnalyzer & query_analyzer,
bool first_stage,
bool second_stage,
const Context & context,
const StoragePtr & storage,
bool only_types,
const FilterInfoPtr & filter_info,
const Block & source_header);
bool hasFilter() const { return filter_info.get(); }
bool hasJoin() const { return before_join.get(); }
bool hasPrewhere() const { return prewhere_info.get(); }
bool hasWhere() const { return before_where.get(); }
bool hasHaving() const { return before_having.get(); }
bool hasLimitBy() const { return before_limit_by.get(); }
void removeExtraColumns();
void checkActions();
void finalize(const ExpressionActionsChain & chain, const Context & context, size_t where_step_num);
};
/// SelectQuery specific ExpressionAnalyzer part.
class SelectQueryExpressionAnalyzer : public ExpressionAnalyzer
{
public:
friend struct ExpressionAnalysisResult;
SelectQueryExpressionAnalyzer(
const ASTPtr & query_,
const SyntaxAnalyzerResultPtr & syntax_analyzer_result_,
......@@ -185,37 +253,10 @@ public:
/// Tables that will need to be sent to remote servers for distributed query processing.
const Tables & getExternalTables() const { return external_tables; }
/** These methods allow you to build a chain of transformations over a block, that receives values in the desired sections of the query.
*
* Example usage:
* ExpressionActionsChain chain;
* analyzer.appendWhere(chain);
* chain.addStep();
* analyzer.appendSelect(chain);
* analyzer.appendOrderBy(chain);
* chain.finalize();
*
* If only_types = true is set, subqueries in the relevant parts of the query are not executed. The actions obtained this way
* shouldn't be executed; they are only needed to get a list of columns with their types.
*/
ExpressionActionsPtr simpleSelectActions();
/// Before aggregation:
bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types);
bool appendJoin(ExpressionActionsChain & chain, bool only_types);
/// Add preliminary row filtering. Actions are created in another expression analyzer to prevent any possible alias injection.
void appendPreliminaryFilter(ExpressionActionsChain & chain, ExpressionActionsPtr actions, String column_name);
/// remove_filter is set in ExpressionActionsChain::finalize();
/// Columns in `additional_required_columns` will not be removed (they can be used for e.g. sampling or FINAL modifier).
bool appendPrewhere(ExpressionActionsChain & chain, bool only_types, const Names & additional_required_columns);
bool appendWhere(ExpressionActionsChain & chain, bool only_types);
bool appendGroupBy(ExpressionActionsChain & chain, bool only_types);
void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types);
/// After aggregation:
bool appendHaving(ExpressionActionsChain & chain, bool only_types);
/// These appends are public only for tests
void appendSelect(ExpressionActionsChain & chain, bool only_types);
bool appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order);
bool appendLimitBy(ExpressionActionsChain & chain, bool only_types);
/// Deletes all columns except those mentioned by SELECT, arranges the remaining columns and renames them to aliases.
void appendProjectResult(ExpressionActionsChain & chain) const;
......@@ -244,6 +285,39 @@ private:
SubqueryForSet & subquery_for_set) const;
const ASTSelectQuery * getAggregatingQuery() const;
/** These methods allow you to build a chain of transformations over a block, that receives values in the desired sections of the query.
*
* Example usage:
* ExpressionActionsChain chain;
* analyzer.appendWhere(chain);
* chain.addStep();
* analyzer.appendSelect(chain);
* analyzer.appendOrderBy(chain);
* chain.finalize();
*
* If only_types = true is set, subqueries in the relevant parts of the query are not executed. The actions obtained this way
* shouldn't be executed; they are only needed to get a list of columns with their types.
*/
/// Before aggregation:
bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types);
bool appendJoin(ExpressionActionsChain & chain, bool only_types);
/// Add preliminary row filtering. Actions are created in another expression analyzer to prevent any possible alias injection.
void appendPreliminaryFilter(ExpressionActionsChain & chain, ExpressionActionsPtr actions, String column_name);
/// remove_filter is set in ExpressionActionsChain::finalize();
/// Columns in `additional_required_columns` will not be removed (they can be used for e.g. sampling or FINAL modifier).
bool appendPrewhere(ExpressionActionsChain & chain, bool only_types, const Names & additional_required_columns);
bool appendWhere(ExpressionActionsChain & chain, bool only_types);
bool appendGroupBy(ExpressionActionsChain & chain, bool only_types);
void appendAggregateFunctionsArguments(ExpressionActionsChain & chain, bool only_types);
/// After aggregation:
bool appendHaving(ExpressionActionsChain & chain, bool only_types);
/// appendSelect
bool appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order);
bool appendLimitBy(ExpressionActionsChain & chain, bool only_types);
/// appendProjectResult
};
}
......@@ -6,6 +6,7 @@
#include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Common/ThreadPool.h>
#include <Access/AccessRightsElement.h>
namespace DB
......
......@@ -154,9 +154,7 @@ String InterpreterSelectQuery::generateFilterActions(ExpressionActionsPtr & acti
/// Using separate expression analyzer to prevent any possible alias injection
auto syntax_result = SyntaxAnalyzer(*context).analyze(query_ast, storage->getColumns().getAllPhysical());
SelectQueryExpressionAnalyzer analyzer(query_ast, syntax_result, *context);
ExpressionActionsChain new_chain(*context);
analyzer.appendSelect(new_chain, false);
actions = new_chain.getLastActions();
actions = analyzer.simpleSelectActions();
return expr_list->children.at(0)->getColumnName();
}
......@@ -212,22 +210,6 @@ static Context getSubqueryContext(const Context & context)
return subquery_context;
}
static bool sanitizeBlock(Block & block)
{
for (auto & col : block)
{
if (!col.column)
{
if (isNotCreatable(col.type->getTypeId()))
return false;
col.column = col.type->createColumn();
}
else if (isColumnConst(*col.column) && !col.column->empty())
col.column = col.column->cloneEmpty();
}
return true;
}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
......@@ -556,11 +538,18 @@ Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere)
if (storage && !options.only_analyze)
from_stage = storage->getQueryProcessingStage(*context);
analysis_result = analyzeExpressions(
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = from_stage < QueryProcessingStage::WithMergeableState
&& options.to_stage >= QueryProcessingStage::WithMergeableState;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
bool second_stage = from_stage <= QueryProcessingStage::WithMergeableState
&& options.to_stage > QueryProcessingStage::WithMergeableState;
analysis_result = ExpressionAnalysisResult(
getSelectQuery(),
*query_analyzer,
from_stage,
options.to_stage,
first_stage,
second_stage,
*context,
storage,
options.only_analyze,
......@@ -616,253 +605,6 @@ Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere)
return analysis_result.final_projection->getSampleBlock();
}
/// Check if there is an ignore function. It's used for disabling constant folding in query
/// predicates because some performance tests use the ignore function as a non-optimize guard.
static bool allowEarlyConstantFolding(const ExpressionActions & actions, const Context & context)
{
if (!context.getSettingsRef().enable_early_constant_folding)
return false;
for (auto & action : actions.getActions())
{
if (action.type == action.APPLY_FUNCTION && action.function_base)
{
auto name = action.function_base->getName();
if (name == "ignore")
return false;
}
}
return true;
}
InterpreterSelectQuery::AnalysisResult
InterpreterSelectQuery::analyzeExpressions(
const ASTSelectQuery & query,
SelectQueryExpressionAnalyzer & query_analyzer,
QueryProcessingStage::Enum from_stage,
QueryProcessingStage::Enum to_stage,
const Context & context,
const StoragePtr & storage,
bool only_types,
const FilterInfoPtr & filter_info,
const Block & source_header)
{
AnalysisResult res;
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
res.first_stage = from_stage < QueryProcessingStage::WithMergeableState
&& to_stage >= QueryProcessingStage::WithMergeableState;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
res.second_stage = from_stage <= QueryProcessingStage::WithMergeableState
&& to_stage > QueryProcessingStage::WithMergeableState;
/** First we compose a chain of actions and remember the necessary steps from it.
* Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and
* throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries.
*/
bool has_filter = false;
bool has_prewhere = false;
bool has_where = false;
size_t where_step_num;
auto finalizeChain = [&](ExpressionActionsChain & chain)
{
chain.finalize();
if (has_prewhere)
{
const ExpressionActionsChain::Step & step = chain.steps.at(0);
res.prewhere_info->remove_prewhere_column = step.can_remove_required_output.at(0);
Names columns_to_remove;
for (size_t i = 1; i < step.required_output.size(); ++i)
{
if (step.can_remove_required_output[i])
columns_to_remove.push_back(step.required_output[i]);
}
if (!columns_to_remove.empty())
{
auto columns = res.prewhere_info->prewhere_actions->getSampleBlock().getNamesAndTypesList();
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(columns, context);
for (const auto & column : columns_to_remove)
actions->add(ExpressionAction::removeColumn(column));
res.prewhere_info->remove_columns_actions = std::move(actions);
}
res.columns_to_remove_after_prewhere = std::move(columns_to_remove);
}
else if (has_filter)
{
/// Can't have prewhere and filter set simultaneously
res.filter_info->do_remove_column = chain.steps.at(0).can_remove_required_output.at(0);
}
if (has_where)
res.remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0);
has_filter = has_prewhere = has_where = false;
chain.clear();
};
{
ExpressionActionsChain chain(context);
Names additional_required_columns_after_prewhere;
if (storage && (query.sample_size() || context.getSettingsRef().parallel_replicas_count > 1))
{
Names columns_for_sampling = storage->getColumnsRequiredForSampling();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
columns_for_sampling.begin(), columns_for_sampling.end());
}
if (storage && query.final())
{
Names columns_for_final = storage->getColumnsRequiredForFinal();
additional_required_columns_after_prewhere.insert(additional_required_columns_after_prewhere.end(),
columns_for_final.begin(), columns_for_final.end());
}
if (storage && filter_info)
{
has_filter = true;
res.filter_info = filter_info;
query_analyzer.appendPreliminaryFilter(chain, filter_info->actions, filter_info->column_name);
}
if (query_analyzer.appendPrewhere(chain, !res.first_stage, additional_required_columns_after_prewhere))
{
has_prewhere = true;
res.prewhere_info = std::make_shared<PrewhereInfo>(
chain.steps.front().actions, query.prewhere()->getColumnName());
if (allowEarlyConstantFolding(*res.prewhere_info->prewhere_actions, context))
{
Block before_prewhere_sample = source_header;
if (sanitizeBlock(before_prewhere_sample))
{
res.prewhere_info->prewhere_actions->execute(before_prewhere_sample);
auto & column_elem = before_prewhere_sample.getByName(query.prewhere()->getColumnName());
/// If the filter column is a constant, record it.
if (column_elem.column)
res.prewhere_constant_filter_description = ConstantFilterDescription(*column_elem.column);
}
}
chain.addStep();
}
res.need_aggregate = query_analyzer.hasAggregation();
query_analyzer.appendArrayJoin(chain, only_types || !res.first_stage);
if (query_analyzer.appendJoin(chain, only_types || !res.first_stage))
{
res.before_join = chain.getLastActions();
if (!res.hasJoin())
throw Exception("No expected JOIN", ErrorCodes::LOGICAL_ERROR);
chain.addStep();
}
if (query_analyzer.appendWhere(chain, only_types || !res.first_stage))
{
where_step_num = chain.steps.size() - 1;
has_where = res.has_where = true;
res.before_where = chain.getLastActions();
if (allowEarlyConstantFolding(*res.before_where, context))
{
Block before_where_sample;
if (chain.steps.size() > 1)
before_where_sample = chain.steps[chain.steps.size() - 2].actions->getSampleBlock();
else
before_where_sample = source_header;
if (sanitizeBlock(before_where_sample))
{
res.before_where->execute(before_where_sample);
auto & column_elem = before_where_sample.getByName(query.where()->getColumnName());
/// If the filter column is a constant, record it.
if (column_elem.column)
res.where_constant_filter_description = ConstantFilterDescription(*column_elem.column);
}
}
chain.addStep();
}
if (res.need_aggregate)
{
query_analyzer.appendGroupBy(chain, only_types || !res.first_stage);
query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !res.first_stage);
res.before_aggregation = chain.getLastActions();
finalizeChain(chain);
if (query_analyzer.appendHaving(chain, only_types || !res.second_stage))
{
res.has_having = true;
res.before_having = chain.getLastActions();
chain.addStep();
}
}
bool has_stream_with_non_joined_rows = (res.before_join && res.before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
res.optimize_read_in_order =
context.getSettingsRef().optimize_read_in_order
&& storage && query.orderBy()
&& !query_analyzer.hasAggregation()
&& !query.final()
&& !has_stream_with_non_joined_rows;
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage));
res.selected_columns = chain.getLastStep().required_output;
res.has_order_by = query_analyzer.appendOrderBy(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage), res.optimize_read_in_order);
res.before_order_and_select = chain.getLastActions();
chain.addStep();
if (query_analyzer.appendLimitBy(chain, only_types || !res.second_stage))
{
res.has_limit_by = true;
res.before_limit_by = chain.getLastActions();
chain.addStep();
}
query_analyzer.appendProjectResult(chain);
res.final_projection = chain.getLastActions();
finalizeChain(chain);
}
/// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
if (res.filter_info)
res.filter_info->actions->prependProjectInput();
if (res.has_where)
res.before_where->prependProjectInput();
if (res.has_having)
res.before_having->prependProjectInput();
res.subqueries_for_sets = query_analyzer.getSubqueriesForSets();
/// Check that PREWHERE doesn't contain unusual actions. Unusual actions are those that can change the number of rows.
if (res.prewhere_info)
{
auto check_actions = [](const ExpressionActionsPtr & actions)
{
if (actions)
for (const auto & action : actions->getActions())
if (action.type == ExpressionAction::Type::JOIN || action.type == ExpressionAction::Type::ARRAY_JOIN)
throw Exception("PREWHERE cannot contain ARRAY JOIN or JOIN action", ErrorCodes::ILLEGAL_PREWHERE);
};
check_actions(res.prewhere_info->prewhere_actions);
check_actions(res.prewhere_info->alias_actions);
check_actions(res.prewhere_info->remove_columns_actions);
}
return res;
}
static Field getWithFillFieldValue(const ASTPtr & node, const Context & context)
{
const auto & [field, type] = evaluateConstantExpression(node, context);
......@@ -1094,7 +836,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (expressions.first_stage)
{
if (expressions.filter_info)
if (expressions.hasFilter())
{
if constexpr (pipeline_with_processors)
{
......@@ -1176,7 +918,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
}
}
if (expressions.has_where)
if (expressions.hasWhere())
executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);
if (expressions.need_aggregate)
......@@ -1192,7 +934,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
* but there is an ORDER or LIMIT,
* then we will perform the preliminary sorting and LIMIT on the remote server.
*/
if (!expressions.second_stage && !expressions.need_aggregate && !expressions.has_having)
if (!expressions.second_stage && !expressions.need_aggregate && !expressions.hasHaving())
{
if (expressions.has_order_by)
executeOrder(pipeline, query_info.input_sorting_info);
......@@ -1200,7 +942,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (expressions.has_order_by && query.limitLength())
executeDistinct(pipeline, false, expressions.selected_columns);
if (expressions.has_limit_by)
if (expressions.hasLimitBy())
{
executeExpression(pipeline, expressions.before_limit_by);
executeLimitBy(pipeline);
......@@ -1230,7 +972,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (query.group_by_with_totals)
{
bool final = !query.group_by_with_rollup && !query.group_by_with_cube;
executeTotalsAndHaving(pipeline, expressions.has_having, expressions.before_having, aggregate_overflow_row, final);
executeTotalsAndHaving(pipeline, expressions.hasHaving(), expressions.before_having, aggregate_overflow_row, final);
}
if (query.group_by_with_rollup)
......@@ -1238,14 +980,14 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
else if (query.group_by_with_cube)
executeRollupOrCube(pipeline, Modificator::CUBE);
if ((query.group_by_with_rollup || query.group_by_with_cube) && expressions.has_having)
if ((query.group_by_with_rollup || query.group_by_with_cube) && expressions.hasHaving())
{
if (query.group_by_with_totals)
throw Exception("WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING", ErrorCodes::NOT_IMPLEMENTED);
executeHaving(pipeline, expressions.before_having);
}
}
else if (expressions.has_having)
else if (expressions.hasHaving())
executeHaving(pipeline, expressions.before_having);
executeExpression(pipeline, expressions.before_order_and_select);
......@@ -1273,7 +1015,8 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
/** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
* limiting the number of rows in each up to `offset + limit`.
*/
if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.has_limit_by && !settings.extremes)
if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() &&
!query.distinct && !expressions.hasLimitBy() && !settings.extremes)
{
executePreLimit(pipeline);
}
......@@ -1298,7 +1041,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (need_second_distinct_pass)
executeDistinct(pipeline, false, expressions.selected_columns);
if (expressions.has_limit_by)
if (expressions.hasLimitBy())
{
executeExpression(pipeline, expressions.before_limit_by);
executeLimitBy(pipeline);
......
......@@ -152,55 +152,6 @@ private:
template <typename TPipeline>
void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe, QueryPipeline & save_context_and_storage);
struct AnalysisResult
{
bool hasJoin() const { return before_join.get(); }
bool has_where = false;
bool need_aggregate = false;
bool has_having = false;
bool has_order_by = false;
bool has_limit_by = false;
bool remove_where_filter = false;
bool optimize_read_in_order = false;
ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
ExpressionActionsPtr before_having;
ExpressionActionsPtr before_order_and_select;
ExpressionActionsPtr before_limit_by;
ExpressionActionsPtr final_projection;
/// Columns from the SELECT list, before renaming them to aliases.
Names selected_columns;
/// Columns that will be removed after the PREWHERE actions are executed.
Names columns_to_remove_after_prewhere;
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = false;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
bool second_stage = false;
SubqueriesForSets subqueries_for_sets;
PrewhereInfoPtr prewhere_info;
FilterInfoPtr filter_info;
ConstantFilterDescription prewhere_constant_filter_description;
ConstantFilterDescription where_constant_filter_description;
};
static AnalysisResult analyzeExpressions(
const ASTSelectQuery & query,
SelectQueryExpressionAnalyzer & query_analyzer,
QueryProcessingStage::Enum from_stage,
QueryProcessingStage::Enum to_stage,
const Context & context,
const StoragePtr & storage,
bool only_types,
const FilterInfoPtr & filter_info,
const Block & source_header);
/** From which table to read. With JOIN, the "left" table is returned.
*/
static void getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context);
......@@ -284,7 +235,7 @@ private:
SelectQueryInfo query_info;
/// Is calculated in getSampleBlock. Is used later in readImpl.
AnalysisResult analysis_result;
ExpressionAnalysisResult analysis_result;
FilterInfoPtr filter_info;
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
......
......@@ -2,6 +2,7 @@
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/Join.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/LazyBlockInputStream.h>
namespace DB
......
#pragma once
#include <Core/Block.h>
#include <Storages/IStorage_fwd.h>
#include <Parsers/IAST.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
class InterpreterSelectWithUnionQuery;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/// Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section.
struct SubqueryForSet
......
......@@ -8,6 +8,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/interpretSubquery.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
......
#pragma once
#include <Core/Block.h>
#include <Interpreters/ExpressionActions.h>
#include <Formats/FormatSettings.h>
#include <Parsers/TokenIterator.h>
......@@ -12,6 +11,9 @@ struct LiteralInfo;
using LiteralsInfo = std::vector<LiteralInfo>;
struct SpecialParserType;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/// Deduces template of an expression by replacing literals with dummy columns.
/// It allows parsing and evaluating similar expressions without using heavy IParsers and ExpressionAnalyzer.
/// Using ConstantExpressionTemplate for one expression is slower than evaluateConstantExpression(...),
......
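As a loose, standalone illustration of the idea behind ConstantExpressionTemplate (this is not the ClickHouse implementation; deduceTemplate and its placeholder syntax are hypothetical): literals are replaced by typed dummy placeholders, so structurally identical expressions such as (1, 'a') and (42, 'xyz') reduce to one cached template that is parsed once and then evaluated column-wise.

#include <string>
#include <cctype>

/// Hypothetical sketch only: replace literals with typed placeholders so that
/// structurally identical expressions map to the same template key.
std::string deduceTemplate(const std::string & expr)
{
    std::string out;
    for (size_t i = 0; i < expr.size();)
    {
        if (std::isdigit(static_cast<unsigned char>(expr[i])))
        {
            while (i < expr.size() && std::isdigit(static_cast<unsigned char>(expr[i])))
                ++i;
            out += "{UInt64}";      /// numeric literal -> dummy column of a numeric type
        }
        else if (expr[i] == '\'')
        {
            ++i;                    /// skip opening quote
            while (i < expr.size() && expr[i] != '\'')
                ++i;
            if (i < expr.size())
                ++i;                /// skip closing quote
            out += "{String}";      /// string literal -> dummy String column
        }
        else
            out += expr[i++];
    }
    return out;
}

/// deduceTemplate("(1, 'a')") == deduceTemplate("(42, 'xyz')") == "({UInt64}, {String})"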
#pragma once
#include <Poco/Logger.h>
#include <Processors/IProcessor.h>
#include <Interpreters/SubqueryForSet.h>
#include <Common/Stopwatch.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
class QueryStatus;
struct Progress;
using ProgressCallback = std::function<void(const Progress & progress)>;
......
#include <Processors/Transforms/ExpressionTransform.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
......
#include <Processors/Transforms/InflatingExpressionTransform.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
......
......@@ -14,6 +14,7 @@
#include <DataStreams/RemoteBlockOutputStream.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/createBlockSelector.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
......
......@@ -3,6 +3,7 @@
#include <Storages/AlterCommands.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Interpreters/Context.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/quoteString.h>
......
......@@ -52,6 +52,9 @@ using Processors = std::vector<ProcessorPtr>;
class Pipe;
using Pipes = std::vector<Pipe>;
class StoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
struct ColumnSize
{
size_t marks = 0;
......
......@@ -3,6 +3,7 @@
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Storages/Kafka/Buffer_fwd.h>
#include <Interpreters/Context.h>
#include <Poco/Semaphore.h>
#include <ext/shared_ptr_helper.h>
......
......@@ -4,7 +4,6 @@
#include <optional>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Set.h>
#include <Core/SortDescription.h>
#include <Parsers/ASTExpressionList.h>
......@@ -19,6 +18,9 @@ namespace DB
class IFunction;
using FunctionBasePtr = std::shared_ptr<IFunctionBase>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** Range with open or closed ends; possibly unbounded.
*/
struct Range
......
......@@ -2,7 +2,6 @@
#include <Common/SimpleIncrement.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
......@@ -35,6 +34,9 @@ class MergeListEntry;
class AlterCommands;
class MergeTreePartsMover;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
......
#pragma once
#include <Core/Block.h>
#include <common/logger_useful.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/MergeTree/MarkRange.h>
namespace DB
......
......@@ -8,7 +8,6 @@
#include <Client/ConnectionPoolWithFailover.h>
#include <Core/Settings.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTFunction.h>
#include <common/logger_useful.h>
#include <Common/ActionBlocker.h>
......@@ -23,6 +22,8 @@ class StorageDistributedDirectoryMonitor;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** A distributed table that resides on multiple servers.
* Uses data from the specified database and tables on each server.
......
#include <Storages/StorageJoin.h>
#include <Storages/StorageFactory.h>
#include <Interpreters/Join.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTIdentifier.h>
......
......@@ -4,6 +4,7 @@
#include <Common/OptimizedRegularExpression.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
namespace DB
......
......@@ -6,6 +6,7 @@
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <mysqlxx/Pool.h>
......
......@@ -3,6 +3,7 @@
#include <Storages/AlterCommands.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/Context.h>
#include <Databases/IDatabase.h>
#include <IO/WriteHelpers.h>
......
#pragma once
#include <Common/config.h>
#include "config_core.h"
namespace DB
{
......
......@@ -7,6 +7,7 @@
#include <Storages/System/StorageSystemNumbers.h>
#include <Access/AccessFlags.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include "registerTableFunctions.h"
......
......@@ -6,6 +6,7 @@
#include <Storages/StorageS3.h>
#include <Access/AccessFlags.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
......
......@@ -17,6 +17,7 @@
#include <Access/AccessFlags.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include "registerTableFunctions.h"
......
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ParserCreateQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/Context.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
......