提交 f5631939 编写于 作者: N Nikolai Kochetov

Add MergeSortingStep.

上级 815ac038
......@@ -80,6 +80,7 @@
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
namespace DB
......@@ -1704,17 +1705,15 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputOrderIn
partial_sorting.transformPipeline(pipeline);
/// Merge the sorted blocks.
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipeline::StreamType::Totals)
return nullptr;
MergeSortingStep merge_sorting_step(
DataStream{.header = pipeline.getHeader()},
output_order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort / pipeline.getNumStreams(),
settings.max_bytes_before_external_sort, context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
return std::make_shared<MergeSortingTransform>(
header, output_order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort / pipeline.getNumStreams(),
settings.max_bytes_before_external_sort, context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
});
merge_sorting_step.setStepDescription("Merge sorted blocks before ORDER BY");
merge_sorting_step.transformPipeline(pipeline);
/// If there are several streams, we merge them into one
executeMergeSorted(pipeline, output_order_descr, limit);
......
#include <Processors/QueryPlan/MergeSortingStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/MergeSortingTransform.h>
namespace DB
{
MergeSortingStep::MergeSortingStep(
const DataStream & input_stream,
const SortDescription & description_,
size_t max_merged_block_size_,
UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
size_t min_free_disk_space_)
: ITransformingStep(input_stream, input_stream)
, description(description_)
, max_merged_block_size(max_merged_block_size_)
, limit(limit_)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_)
, min_free_disk_space(min_free_disk_space_)
{
}
void MergeSortingStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipeline::StreamType::Totals)
return nullptr;
return std::make_shared<MergeSortingTransform>(
header, description, max_merged_block_size, limit,
max_bytes_before_remerge,
max_bytes_before_external_sort,
tmp_volume,
min_free_disk_space);
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <DataStreams/SizeLimits.h>
#include <Disks/IVolume.h>
namespace DB
{
class MergeSortingStep : public ITransformingStep
{
public:
explicit MergeSortingStep(
const DataStream & input_stream,
const SortDescription & description_,
size_t max_merged_block_size_,
UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSorting"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
SortDescription description;
size_t max_merged_block_size;
UInt64 limit;
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
VolumePtr tmp_volume;
size_t min_free_disk_space;
};
}
......@@ -142,6 +142,7 @@ SRCS(
QueryPlan/ISourceStep.cpp
QueryPlan/ITransformingStep.cpp
QueryPlan/IQueryPlanStep.cpp
QueryPlan/MergeSortingStep.cpp
QueryPlan/PartialSortingStep.cpp
QueryPlan/ReadFromStorageStep.cpp
QueryPlan/ReadNothingStep.cpp
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册