提交 be84b78b 编写于 作者: A Alexey Milovidov

Added actively updating metrics [#METR-23237].

上级 f05fa769
......@@ -371,6 +371,7 @@ add_library (dbms
include/DB/Interpreters/ClusterProxy/DescribeQueryConstructor.h
include/DB/Interpreters/ClusterProxy/AlterQueryConstructor.h
include/DB/Interpreters/ClusterProxy/Query.h
include/DB/Interpreters/ActiveMetrics.h
include/DB/Common/Allocator.h
include/DB/Common/CombinedCardinalityEstimator.h
include/DB/Common/ExternalTable.h
......@@ -852,6 +853,7 @@ add_library (dbms
src/Interpreters/ClusterProxy/DescribeQueryConstructor.cpp
src/Interpreters/ClusterProxy/AlterQueryConstructor.cpp
src/Interpreters/ClusterProxy/Query.cpp
src/Interpreters/ActiveMetrics.cpp
src/Functions/FunctionFactory.cpp
src/Functions/FunctionsArithmetic.cpp
......
......@@ -35,6 +35,19 @@
M(QueryThread) \
M(ReadonlyReplica) \
M(MemoryTracking) \
M(MarkCacheBytes) \
M(MarkCacheFiles) \
M(UncompressedCacheBytes) \
M(UncompressedCacheCells) \
M(ReplicasMaxQueueSize) \
M(ReplicasMaxInsertsInQueue) \
M(ReplicasMaxMergesInQueue) \
M(ReplicasSumQueueSize) \
M(ReplicasSumInsertsInQueue) \
M(ReplicasSumMergesInQueue) \
M(ReplicasMaxAbsoluteDelay) \
M(ReplicasMaxRelativeDelay) \
M(MaxPartCountForPartition) \
\
M(END)
......
#pragma once
#include <thread>
#include <mutex>
#include <condition_variable>
namespace DB
{
class Context;
/** Periodically (each minute, starting at 30 seconds offset)
* calculates and updates some metrics (see CurrentMetrics.h),
* that are not updated automatically (so, need to be actively calculated).
*/
class ActiveMetrics
{
public:
ActiveMetrics(Context & context_)
: context(context_), thread([this] { run(); })
{
}
~ActiveMetrics();
private:
Context & context;
bool quit {false};
std::mutex mutex;
std::condition_variable cond;
std::thread thread;
void run();
void update();
};
}
#include <DB/Interpreters/ActiveMetrics.h>
#include <DB/Common/Exception.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Storages/MarkCache.h>
#include <DB/Storages/StorageMergeTree.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/IO/UncompressedCache.h>
#include <DB/Databases/IDatabase.h>
#include <chrono>
namespace DB
{
ActiveMetrics::~ActiveMetrics()
{
try
{
{
std::lock_guard<std::mutex> lock{mutex};
quit = true;
}
cond.notify_one();
thread.join();
}
catch (...)
{
DB::tryLogCurrentException(__FUNCTION__);
}
}
void ActiveMetrics::run()
{
using namespace std::literals;
setThreadName("ActiveMetrics");
std::this_thread::sleep_for(30s); /// To be distant with moment of transmission of metrics. It is not strictly necessary.
const auto get_next_minute = []
{
return std::chrono::time_point_cast<std::chrono::minutes, std::chrono::system_clock>(
std::chrono::system_clock::now() + std::chrono::minutes(1));
};
std::unique_lock<std::mutex> lock{mutex};
while (true)
{
if (cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
break;
try
{
update();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
template <typename Max>
static void calculateMax(Max & max, Max x)
{
if (x > max)
max = x;
}
template <typename Max, typename Sum>
static void calculateMaxAndSum(Max & max, Sum & sum, Max x)
{
sum += x;
if (x > max)
max = x;
}
void ActiveMetrics::update()
{
{
if (auto mark_cache = context.getMarkCache())
{
CurrentMetrics::set(CurrentMetrics::MarkCacheBytes, mark_cache->weight());
CurrentMetrics::set(CurrentMetrics::MarkCacheFiles, mark_cache->count());
}
}
{
if (auto uncompressed_cache = context.getUncompressedCache())
{
CurrentMetrics::set(CurrentMetrics::UncompressedCacheBytes, uncompressed_cache->weight());
CurrentMetrics::set(CurrentMetrics::UncompressedCacheCells, uncompressed_cache->count());
}
}
{
auto databases = context.getDatabases();
size_t max_queue_size = 0;
size_t max_inserts_in_queue = 0;
size_t max_merges_in_queue = 0;
size_t sum_queue_size = 0;
size_t sum_inserts_in_queue = 0;
size_t sum_merges_in_queue = 0;
size_t max_absolute_delay = 0;
size_t max_relative_delay = 0;
size_t max_part_count_for_partition = 0;
for (const auto & db : databases)
{
for (auto iterator = db.second->getIterator(); iterator->isValid(); iterator->next())
{
auto & table = iterator->table();
StorageMergeTree * table_merge_tree = typeid_cast<StorageMergeTree *>(table.get());
StorageReplicatedMergeTree * table_replicated_merge_tree = typeid_cast<StorageReplicatedMergeTree *>(table.get());
if (table_replicated_merge_tree)
{
StorageReplicatedMergeTree::Status status;
table_replicated_merge_tree->getStatus(status, false);
calculateMaxAndSum(max_queue_size, sum_queue_size, status.queue.queue_size);
calculateMaxAndSum(max_inserts_in_queue, sum_inserts_in_queue, status.queue.inserts_in_queue);
calculateMaxAndSum(max_merges_in_queue, sum_merges_in_queue, status.queue.merges_in_queue);
try
{
time_t absolute_delay = 0;
time_t relative_delay = 0;
table_replicated_merge_tree->getReplicaDelays(absolute_delay, relative_delay);
calculateMax(max_absolute_delay, absolute_delay);
calculateMax(max_relative_delay, relative_delay);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__,
"Cannot get replica delay for table: " + backQuoteIfNeed(db.first) + "." + backQuoteIfNeed(iterator->name()));
}
calculateMax(max_part_count_for_partition, table_replicated_merge_tree->getData().getMaxPartsCountForMonth());
if (auto unreplicated_data = table_replicated_merge_tree->getUnreplicatedData())
calculateMax(max_part_count_for_partition, unreplicated_data->getMaxPartsCountForMonth());
}
if (table_merge_tree)
{
calculateMax(max_part_count_for_partition, table_merge_tree->getData().getMaxPartsCountForMonth());
}
}
}
CurrentMetrics::set(CurrentMetrics::ReplicasMaxQueueSize, max_queue_size);
CurrentMetrics::set(CurrentMetrics::ReplicasMaxInsertsInQueue, max_inserts_in_queue);
CurrentMetrics::set(CurrentMetrics::ReplicasMaxMergesInQueue, max_merges_in_queue);
CurrentMetrics::set(CurrentMetrics::ReplicasSumQueueSize, sum_queue_size);
CurrentMetrics::set(CurrentMetrics::ReplicasSumInsertsInQueue, sum_inserts_in_queue);
CurrentMetrics::set(CurrentMetrics::ReplicasSumMergesInQueue, sum_merges_in_queue);
CurrentMetrics::set(CurrentMetrics::ReplicasMaxAbsoluteDelay, max_absolute_delay);
CurrentMetrics::set(CurrentMetrics::ReplicasMaxRelativeDelay, max_relative_delay);
CurrentMetrics::set(CurrentMetrics::MaxPartCountForPartition, max_part_count_for_partition);
}
/// Add more metrics as you wish.
}
}
......@@ -32,10 +32,10 @@ void MetricsTransmitter::run()
{
setThreadName("MetricsTransmit");
const auto get_next_minute = [] {
const auto get_next_minute = []
{
return std::chrono::time_point_cast<std::chrono::minutes, std::chrono::system_clock>(
std::chrono::system_clock::now() + std::chrono::minutes(1)
);
std::chrono::system_clock::now() + std::chrono::minutes(1));
};
std::unique_lock<std::mutex> lock{mutex};
......
......@@ -20,6 +20,7 @@
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/ActiveMetrics.h>
#include <DB/Storages/System/StorageSystemNumbers.h>
#include <DB/Storages/System/StorageSystemTables.h>
......@@ -496,14 +497,17 @@ int Server::main(const std::vector<std::string> & args)
global_context->tryCreateDictionaries();
global_context->tryCreateExternalDictionaries();
}
waitForTerminationRequest();
}
catch (...)
{
LOG_ERROR(log, "Caught exception while loading dictionaries.");
throw;
}
/// This object will periodically calculate some metrics.
ActiveMetrics active_metrics(*global_context);
waitForTerminationRequest();
}
return Application::EXIT_OK;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册