diff --git a/src/Common/WeakHash.h b/src/Common/WeakHash.h index 87ddec4deff14e5bfe47a1eacfba453088c4b939..bfea75eddf189439741780a83e9cef1511ad896c 100644 --- a/src/Common/WeakHash.h +++ b/src/Common/WeakHash.h @@ -17,6 +17,8 @@ public: explicit WeakHash32(size_t size) : data(size, ~UInt32(0)) {} WeakHash32(const WeakHash32 & other) { data.assign(other.data); } + void reset(size_t size) { data.assign(size, ~UInt32(0)); } + const Container & getData() const { return data; } Container & getData() { return data; } diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 61fcf658ba8f14031db15ab7647818e05e8dd8a1..17239152055528da89793a668c8ee7b4ba6d880a 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -54,6 +54,7 @@ struct Settings : public SettingsCollection M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.", 0) \ M(SettingUInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, "Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.", 0) \ M(SettingUInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. Values 0 or 1 means that INSERT SELECT is not run in parallel. Higher values will lead to higher memory usage. Parallel INSERT SELECT has effect only if the SELECT part is run on parallel, see 'max_threads' setting.", 0) \ + M(SettingUInt64, max_final_threads, 16, "The maximum number of threads to read from table with FINAL.", 0) \ M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \ M(SettingMaxThreads, max_alter_threads, 0, "The maximum number of threads to execute the ALTER requests. By default, it is determined automatically.", 0) \ M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \ @@ -429,7 +430,6 @@ struct Settings : public SettingsCollection M(SettingBool, partial_merge_join, false, "Obsolete. Use join_algorithm='prefer_partial_merge' instead.", 0) \ M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \ - DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS) /** Set multiple settings from "profile" (in server configuration file (users.xml), profiles contain groups of multiple settings). diff --git a/src/Processors/Chunk.cpp b/src/Processors/Chunk.cpp index d68c2bea5ae52561ae2c460328ac3a9b72d328d3..2c3d506fa04127d571d0ceb98d90cd7fe00a0c53 100644 --- a/src/Processors/Chunk.cpp +++ b/src/Processors/Chunk.cpp @@ -46,7 +46,7 @@ Chunk::Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_ Chunk Chunk::clone() const { - return Chunk(getColumns(), getNumRows()); + return Chunk(getColumns(), getNumRows(), chunk_info); } void Chunk::setColumns(Columns columns_, UInt64 num_rows_) diff --git a/src/Processors/ISimpleTransform.cpp b/src/Processors/ISimpleTransform.cpp index fc8f1ba30f4b123b9e826d51fb30db13ec54535a..ac8f2f8b7ae77150c698859169cee11f00197a75 100644 --- a/src/Processors/ISimpleTransform.cpp +++ b/src/Processors/ISimpleTransform.cpp @@ -29,10 +29,10 @@ ISimpleTransform::Status ISimpleTransform::prepare() } /// Output if has data. 
- if (transformed) + if (has_output) { - output.pushData(std::move(current_data)); - transformed = false; + output.pushData(std::move(output_data)); + has_output = false; if (!no_more_data_needed) return Status::PortFull; @@ -56,27 +56,17 @@ ISimpleTransform::Status ISimpleTransform::prepare() return Status::Finished; } + input.setNeeded(); + if (!input.hasData()) - { - input.setNeeded(); return Status::NeedData; - } - current_data = input.pullData(true); + input_data = input.pullData(set_input_not_needed_after_read); has_input = true; - if (current_data.exception) - { - /// Skip transform in case of exception. - has_input = false; - transformed = true; - + if (input_data.exception) /// No more data needed. Exception will be thrown (or swallowed) later. input.setNotNeeded(); - } - - if (set_input_not_needed_after_read) - input.setNotNeeded(); } /// Now transform. @@ -85,29 +75,35 @@ ISimpleTransform::Status ISimpleTransform::prepare() void ISimpleTransform::work() { - if (current_data.exception) + if (input_data.exception) + { + /// Skip transform in case of exception. + output_data = std::move(input_data); + has_input = false; + has_output = true; return; + } try { - transform(current_data.chunk); + transform(input_data.chunk, output_data.chunk); } catch (DB::Exception &) { - current_data.exception = std::current_exception(); - transformed = true; + output_data.exception = std::current_exception(); + has_output = true; has_input = false; return; } has_input = !needInputData(); - if (!skip_empty_chunks || current_data.chunk) - transformed = true; + if (!skip_empty_chunks || output_data.chunk) + has_output = true; - if (transformed && !current_data.chunk) + if (has_output && !output_data.chunk && getOutputPort().getHeader()) /// Support invariant that chunks must have the same number of columns as header. - current_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0); + output_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0); } } diff --git a/src/Processors/ISimpleTransform.h b/src/Processors/ISimpleTransform.h index 1f0c5a5b0405aac60b979f3b972150896f94d084..ee92b574d7c0e9a58e9947dfb2ab4b184c0099fe 100644 --- a/src/Processors/ISimpleTransform.h +++ b/src/Processors/ISimpleTransform.h @@ -6,6 +6,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + /** Has one input and one output. * Simply pull a block from input, transform it, and push it to output. */ @@ -15,18 +20,29 @@ protected: InputPort & input; OutputPort & output; - Port::Data current_data; + Port::Data input_data; + Port::Data output_data; bool has_input = false; - bool transformed = false; + bool has_output = false; bool no_more_data_needed = false; const bool skip_empty_chunks; /// Set input port NotNeeded after chunk was pulled. /// Input port will become needed again only after data was transformed. /// This allows to escape caching chunks in input port, which can lead to uneven data distribution. 
- bool set_input_not_needed_after_read = false; + bool set_input_not_needed_after_read = true; + + virtual void transform(Chunk &) + { + throw Exception("Method transform is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + + virtual void transform(Chunk & input_chunk, Chunk & output_chunk) + { + transform(input_chunk); + output_chunk.swap(input_chunk); + } - virtual void transform(Chunk & chunk) = 0; virtual bool needInputData() const { return true; } void stopReading() { no_more_data_needed = true; } diff --git a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp index 269c69f2747a4564617180ab2090582b7d7a9d00..be9bf3e354ce9b408b8ea02a7962dad18c7fca9e 100644 --- a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp @@ -289,10 +289,10 @@ void AggregatingSortedAlgorithm::initialize(Chunks chunks) initializeQueue(std::move(chunks)); } -void AggregatingSortedAlgorithm::consume(Chunk chunk, size_t source_num) +void AggregatingSortedAlgorithm::consume(Chunk & chunk, size_t source_num) { preprocessChunk(chunk, columns_definition); - updateCursor(std::move(chunk), source_num); + updateCursor(chunk, source_num); } IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge() diff --git a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h index b4819ad030c3f45c214c1f57bc6283b21dbf5c0d..fe1710adc8b65566f71aa548f0d033b6b41a4dec 100644 --- a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h @@ -20,7 +20,7 @@ public: SortDescription description_, size_t max_block_size); void initialize(Chunks chunks) override; - void consume(Chunk chunk, size_t source_num) override; + void consume(Chunk & chunk, size_t source_num) override; Status merge() override; struct SimpleAggregateDescription; diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp index cd3a193105d728d7886ed79dba10a8ebc0277d71..8e799664faec55496076eb0f9c1174e75f9bdcc3 100644 --- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp @@ -23,6 +23,7 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm( size_t num_inputs, SortDescription description_, const String & sign_column, + bool only_positive_sign_, size_t max_block_size, WriteBuffer * out_row_sources_buf_, bool use_average_block_sizes, @@ -30,6 +31,7 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm( : IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs) , merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size) , sign_column_number(header.getPositionByName(sign_column)) + , only_positive_sign(only_positive_sign_) , log(log_) { } @@ -76,7 +78,7 @@ void CollapsingSortedAlgorithm::insertRows() if (last_is_positive || count_positive != count_negative) { - if (count_positive <= count_negative) + if (count_positive <= count_negative && !only_positive_sign) { insertRow(first_negative_row); diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h index 4158f55a7cd39c5d0200ed98421bd4374b366368..3cbe95d96e1a4e7c7165a2745bab1f8a53c76569 100644 
--- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h @@ -31,6 +31,7 @@ public: size_t num_inputs, SortDescription description_, const String & sign_column, + bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0. size_t max_block_size, WriteBuffer * out_row_sources_buf_, bool use_average_block_sizes, @@ -42,6 +43,7 @@ private: MergedData merged_data; const size_t sign_column_number; + const bool only_positive_sign; static constexpr size_t max_row_refs = 4; /// first_negative, last_positive, last, current. RowRef first_negative_row; diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h index 263acee4c2d00a46f05a976bd3b526142eb36596..b49209e462e36df25e3dd51a8719107c786b5f64 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h @@ -21,7 +21,7 @@ public: }; virtual void initialize(Chunks chunks) = 0; - virtual void consume(Chunk chunk, size_t source_num) = 0; + virtual void consume(Chunk & chunk, size_t source_num) = 0; virtual Status merge() = 0; IMergingAlgorithm() = default; diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp index 6777109982e52f9a3039dd1e4d2c2b6d1fdd4c34..751a08ce69f6e2ea9f232b91d4ef58c472a7cc95 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp @@ -28,15 +28,15 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Chunks chunks) queue = SortingHeap<SortCursor>(cursors); } -void IMergingAlgorithmWithDelayedChunk::updateCursor(Chunk chunk, size_t source_num) +void IMergingAlgorithmWithDelayedChunk::updateCursor(Chunk & chunk, size_t source_num) { auto & source_chunk = source_chunks[source_num]; /// Extend lifetime of last chunk. - last_chunk = std::move(source_chunk); + last_chunk.swap(source_chunk); last_chunk_sort_columns = std::move(cursors[source_num].sort_columns); - source_chunk = std::move(chunk); + source_chunk.swap(chunk); cursors[source_num].reset(source_chunk.getColumns(), {}); queue.push(cursors[source_num]); diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h index d02b9dfcb7a5371f18fa91c346435f54bb68ecce..f7d5f6302388698f4cd64c6dbb7a7d82e01eff8d 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h @@ -24,7 +24,7 @@ protected: ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid. void initializeQueue(Chunks chunks); - void updateCursor(Chunk chunk, size_t source_num); + void updateCursor(Chunk & chunk, size_t source_num); private: /// Chunks currently being merged.
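A note on the signature change above, since it recurs through the rest of the patch: consume() and updateCursor() now take the chunk by non-const reference and swap instead of moving from an rvalue, so the caller keeps a live Chunk object and receives the algorithm's previous buffer back, where its capacity can be reused on the next read. A minimal sketch of that hand-off pattern, using hypothetical stand-in types rather than the real Chunk API:

```cpp
#include <vector>

/// Hypothetical stand-ins; the point is the swap-based hand-off, not the real Chunk API.
struct Chunk
{
    std::vector<unsigned> data;
    void swap(Chunk & other) { data.swap(other.data); }
};

struct Algorithm
{
    Chunk owned; /// chunk currently being merged

    /// consume(Chunk &) swaps instead of consuming an rvalue: after the call the
    /// caller's chunk holds our previous buffer, whose capacity can be reused on refill.
    void consume(Chunk & chunk) { owned.swap(chunk); }
};

int main()
{
    Algorithm algorithm;
    Chunk chunk;
    for (unsigned i = 0; i < 3; ++i)
    {
        chunk.data.assign(8192, i); /// refill, reusing capacity returned by the last swap
        algorithm.consume(chunk);
    }
}
```

The same reasoning would apply to SharedChunkAllocator::alloc(Chunk &) further down, which swaps the incoming chunk into a preallocated slot instead of move-assigning it.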
diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp index bcea74b5f38f2ea302c90531941d0c4bbff9653f..1fe61653ecc2c4aabf559e8013e87be52db810ca 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp @@ -39,7 +39,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Chunks chunks) auto & source_chunk = source_chunks[source_num]; - source_chunk = chunk_allocator.alloc(std::move(chunks[source_num])); + source_chunk = chunk_allocator.alloc(chunks[source_num]); cursors[source_num] = SortCursorImpl(source_chunk->getColumns(), description, source_num); source_chunk->all_columns = cursors[source_num].all_columns; @@ -49,12 +49,12 @@ queue = SortingHeap<SortCursor>(cursors); } -void IMergingAlgorithmWithSharedChunks::consume(Chunk chunk, size_t source_num) +void IMergingAlgorithmWithSharedChunks::consume(Chunk & chunk, size_t source_num) { prepareChunk(chunk); auto & source_chunk = source_chunks[source_num]; - source_chunk = chunk_allocator.alloc(std::move(chunk)); + source_chunk = chunk_allocator.alloc(chunk); cursors[source_num].reset(source_chunk->getColumns(), {}); source_chunk->all_columns = cursors[source_num].all_columns; diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h index 1ef7f540f9613e1f20eee90451f59ea8175e2e7f..a3dbadc458dfff007546b02547b772f1ea92984a 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h @@ -16,7 +16,7 @@ public: size_t max_row_refs); void initialize(Chunks chunks) override; - void consume(Chunk chunk, size_t source_num) override; + void consume(Chunk & chunk, size_t source_num) override; private: SortDescription description; diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp index d06e3c1179a65cb64886e21d19468dbca937cc61..78221b4825596ee39123a9ac57802d28b49642c2 100644 --- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp @@ -74,10 +74,10 @@ void MergingSortedAlgorithm::initialize(Chunks chunks) queue_without_collation = SortingHeap<SortCursor>(cursors); } -void MergingSortedAlgorithm::consume(Chunk chunk, size_t source_num) +void MergingSortedAlgorithm::consume(Chunk & chunk, size_t source_num) { prepareChunk(chunk); - source_chunks[source_num] = std::move(chunk); + source_chunks[source_num].swap(chunk); cursors[source_num].reset(source_chunks[source_num].getColumns(), {}); if (has_collation) diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h index 6ff48b520bd333b65c8ae876a8a256240c6ad3fb..5b361c1000e8d9296704e75e2337f275d4723544 100644 --- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h @@ -23,7 +23,7 @@ public: void addInput(); void initialize(Chunks chunks) override; - void consume(Chunk chunk, size_t source_num) override; + void consume(Chunk & chunk, size_t source_num) override; Status merge() override; const MergedData & getMergedData() const { return merged_data; }
diff --git a/src/Processors/Merges/Algorithms/RowRef.h b/src/Processors/Merges/Algorithms/RowRef.h index 5aeae952067b810bf46e8fb1bd7fc17f3090749f..658442e34cc353b8ff11b4797cf9dfdffd280484 100644 --- a/src/Processors/Merges/Algorithms/RowRef.h +++ b/src/Processors/Merges/Algorithms/RowRef.h @@ -63,7 +63,7 @@ public: free_chunks.push_back(i); } - SharedChunkPtr alloc(Chunk && chunk) + SharedChunkPtr alloc(Chunk & chunk) { if (free_chunks.empty()) throw Exception("Not enough space in SharedChunkAllocator. " @@ -72,7 +72,7 @@ public: auto pos = free_chunks.back(); free_chunks.pop_back(); - chunks[pos] = std::move(chunk); + chunks[pos].swap(chunk); chunks[pos].position = pos; chunks[pos].allocator = this; @@ -110,9 +110,9 @@ private: } /// Release memory. It is not obligatory. - ptr->clear(); - ptr->all_columns.clear(); - ptr->sort_columns.clear(); +// ptr->clear(); +// ptr->all_columns.clear(); +// ptr->sort_columns.clear(); free_chunks.push_back(ptr->position); } diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp index 5bcbf778d640757e34bd0918fa7aa2be8759d98a..89154044ae5211b100568d9a6206cca7f4e7af7b 100644 --- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp @@ -632,10 +632,10 @@ void SummingSortedAlgorithm::initialize(Chunks chunks) initializeQueue(std::move(chunks)); } -void SummingSortedAlgorithm::consume(Chunk chunk, size_t source_num) +void SummingSortedAlgorithm::consume(Chunk & chunk, size_t source_num) { preprocessChunk(chunk); - updateCursor(std::move(chunk), source_num); + updateCursor(chunk, source_num); } IMergingAlgorithm::Status SummingSortedAlgorithm::merge() diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h index a38df215ccc9131e7d0790d658aca215ede4adc0..fc5431f1a08094d77b4825cd336c909b91ab7edf 100644 --- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h @@ -23,7 +23,7 @@ public: size_t max_block_size); void initialize(Chunks chunks) override; - void consume(Chunk chunk, size_t source_num) override; + void consume(Chunk & chunk, size_t source_num) override; Status merge() override; struct AggregateDescription; diff --git a/src/Processors/Merges/CollapsingSortedTransform.h b/src/Processors/Merges/CollapsingSortedTransform.h index d4f40c60938821b3ce668e8e77bcd9345e87d259..cdf7c4a160733515f66b2030bfa5f2ab7e2c250c 100644 --- a/src/Processors/Merges/CollapsingSortedTransform.h +++ b/src/Processors/Merges/CollapsingSortedTransform.h @@ -15,6 +15,7 @@ public: size_t num_inputs, SortDescription description_, const String & sign_column, + bool only_positive_sign, size_t max_block_size, WriteBuffer * out_row_sources_buf_ = nullptr, bool use_average_block_sizes = false) @@ -24,6 +25,7 @@ public: num_inputs, std::move(description_), sign_column, + only_positive_sign, max_block_size, out_row_sources_buf_, use_average_block_sizes, diff --git a/src/Processors/Merges/IMergingTransform.cpp b/src/Processors/Merges/IMergingTransform.cpp index 0dc4cd419918416fb84fd3bcfbe7d2fac149b373..f42bd44f6ca153c9ded504f470bbfa11979e309e 100644 --- a/src/Processors/Merges/IMergingTransform.cpp +++ b/src/Processors/Merges/IMergingTransform.cpp @@ -1,4 +1,5 @@ #include <Processors/Merges/IMergingTransform.h> +#include <Processors/Transforms/SelectorInfo.h> namespace DB { @@ -135,7 +136,6 @@ IProcessor::Status IMergingTransformBase::prepare() if (state.is_finished) { - if (is_port_full)
return Status::PortFull; @@ -158,11 +158,11 @@ IProcessor::Status IMergingTransformBase::prepare() if (!input.hasData()) return Status::NeedData; - auto chunk = input.pull(); - if (!chunk.hasRows() && !input.isFinished()) + state.input_chunk = input.pull(); + if (!state.input_chunk.hasRows() && !input.isFinished()) return Status::NeedData; - state.input_chunk = std::move(chunk); + state.has_input = true; } state.need_data = false; @@ -174,4 +174,83 @@ IProcessor::Status IMergingTransformBase::prepare() return Status::Ready; } +static void filterChunk(Chunk & chunk, size_t selector_position) +{ + if (!chunk.getChunkInfo()) + throw Exception("IMergingTransformBase expected ChunkInfo for input chunk", ErrorCodes::LOGICAL_ERROR); + + const auto * chunk_info = typeid_cast<const SelectorInfo *>(chunk.getChunkInfo().get()); + if (!chunk_info) + throw Exception("IMergingTransformBase expected SelectorInfo for input chunk", ErrorCodes::LOGICAL_ERROR); + + auto & selector = chunk_info->selector; + + IColumn::Filter filter; + filter.resize_fill(selector.size()); + + size_t num_rows = chunk.getNumRows(); + auto columns = chunk.detachColumns(); + + size_t num_result_rows = 0; + + for (size_t row = 0; row < num_rows; ++row) + { + if (selector[row] == selector_position) + { + ++num_result_rows; + filter[row] = 1; + } + } + + for (auto & column : columns) + column = column->filter(filter, num_result_rows); + + chunk.clear(); + chunk.setColumns(std::move(columns), num_result_rows); +} + +bool IMergingTransformBase::filterChunks() +{ + if (state.selector_position < 0) + return true; + + bool has_empty_chunk = false; + + if (!state.init_chunks.empty()) + { + for (size_t i = 0; i < input_states.size(); ++i) + { + auto & chunk = state.init_chunks[i]; + if (!chunk || input_states[i].is_filtered) + continue; + + filterChunk(chunk, state.selector_position); + + if (!chunk.hasRows()) + { + chunk.clear(); + has_empty_chunk = true; + input_states[i].is_initialized = false; + is_initialized = false; + } + else + input_states[i].is_filtered = true; + } + } + + if (state.has_input) + { + filterChunk(state.input_chunk, state.selector_position); + if (!state.input_chunk.hasRows()) + { + state.has_input = false; + state.need_data = true; + has_empty_chunk = true; + } + } + + return !has_empty_chunk; +} + + }
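filterChunk() above is the heart of the per-stream split: each merge keeps only the rows whose selector label equals its own position, building an IColumn::Filter to drop the rest. A self-contained sketch of the same loop over a plain vector standing in for a column (hypothetical types; the real code delegates the final step to IColumn::filter):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

using Selector = std::vector<uint64_t>; /// bucket label per row
using Filter = std::vector<uint8_t>;    /// 1 = keep row

/// Keep only the rows whose selector label matches selector_position,
/// mirroring the filterChunk() loop in the patch.
std::vector<int> filterRows(const std::vector<int> & column,
                            const Selector & selector,
                            size_t selector_position)
{
    Filter filter(selector.size(), 0);
    size_t num_result_rows = 0;
    for (size_t row = 0; row < selector.size(); ++row)
    {
        if (selector[row] == selector_position)
        {
            filter[row] = 1;
            ++num_result_rows;
        }
    }

    std::vector<int> result;
    result.reserve(num_result_rows);
    for (size_t row = 0; row < column.size(); ++row)
        if (filter[row])
            result.push_back(column[row]);
    return result;
}

int main()
{
    /// Rows labeled 0/1/0/1 -> merge #1 keeps rows 1 and 3.
    auto kept = filterRows({10, 11, 12, 13}, {0, 1, 0, 1}, /*selector_position=*/1);
    return kept.size() == 2 ? 0 : 1;
}
```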
diff --git a/src/Processors/Merges/IMergingTransform.h b/src/Processors/Merges/IMergingTransform.h index f9c2dba8271839172e07572283e74731a809eeb2..939faa48d26ececccefda2b3cd0fdbc9f7a0aae2 100644 --- a/src/Processors/Merges/IMergingTransform.h +++ b/src/Processors/Merges/IMergingTransform.h @@ -18,6 +18,10 @@ public: const Block & output_header, bool have_all_inputs_); + virtual ~IMergingTransformBase() = default; + + OutputPort & getOutputPort() { return outputs.front(); } + /// Methods to add additional input port. It is possible to do only before the first call of `prepare`. void addInput(); /// Need to be called after all inputs are added. (only if have_all_inputs was not specified). @@ -25,20 +29,29 @@ Status prepare() override; + /// Set position which will be used in selector if input chunk has attached SelectorInfo (see SelectorInfo.h). + /// Columns will be filtered, keep only rows labeled with this position. + /// It is used in parallel final. + void setSelectorPosition(size_t position) { state.selector_position = position; } + protected: virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false. virtual void onFinish() {} /// Is called when all data is processed. + bool filterChunks(); /// Filter chunks if selector position was set. For parallel final. + /// Processor state. struct State { Chunk output_chunk; Chunk input_chunk; + bool has_input = false; bool is_finished = false; bool need_data = false; size_t next_input_to_read = 0; Chunks init_chunks; + ssize_t selector_position = -1; }; State state; @@ -50,6 +63,7 @@ private: InputPort & port; bool is_initialized = false; + bool is_filtered = false; }; std::vector<InputState> input_states; @@ -78,14 +92,18 @@ public: void work() override { + if (!filterChunks()) + return; + if (!state.init_chunks.empty()) algorithm.initialize(std::move(state.init_chunks)); - if (state.input_chunk) + if (state.has_input) { // std::cerr << "Consume chunk with " << state.input_chunk.getNumRows() // << " for input " << state.next_input_to_read << std::endl; - algorithm.consume(std::move(state.input_chunk), state.next_input_to_read); + algorithm.consume(state.input_chunk, state.next_input_to_read); + state.has_input = false; } IMergingAlgorithm::Status status = algorithm.merge(); @@ -120,4 +138,6 @@ private: using IMergingTransformBase::state; }; +using MergingTransformPtr = std::shared_ptr<IMergingTransformBase>; + } diff --git a/src/Processors/Pipe.cpp b/src/Processors/Pipe.cpp index d9b21dbc85402aa31481dd70b3a0bd53c6d09fe8..5d92e909a52c6dd48a74a172cb472ac1c0948463 100644 --- a/src/Processors/Pipe.cpp +++ b/src/Processors/Pipe.cpp @@ -96,6 +96,15 @@ Pipe::Pipe(Pipes && pipes, ProcessorPtr transform) processors.emplace_back(std::move(transform)); } +Pipe::Pipe(OutputPort * port) : output_port(port) +{ +} + +void Pipe::addProcessors(const Processors & processors_) +{ + processors.insert(processors.end(), processors_.begin(), processors_.end()); +} + void Pipe::addSimpleTransform(ProcessorPtr transform) { checkSimpleTransform(*transform); diff --git a/src/Processors/Pipe.h b/src/Processors/Pipe.h index 42bbd4e06d06bf0d735958046a23b9a9849a9f1c..984fa7605c63bb6570865bad37c3933d8a661b93 100644 --- a/src/Processors/Pipe.h +++ b/src/Processors/Pipe.h @@ -22,6 +22,8 @@ public: /// Transform must have the number of inputs equals to the number of pipes. And single output. /// Will connect pipes outputs with transform inputs automatically. Pipe(Pipes && pipes, ProcessorPtr transform); + /// Create pipe from output port. If pipe was created that way, it possibly will not have tree shape. + Pipe(OutputPort * port); Pipe(const Pipe & other) = delete; Pipe(Pipe && other) = default; @@ -29,6 +31,9 @@ Pipe & operator=(const Pipe & other) = delete; Pipe & operator=(Pipe && other) = default; + /// Append processors to pipe. After this, it possibly will not have tree shape.
+ void addProcessors(const Processors & processors_); + OutputPort & getPort() const { return *output_port; } const Block & getHeader() const { return output_port->getHeader(); } diff --git a/src/Processors/Transforms/AddingSelectorTransform.cpp b/src/Processors/Transforms/AddingSelectorTransform.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f75a5920072634cb858ec485f02f90be2055da63 --- /dev/null +++ b/src/Processors/Transforms/AddingSelectorTransform.cpp @@ -0,0 +1,76 @@ +#include <Processors/Transforms/AddingSelectorTransform.h> +#include <Processors/Transforms/SelectorInfo.h> + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +AddingSelectorTransform::AddingSelectorTransform( + const Block & header, size_t num_outputs_, ColumnNumbers key_columns_) + : ISimpleTransform(header, header, false) + , num_outputs(num_outputs_) + , key_columns(std::move(key_columns_)) + , hash(0) +{ + setInputNotNeededAfterRead(false); + + if (num_outputs <= 1) + throw Exception("SplittingByHashTransform expects more than 1 outputs, got " + std::to_string(num_outputs), + ErrorCodes::LOGICAL_ERROR); + + if (key_columns.empty()) + throw Exception("SplittingByHashTransform cannot split by empty set of key columns", + ErrorCodes::LOGICAL_ERROR); + + for (auto & column : key_columns) + if (column >= header.columns()) + throw Exception("Invalid column number: " + std::to_string(column) + + ". There is only " + std::to_string(header.columns()) + " columns in header", + ErrorCodes::LOGICAL_ERROR); +} + +static void calculateWeakHash32(const Chunk & chunk, const ColumnNumbers & key_columns, WeakHash32 & hash) +{ + auto num_rows = chunk.getNumRows(); + const auto & columns = chunk.getColumns(); + + hash.reset(num_rows); + + for (const auto & column_number : key_columns) + columns[column_number]->updateWeakHash32(hash); +} + +static IColumn::Selector fillSelector(const WeakHash32 & hash, size_t num_outputs) +{ + /// Row from interval [(2^32 / num_outputs) * i, (2^32 / num_outputs) * (i + 1)) goes to bucket with number i. + + const auto & hash_data = hash.getData(); + size_t num_rows = hash_data.size(); + IColumn::Selector selector(num_rows); + + for (size_t row = 0; row < num_rows; ++row) + { + selector[row] = hash_data[row]; /// [0, 2^32) + selector[row] *= num_outputs; /// [0, num_outputs * 2^32), selector stores 64 bit values. + selector[row] >>= 32u; /// [0, num_outputs) + } + + return selector; +} + +void AddingSelectorTransform::transform(Chunk & input_chunk, Chunk & output_chunk) +{ + auto chunk_info = std::make_shared<SelectorInfo>(); + + calculateWeakHash32(input_chunk, key_columns, hash); + chunk_info->selector = fillSelector(hash, num_outputs); + + input_chunk.swap(output_chunk); + output_chunk.setChunkInfo(std::move(chunk_info)); +} + +}
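The bucket mapping in fillSelector() above deserves a worked example: the 32-bit weak hash is scaled into [0, num_outputs) with a 64-bit multiply and a right shift by 32, so each output owns a contiguous 2^32 / num_outputs slice of the hash space and no modulo is needed. A standalone check of the arithmetic:

```cpp
#include <cassert>
#include <cstdint>

/// bucket = hash * num_outputs / 2^32, computed in 64 bits exactly as in fillSelector().
static uint64_t bucketOf(uint32_t hash, uint64_t num_outputs)
{
    uint64_t selector = hash;   /// [0, 2^32)
    selector *= num_outputs;    /// [0, num_outputs * 2^32), needs 64-bit storage
    selector >>= 32u;           /// [0, num_outputs)
    return selector;
}

int main()
{
    /// With 4 outputs each bucket owns a 2^30-wide slice of the hash space.
    assert(bucketOf(0x00000000u, 4) == 0);
    assert(bucketOf(0x40000000u, 4) == 1);
    assert(bucketOf(0xBFFFFFFFu, 4) == 2);
    assert(bucketOf(0xFFFFFFFFu, 4) == 3);
}
```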
diff --git a/src/Processors/Transforms/AddingSelectorTransform.h b/src/Processors/Transforms/AddingSelectorTransform.h new file mode 100644 index 0000000000000000000000000000000000000000..e82dcc964d00f179a63ae71176a8c286811ec5f0 --- /dev/null +++ b/src/Processors/Transforms/AddingSelectorTransform.h @@ -0,0 +1,26 @@ +#pragma once +#include <Processors/ISimpleTransform.h> +#include <Core/ColumnNumbers.h> +#include <Common/WeakHash.h> +#include <Columns/IColumn.h> + +namespace DB +{ + +/// Add IColumn::Selector to chunk (see SelectorInfo.h). +/// Selector is filled by formula (WeakHash(key_columns) * num_outputs / MAX_INT). +class AddingSelectorTransform : public ISimpleTransform +{ +public: + AddingSelectorTransform(const Block & header, size_t num_outputs_, ColumnNumbers key_columns_); + String getName() const override { return "SplittingByHash"; } + void transform(Chunk & input_chunk, Chunk & output_chunk) override; + +private: + size_t num_outputs; + ColumnNumbers key_columns; + + WeakHash32 hash; +}; + +} diff --git a/src/Processors/Transforms/CopyTransform.cpp b/src/Processors/Transforms/CopyTransform.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c9047c942d6738fe51929a2636881d2a194016ad --- /dev/null +++ b/src/Processors/Transforms/CopyTransform.cpp @@ -0,0 +1,108 @@ +#include <Processors/Transforms/CopyTransform.h> + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +CopyTransform::CopyTransform(const Block & header, size_t num_outputs) + : IProcessor(InputPorts(1, header), OutputPorts(num_outputs, header)) +{ + if (num_outputs <= 1) + throw Exception("CopyTransform expects more than 1 outputs, got " + std::to_string(num_outputs), ErrorCodes::LOGICAL_ERROR); +} + +IProcessor::Status CopyTransform::prepare() +{ + Status status = Status::Ready; + + while (status == Status::Ready) + { + status = !has_data ? prepareConsume() + : prepareGenerate(); + } + + return status; +} + +IProcessor::Status CopyTransform::prepareConsume() +{ + auto & input = getInputPort(); + + /// Check all outputs are finished or ready to get data. + + bool all_finished = true; + for (auto & output : outputs) + { + if (output.isFinished()) + continue; + + all_finished = false; + } + + if (all_finished) + { + input.close(); + return Status::Finished; + } + + /// Try get chunk from input. + + if (input.isFinished()) + { + for (auto & output : outputs) + output.finish(); + + return Status::Finished; + } + + input.setNeeded(); + if (!input.hasData()) + return Status::NeedData; + + chunk = input.pull(); + has_data = true; + was_output_processed.assign(outputs.size(), false); + + return Status::Ready; +} + +IProcessor::Status CopyTransform::prepareGenerate() +{ + bool all_outputs_processed = true; + + size_t chunk_number = 0; + for (auto & output : outputs) + { + auto & was_processed = was_output_processed[chunk_number]; + ++chunk_number; + + if (was_processed) + continue; + + if (output.isFinished()) + continue; + + if (!output.canPush()) + { + all_outputs_processed = false; + continue; + } + + output.push(chunk.clone()); + was_processed = true; + } + + if (all_outputs_processed) + { + has_data = false; + return Status::Ready; + } + + return Status::PortFull; +} + +}
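This fan-out is also where the Chunk::clone() fix from the top of the patch pays off: CopyTransform pushes chunk.clone() to every output, and a clone() that dropped the attached ChunkInfo would silently lose the SelectorInfo added upstream. A minimal sketch of the invariant, with a simplified stand-in for the real Chunk:

```cpp
#include <cassert>
#include <memory>
#include <vector>

/// Simplified stand-ins for DB::ChunkInfo / DB::Chunk (hypothetical, for illustration only).
struct ChunkInfo { virtual ~ChunkInfo() = default; };
using ChunkInfoPtr = std::shared_ptr<const ChunkInfo>;

struct Chunk
{
    std::vector<int> columns; /// stand-in for real columns
    size_t num_rows = 0;
    ChunkInfoPtr info;

    /// Before the fix: clone() rebuilt the chunk from columns and rows only, dropping `info`.
    Chunk cloneBuggy() const { return {columns, num_rows, nullptr}; }

    /// After the fix: clone() preserves the attached ChunkInfo.
    Chunk clone() const { return {columns, num_rows, info}; }
};

int main()
{
    Chunk chunk{{1, 2, 3}, 3, std::make_shared<const ChunkInfo>()};
    assert(!chunk.cloneBuggy().info); /// selector would silently disappear on fan-out
    assert(chunk.clone().info);       /// selector survives fan-out
}
```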
diff --git a/src/Processors/Transforms/CopyTransform.h b/src/Processors/Transforms/CopyTransform.h new file mode 100644 index 0000000000000000000000000000000000000000..cf56fdf10d921d20a6d63d7c44927cc8c68ec9b0 --- /dev/null +++ b/src/Processors/Transforms/CopyTransform.h @@ -0,0 +1,28 @@ +#pragma once +#include <Processors/IProcessor.h> + +namespace DB +{ + +/// Transform which has single input and num_outputs outputs. +/// Read chunk from input and copy it to all outputs. +class CopyTransform : public IProcessor +{ +public: + CopyTransform(const Block & header, size_t num_outputs); + + String getName() const override { return "Copy"; } + Status prepare() override; + + InputPort & getInputPort() { return inputs.front(); } + +private: + Chunk chunk; + bool has_data = false; + std::vector<char> was_output_processed; + + Status prepareGenerate(); + Status prepareConsume(); +}; + +} diff --git a/src/Processors/Transforms/SelectorInfo.h b/src/Processors/Transforms/SelectorInfo.h new file mode 100644 index 0000000000000000000000000000000000000000..2876d64ed281231d132ed2e52c88acb6c7cf546a --- /dev/null +++ b/src/Processors/Transforms/SelectorInfo.h @@ -0,0 +1,14 @@ +#pragma once +#include <Processors/Chunk.h> +#include <Columns/IColumn.h> + +namespace DB +{ + +/// ChunkInfo with IColumn::Selector. It is added by AddingSelectorTransform. +struct SelectorInfo : public ChunkInfo +{ + IColumn::Selector selector; +}; + +} diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 6469532a8e8855bda0c0139f5144d527dd1185b1..7818f2fb183deb0c1a4de42f9611cc1fba815e0e 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -106,8 +106,10 @@ SRCS( Sources/SourceFromInputStream.cpp Sources/SourceWithProgress.cpp Transforms/AddingMissedTransform.cpp + Transforms/AddingSelectorTransform.cpp Transforms/AggregatingTransform.cpp Transforms/ConvertingTransform.cpp + Transforms/CopyTransform.cpp Transforms/CreatingSetsTransform.cpp Transforms/CubeTransform.cpp Transforms/DistinctTransform.cpp diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 2ab43f8f56c7033eb82a68b373a58e8f00bab34c..f1c1904256e79dbaa1bad495b3765fa7316dceeb 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -752,7 +752,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor case MergeTreeData::MergingParams::Collapsing: merged_transform = std::make_unique<CollapsingSortedTransform>( - header, pipes.size(), sort_description, data.merging_params.sign_column, + header, pipes.size(), sort_description, data.merging_params.sign_column, false, merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size); break; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 580c95b34dd21202883a4ac2210a4c447e0b43fa..4eeb954bd7d0f1fdfaad2458c670910d37d55674 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -41,22 +41,25 @@ namespace std #include #include #include -#include #include #include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include #include +#include +#include #include +#include #include -#include #include -#include +#include +#include +#include +#include +#include +#include +#include namespace ProfileEvents { @@ -617,6 +620,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts( res = spreadMarkRangesAmongStreamsFinal( std::move(parts_with_ranges), + num_streams, column_names_to_read, max_block_size, settings.use_uncompressed_cache, @@ -1017,6 +1021,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts, + size_t num_streams, const Names & column_names, UInt64 max_block_size, bool use_uncompressed_cache, @@ -1074,71 +1079,122 @@ Pipes
MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( for (size_t i = 0; i < sort_columns_size; ++i) sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); - /// Converts pipes to BlockInputsStreams. - /// It is temporary, till not all merging streams are implemented as processors. - auto streams_to_merge = [&pipes]() + auto get_merging_processor = [&]() -> MergingTransformPtr { - size_t num_streams = pipes.size(); + switch (data.merging_params.mode) + { + case MergeTreeData::MergingParams::Ordinary: + { + return std::make_shared<MergingSortedTransform>(header, pipes.size(), + sort_description, max_block_size); + } - BlockInputStreams streams; - streams.reserve(num_streams); + case MergeTreeData::MergingParams::Collapsing: + return std::make_shared<CollapsingSortedTransform>(header, pipes.size(), + sort_description, data.merging_params.sign_column, true, max_block_size); - for (size_t i = 0; i < num_streams; ++i) - streams.emplace_back(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipes[i]))); + case MergeTreeData::MergingParams::Summing: + return std::make_shared<SummingSortedTransform>(header, pipes.size(), + sort_description, data.merging_params.columns_to_sum, max_block_size); + + case MergeTreeData::MergingParams::Aggregating: + return std::make_shared<AggregatingSortedTransform>(header, pipes.size(), + sort_description, max_block_size); + + case MergeTreeData::MergingParams::Replacing: + return std::make_shared<ReplacingSortedTransform>(header, pipes.size(), + sort_description, data.merging_params.version_column, max_block_size); - pipes.clear(); - return streams; + case MergeTreeData::MergingParams::VersionedCollapsing: + return std::make_shared<VersionedCollapsingTransform>(header, pipes.size(), + sort_description, data.merging_params.sign_column, max_block_size); + + case MergeTreeData::MergingParams::Graphite: + throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); + } + + __builtin_unreachable(); }; - BlockInputStreamPtr merged; - ProcessorPtr merged_processor; - switch (data.merging_params.mode) + if (num_streams > settings.max_final_threads) + num_streams = settings.max_final_threads; + + if (num_streams <= 1 || sort_description.empty() || query_info.force_tree_shaped_pipeline) + { + + Pipe pipe(std::move(pipes), get_merging_processor()); + pipes = Pipes(); + pipes.emplace_back(std::move(pipe)); + + return pipes; + } + + ColumnNumbers key_columns; + key_columns.reserve(sort_description.size()); + + for (auto & desc : sort_description) + { + if (!desc.column_name.empty()) + key_columns.push_back(header.getPositionByName(desc.column_name)); + else + key_columns.emplace_back(desc.column_number); + } + + Processors selectors; + Processors copiers; + selectors.reserve(pipes.size()); + + for (auto & pipe : pipes) + { + auto selector = std::make_shared<AddingSelectorTransform>(pipe.getHeader(), num_streams, key_columns); + auto copier = std::make_shared<CopyTransform>(pipe.getHeader(), num_streams); + connect(pipe.getPort(), selector->getInputPort()); + connect(selector->getOutputPort(), copier->getInputPort()); + selectors.emplace_back(std::move(selector)); + copiers.emplace_back(std::move(copier)); + } + + Processors merges; + std::vector<InputPorts::iterator> input_ports; + merges.reserve(num_streams); + input_ports.reserve(num_streams); + + for (size_t i = 0; i < num_streams; ++i) { - case MergeTreeData::MergingParams::Ordinary: + auto merge = get_merging_processor(); + merge->setSelectorPosition(i); + input_ports.emplace_back(merge->getInputs().begin()); + merges.emplace_back(std::move(merge)); + } +
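Before the connection loop that follows, a miniature of the resulting topology may help: every part pipe gets an AddingSelectorTransform (labels rows by hash bucket) feeding a CopyTransform (fans each labeled chunk out to all merges), and merge i keeps only rows labeled i, so equal keys always meet in the same merge regardless of which part they came from. A toy simulation of that routing, with scalar hashes standing in for chunks (all names hypothetical):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

/// bucket = hash * num_outputs / 2^32, as in fillSelector().
static uint64_t bucketOf(uint32_t hash, uint64_t num_outputs)
{
    return (uint64_t(hash) * num_outputs) >> 32u;
}

int main()
{
    const uint64_t num_merges = 2;
    /// Two source "pipes"; each holds rows for the same two keys in different order.
    std::vector<std::vector<uint32_t>> pipes = {{1u << 30, 3u << 30}, {3u << 30, 1u << 30}};
    std::vector<std::vector<uint32_t>> merges(num_merges);

    for (const auto & pipe : pipes)   /// CopyTransform: every merge sees every chunk,
        for (uint32_t hash : pipe)    /// but filtering keeps only its own selector label.
            merges[bucketOf(hash, num_merges)].push_back(hash);

    assert(merges[0] == std::vector<uint32_t>({1u << 30, 1u << 30})); /// all copies of key A
    assert(merges[1] == std::vector<uint32_t>({3u << 30, 3u << 30})); /// all copies of key B
}
```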
+ /// Connect outputs of i-th splitter with i-th input port of every merge. + for (auto & resize : copiers) + { + size_t input_num = 0; + for (auto & output : resize->getOutputs()) { - merged_processor = std::make_shared<MergingSortedTransform>(header, pipes.size(), - sort_description, max_block_size); - break; + connect(output, *input_ports[input_num]); + ++input_ports[input_num]; + ++input_num; } - - case MergeTreeData::MergingParams::Collapsing: - merged = std::make_shared<CollapsingFinalBlockInputStream>( - streams_to_merge(), sort_description, data.merging_params.sign_column); - break; - - case MergeTreeData::MergingParams::Summing: - merged_processor = std::make_shared<SummingSortedTransform>(header, pipes.size(), - sort_description, data.merging_params.columns_to_sum, max_block_size); - break; - - case MergeTreeData::MergingParams::Aggregating: - merged_processor = std::make_shared<AggregatingSortedTransform>(header, pipes.size(), - sort_description, max_block_size); - break; - - case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream - merged_processor = std::make_shared<ReplacingSortedTransform>(header, pipes.size(), - sort_description, data.merging_params.version_column, max_block_size); - break; - - case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream - merged_processor = std::make_shared<VersionedCollapsingTransform>(header, pipes.size(), - sort_description, data.merging_params.sign_column, max_block_size); - break; - - case MergeTreeData::MergingParams::Graphite: - throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); } - if (merged_processor) + Processors processors; + for (auto & pipe : pipes) { - Pipe pipe(std::move(pipes), std::move(merged_processor)); - pipes = Pipes(); - pipes.emplace_back(std::move(pipe)); + auto pipe_processors = std::move(pipe).detachProcessors(); + processors.insert(processors.end(), pipe_processors.begin(), pipe_processors.end()); } - if (merged) - pipes.emplace_back(std::make_shared<SourceFromInputStream>(merged)); + pipes.clear(); + pipes.reserve(num_streams); + for (auto & merge : merges) + pipes.emplace_back(&merge->getOutputs().front()); + + pipes.front().addProcessors(processors); + pipes.front().addProcessors(selectors); + pipes.front().addProcessors(copiers); + pipes.front().addProcessors(merges); return pipes; } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index e6eb26da7e3afe39b468aa6d3f8274f6253f4b0c..7ded8fcfad5917a71184134a4ccd1d78f0d8794f 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -71,6 +71,7 @@ private: Pipes spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts, + size_t num_streams, const Names & column_names, UInt64 max_block_size, bool use_uncompressed_cache, diff --git a/tests/performance/parallel_final.xml b/tests/performance/parallel_final.xml new file mode 100644 index 0000000000000000000000000000000000000000..f5e7ff51f6e2217fd341063deeb228dd53d787cf --- /dev/null +++ b/tests/performance/parallel_final.xml @@ -0,0 +1,51 @@ +<test> + <settings> + <max_partitions_per_insert_block>1024</max_partitions_per_insert_block> + </settings> + + <substitutions> + <substitution> + <name>collapsing</name> + <values> + <value>collapsing_final_16p_ord</value> + <value>collapsing_final_16p_rnd</value> + <value>collapsing_final_16p_int_keys_ord</value> + <value>collapsing_final_16p_int_keys_rnd</value> + <value>collapsing_final_16p_str_keys_ord</value> + <value>collapsing_final_16p_str_keys_rnd</value> + <value>collapsing_final_1024p_ord</value> + <value>collapsing_final_1024p_rnd</value> + </values> + </substitution> + </substitutions> + + <create_query>create table collapsing_final_16p_ord (key1 UInt32, key2 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2) partition by intDiv(key1, 8192 * 64)</create_query> + <create_query>create table collapsing_final_16p_rnd (key1 UInt32, key2 String, sign Int8, s UInt64) engine = 
CollapsingMergeTree(sign) order by (key1, key2) partition by key1 % 16</create_query> + <create_query>create table collapsing_final_16p_int_keys_ord (key1 UInt32, key2 UInt32, key3 UInt32, key4 UInt32, key5 UInt32, key6 UInt32, key7 UInt32, key8 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by intDiv(key1, 8192 * 64)</create_query> + <create_query>create table collapsing_final_16p_int_keys_rnd (key1 UInt32, key2 UInt32, key3 UInt32, key4 UInt32, key5 UInt32, key6 UInt32, key7 UInt32, key8 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by key1 % 16</create_query> + <create_query>create table collapsing_final_16p_str_keys_ord (key1 UInt32, key2 String, key3 String, key4 String, key5 String, key6 String, key7 String, key8 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by intDiv(key1, 8192 * 64)</create_query> + <create_query>create table collapsing_final_16p_str_keys_rnd (key1 UInt32, key2 String, key3 String, key4 String, key5 String, key6 String, key7 String, key8 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by key1 % 16</create_query> + <create_query>create table collapsing_final_1024p_ord (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by intDiv(key1, 8192 * 2)</create_query> + <create_query>create table collapsing_final_1024p_rnd (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by key1 % 1024</create_query> + + <fill_query>insert into collapsing_final_16p_ord select number, number, 1, number from numbers(8388608)</fill_query> + <fill_query>insert into collapsing_final_16p_rnd select sipHash64(number), number, 1, number from numbers(8388608)</fill_query> + <fill_query>insert into collapsing_final_16p_int_keys_ord select number, number, number, number, number, number, number, number, 1, number from numbers(8388608)</fill_query> + <fill_query>insert into collapsing_final_16p_int_keys_rnd select sipHash64(number), number, number, number, number, number, number, number, 1, number from numbers(8388608)</fill_query> + <fill_query>insert into collapsing_final_16p_str_keys_ord select number, number, number, number, number, number, number, number, 1, number from numbers(8388608)</fill_query> + <fill_query>insert into collapsing_final_16p_str_keys_rnd select sipHash64(number), number, number, number, number, number, number, number, 1, number from numbers(8388608)</fill_query> + + <fill_query>insert into collapsing_final_1024p_ord select number, 1, number from numbers(16777216)</fill_query> + <fill_query>insert into collapsing_final_1024p_rnd select number, 1, number from numbers(16777216)</fill_query> + <fill_query>optimize table {collapsing} final</fill_query> + + <query>SELECT count() FROM {collapsing} final</query> + <query>SELECT sum(s) FROM {collapsing} final group by key1 limit 10</query> + <query>SELECT sum(s) FROM {collapsing} final group by key1 % 8192 limit 10</query> + + <drop_query>DROP TABLE IF EXISTS {collapsing}</drop_query> +</test> diff --git a/tests/queries/0_stateless/00191_aggregating_merge_tree_and_final.sql b/tests/queries/0_stateless/00191_aggregating_merge_tree_and_final.sql index 47b9600611323958d41894100d2a40fd00ed5cb3..776edeeb43cc429fe8754624001d71e287d4b9cd 100644 --- a/tests/queries/0_stateless/00191_aggregating_merge_tree_and_final.sql +++ b/tests/queries/0_stateless/00191_aggregating_merge_tree_and_final.sql @@ -4,11 +4,11 @@ CREATE TABLE aggregating_00191 (d Date DEFAULT '2000-01-01', k UInt64, u Aggrega INSERT INTO aggregating_00191 (k, u) SELECT intDiv(number, 100) AS k, uniqState(toUInt64(number % 100)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000) GROUP BY k; INSERT INTO aggregating_00191 (k, u) SELECT
intDiv(number, 100) AS k, uniqState(toUInt64(number % 100) + 50) AS u FROM (SELECT * FROM system.numbers LIMIT 500, 1000) GROUP BY k; -SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL; +SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL order by k; OPTIMIZE TABLE aggregating_00191; SELECT k, finalizeAggregation(u) FROM aggregating_00191; -SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL; +SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL order by k; DROP TABLE aggregating_00191; diff --git a/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql b/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql index 5b7f59f8b65c2131106f882d595298ce987987e0..b7824e7efdcdcbd5333beaec1763c6f70caa91cb 100644 --- a/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql +++ b/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql @@ -3,7 +3,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -15,7 +15,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -27,7 +27,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -39,7 +39,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, version, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -53,7 +53,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(numb insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 
4 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 4 blocks optimized'; select * from mult_tab; @@ -68,7 +68,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(numb insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 1, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 2, 1, -1) from system.numbers limit 10; select 'table with 5 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 5 blocks optimized'; select * from mult_tab; @@ -80,7 +80,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 1000000; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 1000000; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -88,14 +88,14 @@ select * from mult_tab; select '-------------------------'; drop table if exists mult_tab; -create table mult_tab (date Date, value UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version); -insert into mult_tab select '2018-01-31', number, 0, if(number < 64, 1, -1) from system.numbers limit 128; -insert into mult_tab select '2018-01-31', number, 0, if(number < 64, -1, 1) from system.numbers limit 128; +create table mult_tab (date Date, value UInt64, key UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version); +insert into mult_tab select '2018-01-31', number, number, 0, if(number < 64, 1, -1) from system.numbers limit 128; +insert into mult_tab select '2018-01-31', number, number + 128, 0, if(number < 64, -1, 1) from system.numbers limit 128; select 'table with 2 blocks final'; -select * from mult_tab final settings max_block_size=33; +select date, value, version, sign from mult_tab final order by date, key, sign settings max_block_size=33; optimize table mult_tab; select 'table with 2 blocks optimized'; -select * from mult_tab; +select date, value, version, sign from mult_tab; select '-------------------------'; select 'Vertival merge'; @@ -106,7 +106,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -118,7 +118,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; select 'table with 
2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -130,7 +130,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -142,7 +142,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, version, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -156,7 +156,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(numb insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; select 'table with 4 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 4 blocks optimized'; select * from mult_tab; @@ -171,7 +171,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(numb insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 1, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 2, 1, -1) from system.numbers limit 10; select 'table with 5 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 5 blocks optimized'; select * from mult_tab; @@ -183,7 +183,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 1000000; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 1000000; select 'table with 2 blocks final'; -select * from mult_tab final; +select * from mult_tab final order by date, value, sign; optimize table mult_tab; select 'table with 2 blocks optimized'; select * from mult_tab; @@ -191,13 +191,13 @@ select * from mult_tab; select '-------------------------'; drop table if exists mult_tab; -create table mult_tab (date Date, value UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0; -insert into mult_tab select '2018-01-31', number, 0, if(number < 64, 1, -1) from system.numbers 
limit 128; -insert into mult_tab select '2018-01-31', number, 0, if(number < 64, -1, 1) from system.numbers limit 128; +create table mult_tab (date Date, value UInt64, key UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0; +insert into mult_tab select '2018-01-31', number, number, 0, if(number < 64, 1, -1) from system.numbers limit 128; +insert into mult_tab select '2018-01-31', number, number + 128, 0, if(number < 64, -1, 1) from system.numbers limit 128; select 'table with 2 blocks final'; -select * from mult_tab final settings max_block_size=33; +select date, value, version, sign from mult_tab final order by date, key, sign settings max_block_size=33; optimize table mult_tab; select 'table with 2 blocks optimized'; -select * from mult_tab; +select date, value, version, sign from mult_tab; DROP TABLE mult_tab; diff --git a/tests/queries/0_stateless/00915_simple_aggregate_function.sql b/tests/queries/0_stateless/00915_simple_aggregate_function.sql index 1866e2bc8c596cb44f4ac908258116d8496fa1c1..ba4935a6518126b3ce5a46041ee0cfa0e8236169 100644 --- a/tests/queries/0_stateless/00915_simple_aggregate_function.sql +++ b/tests/queries/0_stateless/00915_simple_aggregate_function.sql @@ -5,13 +5,13 @@ create table simple (id UInt64,val SimpleAggregateFunction(sum,Double)) engine=A insert into simple select number,number from system.numbers limit 10; select * from simple; -select * from simple final; +select * from simple final order by id; select toTypeName(val) from simple limit 1; -- merge insert into simple select number,number from system.numbers limit 10; -select * from simple final; +select * from simple final order by id; optimize table simple final; select * from simple; @@ -33,7 +33,7 @@ insert into simple values(1,null,'2','2.2.2.2', 2, ([1,3], [1,1])); insert into simple values(10,'10','10','10.10.10.10', 4, ([2,3], [1,1])); insert into simple values(10,'2222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222','20','20.20.20.20', 1, ([2, 4], [1,1])); -select * from simple final; +select * from simple final order by id; select toTypeName(nullable_str),toTypeName(low_str),toTypeName(ip),toTypeName(status), toTypeName(tup) from simple limit 1; optimize table simple final; diff --git a/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql b/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql index a9f7bf7ecd70cfa56d114f38674c72368f3d97ff..0b5845d3b04f292579a405eb73908bec4a161156 100644 --- a/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql +++ b/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql @@ -4,7 +4,7 @@ drop table if exists tst; create table tst (timestamp DateTime, val Nullable(Int8)) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2); -select * from tst final; +select * from tst final order by timestamp; select '-- 2 2'; select count() from tst; @@ -34,7 +34,7 @@ drop table if exists tst; create table tst (timestamp DateTime, val Nullable(Int8)) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2), ('2018-02-01 00:00:00', 3), ('2018-02-02 00:00:00', 4); 
-select * from tst final; +select * from tst final order by timestamp; select '-- 4 2'; select count() from tst; @@ -64,7 +64,7 @@ drop table if exists tst; create table tst (timestamp DateTime, val Int8) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2); -select * from tst final; +select * from tst final order by timestamp; select '-- 2 2'; select count() from tst; @@ -96,7 +96,7 @@ drop table if exists tst; create table tst (timestamp DateTime, val Int8) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2), ('2018-02-01 00:00:00', 3), ('2018-02-02 00:00:00', 4); -select * from tst final; +select * from tst final order by timestamp; select '-- 4 2'; select count() from tst;