From 71fab516f252acd72285bf040db2d2d087d57edb Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 6 Apr 2020 18:03:38 +0300 Subject: [PATCH] Fix AggregatingSorted for simple aggregate functions. --- .../Merges/AggregatingSortedTransform.cpp | 28 ++++++++++++++--- .../Merges/AggregatingSortedTransform.h | 4 +-- .../Merges/CollapsingSortedTransform.h | 5 +-- .../Merges/GraphiteRollupSortedTransform.h | 5 +-- .../Merges/ReplacingSortedTransform.h | 5 +-- src/Processors/Merges/RowRef.h | 31 ++++++++++++++++++- .../Merges/VersionedCollapsingTransform.h | 5 +-- 7 files changed, 67 insertions(+), 16 deletions(-) diff --git a/src/Processors/Merges/AggregatingSortedTransform.cpp b/src/Processors/Merges/AggregatingSortedTransform.cpp index 68f60aed8a..fe86d01b39 100644 --- a/src/Processors/Merges/AggregatingSortedTransform.cpp +++ b/src/Processors/Merges/AggregatingSortedTransform.cpp @@ -68,6 +68,24 @@ namespace return def; } + + MutableColumns getMergedColumns(const Block & header, const AggregatingSortedTransform::ColumnsDefinition & def) + { + MutableColumns columns; + columns.resize(header.columns()); + + for (auto & desc : def.columns_to_simple_aggregate) + { + auto & type = header.getByPosition(desc.column_number).type; + columns[desc.column_number] = recursiveRemoveLowCardinality(type)->createColumn(); + } + + for (size_t i = 0; i < columns.size(); ++i) + if (!columns[i]) + columns[i] = header.getByPosition(i).type->createColumn(); + + return columns; + } } AggregatingSortedTransform::AggregatingSortedTransform( @@ -75,7 +93,7 @@ AggregatingSortedTransform::AggregatingSortedTransform( SortDescription description_, size_t max_block_size) : IMergingTransform(num_inputs, header, header, true) , columns_definition(defineColumns(header, description_)) - , merged_data(header.cloneEmptyColumns(), false, max_block_size) + , merged_data(getMergedColumns(header, columns_definition), false, max_block_size) , description(std::move(description_)) , source_chunks(num_inputs) , cursors(num_inputs) @@ -106,7 +124,7 @@ void AggregatingSortedTransform::updateCursor(Chunk chunk, size_t source_num) column = column->convertToFullColumnIfConst(); for (auto & desc : columns_definition.columns_to_simple_aggregate) - if (desc.type_to_convert) + if (desc.inner_type) columns[desc.column_number] = recursiveRemoveLowCardinality(columns[desc.column_number]); chunk.setColumns(std::move(columns), num_rows); @@ -145,10 +163,10 @@ void AggregatingSortedTransform::work() for (auto & desc : columns_definition.columns_to_simple_aggregate) { - if (desc.type_to_convert) + if (desc.inner_type) { - auto & from_type = header.getByPosition(desc.column_number).type; - auto & to_type = desc.type_to_convert; + auto & from_type = desc.inner_type; + auto & to_type = header.getByPosition(desc.column_number).type; columns[desc.column_number] = recursiveTypeConversion(columns[desc.column_number], from_type, to_type); } } diff --git a/src/Processors/Merges/AggregatingSortedTransform.h b/src/Processors/Merges/AggregatingSortedTransform.h index bb950a6825..247d92d99e 100644 --- a/src/Processors/Merges/AggregatingSortedTransform.h +++ b/src/Processors/Merges/AggregatingSortedTransform.h @@ -129,13 +129,13 @@ public: size_t column_number = 0; IColumn * column = nullptr; - const DataTypePtr type_to_convert; + const DataTypePtr inner_type; AlignedBuffer state; bool created = false; SimpleAggregateDescription(AggregateFunctionPtr function_, const size_t column_number_, DataTypePtr type) - : function(std::move(function_)), column_number(column_number_), type_to_convert(std::move(type)) + : function(std::move(function_)), column_number(column_number_), inner_type(std::move(type)) { add_function = function->getAddressOfAddFunction(); state.reset(function->sizeOfData(), function->alignOfData()); diff --git a/src/Processors/Merges/CollapsingSortedTransform.h b/src/Processors/Merges/CollapsingSortedTransform.h index 46e3fb2e69..7e64d3253f 100644 --- a/src/Processors/Merges/CollapsingSortedTransform.h +++ b/src/Processors/Merges/CollapsingSortedTransform.h @@ -64,14 +64,15 @@ private: SortingHeap queue; bool is_queue_initialized = false; + /// Allocator must be destroyed after all RowRefs. + detail::SharedChunkAllocator chunk_allocator; + using RowRef = detail::RowRefWithOwnedChunk; static constexpr size_t max_row_refs = 4; /// first_negative, last_positive, last, current. RowRef first_negative_row; RowRef last_positive_row; RowRef last_row; - detail::SharedChunkAllocator chunk_allocator; - size_t count_positive = 0; /// The number of positive rows for the current primary key. size_t count_negative = 0; /// The number of negative rows for the current primary key. bool last_is_positive = false; /// true if the last row for the current primary key is positive. diff --git a/src/Processors/Merges/GraphiteRollupSortedTransform.h b/src/Processors/Merges/GraphiteRollupSortedTransform.h index 4dd394198a..ce9fb6e019 100644 --- a/src/Processors/Merges/GraphiteRollupSortedTransform.h +++ b/src/Processors/Merges/GraphiteRollupSortedTransform.h @@ -237,12 +237,13 @@ private: /// Path name of current bucket StringRef current_group_path; + /// Allocator must be destroyed after all RowRefs. + detail::SharedChunkAllocator chunk_allocator; + static constexpr size_t max_row_refs = 2; /// current_subgroup_newest_row, current_row. /// Last row with maximum version for current primary key (time bucket). RowRef current_subgroup_newest_row; - detail::SharedChunkAllocator chunk_allocator; - /// Time of last read row time_t current_time = 0; time_t current_time_rounded = 0; diff --git a/src/Processors/Merges/ReplacingSortedTransform.h b/src/Processors/Merges/ReplacingSortedTransform.h index ff447ee8da..ffbfe92be9 100644 --- a/src/Processors/Merges/ReplacingSortedTransform.h +++ b/src/Processors/Merges/ReplacingSortedTransform.h @@ -50,6 +50,9 @@ private: SortingHeap queue; bool is_queue_initialized = false; + /// Allocator must be destroyed after all RowRefs. + detail::SharedChunkAllocator chunk_allocator; + using RowRef = detail::RowRefWithOwnedChunk; static constexpr size_t max_row_refs = 3; /// last, current, selected. RowRef last_row; @@ -57,8 +60,6 @@ private: RowRef selected_row; /// Last row with maximum version for current primary key. size_t max_pos = 0; /// The position (into current_row_sources) of the row with the highest version. - detail::SharedChunkAllocator chunk_allocator; - /// Sources of rows with the current primary key. PODArray current_row_sources; diff --git a/src/Processors/Merges/RowRef.h b/src/Processors/Merges/RowRef.h index 48ce92dbdb..5aeae95206 100644 --- a/src/Processors/Merges/RowRef.h +++ b/src/Processors/Merges/RowRef.h @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include @@ -51,6 +53,9 @@ class SharedChunkAllocator public: explicit SharedChunkAllocator(size_t max_chunks) { + if (max_chunks == 0) + max_chunks = 1; + chunks.resize(max_chunks); free_chunks.reserve(max_chunks); @@ -74,12 +79,36 @@ public: return SharedChunkPtr(&chunks[pos]); } + ~SharedChunkAllocator() + { + if (free_chunks.size() != chunks.size()) + { + LOG_ERROR(&Logger::get("SharedChunkAllocator"), + "SharedChunkAllocator was destroyed before RowRef was released. StackTrace: " + << StackTrace().toString()); + + return; + } + } + private: std::vector chunks; std::vector free_chunks; - void release(SharedChunk * ptr) + void release(SharedChunk * ptr) noexcept { + if (chunks.empty()) + { + /// This may happen if allocator was removed before chunks. + /// Log message and exit, because we don't want to throw exception in destructor. + + LOG_ERROR(&Logger::get("SharedChunkAllocator"), + "SharedChunkAllocator was destroyed before RowRef was released. StackTrace: " + << StackTrace().toString()); + + return; + } + /// Release memory. It is not obligatory. ptr->clear(); ptr->all_columns.clear(); diff --git a/src/Processors/Merges/VersionedCollapsingTransform.h b/src/Processors/Merges/VersionedCollapsingTransform.h index 04a1814207..214fb3b2f0 100644 --- a/src/Processors/Merges/VersionedCollapsingTransform.h +++ b/src/Processors/Merges/VersionedCollapsingTransform.h @@ -53,14 +53,15 @@ private: SortingHeap queue; bool is_queue_initialized = false; + /// Allocator must be destroyed after all RowRefs. + detail::SharedChunkAllocator chunk_allocator; + using RowRef = detail::RowRefWithOwnedChunk; const size_t max_rows_in_queue; /// Rows with the same primary key and sign. FixedSizeDequeWithGaps current_keys; Int8 sign_in_queue = 0; - detail::SharedChunkAllocator chunk_allocator; - std::queue current_row_sources; /// Sources of rows with the current primary key void insertGap(size_t gap_size); -- GitLab