From 1ad4f2c0fd4ad17c5081d86551daf02174eefb55 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 17 Jun 2020 18:50:21 +0300 Subject: [PATCH] Add CreatingSets step. --- src/Interpreters/InterpreterSelectQuery.cpp | 9 ++++-- src/Processors/QueryPlan/CreatingSetsStep.cpp | 30 +++++++++++++++++++ src/Processors/QueryPlan/CreatingSetsStep.h | 28 +++++++++++++++++ .../Transforms/CreatingSetsTransform.cpp | 8 ++--- .../Transforms/CreatingSetsTransform.h | 4 +-- src/Processors/ya.make | 1 + 6 files changed, 71 insertions(+), 9 deletions(-) create mode 100644 src/Processors/QueryPlan/CreatingSetsStep.cpp create mode 100644 src/Processors/QueryPlan/CreatingSetsStep.h diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index e64595dd75..11c3a6b121 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -88,6 +88,7 @@ #include #include #include +#include namespace DB @@ -1873,12 +1874,14 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pip const Settings & settings = context->getSettingsRef(); - auto creating_sets = std::make_shared( - pipeline.getHeader(), subqueries_for_sets, + CreatingSetsStep creating_sets( + DataStream{.header = pipeline.getHeader()}, + subqueries_for_sets, SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode), *context); - pipeline.addCreatingSetsTransform(std::move(creating_sets)); + creating_sets.setStepDescription("Create sets for subqueries and joins"); + creating_sets.transformPipeline(pipeline); } diff --git a/src/Processors/QueryPlan/CreatingSetsStep.cpp b/src/Processors/QueryPlan/CreatingSetsStep.cpp new file mode 100644 index 0000000000..fa2eb9b34b --- /dev/null +++ b/src/Processors/QueryPlan/CreatingSetsStep.cpp @@ -0,0 +1,30 @@ +#include +#include +#include + +namespace DB +{ + +CreatingSetsStep::CreatingSetsStep( + const DataStream & input_stream_, + SubqueriesForSets subqueries_for_sets_, + SizeLimits network_transfer_limits_, + const Context & context_) + : ITransformingStep(input_stream_, input_stream_) + , subqueries_for_sets(std::move(subqueries_for_sets_)) + , network_transfer_limits(std::move(network_transfer_limits_)) + , context(context_) +{ +} + +void CreatingSetsStep::transformPipeline(QueryPipeline & pipeline) +{ + auto creating_sets = std::make_shared( + pipeline.getHeader(), subqueries_for_sets, + network_transfer_limits, + context); + + pipeline.addCreatingSetsTransform(std::move(creating_sets)); +} + +} diff --git a/src/Processors/QueryPlan/CreatingSetsStep.h b/src/Processors/QueryPlan/CreatingSetsStep.h new file mode 100644 index 0000000000..d3c4db3050 --- /dev/null +++ b/src/Processors/QueryPlan/CreatingSetsStep.h @@ -0,0 +1,28 @@ +#pragma once +#include +#include +#include + +namespace DB +{ + +class CreatingSetsStep : public ITransformingStep +{ +public: + CreatingSetsStep( + const DataStream & input_stream_, + SubqueriesForSets subqueries_for_sets_, + SizeLimits network_transfer_limits_, + const Context & context_); + + String getName() const override { return "CreatingSets"; } + + void transformPipeline(QueryPipeline & pipeline) override; + +private: + SubqueriesForSets subqueries_for_sets; + SizeLimits network_transfer_limits; + const Context & context; +}; + +} diff --git a/src/Processors/Transforms/CreatingSetsTransform.cpp b/src/Processors/Transforms/CreatingSetsTransform.cpp index af8fa4097d..321419c092 100644 --- a/src/Processors/Transforms/CreatingSetsTransform.cpp +++ b/src/Processors/Transforms/CreatingSetsTransform.cpp @@ -23,13 +23,13 @@ namespace ErrorCodes CreatingSetsTransform::CreatingSetsTransform( Block out_header_, - const SubqueriesForSets & subqueries_for_sets_, - const SizeLimits & network_transfer_limits_, + SubqueriesForSets subqueries_for_sets_, + SizeLimits network_transfer_limits_, const Context & context_) : IProcessor({}, {std::move(out_header_)}) - , subqueries_for_sets(subqueries_for_sets_) + , subqueries_for_sets(std::move(subqueries_for_sets_)) , cur_subquery(subqueries_for_sets.begin()) - , network_transfer_limits(network_transfer_limits_) + , network_transfer_limits(std::move(network_transfer_limits_)) , context(context_) { } diff --git a/src/Processors/Transforms/CreatingSetsTransform.h b/src/Processors/Transforms/CreatingSetsTransform.h index f6df604295..ac9ac7130f 100644 --- a/src/Processors/Transforms/CreatingSetsTransform.h +++ b/src/Processors/Transforms/CreatingSetsTransform.h @@ -21,8 +21,8 @@ class CreatingSetsTransform : public IProcessor public: CreatingSetsTransform( Block out_header_, - const SubqueriesForSets & subqueries_for_sets_, - const SizeLimits & network_transfer_limits_, + SubqueriesForSets subqueries_for_sets_, + SizeLimits network_transfer_limits_, const Context & context_); String getName() const override { return "CreatingSetsTransform"; } diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 8d373fbf05..537d81e975 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -139,6 +139,7 @@ SRCS( Transforms/AggregatingInOrderTransform.cpp QueryPlan/AddingDelayedStreamStep.cpp QueryPlan/AggregatingStep.cpp + QueryPlan/CreatingSetsStep.cpp QueryPlan/DistinctStep.cpp QueryPlan/ExpressionStep.cpp QueryPlan/FilterStep.cpp -- GitLab