未验证 提交 6f6120f6 编写于 作者: Y yi wang 提交者: GitHub

enhancement metric report, support LatencyHistLoggedGuard (#117)

* enhancement metric report, support LatencyHistLoggedGuard
上级 5f922446
......@@ -581,6 +581,7 @@ set(SOURCES
db/memtable_list.cc
db/merge_helper.cc
db/merge_operator.cc
db/metrics_reporter.cc
db/periodic_work_scheduler.cc
db/range_del_aggregator.cc
db/range_tombstone_fragmenter.cc
......@@ -994,6 +995,7 @@ if(WITH_TESTS)
db/memtable_list_test.cc
db/merge_helper_test.cc
db/merge_test.cc
db/metrics_reporter_test.cc
db/options_file_test.cc
db/perf_context_test.cc
db/periodic_work_scheduler_test.cc
......
......@@ -281,55 +281,55 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
write_qps_reporter_(*metrics_reporter_factory_->BuildCountReporter(
write_qps_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
read_qps_reporter_(*metrics_reporter_factory_->BuildCountReporter(
read_qps_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
newiterator_qps_reporter_(*metrics_reporter_factory_->BuildCountReporter(
newiterator_qps_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
seek_qps_reporter_(*metrics_reporter_factory_->BuildCountReporter(
seek_qps_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
next_qps_reporter_(*metrics_reporter_factory_->BuildCountReporter(
next_qps_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
seekforprev_qps_reporter_(*metrics_reporter_factory_->BuildCountReporter(
seekforprev_qps_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
prev_qps_reporter_(*metrics_reporter_factory_->BuildCountReporter(
prev_qps_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
write_latency_reporter_(*metrics_reporter_factory_->BuildHistReporter(
write_latency_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
read_latency_reporter_(*metrics_reporter_factory_->BuildHistReporter(
read_latency_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
newiterator_latency_reporter_(
*metrics_reporter_factory_->BuildHistReporter(
newiterator_latency_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
seek_latency_reporter_(*metrics_reporter_factory_->BuildHistReporter(
seek_latency_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
next_latency_reporter_(*metrics_reporter_factory_->BuildHistReporter(
next_latency_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
seekforprev_latency_reporter_(
*metrics_reporter_factory_->BuildHistReporter(
seekforprev_latency_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
prev_latency_reporter_(*metrics_reporter_factory_->BuildHistReporter(
prev_latency_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
write_throughput_reporter_(*metrics_reporter_factory_->BuildCountReporter(
write_throughput_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())),
immutable_db_options_.info_log.get(), env_)),
write_batch_size_reporter_(*metrics_reporter_factory_->BuildHistReporter(
write_batch_size_metric_name, bytedance_tags_,
immutable_db_options_.info_log.get())) {
immutable_db_options_.info_log.get(), env_)) {
// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
// WriteUnprepared, which should use seq_per_batch_.
assert(batch_per_txn_ || seq_per_batch_);
......
#include "rocksdb/metrics_reporter.h"
#include <inttypes.h>
#include <cstddef>
#include <string>
#include "rocksdb/env.h"
#include "util/logging.h"
#define REPORT_DEBUG_STACKTRACE 1
#if REPORT_DEBUG_STACKTRACE
#include <boost/stacktrace.hpp>
#endif
namespace TERARKDB_NAMESPACE {
LatencyHistGuard::LatencyHistGuard(HistReporterHandle* handle)
: handle_(handle), begin_time_ns_(handle_->GetEnv()->NowNanos()) {
assert(handle_ != nullptr);
}
LatencyHistGuard::~LatencyHistGuard() {
auto us = (handle_->GetEnv()->NowNanos() - begin_time_ns_) / 1000;
handle_->AddRecord(us);
}
LatencyHistLoggedGuard::LatencyHistLoggedGuard(HistReporterHandle* handle,
uint64_t threshold_us)
: handle_(handle),
begin_time_ns_(handle_->GetEnv()->NowNanos()),
log_threshold_us_(threshold_us) {
assert(handle_ != nullptr);
#if REPORT_DEBUG_STACKTRACE
auto stacktrace = new boost::stacktrace::stacktrace();
start_stacktrace_ = stacktrace;
#endif
}
LatencyHistLoggedGuard::~LatencyHistLoggedGuard() {
auto us = (handle_->GetEnv()->NowNanos() - begin_time_ns_) / 1000;
handle_->AddRecord(us);
if (us >= log_threshold_us_ && handle_->GetLogger() != nullptr) {
#if REPORT_DEBUG_STACKTRACE
auto stacktrace =
static_cast<boost::stacktrace::stacktrace*>(start_stacktrace_);
ROCKS_LOG_WARN(
handle_->GetLogger(),
"[name:%s] [tags:%s]: %" PRIu64 "us\n%s----------\n%s-----------\n",
handle_->GetName(), handle_->GetTag(), us,
boost::stacktrace::to_string(*stacktrace).c_str(),
boost::stacktrace::to_string(boost::stacktrace::stacktrace()).c_str());
#else
ROCKS_LOG_WARN(handle_->GetLogger(), "[name:%s] [tags:%s]: %" PRIu64 "us\n",
handle_->GetName(), handle_->GetTag(),
static_cast<uint64_t>(us));
#endif
}
#if REPORT_DEBUG_STACKTRACE
auto stacktrace =
static_cast<boost::stacktrace::stacktrace*>(start_stacktrace_);
start_stacktrace_ = nullptr;
delete stacktrace;
#endif
}
} // namespace TERARKDB_NAMESPACE
\ No newline at end of file
#include "rocksdb/metrics_reporter.h"
#include <atomic>
#include "port/stack_trace.h"
#include "rocksdb/env.h"
#include "rocksdb/terark_namespace.h"
#include "util/logging.h"
#include "util/mock_time_env.h"
#include "util/testharness.h"
namespace TERARKDB_NAMESPACE {
class TestLogger : public Logger {
public:
TestLogger(int* count) : count_(count) {}
using Logger::Logv;
virtual void Logv(const char* /*format*/, va_list /*ap*/) override {
++*count_;
};
private:
int* count_;
};
class TestHistReporterHandle : public HistReporterHandle {
public:
TestHistReporterHandle(Env* const env)
: name_("name"), tags_("tags"), count(0), log_(&count), env_(env) {}
Env* GetEnv() { return env_; }
~TestHistReporterHandle() {}
public:
void AddRecord(size_t val) override { stat_.push_back(val); }
Logger* GetLogger() override { return &log_; }
const char* GetTag() { return tags_.c_str(); }
const char* GetName() { return name_.c_str(); }
int LoggerCount() { return count; }
private:
const std::string name_;
const std::string tags_;
int count;
TestLogger log_;
Env* env_;
std::vector<size_t> stat_;
};
class MockMetricsReporterTest : public testing::Test {
public:
MockMetricsReporterTest() : env_(Env::Default()), handler_(&env_) {}
protected:
MockTimeEnv env_;
TestHistReporterHandle handler_;
};
TEST_F(MockMetricsReporterTest, Basic) {
ASSERT_EQ(handler_.LoggerCount(), 0);
{
LatencyHistLoggedGuard g(&handler_, 100);
env_.MockSleepForMicroseconds(100);
}
ASSERT_EQ(handler_.LoggerCount(), 1);
{
LatencyHistLoggedGuard g(&handler_, 100);
env_.MockSleepForMicroseconds(99);
}
ASSERT_EQ(handler_.LoggerCount(), 1);
}
} // namespace TERARKDB_NAMESPACE
int main(int argc, char** argv) {
TERARKDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
\ No newline at end of file
#pragma once
#include <chrono>
#include <inttypes.h>
#include <cstddef>
#include <string>
#include "rocksdb/env.h"
#include "rocksdb/terark_namespace.h"
namespace TERARKDB_NAMESPACE {
class Env;
class Logger;
class HistReporterHandle {
public:
HistReporterHandle() = default;
virtual Logger* GetLogger() { return nullptr; }
virtual const char* GetTag() { return ""; }
virtual const char* GetName() { return ""; }
virtual Env* GetEnv() { return nullptr; }
virtual ~HistReporterHandle() = default;
public:
......@@ -20,22 +29,26 @@ class HistReporterHandle {
class LatencyHistGuard {
public:
explicit LatencyHistGuard(HistReporterHandle* handle)
: handle_(handle),
begin_time_(std::chrono::high_resolution_clock::now()) {}
~LatencyHistGuard() {
if (handle_ != nullptr) {
auto us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - begin_time_)
.count();
handle_->AddRecord(us);
}
}
explicit LatencyHistGuard(HistReporterHandle* handle);
~LatencyHistGuard();
private:
HistReporterHandle* handle_;
uint64_t begin_time_ns_;
};
class LatencyHistLoggedGuard {
public:
explicit LatencyHistLoggedGuard(HistReporterHandle* handle,
uint64_t threshold_us = 500 * 1000);
~LatencyHistLoggedGuard();
private:
HistReporterHandle* handle_;
decltype(std::chrono::high_resolution_clock::now()) begin_time_;
uint64_t begin_time_ns_;
uint64_t log_threshold_us_;
void* start_stacktrace_;
};
class CountReporterHandle {
......@@ -57,10 +70,12 @@ class MetricsReporterFactory {
public:
virtual HistReporterHandle* BuildHistReporter(const std::string& name,
const std::string& tags,
Logger* log) = 0;
Logger* log,
Env* const env) = 0;
virtual CountReporterHandle* BuildCountReporter(const std::string& name,
const std::string& tags,
Logger* log) = 0;
Logger* log,
Env* const env) = 0;
};
} // namespace TERARKDB_NAMESPACE
\ No newline at end of file
......@@ -2,7 +2,6 @@
#include <cassert>
#include <mutex>
#ifdef TERARKDB_ENABLE_METRICS
#include "metrics.h"
#endif
......@@ -12,6 +11,8 @@
namespace TERARKDB_NAMESPACE {
const int kNanosInMilli = 1000000;
#ifdef TERARKDB_ENABLE_METRICS
static std::mutex metrics_mtx;
static std::atomic<bool> metrics_init{false};
......@@ -50,8 +51,8 @@ static int GetThreadID() {
}
#else
namespace {
static ByteDanceHistReporterHandle dummy_hist_("", "", nullptr);
static ByteDanceCountReporterHandle dummy_count_("", "", nullptr);
static ByteDanceHistReporterHandle dummy_hist_("", "", nullptr, nullptr);
static ByteDanceCountReporterHandle dummy_count_("", "", nullptr, nullptr);
} // namespace
#endif
......@@ -65,22 +66,18 @@ void ByteDanceHistReporterHandle::AddRecord(size_t val) {
auto& tls_stat = *tls_stat_ptr;
tls_stat.AppendRecord(val);
auto curr_time = std::chrono::high_resolution_clock::now();
auto diff_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
curr_time - tls_stat.last_report_time)
.count();
auto curr_time_ns = env_->NowNanos();
auto diff_ms = (curr_time_ns - tls_stat.last_report_time_ns_) / kNanosInMilli;
if (diff_ms > 1000 && !merge_lock_.load(std::memory_order_relaxed) &&
!merge_lock_.exchange(true, std::memory_order_acquire)) {
stats_.Merge(tls_stat);
diff_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
curr_time - stats_.last_report_time)
.count();
diff_ms = (curr_time_ns - stats_.last_report_time_ns_) / kNanosInMilli;
if (diff_ms > 5000) {
auto result = stats_.GetResult({0.50, 0.99, 0.999});
stats_.Reset();
stats_.last_report_time = curr_time;
stats_.last_report_time_ns_ = curr_time_ns;
merge_lock_.store(false, std::memory_order_release);
cpputil::metrics2::Metrics::emit_store(name_ + "_p50", result[0], tags_);
......@@ -89,9 +86,7 @@ void ByteDanceHistReporterHandle::AddRecord(size_t val) {
cpputil::metrics2::Metrics::emit_store(name_ + "_avg", result[3], tags_);
cpputil::metrics2::Metrics::emit_store(name_ + "_max", result[4], tags_);
diff_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
curr_time - last_log_time_)
.count();
diff_ms = (curr_time_ns - last_log_time_ns_) / kNanosInMilli;
if (diff_ms > 10 * 60 * 1000) {
ROCKS_LOG_INFO(log_, "name:%s P50, tags:%s, val:%zu", name_.c_str(),
tags_.c_str(), result[0]);
......@@ -103,14 +98,14 @@ void ByteDanceHistReporterHandle::AddRecord(size_t val) {
tags_.c_str(), result[3]);
ROCKS_LOG_INFO(log_, "name:%s Max, tags:%s, val:%zu", name_.c_str(),
tags_.c_str(), result[4]);
last_log_time_ = curr_time;
last_log_time_ns_ = curr_time_ns;
}
} else {
merge_lock_.store(false, std::memory_order_release);
}
tls_stat.Reset();
tls_stat.last_report_time = curr_time;
tls_stat.last_report_time_ns_ = curr_time_ns;
}
}
#else
......@@ -125,7 +120,7 @@ HistStats<>* ByteDanceHistReporterHandle::GetThreadLocalStats() {
}
auto& s = stats_arr_[id];
if (s == nullptr) {
s = new HistStats<>;
s = new HistStats<>(env_->NowNanos());
}
return s;
#else
......@@ -138,27 +133,22 @@ void ByteDanceCountReporterHandle::AddCount(size_t n) {
count_.fetch_add(n, std::memory_order_relaxed);
if (!reporter_lock_.load(std::memory_order_relaxed)) {
if (!reporter_lock_.exchange(true, std::memory_order_acquire)) {
auto curr_time = std::chrono::high_resolution_clock::now();
auto diff_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
curr_time - last_report_time_)
.count();
auto curr_time_ns = env_->NowNanos();
auto diff_ms = (curr_time_ns - last_report_time_ns_) / kNanosInMilli;
if (diff_ms > 1000) {
size_t curr_count = count_.load(std::memory_order_relaxed);
size_t qps = (curr_count - last_report_count_) /
(static_cast<double>(diff_ms) / 1000);
cpputil::metrics2::Metrics::emit_store(name_, qps, tags_);
last_report_time_ = curr_time;
last_report_time_ns_ = curr_time_ns;
last_report_count_ = curr_count;
diff_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
curr_time - last_log_time_)
.count();
diff_ms = (curr_time_ns - last_log_time_ns_) / kNanosInMilli;
if (diff_ms > 10 * 60 * 1000) {
ROCKS_LOG_INFO(log_, "name:%s, tags:%s, val:%zu", name_.c_str(),
tags_.c_str(), qps);
last_log_time_ = curr_time;
last_log_time_ns_ = curr_time_ns;
}
}
reporter_lock_.store(false, std::memory_order_release);
......@@ -203,14 +193,16 @@ void ByteDanceMetricsReporterFactory::InitNamespace(const std::string&) {}
#ifdef TERARKDB_ENABLE_METRICS
ByteDanceHistReporterHandle* ByteDanceMetricsReporterFactory::BuildHistReporter(
const std::string& name, const std::string& tags, Logger* log) {
const std::string& name, const std::string& tags, Logger* log,
Env* const env) {
std::lock_guard<std::mutex> guard(metrics_mtx);
hist_reporters_.emplace_back(name, tags, log);
hist_reporters_.emplace_back(name, tags, log, env);
return &hist_reporters_.back();
}
#else
ByteDanceHistReporterHandle* ByteDanceMetricsReporterFactory::BuildHistReporter(
const std::string& /*name*/, const std::string& /*tags*/, Logger* /*log*/) {
const std::string& /*name*/, const std::string& /*tags*/, Logger* /*log*/,
Env* const /*env*/) {
return &dummy_hist_;
}
#endif
......@@ -219,16 +211,18 @@ ByteDanceHistReporterHandle* ByteDanceMetricsReporterFactory::BuildHistReporter(
ByteDanceCountReporterHandle*
ByteDanceMetricsReporterFactory::BuildCountReporter(const std::string& name,
const std::string& tags,
Logger* log) {
Logger* log,
Env* const env) {
std::lock_guard<std::mutex> guard(metrics_mtx);
count_reporters_.emplace_back(name, tags, log);
count_reporters_.emplace_back(name, tags, log, env);
return &count_reporters_.back();
}
#else
ByteDanceCountReporterHandle*
ByteDanceMetricsReporterFactory::BuildCountReporter(const std::string& /*name*/,
const std::string& /*tags*/,
Logger* /*log*/) {
Logger* /*log*/,
Env* const /*env*/) {
return &dummy_count_;
}
#endif
......
#include <atomic>
#include <chrono>
#include <list>
#include <deque>
#include "rocksdb/env.h"
#include "rocksdb/metrics_reporter.h"
#include "rocksdb/terark_namespace.h"
#include "stats.h"
namespace TERARKDB_NAMESPACE {
class ByteDanceHistReporterHandle : public HistReporterHandle {
public:
#ifdef TERARKDB_ENABLE_METRICS
ByteDanceHistReporterHandle(const std::string& name, const std::string& tags,
Logger* log)
Logger* log, Env* const env)
: name_(name),
tags_(tags),
last_log_time_(std::chrono::high_resolution_clock::now()),
log_(log) {}
env_(env),
last_log_time_ns_(env_->NowNanos()),
log_(log),
stats_(env_->NowNanos()) {}
virtual Env* GetEnv() override { return env_; }
#else
ByteDanceHistReporterHandle(const std::string& /*name*/,
const std::string& /*tags*/, Logger* /*log*/) {}
const std::string& /*tags*/, Logger* /*log*/,
Env* const /*env*/) {}
#endif
~ByteDanceHistReporterHandle() override {
......@@ -33,6 +35,28 @@ class ByteDanceHistReporterHandle : public HistReporterHandle {
public:
void AddRecord(size_t val) override;
Logger* GetLogger() override {
#ifdef TERARKDB_ENABLE_METRICS
return log_;
#else
return nullptr;
#endif
}
const char* GetTag() {
#ifdef TERARKDB_ENABLE_METRICS
return tags_.c_str();
#else
return "";
#endif
}
const char* GetName() {
#ifdef TERARKDB_ENABLE_METRICS
return name_.c_str();
#else
return "";
#endif
}
private:
#ifdef TERARKDB_ENABLE_METRICS
enum {
......@@ -41,8 +65,9 @@ class ByteDanceHistReporterHandle : public HistReporterHandle {
const std::string& name_;
const std::string& tags_;
Env* env_;
std::chrono::high_resolution_clock::time_point last_log_time_;
uint64_t last_log_time_ns_;
Logger* log_;
std::array<HistStats<>*, kMaxThreadNum> stats_arr_{};
......@@ -58,15 +83,17 @@ class ByteDanceCountReporterHandle : public CountReporterHandle {
public:
#ifdef TERARKDB_ENABLE_METRICS
ByteDanceCountReporterHandle(const std::string& name, const std::string& tags,
Logger* log)
Logger* log, Env* const env)
: name_(name),
tags_(tags),
last_report_time_(std::chrono::high_resolution_clock::now()),
last_log_time_(std::chrono::high_resolution_clock::now()),
env_(env),
last_report_time_ns_(env_->NowNanos()),
last_log_time_ns_(env_->NowNanos()),
log_(log) {}
#else
ByteDanceCountReporterHandle(const std::string& /*name*/,
const std::string& /*tags*/, Logger* /*log*/) {}
const std::string& /*tags*/, Logger* /*log*/,
Env* const /*env*/) {}
#endif
~ByteDanceCountReporterHandle() override = default;
......@@ -80,15 +107,13 @@ class ByteDanceCountReporterHandle : public CountReporterHandle {
const std::string& name_;
const std::string& tags_;
std::chrono::high_resolution_clock::time_point last_report_time_;
Env* const env_;
uint64_t last_report_time_ns_;
size_t last_report_count_ = 0;
std::chrono::high_resolution_clock::time_point last_log_time_;
uint64_t last_log_time_ns_;
Logger* log_;
char _padding_[64 /* x86 cache line size */ - 8 * 7];
std::atomic<size_t> count_{0};
#endif
};
......@@ -97,23 +122,25 @@ class ByteDanceMetricsReporterFactory : public MetricsReporterFactory {
public:
ByteDanceMetricsReporterFactory();
ByteDanceMetricsReporterFactory(const std::string& ns);
explicit ByteDanceMetricsReporterFactory(const std::string& ns);
~ByteDanceMetricsReporterFactory() override = default;
public:
ByteDanceHistReporterHandle* BuildHistReporter(const std::string& name,
const std::string& tags,
Logger* log) override;
Logger* log,
Env* const env) override;
ByteDanceCountReporterHandle* BuildCountReporter(const std::string& name,
const std::string& tags,
Logger* log) override;
Logger* log,
Env* const env) override;
private:
#ifdef TERARKDB_ENABLE_METRICS
std::list<ByteDanceHistReporterHandle> hist_reporters_;
std::list<ByteDanceCountReporterHandle> count_reporters_;
std::deque<ByteDanceHistReporterHandle> hist_reporters_;
std::deque<ByteDanceCountReporterHandle> count_reporters_;
#endif
void InitNamespace(const std::string& ns);
......
......@@ -2,7 +2,6 @@
#include <algorithm>
#include <array>
#include <chrono>
#include <cstddef>
#include <numeric>
#include <vector>
......@@ -14,7 +13,8 @@ namespace TERARKDB_NAMESPACE {
template <size_t MAX_LATENCY_US_FAST = 10 * 1000> // 10ms
class HistStats {
public:
HistStats() : last_report_time(std::chrono::high_resolution_clock::now()) {}
explicit HistStats(uint64_t last_report_time_ns)
: last_report_time_ns_(last_report_time_ns) {}
void AppendRecord(size_t us) {
if (us < buckets_.size()) {
......@@ -82,7 +82,7 @@ class HistStats {
}
}
std::chrono::high_resolution_clock::time_point last_report_time;
uint64_t last_report_time_ns_;
private:
std::array<size_t, MAX_LATENCY_US_FAST> buckets_{};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册