From 19dadb8c2d7afd357b228a836edb646fa84fc4f9 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 22 Apr 2020 16:52:07 +0300 Subject: [PATCH] Add parallel final. --- src/Common/WeakHash.h | 2 + src/Core/Settings.h | 2 +- src/Processors/Chunk.cpp | 2 +- src/Processors/ISimpleTransform.cpp | 46 +++-- src/Processors/ISimpleTransform.h | 24 ++- .../Algorithms/AggregatingSortedAlgorithm.cpp | 4 +- .../Algorithms/AggregatingSortedAlgorithm.h | 2 +- .../Algorithms/CollapsingSortedAlgorithm.cpp | 4 +- .../Algorithms/CollapsingSortedAlgorithm.h | 2 + .../Merges/Algorithms/IMergingAlgorithm.h | 2 +- .../IMergingAlgorithmWithDelayedChunk.cpp | 6 +- .../IMergingAlgorithmWithDelayedChunk.h | 2 +- .../IMergingAlgorithmWithSharedChunks.cpp | 6 +- .../IMergingAlgorithmWithSharedChunks.h | 2 +- .../Algorithms/MergingSortedAlgorithm.cpp | 4 +- .../Algorithms/MergingSortedAlgorithm.h | 2 +- src/Processors/Merges/Algorithms/RowRef.h | 10 +- .../Algorithms/SummingSortedAlgorithm.cpp | 4 +- .../Algorithms/SummingSortedAlgorithm.h | 2 +- .../Merges/CollapsingSortedTransform.h | 2 + src/Processors/Merges/IMergingTransform.cpp | 87 ++++++++- src/Processors/Merges/IMergingTransform.h | 24 ++- src/Processors/Pipe.cpp | 9 + src/Processors/Pipe.h | 5 + .../Transforms/AddingSelectorTransform.cpp | 76 ++++++++ .../Transforms/AddingSelectorTransform.h | 26 +++ src/Processors/Transforms/CopyTransform.cpp | 108 +++++++++++ src/Processors/Transforms/CopyTransform.h | 28 +++ src/Processors/Transforms/SelectorInfo.h | 14 ++ src/Processors/ya.make | 2 + .../MergeTree/MergeTreeDataMergerMutator.cpp | 2 +- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 178 ++++++++++++------ .../MergeTree/MergeTreeDataSelectExecutor.h | 1 + tests/performance/parallel_final.xml | 51 +++++ ...00191_aggregating_merge_tree_and_final.sql | 4 +- .../00564_versioned_collapsing_merge_tree.sql | 48 ++--- .../00915_simple_aggregate_function.sql | 6 +- ...030_incorrect_count_summing_merge_tree.sql | 8 +- 38 files changed, 651 insertions(+), 156 deletions(-) create mode 100644 src/Processors/Transforms/AddingSelectorTransform.cpp create mode 100644 src/Processors/Transforms/AddingSelectorTransform.h create mode 100644 src/Processors/Transforms/CopyTransform.cpp create mode 100644 src/Processors/Transforms/CopyTransform.h create mode 100644 src/Processors/Transforms/SelectorInfo.h create mode 100644 tests/performance/parallel_final.xml diff --git a/src/Common/WeakHash.h b/src/Common/WeakHash.h index 87ddec4def..bfea75eddf 100644 --- a/src/Common/WeakHash.h +++ b/src/Common/WeakHash.h @@ -17,6 +17,8 @@ public: explicit WeakHash32(size_t size) : data(size, ~UInt32(0)) {} WeakHash32(const WeakHash32 & other) { data.assign(other.data); } + void reset(size_t size) { data.assign(size, ~UInt32(0)); } + const Container & getData() const { return data; } Container & getData() { return data; } diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 61fcf658ba..1723915205 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -54,6 +54,7 @@ struct Settings : public SettingsCollection M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.", 0) \ M(SettingUInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, "Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.", 0) \ M(SettingUInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. Values 0 or 1 means that INSERT SELECT is not run in parallel. Higher values will lead to higher memory usage. Parallel INSERT SELECT has effect only if the SELECT part is run on parallel, see 'max_threads' setting.", 0) \ + M(SettingUInt64, max_final_threads, 16, "The maximum number of threads to read from table with FINAL.", 0) \ M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \ M(SettingMaxThreads, max_alter_threads, 0, "The maximum number of threads to execute the ALTER requests. By default, it is determined automatically.", 0) \ M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \ @@ -429,7 +430,6 @@ struct Settings : public SettingsCollection M(SettingBool, partial_merge_join, false, "Obsolete. Use join_algorithm='prefer_partial_merge' instead.", 0) \ M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \ - DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS) /** Set multiple settings from "profile" (in server configuration file (users.xml), profiles contain groups of multiple settings). diff --git a/src/Processors/Chunk.cpp b/src/Processors/Chunk.cpp index d68c2bea5a..2c3d506fa0 100644 --- a/src/Processors/Chunk.cpp +++ b/src/Processors/Chunk.cpp @@ -46,7 +46,7 @@ Chunk::Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_ Chunk Chunk::clone() const { - return Chunk(getColumns(), getNumRows()); + return Chunk(getColumns(), getNumRows(), chunk_info); } void Chunk::setColumns(Columns columns_, UInt64 num_rows_) diff --git a/src/Processors/ISimpleTransform.cpp b/src/Processors/ISimpleTransform.cpp index fc8f1ba30f..ac8f2f8b7a 100644 --- a/src/Processors/ISimpleTransform.cpp +++ b/src/Processors/ISimpleTransform.cpp @@ -29,10 +29,10 @@ ISimpleTransform::Status ISimpleTransform::prepare() } /// Output if has data. - if (transformed) + if (has_output) { - output.pushData(std::move(current_data)); - transformed = false; + output.pushData(std::move(output_data)); + has_output = false; if (!no_more_data_needed) return Status::PortFull; @@ -56,27 +56,17 @@ ISimpleTransform::Status ISimpleTransform::prepare() return Status::Finished; } + input.setNeeded(); + if (!input.hasData()) - { - input.setNeeded(); return Status::NeedData; - } - current_data = input.pullData(true); + input_data = input.pullData(set_input_not_needed_after_read); has_input = true; - if (current_data.exception) - { - /// Skip transform in case of exception. - has_input = false; - transformed = true; - + if (input_data.exception) /// No more data needed. Exception will be thrown (or swallowed) later. input.setNotNeeded(); - } - - if (set_input_not_needed_after_read) - input.setNotNeeded(); } /// Now transform. @@ -85,29 +75,35 @@ ISimpleTransform::Status ISimpleTransform::prepare() void ISimpleTransform::work() { - if (current_data.exception) + if (input_data.exception) + { + /// Skip transform in case of exception. + output_data = std::move(input_data); + has_input = false; + has_output = true; return; + } try { - transform(current_data.chunk); + transform(input_data.chunk, output_data.chunk); } catch (DB::Exception &) { - current_data.exception = std::current_exception(); - transformed = true; + output_data.exception = std::current_exception(); + has_output = true; has_input = false; return; } has_input = !needInputData(); - if (!skip_empty_chunks || current_data.chunk) - transformed = true; + if (!skip_empty_chunks || output_data.chunk) + has_output = true; - if (transformed && !current_data.chunk) + if (has_output && !output_data.chunk && getOutputPort().getHeader()) /// Support invariant that chunks must have the same number of columns as header. - current_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0); + output_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0); } } diff --git a/src/Processors/ISimpleTransform.h b/src/Processors/ISimpleTransform.h index 1f0c5a5b04..ee92b574d7 100644 --- a/src/Processors/ISimpleTransform.h +++ b/src/Processors/ISimpleTransform.h @@ -6,6 +6,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + /** Has one input and one output. * Simply pull a block from input, transform it, and push it to output. */ @@ -15,18 +20,29 @@ protected: InputPort & input; OutputPort & output; - Port::Data current_data; + Port::Data input_data; + Port::Data output_data; bool has_input = false; - bool transformed = false; + bool has_output = false; bool no_more_data_needed = false; const bool skip_empty_chunks; /// Set input port NotNeeded after chunk was pulled. /// Input port will become needed again only after data was transformed. /// This allows to escape caching chunks in input port, which can lead to uneven data distribution. - bool set_input_not_needed_after_read = false; + bool set_input_not_needed_after_read = true; + + virtual void transform(Chunk &) + { + throw Exception("Method transform is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + + virtual void transform(Chunk & input_chunk, Chunk & output_chunk) + { + transform(input_chunk); + output_chunk.swap(input_chunk); + } - virtual void transform(Chunk & chunk) = 0; virtual bool needInputData() const { return true; } void stopReading() { no_more_data_needed = true; } diff --git a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp index 269c69f274..be9bf3e354 100644 --- a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp @@ -289,10 +289,10 @@ void AggregatingSortedAlgorithm::initialize(Chunks chunks) initializeQueue(std::move(chunks)); } -void AggregatingSortedAlgorithm::consume(Chunk chunk, size_t source_num) +void AggregatingSortedAlgorithm::consume(Chunk & chunk, size_t source_num) { preprocessChunk(chunk, columns_definition); - updateCursor(std::move(chunk), source_num); + updateCursor(chunk, source_num); } IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge() diff --git a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h index b4819ad030..fe1710adc8 100644 --- a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h @@ -20,7 +20,7 @@ public: SortDescription description_, size_t max_block_size); void initialize(Chunks chunks) override; - void consume(Chunk chunk, size_t source_num) override; + void consume(Chunk & chunk, size_t source_num) override; Status merge() override; struct SimpleAggregateDescription; diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp index cd3a193105..8e799664fa 100644 --- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp @@ -23,6 +23,7 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm( size_t num_inputs, SortDescription description_, const String & sign_column, + bool only_positive_sign_, size_t max_block_size, WriteBuffer * out_row_sources_buf_, bool use_average_block_sizes, @@ -30,6 +31,7 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm( : IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs) , merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size) , sign_column_number(header.getPositionByName(sign_column)) + , only_positive_sign(only_positive_sign_) , log(log_) { } @@ -76,7 +78,7 @@ void CollapsingSortedAlgorithm::insertRows() if (last_is_positive || count_positive != count_negative) { - if (count_positive <= count_negative) + if (count_positive <= count_negative && !only_positive_sign) { insertRow(first_negative_row); diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h index 4158f55a7c..3cbe95d96e 100644 --- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h @@ -31,6 +31,7 @@ public: size_t num_inputs, SortDescription description_, const String & sign_column, + bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0. size_t max_block_size, WriteBuffer * out_row_sources_buf_, bool use_average_block_sizes, @@ -42,6 +43,7 @@ private: MergedData merged_data; const size_t sign_column_number; + const bool only_positive_sign; static constexpr size_t max_row_refs = 4; /// first_negative, last_positive, last, current. RowRef first_negative_row; diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h index 263acee4c2..b49209e462 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h @@ -21,7 +21,7 @@ public: }; virtual void initialize(Chunks chunks) = 0; - virtual void consume(Chunk chunk, size_t source_num) = 0; + virtual void consume(Chunk & chunk, size_t source_num) = 0; virtual Status merge() = 0; IMergingAlgorithm() = default; diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp index 6777109982..751a08ce69 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp @@ -28,15 +28,15 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Chunks chunks) queue = SortingHeap(cursors); } -void IMergingAlgorithmWithDelayedChunk::updateCursor(Chunk chunk, size_t source_num) +void IMergingAlgorithmWithDelayedChunk::updateCursor(Chunk & chunk, size_t source_num) { auto & source_chunk = source_chunks[source_num]; /// Extend lifetime of last chunk. - last_chunk = std::move(source_chunk); + last_chunk.swap(source_chunk); last_chunk_sort_columns = std::move(cursors[source_num].sort_columns); - source_chunk = std::move(chunk); + source_chunk.swap(chunk); cursors[source_num].reset(source_chunk.getColumns(), {}); queue.push(cursors[source_num]); diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h index d02b9dfcb7..f7d5f63023 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h @@ -24,7 +24,7 @@ protected: ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid. void initializeQueue(Chunks chunks); - void updateCursor(Chunk chunk, size_t source_num); + void updateCursor(Chunk & chunk, size_t source_num); private: /// Chunks currently being merged. diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp index bcea74b5f3..1fe61653ec 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp @@ -39,7 +39,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Chunks chunks) auto & source_chunk = source_chunks[source_num]; - source_chunk = chunk_allocator.alloc(std::move(chunks[source_num])); + source_chunk = chunk_allocator.alloc(chunks[source_num]); cursors[source_num] = SortCursorImpl(source_chunk->getColumns(), description, source_num); source_chunk->all_columns = cursors[source_num].all_columns; @@ -49,12 +49,12 @@ void IMergingAlgorithmWithSharedChunks::initialize(Chunks chunks) queue = SortingHeap(cursors); } -void IMergingAlgorithmWithSharedChunks::consume(Chunk chunk, size_t source_num) +void IMergingAlgorithmWithSharedChunks::consume(Chunk & chunk, size_t source_num) { prepareChunk(chunk); auto & source_chunk = source_chunks[source_num]; - source_chunk = chunk_allocator.alloc(std::move(chunk)); + source_chunk = chunk_allocator.alloc(chunk); cursors[source_num].reset(source_chunk->getColumns(), {}); source_chunk->all_columns = cursors[source_num].all_columns; diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h index 1ef7f540f9..a3dbadc458 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h @@ -16,7 +16,7 @@ public: size_t max_row_refs); void initialize(Chunks chunks) override; - void consume(Chunk chunk, size_t source_num) override; + void consume(Chunk & chunk, size_t source_num) override; private: SortDescription description; diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp index d06e3c1179..78221b4825 100644 --- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp @@ -74,10 +74,10 @@ void MergingSortedAlgorithm::initialize(Chunks chunks) queue_without_collation = SortingHeap(cursors); } -void MergingSortedAlgorithm::consume(Chunk chunk, size_t source_num) +void MergingSortedAlgorithm::consume(Chunk & chunk, size_t source_num) { prepareChunk(chunk); - source_chunks[source_num] = std::move(chunk); + source_chunks[source_num].swap(chunk); cursors[source_num].reset(source_chunks[source_num].getColumns(), {}); if (has_collation) diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h index 6ff48b520b..5b361c1000 100644 --- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h @@ -23,7 +23,7 @@ public: void addInput(); void initialize(Chunks chunks) override; - void consume(Chunk chunk, size_t source_num) override; + void consume(Chunk & chunk, size_t source_num) override; Status merge() override; const MergedData & getMergedData() const { return merged_data; } diff --git a/src/Processors/Merges/Algorithms/RowRef.h b/src/Processors/Merges/Algorithms/RowRef.h index 5aeae95206..658442e34c 100644 --- a/src/Processors/Merges/Algorithms/RowRef.h +++ b/src/Processors/Merges/Algorithms/RowRef.h @@ -63,7 +63,7 @@ public: free_chunks.push_back(i); } - SharedChunkPtr alloc(Chunk && chunk) + SharedChunkPtr alloc(Chunk & chunk) { if (free_chunks.empty()) throw Exception("Not enough space in SharedChunkAllocator. " @@ -72,7 +72,7 @@ public: auto pos = free_chunks.back(); free_chunks.pop_back(); - chunks[pos] = std::move(chunk); + chunks[pos].swap(chunk); chunks[pos].position = pos; chunks[pos].allocator = this; @@ -110,9 +110,9 @@ private: } /// Release memory. It is not obligatory. - ptr->clear(); - ptr->all_columns.clear(); - ptr->sort_columns.clear(); +// ptr->clear(); +// ptr->all_columns.clear(); +// ptr->sort_columns.clear(); free_chunks.push_back(ptr->position); } diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp index 5bcbf778d6..89154044ae 100644 --- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp @@ -632,10 +632,10 @@ void SummingSortedAlgorithm::initialize(Chunks chunks) initializeQueue(std::move(chunks)); } -void SummingSortedAlgorithm::consume(Chunk chunk, size_t source_num) +void SummingSortedAlgorithm::consume(Chunk & chunk, size_t source_num) { preprocessChunk(chunk); - updateCursor(std::move(chunk), source_num); + updateCursor(chunk, source_num); } IMergingAlgorithm::Status SummingSortedAlgorithm::merge() diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h index a38df215cc..fc5431f1a0 100644 --- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h @@ -23,7 +23,7 @@ public: size_t max_block_size); void initialize(Chunks chunks) override; - void consume(Chunk chunk, size_t source_num) override; + void consume(Chunk & chunk, size_t source_num) override; Status merge() override; struct AggregateDescription; diff --git a/src/Processors/Merges/CollapsingSortedTransform.h b/src/Processors/Merges/CollapsingSortedTransform.h index d4f40c6093..cdf7c4a160 100644 --- a/src/Processors/Merges/CollapsingSortedTransform.h +++ b/src/Processors/Merges/CollapsingSortedTransform.h @@ -15,6 +15,7 @@ public: size_t num_inputs, SortDescription description_, const String & sign_column, + bool only_positive_sign, size_t max_block_size, WriteBuffer * out_row_sources_buf_ = nullptr, bool use_average_block_sizes = false) @@ -24,6 +25,7 @@ public: num_inputs, std::move(description_), sign_column, + only_positive_sign, max_block_size, out_row_sources_buf_, use_average_block_sizes, diff --git a/src/Processors/Merges/IMergingTransform.cpp b/src/Processors/Merges/IMergingTransform.cpp index 0dc4cd4199..f42bd44f6c 100644 --- a/src/Processors/Merges/IMergingTransform.cpp +++ b/src/Processors/Merges/IMergingTransform.cpp @@ -1,4 +1,5 @@ #include +#include namespace DB { @@ -135,7 +136,6 @@ IProcessor::Status IMergingTransformBase::prepare() if (state.is_finished) { - if (is_port_full) return Status::PortFull; @@ -158,11 +158,11 @@ IProcessor::Status IMergingTransformBase::prepare() if (!input.hasData()) return Status::NeedData; - auto chunk = input.pull(); - if (!chunk.hasRows() && !input.isFinished()) + state.input_chunk = input.pull(); + if (!state.input_chunk.hasRows() && !input.isFinished()) return Status::NeedData; - state.input_chunk = std::move(chunk); + state.has_input = true; } state.need_data = false; @@ -174,4 +174,83 @@ IProcessor::Status IMergingTransformBase::prepare() return Status::Ready; } +static void filterChunk(Chunk & chunk, size_t selector_position) +{ + if (!chunk.getChunkInfo()) + throw Exception("IMergingTransformBase expected ChunkInfo for input chunk", ErrorCodes::LOGICAL_ERROR); + + const auto * chunk_info = typeid_cast(chunk.getChunkInfo().get()); + if (!chunk_info) + throw Exception("IMergingTransformBase expected SelectorInfo for input chunk", ErrorCodes::LOGICAL_ERROR); + + auto & selector = chunk_info->selector; + + IColumn::Filter filter; + filter.resize_fill(selector.size()); + + size_t num_rows = chunk.getNumRows(); + auto columns = chunk.detachColumns(); + + size_t num_result_rows = 0; + + for (size_t row = 0; row < num_rows; ++row) + { + if (selector[row] == selector_position) + { + ++num_result_rows; + filter[row] = 1; + } + } + + for (auto & column : columns) + column = column->filter(filter, num_result_rows); + + chunk.clear(); + chunk.setColumns(std::move(columns), num_result_rows); +} + +bool IMergingTransformBase::filterChunks() +{ + if (state.selector_position < 0) + return true; + + bool has_empty_chunk = false; + + if (!state.init_chunks.empty()) + { + for (size_t i = 0; i < input_states.size(); ++i) + { + auto & chunk = state.init_chunks[i]; + if (!chunk || input_states[i].is_filtered) + continue; + + filterChunk(chunk, state.selector_position); + + if (!chunk.hasRows()) + { + chunk.clear(); + has_empty_chunk = true; + input_states[i].is_initialized = false; + is_initialized = false; + } + else + input_states[i].is_filtered = true; + } + } + + if (state.has_input) + { + filterChunk(state.input_chunk, state.selector_position); + if (!state.input_chunk.hasRows()) + { + state.has_input = false; + state.need_data = true; + has_empty_chunk = true; + } + } + + return !has_empty_chunk; +} + + } diff --git a/src/Processors/Merges/IMergingTransform.h b/src/Processors/Merges/IMergingTransform.h index f9c2dba827..939faa48d2 100644 --- a/src/Processors/Merges/IMergingTransform.h +++ b/src/Processors/Merges/IMergingTransform.h @@ -18,6 +18,10 @@ public: const Block & output_header, bool have_all_inputs_); + virtual ~IMergingTransformBase() = default; + + OutputPort & getOutputPort() { return outputs.front(); } + /// Methods to add additional input port. It is possible to do only before the first call of `prepare`. void addInput(); /// Need to be called after all inputs are added. (only if have_all_inputs was not specified). @@ -25,20 +29,29 @@ public: Status prepare() override; + /// Set position which will be used in selector if input chunk has attached SelectorInfo (see SelectorInfo.h). + /// Columns will be filtered, keep only rows labeled with this position. + /// It is used in parallel final. + void setSelectorPosition(size_t position) { state.selector_position = position; } + protected: virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false. virtual void onFinish() {} /// Is called when all data is processed. + bool filterChunks(); /// Filter chunks if selector position was set. For parallel final. + /// Processor state. struct State { Chunk output_chunk; Chunk input_chunk; + bool has_input = false; bool is_finished = false; bool need_data = false; size_t next_input_to_read = 0; Chunks init_chunks; + ssize_t selector_position = -1; }; State state; @@ -50,6 +63,7 @@ private: InputPort & port; bool is_initialized = false; + bool is_filtered = false; }; std::vector input_states; @@ -78,14 +92,18 @@ public: void work() override { + if (!filterChunks()) + return; + if (!state.init_chunks.empty()) algorithm.initialize(std::move(state.init_chunks)); - if (state.input_chunk) + if (state.has_input) { // std::cerr << "Consume chunk with " << state.input_chunk.getNumRows() // << " for input " << state.next_input_to_read << std::endl; - algorithm.consume(std::move(state.input_chunk), state.next_input_to_read); + algorithm.consume(state.input_chunk, state.next_input_to_read); + state.has_input = false; } IMergingAlgorithm::Status status = algorithm.merge(); @@ -120,4 +138,6 @@ private: using IMergingTransformBase::state; }; +using MergingTransformPtr = std::shared_ptr; + } diff --git a/src/Processors/Pipe.cpp b/src/Processors/Pipe.cpp index d9b21dbc85..5d92e909a5 100644 --- a/src/Processors/Pipe.cpp +++ b/src/Processors/Pipe.cpp @@ -96,6 +96,15 @@ Pipe::Pipe(Pipes && pipes, ProcessorPtr transform) processors.emplace_back(std::move(transform)); } +Pipe::Pipe(OutputPort * port) : output_port(port) +{ +} + +void Pipe::addProcessors(const Processors & processors_) +{ + processors.insert(processors.end(), processors_.begin(), processors_.end()); +} + void Pipe::addSimpleTransform(ProcessorPtr transform) { checkSimpleTransform(*transform); diff --git a/src/Processors/Pipe.h b/src/Processors/Pipe.h index 42bbd4e06d..984fa7605c 100644 --- a/src/Processors/Pipe.h +++ b/src/Processors/Pipe.h @@ -22,6 +22,8 @@ public: /// Transform must have the number of inputs equals to the number of pipes. And single output. /// Will connect pipes outputs with transform inputs automatically. Pipe(Pipes && pipes, ProcessorPtr transform); + /// Create pipe from output port. If pipe was created that way, it possibly will not have tree shape. + Pipe(OutputPort * port); Pipe(const Pipe & other) = delete; Pipe(Pipe && other) = default; @@ -29,6 +31,9 @@ public: Pipe & operator=(const Pipe & other) = delete; Pipe & operator=(Pipe && other) = default; + /// Append processors to pipe. After this, it possibly will not have tree shape. + void addProcessors(const Processors & processors_); + OutputPort & getPort() const { return *output_port; } const Block & getHeader() const { return output_port->getHeader(); } diff --git a/src/Processors/Transforms/AddingSelectorTransform.cpp b/src/Processors/Transforms/AddingSelectorTransform.cpp new file mode 100644 index 0000000000..f75a592007 --- /dev/null +++ b/src/Processors/Transforms/AddingSelectorTransform.cpp @@ -0,0 +1,76 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +AddingSelectorTransform::AddingSelectorTransform( + const Block & header, size_t num_outputs_, ColumnNumbers key_columns_) + : ISimpleTransform(header, header, false) + , num_outputs(num_outputs_) + , key_columns(std::move(key_columns_)) + , hash(0) +{ + setInputNotNeededAfterRead(false); + + if (num_outputs <= 1) + throw Exception("SplittingByHashTransform expects more than 1 outputs, got " + std::to_string(num_outputs), + ErrorCodes::LOGICAL_ERROR); + + if (key_columns.empty()) + throw Exception("SplittingByHashTransform cannot split by empty set of key columns", + ErrorCodes::LOGICAL_ERROR); + + for (auto & column : key_columns) + if (column >= header.columns()) + throw Exception("Invalid column number: " + std::to_string(column) + + ". There is only " + std::to_string(header.columns()) + " columns in header", + ErrorCodes::LOGICAL_ERROR); +} + +static void calculateWeakHash32(const Chunk & chunk, const ColumnNumbers & key_columns, WeakHash32 & hash) +{ + auto num_rows = chunk.getNumRows(); + const auto & columns = chunk.getColumns(); + + hash.reset(num_rows); + + for (const auto & column_number : key_columns) + columns[column_number]->updateWeakHash32(hash); +} + +static IColumn::Selector fillSelector(const WeakHash32 & hash, size_t num_outputs) +{ + /// Row from interval [(2^32 / num_outputs) * i, (2^32 / num_outputs) * (i + 1)) goes to bucket with number i. + + const auto & hash_data = hash.getData(); + size_t num_rows = hash_data.size(); + IColumn::Selector selector(num_rows); + + for (size_t row = 0; row < num_rows; ++row) + { + selector[row] = hash_data[row]; /// [0, 2^32) + selector[row] *= num_outputs; /// [0, num_outputs * 2^32), selector stores 64 bit values. + selector[row] >>= 32u; /// [0, num_outputs) + } + + return selector; +} + +void AddingSelectorTransform::transform(Chunk & input_chunk, Chunk & output_chunk) +{ + auto chunk_info = std::make_shared(); + + calculateWeakHash32(input_chunk, key_columns, hash); + chunk_info->selector = fillSelector(hash, num_outputs); + + input_chunk.swap(output_chunk); + output_chunk.setChunkInfo(std::move(chunk_info)); +} + +} diff --git a/src/Processors/Transforms/AddingSelectorTransform.h b/src/Processors/Transforms/AddingSelectorTransform.h new file mode 100644 index 0000000000..e82dcc964d --- /dev/null +++ b/src/Processors/Transforms/AddingSelectorTransform.h @@ -0,0 +1,26 @@ +#pragma once +#include +#include +#include +#include + +namespace DB +{ + +/// Add IColumn::Selector to chunk (see SelectorInfo.h). +/// Selector is filled by formula (WeakHash(key_columns) * num_outputs / MAX_INT). +class AddingSelectorTransform : public ISimpleTransform +{ +public: + AddingSelectorTransform(const Block & header, size_t num_outputs_, ColumnNumbers key_columns_); + String getName() const override { return "SplittingByHash"; } + void transform(Chunk & input_chunk, Chunk & output_chunk) override; + +private: + size_t num_outputs; + ColumnNumbers key_columns; + + WeakHash32 hash; +}; + +} diff --git a/src/Processors/Transforms/CopyTransform.cpp b/src/Processors/Transforms/CopyTransform.cpp new file mode 100644 index 0000000000..c9047c942d --- /dev/null +++ b/src/Processors/Transforms/CopyTransform.cpp @@ -0,0 +1,108 @@ +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +CopyTransform::CopyTransform(const Block & header, size_t num_outputs) + : IProcessor(InputPorts(1, header), OutputPorts(num_outputs, header)) +{ + if (num_outputs <= 1) + throw Exception("CopyTransform expects more than 1 outputs, got " + std::to_string(num_outputs), ErrorCodes::LOGICAL_ERROR); +} + +IProcessor::Status CopyTransform::prepare() +{ + Status status = Status::Ready; + + while (status == Status::Ready) + { + status = !has_data ? prepareConsume() + : prepareGenerate(); + } + + return status; +} + +IProcessor::Status CopyTransform::prepareConsume() +{ + auto & input = getInputPort(); + + /// Check all outputs are finished or ready to get data. + + bool all_finished = true; + for (auto & output : outputs) + { + if (output.isFinished()) + continue; + + all_finished = false; + } + + if (all_finished) + { + input.close(); + return Status::Finished; + } + + /// Try get chunk from input. + + if (input.isFinished()) + { + for (auto & output : outputs) + output.finish(); + + return Status::Finished; + } + + input.setNeeded(); + if (!input.hasData()) + return Status::NeedData; + + chunk = input.pull(); + has_data = true; + was_output_processed.assign(outputs.size(), false); + + return Status::Ready; +} + +IProcessor::Status CopyTransform::prepareGenerate() +{ + bool all_outputs_processed = true; + + size_t chunk_number = 0; + for (auto & output : outputs) + { + auto & was_processed = was_output_processed[chunk_number]; + ++chunk_number; + + if (was_processed) + continue; + + if (output.isFinished()) + continue; + + if (!output.canPush()) + { + all_outputs_processed = false; + continue; + } + + output.push(chunk.clone()); + was_processed = true; + } + + if (all_outputs_processed) + { + has_data = false; + return Status::Ready; + } + + return Status::PortFull; +} + +} diff --git a/src/Processors/Transforms/CopyTransform.h b/src/Processors/Transforms/CopyTransform.h new file mode 100644 index 0000000000..cf56fdf10d --- /dev/null +++ b/src/Processors/Transforms/CopyTransform.h @@ -0,0 +1,28 @@ +#pragma once +#include + +namespace DB +{ + +/// Transform which has single input and num_outputs outputs. +/// Read chunk from input and copy it to all outputs. +class CopyTransform : public IProcessor +{ +public: + CopyTransform(const Block & header, size_t num_outputs); + + String getName() const override { return "Copy"; } + Status prepare() override; + + InputPort & getInputPort() { return inputs.front(); } + +private: + Chunk chunk; + bool has_data = false; + std::vector was_output_processed; + + Status prepareGenerate(); + Status prepareConsume(); +}; + +} diff --git a/src/Processors/Transforms/SelectorInfo.h b/src/Processors/Transforms/SelectorInfo.h new file mode 100644 index 0000000000..2876d64ed2 --- /dev/null +++ b/src/Processors/Transforms/SelectorInfo.h @@ -0,0 +1,14 @@ +#pragma once +#include +#include + +namespace DB +{ + +/// ChunkInfo with IColumn::Selector. It is added by AddingSelectorTransform. +struct SelectorInfo : public ChunkInfo +{ + IColumn::Selector selector; +}; + +} diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 6469532a8e..7818f2fb18 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -106,8 +106,10 @@ SRCS( Sources/SourceFromInputStream.cpp Sources/SourceWithProgress.cpp Transforms/AddingMissedTransform.cpp + Transforms/AddingSelectorTransform.cpp Transforms/AggregatingTransform.cpp Transforms/ConvertingTransform.cpp + Transforms/CopyTransform.cpp Transforms/CreatingSetsTransform.cpp Transforms/CubeTransform.cpp Transforms/DistinctTransform.cpp diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 2ab43f8f56..f1c1904256 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -752,7 +752,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor case MergeTreeData::MergingParams::Collapsing: merged_transform = std::make_unique( - header, pipes.size(), sort_description, data.merging_params.sign_column, + header, pipes.size(), sort_description, data.merging_params.sign_column, false, merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size); break; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 580c95b34d..4eeb954bd7 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -41,22 +41,25 @@ namespace std #include #include #include -#include #include #include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include #include +#include +#include #include +#include #include -#include #include -#include +#include +#include +#include +#include +#include +#include +#include namespace ProfileEvents { @@ -617,6 +620,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts( res = spreadMarkRangesAmongStreamsFinal( std::move(parts_with_ranges), + num_streams, column_names_to_read, max_block_size, settings.use_uncompressed_cache, @@ -1017,6 +1021,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts, + size_t num_streams, const Names & column_names, UInt64 max_block_size, bool use_uncompressed_cache, @@ -1074,71 +1079,122 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( for (size_t i = 0; i < sort_columns_size; ++i) sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); - /// Converts pipes to BlockInputsStreams. - /// It is temporary, till not all merging streams are implemented as processors. - auto streams_to_merge = [&pipes]() + auto get_merging_processor = [&]() -> MergingTransformPtr { - size_t num_streams = pipes.size(); + switch (data.merging_params.mode) + { + case MergeTreeData::MergingParams::Ordinary: + { + return std::make_shared(header, pipes.size(), + sort_description, max_block_size); + } - BlockInputStreams streams; - streams.reserve(num_streams); + case MergeTreeData::MergingParams::Collapsing: + return std::make_shared(header, pipes.size(), + sort_description, data.merging_params.sign_column, true, max_block_size); - for (size_t i = 0; i < num_streams; ++i) - streams.emplace_back(std::make_shared(std::move(pipes[i]))); + case MergeTreeData::MergingParams::Summing: + return std::make_shared(header, pipes.size(), + sort_description, data.merging_params.columns_to_sum, max_block_size); + + case MergeTreeData::MergingParams::Aggregating: + return std::make_shared(header, pipes.size(), + sort_description, max_block_size); + + case MergeTreeData::MergingParams::Replacing: + return std::make_shared(header, pipes.size(), + sort_description, data.merging_params.version_column, max_block_size); - pipes.clear(); - return streams; + case MergeTreeData::MergingParams::VersionedCollapsing: + return std::make_shared(header, pipes.size(), + sort_description, data.merging_params.sign_column, max_block_size); + + case MergeTreeData::MergingParams::Graphite: + throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); + } + + __builtin_unreachable(); }; - BlockInputStreamPtr merged; - ProcessorPtr merged_processor; - switch (data.merging_params.mode) + if (num_streams > settings.max_final_threads) + num_streams = settings.max_final_threads; + + if (num_streams <= 1 || sort_description.empty() || query_info.force_tree_shaped_pipeline) + { + + Pipe pipe(std::move(pipes), get_merging_processor()); + pipes = Pipes(); + pipes.emplace_back(std::move(pipe)); + + return pipes; + } + + ColumnNumbers key_columns; + key_columns.reserve(sort_description.size()); + + for (auto & desc : sort_description) + { + if (!desc.column_name.empty()) + key_columns.push_back(header.getPositionByName(desc.column_name)); + else + key_columns.emplace_back(desc.column_number); + } + + Processors selectors; + Processors copiers; + selectors.reserve(pipes.size()); + + for (auto & pipe : pipes) + { + auto selector = std::make_shared(pipe.getHeader(), num_streams, key_columns); + auto copier = std::make_shared(pipe.getHeader(), num_streams); + connect(pipe.getPort(), selector->getInputPort()); + connect(selector->getOutputPort(), copier->getInputPort()); + selectors.emplace_back(std::move(selector)); + copiers.emplace_back(std::move(copier)); + } + + Processors merges; + std::vector input_ports; + merges.reserve(num_streams); + input_ports.reserve(num_streams); + + for (size_t i = 0; i < num_streams; ++i) { - case MergeTreeData::MergingParams::Ordinary: + auto merge = get_merging_processor(); + merge->setSelectorPosition(i); + input_ports.emplace_back(merge->getInputs().begin()); + merges.emplace_back(std::move(merge)); + } + + /// Connect outputs of i-th splitter with i-th input port of every merge. + for (auto & resize : copiers) + { + size_t input_num = 0; + for (auto & output : resize->getOutputs()) { - merged_processor = std::make_shared(header, pipes.size(), - sort_description, max_block_size); - break; + connect(output, *input_ports[input_num]); + ++input_ports[input_num]; + ++input_num; } - - case MergeTreeData::MergingParams::Collapsing: - merged = std::make_shared( - streams_to_merge(), sort_description, data.merging_params.sign_column); - break; - - case MergeTreeData::MergingParams::Summing: - merged_processor = std::make_shared(header, pipes.size(), - sort_description, data.merging_params.columns_to_sum, max_block_size); - break; - - case MergeTreeData::MergingParams::Aggregating: - merged_processor = std::make_shared(header, pipes.size(), - sort_description, max_block_size); - break; - - case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream - merged_processor = std::make_shared(header, pipes.size(), - sort_description, data.merging_params.version_column, max_block_size); - break; - - case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream - merged_processor = std::make_shared(header, pipes.size(), - sort_description, data.merging_params.sign_column, max_block_size); - break; - - case MergeTreeData::MergingParams::Graphite: - throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); } - if (merged_processor) + Processors processors; + for (auto & pipe : pipes) { - Pipe pipe(std::move(pipes), std::move(merged_processor)); - pipes = Pipes(); - pipes.emplace_back(std::move(pipe)); + auto pipe_processors = std::move(pipe).detachProcessors(); + processors.insert(processors.end(), pipe_processors.begin(), pipe_processors.end()); } - if (merged) - pipes.emplace_back(std::make_shared(merged)); + pipes.clear(); + pipes.reserve(num_streams); + for (auto & merge : merges) + pipes.emplace_back(&merge->getOutputs().front()); + + pipes.front().addProcessors(processors); + pipes.front().addProcessors(selectors); + pipes.front().addProcessors(copiers); + pipes.front().addProcessors(merges); return pipes; } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index e6eb26da7e..7ded8fcfad 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -71,6 +71,7 @@ private: Pipes spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts, + size_t num_streams, const Names & column_names, UInt64 max_block_size, bool use_uncompressed_cache, diff --git a/tests/performance/parallel_final.xml b/tests/performance/parallel_final.xml new file mode 100644 index 0000000000..f5e7ff51f6 --- /dev/null +++ b/tests/performance/parallel_final.xml @@ -0,0 +1,51 @@ + + + + 1024 + + + + + collapsing + + collapsing_final_16p_ord + collapsing_final_16p_rnd + collapsing_final_16p_int_keys_ord + collapsing_final_16p_int_keys_rnd + collapsing_final_16p_str_keys_ord + collapsing_final_16p_str_keys_rnd + collapsing_final_1024p_ord + collapsing_final_1024p_rnd + + + + + create table collapsing_final_16p_ord (key1 UInt32, key2 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2) partition by intDiv(key1, 8192 * 64) + create table collapsing_final_16p_rnd (key1 UInt32, key2 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2) partition by key1 % 16 + create table collapsing_final_16p_int_keys_ord (key1 UInt32, key2 UInt32, key3 UInt32, key4 UInt32, key5 UInt32, key6 UInt32, key7 UInt32, key8 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by intDiv(key1, 8192 * 64) + create table collapsing_final_16p_int_keys_rnd (key1 UInt32, key2 UInt32, key3 UInt32, key4 UInt32, key5 UInt32, key6 UInt32, key7 UInt32, key8 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by key1 % 16 + create table collapsing_final_16p_str_keys_ord (key1 UInt32, key2 String, key3 String, key4 String, key5 String, key6 String, key7 String, key8 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by intDiv(key1, 8192 * 64) + create table collapsing_final_16p_str_keys_rnd (key1 UInt32, key2 String, key3 String, key4 String, key5 String, key6 String, key7 String, key8 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by key1 % 16 + create table collapsing_final_1024p_ord (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by intDiv(key1, 8192 * 2) + create table collapsing_final_1024p_rnd (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by key1 % 1024 + + + insert into collapsing_final_16p_ord select number, number, 1, number from numbers(8388608) + insert into collapsing_final_16p_rnd select sipHash64(number), number, 1, number from numbers(8388608) + insert into collapsing_final_16p_int_keys_ord select number, number, number, number, number, number, number, number, 1, number from numbers(8388608) + insert into collapsing_final_16p_int_keys_rnd select sipHash64(number), number, number, number, number, number, number, number, 1, number from numbers(8388608) + insert into collapsing_final_16p_str_keys_ord select number, number, number, number, number, number, number, number, 1, number from numbers(8388608) + insert into collapsing_final_16p_str_keys_rnd select sipHash64(number), number, number, number, number, number, number, number, 1, number from numbers(8388608) + + + insert into collapsing_final_1024p_ord select number, 1, number from numbers(16777216) + insert into collapsing_final_1024p_rnd select number, 1, number from numbers(16777216) + + optimize table {collapsing} final + + SELECT count() FROM {collapsing} final + SELECT sum(s) FROM {collapsing} final group by key1 limit 10 + SELECT sum(s) FROM {collapsing} final group by key1 % 8192 limit 10 + + DROP TABLE IF EXISTS {collapsing} + diff --git a/tests/queries/0_stateless/00191_aggregating_merge_tree_and_final.sql b/tests/queries/0_stateless/00191_aggregating_merge_tree_and_final.sql index 47b9600611..776edeeb43 100644 --- a/tests/queries/0_stateless/00191_aggregating_merge_tree_and_final.sql +++ b/tests/queries/0_stateless/00191_aggregating_merge_tree_and_final.sql @@ -4,11 +4,11 @@ CREATE TABLE aggregating_00191 (d Date DEFAULT '2000-01-01', k UInt64, u Aggrega INSERT INTO aggregating_00191 (k, u) SELECT intDiv(number, 100) AS k, uniqState(toUInt64(number % 100)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000) GROUP BY k; INSERT INTO aggregating_00191 (k, u) SELECT intDiv(number, 100) AS k, uniqState(toUInt64(number % 100) + 50) AS u FROM (SELECT * FROM system.numbers LIMIT 500, 1000) GROUP BY k; -SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL; +SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL order by k; OPTIMIZE TABLE aggregating_00191; SELECT k, finalizeAggregation(u) FROM aggregating_00191; -SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL; +SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL order by k; DROP TABLE aggregating_00191; diff --git a/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql b/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql index 5b7f59f8b6..b7824e7efd 100644 --- a/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql +++ b/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql @@ -3,7 +3,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -15,7 +15,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -27,7 +27,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -39,7 +39,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, version, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -53,7 +53,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(numb insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 4 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 4 blocks optimized'; select * from mult_tab; @@ -68,7 +68,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(numb insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 1, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 2, 1, -1) from system.numbers limit 10; select 'table with 5 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 5 blocks optimized'; select * from mult_tab; @@ -80,7 +80,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 1000000; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 1000000; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -88,14 +88,14 @@ select * from mult_tab; select '-------------------------'; drop table if exists mult_tab; -create table mult_tab (date Date, value UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version); -insert into mult_tab select '2018-01-31', number, 0, if(number < 64, 1, -1) from system.numbers limit 128; -insert into mult_tab select '2018-01-31', number, 0, if(number < 64, -1, 1) from system.numbers limit 128; +create table mult_tab (date Date, value UInt64, key UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version); +insert into mult_tab select '2018-01-31', number, number, 0, if(number < 64, 1, -1) from system.numbers limit 128; +insert into mult_tab select '2018-01-31', number, number + 128, 0, if(number < 64, -1, 1) from system.numbers limit 128; select 'table with 2 blocks final'; -select * from mult_tab final settings max_block_size=33; +select date, value, version, sign from mult_tab final order by date, key, sign settings max_block_size=33; optimize table mult_tab; select 'table with 2 blocks optimized'; -select * from mult_tab; +select date, value, version, sign from mult_tab; select '-------------------------'; select 'Vertival merge'; @@ -106,7 +106,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -118,7 +118,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -130,7 +130,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -142,7 +142,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, version, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -156,7 +156,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(numb insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 4 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 4 blocks optimized'; select * from mult_tab; @@ -171,7 +171,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(numb insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 1, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 2, 1, -1) from system.numbers limit 10; select 'table with 5 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 5 blocks optimized'; select * from mult_tab; @@ -183,7 +183,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 1000000; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 1000000; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -191,13 +191,13 @@ select * from mult_tab; select '-------------------------'; drop table if exists mult_tab; -create table mult_tab (date Date, value UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0; -insert into mult_tab select '2018-01-31', number, 0, if(number < 64, 1, -1) from system.numbers limit 128; -insert into mult_tab select '2018-01-31', number, 0, if(number < 64, -1, 1) from system.numbers limit 128; +create table mult_tab (date Date, value UInt64, key UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0; +insert into mult_tab select '2018-01-31', number, number, 0, if(number < 64, 1, -1) from system.numbers limit 128; +insert into mult_tab select '2018-01-31', number, number + 128, 0, if(number < 64, -1, 1) from system.numbers limit 128; select 'table with 2 blocks final'; -select * from mult_tab final settings max_block_size=33; +select date, value, version, sign from mult_tab final order by date, key, sign settings max_block_size=33; optimize table mult_tab; select 'table with 2 blocks optimized'; -select * from mult_tab; +select date, value, version, sign from mult_tab; DROP TABLE mult_tab; diff --git a/tests/queries/0_stateless/00915_simple_aggregate_function.sql b/tests/queries/0_stateless/00915_simple_aggregate_function.sql index 1866e2bc8c..ba4935a651 100644 --- a/tests/queries/0_stateless/00915_simple_aggregate_function.sql +++ b/tests/queries/0_stateless/00915_simple_aggregate_function.sql @@ -5,13 +5,13 @@ create table simple (id UInt64,val SimpleAggregateFunction(sum,Double)) engine=A insert into simple select number,number from system.numbers limit 10; select * from simple; -select * from simple final; +select * from simple final order by id; select toTypeName(val) from simple limit 1; -- merge insert into simple select number,number from system.numbers limit 10; -select * from simple final; +select * from simple final order by id; optimize table simple final; select * from simple; @@ -33,7 +33,7 @@ insert into simple values(1,null,'2','2.2.2.2', 2, ([1,3], [1,1])); insert into simple values(10,'10','10','10.10.10.10', 4, ([2,3], [1,1])); insert into simple values(10,'2222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222','20','20.20.20.20', 1, ([2, 4], [1,1])); -select * from simple final; +select * from simple final order by id; select toTypeName(nullable_str),toTypeName(low_str),toTypeName(ip),toTypeName(status), toTypeName(tup) from simple limit 1; optimize table simple final; diff --git a/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql b/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql index a9f7bf7ecd..0b5845d3b0 100644 --- a/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql +++ b/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql @@ -4,7 +4,7 @@ drop table if exists tst; create table tst (timestamp DateTime, val Nullable(Int8)) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2); -select * from tst final; +select * from tst final order by timestamp; select '-- 2 2'; select count() from tst; @@ -34,7 +34,7 @@ drop table if exists tst; create table tst (timestamp DateTime, val Nullable(Int8)) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2), ('2018-02-01 00:00:00', 3), ('2018-02-02 00:00:00', 4); -select * from tst final; +select * from tst final order by timestamp; select '-- 4 2'; select count() from tst; @@ -64,7 +64,7 @@ drop table if exists tst; create table tst (timestamp DateTime, val Int8) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2); -select * from tst final; +select * from tst final order by timestamp; select '-- 2 2'; select count() from tst; @@ -96,7 +96,7 @@ drop table if exists tst; create table tst (timestamp DateTime, val Int8) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2), ('2018-02-01 00:00:00', 3), ('2018-02-02 00:00:00', 4); -select * from tst final; +select * from tst final order by timestamp; select '-- 4 2'; select count() from tst; -- GitLab