diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 11a325ae25f2a8e537def96d96d4478cc65ad739..a1f42018bbebe754ca2f50774aadb40802ec74e5 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -57,6 +57,8 @@ #include #include #include +#include +#include #include #include @@ -464,7 +466,10 @@ Block InterpreterSelectQuery::getSampleBlock() BlockIO InterpreterSelectQuery::execute() { BlockIO res; - executeImpl(res.pipeline, input, std::move(input_pipe)); + QueryPlan query_plan; + executeImpl(query_plan, input, std::move(input_pipe)); + + res.pipeline = std::move(*query_plan.buildQueryPipeline()); res.pipeline.addInterpreterContext(context); res.pipeline.addStorageHolder(storage); @@ -683,7 +688,7 @@ static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & c return 0; } -void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional prepared_pipe) +void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInputStreamPtr & prepared_input, std::optional prepared_pipe) { /** Streams of data. When the query is executed in parallel, we have several data streams. * If there is no GROUP BY, then perform all operations before ORDER BY and LIMIT in parallel, then @@ -704,30 +709,30 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn if (options.only_analyze) { - ReadNothingStep read_nothing(DataStream{.header = source_header}); - read_nothing.initializePipeline(pipeline); + auto read_nothing = std::make_unique(source_header); + query_plan.addStep(std::move(read_nothing)); if (expressions.prewhere_info) { - FilterStep prewhere_step( - DataStream{.header = pipeline.getHeader()}, + auto prewhere_step = std::make_unique( + query_plan.getCurrentDataStream(), expressions.prewhere_info->prewhere_actions, expressions.prewhere_info->prewhere_column_name, expressions.prewhere_info->remove_prewhere_column); - prewhere_step.setStepDescription("PREWHERE"); - prewhere_step.transformPipeline(pipeline); + prewhere_step->setStepDescription("PREWHERE"); + query_plan.addStep(std::move(prewhere_step)); // To remove additional columns in dry run // For example, sample column which can be removed in this stage if (expressions.prewhere_info->remove_columns_actions) { - ExpressionStep remove_columns( - DataStream{.header = pipeline.getHeader()}, + auto remove_columns = std::make_unique( + query_plan.getCurrentDataStream(), expressions.prewhere_info->remove_columns_actions); - remove_columns.setStepDescription("Remove unnecessary columns after PREWHERE"); - remove_columns.transformPipeline(pipeline); + remove_columns->setStepDescription("Remove unnecessary columns after PREWHERE"); + query_plan.addStep(std::move(remove_columns)); } } } @@ -735,13 +740,14 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn { if (prepared_input) { - ReadFromPreparedSource prepared_source_step(Pipe(std::make_shared(prepared_input))); - prepared_source_step.initializePipeline(pipeline); + auto prepared_source_step = std::make_unique( + Pipe(std::make_shared(prepared_input))); + query_plan.addStep(std::move(prepared_source_step)); } else if (prepared_pipe) { - ReadFromPreparedSource prepared_source_step(std::move(*prepared_pipe)); - prepared_source_step.initializePipeline(pipeline); + auto prepared_source_step = std::make_unique(std::move(*prepared_pipe)); + query_plan.addStep(std::move(prepared_source_step)); } if (from_stage == QueryProcessingStage::WithMergeableState && @@ -752,7 +758,7 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE); /** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */ - executeFetchColumns(from_stage, pipeline, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere); + executeFetchColumns(from_stage, query_plan, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere); LOG_TRACE(log, "{} -> {}", QueryProcessingStage::toString(from_stage), QueryProcessingStage::toString(options.to_stage)); } @@ -783,19 +789,19 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn if (!expressions.second_stage && !expressions.need_aggregate && !expressions.hasHaving()) { if (expressions.has_order_by) - executeOrder(pipeline, query_info.input_order_info); + executeOrder(query_plan, query_info.input_order_info); if (expressions.has_order_by && query.limitLength()) - executeDistinct(pipeline, false, expressions.selected_columns, true); + executeDistinct(query_plan, false, expressions.selected_columns, true); if (expressions.hasLimitBy()) { - executeExpression(pipeline, expressions.before_limit_by, "Before LIMIT BY"); - executeLimitBy(pipeline); + executeExpression(query_plan, expressions.before_limit_by, "Before LIMIT BY"); + executeLimitBy(query_plan); } if (query.limitLength()) - executePreLimit(pipeline, true); + executePreLimit(query_plan, true); } }; @@ -806,21 +812,21 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn preliminary_sort(); if (expressions.need_aggregate) - executeMergeAggregated(pipeline, aggregate_overflow_row, aggregate_final); + executeMergeAggregated(query_plan, aggregate_overflow_row, aggregate_final); } if (expressions.first_stage) { if (expressions.hasFilter()) { - FilterStep row_level_security_step( - DataStream{.header = pipeline.getHeader()}, + auto row_level_security_step = std::make_unique( + query_plan.getCurrentDataStream(), expressions.filter_info->actions, expressions.filter_info->column_name, expressions.filter_info->do_remove_column); - row_level_security_step.setStepDescription("Row-level security filter"); - row_level_security_step.transformPipeline(pipeline); + row_level_security_step->setStepDescription("Row-level security filter"); + query_plan.addStep(std::move(row_level_security_step)); } if (expressions.hasJoin()) @@ -828,15 +834,7 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn Block join_result_sample; JoinPtr join = expressions.before_join->getTableJoinAlgo(); - join_result_sample = ExpressionTransform::transformHeader(pipeline.getHeader(), expressions.before_join); - - /// In case joined subquery has totals, and we don't, add default chunk to totals. - bool default_totals = false; - if (!pipeline.hasTotals()) - { - pipeline.addDefaultTotals(); - default_totals = true; - } + join_result_sample = ExpressionTransform::transformHeader(query_plan.getCurrentDataStream().header, expressions.before_join); bool inflating_join = false; if (join) @@ -846,61 +844,60 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn inflating_join = isCross(hash_join->getKind()); } + QueryPlanStepPtr before_join_step; if (inflating_join) { - InflatingExpressionStep before_join_step( - DataStream{.header = pipeline.getHeader()}, + before_join_step = std::make_unique( + query_plan.getCurrentDataStream(), expressions.before_join, - default_totals); + true); - before_join_step.setStepDescription("JOIN"); - before_join_step.transformPipeline(pipeline); } else { - ExpressionStep before_join_step( - DataStream{.header = pipeline.getHeader()}, + before_join_step = std::make_unique( + query_plan.getCurrentDataStream(), expressions.before_join, - default_totals); - - before_join_step.setStepDescription("JOIN"); - before_join_step.transformPipeline(pipeline); + true); } + before_join_step->setStepDescription("JOIN"); + query_plan.addStep(std::move(before_join_step)); + if (join) { if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size)) { auto source = std::make_shared(std::move(stream)); - AddingDelayedStreamStep add_non_joined_rows_step( - DataStream{.header = pipeline.getHeader()}, std::move(source)); + auto add_non_joined_rows_step = std::make_unique( + query_plan.getCurrentDataStream(), std::move(source)); - add_non_joined_rows_step.setStepDescription("Add non-joined rows after JOIN"); - add_non_joined_rows_step.transformPipeline(pipeline); + add_non_joined_rows_step->setStepDescription("Add non-joined rows after JOIN"); + query_plan.addStep(std::move(add_non_joined_rows_step)); } } } if (expressions.hasWhere()) - executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter); + executeWhere(query_plan, expressions.before_where, expressions.remove_where_filter); if (expressions.need_aggregate) { - executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info); + executeAggregation(query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info); /// We need to reset input order info, so that executeOrder can't use it query_info.input_order_info.reset(); } else { - executeExpression(pipeline, expressions.before_order_and_select, "Before ORDER BY and SELECT"); - executeDistinct(pipeline, true, expressions.selected_columns, true); + executeExpression(query_plan, expressions.before_order_and_select, "Before ORDER BY and SELECT"); + executeDistinct(query_plan, true, expressions.selected_columns, true); } preliminary_sort(); // If there is no global subqueries, we can run subqueries only when receive them on server. if (!query_analyzer->hasGlobalSubqueries() && !subqueries_for_sets.empty()) - executeSubqueriesInSetsAndJoins(pipeline, subqueries_for_sets); + executeSubqueriesInSetsAndJoins(query_plan, subqueries_for_sets); } if (expressions.second_stage) @@ -911,40 +908,38 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn { /// If you need to combine aggregated results from multiple servers if (!expressions.first_stage) - executeMergeAggregated(pipeline, aggregate_overflow_row, aggregate_final); + executeMergeAggregated(query_plan, aggregate_overflow_row, aggregate_final); if (!aggregate_final) { if (query.group_by_with_totals) { bool final = !query.group_by_with_rollup && !query.group_by_with_cube; - executeTotalsAndHaving(pipeline, expressions.hasHaving(), expressions.before_having, aggregate_overflow_row, final); + executeTotalsAndHaving(query_plan, expressions.hasHaving(), expressions.before_having, aggregate_overflow_row, final); } if (query.group_by_with_rollup) - executeRollupOrCube(pipeline, Modificator::ROLLUP); + executeRollupOrCube(query_plan, Modificator::ROLLUP); else if (query.group_by_with_cube) - executeRollupOrCube(pipeline, Modificator::CUBE); + executeRollupOrCube(query_plan, Modificator::CUBE); if ((query.group_by_with_rollup || query.group_by_with_cube) && expressions.hasHaving()) { if (query.group_by_with_totals) throw Exception("WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING", ErrorCodes::NOT_IMPLEMENTED); - executeHaving(pipeline, expressions.before_having); + executeHaving(query_plan, expressions.before_having); } } else if (expressions.hasHaving()) - executeHaving(pipeline, expressions.before_having); + executeHaving(query_plan, expressions.before_having); - executeExpression(pipeline, expressions.before_order_and_select, "Before ORDER BY and SELECT"); - executeDistinct(pipeline, true, expressions.selected_columns, true); + executeExpression(query_plan, expressions.before_order_and_select, "Before ORDER BY and SELECT"); + executeDistinct(query_plan, true, expressions.selected_columns, true); } else if (query.group_by_with_totals || query.group_by_with_rollup || query.group_by_with_cube) throw Exception("WITH TOTALS, ROLLUP or CUBE are not supported without aggregation", ErrorCodes::NOT_IMPLEMENTED); - need_second_distinct_pass = query.distinct && pipeline.hasMixedStreams(); - if (expressions.has_order_by) { /** If there is an ORDER BY for distributed query processing, @@ -953,57 +948,57 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn */ if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final)) - executeMergeSorted(pipeline, "before ORDER BY"); + executeMergeSorted(query_plan, "before ORDER BY"); else /// Otherwise, just sort. - executeOrder(pipeline, query_info.input_order_info); + executeOrder(query_plan, query_info.input_order_info); } /** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT, * limiting the number of rows in each up to `offset + limit`. */ bool has_prelimit = false; - if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() && + if (query.limitLength() && !query.limit_with_ties && !query.distinct && !expressions.hasLimitBy() && !settings.extremes) { - executePreLimit(pipeline, false); + executePreLimit(query_plan, false); has_prelimit = true; } /** If there was more than one stream, * then DISTINCT needs to be performed once again after merging all streams. */ - if (need_second_distinct_pass) - executeDistinct(pipeline, false, expressions.selected_columns, false); + if (query.distinct) + executeDistinct(query_plan, false, expressions.selected_columns, false); if (expressions.hasLimitBy()) { - executeExpression(pipeline, expressions.before_limit_by, "Before LIMIT BY"); - executeLimitBy(pipeline); + executeExpression(query_plan, expressions.before_limit_by, "Before LIMIT BY"); + executeLimitBy(query_plan); } - executeWithFill(pipeline); + executeWithFill(query_plan); /** We must do projection after DISTINCT because projection may remove some columns. */ - executeProjection(pipeline, expressions.final_projection); + executeProjection(query_plan, expressions.final_projection); /** Extremes are calculated before LIMIT, but after LIMIT BY. This is Ok. */ - executeExtremes(pipeline); + executeExtremes(query_plan); if (!has_prelimit) /// Limit is no longer needed if there is prelimit. - executeLimit(pipeline); + executeLimit(query_plan); - executeOffset(pipeline); + executeOffset(query_plan); } } if (query_analyzer->hasGlobalSubqueries() && !subqueries_for_sets.empty()) - executeSubqueriesInSetsAndJoins(pipeline, subqueries_for_sets); + executeSubqueriesInSetsAndJoins(query_plan, subqueries_for_sets); } void InterpreterSelectQuery::executeFetchColumns( - QueryProcessingStage::Enum processing_stage, QueryPipeline & pipeline, + QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan, const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere) { auto & query = getSelectQuery(); diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h index 6b0169a4b682c4d7259d83a5b645497c7c5b485b..7ab266b52eb10102faad10a2befa98e03c79377c 100644 --- a/src/Interpreters/InterpreterSelectQuery.h +++ b/src/Interpreters/InterpreterSelectQuery.h @@ -14,7 +14,6 @@ #include #include -#include #include namespace Poco { class Logger; } @@ -25,6 +24,7 @@ namespace DB struct SubqueryForSet; class InterpreterSelectWithUnionQuery; class Context; +class QueryPlan; struct SyntaxAnalyzerResult; using SyntaxAnalyzerResultPtr = std::shared_ptr; @@ -104,35 +104,35 @@ private: Block getSampleBlockImpl(); - void executeImpl(QueryPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional prepared_pipe); + void executeImpl(QueryPlan & query_plan, const BlockInputStreamPtr & prepared_input, std::optional prepared_pipe); /// Different stages of query execution. void executeFetchColumns( QueryProcessingStage::Enum processing_stage, - QueryPipeline & pipeline, + QueryPlan & query_plan, const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere); - void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter); - void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info); - void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final); - void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final); - void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); - static void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, const std::string & description); - void executeOrder(QueryPipeline & pipeline, InputOrderInfoPtr sorting_info); - void executeOrderOptimized(QueryPipeline & pipeline, InputOrderInfoPtr sorting_info, UInt64 limit, SortDescription & output_order_descr); - void executeWithFill(QueryPipeline & pipeline); - void executeMergeSorted(QueryPipeline & pipeline, const std::string & description); - void executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset); - void executeLimitBy(QueryPipeline & pipeline); - void executeLimit(QueryPipeline & pipeline); - void executeOffset(QueryPipeline & pipeline); - static void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); - void executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns, bool pre_distinct); - void executeExtremes(QueryPipeline & pipeline); - void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, const std::unordered_map & subqueries_for_sets); - void executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit, const std::string & description); + void executeWhere(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool remove_filter); + void executeAggregation(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info); + void executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final); + void executeTotalsAndHaving(QueryPlan & query_plan, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final); + void executeHaving(QueryPlan & query_plan, const ExpressionActionsPtr & expression); + static void executeExpression(QueryPlan & query_plan, const ExpressionActionsPtr & expression, const std::string & description); + void executeOrder(QueryPlan & query_plan, InputOrderInfoPtr sorting_info); + void executeOrderOptimized(QueryPlan & query_plan, InputOrderInfoPtr sorting_info, UInt64 limit, SortDescription & output_order_descr); + void executeWithFill(QueryPlan & query_plan); + void executeMergeSorted(QueryPlan & query_plan, const std::string & description); + void executePreLimit(QueryPlan & query_plan, bool do_not_skip_offset); + void executeLimitBy(QueryPlan & query_plan); + void executeLimit(QueryPlan & query_plan); + void executeOffset(QueryPlan & query_plan); + static void executeProjection(QueryPlan & query_plan, const ExpressionActionsPtr & expression); + void executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct); + void executeExtremes(QueryPlan & query_plan); + void executeSubqueriesInSetsAndJoins(QueryPlan & query_plan, const std::unordered_map & subqueries_for_sets); + void executeMergeSorted(QueryPlan & query_plan, const SortDescription & sort_description, UInt64 limit, const std::string & description); String generateFilterActions( ExpressionActionsPtr & actions, const ASTPtr & row_policy_filter, const Names & prerequisite_columns = {}) const; @@ -143,7 +143,7 @@ private: CUBE = 1 }; - void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator); + void executeRollupOrCube(QueryPlan & query_plan, Modificator modificator); /** If there is a SETTINGS section in the SELECT query, then apply settings from it. * diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index c3f7177eea043a6e0d951bf9c722c57e627c11ca..c4823f62ea3c9b4670e7f594b811b4e2ec417322 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -43,28 +43,47 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActio void ExpressionStep::transformPipeline(QueryPipeline & pipeline) { + /// In case joined subquery has totals, and we don't, add default chunk to totals. + bool add_default_totals = false; + if (default_totals && !pipeline.hasTotals()) + { + pipeline.addDefaultTotals(); + add_default_totals = true; + } + pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) { bool on_totals = stream_type == QueryPipeline::StreamType::Totals; - return std::make_shared(header, expression, on_totals, default_totals); + return std::make_shared(header, expression, on_totals, add_default_totals); }); } InflatingExpressionStep::InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_) : ITransformingStep( input_stream_, - DataStream{.header = ExpressionTransform::transformHeader(input_stream_.header, expression_)}) + ExpressionTransform::transformHeader(input_stream_.header, expression_), + getTraits(expression_)) , expression(std::move(expression_)) , default_totals(default_totals_) { + filterDistinctColumns(output_stream->header, output_stream->distinct_columns); + filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns); } void InflatingExpressionStep::transformPipeline(QueryPipeline & pipeline) { + /// In case joined subquery has totals, and we don't, add default chunk to totals. + bool add_default_totals = false; + if (default_totals && !pipeline.hasTotals()) + { + pipeline.addDefaultTotals(); + add_default_totals = true; + } + pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) { bool on_totals = stream_type == QueryPipeline::StreamType::Totals; - return std::make_shared(header, expression, on_totals, default_totals); + return std::make_shared(header, expression, on_totals, add_default_totals); }); } diff --git a/src/Processors/QueryPlan/ReadNothingStep.cpp b/src/Processors/QueryPlan/ReadNothingStep.cpp index 153fe60301fbc552f3e0f46d2fd38f302286c885..cdf1c248e5785bbca2a0f3c0c1fd031c99e913ae 100644 --- a/src/Processors/QueryPlan/ReadNothingStep.cpp +++ b/src/Processors/QueryPlan/ReadNothingStep.cpp @@ -5,8 +5,8 @@ namespace DB { -ReadNothingStep::ReadNothingStep(DataStream output_stream_) - : ISourceStep(std::move(output_stream_)) +ReadNothingStep::ReadNothingStep(Block output_header) + : ISourceStep(DataStream{.header = std::move(output_header)}) { } diff --git a/src/Processors/QueryPlan/ReadNothingStep.h b/src/Processors/QueryPlan/ReadNothingStep.h index b881e1fad29e88352aa03e972c42558c29f35721..8580331f6b098a974f74e3eedc8dab5bee2cd7e6 100644 --- a/src/Processors/QueryPlan/ReadNothingStep.h +++ b/src/Processors/QueryPlan/ReadNothingStep.h @@ -7,7 +7,7 @@ namespace DB class ReadNothingStep : public ISourceStep { public: - explicit ReadNothingStep(DataStream output_stream_); + explicit ReadNothingStep(Block output_header); String getName() const override { return "ReadNothing"; }