diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp
index 7e957ae1ae4d109669d1aad8ebe0861b2e60998f..082477c1d9a62309613e0b9fccb030cb12def39d 100644
--- a/dbms/src/Common/MemoryTracker.cpp
+++ b/dbms/src/Common/MemoryTracker.cpp
@@ -16,6 +16,9 @@ namespace DB
 }
 
 
+static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
+
+
 MemoryTracker::~MemoryTracker()
 {
     if (static_cast(level) < static_cast(VariableContext::Process) && peak)
@@ -52,6 +55,13 @@ void MemoryTracker::logPeakMemoryUsage() const
         << ": " << formatReadableSizeWithBinarySuffix(peak) << ".");
 }
 
 
+static void logMemoryUsage(Int64 amount)
+{
+    LOG_DEBUG(&Logger::get("MemoryTracker"),
+        "Current memory usage: " << formatReadableSizeWithBinarySuffix(amount) << ".");
+}
+
+
 void MemoryTracker::alloc(Int64 size)
 {
@@ -101,9 +111,15 @@ void MemoryTracker::alloc(Int64 size)
         throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
     }
 
-    if (will_be > peak.load(std::memory_order_relaxed))    /// Races doesn't matter. Could rewrite with CAS, but not worth.
+    auto peak_old = peak.load(std::memory_order_relaxed);
+    if (will_be > peak_old)    /// Races doesn't matter. Could rewrite with CAS, but not worth.
+    {
         peak.store(will_be, std::memory_order_relaxed);
 
+        if (level == VariableContext::Process && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
+            logMemoryUsage(will_be);
+    }
+
     if (auto loaded_next = parent.load(std::memory_order_relaxed))
         loaded_next->alloc(size);
 }
diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
index b062c679c0a68e4934136d92eca085c35a1b1dfb..6122f54630d8625f392ead6e9149ceee4d9457b0 100644
--- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
+++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
@@ -2,6 +2,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -65,9 +66,10 @@ static void enrichBlockWithConstants(Block & block, const Block & header)
 
 MergeSortingBlockInputStream::MergeSortingBlockInputStream(
     const BlockInputStreamPtr & input, SortDescription & description_,
-    size_t max_merged_block_size_, size_t limit_,
+    size_t max_merged_block_size_, size_t limit_, size_t max_bytes_before_remerge_,
     size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
     : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
+    max_bytes_before_remerge(max_bytes_before_remerge_),
    max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
 {
     children.push_back(input);
@@ -100,7 +102,20 @@ Block MergeSortingBlockInputStream::readImpl()
             removeConstantsFromBlock(block);
 
             blocks.push_back(block);
-            sum_bytes_in_blocks += block.bytes();
+            sum_rows_in_blocks += block.rows();
+            sum_bytes_in_blocks += block.allocatedBytes();
+
+            /** If significant amount of data was accumulated, perform preliminary merging step.
+              */
+            if (blocks.size() > 1
+                && limit
+                && limit * 2 < sum_rows_in_blocks    /// 2 is just a guess.
+                && remerge_is_useful
+                && max_bytes_before_remerge
+                && sum_bytes_in_blocks > max_bytes_before_remerge)
+            {
+                remerge();
+            }
 
             /** If too many of them and if external sorting is enabled,
               *  will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file.
@@ -123,6 +138,7 @@ Block MergeSortingBlockInputStream::readImpl()
 
                 blocks.clear();
                 sum_bytes_in_blocks = 0;
+                sum_rows_in_blocks = 0;
             }
         }
 
@@ -255,4 +271,37 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue
+    sum_bytes_in_blocks)
+        remerge_is_useful = false;
+
+    blocks = std::move(new_blocks);
+    sum_rows_in_blocks = new_sum_rows_in_blocks;
+    sum_bytes_in_blocks = new_sum_bytes_in_blocks;
+}
+
 }
diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h
index ad6d81984cc3b9f20daa8e31e820071a0bcb1bfe..9b7e1aa113913cd5a1dc361557847c70cbe2b79f 100644
--- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h
+++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h
@@ -72,6 +72,7 @@ public:
     /// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
     MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
         size_t max_merged_block_size_, size_t limit_,
+        size_t max_bytes_before_remerge_,
         size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
 
     String getName() const override { return "MergeSorting"; }
@@ -89,12 +90,14 @@ private:
 
     size_t max_merged_block_size;
     size_t limit;
+    size_t max_bytes_before_remerge;
     size_t max_bytes_before_external_sort;
     const std::string tmp_path;
 
     Logger * log = &Logger::get("MergeSortingBlockInputStream");
 
     Blocks blocks;
+    size_t sum_rows_in_blocks = 0;
     size_t sum_bytes_in_blocks = 0;
 
     std::unique_ptr impl;
@@ -121,6 +124,11 @@ private:
     std::vector> temporary_inputs;
 
     BlockInputStreams inputs_to_merge;
+
+    /// Merge all accumulated blocks to keep no more than limit rows.
+    void remerge();
+    /// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore.
+    bool remerge_is_useful = true;
 };
 
 }
diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
index 65acba867a6543eab9994f562f59c7ea80e042db..45edccf723c6947a4a2aeb1de8dfbaa104aaa6a8 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1199,6 +1199,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline)
     /// Merge the sorted blocks.
     pipeline.firstStream() = std::make_shared(
         pipeline.firstStream(), order_descr, settings.max_block_size, limit,
+        settings.max_bytes_before_remerge_sort,
         settings.max_bytes_before_external_sort, context.getTemporaryPath());
 }
 
diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h
index 1928d9d08286247b1afdee7313c36704a59966bf..60bd04fd5f22369c0fb9b84caa20fdb80b9762ab 100644
--- a/dbms/src/Interpreters/Settings.h
+++ b/dbms/src/Interpreters/Settings.h
@@ -218,6 +218,7 @@ struct Settings
     M(SettingUInt64, max_bytes_to_sort, 0, "") \
     M(SettingOverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
     M(SettingUInt64, max_bytes_before_external_sort, 0, "") \
+    M(SettingUInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.") \
     \
     M(SettingUInt64, max_result_rows, 0, "Limit on result size in rows. Also checked for intermediate data sent from remote servers.") \
     M(SettingUInt64, max_result_bytes, 0, "Limit on result size in bytes (uncompressed). Also checked for intermediate data sent from remote servers.") \
diff --git a/dbms/tests/queries/0_stateless/00110_external_sort.sql b/dbms/tests/queries/0_stateless/00110_external_sort.sql
index 6f56de66e3bd24fdec13b0f0c419e49106da84c4..91459d2dabb064b58f967dc16bb2ed2368ba95a5 100644
--- a/dbms/tests/queries/0_stateless/00110_external_sort.sql
+++ b/dbms/tests/queries/0_stateless/00110_external_sort.sql
@@ -1,3 +1,3 @@
 SET max_memory_usage = 100000000;
-SET max_bytes_before_external_sort = 10000000;
+SET max_bytes_before_external_sort = 20000000;
 SELECT number FROM (SELECT number FROM system.numbers LIMIT 10000000) ORDER BY number * 1234567890123456789 LIMIT 9999990, 10;
diff --git a/dbms/tests/queries/0_stateless/00723_remerge_sort.reference b/dbms/tests/queries/0_stateless/00723_remerge_sort.reference
new file mode 100644
index 0000000000000000000000000000000000000000..e22c5f2d430682c34d3843101d5f2bae584f2236
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00723_remerge_sort.reference
@@ -0,0 +1,20 @@
+0
+1
+10
+100
+1000
+10000
+100000
+1000000
+1000001
+1000002
+0
+1
+10
+100
+1000
+10000
+100000
+1000000
+1000001
+1000002
diff --git a/dbms/tests/queries/0_stateless/00723_remerge_sort.sql b/dbms/tests/queries/0_stateless/00723_remerge_sort.sql
new file mode 100644
index 0000000000000000000000000000000000000000..562a5e80223af988cb4d4a4c65c9b09f0c128447
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00723_remerge_sort.sql
@@ -0,0 +1,3 @@
+SELECT * FROM (SELECT x FROM (SELECT toString(number) AS x FROM system.numbers LIMIT 2000000) ORDER BY x LIMIT 10000) LIMIT 10;
+SET max_bytes_before_remerge_sort = 1000000;
+SELECT * FROM (SELECT x FROM (SELECT toString(number) AS x FROM system.numbers LIMIT 2000000) ORDER BY x LIMIT 10000) LIMIT 10;
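
Note on the MemoryTracker change: dividing both the old and the new peak by log_peak_memory_usage_every (1 GiB) and comparing the quotients emits at most one "Current memory usage" line per gigabyte boundary crossed, and only for the Process-level tracker. A self-contained toy illustration of the same arithmetic follows; it is illustrative only and not part of the patch.

#include <cstdint>
#include <cstdio>

/// Same step as in the patch: 1 GiB.
static constexpr uint64_t log_every = 1ULL << 30;

/// True when moving the peak from `peak_old` to `will_be` crosses a 1 GiB boundary.
static bool crosses_boundary(uint64_t will_be, uint64_t peak_old)
{
    return will_be / log_every > peak_old / log_every;
}

int main()
{
    std::printf("%d\n", crosses_boundary(3ULL << 29, 1ULL << 29));          /// 1.5 GiB vs 0.5 GiB -> 1 (log)
    std::printf("%d\n", crosses_boundary((1ULL << 30) + 100, 1ULL << 30));  /// same GiB bucket -> 0 (skip)
    return 0;
}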
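
Note on the MergeSortingBlockInputStream::remerge() hunk: only the tail of the added function body is visible above. As rough orientation only, here is a sketch of what such a remerge step can look like, built from the declarations in MergeSortingBlockInputStream.h (blocks, sum_rows_in_blocks, sum_bytes_in_blocks, remerge_is_useful, limit) and the surviving tail lines; the use of MergeSortingBlocksBlockInputStream, the omission of logging, and the factor-of-2 "did it help" check are assumptions, not text taken from the patch.

/// Sketch, not the verbatim patch: merge the blocks accumulated so far, applying `limit`,
/// and replace them with the (hopefully much smaller) merged result.
void MergeSortingBlockInputStream::remerge()
{
    /// Assumption: MergeSortingBlocksBlockInputStream sorts and merges the given blocks, honouring `limit`.
    MergeSortingBlocksBlockInputStream merger(blocks, description, max_merged_block_size, limit);

    Blocks new_blocks;
    size_t new_sum_rows_in_blocks = 0;
    size_t new_sum_bytes_in_blocks = 0;

    merger.readPrefix();
    while (Block block = merger.read())
    {
        new_sum_rows_in_blocks += block.rows();
        new_sum_bytes_in_blocks += block.allocatedBytes();
        new_blocks.emplace_back(std::move(block));
    }
    merger.readSuffix();

    /// If the remerge did not shrink memory usage enough, stop attempting it
    /// (the factor 2 is an assumption; the patch only shows the right-hand side of the comparison).
    if (new_sum_bytes_in_blocks * 2 > sum_bytes_in_blocks)
        remerge_is_useful = false;

    blocks = std::move(new_blocks);
    sum_rows_in_blocks = new_sum_rows_in_blocks;
    sum_bytes_in_blocks = new_sum_bytes_in_blocks;
}

The trigger for this step is already visible in readImpl() above: remerge only runs when a LIMIT is set, the accumulated row count is well past it, and sum_bytes_in_blocks exceeds max_bytes_before_remerge.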