diff --git a/dbms/src/Processors/ISimpleTransform.cpp b/dbms/src/Processors/ISimpleTransform.cpp index 5ff50a9f8ac8d85185a81f20509644cb960bf4d8..e14ec13f4c132ee63a463333edf0de0e15abd1a2 100644 --- a/dbms/src/Processors/ISimpleTransform.cpp +++ b/dbms/src/Processors/ISimpleTransform.cpp @@ -52,6 +52,9 @@ ISimpleTransform::Status ISimpleTransform::prepare() current_chunk = input.pull(); has_input = true; + + if (set_input_not_needed_after_read) + input.setNotNeeded(); } /// Now transform. diff --git a/dbms/src/Processors/ISimpleTransform.h b/dbms/src/Processors/ISimpleTransform.h index 083ede7821958946dca1d7dfdb4e8e4b3a2c33c3..7470146415ccd51fe1fb905f4dac00019e860026 100644 --- a/dbms/src/Processors/ISimpleTransform.h +++ b/dbms/src/Processors/ISimpleTransform.h @@ -20,6 +20,11 @@ protected: bool transformed = false; const bool skip_empty_chunks; + /// Set input port NotNeeded after chunk was pulled. + /// Input port will become needed again only after data was transformed. + /// This allows to escape caching chunks in input port, which can lead to uneven data distribution. + bool set_input_not_needed_after_read = false; + virtual void transform(Chunk & chunk) = 0; public: @@ -30,6 +35,8 @@ public: InputPort & getInputPort() { return input; } OutputPort & getOutputPort() { return output; } + + void setInputNotNeededAfterRead(bool value) { set_input_not_needed_after_read = value; } }; } diff --git a/dbms/src/Processors/Transforms/MergeSortingTransform.h b/dbms/src/Processors/Transforms/MergeSortingTransform.h index 63df63248003c80800ff779770cbc6bf4ddf2df2..4b197df77b1a748950073871f3eeb020d1ce81bf 100644 --- a/dbms/src/Processors/Transforms/MergeSortingTransform.h +++ b/dbms/src/Processors/Transforms/MergeSortingTransform.h @@ -26,7 +26,7 @@ public: size_t max_bytes_before_remerge_, size_t max_bytes_before_external_sort_, const std::string & tmp_path_); - ~MergeSortingTransform(); + ~MergeSortingTransform() override; String getName() const override { return "MergeSortingTransform"; } diff --git a/dbms/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp b/dbms/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ab8481aaa620809d2e81d2080cde9c5463115e84 --- /dev/null +++ b/dbms/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp @@ -0,0 +1,433 @@ +#include + +#include +#include +#include + +namespace DB +{ + +struct ChunksToMerge : public ChunkInfo +{ + std::unique_ptr chunks; + Int32 bucket_num = -1; + bool is_overflows = false; +}; + +GroupingAggregatedTransform::GroupingAggregatedTransform( + const Block & header, size_t num_inputs, AggregatingTransformParamsPtr params) + : IProcessor(InputPorts(num_inputs, header), {header}) + , num_inputs(num_inputs) + , params(std::move(params)) + , last_bucket_number(num_inputs, -1) + , read_from_input(num_inputs, false) +{ +} + +void GroupingAggregatedTransform::readFromAllInputs() +{ + auto in = inputs.begin(); + for (size_t i = 0; i < num_inputs; ++i, ++in) + { + if (in->isFinished()) + continue; + + if (read_from_input[i]) + continue; + + in->setNeeded(); + + if (!in->hasData()) + return; + + auto chunk = in->pull(); + read_from_input[i] = true; + addChunk(std::move(chunk), i); + } + + read_from_all_inputs = true; +} + +void GroupingAggregatedTransform::pushData(Chunks chunks, Int32 bucket, bool is_overflows) +{ + auto & output = outputs.front(); + + auto info = std::make_shared(); + info->bucket_num = bucket; + info->is_overflows = is_overflows; + info->chunks = std::make_unique(std::move(chunks)); + + Chunk chunk; + chunk.setChunkInfo(std::move(info)); + output.push(std::move(chunk)); +} + +bool GroupingAggregatedTransform::tryPushTwoLevelData() +{ + auto try_push_by_iter = [&](auto batch_it) + { + if (batch_it == chunks.end()) + return false; + + Chunks & cur_chunks = batch_it->second; + if (cur_chunks.empty()) + { + chunks.erase(batch_it); + return false; + } + + pushData(std::move(cur_chunks), current_bucket, false); + chunks.erase(batch_it); + return true; + }; + + if (all_inputs_finished) + { + /// Chunks are sorted by bucket. + while (!chunks.empty()) + if (try_push_by_iter(chunks.begin())) + return true; + } + else + { + for (; next_bucket_to_push < current_bucket; ++next_bucket_to_push) + if (try_push_by_iter(chunks.find(next_bucket_to_push))) + return true; + } + + return false; +} + +bool GroupingAggregatedTransform::tryPushSingleLevelData() +{ + if (single_level_chunks.empty()) + return false; + + pushData(single_level_chunks, -1, false); + return true; +} + +bool GroupingAggregatedTransform::tryPushOverflowData() +{ + if (overflow_chunks.empty()) + return false; + + pushData(overflow_chunks, -1, true); + return true; +} + +IProcessor::Status GroupingAggregatedTransform::prepare() +{ + /// Check can output. + auto & output = outputs.front(); + + if (output.isFinished()) + { + for (auto & input : inputs) + input.close(); + + chunks.clear(); + last_bucket_number.clear(); + return Status::Finished; + } + + /// Read first time from each input to understand if we have two-level aggregation. + if (!read_from_all_inputs) + { + readFromAllInputs(); + if (!read_from_all_inputs) + return Status::NeedData; + } + + /// Convert single level to two levels if have two-level input. + if (has_two_level && !single_level_chunks.empty()) + return Status::Ready; + + /// Check can push (to avoid data caching). + if (!output.canPush()) + { + for (auto & input : inputs) + input.setNotNeeded(); + + return Status::PortFull; + } + + bool pushed_to_output = false; + + /// Output if has data. + if (has_two_level) + pushed_to_output = tryPushTwoLevelData(); + + auto need_input = [this](size_t input_num) + { + if (last_bucket_number[input_num] < current_bucket) + return true; + + return expect_several_chunks_for_single_bucket_per_source && last_bucket_number[input_num] == current_bucket; + }; + + /// Read next bucket if can. + for (; ; ++current_bucket) + { + bool finished = true; + bool need_data = false; + + auto in = inputs.begin(); + for (size_t input_num = 0; input_num < num_inputs; ++input_num, ++in) + { + if (in->isFinished()) + continue; + + finished = false; + + if (!need_input(input_num)) + continue; + + in->setNeeded(); + + if (!in->hasData()) + { + need_data = true; + continue; + } + + auto chunk = in->pull(); + addChunk(std::move(chunk), input_num); + + if (has_two_level && !single_level_chunks.empty()) + return Status::Ready; + + if (need_input(input_num)) + need_data = true; + } + + if (finished) + { + all_inputs_finished = true; + break; + } + + if (need_data) + return Status::NeedData; + } + + if (pushed_to_output) + return Status::PortFull; + + if (has_two_level) + { + if (tryPushTwoLevelData()) + return Status::PortFull; + + /// Sanity check. If new bucket was read, we should be able to push it. + if (!all_inputs_finished) + throw Exception("GroupingAggregatedTransform has read new two-level bucket, but couldn't push it.", + ErrorCodes::LOGICAL_ERROR); + } + else + { + if (!all_inputs_finished) + throw Exception("GroupingAggregatedTransform should have read all chunks for single level aggregation, " + "but not all of the inputs are finished.", ErrorCodes::LOGICAL_ERROR); + + if (tryPushSingleLevelData()) + return Status::PortFull; + } + + /// If we haven't pushed to output, then all data was read. Push overflows if have. + if (tryPushOverflowData()) + return Status::PortFull; + + output.finish(); + return Status::Finished; +} + +void GroupingAggregatedTransform::addChunk(Chunk chunk, size_t input) +{ + auto & info = chunk.getChunkInfo(); + if (!info) + throw Exception("Chunk info was not set for chunk in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR); + + auto * agg_info = typeid_cast(info.get()); + if (!agg_info) + throw Exception("Chunk should have AggregatedChunkInfo in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR); + + Int32 bucket = agg_info->bucket_num; + bool is_overflows = agg_info->is_overflows; + + if (is_overflows) + overflow_chunks.emplace_back(std::move(chunk)); + else if (bucket < 0) + single_level_chunks.emplace_back(std::move(chunk)); + else + { + chunks[bucket].emplace_back(std::move(chunk)); + has_two_level = true; + last_bucket_number[input] = bucket; + } +} + +void GroupingAggregatedTransform::work() +{ + if (!single_level_chunks.empty()) + { + auto & header = getOutputs().front().getHeader(); + auto block = header.cloneWithColumns(single_level_chunks.back().detachColumns()); + single_level_chunks.pop_back(); + auto blocks = params->aggregator.convertBlockToTwoLevel(block); + + for (auto & cur_block : blocks) + { + Int32 bucket = cur_block.info.bucket_num; + chunks[bucket].emplace_back(Chunk(cur_block.getColumns(), cur_block.rows())); + } + } +} + + +MergingAggregatedBucketTransform::MergingAggregatedBucketTransform(AggregatingTransformParamsPtr params) + : ISimpleTransform({}, params->getHeader(), false), params(std::move(params)) +{ + setInputNotNeededAfterRead(true); +} + +void MergingAggregatedBucketTransform::transform(Chunk & chunk) +{ + auto & info = chunk.getChunkInfo(); + auto * chunks_to_merge = typeid_cast(info.get()); + + if (!chunks_to_merge) + throw Exception("MergingAggregatedSimpleTransform chunk must have ChunkInfo with type ChunksToMerge.", + ErrorCodes::LOGICAL_ERROR); + + BlocksList blocks_list; + for (auto & cur_chunk : *chunks_to_merge->chunks) + blocks_list.emplace_back(getInputPort().getHeader().cloneWithColumns(cur_chunk.detachColumns())); + + chunk.setChunkInfo(nullptr); + + auto block = params->aggregator.mergeBlocks(blocks_list, params->final); + size_t num_rows = block.rows(); + chunk.setColumns(block.getColumns(), num_rows); +} + + +SortingAggregatedTransform::SortingAggregatedTransform(size_t num_inputs, AggregatingTransformParamsPtr params) + : IProcessor(InputPorts(num_inputs, params->getHeader()), {params->getHeader()}) + , num_inputs(num_inputs) + , params(std::move(params)) + , last_bucket_number(num_inputs, -1) +{ +} + +bool SortingAggregatedTransform::tryPushChunk() +{ + auto & output = outputs.front(); + + UInt32 min_bucket = last_bucket_number[0]; + for (auto & bucket : last_bucket_number) + min_bucket = std::min(min_bucket, bucket); + + auto it = chunks.find(min_bucket); + if (it != chunks.end()) + { + output.push(std::move(it->second)); + return true; + } + + return false; +} + +void SortingAggregatedTransform::addChunk(Chunk chunk) +{ + auto & info = chunk.getChunkInfo(); + if (!info) + throw Exception("Chunk info was not set for chunk in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR); + + auto * agg_info = typeid_cast(info.get()); + if (!agg_info) + throw Exception("Chunk should have AggregatedChunkInfo in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR); + + Int32 bucket = agg_info->bucket_num; + bool is_overflows = agg_info->is_overflows; + + if (is_overflows) + overflow_chunk = std::move(chunk); + else + chunks[bucket] = std::move(chunk); +} + +IProcessor::Status SortingAggregatedTransform::prepare() +{ + /// Check can output. + auto & output = outputs.front(); + + if (output.isFinished()) + { + for (auto & input : inputs) + input.close(); + + chunks.clear(); + last_bucket_number.clear(); + return Status::Finished; + } + + /// Check can push (to avoid data caching). + if (!output.canPush()) + { + for (auto & input : inputs) + input.setNotNeeded(); + + return Status::PortFull; + } + + /// Push if have min version. + bool pushed_to_output = tryPushChunk(); + + bool need_data = false; + bool all_finished = true; + + /// Try read anything. + auto in = inputs.begin(); + for (size_t input_num = 0; input_num < num_inputs; ++input_num, ++in) + { + if (in->isFinished()) + continue; + + all_finished = false; + + in->setNeeded(); + + if (!in->hasData()) + { + need_data = true; + continue; + } + + auto chunk = in->pull(); + addChunk(std::move(chunk)); + } + + if (pushed_to_output) + return Status::PortFull; + + if (tryPushChunk()) + return Status::PortFull; + + if (need_data) + return Status::NeedData; + + if (!all_finished) + throw Exception("SortingAggregatedTransform has read bucket, but couldn't push it.", + ErrorCodes::LOGICAL_ERROR); + + if (overflow_chunk) + { + output.push(std::move(overflow_chunk)); + return Status::PortFull; + } + + return Status::Finished; +} + +} diff --git a/dbms/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h b/dbms/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h new file mode 100644 index 0000000000000000000000000000000000000000..d83da2a6dd96c96ae4dff729b3808f4f681df54a --- /dev/null +++ b/dbms/src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h @@ -0,0 +1,134 @@ +#include +#include +#include +#include +#include + + +namespace DB +{ + +/// Has several inputs and single output. +/// Read from inputs chunks with partially aggregated data, group them by bucket number +/// and write data from single bucket as single chunk. +class GroupingAggregatedTransform : public IProcessor +{ +public: + GroupingAggregatedTransform(const Block & header, size_t num_inputs, AggregatingTransformParamsPtr params); + + /// Special setting: in case if single source can return several chunks with same bucket. + void allowSeveralChunksForSingleBucketPerSource() { expect_several_chunks_for_single_bucket_per_source = true; } + +protected: + Status prepare() override; + void work() override; + +private: + size_t num_inputs; + AggregatingTransformParamsPtr params; + + std::vector last_bucket_number; + std::map chunks; + Chunks overflow_chunks; + Chunks single_level_chunks; + Int32 current_bucket = 0; + Int32 next_bucket_to_push = 0; /// Always <= current_bucket. + bool has_two_level = false; + + bool all_inputs_finished = false; + bool read_from_all_inputs = false; + std::vector read_from_input; + + bool expect_several_chunks_for_single_bucket_per_source = false; + + void addChunk(Chunk chunk, size_t input); + void readFromAllInputs(); + bool tryPushSingleLevelData(); + bool tryPushTwoLevelData(); + bool tryPushOverflowData(); + void pushData(Chunks chunks, Int32 bucket, bool is_overflows); +}; + +/// Merge aggregated data from single bucket. +class MergingAggregatedBucketTransform : public ISimpleTransform +{ +public: + explicit MergingAggregatedBucketTransform(AggregatingTransformParamsPtr params); + +protected: + void transform(Chunk & chunk) override; + +private: + AggregatingTransformParamsPtr params; +}; + +/// Has several inputs and single output. +/// Read from inputs merged bucket with aggregated data, sort them by bucket number and write to output. +/// Presumption: inputs return chunks with increasing bucket number, there is at most one chunk per bucket. +class SortingAggregatedTransform : public IProcessor +{ +public: + SortingAggregatedTransform(size_t num_inputs, AggregatingTransformParamsPtr params); + Status prepare() override; + +private: + size_t num_inputs; + AggregatingTransformParamsPtr params; + std::vector last_bucket_number; + std::map chunks; + Chunk overflow_chunk; + + bool tryPushChunk(); + void addChunk(Chunk chunk); +}; + +/// Creates piece of pipeline which performs memory efficient merging of partially aggregated data from several sources. +/// First processor will have num_inputs, last - single output. You should connect them to create pipeline. +Processors createMergingAggregatedMemoryEfficientPipe( + Block header, + AggregatingTransformParamsPtr params, + size_t num_inputs, + size_t num_merging_processors) +{ + Processors processors; + processors.reserve(num_merging_processors + 2); + + auto grouping = std::make_shared(header, num_inputs, params); + processors.emplace_back(std::move(grouping)); + + if (num_merging_processors <= 1) + { + /// --> GroupingAggregated --> MergingAggregatedBucket --> + auto transform = std::make_shared(params); + connect(processors.back()->getOutputs().front(), transform->getInputPort()); + + processors.emplace_back(std::move(transform)); + return processors; + } + + /// --> --> MergingAggregatedBucket --> + /// --> GroupingAggregated --> ResizeProcessor --> MergingAggregatedBucket --> SortingAggregated --> + /// --> --> MergingAggregatedBucket --> + + auto resize = std::make_shared(header, 1, num_merging_processors); + connect(processors.back()->getOutputs().front(), resize->getInputs().front()); + processors.emplace_back(std::move(resize)); + + auto sorting = std::make_shared(num_merging_processors, params); + auto out = processors.back()->getOutputs().begin(); + auto in = sorting->getInputs().begin(); + + for (size_t i = 0; i < num_merging_processors; ++i, ++in, ++out) + { + auto transform = std::make_shared(params); + connect(*out, transform->getInputPort()); + connect(transform->getOutputPort(), *in); + processors.emplace_back(std::move(transform)); + } + + processors.emplace_back(std::move(sorting)); + return processors; +} + +} +