From 45adacf0bc0ff5db21e5b711a99202a87837027e Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Thu, 18 Jun 2020 20:45:00 +0300
Subject: [PATCH] Use QueryPlan in InterpreterSelectQuery [part 2].

---
 src/Interpreters/InterpreterSelectQuery.cpp   | 251 +++++++++---------
 src/Interpreters/InterpreterSelectQuery.h     |   3 +
 .../InterpreterSelectWithUnionQuery.cpp       |  61 ++---
 .../InterpreterSelectWithUnionQuery.h         |   7 +-
 src/Processors/QueryPipeline.cpp              |   5 +-
 src/Processors/QueryPipeline.h                |   2 +-
 src/Processors/QueryPlan/ConvertingStep.cpp   |  48 ++++
 src/Processors/QueryPlan/ConvertingStep.h     |  20 ++
 src/Processors/QueryPlan/MergeSortingStep.cpp |   2 +-
 src/Processors/QueryPlan/QueryPlan.cpp        |  43 +++
 src/Processors/QueryPlan/QueryPlan.h          |   7 +
 src/Processors/QueryPlan/TotalsHavingStep.h   |   1 -
 src/Processors/QueryPlan/UnionStep.cpp        |  39 +++
 src/Processors/QueryPlan/UnionStep.h          |  22 ++
 src/Processors/ya.make                        |   2 +
 15 files changed, 341 insertions(+), 172 deletions(-)
 create mode 100644 src/Processors/QueryPlan/ConvertingStep.cpp
 create mode 100644 src/Processors/QueryPlan/ConvertingStep.h
 create mode 100644 src/Processors/QueryPlan/UnionStep.cpp
 create mode 100644 src/Processors/QueryPlan/UnionStep.h

diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index a1f42018bb..7911372e5b 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -77,6 +77,7 @@
 #include
 #include
 #include
+#include <Processors/QueryPlan/ConvertingStep.h>


 namespace DB
@@ -462,26 +463,29 @@ Block InterpreterSelectQuery::getSampleBlock()
     return result_header;
 }

+void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
+{
+    executeImpl(query_plan, input, std::move(input_pipe));
+
+    /// We must guarantee that result structure is the same as in getSampleBlock()
+    if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
+    {
+        auto converting = std::make_unique<ConvertingStep>(query_plan.getCurrentDataStream(), result_header);
+        query_plan.addStep(std::move(converting));
+    }
+}

 BlockIO InterpreterSelectQuery::execute()
 {
     BlockIO res;
     QueryPlan query_plan;
-    executeImpl(query_plan, input, std::move(input_pipe));
+
+    buildQueryPlan(query_plan);

     res.pipeline = std::move(*query_plan.buildQueryPipeline());
     res.pipeline.addInterpreterContext(context);
     res.pipeline.addStorageHolder(storage);

-    /// We must guarantee that result structure is the same as in getSampleBlock()
-    if (!blocksHaveEqualStructure(res.pipeline.getHeader(), result_header))
-    {
-        res.pipeline.addSimpleTransform([&](const Block & header)
-        {
-            return std::make_shared<ConvertingTransform>(header, result_header, ConvertingTransform::MatchColumnsMode::Name);
-        });
-    }
-
     return res;
 }

@@ -1045,9 +1049,9 @@ void InterpreterSelectQuery::executeFetchColumns(
                 {std::move(column), std::make_shared<DataTypeAggregateFunction>(func, argument_types, desc.parameters), desc.column_name}};

             auto istream = std::make_shared<OneBlockInputStream>(block_with_count);
-            ReadFromPreparedSource prepared_count(Pipe(std::make_shared<SourceFromInputStream>(istream)));
-            prepared_count.setStepDescription("Optimized trivial count");
-            prepared_count.initializePipeline(pipeline);
+            auto prepared_count = std::make_unique<ReadFromPreparedSource>(Pipe(std::make_shared<SourceFromInputStream>(istream)));
+            prepared_count->setStepDescription("Optimized trivial count");
+            query_plan.addStep(std::move(prepared_count));
             from_stage = QueryProcessingStage::WithMergeableState;
             analysis_result.first_stage = false;
             return;
@@ -1241,7 +1245,7 @@ void InterpreterSelectQuery::executeFetchColumns(
     {
         is_remote = true;
         max_streams = settings.max_distributed_connections;
-        pipeline.setMaxThreads(max_streams);
+        query_plan.setMaxThreads(max_streams);
     }

     UInt64 max_block_size = settings.max_block_size;
@@ -1266,14 +1270,14 @@ void InterpreterSelectQuery::executeFetchColumns(
        {
            max_block_size = std::max(UInt64(1), limit_length + limit_offset);
            max_streams = 1;
-           pipeline.setMaxThreads(max_streams);
+           query_plan.setMaxThreads(max_streams);
        }

    if (!max_block_size)
        throw Exception("Setting 'max_block_size' cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND);

    /// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery or prepared input?
-    if (pipeline.initialized())
+    if (query_plan.isInitialized())
    {
        /// Prepared input.
    }
@@ -1295,7 +1299,7 @@ void InterpreterSelectQuery::executeFetchColumns(
            interpreter_subquery->ignoreWithTotals();
        }

-        pipeline = interpreter_subquery->execute().pipeline;
+        interpreter_subquery->buildQueryPlan(query_plan);
    }
    else if (storage)
    {
@@ -1331,13 +1335,12 @@ void InterpreterSelectQuery::executeFetchColumns(
            query_info.input_order_info = query_info.order_optimizer->getInputOrder(storage);
        }

-        ReadFromStorageStep read_step(
+        auto read_step = std::make_unique<ReadFromStorageStep>(
            table_lock, options, storage,
            required_columns, query_info, *context, processing_stage, max_block_size, max_streams);

-        read_step.setStepDescription("Read from " + storage->getName());
-
-        pipeline = std::move(*read_step.updatePipeline({}));
+        read_step->setStepDescription("Read from " + storage->getName());
+        query_plan.addStep(std::move(read_step));
    }
    else
        throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR);
@@ -1345,32 +1348,33 @@ void InterpreterSelectQuery::executeFetchColumns(
    /// Aliases in table declaration.
    if (processing_stage == QueryProcessingStage::FetchColumns && alias_actions)
    {
-        ExpressionStep table_aliases(DataStream{.header = pipeline.getHeader()}, alias_actions);
-        table_aliases.setStepDescription("Add table aliases");
-        table_aliases.transformPipeline(pipeline);
+        auto table_aliases = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), alias_actions);
+        table_aliases->setStepDescription("Add table aliases");
+        query_plan.addStep(std::move(table_aliases));
    }
 }

-void InterpreterSelectQuery::executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter)
+void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool remove_filter)
 {
-    FilterStep where_step(
-        DataStream{.header = pipeline.getHeader()},
+    auto where_step = std::make_unique<FilterStep>(
+        query_plan.getCurrentDataStream(),
        expression,
        getSelectQuery().where()->getColumnName(),
        remove_filter);

-    where_step.setStepDescription("WHERE");
-    where_step.transformPipeline(pipeline);
+    where_step->setStepDescription("WHERE");
+    query_plan.addStep(std::move(where_step));
 }

-void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
+void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
 {
-    ExpressionStep expression_before_aggregation(DataStream{.header = pipeline.getHeader()}, expression);
-    expression_before_aggregation.transformPipeline(pipeline);
+    auto expression_before_aggregation = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
+    expression_before_aggregation->setStepDescription("Before GROUP BY");
+    query_plan.addStep(std::move(expression_before_aggregation));

-    Block header_before_aggregation = pipeline.getHeader();
+    const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;
    ColumnNumbers keys;
    for (const auto & key : query_analyzer->aggregationKeys())
        keys.push_back(header_before_aggregation.getPositionByName(key.name));
@@ -1412,8 +1416,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
    bool storage_has_evenly_distributed_read = storage && storage->hasEvenlyDistributedRead();

-    AggregatingStep aggregating_step(
-        DataStream{.header = pipeline.getHeader()},
+    auto aggregating_step = std::make_unique<AggregatingStep>(
+        query_plan.getCurrentDataStream(),
        std::move(transform_params),
        settings.max_block_size,
        merge_threads,
@@ -1422,13 +1426,13 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
        std::move(group_by_info),
        std::move(group_by_sort_description));

-    aggregating_step.transformPipeline(pipeline);
+    query_plan.addStep(std::move(aggregating_step));
 }

-void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final)
+void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final)
 {
-    Block header_before_merge = pipeline.getHeader();
+    const auto & header_before_merge = query_plan.getCurrentDataStream().header;

    ColumnNumbers keys;
    for (const auto & key : query_analyzer->aggregationKeys())
@@ -1455,47 +1459,45 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bo

    auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);

-    MergingAggregatedStep merging_aggregated(
-        DataStream{.header = pipeline.getHeader()},
+    auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
+        query_plan.getCurrentDataStream(),
        std::move(transform_params),
        settings.distributed_aggregation_memory_efficient,
        settings.max_threads,
        settings.aggregation_memory_efficient_merge_threads);

-    merging_aggregated.transformPipeline(pipeline);
+    query_plan.addStep(std::move(merging_aggregated));
 }

-void InterpreterSelectQuery::executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression)
+void InterpreterSelectQuery::executeHaving(QueryPlan & query_plan, const ExpressionActionsPtr & expression)
 {
-    FilterStep having_step(
-        DataStream{.header = pipeline.getHeader()},
+    auto having_step = std::make_unique<FilterStep>(
+        query_plan.getCurrentDataStream(),
        expression,
        getSelectQuery().having()->getColumnName(),
        false);

-    having_step.setStepDescription("HAVING");
-    having_step.transformPipeline(pipeline);
+    having_step->setStepDescription("HAVING");
+    query_plan.addStep(std::move(having_step));
 }

-void InterpreterSelectQuery::executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
+void InterpreterSelectQuery::executeTotalsAndHaving(QueryPlan & query_plan, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
 {
    const Settings & settings = context->getSettingsRef();

-    TotalsHavingStep totals_having_step(
-        DataStream{.header = pipeline.getHeader()},
+    auto totals_having_step = std::make_unique<TotalsHavingStep>(
+        query_plan.getCurrentDataStream(),
        overflow_row,
        expression,
        has_having ? getSelectQuery().having()->getColumnName() : "",
        settings.totals_mode, settings.totals_auto_threshold, final);

-    totals_having_step.transformPipeline(pipeline);
+    query_plan.addStep(std::move(totals_having_step));
 }

-void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator)
+void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modificator modificator)
 {
-    pipeline.resize(1);
-
-    Block header_before_transform = pipeline.getHeader();
+    const auto & header_before_transform = query_plan.getCurrentDataStream().header;

    ColumnNumbers keys;

@@ -1512,45 +1514,40 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif

    auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);

+    QueryPlanStepPtr step;
    if (modificator == Modificator::ROLLUP)
-    {
-        RollupStep rollup_step(DataStream{.header = pipeline.getHeader()}, std::move(transform_params));
-        rollup_step.transformPipeline(pipeline);
-    }
+        step = std::make_unique<RollupStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
    else
-    {
-        CubeStep cube_step(DataStream{.header = pipeline.getHeader()}, std::move(transform_params));
-        cube_step.transformPipeline(pipeline);
-    }
+        step = std::make_unique<CubeStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
+
+    query_plan.addStep(std::move(step));
 }

-void InterpreterSelectQuery::executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, const std::string & description)
+void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const ExpressionActionsPtr & expression, const std::string & description)
 {
-    ExpressionStep expression_step(
-        DataStream{.header = pipeline.getHeader()},
-        expression);
+    auto expression_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);

-    expression_step.setStepDescription(description);
-    expression_step.transformPipeline(pipeline);
+    expression_step->setStepDescription(description);
+    query_plan.addStep(std::move(expression_step));
 }

-void InterpreterSelectQuery::executeOrderOptimized(QueryPipeline & pipeline, InputOrderInfoPtr input_sorting_info, UInt64 limit, SortDescription & output_order_descr)
+void InterpreterSelectQuery::executeOrderOptimized(QueryPlan & query_plan, InputOrderInfoPtr input_sorting_info, UInt64 limit, SortDescription & output_order_descr)
 {
    const Settings & settings = context->getSettingsRef();

-    FinishSortingStep finish_sorting_step(
-        DataStream{.header = pipeline.getHeader()},
+    auto finish_sorting_step = std::make_unique<FinishSortingStep>(
+        query_plan.getCurrentDataStream(),
        input_sorting_info->order_key_prefix_descr,
        output_order_descr,
        settings.max_block_size,
        limit);

-    finish_sorting_step.transformPipeline(pipeline);
+    query_plan.addStep(std::move(finish_sorting_step));
 }

-void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputOrderInfoPtr input_sorting_info)
+void InterpreterSelectQuery::executeOrder(QueryPlan & query_plan, InputOrderInfoPtr input_sorting_info)
 {
    auto & query = getSelectQuery();
    SortDescription output_order_descr = getSortDescription(query, *context);
@@ -1564,69 +1561,69 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputOrderIn
          * and then merge them into one sorted stream.
          * At this stage we merge per-thread streams into one.
          */
-        executeOrderOptimized(pipeline, input_sorting_info, limit, output_order_descr);
+        executeOrderOptimized(query_plan, input_sorting_info, limit, output_order_descr);
        return;
    }

    const Settings & settings = context->getSettingsRef();

-    PartialSortingStep partial_sorting(
-        DataStream{.header = pipeline.getHeader()},
+    auto partial_sorting = std::make_unique<PartialSortingStep>(
+        query_plan.getCurrentDataStream(),
        output_order_descr,
        limit,
        SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode));

-    partial_sorting.setStepDescription("Sort each block before ORDER BY");
-    partial_sorting.transformPipeline(pipeline);
+    partial_sorting->setStepDescription("Sort each block before ORDER BY");
+    query_plan.addStep(std::move(partial_sorting));

    /// Merge the sorted blocks.
-    MergeSortingStep merge_sorting_step(
-        DataStream{.header = pipeline.getHeader()},
+    auto merge_sorting_step = std::make_unique<MergeSortingStep>(
+        query_plan.getCurrentDataStream(),
        output_order_descr, settings.max_block_size, limit,
-        settings.max_bytes_before_remerge_sort / pipeline.getNumStreams(),
+        settings.max_bytes_before_remerge_sort,
        settings.max_bytes_before_external_sort, context->getTemporaryVolume(),
        settings.min_free_disk_space_for_temporary_data);

-    merge_sorting_step.setStepDescription("Merge sorted blocks before ORDER BY");
-    merge_sorting_step.transformPipeline(pipeline);
+    merge_sorting_step->setStepDescription("Merge sorted blocks before ORDER BY");
+    query_plan.addStep(std::move(merge_sorting_step));

    /// If there are several streams, we merge them into one
-    executeMergeSorted(pipeline, output_order_descr, limit, "before ORDER BY");
+    executeMergeSorted(query_plan, output_order_descr, limit, "before ORDER BY");
 }

-void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline, const std::string & description)
+void InterpreterSelectQuery::executeMergeSorted(QueryPlan & query_plan, const std::string & description)
 {
    auto & query = getSelectQuery();
    SortDescription order_descr = getSortDescription(query, *context);
    UInt64 limit = getLimitForSorting(query, *context);

-    executeMergeSorted(pipeline, order_descr, limit, description);
+    executeMergeSorted(query_plan, order_descr, limit, description);
 }

-void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit, const std::string & description)
+void InterpreterSelectQuery::executeMergeSorted(QueryPlan & query_plan, const SortDescription & sort_description, UInt64 limit, const std::string & description)
 {
    const Settings & settings = context->getSettingsRef();

-    MergingSortedStep merging_sorted(
-        DataStream{.header = pipeline.getHeader()},
+    auto merging_sorted = std::make_unique<MergingSortedStep>(
+        query_plan.getCurrentDataStream(),
        sort_description,
        settings.max_block_size, limit);

-    merging_sorted.setStepDescription("Merge sorted streams " + description);
-    merging_sorted.transformPipeline(pipeline);
+    merging_sorted->setStepDescription("Merge sorted streams " + description);
+    query_plan.addStep(std::move(merging_sorted));
 }

-void InterpreterSelectQuery::executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression)
+void InterpreterSelectQuery::executeProjection(QueryPlan & query_plan, const ExpressionActionsPtr & expression)
 {
-    ExpressionStep projection_step(DataStream{.header = pipeline.getHeader()}, expression);
-    projection_step.setStepDescription("Projection");
-    projection_step.transformPipeline(pipeline);
+    auto projection_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
+    projection_step->setStepDescription("Projection");
+    query_plan.addStep(std::move(projection_step));
 }

-void InterpreterSelectQuery::executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns, bool pre_distinct)
+void InterpreterSelectQuery::executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct)
 {
    auto & query = getSelectQuery();
    if (query.distinct)
@@ -1642,20 +1639,20 @@ void InterpreterSelectQuery::executeDistinct(QueryPipeline & pipeline, bool befo

        SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);

-        DistinctStep distinct_step(
-            DataStream{.header = pipeline.getHeader()},
+        auto distinct_step = std::make_unique<DistinctStep>(
+            query_plan.getCurrentDataStream(),
            limits, limit_for_distinct, columns, pre_distinct);

        if (pre_distinct)
-            distinct_step.setStepDescription("Preliminary DISTINCT");
+            distinct_step->setStepDescription("Preliminary DISTINCT");

-        distinct_step.transformPipeline(pipeline);
+        query_plan.addStep(std::move(distinct_step));
    }
 }

 /// Preliminary LIMIT - is used in every source, if there are several sources, before they are combined.
-void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset)
+void InterpreterSelectQuery::executePreLimit(QueryPlan & query_plan, bool do_not_skip_offset)
 {
    auto & query = getSelectQuery();
    /// If there is LIMIT
@@ -1669,14 +1666,14 @@ void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline, bool do_n
            limit_offset = 0;
        }

-        LimitStep limit(DataStream{.header = pipeline.getHeader()}, limit_length, limit_offset);
-        limit.setStepDescription("preliminary LIMIT");
-        limit.transformPipeline(pipeline);
+        auto limit = std::make_unique<LimitStep>(query_plan.getCurrentDataStream(), limit_length, limit_offset);
+        limit->setStepDescription("preliminary LIMIT");
+        query_plan.addStep(std::move(limit));
    }
 }

-void InterpreterSelectQuery::executeLimitBy(QueryPipeline & pipeline)
+void InterpreterSelectQuery::executeLimitBy(QueryPlan & query_plan)
 {
    auto & query = getSelectQuery();
    if (!query.limitByLength() || !query.limitBy())
@@ -1689,8 +1686,8 @@ void InterpreterSelectQuery::executeLimitBy(QueryPipeline & pipeline)
    UInt64 length = getLimitUIntValue(query.limitByLength(), *context, "LIMIT");
    UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context, "OFFSET") : 0);

-    LimitByStep limit_by(DataStream{.header = pipeline.getHeader()}, length, offset, columns);
-    limit_by.transformPipeline(pipeline);
+    auto limit_by = std::make_unique<LimitByStep>(query_plan.getCurrentDataStream(), length, offset, columns);
+    query_plan.addStep(std::move(limit_by));
 }

@@ -1719,7 +1716,7 @@ namespace
    }
 }

-void InterpreterSelectQuery::executeWithFill(QueryPipeline & pipeline)
+void InterpreterSelectQuery::executeWithFill(QueryPlan & query_plan)
 {
    auto & query = getSelectQuery();
    if (query.orderBy())
@@ -1735,13 +1732,13 @@ void InterpreterSelectQuery::executeWithFill(QueryPipeline & pipeline)
        if (fill_descr.empty())
            return;

-        FillingStep filling_step(DataStream{.header = pipeline.getHeader()}, std::move(fill_descr));
-        filling_step.transformPipeline(pipeline);
+        auto filling_step = std::make_unique<FillingStep>(query_plan.getCurrentDataStream(), std::move(fill_descr));
+        query_plan.addStep(std::move(filling_step));
    }
 }

-void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline)
+void InterpreterSelectQuery::executeLimit(QueryPlan & query_plan)
 {
    auto & query = getSelectQuery();
    /// If there is LIMIT
@@ -1776,19 +1773,19 @@ void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline)
            order_descr = getSortDescription(query, *context);
        }

-        LimitStep limit(
-            DataStream{.header = pipeline.getHeader()},
+        auto limit = std::make_unique<LimitStep>(
+            query_plan.getCurrentDataStream(),
            limit_length, limit_offset, always_read_till_end, query.limit_with_ties, order_descr);

        if (query.limit_with_ties)
-            limit.setStepDescription("LIMIT WITH TIES");
+            limit->setStepDescription("LIMIT WITH TIES");

-        limit.transformPipeline(pipeline);
+        query_plan.addStep(std::move(limit));
    }
 }

-void InterpreterSelectQuery::executeOffset(QueryPipeline & pipeline)
+void InterpreterSelectQuery::executeOffset(QueryPlan & query_plan)
 {
    auto & query = getSelectQuery();
    /// If there is not a LIMIT but an offset
@@ -1798,35 +1795,35 @@ void InterpreterSelectQuery::executeOffset(QueryPipeline & pipeline)
        UInt64 limit_offset;
        std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, *context);

-        OffsetsStep offsets_step(DataStream{.header = pipeline.getHeader()}, limit_offset);
-        offsets_step.transformPipeline(pipeline);
+        auto offsets_step = std::make_unique<OffsetsStep>(query_plan.getCurrentDataStream(), limit_offset);
+        query_plan.addStep(std::move(offsets_step));
    }
 }

-void InterpreterSelectQuery::executeExtremes(QueryPipeline & pipeline)
+void InterpreterSelectQuery::executeExtremes(QueryPlan & query_plan)
 {
    if (!context->getSettingsRef().extremes)
        return;

-    ExtremesStep extremes_step(DataStream{.header = pipeline.getHeader()});
-    extremes_step.transformPipeline(pipeline);
+    auto extremes_step = std::make_unique<ExtremesStep>(query_plan.getCurrentDataStream());
+    query_plan.addStep(std::move(extremes_step));
 }

-void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, const SubqueriesForSets & subqueries_for_sets)
+void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPlan & query_plan, const SubqueriesForSets & subqueries_for_sets)
 {
    if (query_info.input_order_info)
-        executeMergeSorted(pipeline, query_info.input_order_info->order_key_prefix_descr, 0, "before creating sets for subqueries and joins");
+        executeMergeSorted(query_plan, query_info.input_order_info->order_key_prefix_descr, 0, "before creating sets for subqueries and joins");

    const Settings & settings = context->getSettingsRef();

-    CreatingSetsStep creating_sets(
-        DataStream{.header = pipeline.getHeader()},
+    auto creating_sets = std::make_unique<CreatingSetsStep>(
+        query_plan.getCurrentDataStream(),
        subqueries_for_sets,
        SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
        *context);

-    creating_sets.setStepDescription("Create sets for subqueries and joins");
-    creating_sets.transformPipeline(pipeline);
+    creating_sets->setStepDescription("Create sets for subqueries and joins");
+    query_plan.addStep(std::move(creating_sets));
 }

diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h
index 7ab266b52e..fdf6176cb2 100644
--- a/src/Interpreters/InterpreterSelectQuery.h
+++ b/src/Interpreters/InterpreterSelectQuery.h
@@ -77,6 +77,9 @@ public:
    /// Execute a query. Get the stream of blocks to read.
    BlockIO execute() override;

+    /// Builds QueryPlan for current query.
+    void buildQueryPlan(QueryPlan & query_plan);
+
    bool ignoreLimits() const override { return options.ignore_limits; }
    bool ignoreQuota() const override { return options.ignore_quota; }

diff --git a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp
index 7b86616555..6a922bebc6 100644
--- a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp
+++ b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp
@@ -6,10 +6,9 @@
 #include
 #include
 #include
-
-#include
-#include
-#include
+#include
+#include
+#include


 namespace DB
@@ -173,47 +172,35 @@ Block InterpreterSelectWithUnionQuery::getSampleBlock(
    return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock();
 }

-
-BlockIO InterpreterSelectWithUnionQuery::execute()
+void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
 {
-    BlockIO res;
-    QueryPipeline & main_pipeline = res.pipeline;
-    std::vector<QueryPipeline> pipelines;
-    bool has_main_pipeline = false;
-
-    Blocks headers;
-    headers.reserve(nested_interpreters.size());
+    size_t num_plans = nested_interpreters.size();
+    std::vector<QueryPlan> plans(num_plans);
+    DataStreams data_streams(num_plans);

-    for (auto & interpreter : nested_interpreters)
+    for (size_t i = 0; i < num_plans; ++i)
    {
-        if (!has_main_pipeline)
-        {
-            has_main_pipeline = true;
-            main_pipeline = interpreter->execute().pipeline;
-            headers.emplace_back(main_pipeline.getHeader());
-        }
-        else
-        {
-            pipelines.emplace_back(interpreter->execute().pipeline);
-            headers.emplace_back(pipelines.back().getHeader());
-        }
+        nested_interpreters[i]->buildQueryPlan(plans[i]);
+        data_streams[i] = plans[i].getCurrentDataStream();
    }

-    if (!has_main_pipeline)
-        main_pipeline.init(Pipe(std::make_shared<NullSource>(getSampleBlock())));
+    auto max_threads = context->getSettingsRef().max_threads;
+    auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);

-    if (!pipelines.empty())
-    {
-        auto common_header = getCommonHeaderForUnion(headers);
-        main_pipeline.unitePipelines(std::move(pipelines), common_header);
+    query_plan.unitePlans(std::move(union_step), std::move(plans));
+}

-        // nested queries can force 1 thread (due to simplicity)
-        // but in case of union this cannot be done.
-        UInt64 max_threads = context->getSettingsRef().max_threads;
-        main_pipeline.setMaxThreads(std::min(nested_interpreters.size(), max_threads));
-    }
+BlockIO InterpreterSelectWithUnionQuery::execute()
+{
+    BlockIO res;
+
+    QueryPlan query_plan;
+    buildQueryPlan(query_plan);
+
+    auto pipeline = query_plan.buildQueryPipeline();

-    main_pipeline.addInterpreterContext(context);
+    res.pipeline = std::move(*pipeline);
+    res.pipeline.addInterpreterContext(context);

    return res;
 }

diff --git a/src/Interpreters/InterpreterSelectWithUnionQuery.h b/src/Interpreters/InterpreterSelectWithUnionQuery.h
index 3b5fe533a8..5590066a4d 100644
--- a/src/Interpreters/InterpreterSelectWithUnionQuery.h
+++ b/src/Interpreters/InterpreterSelectWithUnionQuery.h
@@ -5,14 +5,12 @@
 #include
 #include
-#include
-
 namespace DB
 {

 class Context;
 class InterpreterSelectQuery;
-
+class QueryPlan;

 /** Interprets one or multiple SELECT queries inside UNION ALL chain.
   */
@@ -27,6 +25,9 @@ public:

    ~InterpreterSelectWithUnionQuery() override;

+    /// Builds QueryPlan for current query.
+    void buildQueryPlan(QueryPlan & query_plan);
+
    BlockIO execute() override;

    bool ignoreLimits() const override { return options.ignore_limits; }

diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp
index 5b6109440d..a4e4ab2595 100644
--- a/src/Processors/QueryPipeline.cpp
+++ b/src/Processors/QueryPipeline.cpp
@@ -563,7 +563,7 @@ void QueryPipeline::setOutputFormat(ProcessorPtr output)
 }

 void QueryPipeline::unitePipelines(
-    std::vector<QueryPipeline> && pipelines, const Block & common_header)
+    std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header)
 {
    if (initialized())
    {
@@ -583,8 +583,9 @@ void QueryPipeline::unitePipelines(
    if (totals_having_port)
        totals.push_back(totals_having_port);

-    for (auto & pipeline : pipelines)
+    for (auto & pipeline_ptr : pipelines)
    {
+        auto & pipeline = *pipeline_ptr;
        pipeline.checkInitialized();

        if (!pipeline.isCompleted())

diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h
index 129b7f5ae3..6d9409ffc4 100644
--- a/src/Processors/QueryPipeline.h
+++ b/src/Processors/QueryPipeline.h
@@ -135,7 +135,7 @@ public:

    void enableQuotaForCurrentStreams();

-    void unitePipelines(std::vector<QueryPipeline> && pipelines, const Block & common_header);
+    void unitePipelines(std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header);

    PipelineExecutorPtr execute();

diff --git a/src/Processors/QueryPlan/ConvertingStep.cpp b/src/Processors/QueryPlan/ConvertingStep.cpp
new file mode 100644
index 0000000000..312210fd14
--- /dev/null
+++ b/src/Processors/QueryPlan/ConvertingStep.cpp
@@ -0,0 +1,48 @@
+#include <Processors/QueryPlan/ConvertingStep.h>
+#include <Processors/QueryPipeline.h>
+#include <Processors/Transforms/ConvertingTransform.h>
+
+namespace DB
+{
+
+static ITransformingStep::DataStreamTraits getTraits()
+{
+    return ITransformingStep::DataStreamTraits{
+        .preserves_distinct_columns = true
+    };
+}
+
+static void filterDistinctColumns(const Block & res_header, NameSet & distinct_columns)
+{
+    if (distinct_columns.empty())
+        return;
+
+    NameSet new_distinct_columns;
+    for (const auto & column : res_header)
+        if (distinct_columns.count(column.name))
+            new_distinct_columns.insert(column.name);
+
+    distinct_columns.swap(new_distinct_columns);
+}
+
+ConvertingStep::ConvertingStep(const DataStream & input_stream_, Block result_header_)
+    : ITransformingStep(
+        input_stream_,
+        result_header_,
+        getTraits())
+    , result_header(std::move(result_header_))
+{
+    /// Some columns may be removed
+    filterDistinctColumns(output_stream->header, output_stream->distinct_columns);
+    filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns);
+}
+
+void ConvertingStep::transformPipeline(QueryPipeline & pipeline)
+{
+    pipeline.addSimpleTransform([&](const Block & header)
+    {
+        return std::make_shared<ConvertingTransform>(header, result_header, ConvertingTransform::MatchColumnsMode::Name);
+    });
+}
+
+}

diff --git a/src/Processors/QueryPlan/ConvertingStep.h b/src/Processors/QueryPlan/ConvertingStep.h
new file mode 100644
index 0000000000..540deece24
--- /dev/null
+++ b/src/Processors/QueryPlan/ConvertingStep.h
@@ -0,0 +1,20 @@
+#pragma once
+#include <Processors/QueryPlan/ITransformingStep.h>
+
+namespace DB
+{
+
+class ConvertingStep : public ITransformingStep
+{
+public:
+    ConvertingStep(const DataStream & input_stream_, Block result_header_);
+
+    String getName() const override { return "Converting"; }
+
+    void transformPipeline(QueryPipeline & pipeline) override;
+
+private:
+    Block result_header;
+};
+
+}

diff --git a/src/Processors/QueryPlan/MergeSortingStep.cpp b/src/Processors/QueryPlan/MergeSortingStep.cpp
index bc48ae1a98..d34fac4d3c 100644
--- a/src/Processors/QueryPlan/MergeSortingStep.cpp
+++ b/src/Processors/QueryPlan/MergeSortingStep.cpp
@@ -40,7 +40,7 @@ void MergeSortingStep::transformPipeline(QueryPipeline & pipeline)
        return std::make_shared<MergeSortingTransform>(
                header, description, max_merged_block_size, limit,
-                max_bytes_before_remerge,
+                max_bytes_before_remerge / pipeline.getNumStreams(),
                max_bytes_before_external_sort,
                tmp_volume,
                min_free_disk_space);

diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp
index d29b66876a..cbbd5bf5ef 100644
--- a/src/Processors/QueryPlan/QueryPlan.cpp
+++ b/src/Processors/QueryPlan/QueryPlan.cpp
@@ -35,6 +35,43 @@ const DataStream & QueryPlan::getCurrentDataStream() const
    return root->step->getOutputStream();
 }

+void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<QueryPlan> plans)
+{
+    if (isInitialized())
+        throw Exception("Cannot unite plans because current QueryPlan is already initialized",
+                        ErrorCodes::LOGICAL_ERROR);
+
+    const auto & inputs = step->getInputStreams();
+    size_t num_inputs = step->getInputStreams().size();
+    if (num_inputs != plans.size())
+    {
+        throw Exception("Cannot unite QueryPlans using " + step->getName() +
+                        " because step has different number of inputs. "
" + "Has " + std::to_string(plans.size()) + " plans " + "and " + std::to_string(num_inputs) + " inputs", ErrorCodes::LOGICAL_ERROR); + } + + for (size_t i = 0; i < num_inputs; ++i) + { + const auto & step_header = inputs[i].header; + const auto & plan_header = plans[i].getCurrentDataStream().header; + if (!blocksHaveEqualStructure(step_header, plan_header)) + throw Exception("Cannot unite QueryPlans using " + step->getName() + " because " + "it has incompatible header with plan " + root->step->getName() + " " + "plan header: " + plan_header.dumpStructure() + + "step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR); + } + + for (auto & plan : plans) + nodes.insert(nodes.end(), plan.nodes.begin(), plan.nodes.end()); + + nodes.emplace_back(Node{.step = std::move(step)}); + root = &nodes.back(); + + for (auto & plan : plans) + root->children.emplace_back(plan.root); +} + void QueryPlan::addStep(QueryPlanStepPtr step) { checkNotCompleted(); @@ -48,6 +85,7 @@ void QueryPlan::addStep(QueryPlanStepPtr step) "step has no inputs, but QueryPlan is already initialised", ErrorCodes::LOGICAL_ERROR); nodes.emplace_back(Node{.step = std::move(step)}); + root = &nodes.back(); return; } @@ -100,7 +138,12 @@ QueryPipelinePtr QueryPlan::buildQueryPipeline() size_t next_child = frame.pipelines.size(); if (next_child == frame.node->children.size()) { + bool limit_max_threads = frame.pipelines.empty(); last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines)); + + if (limit_max_threads) + last_pipeline->setMaxThreads(max_threads); + stack.pop(); } else diff --git a/src/Processors/QueryPlan/QueryPlan.h b/src/Processors/QueryPlan/QueryPlan.h index 168cfc3665..d47c5052a4 100644 --- a/src/Processors/QueryPlan/QueryPlan.h +++ b/src/Processors/QueryPlan/QueryPlan.h @@ -18,6 +18,7 @@ using QueryPipelinePtr = std::unique_ptr; class QueryPlan { public: + void unitePlans(QueryPlanStepPtr step, std::vector plans); void addStep(QueryPlanStepPtr step); bool isInitialized() const { return root != nullptr; } /// Tree is not empty @@ -26,6 +27,10 @@ public: QueryPipelinePtr buildQueryPipeline(); + /// Set upper limit for the recommend number of threads. Will be applied to the newly-created pipelines. + /// TODO: make it in a better way. 
+    void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
+
 private:
    struct Node
    {
@@ -40,6 +45,8 @@ private:

    void checkInitialized() const;
    void checkNotCompleted() const;
+
+    size_t max_threads = 0;
 };

 }

diff --git a/src/Processors/QueryPlan/TotalsHavingStep.h b/src/Processors/QueryPlan/TotalsHavingStep.h
index ddb0ccd257..52cc936f62 100644
--- a/src/Processors/QueryPlan/TotalsHavingStep.h
+++ b/src/Processors/QueryPlan/TotalsHavingStep.h
@@ -1,6 +1,5 @@
 #pragma once
 #include
-#include

 namespace DB
 {

diff --git a/src/Processors/QueryPlan/UnionStep.cpp b/src/Processors/QueryPlan/UnionStep.cpp
new file mode 100644
index 0000000000..0da7806e58
--- /dev/null
+++ b/src/Processors/QueryPlan/UnionStep.cpp
@@ -0,0 +1,39 @@
+#include <Processors/QueryPlan/UnionStep.h>
+#include <Processors/QueryPipeline.h>
+#include <Processors/Sources/NullSource.h>
+#include <Processors/Pipe.h>
+
+namespace DB
+{
+
+UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_)
+    : header(std::move(result_header))
+    , max_threads(max_threads_)
+{
+    input_streams = std::move(input_streams_);
+
+    /// TODO: update traits
+    output_stream = DataStream{.header = header};
+}
+
+QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines)
+{
+    auto pipeline = std::make_unique<QueryPipeline>();
+    if (pipelines.empty())
+    {
+        pipeline->init(Pipe(std::make_shared<NullSource>(output_stream->header)));
+        return pipeline;
+    }
+
+    size_t num_pipelines = pipelines.size();
+    pipeline->unitePipelines(std::move(pipelines), output_stream->header);
+
+    if (num_pipelines > 1)
+    {
+        // nested queries can force 1 thread (due to simplicity)
+        // but in case of union this cannot be done.
+        pipeline->setMaxThreads(std::min(num_pipelines, max_threads));
+    }
+    return pipeline;
+}
+
+}

diff --git a/src/Processors/QueryPlan/UnionStep.h b/src/Processors/QueryPlan/UnionStep.h
new file mode 100644
index 0000000000..2c3d17b2e8
--- /dev/null
+++ b/src/Processors/QueryPlan/UnionStep.h
@@ -0,0 +1,22 @@
+#pragma once
+#include <Processors/QueryPlan/IQueryPlanStep.h>
+
+namespace DB
+{
+
+class UnionStep : public IQueryPlanStep
+{
+public:
+    /// max_threads is used to limit the number of threads for result pipeline.
+    UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_);
+
+    String getName() const override { return "Union"; }
+
+    QueryPipelinePtr updatePipeline(QueryPipelines pipelines) override;
+
+private:
+    Block header;
+    size_t max_threads;
+};
+
+}

diff --git a/src/Processors/ya.make b/src/Processors/ya.make
index 07328af754..6e4d2e4aa5 100644
--- a/src/Processors/ya.make
+++ b/src/Processors/ya.make
@@ -139,6 +139,7 @@ SRCS(
    Transforms/AggregatingInOrderTransform.cpp
    QueryPlan/AddingDelayedStreamStep.cpp
    QueryPlan/AggregatingStep.cpp
+    QueryPlan/ConvertingStep.cpp
    QueryPlan/CreatingSetsStep.cpp
    QueryPlan/CubeStep.cpp
    QueryPlan/DistinctStep.cpp
@@ -157,6 +158,7 @@ SRCS(
    QueryPlan/MergingSortedStep.cpp
    QueryPlan/OffsetsStep.cpp
    QueryPlan/PartialSortingStep.cpp
+    QueryPlan/UnionStep.cpp
    QueryPlan/ReadFromPreparedSource.cpp
    QueryPlan/ReadFromStorageStep.cpp
    QueryPlan/ReadNothingStep.cpp
-- 
GitLab
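
Editor's note (not part of the patch): a minimal sketch of the call pattern this change introduces, assuming an already-constructed InterpreterSelectWithUnionQuery named `interpreter`; it only uses the methods added above. The interpreter now first builds a tree of plan steps and only afterwards materializes a pipeline:

    QueryPlan plan;
    interpreter.buildQueryPlan(plan);            // build the tree of IQueryPlanStep nodes, no processors yet
    auto pipeline = plan.buildQueryPipeline();   // walk the tree bottom-up; each step updates/unites child pipelines

For UNION ALL, buildQueryPlan() builds one sub-plan per SELECT and glues them together with QueryPlan::unitePlans() and a UnionStep; UnionStep::updatePipeline() then calls QueryPipeline::unitePipelines(), which is what the old execute() did directly.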