diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 9452fbeaa43ce7a05668c0bf80bcf51fd5ad6d8c..bec7d6fec18590fccd85a2a061ba96a74f144d90 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -90,6 +90,8 @@ #include #include #include +#include +#include namespace DB @@ -1540,16 +1542,16 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif auto transform_params = std::make_shared(params, true); - pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr + if (modificator == Modificator::ROLLUP) { - if (stream_type == QueryPipeline::StreamType::Totals) - return nullptr; - - if (modificator == Modificator::ROLLUP) - return std::make_shared(header, std::move(transform_params)); - else - return std::make_shared(header, std::move(transform_params)); - }); + RollupStep rollup_step(DataStream{.header = pipeline.getHeader()}, std::move(transform_params)); + rollup_step.transformPipeline(pipeline); + } + else + { + CubeStep rollup_step(DataStream{.header = pipeline.getHeader()}, std::move(transform_params)); + cube_step.transformPipeline(pipeline); + } } diff --git a/src/Processors/QueryPlan/CubeStep.cpp b/src/Processors/QueryPlan/CubeStep.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3c8b2087fe0abb31c4ba526de09c27aa549d8b8b --- /dev/null +++ b/src/Processors/QueryPlan/CubeStep.cpp @@ -0,0 +1,27 @@ +#include +#include +#include + +namespace DB +{ + +CubeStep::CubeStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_) + : ITransformingStep(input_stream_, DataStream{.header = params_->getHeader()}) + , params(std::move(params_)) +{ +} + +void CubeStep::transformPipeline(QueryPipeline & pipeline) +{ + pipeline.resize(1); + + pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr + { + if (stream_type == QueryPipeline::StreamType::Totals) + return nullptr; + + return std::make_shared(header, std::move(params)); + }); +} + +} diff --git a/src/Processors/QueryPlan/CubeStep.h b/src/Processors/QueryPlan/CubeStep.h new file mode 100644 index 0000000000000000000000000000000000000000..c04f6a4f85442a3b24d1fb6f809333042b1d420f --- /dev/null +++ b/src/Processors/QueryPlan/CubeStep.h @@ -0,0 +1,24 @@ +#pragma once +#include +#include + +namespace DB +{ + +struct AggregatingTransformParams; +using AggregatingTransformParamsPtr = std::shared_ptr; + +class CubeStep : public ITransformingStep +{ +public: + CubeStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_); + + String getName() const override { return "Cube"; } + + void transformPipeline(QueryPipeline & pipeline) override; + +private: + AggregatingTransformParamsPtr params; +}; + +} diff --git a/src/Processors/QueryPlan/DistinctStep.h b/src/Processors/QueryPlan/DistinctStep.h index 638bd3debf75231d978582d80d7320f16e43f8a4..a70c66b2a4f98d7b42c4fa3ae461bfb403f70a5d 100644 --- a/src/Processors/QueryPlan/DistinctStep.h +++ b/src/Processors/QueryPlan/DistinctStep.h @@ -8,7 +8,7 @@ namespace DB class DistinctStep : public ITransformingStep { public: - explicit DistinctStep( + DistinctStep( const DataStream & input_stream_, const SizeLimits & set_size_limits_, UInt64 limit_hint_, diff --git a/src/Processors/QueryPlan/RollupStep.cpp b/src/Processors/QueryPlan/RollupStep.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5045cb71cb20d46d25bed15306f03faf76c0b688 --- /dev/null +++ b/src/Processors/QueryPlan/RollupStep.cpp @@ -0,0 +1,27 @@ +#include +#include +#include + +namespace DB +{ + +RollupStep::RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_) + : ITransformingStep(input_stream_, DataStream{.header = params_->getHeader()}) + , params(std::move(params_)) +{ +} + +void RollupStep::transformPipeline(QueryPipeline & pipeline) +{ + pipeline.resize(1); + + pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr + { + if (stream_type == QueryPipeline::StreamType::Totals) + return nullptr; + + return std::make_shared(header, std::move(params)); + }); +} + +} diff --git a/src/Processors/QueryPlan/RollupStep.h b/src/Processors/QueryPlan/RollupStep.h new file mode 100644 index 0000000000000000000000000000000000000000..56e8d81e37b94c1213ca5f369d661eaa857e1dba --- /dev/null +++ b/src/Processors/QueryPlan/RollupStep.h @@ -0,0 +1,24 @@ +#pragma once +#include +#include + +namespace DB +{ + +struct AggregatingTransformParams; +using AggregatingTransformParamsPtr = std::shared_ptr; + +class RollupStep : public ITransformingStep +{ +public: + RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_); + + String getName() const override { return "Rollup"; } + + void transformPipeline(QueryPipeline & pipeline) override; + +private: + AggregatingTransformParamsPtr params; +}; + +} diff --git a/src/Processors/ya.make b/src/Processors/ya.make index b5be02082417f8a20e7552396dad8ee3a8b6ec3b..8cf8e4954c2db2a9423f83869adca32f060800ec 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -140,6 +140,7 @@ SRCS( QueryPlan/AddingDelayedStreamStep.cpp QueryPlan/AggregatingStep.cpp QueryPlan/CreatingSetsStep.cpp + QueryPlan/CubeStep.cpp QueryPlan/DistinctStep.cpp QueryPlan/ExpressionStep.cpp QueryPlan/FilterStep.cpp @@ -155,6 +156,7 @@ SRCS( QueryPlan/ReadFromPreparedSource.cpp QueryPlan/ReadFromStorageStep.cpp QueryPlan/ReadNothingStep.cpp + QueryPlan/RollupStep.cpp QueryPlan/TotalsHavingStep QueryPlan/QueryPlan.cpp )