Commit e0841360 authored by Nikolai Kochetov

Use QueryPlan in InterpreterSelectQuery [part 1].

Parent 5bb2ddc6
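The diff below replaces in-place QueryPipeline construction with a two-phase scheme: executeImpl() first records steps in a QueryPlan, and the pipeline is materialized once at the end via buildQueryPipeline(). The following self-contained sketch models that pattern with toy types — the class and method names mirror the diff (QueryPlan::addStep, getCurrentDataStream, buildQueryPipeline), but these are simplified stand-ins, not ClickHouse's real classes:

    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    struct DataStream { std::string header; };                      /// Stand-in for a Block-described stream.
    struct QueryPipeline { std::vector<std::string> transforms; };  /// Stand-in for the real pipeline.

    struct IQueryPlanStep
    {
        virtual ~IQueryPlanStep() = default;
        virtual DataStream getOutputStream() const = 0;
        virtual void updatePipeline(QueryPipeline & pipeline) const = 0;
        std::string description;
    };

    /// Source step: starts the plan (analogous to ReadNothingStep in the diff).
    struct ReadNothingStep : IQueryPlanStep
    {
        explicit ReadNothingStep(std::string output_header) : output{std::move(output_header)} {}
        DataStream getOutputStream() const override { return output; }
        void updatePipeline(QueryPipeline & pipeline) const override
        {
            pipeline.transforms.push_back("Source[" + output.header + "]");
        }
        DataStream output;
    };

    /// Transforming step: consumes the plan's current stream (analogous to FilterStep in the diff).
    struct FilterStep : IQueryPlanStep
    {
        FilterStep(DataStream input, std::string column)
            : output(std::move(input)), filter_column(std::move(column)) {}
        DataStream getOutputStream() const override { return output; }  /// Header kept unchanged in this toy.
        void updatePipeline(QueryPipeline & pipeline) const override
        {
            pipeline.transforms.push_back("Filter(" + filter_column + ") /* " + description + " */");
        }
        DataStream output;
        std::string filter_column;
    };

    class QueryPlan
    {
    public:
        void addStep(std::unique_ptr<IQueryPlanStep> step) { steps.push_back(std::move(step)); }
        DataStream getCurrentDataStream() const { return steps.back()->getOutputStream(); }

        /// The pipeline is built exactly once, after the whole plan is known.
        QueryPipeline buildQueryPipeline() const
        {
            QueryPipeline pipeline;
            for (const auto & step : steps)
                step->updatePipeline(pipeline);
            return pipeline;
        }

    private:
        std::vector<std::unique_ptr<IQueryPlanStep>> steps;
    };

    int main()
    {
        QueryPlan query_plan;
        query_plan.addStep(std::make_unique<ReadNothingStep>("a, b, c"));

        auto prewhere_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(), "a");
        prewhere_step->description = "PREWHERE";
        query_plan.addStep(std::move(prewhere_step));

        QueryPipeline pipeline = query_plan.buildQueryPipeline();
        for (const auto & transform : pipeline.transforms)
            std::cout << transform << '\n';
    }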
@@ -57,6 +57,8 @@
 #include <Processors/QueryPlan/ExtremesStep.h>
 #include <Processors/QueryPlan/OffsetsStep.h>
 #include <Processors/QueryPlan/FinishSortingStep.h>
+#include <Processors/QueryPlan/QueryPlan.h>
+#include <Processors/QueryPipeline.h>
 #include <Storages/MergeTree/MergeTreeData.h>
 #include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
@@ -464,7 +466,10 @@ Block InterpreterSelectQuery::getSampleBlock()
 BlockIO InterpreterSelectQuery::execute()
 {
     BlockIO res;
-    executeImpl(res.pipeline, input, std::move(input_pipe));
+    QueryPlan query_plan;
+
+    executeImpl(query_plan, input, std::move(input_pipe));
+    res.pipeline = std::move(*query_plan.buildQueryPipeline());
 
     res.pipeline.addInterpreterContext(context);
     res.pipeline.addStorageHolder(storage);
@@ -683,7 +688,7 @@ static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & c
     return 0;
 }
 
-void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe)
+void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe)
 {
     /** Streams of data. When the query is executed in parallel, we have several data streams.
      *  If there is no GROUP BY, then perform all operations before ORDER BY and LIMIT in parallel, then
@@ -704,30 +709,30 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
     if (options.only_analyze)
     {
-        ReadNothingStep read_nothing(DataStream{.header = source_header});
-        read_nothing.initializePipeline(pipeline);
+        auto read_nothing = std::make_unique<ReadNothingStep>(source_header);
+        query_plan.addStep(std::move(read_nothing));
 
         if (expressions.prewhere_info)
         {
-            FilterStep prewhere_step(
-                DataStream{.header = pipeline.getHeader()},
+            auto prewhere_step = std::make_unique<FilterStep>(
+                query_plan.getCurrentDataStream(),
                 expressions.prewhere_info->prewhere_actions,
                 expressions.prewhere_info->prewhere_column_name,
                 expressions.prewhere_info->remove_prewhere_column);
 
-            prewhere_step.setStepDescription("PREWHERE");
-            prewhere_step.transformPipeline(pipeline);
+            prewhere_step->setStepDescription("PREWHERE");
+            query_plan.addStep(std::move(prewhere_step));
 
             // To remove additional columns in dry run
             // For example, sample column which can be removed in this stage
             if (expressions.prewhere_info->remove_columns_actions)
             {
-                ExpressionStep remove_columns(
-                    DataStream{.header = pipeline.getHeader()},
+                auto remove_columns = std::make_unique<ExpressionStep>(
+                    query_plan.getCurrentDataStream(),
                     expressions.prewhere_info->remove_columns_actions);
 
-                remove_columns.setStepDescription("Remove unnecessary columns after PREWHERE");
-                remove_columns.transformPipeline(pipeline);
+                remove_columns->setStepDescription("Remove unnecessary columns after PREWHERE");
+                query_plan.addStep(std::move(remove_columns));
             }
         }
     }
@@ -735,13 +740,14 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
     {
         if (prepared_input)
         {
-            ReadFromPreparedSource prepared_source_step(Pipe(std::make_shared<SourceFromInputStream>(prepared_input)));
-            prepared_source_step.initializePipeline(pipeline);
+            auto prepared_source_step = std::make_unique<ReadFromPreparedSource>(
+                Pipe(std::make_shared<SourceFromInputStream>(prepared_input)));
+            query_plan.addStep(std::move(prepared_source_step));
         }
         else if (prepared_pipe)
        {
-            ReadFromPreparedSource prepared_source_step(std::move(*prepared_pipe));
-            prepared_source_step.initializePipeline(pipeline);
+            auto prepared_source_step = std::make_unique<ReadFromPreparedSource>(std::move(*prepared_pipe));
+            query_plan.addStep(std::move(prepared_source_step));
        }
 
        if (from_stage == QueryProcessingStage::WithMergeableState &&
@@ -752,7 +758,7 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
            throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
 
        /** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
-        executeFetchColumns(from_stage, pipeline, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);
+        executeFetchColumns(from_stage, query_plan, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);
 
        LOG_TRACE(log, "{} -> {}", QueryProcessingStage::toString(from_stage), QueryProcessingStage::toString(options.to_stage));
    }
@@ -783,19 +789,19 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
        if (!expressions.second_stage && !expressions.need_aggregate && !expressions.hasHaving())
        {
            if (expressions.has_order_by)
-                executeOrder(pipeline, query_info.input_order_info);
+                executeOrder(query_plan, query_info.input_order_info);
 
            if (expressions.has_order_by && query.limitLength())
-                executeDistinct(pipeline, false, expressions.selected_columns, true);
+                executeDistinct(query_plan, false, expressions.selected_columns, true);
 
            if (expressions.hasLimitBy())
            {
-                executeExpression(pipeline, expressions.before_limit_by, "Before LIMIT BY");
-                executeLimitBy(pipeline);
+                executeExpression(query_plan, expressions.before_limit_by, "Before LIMIT BY");
+                executeLimitBy(query_plan);
            }
 
            if (query.limitLength())
-                executePreLimit(pipeline, true);
+                executePreLimit(query_plan, true);
        }
    };
@@ -806,21 +812,21 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
            preliminary_sort();
 
            if (expressions.need_aggregate)
-                executeMergeAggregated(pipeline, aggregate_overflow_row, aggregate_final);
+                executeMergeAggregated(query_plan, aggregate_overflow_row, aggregate_final);
        }
 
        if (expressions.first_stage)
        {
            if (expressions.hasFilter())
            {
-                FilterStep row_level_security_step(
-                    DataStream{.header = pipeline.getHeader()},
+                auto row_level_security_step = std::make_unique<FilterStep>(
+                    query_plan.getCurrentDataStream(),
                    expressions.filter_info->actions,
                    expressions.filter_info->column_name,
                    expressions.filter_info->do_remove_column);
 
-                row_level_security_step.setStepDescription("Row-level security filter");
-                row_level_security_step.transformPipeline(pipeline);
+                row_level_security_step->setStepDescription("Row-level security filter");
+                query_plan.addStep(std::move(row_level_security_step));
            }
 
            if (expressions.hasJoin())
@@ -828,15 +834,7 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
                Block join_result_sample;
                JoinPtr join = expressions.before_join->getTableJoinAlgo();
 
-                join_result_sample = ExpressionTransform::transformHeader(pipeline.getHeader(), expressions.before_join);
-
-                /// In case joined subquery has totals, and we don't, add default chunk to totals.
-                bool default_totals = false;
-                if (!pipeline.hasTotals())
-                {
-                    pipeline.addDefaultTotals();
-                    default_totals = true;
-                }
+                join_result_sample = ExpressionTransform::transformHeader(query_plan.getCurrentDataStream().header, expressions.before_join);
 
                bool inflating_join = false;
                if (join)
@@ -846,61 +844,60 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
                    inflating_join = isCross(hash_join->getKind());
                }
 
+                QueryPlanStepPtr before_join_step;
                if (inflating_join)
                {
-                    InflatingExpressionStep before_join_step(
-                        DataStream{.header = pipeline.getHeader()},
+                    before_join_step = std::make_unique<InflatingExpressionStep>(
+                        query_plan.getCurrentDataStream(),
                        expressions.before_join,
-                        default_totals);
-
-                    before_join_step.setStepDescription("JOIN");
-                    before_join_step.transformPipeline(pipeline);
+                        true);
                }
                else
                {
-                    ExpressionStep before_join_step(
-                        DataStream{.header = pipeline.getHeader()},
+                    before_join_step = std::make_unique<ExpressionStep>(
+                        query_plan.getCurrentDataStream(),
                        expressions.before_join,
-                        default_totals);
-
-                    before_join_step.setStepDescription("JOIN");
-                    before_join_step.transformPipeline(pipeline);
+                        true);
                }
 
+                before_join_step->setStepDescription("JOIN");
+                query_plan.addStep(std::move(before_join_step));
+
                if (join)
                {
                    if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size))
                    {
                        auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
-                        AddingDelayedStreamStep add_non_joined_rows_step(
-                            DataStream{.header = pipeline.getHeader()}, std::move(source));
-
-                        add_non_joined_rows_step.setStepDescription("Add non-joined rows after JOIN");
-                        add_non_joined_rows_step.transformPipeline(pipeline);
+                        auto add_non_joined_rows_step = std::make_unique<AddingDelayedStreamStep>(
+                            query_plan.getCurrentDataStream(), std::move(source));
+
+                        add_non_joined_rows_step->setStepDescription("Add non-joined rows after JOIN");
+                        query_plan.addStep(std::move(add_non_joined_rows_step));
                    }
                }
            }
 
            if (expressions.hasWhere())
-                executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);
+                executeWhere(query_plan, expressions.before_where, expressions.remove_where_filter);
 
            if (expressions.need_aggregate)
            {
-                executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
+                executeAggregation(query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
                /// We need to reset input order info, so that executeOrder can't use it
                query_info.input_order_info.reset();
            }
            else
            {
-                executeExpression(pipeline, expressions.before_order_and_select, "Before ORDER BY and SELECT");
-                executeDistinct(pipeline, true, expressions.selected_columns, true);
+                executeExpression(query_plan, expressions.before_order_and_select, "Before ORDER BY and SELECT");
+                executeDistinct(query_plan, true, expressions.selected_columns, true);
            }
 
            preliminary_sort();
 
            // If there is no global subqueries, we can run subqueries only when receive them on server.
            if (!query_analyzer->hasGlobalSubqueries() && !subqueries_for_sets.empty())
-                executeSubqueriesInSetsAndJoins(pipeline, subqueries_for_sets);
+                executeSubqueriesInSetsAndJoins(query_plan, subqueries_for_sets);
        }
 
        if (expressions.second_stage)
@@ -911,40 +908,38 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
            {
                /// If you need to combine aggregated results from multiple servers
                if (!expressions.first_stage)
-                    executeMergeAggregated(pipeline, aggregate_overflow_row, aggregate_final);
+                    executeMergeAggregated(query_plan, aggregate_overflow_row, aggregate_final);
 
                if (!aggregate_final)
                {
                    if (query.group_by_with_totals)
                    {
                        bool final = !query.group_by_with_rollup && !query.group_by_with_cube;
-                        executeTotalsAndHaving(pipeline, expressions.hasHaving(), expressions.before_having, aggregate_overflow_row, final);
+                        executeTotalsAndHaving(query_plan, expressions.hasHaving(), expressions.before_having, aggregate_overflow_row, final);
                    }
 
                    if (query.group_by_with_rollup)
-                        executeRollupOrCube(pipeline, Modificator::ROLLUP);
+                        executeRollupOrCube(query_plan, Modificator::ROLLUP);
                    else if (query.group_by_with_cube)
-                        executeRollupOrCube(pipeline, Modificator::CUBE);
+                        executeRollupOrCube(query_plan, Modificator::CUBE);
 
                    if ((query.group_by_with_rollup || query.group_by_with_cube) && expressions.hasHaving())
                    {
                        if (query.group_by_with_totals)
                            throw Exception("WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING", ErrorCodes::NOT_IMPLEMENTED);
-                        executeHaving(pipeline, expressions.before_having);
+                        executeHaving(query_plan, expressions.before_having);
                    }
                }
                else if (expressions.hasHaving())
-                    executeHaving(pipeline, expressions.before_having);
+                    executeHaving(query_plan, expressions.before_having);
 
-                executeExpression(pipeline, expressions.before_order_and_select, "Before ORDER BY and SELECT");
-                executeDistinct(pipeline, true, expressions.selected_columns, true);
+                executeExpression(query_plan, expressions.before_order_and_select, "Before ORDER BY and SELECT");
+                executeDistinct(query_plan, true, expressions.selected_columns, true);
            }
            else if (query.group_by_with_totals || query.group_by_with_rollup || query.group_by_with_cube)
                throw Exception("WITH TOTALS, ROLLUP or CUBE are not supported without aggregation", ErrorCodes::NOT_IMPLEMENTED);
 
-            need_second_distinct_pass = query.distinct && pipeline.hasMixedStreams();
-
            if (expressions.has_order_by)
            {
                /** If there is an ORDER BY for distributed query processing,
@@ -953,57 +948,57 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
                 */
                if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final))
-                    executeMergeSorted(pipeline, "before ORDER BY");
+                    executeMergeSorted(query_plan, "before ORDER BY");
                else    /// Otherwise, just sort.
-                    executeOrder(pipeline, query_info.input_order_info);
+                    executeOrder(query_plan, query_info.input_order_info);
            }
 
            /** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
              * limiting the number of rows in each up to `offset + limit`.
              */
            bool has_prelimit = false;
-            if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() &&
+            if (query.limitLength() && !query.limit_with_ties &&
                !query.distinct && !expressions.hasLimitBy() && !settings.extremes)
            {
-                executePreLimit(pipeline, false);
+                executePreLimit(query_plan, false);
                has_prelimit = true;
            }
 
            /** If there was more than one stream,
              * then DISTINCT needs to be performed once again after merging all streams.
              */
-            if (need_second_distinct_pass)
-                executeDistinct(pipeline, false, expressions.selected_columns, false);
+            if (query.distinct)
+                executeDistinct(query_plan, false, expressions.selected_columns, false);
 
            if (expressions.hasLimitBy())
            {
-                executeExpression(pipeline, expressions.before_limit_by, "Before LIMIT BY");
-                executeLimitBy(pipeline);
+                executeExpression(query_plan, expressions.before_limit_by, "Before LIMIT BY");
+                executeLimitBy(query_plan);
            }
 
-            executeWithFill(pipeline);
+            executeWithFill(query_plan);
 
            /** We must do projection after DISTINCT because projection may remove some columns.
              */
-            executeProjection(pipeline, expressions.final_projection);
+            executeProjection(query_plan, expressions.final_projection);
 
            /** Extremes are calculated before LIMIT, but after LIMIT BY. This is Ok.
              */
-            executeExtremes(pipeline);
+            executeExtremes(query_plan);
 
            if (!has_prelimit)  /// Limit is no longer needed if there is prelimit.
-                executeLimit(pipeline);
+                executeLimit(query_plan);
 
-            executeOffset(pipeline);
+            executeOffset(query_plan);
        }
    }
 
    if (query_analyzer->hasGlobalSubqueries() && !subqueries_for_sets.empty())
-        executeSubqueriesInSetsAndJoins(pipeline, subqueries_for_sets);
+        executeSubqueriesInSetsAndJoins(query_plan, subqueries_for_sets);
 }
 
 void InterpreterSelectQuery::executeFetchColumns(
-    QueryProcessingStage::Enum processing_stage, QueryPipeline & pipeline,
+    QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan,
    const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere)
 {
    auto & query = getSelectQuery();
......
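The stage helpers whose call sites change above all follow the same pattern after this migration: wrap the prepared actions into a step that consumes query_plan.getCurrentDataStream(), describe it, and append it to the plan. A hedged sketch of one such helper — the body below is inferred from the call sites in this diff, not quoted from the commit:

    void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool remove_filter)
    {
        /// The step's input is whatever the plan currently produces.
        auto where_step = std::make_unique<FilterStep>(
            query_plan.getCurrentDataStream(),
            expression,
            getSelectQuery().where()->getColumnName(),
            remove_filter);

        where_step->setStepDescription("WHERE");
        query_plan.addStep(std::move(where_step));
    }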
@@ -14,7 +14,6 @@
 #include <Storages/ReadInOrderOptimizer.h>
 #include <Interpreters/StorageID.h>
-#include <Processors/QueryPipeline.h>
 #include <Columns/FilterDescription.h>
 
 namespace Poco { class Logger; }
@@ -25,6 +24,7 @@ namespace DB
 struct SubqueryForSet;
 class InterpreterSelectWithUnionQuery;
 class Context;
+class QueryPlan;
 
 struct SyntaxAnalyzerResult;
 using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
@@ -104,35 +104,35 @@ private:
    Block getSampleBlockImpl();
 
-    void executeImpl(QueryPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe);
+    void executeImpl(QueryPlan & query_plan, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe);
 
    /// Different stages of query execution.
 
    void executeFetchColumns(
        QueryProcessingStage::Enum processing_stage,
-        QueryPipeline & pipeline,
+        QueryPlan & query_plan,
        const PrewhereInfoPtr & prewhere_info,
        const Names & columns_to_remove_after_prewhere);
 
-    void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
-    void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
-    void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final);
-    void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
-    void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
-    static void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, const std::string & description);
-    void executeOrder(QueryPipeline & pipeline, InputOrderInfoPtr sorting_info);
-    void executeOrderOptimized(QueryPipeline & pipeline, InputOrderInfoPtr sorting_info, UInt64 limit, SortDescription & output_order_descr);
-    void executeWithFill(QueryPipeline & pipeline);
-    void executeMergeSorted(QueryPipeline & pipeline, const std::string & description);
-    void executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset);
-    void executeLimitBy(QueryPipeline & pipeline);
-    void executeLimit(QueryPipeline & pipeline);
-    void executeOffset(QueryPipeline & pipeline);
-    static void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
-    void executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns, bool pre_distinct);
-    void executeExtremes(QueryPipeline & pipeline);
-    void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, const std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
-    void executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit, const std::string & description);
+    void executeWhere(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool remove_filter);
+    void executeAggregation(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
+    void executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final);
+    void executeTotalsAndHaving(QueryPlan & query_plan, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
+    void executeHaving(QueryPlan & query_plan, const ExpressionActionsPtr & expression);
+    static void executeExpression(QueryPlan & query_plan, const ExpressionActionsPtr & expression, const std::string & description);
+    void executeOrder(QueryPlan & query_plan, InputOrderInfoPtr sorting_info);
+    void executeOrderOptimized(QueryPlan & query_plan, InputOrderInfoPtr sorting_info, UInt64 limit, SortDescription & output_order_descr);
+    void executeWithFill(QueryPlan & query_plan);
+    void executeMergeSorted(QueryPlan & query_plan, const std::string & description);
+    void executePreLimit(QueryPlan & query_plan, bool do_not_skip_offset);
+    void executeLimitBy(QueryPlan & query_plan);
+    void executeLimit(QueryPlan & query_plan);
+    void executeOffset(QueryPlan & query_plan);
+    static void executeProjection(QueryPlan & query_plan, const ExpressionActionsPtr & expression);
+    void executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct);
+    void executeExtremes(QueryPlan & query_plan);
+    void executeSubqueriesInSetsAndJoins(QueryPlan & query_plan, const std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
+    void executeMergeSorted(QueryPlan & query_plan, const SortDescription & sort_description, UInt64 limit, const std::string & description);
 
    String generateFilterActions(
        ExpressionActionsPtr & actions, const ASTPtr & row_policy_filter, const Names & prerequisite_columns = {}) const;
@@ -143,7 +143,7 @@ private:
        CUBE = 1
    };
 
-    void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator);
+    void executeRollupOrCube(QueryPlan & query_plan, Modificator modificator);
 
    /** If there is a SETTINGS section in the SELECT query, then apply settings from it.
      *
......
@@ -43,28 +43,47 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActio
 void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
 {
+    /// In case joined subquery has totals, and we don't, add default chunk to totals.
+    bool add_default_totals = false;
+    if (default_totals && !pipeline.hasTotals())
+    {
+        pipeline.addDefaultTotals();
+        add_default_totals = true;
+    }
+
    pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
    {
        bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
-        return std::make_shared<ExpressionTransform>(header, expression, on_totals, default_totals);
+        return std::make_shared<ExpressionTransform>(header, expression, on_totals, add_default_totals);
    });
 }
 
 InflatingExpressionStep::InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_)
    : ITransformingStep(
        input_stream_,
-        DataStream{.header = ExpressionTransform::transformHeader(input_stream_.header, expression_)})
+        ExpressionTransform::transformHeader(input_stream_.header, expression_),
+        getTraits(expression_))
    , expression(std::move(expression_))
    , default_totals(default_totals_)
 {
-    filterDistinctColumns(output_stream->header, output_stream->distinct_columns);
+    filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns);
 }
 
 void InflatingExpressionStep::transformPipeline(QueryPipeline & pipeline)
 {
+    /// In case joined subquery has totals, and we don't, add default chunk to totals.
+    bool add_default_totals = false;
+    if (default_totals && !pipeline.hasTotals())
+    {
+        pipeline.addDefaultTotals();
+        add_default_totals = true;
+    }
+
    pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
    {
        bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
-        return std::make_shared<InflatingExpressionTransform>(header, expression, on_totals, default_totals);
+        return std::make_shared<InflatingExpressionTransform>(header, expression, on_totals, add_default_totals);
    });
 }
......
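Note what moved in ExpressionStep above: previously executeImpl() probed the pipeline for totals while wiring the JOIN, but under QueryPlan no pipeline exists at plan-construction time. The step therefore stores the intent as the default_totals flag (hence the plain `true` at the JOIN call site) and resolves it only when the pipeline is materialized. Side-by-side, derived from the deleted and added lines in this diff:

    /// Before: decided eagerly in executeImpl(), where a pipeline already existed.
    ///     bool default_totals = false;
    ///     if (!pipeline.hasTotals()) { pipeline.addDefaultTotals(); default_totals = true; }
    /// After: decided lazily in *Step::transformPipeline(), when the pipeline is finally built.
    ///     bool add_default_totals = false;
    ///     if (default_totals && !pipeline.hasTotals()) { pipeline.addDefaultTotals(); add_default_totals = true; }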
@@ -5,8 +5,8 @@
 namespace DB
 {
 
-ReadNothingStep::ReadNothingStep(DataStream output_stream_)
-    : ISourceStep(std::move(output_stream_))
+ReadNothingStep::ReadNothingStep(Block output_header)
+    : ISourceStep(DataStream{.header = std::move(output_header)})
 {
 }
......
@@ -7,7 +7,7 @@ namespace DB
 class ReadNothingStep : public ISourceStep
 {
 public:
-    explicit ReadNothingStep(DataStream output_stream_);
+    explicit ReadNothingStep(Block output_header);
 
    String getName() const override { return "ReadNothing"; }
......