提交 d41db362 编写于 作者: N Nikolai Kochetov

Add RollupStep and CubeStep.

上级 c5d9379d
......@@ -90,6 +90,8 @@
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/RollupStep.h>
#include <Processors/QueryPlan/CubeStep.h>
namespace DB
......@@ -1540,16 +1542,16 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
if (modificator == Modificator::ROLLUP)
{
if (stream_type == QueryPipeline::StreamType::Totals)
return nullptr;
if (modificator == Modificator::ROLLUP)
return std::make_shared<RollupTransform>(header, std::move(transform_params));
else
return std::make_shared<CubeTransform>(header, std::move(transform_params));
});
RollupStep rollup_step(DataStream{.header = pipeline.getHeader()}, std::move(transform_params));
rollup_step.transformPipeline(pipeline);
}
else
{
CubeStep rollup_step(DataStream{.header = pipeline.getHeader()}, std::move(transform_params));
cube_step.transformPipeline(pipeline);
}
}
......
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/Transforms/CubeTransform.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
CubeStep::CubeStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_)
: ITransformingStep(input_stream_, DataStream{.header = params_->getHeader()})
, params(std::move(params_))
{
}
void CubeStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipeline::StreamType::Totals)
return nullptr;
return std::make_shared<CubeTransform>(header, std::move(params));
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
class CubeStep : public ITransformingStep
{
public:
CubeStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_);
String getName() const override { return "Cube"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
AggregatingTransformParamsPtr params;
};
}
......@@ -8,7 +8,7 @@ namespace DB
class DistinctStep : public ITransformingStep
{
public:
explicit DistinctStep(
DistinctStep(
const DataStream & input_stream_,
const SizeLimits & set_size_limits_,
UInt64 limit_hint_,
......
#include <Processors/QueryPlan/RollupStep.h>
#include <Processors/Transforms/RollupTransform.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
RollupStep::RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_)
: ITransformingStep(input_stream_, DataStream{.header = params_->getHeader()})
, params(std::move(params_))
{
}
void RollupStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipeline::StreamType::Totals)
return nullptr;
return std::make_shared<RollupTransform>(header, std::move(params));
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
class RollupStep : public ITransformingStep
{
public:
RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_);
String getName() const override { return "Rollup"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
AggregatingTransformParamsPtr params;
};
}
......@@ -140,6 +140,7 @@ SRCS(
QueryPlan/AddingDelayedStreamStep.cpp
QueryPlan/AggregatingStep.cpp
QueryPlan/CreatingSetsStep.cpp
QueryPlan/CubeStep.cpp
QueryPlan/DistinctStep.cpp
QueryPlan/ExpressionStep.cpp
QueryPlan/FilterStep.cpp
......@@ -155,6 +156,7 @@ SRCS(
QueryPlan/ReadFromPreparedSource.cpp
QueryPlan/ReadFromStorageStep.cpp
QueryPlan/ReadNothingStep.cpp
QueryPlan/RollupStep.cpp
QueryPlan/TotalsHavingStep
QueryPlan/QueryPlan.cpp
)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册