diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index ab64d56a2453a4ea97b7e336d36244e1aa335ec8..50e64dbd4361f31089198052acaf3ead65a093b8 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -80,6 +80,7 @@ #include #include #include +#include namespace DB @@ -1704,17 +1705,15 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputOrderIn partial_sorting.transformPipeline(pipeline); /// Merge the sorted blocks. - pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr - { - if (stream_type == QueryPipeline::StreamType::Totals) - return nullptr; + MergeSortingStep merge_sorting_step( + DataStream{.header = pipeline.getHeader()}, + output_order_descr, settings.max_block_size, limit, + settings.max_bytes_before_remerge_sort / pipeline.getNumStreams(), + settings.max_bytes_before_external_sort, context->getTemporaryVolume(), + settings.min_free_disk_space_for_temporary_data); - return std::make_shared( - header, output_order_descr, settings.max_block_size, limit, - settings.max_bytes_before_remerge_sort / pipeline.getNumStreams(), - settings.max_bytes_before_external_sort, context->getTemporaryVolume(), - settings.min_free_disk_space_for_temporary_data); - }); + merge_sorting_step.setStepDescription("Merge sorted blocks before ORDER BY"); + merge_sorting_step.transformPipeline(pipeline); /// If there are several streams, we merge them into one executeMergeSorted(pipeline, output_order_descr, limit); diff --git a/src/Processors/QueryPlan/MergeSortingStep.cpp b/src/Processors/QueryPlan/MergeSortingStep.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d2a3f01b060987bf15c881d0485a0dc10ea43035 --- /dev/null +++ b/src/Processors/QueryPlan/MergeSortingStep.cpp @@ -0,0 +1,43 @@ +#include +#include +#include + +namespace DB +{ + +MergeSortingStep::MergeSortingStep( + const DataStream & input_stream, + const SortDescription & description_, + size_t max_merged_block_size_, + UInt64 limit_, + size_t max_bytes_before_remerge_, + size_t max_bytes_before_external_sort_, + VolumePtr tmp_volume_, + size_t min_free_disk_space_) + : ITransformingStep(input_stream, input_stream) + , description(description_) + , max_merged_block_size(max_merged_block_size_) + , limit(limit_) + , max_bytes_before_remerge(max_bytes_before_remerge_) + , max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_) + , min_free_disk_space(min_free_disk_space_) +{ +} + +void MergeSortingStep::transformPipeline(QueryPipeline & pipeline) +{ + pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr + { + if (stream_type == QueryPipeline::StreamType::Totals) + return nullptr; + + return std::make_shared( + header, description, max_merged_block_size, limit, + max_bytes_before_remerge, + max_bytes_before_external_sort, + tmp_volume, + min_free_disk_space); + }); +} + +} diff --git a/src/Processors/QueryPlan/MergeSortingStep.h b/src/Processors/QueryPlan/MergeSortingStep.h new file mode 100644 index 0000000000000000000000000000000000000000..3d12bda31392af1b1d4738e6f1addbeb4aa56277 --- /dev/null +++ b/src/Processors/QueryPlan/MergeSortingStep.h @@ -0,0 +1,39 @@ +#pragma once +#include +#include +#include +#include + +namespace DB +{ + +class MergeSortingStep : public ITransformingStep +{ +public: + explicit MergeSortingStep( + const DataStream & input_stream, + const SortDescription & description_, + size_t max_merged_block_size_, + UInt64 limit_, + size_t max_bytes_before_remerge_, + size_t max_bytes_before_external_sort_, + VolumePtr tmp_volume_, + size_t min_free_disk_space_); + + String getName() const override { return "MergeSorting"; } + + void transformPipeline(QueryPipeline & pipeline) override; + +private: + SortDescription description; + size_t max_merged_block_size; + UInt64 limit; + + size_t max_bytes_before_remerge; + size_t max_bytes_before_external_sort; + VolumePtr tmp_volume; + size_t min_free_disk_space; +}; + +} + diff --git a/src/Processors/ya.make b/src/Processors/ya.make index cda984253e7ea849f7ba5f1055ba83bd712b439b..d5e474947959dbe77429e3548042034148d29619 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -142,6 +142,7 @@ SRCS( QueryPlan/ISourceStep.cpp QueryPlan/ITransformingStep.cpp QueryPlan/IQueryPlanStep.cpp + QueryPlan/MergeSortingStep.cpp QueryPlan/PartialSortingStep.cpp QueryPlan/ReadFromStorageStep.cpp QueryPlan/ReadNothingStep.cpp