提交 603bcdde 编写于 作者: N Nikolai Kochetov

Add DistinctStep.

上级 cfe87a77
......@@ -82,6 +82,7 @@
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
namespace DB
......@@ -1769,13 +1770,11 @@ void InterpreterSelectQuery::executeDistinct(QueryPipeline & pipeline, bool befo
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipeline::StreamType::Totals)
return nullptr;
DistinctStep distinct_step(
DataStream{.header = pipeline.getHeader()},
limits, limit_for_distinct, columns);
return std::make_shared<DistinctTransform>(header, limits, limit_for_distinct, columns);
});
distinct_step.transformPipeline(pipeline);
}
}
......
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/Transforms/DistinctTransform.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
DistinctStep::DistinctStep(
const DataStream & input_stream_,
const SizeLimits & set_size_limits_,
UInt64 limit_hint_,
const Names & columns_)
: ITransformingStep(input_stream_, input_stream_)
, set_size_limits(set_size_limits_)
, limit_hint(limit_hint_)
, columns(columns_)
{
}
void DistinctStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<DistinctTransform>(header, set_size_limits, limit_hint, columns);
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
class DistinctStep : public ITransformingStep
{
public:
explicit DistinctStep(
const DataStream & input_stream_,
const SizeLimits & set_size_limits_,
UInt64 limit_hint_,
const Names & columns_);
String getName() const override { return "Distinct"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
SizeLimits set_size_limits;
UInt64 limit_hint;
Names columns;
};
}
......@@ -6,7 +6,7 @@ namespace DB
{
MergingSortedStep::MergingSortedStep(
DataStream input_stream,
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_)
......
......@@ -11,7 +11,7 @@ class MergingSortedStep : public ITransformingStep
{
public:
explicit MergingSortedStep(
DataStream input_stream,
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_ = 0);
......
......@@ -137,6 +137,7 @@ SRCS(
Transforms/SortingTransform.cpp
Transforms/TotalsHavingTransform.cpp
Transforms/AggregatingInOrderTransform.cpp
QueryPlan/DistinctTransform.cpp
QueryPlan/ExpressionStep.cpp
QueryPlan/FilterStep.cpp
QueryPlan/ISourceStep.cpp
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册