From 108bfac17ee1c54e4debc2777e7356be6f51f161 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Fri, 23 Dec 2016 23:23:46 +0300 Subject: [PATCH] Added memory tracking events and columns for merges. [#METR-23911] --- .gitignore | 3 + dbms/include/DB/Common/MemoryTracker.h | 22 +++--- dbms/include/DB/Interpreters/Context.h | 3 + .../include/DB/Storages/MergeTree/MergeList.h | 68 ++++++++++++------- dbms/src/Common/CurrentMetrics.cpp | 1 + dbms/src/Common/ProfileEvents.cpp | 2 +- dbms/src/Interpreters/Context.cpp | 16 +++++ dbms/src/Storages/MergeTree/MergeList.cpp | 51 ++++++++++++++ .../MergeTree/MergeTreeDataMerger.cpp | 6 +- dbms/src/Storages/StorageMergeTree.cpp | 4 +- .../Storages/StorageReplicatedMergeTree.cpp | 4 +- .../Storages/System/StorageSystemMerges.cpp | 18 +++-- 12 files changed, 148 insertions(+), 50 deletions(-) create mode 100644 dbms/src/Storages/MergeTree/MergeList.cpp diff --git a/.gitignore b/.gitignore index e1ee691ad4..9cdac98eb0 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,9 @@ callgrind.out.* *.kdev4 *.kdev_include_paths +# Qt Creator files +*.user + # ignore perf output */perf.data diff --git a/dbms/include/DB/Common/MemoryTracker.h b/dbms/include/DB/Common/MemoryTracker.h index dbd8a9f778..653eeb33b4 100644 --- a/dbms/include/DB/Common/MemoryTracker.h +++ b/dbms/include/DB/Common/MemoryTracker.h @@ -11,9 +11,9 @@ namespace CurrentMetrics } -/** Отслеживает потребление памяти. - * Кидает исключение, если оно стало бы больше некоторого предельного значения. - * Один объект может использоваться одновременно в разных потоках. +/** Tracks memory consumption. + * It throws an exception if amount of consumed memory become greater than certain limit. + * The same memory tracker could be simultaneously used in different threads. */ class MemoryTracker { @@ -21,16 +21,17 @@ class MemoryTracker std::atomic peak {0}; Int64 limit {0}; - /// В целях тестирования exception safety - кидать исключение при каждом выделении памяти с указанной вероятностью. + /// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability. double fault_probability = 0; - /// Односвязный список. Вся информация будет передаваться в следующие MemoryTracker-ы тоже. Они должны жить во время жизни данного MemoryTracker. + /// Singly-linked list. All information will be passed to subsequent memory trackers also (it allows to implement trackers hierarchy). + /// In terms of tree nodes it is the list of parents. Lifetime of these trackers should "include" lifetime of current tracker. MemoryTracker * next = nullptr; /// You could specify custom metric to track memory usage. CurrentMetrics::Metric metric = CurrentMetrics::MemoryTracking; - /// Если задано (например, "for user") - в сообщениях в логе будет указываться это описание. + /// This description will be used as prefix into log messages (if isn't nullptr) const char * description = nullptr; public: @@ -39,7 +40,7 @@ public: ~MemoryTracker(); - /** Вызывайте эти функции перед соответствующими операциями с памятью. + /** Call the following functions before calling of corresponding operations with memory allocators. */ void alloc(Int64 size); @@ -48,7 +49,7 @@ public: alloc(new_size - old_size); } - /** А эту функцию имеет смысл вызывать после освобождения памяти. + /** This function should be called after memory deallocation. */ void free(Int64 size); @@ -77,6 +78,7 @@ public: next = elem; } + /// The memory consumption could be shown in realtime via CurrentMetrics counter void setMetric(CurrentMetrics::Metric metric_) { metric = metric_; @@ -87,10 +89,10 @@ public: description = description_; } - /// Обнулить накопленные данные. + /// Reset the accumulated data. void reset(); - /// Вывести в лог информацию о пиковом потреблении памяти. + /// Prints info about peak memory consumption into log. void logPeakMemoryUsage() const; }; diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h index 67585c82b0..4650a17719 100644 --- a/dbms/include/DB/Interpreters/Context.h +++ b/dbms/include/DB/Interpreters/Context.h @@ -235,6 +235,9 @@ public: MergeList & getMergeList(); const MergeList & getMergeList() const; + /// Global memory tracker for merges + MemoryTracker * getMergesMemoryTracker() const; + /// Создать кэш разжатых блоков указанного размера. Это можно сделать только один раз. void setUncompressedCache(size_t max_size_in_bytes); std::shared_ptr getUncompressedCache() const; diff --git a/dbms/include/DB/Storages/MergeTree/MergeList.h b/dbms/include/DB/Storages/MergeTree/MergeList.h index 5a7882aeee..d3cc320560 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeList.h +++ b/dbms/include/DB/Storages/MergeTree/MergeList.h @@ -2,6 +2,8 @@ #include #include +#include +#include #include #include #include @@ -22,6 +24,26 @@ namespace DB struct MergeInfo +{ + std::string database; + std::string table; + std::string result_part_name; + Float64 elapsed; + Float64 progress; + UInt64 num_parts; + UInt64 total_size_bytes_compressed; + UInt64 total_size_marks; + UInt64 bytes_read_uncompressed; + UInt64 bytes_written_uncompressed; + UInt64 rows_read; + UInt64 rows_written; + UInt64 columns_written; + UInt64 memory_usage; + UInt64 thread_number; +}; + + +struct MergeListElement : boost::noncopyable { const std::string database; const std::string table; @@ -41,27 +63,19 @@ struct MergeInfo /// Updated only for Vertical algorithm std::atomic columns_written{}; - MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name) - : database{database}, table{table}, result_part_name{result_part_name} - { - } + MemoryTracker memory_tracker; + MemoryTracker * background_pool_task_memory_tracker; + MemoryTracker * global_merges_memory_tracker; - MergeInfo(const MergeInfo & other) - : database(other.database), - table(other.table), - result_part_name(other.result_part_name), - watch(other.watch), - progress(other.progress), - num_parts(other.num_parts), - total_size_bytes_compressed(other.total_size_bytes_compressed), - total_size_marks(other.total_size_marks), - bytes_read_uncompressed(other.bytes_read_uncompressed.load(std::memory_order_relaxed)), - bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)), - rows_read(other.rows_read.load(std::memory_order_relaxed)), - rows_written(other.rows_written.load(std::memory_order_relaxed)), - columns_written(other.columns_written.load(std::memory_order_relaxed)) - { - } + /// Poco thread number used in logs + UInt32 thread_number; + + + MergeListElement(const std::string & database, const std::string & table, const std::string & result_part_name, const Context & context); + + MergeInfo getInfo() const; + + ~MergeListElement(); }; @@ -71,7 +85,7 @@ class MergeListEntry { MergeList & list; - using container_t = std::list; + using container_t = std::list; container_t::iterator it; CurrentMetrics::Increment num_merges {CurrentMetrics::Merge}; @@ -83,7 +97,7 @@ public: MergeListEntry(MergeList & list, const container_t::iterator it) : list(list), it{it} {} ~MergeListEntry(); - MergeInfo * operator->() { return &*it; } + MergeListElement * operator->() { return &*it; } }; @@ -91,7 +105,8 @@ class MergeList { friend class MergeListEntry; - using container_t = std::list; + using container_t = std::list; + using info_container_t = std::list; mutable std::mutex mutex; container_t merges; @@ -107,10 +122,13 @@ public: return std::make_unique(*this, merges.emplace(merges.end(), std::forward(args)...)); } - container_t get() const + info_container_t get() const { std::lock_guard lock{mutex}; - return merges; + info_container_t res; + for (const auto & merge_element : merges) + res.emplace_back(merge_element.getInfo()); + return res; } }; diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index 4ed53c959c..79aeb6804d 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -25,6 +25,7 @@ M(LeaderReplica) \ M(MemoryTracking) \ M(MemoryTrackingInBackgroundProcessingPool) \ + M(MemoryTrackingForMerges) \ M(LeaderElection) \ M(EphemeralNode) \ M(ZooKeeperWatch) \ diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index 0798872053..ce2689582b 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -86,7 +86,7 @@ \ M(MergedRows) \ M(MergedUncompressedBytes) \ - M(MergesTime)\ + M(MergesTimeMilliseconds)\ \ M(MergeTreeDataWriterRows) \ M(MergeTreeDataWriterUncompressedBytes) \ diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 6fc8be002b..83e9d491d9 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -51,6 +51,7 @@ namespace ProfileEvents namespace CurrentMetrics { extern const Metric ContextLockWait; + extern const Metric MemoryTrackingForMerges; } @@ -127,6 +128,9 @@ struct ContextShared Poco::UUIDGenerator uuid_generator; + /// Aggregates memory trackers from all merges + mutable std::unique_ptr merges_memory_tracker; + bool shutdown_called = false; /// Позволяют запретить одновременное выполнение DDL запросов над одной и той же таблицей. @@ -215,6 +219,18 @@ const ProcessList & Context::getProcessList() const { return shared->proc MergeList & Context::getMergeList() { return shared->merge_list; } const MergeList & Context::getMergeList() const { return shared->merge_list; } +MemoryTracker * Context::getMergesMemoryTracker() const +{ + auto lock = getLock(); + if (!shared->merges_memory_tracker) + { + shared->merges_memory_tracker = std::make_unique(); + shared->merges_memory_tracker->setMetric(CurrentMetrics::MemoryTrackingForMerges); + } + + return shared->merges_memory_tracker.get(); +} + const Databases Context::getDatabases() const { diff --git a/dbms/src/Storages/MergeTree/MergeList.cpp b/dbms/src/Storages/MergeTree/MergeList.cpp new file mode 100644 index 0000000000..b3d3e9ebd1 --- /dev/null +++ b/dbms/src/Storages/MergeTree/MergeList.cpp @@ -0,0 +1,51 @@ +#include +#include + + +namespace DB +{ + +MergeListElement::MergeListElement(const std::string & database, const std::string & table, const std::string & result_part_name, + const Context & context) + : database{database}, table{table}, result_part_name{result_part_name}, thread_number{Poco::ThreadNumber::get()} +{ + /// Each merge is executed into separate background processing pool thread + background_pool_task_memory_tracker = current_memory_tracker; + + if (background_pool_task_memory_tracker) + { + background_pool_task_memory_tracker->setNext(&memory_tracker); + memory_tracker.setNext(context.getMergesMemoryTracker()); + } +} + +MergeInfo MergeListElement::getInfo() const +{ + MergeInfo res; + res.database = database; + res.table = table; + res.result_part_name = result_part_name; + res.elapsed = watch.elapsedSeconds(); + res.progress = progress; + res.num_parts = num_parts; + res.total_size_bytes_compressed = total_size_bytes_compressed; + res.total_size_marks = total_size_marks; + res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed); + res.bytes_written_uncompressed = bytes_written_uncompressed.load(std::memory_order_relaxed); + res.rows_read = rows_read.load(std::memory_order_relaxed); + res.rows_written = rows_written.load(std::memory_order_relaxed); + res.columns_written = columns_written.load(std::memory_order_relaxed); + res.memory_usage = memory_tracker.get(); + res.thread_number = thread_number; + + return res; +} + +MergeListElement::~MergeListElement() +{ + /// Unplug memory_tracker from current background processing pool thread + if (background_pool_task_memory_tracker) + background_pool_task_memory_tracker->setNext(nullptr); +} + +} diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 5b92034f69..726cb456cd 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -32,7 +32,7 @@ namespace ProfileEvents { extern const Event MergedRows; extern const Event MergedUncompressedBytes; - extern const Event MergesTime; + extern const Event MergesTimeMilliseconds; } namespace CurrentMetrics @@ -419,7 +419,7 @@ public: void updateWatch() { UInt64 watch_curr_elapsed = merge_entry->watch.elapsed(); - ProfileEvents::increment(ProfileEvents::MergesTime, watch_curr_elapsed - watch_prev_elapsed); + ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000); watch_prev_elapsed = watch_curr_elapsed; } @@ -840,7 +840,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( std::string merged_name = createMergedPartName(parts); MergeList::EntryPtr merge_entry_ptr = data.context.getMergeList().insert(job.database_name, - job.table_name, merged_name); + job.table_name, merged_name, data.context); MergeList::Entry & merge_entry = *merge_entry_ptr; merge_entry->num_parts = parts.size(); diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index cfec7099bd..11976495c5 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -339,10 +339,10 @@ bool StorageMergeTree::merge( merging_tagger.emplace(parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(parts), *this); } - const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name); + MergeList::EntryPtr merge_entry_ptr = context.getMergeList().insert(database_name, table_name, merged_name, context); auto new_part = merger.mergePartsToTemporaryPart( - merging_tagger->parts, merged_name, *merge_entry, aio_threshold, time(0), merging_tagger->reserved_space.get()); + merging_tagger->parts, merged_name, *merge_entry_ptr, aio_threshold, time(0), merging_tagger->reserved_space.get()); merger.renameMergedTemporaryPart(merging_tagger->parts, new_part, merged_name, nullptr); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 4eedaaf3bc..fe5267e7ed 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1145,7 +1145,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) auto table_lock = lockStructure(false); - const auto & merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name); + MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name, context); MergeTreeData::Transaction transaction; size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io; @@ -2304,7 +2304,7 @@ bool StorageReplicatedMergeTree::optimize(const String & partition, bool final, if (unreplicated_merger->selectPartsToMerge(parts, merged_name, true, 0, always_can_merge)) { - const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name); + MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, merged_name, context); auto new_part = unreplicated_merger->mergePartsToTemporaryPart( parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io, time(0)); diff --git a/dbms/src/Storages/System/StorageSystemMerges.cpp b/dbms/src/Storages/System/StorageSystemMerges.cpp index 69157ab996..e77cbae982 100644 --- a/dbms/src/Storages/System/StorageSystemMerges.cpp +++ b/dbms/src/Storages/System/StorageSystemMerges.cpp @@ -25,7 +25,9 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name) { "rows_read", std::make_shared() }, { "bytes_written_uncompressed", std::make_shared() }, { "rows_written", std::make_shared() }, - { "columns_written", std::make_shared() } + { "columns_written", std::make_shared() }, + { "memory_usage", std::make_shared() }, + { "thread_number", std::make_shared() }, } { } @@ -54,17 +56,19 @@ BlockInputStreams StorageSystemMerges::read( size_t i = 0; block.unsafeGetByPosition(i++).column->insert(merge.database); block.unsafeGetByPosition(i++).column->insert(merge.table); - block.unsafeGetByPosition(i++).column->insert(merge.watch.elapsedSeconds()); + block.unsafeGetByPosition(i++).column->insert(merge.elapsed); block.unsafeGetByPosition(i++).column->insert(std::min(1., merge.progress)); /// little cheat block.unsafeGetByPosition(i++).column->insert(merge.num_parts); block.unsafeGetByPosition(i++).column->insert(merge.result_part_name); block.unsafeGetByPosition(i++).column->insert(merge.total_size_bytes_compressed); block.unsafeGetByPosition(i++).column->insert(merge.total_size_marks); - block.unsafeGetByPosition(i++).column->insert(merge.bytes_read_uncompressed.load(std::memory_order_relaxed)); - block.unsafeGetByPosition(i++).column->insert(merge.rows_read.load(std::memory_order_relaxed)); - block.unsafeGetByPosition(i++).column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed)); - block.unsafeGetByPosition(i++).column->insert(merge.rows_written.load(std::memory_order_relaxed)); - block.unsafeGetByPosition(i++).column->insert(merge.columns_written.load(std::memory_order_relaxed)); + block.unsafeGetByPosition(i++).column->insert(merge.bytes_read_uncompressed); + block.unsafeGetByPosition(i++).column->insert(merge.rows_read); + block.unsafeGetByPosition(i++).column->insert(merge.bytes_written_uncompressed); + block.unsafeGetByPosition(i++).column->insert(merge.rows_written); + block.unsafeGetByPosition(i++).column->insert(merge.columns_written); + block.unsafeGetByPosition(i++).column->insert(merge.memory_usage); + block.unsafeGetByPosition(i++).column->insert(merge.thread_number); } return BlockInputStreams{1, std::make_shared(block)}; -- GitLab