diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 3564c84a796e9be1c6f1d0d4ecb128519aca5ffb..c73ad646c02b54a14126bebc69eeaca887e0d9b3 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -371,6 +371,7 @@ add_library (dbms include/DB/Interpreters/ClusterProxy/DescribeQueryConstructor.h include/DB/Interpreters/ClusterProxy/AlterQueryConstructor.h include/DB/Interpreters/ClusterProxy/Query.h + include/DB/Interpreters/ActiveMetrics.h include/DB/Common/Allocator.h include/DB/Common/CombinedCardinalityEstimator.h include/DB/Common/ExternalTable.h @@ -852,6 +853,7 @@ add_library (dbms src/Interpreters/ClusterProxy/DescribeQueryConstructor.cpp src/Interpreters/ClusterProxy/AlterQueryConstructor.cpp src/Interpreters/ClusterProxy/Query.cpp + src/Interpreters/ActiveMetrics.cpp src/Functions/FunctionFactory.cpp src/Functions/FunctionsArithmetic.cpp diff --git a/dbms/include/DB/Common/CurrentMetrics.h b/dbms/include/DB/Common/CurrentMetrics.h index 3544845ab15b90921a8dea340d9355109f6a06d9..e06681ae5b119dbe26d96d86baf5e9e2bef1c7d7 100644 --- a/dbms/include/DB/Common/CurrentMetrics.h +++ b/dbms/include/DB/Common/CurrentMetrics.h @@ -35,6 +35,19 @@ M(QueryThread) \ M(ReadonlyReplica) \ M(MemoryTracking) \ + M(MarkCacheBytes) \ + M(MarkCacheFiles) \ + M(UncompressedCacheBytes) \ + M(UncompressedCacheCells) \ + M(ReplicasMaxQueueSize) \ + M(ReplicasMaxInsertsInQueue) \ + M(ReplicasMaxMergesInQueue) \ + M(ReplicasSumQueueSize) \ + M(ReplicasSumInsertsInQueue) \ + M(ReplicasSumMergesInQueue) \ + M(ReplicasMaxAbsoluteDelay) \ + M(ReplicasMaxRelativeDelay) \ + M(MaxPartCountForPartition) \ \ M(END) diff --git a/dbms/include/DB/Interpreters/ActiveMetrics.h b/dbms/include/DB/Interpreters/ActiveMetrics.h new file mode 100644 index 0000000000000000000000000000000000000000..aad4a3b93fe38ea13f90c891de646bdfc5718e95 --- /dev/null +++ b/dbms/include/DB/Interpreters/ActiveMetrics.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include + + +namespace DB +{ + +class Context; + + +/** Periodically (each minute, starting at 30 seconds offset) + * calculates and updates some metrics (see CurrentMetrics.h), + * that are not updated automatically (so, need to be actively calculated). + */ +class ActiveMetrics +{ +public: + ActiveMetrics(Context & context_) + : context(context_), thread([this] { run(); }) + { + } + + ~ActiveMetrics(); + +private: + Context & context; + bool quit {false}; + std::mutex mutex; + std::condition_variable cond; + std::thread thread; + + void run(); + void update(); +}; + +} diff --git a/dbms/src/Interpreters/ActiveMetrics.cpp b/dbms/src/Interpreters/ActiveMetrics.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7e1401568af84ba62f1c914fb994303684968d6f --- /dev/null +++ b/dbms/src/Interpreters/ActiveMetrics.cpp @@ -0,0 +1,180 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +ActiveMetrics::~ActiveMetrics() +{ + try + { + { + std::lock_guard lock{mutex}; + quit = true; + } + + cond.notify_one(); + thread.join(); + } + catch (...) + { + DB::tryLogCurrentException(__FUNCTION__); + } +} + + +void ActiveMetrics::run() +{ + using namespace std::literals; + + setThreadName("ActiveMetrics"); + + std::this_thread::sleep_for(30s); /// To be distant with moment of transmission of metrics. It is not strictly necessary. + + const auto get_next_minute = [] + { + return std::chrono::time_point_cast( + std::chrono::system_clock::now() + std::chrono::minutes(1)); + }; + + std::unique_lock lock{mutex}; + + while (true) + { + if (cond.wait_until(lock, get_next_minute(), [this] { return quit; })) + break; + + try + { + update(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +} + + +template +static void calculateMax(Max & max, Max x) +{ + if (x > max) + max = x; +} + +template +static void calculateMaxAndSum(Max & max, Sum & sum, Max x) +{ + sum += x; + if (x > max) + max = x; +} + + +void ActiveMetrics::update() +{ + { + if (auto mark_cache = context.getMarkCache()) + { + CurrentMetrics::set(CurrentMetrics::MarkCacheBytes, mark_cache->weight()); + CurrentMetrics::set(CurrentMetrics::MarkCacheFiles, mark_cache->count()); + } + } + + { + if (auto uncompressed_cache = context.getUncompressedCache()) + { + CurrentMetrics::set(CurrentMetrics::UncompressedCacheBytes, uncompressed_cache->weight()); + CurrentMetrics::set(CurrentMetrics::UncompressedCacheCells, uncompressed_cache->count()); + } + } + + { + auto databases = context.getDatabases(); + + size_t max_queue_size = 0; + size_t max_inserts_in_queue = 0; + size_t max_merges_in_queue = 0; + + size_t sum_queue_size = 0; + size_t sum_inserts_in_queue = 0; + size_t sum_merges_in_queue = 0; + + size_t max_absolute_delay = 0; + size_t max_relative_delay = 0; + + size_t max_part_count_for_partition = 0; + + for (const auto & db : databases) + { + for (auto iterator = db.second->getIterator(); iterator->isValid(); iterator->next()) + { + auto & table = iterator->table(); + StorageMergeTree * table_merge_tree = typeid_cast(table.get()); + StorageReplicatedMergeTree * table_replicated_merge_tree = typeid_cast(table.get()); + + if (table_replicated_merge_tree) + { + StorageReplicatedMergeTree::Status status; + table_replicated_merge_tree->getStatus(status, false); + + calculateMaxAndSum(max_queue_size, sum_queue_size, status.queue.queue_size); + calculateMaxAndSum(max_inserts_in_queue, sum_inserts_in_queue, status.queue.inserts_in_queue); + calculateMaxAndSum(max_merges_in_queue, sum_merges_in_queue, status.queue.merges_in_queue); + + try + { + time_t absolute_delay = 0; + time_t relative_delay = 0; + table_replicated_merge_tree->getReplicaDelays(absolute_delay, relative_delay); + + calculateMax(max_absolute_delay, absolute_delay); + calculateMax(max_relative_delay, relative_delay); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__, + "Cannot get replica delay for table: " + backQuoteIfNeed(db.first) + "." + backQuoteIfNeed(iterator->name())); + } + + calculateMax(max_part_count_for_partition, table_replicated_merge_tree->getData().getMaxPartsCountForMonth()); + if (auto unreplicated_data = table_replicated_merge_tree->getUnreplicatedData()) + calculateMax(max_part_count_for_partition, unreplicated_data->getMaxPartsCountForMonth()); + } + + if (table_merge_tree) + { + calculateMax(max_part_count_for_partition, table_merge_tree->getData().getMaxPartsCountForMonth()); + } + } + } + + CurrentMetrics::set(CurrentMetrics::ReplicasMaxQueueSize, max_queue_size); + CurrentMetrics::set(CurrentMetrics::ReplicasMaxInsertsInQueue, max_inserts_in_queue); + CurrentMetrics::set(CurrentMetrics::ReplicasMaxMergesInQueue, max_merges_in_queue); + + CurrentMetrics::set(CurrentMetrics::ReplicasSumQueueSize, sum_queue_size); + CurrentMetrics::set(CurrentMetrics::ReplicasSumInsertsInQueue, sum_inserts_in_queue); + CurrentMetrics::set(CurrentMetrics::ReplicasSumMergesInQueue, sum_merges_in_queue); + + CurrentMetrics::set(CurrentMetrics::ReplicasMaxAbsoluteDelay, max_absolute_delay); + CurrentMetrics::set(CurrentMetrics::ReplicasMaxRelativeDelay, max_relative_delay); + + CurrentMetrics::set(CurrentMetrics::MaxPartCountForPartition, max_part_count_for_partition); + } + + /// Add more metrics as you wish. +} + + +} diff --git a/dbms/src/Server/MetricsTransmitter.cpp b/dbms/src/Server/MetricsTransmitter.cpp index 252daeefe44588dfc6bbe437d29bc6844ddfd5c4..0f29bdfd56f589765bf2297db89cecc8ef63ba56 100644 --- a/dbms/src/Server/MetricsTransmitter.cpp +++ b/dbms/src/Server/MetricsTransmitter.cpp @@ -32,10 +32,10 @@ void MetricsTransmitter::run() { setThreadName("MetricsTransmit"); - const auto get_next_minute = [] { + const auto get_next_minute = [] + { return std::chrono::time_point_cast( - std::chrono::system_clock::now() + std::chrono::minutes(1) - ); + std::chrono::system_clock::now() + std::chrono::minutes(1)); }; std::unique_lock lock{mutex}; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 2bedcdfc8cea3a7f5a1bfa4c81189e54730f0851..8ff451a23acc69f5de4924d1ed42174c7822817d 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -496,14 +497,17 @@ int Server::main(const std::vector & args) global_context->tryCreateDictionaries(); global_context->tryCreateExternalDictionaries(); } - - waitForTerminationRequest(); } catch (...) { LOG_ERROR(log, "Caught exception while loading dictionaries."); throw; } + + /// This object will periodically calculate some metrics. + ActiveMetrics active_metrics(*global_context); + + waitForTerminationRequest(); } return Application::EXIT_OK;