From 89939a685a651190805dc23205b552fe8c139a98 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 4 Jul 2017 15:38:53 +0300 Subject: [PATCH] Stream row sources from disk for vertical merge [#CLICKHOUSE-3118] --- .../CollapsingSortedBlockInputStream.cpp | 68 +++++++------ .../CollapsingSortedBlockInputStream.h | 22 +++-- dbms/src/DataStreams/ColumnGathererStream.cpp | 32 +++--- dbms/src/DataStreams/ColumnGathererStream.h | 99 +++++++++++-------- .../MergingSortedBlockInputStream.cpp | 24 +++-- .../MergingSortedBlockInputStream.h | 7 +- .../MergeTree/MergeTreeDataMerger.cpp | 48 ++++----- .../Storages/MergeTree/MergeTreeDataMerger.h | 5 +- 8 files changed, 176 insertions(+), 129 deletions(-) diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index b6288eeae8..ea1eff48a3 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -41,39 +41,40 @@ void CollapsingSortedBlockInputStream::reportIncorrectData() void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows, bool last_in_stream) { - if (count_positive != 0 || count_negative != 0) + if (count_positive == 0 && count_negative == 0) + return; + + if (count_positive == count_negative && !last_is_positive) { - if (count_positive == count_negative && !last_is_positive) + /// If all the rows in the input streams collapsed, we still want to give at least one block in the result. + if (last_in_stream && merged_rows == 0 && !blocks_written) { - /// If all the rows in the input streams collapsed, we still want to give at least one block in the result. - if (last_in_stream && merged_rows == 0 && !blocks_written) + LOG_INFO(log, "All rows collapsed"); + ++merged_rows; + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num); + ++merged_rows; + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num); + + if (out_row_sources_buf) { - LOG_INFO(log, "All rows collapsed"); - ++merged_rows; - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num); - ++merged_rows; - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num); - - if (out_row_sources) - { - /// true flag value means "skip row" - out_row_sources->data()[last_positive_pos].setSkipFlag(false); - out_row_sources->data()[last_negative_pos].setSkipFlag(false); - } + /// true flag value means "skip row" + current_row_sources[last_positive_pos].setSkipFlag(false); + current_row_sources[last_negative_pos].setSkipFlag(false); } - return; } - + } + else + { if (count_positive <= count_negative) { ++merged_rows; for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num); - if (out_row_sources) - out_row_sources->data()[first_negative_pos].setSkipFlag(false); + if (out_row_sources_buf) + current_row_sources[first_negative_pos].setSkipFlag(false); } if (count_positive >= count_negative) @@ -82,8 +83,8 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num); - if (out_row_sources) - out_row_sources->data()[last_positive_pos].setSkipFlag(false); + if (out_row_sources_buf) + current_row_sources[last_positive_pos].setSkipFlag(false); } if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1)) @@ -93,6 +94,11 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum ++count_incorrect_data; } } + + if (out_row_sources_buf) + out_row_sources_buf->write( + reinterpret_cast(current_row_sources.data()), + current_row_sources.size() * sizeof(RowSourcePart)); } @@ -162,10 +168,6 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s queue.pop(); - /// Initially, skip all rows. On insert, unskip "corner" rows. - if (out_row_sources) - out_row_sources->emplace_back(current.impl->order, true); - if (key_differs) { /// We write data for the previous primary key. @@ -175,8 +177,18 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s count_negative = 0; count_positive = 0; + + current_pos = 0; + first_negative_pos = 0; + last_positive_pos = 0; + last_negative_pos = 0; + current_row_sources.resize(0); } + /// Initially, skip all rows. On insert, unskip "corner" rows. + if (out_row_sources_buf) + current_row_sources.emplace_back(current.impl->order, true); + if (sign == 1) { ++count_positive; diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h index 0f082fbcb8..4eb3cc1a24 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h @@ -23,10 +23,12 @@ namespace DB class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream { public: - CollapsingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, - const String & sign_column_, size_t max_block_size_, MergedRowSources * out_row_sources_ = nullptr) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_), - sign_column(sign_column_) + CollapsingSortedBlockInputStream( + BlockInputStreams inputs_, const SortDescription & description_, + const String & sign_column_, size_t max_block_size_, + WriteBuffer * out_row_sources_buf_ = nullptr) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) + , sign_column(sign_column_) { } @@ -77,11 +79,13 @@ private: size_t blocks_written = 0; - /// Fields specific for VERTICAL merge algorithm - size_t current_pos = 0; /// Global row number of current key - size_t first_negative_pos = 0; /// Global row number of first_negative - size_t last_positive_pos = 0; /// Global row number of last_positive - size_t last_negative_pos = 0; /// Global row number of last_negative + /// Fields specific for VERTICAL merge algorithm. + /// Row numbers are relative to the start of current primary key. + size_t current_pos = 0; /// Current row number + size_t first_negative_pos = 0; /// Row number of first_negative + size_t last_positive_pos = 0; /// Row number of last_positive + size_t last_negative_pos = 0; /// Row number of last_negative + PODArray current_row_sources; /// Sources of rows with the current primary key /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursors and calls to virtual functions. diff --git a/dbms/src/DataStreams/ColumnGathererStream.cpp b/dbms/src/DataStreams/ColumnGathererStream.cpp index dffa84ec90..f1166dc35e 100644 --- a/dbms/src/DataStreams/ColumnGathererStream.cpp +++ b/dbms/src/DataStreams/ColumnGathererStream.cpp @@ -11,13 +11,16 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int INCOMPATIBLE_COLUMNS; extern const int INCORRECT_NUMBER_OF_COLUMNS; + extern const int NOT_FOUND_COLUMN_IN_BLOCK; extern const int EMPTY_DATA_PASSED; extern const int RECEIVED_EMPTY_DATA; } -ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_, - const MergedRowSources & row_source_, size_t block_preferred_size_) - : name(column_name_), row_source(row_source_), block_preferred_size(block_preferred_size_), log(&Logger::get("ColumnGathererStream")) +ColumnGathererStream::ColumnGathererStream( + const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_, + size_t block_preferred_size_) + : name(column_name_), row_sources_buf(row_sources_buf_) + , block_preferred_size(block_preferred_size_), log(&Logger::get("ColumnGathererStream")) { if (source_streams.empty()) throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED); @@ -49,8 +52,14 @@ void ColumnGathererStream::init() Block & block = sources.back().block; /// Sometimes MergeTreeReader injects additional column with partitioning key - if (block.columns() > 2 || !block.has(name)) - throw Exception("Block should have 1 or 2 columns and contain column with requested name", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS); + if (block.columns() > 2) + throw Exception( + "Block should have 1 or 2 columns, but contains " + toString(block.columns()), + ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS); + if (!block.has(name)) + throw Exception( + "Not found column `" + name + "' in block.", + ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK); if (i == 0) { @@ -68,22 +77,19 @@ void ColumnGathererStream::init() Block ColumnGathererStream::readImpl() { /// Special case: single source and there are no skipped rows - if (children.size() == 1 && row_source.size() == 0) + if (children.size() == 1 && row_sources_buf.eof()) return children[0]->read(); /// Initialize first source blocks if (sources.empty()) init(); - if (pos_global_start >= row_source.size()) + if (row_sources_buf.eof()) return Block(); - block_res = Block{column.cloneEmpty()}; - IColumn & column_res = *block_res.getByPosition(0).column; - - column_res.gather(*this); - - return std::move(block_res); + output_block = Block{column.cloneEmpty()}; + output_block.getByPosition(0).column->gather(*this); + return std::move(output_block); } diff --git a/dbms/src/DataStreams/ColumnGathererStream.h b/dbms/src/DataStreams/ColumnGathererStream.h index b74d25c185..dc86bdb72c 100644 --- a/dbms/src/DataStreams/ColumnGathererStream.h +++ b/dbms/src/DataStreams/ColumnGathererStream.h @@ -14,22 +14,21 @@ namespace DB /// Tiny struct, stores number of a Part from which current row was fetched, and insertion flag. struct RowSourcePart { + UInt8 data = 0; + RowSourcePart() = default; - RowSourcePart(size_t source_num, bool flag = false) + RowSourcePart(size_t source_num, bool skip_flag = false) { static_assert(sizeof(*this) == 1, "Size of RowSourcePart is too big due to compiler settings"); setSourceNum(source_num); - setSkipFlag(flag); + setSkipFlag(skip_flag); } - /// Data is equal to getSourceNum() if flag is false - UInt8 getData() const { return data; } - size_t getSourceNum() const { return data & MASK_NUMBER; } /// In CollapsingMergeTree case flag means "skip this rows" - bool getSkipFlag() const { return (data & MASK_FLAG) != 0; } + bool getSkipFlag() const { return (data & MASK_FLAG) != 0; } void setSourceNum(size_t source_num) { @@ -44,9 +43,6 @@ struct RowSourcePart static constexpr size_t MAX_PARTS = 0x7F; static constexpr UInt8 MASK_NUMBER = 0x7F; static constexpr UInt8 MASK_FLAG = 0x80; - -private: - UInt8 data; }; using MergedRowSources = PODArray; @@ -59,8 +55,9 @@ using MergedRowSources = PODArray; class ColumnGathererStream : public IProfilingBlockInputStream { public: - ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_, - const MergedRowSources & row_source_, size_t block_preferred_size_ = DEFAULT_MERGE_BLOCK_SIZE); + ColumnGathererStream( + const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_, + size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE); String getName() const override { return "ColumnGatherer"; } @@ -75,11 +72,7 @@ public: void gather(Column & column_res); private: - String name; - ColumnWithTypeAndName column; - const MergedRowSources & row_source; - - /// Cache required fileds + /// Cache required fields struct Source { const IColumn * column; @@ -103,12 +96,16 @@ private: void init(); void fetchNewBlock(Source & source, size_t source_num); + String name; + ColumnWithTypeAndName column; + std::vector sources; + ReadBuffer & row_sources_buf; - size_t pos_global_start = 0; size_t block_preferred_size; - Block block_res; + Source * source_to_fully_copy = nullptr; + Block output_block; Poco::Logger * log; }; @@ -116,56 +113,74 @@ private: template void ColumnGathererStream::gather(Column & column_res) { - size_t global_size = row_source.size(); - size_t curr_block_preferred_size = std::min(global_size - pos_global_start, block_preferred_size); - column_res.reserve(curr_block_preferred_size); + if (source_to_fully_copy) /// Was set on a previous iteration + { + output_block.getByPosition(0).column = source_to_fully_copy->block.getByName(name).column; + source_to_fully_copy->pos = source_to_fully_copy->size; + source_to_fully_copy = nullptr; + return; + } - size_t pos_global = pos_global_start; - while (pos_global < global_size && column_res.size() < curr_block_preferred_size) + row_sources_buf.nextIfAtEnd(); + RowSourcePart * row_source_pos = reinterpret_cast(row_sources_buf.position()); + RowSourcePart * row_sources_end = reinterpret_cast(row_sources_buf.buffer().end()); + + size_t cur_block_preferred_size = std::min(static_cast(row_sources_end - row_source_pos), block_preferred_size); + column_res.reserve(cur_block_preferred_size); + + size_t cur_size = 0; + + while (row_source_pos < row_sources_end && cur_size < cur_block_preferred_size) { - auto source_data = row_source[pos_global].getData(); - bool source_skip = row_source[pos_global].getSkipFlag(); - auto source_num = row_source[pos_global].getSourceNum(); + RowSourcePart row_source = *row_source_pos; + size_t source_num = row_source.getSourceNum(); Source & source = sources[source_num]; + bool source_skip = row_source.getSkipFlag(); + ++row_source_pos; if (source.pos >= source.size) /// Fetch new block from source_num part { fetchNewBlock(source, source_num); } - /// Consecutive optimization. TODO: precompute lens + /// Consecutive optimization. TODO: precompute lengths size_t len = 1; - size_t max_len = std::min(global_size - pos_global, source.size - source.pos); // interval should be in the same block - for (; len < max_len && source_data == row_source[pos_global + len].getData(); ++len); + size_t max_len = std::min(static_cast(row_sources_end - row_source_pos), source.size - source.pos); // interval should be in the same block + while (len < max_len && row_source_pos->data == row_source.data) + { + ++len; + ++row_source_pos; + } + + row_sources_buf.position() = reinterpret_cast(row_source_pos); if (!source_skip) { /// Whole block could be produced via copying pointer from current block if (source.pos == 0 && source.size == len) { - /// If current block already contains data, return it. We will be here again on next read() iteration. - if (column_res.size() != 0) - break; - - block_res.getByPosition(0).column = source.block.getByName(name).column; + /// If current block already contains data, return it. + /// Whole column from current source will be returned on next read() iteration. + if (cur_size > 0) + { + source_to_fully_copy = &source; + return; + } + + output_block.getByPosition(0).column = source.block.getByName(name).column; source.pos += len; - pos_global += len; - break; + return; } else if (len == 1) - { column_res.insertFrom(*source.column, source.pos); - } else - { column_res.insertRangeFrom(*source.column, source.pos, len); - } + + cur_size += len; } source.pos += len; - pos_global += len; } - pos_global_start = pos_global; } } diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index e1c80629f5..b6d4e566a7 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -15,14 +15,13 @@ namespace ErrorCodes } -MergingSortedBlockInputStream::MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_, - size_t max_block_size_, size_t limit_, MergedRowSources * out_row_sources_, bool quiet_) - : description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_), - source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_) +MergingSortedBlockInputStream::MergingSortedBlockInputStream( + BlockInputStreams & inputs_, const SortDescription & description_, + size_t max_block_size_, size_t limit_, WriteBuffer * out_row_sources_buf_, bool quiet_) + : description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_) + , source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_) { children.insert(children.end(), inputs_.begin(), inputs_.end()); - if (out_row_sources) - out_row_sources->clear(); } String MergingSortedBlockInputStream::getID() const @@ -253,8 +252,12 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs finished = true; } - if (out_row_sources) - out_row_sources->resize_fill(out_row_sources->size() + merged_rows, RowSourcePart(source_num)); + if (out_row_sources_buf) + { + RowSourcePart row_source(source_num); + for (size_t i = 0; i < merged_rows; ++i) + out_row_sources_buf->write(row_source.data); + } // std::cerr << "fetching next block\n"; @@ -268,10 +271,11 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); - if (out_row_sources) + if (out_row_sources_buf) { /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - out_row_sources->emplace_back(current.impl->order); + RowSourcePart row_source(current.impl->order); + out_row_sources_buf->write(row_source.data); } if (!current->isLast()) diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index bc1a977261..15e6407ea3 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -62,8 +62,9 @@ public: * out_row_sources - if isn't nullptr, then at the end of execution it should contain part numbers of each readed row (and needed flag) * quiet - don't log profiling info */ - MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, - size_t limit_ = 0, MergedRowSources * out_row_sources_ = nullptr, bool quiet_ = false); + MergingSortedBlockInputStream( + BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, + size_t limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false); String getName() const override { return "MergingSorted"; } @@ -146,7 +147,7 @@ protected: /// Used in Vertical merge algorithm to gather non-PK columns (on next step) /// If it is not nullptr then it should be populated during execution - MergedRowSources * out_row_sources = nullptr; + WriteBuffer * out_row_sources_buf; /// These methods are used in Collapsing/Summing/Aggregating... SortedBlockInputStream-s. diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 1b45de859f..be8275b098 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -18,6 +18,8 @@ #include #include #include +#include +#include #include #include #include @@ -516,15 +518,23 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity; - MergedRowSources merged_rows_sources; - MergedRowSources * merged_rows_sources_ptr = &merged_rows_sources; - MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, gathering_columns, merged_rows_sources, deduplicate); + MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, gathering_columns, deduplicate); LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal")); - if (merge_alg != MergeAlgorithm::Vertical) + String rows_sources_file_path; + std::unique_ptr rows_sources_uncompressed_write_buf; + std::unique_ptr rows_sources_write_buf; + + if (merge_alg == MergeAlgorithm::Vertical) + { + Poco::File(new_part_tmp_path).createDirectories(); + rows_sources_file_path = new_part_tmp_path + "rows_sources"; + rows_sources_uncompressed_write_buf = std::make_unique(rows_sources_file_path); + rows_sources_write_buf = std::make_unique(*rows_sources_uncompressed_write_buf); + } + else { - merged_rows_sources_ptr = nullptr; merging_columns = all_columns; merging_column_names = all_column_names; gathering_columns.clear(); @@ -564,12 +574,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart { case MergeTreeData::MergingParams::Ordinary: merged_stream = std::make_unique( - src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, merged_rows_sources_ptr, true); + src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, rows_sources_write_buf.get(), true); break; case MergeTreeData::MergingParams::Collapsing: merged_stream = std::make_unique( - src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, merged_rows_sources_ptr); + src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get()); break; case MergeTreeData::MergingParams::Summing: @@ -658,6 +668,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart auto it_name_and_type = gathering_columns.cbegin(); + rows_sources_write_buf->next(); + rows_sources_uncompressed_write_buf->next(); + CompressedReadBufferFromFile rows_sources_read_buf(rows_sources_file_path, 0, 0); + for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size(); column_num < gathering_column_names_size; ++column_num, ++it_name_and_type) @@ -681,7 +695,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart column_part_streams[part_num] = std::move(column_part_stream); } - ColumnGathererStream column_gathered_stream(column_part_streams, column_name, merged_rows_sources, DEFAULT_BLOCK_SIZE); + rows_sources_read_buf.seek(0, 0); + ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf); MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_method, offset_written); size_t column_elems_written = 0; @@ -710,6 +725,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart if (isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); } + + Poco::File(rows_sources_file_path).remove(); } /// Print overall profiling info. NOTE: it may duplicates previous messages @@ -746,7 +763,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm( const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound, - const NamesAndTypesList & gathering_columns, MergedRowSources & rows_sources_to_alloc, bool deduplicate) const + const NamesAndTypesList & gathering_columns, bool deduplicate) const { if (deduplicate) return MergeAlgorithm::Horizontal; @@ -766,19 +783,6 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm( auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ? MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal; - if (merge_alg == MergeAlgorithm::Vertical) - { - try - { - rows_sources_to_alloc.reserve(sum_rows_upper_bound); - } - catch (...) - { - /// Not enough memory for VERTICAL merge algorithm, make sense for very large tables - merge_alg = MergeAlgorithm::Horizontal; - } - } - return merge_alg; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h index 323d8ff61e..90efd8dceb 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h @@ -136,8 +136,9 @@ public: private: - MergeAlgorithm chooseMergeAlgorithm(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, - size_t rows_upper_bound, const NamesAndTypesList & gathering_columns, MergedRowSources & rows_sources_to_alloc, bool deduplicate) const; + MergeAlgorithm chooseMergeAlgorithm( + const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, + size_t rows_upper_bound, const NamesAndTypesList & gathering_columns, bool deduplicate) const; private: MergeTreeData & data; -- GitLab