From d41db3622fdf0c3abf64e89dd865f470e4c6c6c2 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 17 Jun 2020 20:15:24 +0300 Subject: [PATCH] Add RollupStep and CubeStep. --- src/Interpreters/InterpreterSelectQuery.cpp | 20 ++++++++------- src/Processors/QueryPlan/CubeStep.cpp | 27 +++++++++++++++++++++ src/Processors/QueryPlan/CubeStep.h | 24 ++++++++++++++++++ src/Processors/QueryPlan/DistinctStep.h | 2 +- src/Processors/QueryPlan/RollupStep.cpp | 27 +++++++++++++++++++++ src/Processors/QueryPlan/RollupStep.h | 24 ++++++++++++++++++ src/Processors/ya.make | 2 ++ 7 files changed, 116 insertions(+), 10 deletions(-) create mode 100644 src/Processors/QueryPlan/CubeStep.cpp create mode 100644 src/Processors/QueryPlan/CubeStep.h create mode 100644 src/Processors/QueryPlan/RollupStep.cpp create mode 100644 src/Processors/QueryPlan/RollupStep.h diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 9452fbeaa4..bec7d6fec1 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -90,6 +90,8 @@ #include #include #include +#include +#include namespace DB @@ -1540,16 +1542,16 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif auto transform_params = std::make_shared(params, true); - pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr + if (modificator == Modificator::ROLLUP) { - if (stream_type == QueryPipeline::StreamType::Totals) - return nullptr; - - if (modificator == Modificator::ROLLUP) - return std::make_shared(header, std::move(transform_params)); - else - return std::make_shared(header, std::move(transform_params)); - }); + RollupStep rollup_step(DataStream{.header = pipeline.getHeader()}, std::move(transform_params)); + rollup_step.transformPipeline(pipeline); + } + else + { + CubeStep rollup_step(DataStream{.header = pipeline.getHeader()}, std::move(transform_params)); + cube_step.transformPipeline(pipeline); + } } diff --git a/src/Processors/QueryPlan/CubeStep.cpp b/src/Processors/QueryPlan/CubeStep.cpp new file mode 100644 index 0000000000..3c8b2087fe --- /dev/null +++ b/src/Processors/QueryPlan/CubeStep.cpp @@ -0,0 +1,27 @@ +#include +#include +#include + +namespace DB +{ + +CubeStep::CubeStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_) + : ITransformingStep(input_stream_, DataStream{.header = params_->getHeader()}) + , params(std::move(params_)) +{ +} + +void CubeStep::transformPipeline(QueryPipeline & pipeline) +{ + pipeline.resize(1); + + pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr + { + if (stream_type == QueryPipeline::StreamType::Totals) + return nullptr; + + return std::make_shared(header, std::move(params)); + }); +} + +} diff --git a/src/Processors/QueryPlan/CubeStep.h b/src/Processors/QueryPlan/CubeStep.h new file mode 100644 index 0000000000..c04f6a4f85 --- /dev/null +++ b/src/Processors/QueryPlan/CubeStep.h @@ -0,0 +1,24 @@ +#pragma once +#include +#include + +namespace DB +{ + +struct AggregatingTransformParams; +using AggregatingTransformParamsPtr = std::shared_ptr; + +class CubeStep : public ITransformingStep +{ +public: + CubeStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_); + + String getName() const override { return "Cube"; } + + void transformPipeline(QueryPipeline & pipeline) override; + +private: + AggregatingTransformParamsPtr params; +}; + +} diff --git a/src/Processors/QueryPlan/DistinctStep.h b/src/Processors/QueryPlan/DistinctStep.h index 638bd3debf..a70c66b2a4 100644 --- a/src/Processors/QueryPlan/DistinctStep.h +++ b/src/Processors/QueryPlan/DistinctStep.h @@ -8,7 +8,7 @@ namespace DB class DistinctStep : public ITransformingStep { public: - explicit DistinctStep( + DistinctStep( const DataStream & input_stream_, const SizeLimits & set_size_limits_, UInt64 limit_hint_, diff --git a/src/Processors/QueryPlan/RollupStep.cpp b/src/Processors/QueryPlan/RollupStep.cpp new file mode 100644 index 0000000000..5045cb71cb --- /dev/null +++ b/src/Processors/QueryPlan/RollupStep.cpp @@ -0,0 +1,27 @@ +#include +#include +#include + +namespace DB +{ + +RollupStep::RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_) + : ITransformingStep(input_stream_, DataStream{.header = params_->getHeader()}) + , params(std::move(params_)) +{ +} + +void RollupStep::transformPipeline(QueryPipeline & pipeline) +{ + pipeline.resize(1); + + pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr + { + if (stream_type == QueryPipeline::StreamType::Totals) + return nullptr; + + return std::make_shared(header, std::move(params)); + }); +} + +} diff --git a/src/Processors/QueryPlan/RollupStep.h b/src/Processors/QueryPlan/RollupStep.h new file mode 100644 index 0000000000..56e8d81e37 --- /dev/null +++ b/src/Processors/QueryPlan/RollupStep.h @@ -0,0 +1,24 @@ +#pragma once +#include +#include + +namespace DB +{ + +struct AggregatingTransformParams; +using AggregatingTransformParamsPtr = std::shared_ptr; + +class RollupStep : public ITransformingStep +{ +public: + RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_); + + String getName() const override { return "Rollup"; } + + void transformPipeline(QueryPipeline & pipeline) override; + +private: + AggregatingTransformParamsPtr params; +}; + +} diff --git a/src/Processors/ya.make b/src/Processors/ya.make index b5be020824..8cf8e4954c 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -140,6 +140,7 @@ SRCS( QueryPlan/AddingDelayedStreamStep.cpp QueryPlan/AggregatingStep.cpp QueryPlan/CreatingSetsStep.cpp + QueryPlan/CubeStep.cpp QueryPlan/DistinctStep.cpp QueryPlan/ExpressionStep.cpp QueryPlan/FilterStep.cpp @@ -155,6 +156,7 @@ SRCS( QueryPlan/ReadFromPreparedSource.cpp QueryPlan/ReadFromStorageStep.cpp QueryPlan/ReadNothingStep.cpp + QueryPlan/RollupStep.cpp QueryPlan/TotalsHavingStep QueryPlan/QueryPlan.cpp ) -- GitLab