提交 69760b4d 编写于 作者: J Jay Zhuang 提交者: Facebook GitHub Bot

Introduce a global StatsDumpScheduler for stats dumping (#7223)

Summary:
Have a global StatsDumpScheduler for all DB instance stats dumping, including `DumpStats()` and `PersistStats()`. Before this, there're 2 dedicate threads for every DB instance, one for DumpStats() one for PersistStats(), which could create lots of threads if there're hundreds DB instances.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7223

Reviewed By: riversand963

Differential Revision: D23056737

Pulled By: jay-zhuang

fbshipit-source-id: 0faa2311142a73433ebb3317361db7cbf43faeba
上级 d758273c
......@@ -652,6 +652,7 @@ set(SOURCES
monitoring/perf_level.cc
monitoring/persistent_stats_history.cc
monitoring/statistics.cc
monitoring/stats_dump_scheduler.cc
monitoring/thread_status_impl.cc
monitoring/thread_status_updater.cc
monitoring/thread_status_util.cc
......@@ -1101,6 +1102,7 @@ if(WITH_TESTS)
monitoring/histogram_test.cc
monitoring/iostats_context_test.cc
monitoring/statistics_test.cc
monitoring/stats_dump_scheduler_test.cc
monitoring/stats_history_test.cc
options/options_settable_test.cc
options/options_test.cc
......
......@@ -8,6 +8,9 @@
* A new option `std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory` is added to `BackupableDBOptions`. The default value for this option is `nullptr`. If this option is null, the default backup engine checksum function (crc32c) will be used for creating, verifying, or restoring backups. If it is not null and is set to the DB custom checksum factory, the custom checksum function used in DB will also be used for creating, verifying, or restoring backups, in addition to the default checksum function (crc32c). If it is not null and is set to a custom checksum factory different than the DB custom checksum factory (which may be null), BackupEngine will return `Status::InvalidArgument()`.
* A new field `std::string requested_checksum_func_name` is added to `FileChecksumGenContext`, which enables the checksum factory to create generators for a suite of different functions.
### Performance Improvements
* Reduce thread number for multiple DB instances by re-using one global thread for statistics dumping and persisting.
## 6.12 (2020-07-28)
### Public API Change
* Encryption file classes now exposed for inheritance in env_encryption.h
......
......@@ -1808,6 +1808,9 @@ blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRA
timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
stats_dump_scheduler_test: $(OBJ_DIR)/monitoring/stats_dump_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
......
......@@ -231,6 +231,7 @@ cpp_library(
"monitoring/perf_level.cc",
"monitoring/persistent_stats_history.cc",
"monitoring/statistics.cc",
"monitoring/stats_dump_scheduler.cc",
"monitoring/thread_status_impl.cc",
"monitoring/thread_status_updater.cc",
"monitoring/thread_status_updater_debug.cc",
......@@ -1470,6 +1471,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"stats_dump_scheduler_test",
"monitoring/stats_dump_scheduler_test.cc",
"serial",
[],
[],
],
[
"stats_history_test",
"monitoring/stats_history_test.cc",
......
......@@ -149,7 +149,7 @@ Status CompactedDBImpl::Open(const Options& options,
std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
Status s = db->Init(options);
if (s.ok()) {
db->StartTimedTasks();
db->StartStatsDumpScheduler();
ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
"Opened the db as fully compacted mode");
LogFlush(db->immutable_db_options_.info_log);
......
......@@ -65,6 +65,7 @@
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/stats_dump_scheduler.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "options/cf_options.h"
......@@ -205,6 +206,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
bg_compaction_paused_(0),
refitting_level_(false),
opened_successfully_(false),
#ifndef ROCKSDB_LITE
stats_dump_scheduler_(nullptr),
#endif // ROCKSDB_LITE
two_write_queues_(options.two_write_queues),
manual_wal_flush_(options.manual_wal_flush),
// last_sequencee_ is always maintained by the main queue that also writes
......@@ -440,14 +444,12 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Shutdown: canceling all background work");
if (thread_dump_stats_ != nullptr) {
thread_dump_stats_->cancel();
thread_dump_stats_.reset();
}
if (thread_persist_stats_ != nullptr) {
thread_persist_stats_->cancel();
thread_persist_stats_.reset();
#ifndef ROCKSDB_LITE
if (stats_dump_scheduler_ != nullptr) {
stats_dump_scheduler_->Unregister(this);
}
#endif // !ROCKSDB_LITE
InstrumentedMutexLock l(&mutex_);
if (!shutting_down_.load(std::memory_order_acquire) &&
has_unpersisted_data_.load(std::memory_order_relaxed) &&
......@@ -680,36 +682,19 @@ void DBImpl::PrintStatistics() {
}
}
void DBImpl::StartTimedTasks() {
unsigned int stats_dump_period_sec = 0;
unsigned int stats_persist_period_sec = 0;
void DBImpl::StartStatsDumpScheduler() {
#ifndef ROCKSDB_LITE
{
InstrumentedMutexLock l(&mutex_);
stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec;
if (stats_dump_period_sec > 0) {
if (!thread_dump_stats_) {
// In case of many `DB::Open()` in rapid succession we can have all
// threads dumping at once, which causes severe lock contention in
// jemalloc. Ensure successive `DB::Open()`s are staggered by at least
// one second in the common case.
static std::atomic<uint64_t> stats_dump_threads_started(0);
thread_dump_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
[this]() { DBImpl::DumpStats(); }, "dump_st", env_,
static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond,
stats_dump_threads_started.fetch_add(1) %
static_cast<uint64_t>(stats_dump_period_sec) *
kMicrosInSecond));
}
}
stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec;
if (stats_persist_period_sec > 0) {
if (!thread_persist_stats_) {
thread_persist_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
[this]() { DBImpl::PersistStats(); }, "pst_st", env_,
static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond));
}
}
stats_dump_scheduler_ = StatsDumpScheduler::Default();
TEST_SYNC_POINT_CALLBACK("DBImpl::StartStatsDumpScheduler:Init",
&stats_dump_scheduler_);
}
stats_dump_scheduler_->Register(this,
mutable_db_options_.stats_dump_period_sec,
mutable_db_options_.stats_persist_period_sec);
#endif // !ROCKSDB_LITE
}
// esitmate the total size of stats_history_
......@@ -735,7 +720,9 @@ void DBImpl::PersistStats() {
if (shutdown_initiated_) {
return;
}
TEST_SYNC_POINT("DBImpl::PersistStats:StartRunning");
uint64_t now_seconds = env_->NowMicros() / kMicrosInSecond;
Statistics* statistics = immutable_db_options_.statistics.get();
if (!statistics) {
return;
......@@ -826,6 +813,7 @@ void DBImpl::PersistStats() {
" bytes, slice count: %" ROCKSDB_PRIszt,
stats_history_size, stats_history_.size());
}
TEST_SYNC_POINT("DBImpl::PersistStats:End");
#endif // !ROCKSDB_LITE
}
......@@ -880,6 +868,7 @@ void DBImpl::DumpStats() {
if (shutdown_initiated_) {
return;
}
TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
{
InstrumentedMutexLock l(&mutex_);
default_cf_internal_stats_->GetStringProperty(
......@@ -1087,45 +1076,22 @@ Status DBImpl::SetDBOptions(
}
if (new_options.stats_dump_period_sec !=
mutable_db_options_.stats_dump_period_sec) {
if (thread_dump_stats_) {
mutable_db_options_.stats_dump_period_sec ||
new_options.stats_persist_period_sec !=
mutable_db_options_.stats_persist_period_sec) {
if (stats_dump_scheduler_) {
mutex_.Unlock();
thread_dump_stats_->cancel();
stats_dump_scheduler_->Unregister(this);
mutex_.Lock();
}
if (new_options.stats_dump_period_sec > 0) {
// In case many DBs have `stats_dump_period_sec` enabled in rapid
// succession, we can have all threads dumping at once, which causes
// severe lock contention in jemalloc. Ensure successive enabling of
// `stats_dump_period_sec` are staggered by at least one second in the
// common case.
static std::atomic<uint64_t> stats_dump_threads_started(0);
thread_dump_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
[this]() { DBImpl::DumpStats(); }, "dump_st", env_,
static_cast<uint64_t>(new_options.stats_dump_period_sec) *
kMicrosInSecond,
stats_dump_threads_started.fetch_add(1) %
static_cast<uint64_t>(new_options.stats_dump_period_sec) *
kMicrosInSecond));
} else {
thread_dump_stats_.reset();
}
}
if (new_options.stats_persist_period_sec !=
mutable_db_options_.stats_persist_period_sec) {
if (thread_persist_stats_) {
if (new_options.stats_dump_period_sec > 0 ||
new_options.stats_persist_period_sec > 0) {
mutex_.Unlock();
thread_persist_stats_->cancel();
stats_dump_scheduler_->Register(this,
new_options.stats_dump_period_sec,
new_options.stats_persist_period_sec);
mutex_.Lock();
}
if (new_options.stats_persist_period_sec > 0) {
thread_persist_stats_.reset(new ROCKSDB_NAMESPACE::RepeatableThread(
[this]() { DBImpl::PersistStats(); }, "pst_st", env_,
static_cast<uint64_t>(new_options.stats_persist_period_sec) *
kMicrosInSecond));
} else {
thread_persist_stats_.reset();
}
}
write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate);
......
......@@ -70,6 +70,10 @@ class ArenaWrappedDBIter;
class InMemoryStatsHistoryIterator;
class MemTable;
class PersistentStatsHistoryIterator;
class StatsDumpScheduler;
#ifndef NDEBUG
class StatsDumpTestScheduler;
#endif // !NDEBUG
class TableCache;
class TaskLimiterToken;
class Version;
......@@ -988,9 +992,7 @@ class DBImpl : public DB {
int TEST_BGCompactionsAllowed() const;
int TEST_BGFlushesAllowed() const;
size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
void TEST_WaitForDumpStatsRun(std::function<void()> callback) const;
void TEST_WaitForPersistStatsRun(std::function<void()> callback) const;
bool TEST_IsPersistentStatsEnabled() const;
void TEST_WaitForStatsDumpRun(std::function<void()> callback) const;
size_t TEST_EstimateInMemoryStatsHistorySize() const;
VersionSet* TEST_GetVersionSet() const { return versions_.get(); }
......@@ -998,8 +1000,19 @@ class DBImpl : public DB {
const std::unordered_set<uint64_t>& TEST_GetFilesGrabbedForPurge() const {
return files_grabbed_for_purge_;
}
#ifndef ROCKSDB_LITE
StatsDumpTestScheduler* TEST_GetStatsDumpScheduler() const;
#endif // !ROCKSDB_LITE
#endif // NDEBUG
// persist stats to column family "_persistent_stats"
void PersistStats();
// dump rocksdb.stats to LOG
void DumpStats();
protected:
const std::string dbname_;
std::string db_id_;
......@@ -1639,18 +1652,12 @@ class DBImpl : public DB {
LogBuffer* log_buffer);
// Schedule background tasks
void StartTimedTasks();
void StartStatsDumpScheduler();
void PrintStatistics();
size_t EstimateInMemoryStatsHistorySize() const;
// persist stats to column family "_persistent_stats"
void PersistStats();
// dump rocksdb.stats to LOG
void DumpStats();
// Return the minimum empty level that could hold the total data in the
// input level. Return the input level, if such level could not be found.
int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
......@@ -2103,13 +2110,12 @@ class DBImpl : public DB {
// Only to be set during initialization
std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
// handle for scheduling stats dumping at fixed intervals
// REQUIRES: mutex locked
std::unique_ptr<ROCKSDB_NAMESPACE::RepeatableThread> thread_dump_stats_;
// handle for scheduling stats snapshoting at fixed intervals
// REQUIRES: mutex locked
std::unique_ptr<ROCKSDB_NAMESPACE::RepeatableThread> thread_persist_stats_;
#ifndef ROCKSDB_LITE
// Scheduler to run DumpStats() and PersistStats(). Currently, it always use
// a global instance from StatsDumpScheduler::Default(). Only in unittest, it
// can be overrided by StatsDumpTestSchduler.
StatsDumpScheduler* stats_dump_scheduler_;
#endif
// When set, we use a separate queue for writes that don't write to memtable.
// In 2PC these are the writes at Prepare phase.
......
......@@ -12,6 +12,7 @@
#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "monitoring/stats_dump_scheduler.h"
#include "monitoring/thread_status_updater.h"
#include "util/cast_util.h"
......@@ -271,21 +272,18 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize(
return GetWalPreallocateBlockSize(write_buffer_size);
}
void DBImpl::TEST_WaitForDumpStatsRun(std::function<void()> callback) const {
if (thread_dump_stats_ != nullptr) {
thread_dump_stats_->TEST_WaitForRun(callback);
#ifndef ROCKSDB_LITE
void DBImpl::TEST_WaitForStatsDumpRun(std::function<void()> callback) const {
if (stats_dump_scheduler_ != nullptr) {
static_cast<StatsDumpTestScheduler*>(stats_dump_scheduler_)
->TEST_WaitForRun(callback);
}
}
void DBImpl::TEST_WaitForPersistStatsRun(std::function<void()> callback) const {
if (thread_persist_stats_ != nullptr) {
thread_persist_stats_->TEST_WaitForRun(callback);
}
}
bool DBImpl::TEST_IsPersistentStatsEnabled() const {
return thread_persist_stats_ && thread_persist_stats_->IsRunning();
StatsDumpTestScheduler* DBImpl::TEST_GetStatsDumpScheduler() const {
return static_cast<StatsDumpTestScheduler*>(stats_dump_scheduler_);
}
#endif // !ROCKSDB_LITE
size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const {
return EstimateInMemoryStatsHistorySize();
......
......@@ -6,17 +6,17 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl/db_impl.h"
#include <cinttypes>
#include "db/builder.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "env/composite_env_wrapper.h"
#include "file/read_write_util.h"
#include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/stats_dump_scheduler.h"
#include "options/options_helper.h"
#include "rocksdb/wal_filter.h"
#include "table/block_based/block_based_table_factory.h"
......@@ -1700,6 +1700,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
sfm->ReserveDiskBuffer(max_write_buffer_size,
impl->immutable_db_options_.db_paths[0].path);
}
#endif // !ROCKSDB_LITE
if (s.ok()) {
......@@ -1716,9 +1717,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
}
if (s.ok()) {
impl->StartTimedTasks();
}
if (!s.ok()) {
impl->StartStatsDumpScheduler();
} else {
for (auto* h : *handles) {
delete h;
}
......
......@@ -1147,4 +1147,28 @@ class DBTestBase : public testing::Test {
bool time_elapse_only_sleep_on_reopen_ = false;
};
class SafeMockTimeEnv : public MockTimeEnv {
public:
explicit SafeMockTimeEnv(Env* base) : MockTimeEnv(base) {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
#if defined(OS_MACOSX) && !defined(NDEBUG)
// This is an alternate way (vs. SpecialEnv) of dealing with the fact
// that on some platforms, pthread_cond_timedwait does not appear to
// release the lock for other threads to operate if the deadline time
// is already passed. (TimedWait calls are currently a bad abstraction
// because the deadline parameter is usually computed from Env time,
// but is interpreted in real clock time.)
SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < this->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = this->RealNowMicros() + 1000;
}
});
#endif // OS_MACOSX && !NDEBUG
SyncPoint::GetInstance()->EnableProcessing();
}
};
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "monitoring/stats_dump_scheduler.h"
#include "db/db_impl/db_impl.h"
#include "util/cast_util.h"
#ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE {
StatsDumpScheduler::StatsDumpScheduler(Env* env) {
timer = std::unique_ptr<Timer>(new Timer(env));
}
void StatsDumpScheduler::Register(DBImpl* dbi,
unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec) {
static std::atomic<uint64_t> initial_delay(0);
if (stats_dump_period_sec > 0) {
timer->Start();
timer->Add([dbi]() { dbi->DumpStats(); }, GetTaskName(dbi, "dump_st"),
initial_delay.fetch_add(1) %
static_cast<uint64_t>(stats_dump_period_sec) *
kMicrosInSecond,
static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond);
}
if (stats_persist_period_sec > 0) {
timer->Start();
timer->Add(
[dbi]() { dbi->PersistStats(); }, GetTaskName(dbi, "pst_st"),
initial_delay.fetch_add(1) %
static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond,
static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond);
}
}
void StatsDumpScheduler::Unregister(DBImpl* dbi) {
timer->Cancel(GetTaskName(dbi, "dump_st"));
timer->Cancel(GetTaskName(dbi, "pst_st"));
if (!timer->HasPendingTask()) {
timer->Shutdown();
}
}
StatsDumpScheduler* StatsDumpScheduler::Default() {
// Always use the default Env for the scheduler, as we only use the NowMicros
// which is the same for all env.
// The Env could only be overridden in test.
static StatsDumpScheduler scheduler(Env::Default());
return &scheduler;
}
std::string StatsDumpScheduler::GetTaskName(DBImpl* dbi,
const std::string& func_name) {
std::string db_session_id;
dbi->GetDbSessionId(db_session_id);
return db_session_id + ":" + func_name;
}
#ifndef NDEBUG
// Get the static scheduler. For a new env, it needs to re-create the internal
// timer, so only re-create it when there's no running task. Otherwise, return
// the existing scheduler. Which means if the unittest needs to update MockEnv,
// Close all db instances and then re-open them.
StatsDumpTestScheduler* StatsDumpTestScheduler::Default(Env* env) {
static StatsDumpTestScheduler scheduler(env);
static port::Mutex mutex;
{
MutexLock l(&mutex);
if (scheduler.timer.get() != nullptr &&
scheduler.timer->TEST_GetPendingTaskNum() == 0) {
scheduler.timer->Shutdown();
scheduler.timer.reset(new Timer(env));
}
}
return &scheduler;
}
void StatsDumpTestScheduler::TEST_WaitForRun(
std::function<void()> callback) const {
if (timer != nullptr) {
timer->TEST_WaitForRun(callback);
}
}
size_t StatsDumpTestScheduler::TEST_GetValidTaskNum() const {
if (timer != nullptr) {
return timer->TEST_GetPendingTaskNum();
}
return 0;
}
StatsDumpTestScheduler::StatsDumpTestScheduler(Env* env)
: StatsDumpScheduler(env) {}
#endif // !NDEBUG
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include "db/db_impl/db_impl.h"
#include "util/timer.h"
namespace ROCKSDB_NAMESPACE {
// StatsDumpScheduler is a singleton object, which is scheduling/running
// DumpStats() and PersistStats() for all DB instances. All DB instances uses
// the same object from `Default()`.
// Internally, it uses a single threaded timer object to run the stats dump
// functions. Timer thread won't be started if there's no function needs to run,
// for example, option.stats_dump_period_sec and option.stats_persist_period_sec
// are set to 0.
class StatsDumpScheduler {
public:
static StatsDumpScheduler* Default();
StatsDumpScheduler() = delete;
StatsDumpScheduler(const StatsDumpScheduler&) = delete;
StatsDumpScheduler(StatsDumpScheduler&&) = delete;
StatsDumpScheduler& operator=(const StatsDumpScheduler&) = delete;
StatsDumpScheduler& operator=(StatsDumpScheduler&&) = delete;
void Register(DBImpl* dbi, unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec);
void Unregister(DBImpl* dbi);
protected:
std::unique_ptr<Timer> timer;
explicit StatsDumpScheduler(Env* env);
private:
std::string GetTaskName(DBImpl* dbi, const std::string& func_name);
};
#ifndef NDEBUG
// StatsDumpTestScheduler is for unittest, which can specify the Env like
// SafeMockTimeEnv. It also contains functions for unittest.
class StatsDumpTestScheduler : public StatsDumpScheduler {
public:
static StatsDumpTestScheduler* Default(Env* env);
void TEST_WaitForRun(std::function<void()> callback) const;
size_t TEST_GetValidTaskNum() const;
private:
explicit StatsDumpTestScheduler(Env* env);
};
#endif // !NDEBUG
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "monitoring/stats_dump_scheduler.h"
#include "db/db_test_util.h"
namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE
class StatsDumpSchedulerTest : public DBTestBase {
public:
StatsDumpSchedulerTest()
: DBTestBase("/stats_dump_scheduler_test"),
mock_env_(new SafeMockTimeEnv(Env::Default())) {}
protected:
std::unique_ptr<SafeMockTimeEnv> mock_env_;
void SetUp() override {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::StartStatsDumpScheduler:Init", [&](void* arg) {
auto* stats_dump_scheduler_ptr =
reinterpret_cast<StatsDumpScheduler**>(arg);
*stats_dump_scheduler_ptr =
StatsDumpTestScheduler::Default(mock_env_.get());
});
}
};
TEST_F(StatsDumpSchedulerTest, Basic) {
constexpr int kPeriodSec = 5;
Close();
Options options;
options.stats_dump_period_sec = kPeriodSec;
options.stats_persist_period_sec = kPeriodSec;
options.create_if_missing = true;
int mock_time_sec = 0;
mock_env_->set_current_time(mock_time_sec);
options.env = mock_env_.get();
int dump_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:StartRunning",
[&](void*) { dump_st_counter++; });
int pst_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
[&](void*) { pst_st_counter++; });
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec);
mock_time_sec += kPeriodSec - 1;
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
auto scheduler = dbfull()->TEST_GetStatsDumpScheduler();
ASSERT_NE(nullptr, scheduler);
ASSERT_EQ(2, scheduler->TEST_GetValidTaskNum());
ASSERT_EQ(1, dump_st_counter);
ASSERT_EQ(1, pst_st_counter);
mock_time_sec += kPeriodSec;
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(2, dump_st_counter);
ASSERT_EQ(2, pst_st_counter);
mock_time_sec += kPeriodSec;
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(3, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
// Disable scheduler with SetOption
ASSERT_OK(dbfull()->SetDBOptions(
{{"stats_dump_period_sec", "0"}, {"stats_persist_period_sec", "0"}}));
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
scheduler = dbfull()->TEST_GetStatsDumpScheduler();
ASSERT_EQ(0u, scheduler->TEST_GetValidTaskNum());
// Re-enable one task
ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "5"}}));
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
scheduler = dbfull()->TEST_GetStatsDumpScheduler();
ASSERT_NE(nullptr, scheduler);
ASSERT_EQ(1, scheduler->TEST_GetValidTaskNum());
dump_st_counter = 0;
mock_time_sec += kPeriodSec;
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(1, dump_st_counter);
Close();
}
TEST_F(StatsDumpSchedulerTest, MultiInstances) {
constexpr int kPeriodSec = 5;
const int kInstanceNum = 10;
Close();
Options options;
options.stats_dump_period_sec = kPeriodSec;
options.stats_persist_period_sec = kPeriodSec;
options.create_if_missing = true;
int mock_time_sec = 0;
mock_env_->set_current_time(mock_time_sec);
options.env = mock_env_.get();
int dump_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:2",
[&](void*) { dump_st_counter++; });
int pst_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
[&](void*) { pst_st_counter++; });
SyncPoint::GetInstance()->EnableProcessing();
auto dbs = std::vector<DB*>(kInstanceNum);
for (int i = 0; i < kInstanceNum; i++) {
ASSERT_OK(
DB::Open(options, test::PerThreadDBPath(std::to_string(i)), &(dbs[i])));
}
auto dbi = static_cast_with_check<DBImpl>(dbs[kInstanceNum - 1]);
auto scheduler = dbi->TEST_GetStatsDumpScheduler();
ASSERT_EQ(kInstanceNum * 2, scheduler->TEST_GetValidTaskNum());
int expected_run = kInstanceNum;
mock_time_sec += kPeriodSec - 1;
dbi->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
expected_run += kInstanceNum;
mock_time_sec += kPeriodSec;
dbi->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
expected_run += kInstanceNum;
mock_time_sec += kPeriodSec;
dbi->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
int half = kInstanceNum / 2;
for (int i = 0; i < half; i++) {
delete dbs[i];
}
expected_run += (kInstanceNum - half) * 2;
mock_time_sec += kPeriodSec;
dbi->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
mock_time_sec += kPeriodSec;
dbi->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
for (int i = half; i < kInstanceNum; i++) {
dbs[i]->Close();
delete dbs[i];
}
}
TEST_F(StatsDumpSchedulerTest, MultiEnv) {
constexpr int kDumpPeriodSec = 5;
constexpr int kPersistPeriodSec = 10;
Close();
Options options1;
options1.stats_dump_period_sec = kDumpPeriodSec;
options1.stats_persist_period_sec = kPersistPeriodSec;
options1.create_if_missing = true;
mock_env_->set_current_time(0);
options1.env = mock_env_.get();
Reopen(options1);
std::unique_ptr<MockTimeEnv> mock_env2(new MockTimeEnv(Env::Default()));
Options options2;
options2.stats_dump_period_sec = kDumpPeriodSec;
options2.stats_persist_period_sec = kPersistPeriodSec;
options2.create_if_missing = true;
mock_env2->set_current_time(0);
options1.env = mock_env2.get();
std::string dbname = test::PerThreadDBPath("multi_env_test");
DB* db;
ASSERT_OK(DB::Open(options2, dbname, &db));
DBImpl* dbi = static_cast_with_check<DBImpl>(db);
ASSERT_EQ(dbi->TEST_GetStatsDumpScheduler(),
dbfull()->TEST_GetStatsDumpScheduler());
db->Close();
delete db;
Close();
}
#endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
此差异已折叠。
......@@ -115,6 +115,7 @@ LIB_SOURCES = \
monitoring/perf_level.cc \
monitoring/persistent_stats_history.cc \
monitoring/statistics.cc \
monitoring/stats_dump_scheduler.cc \
monitoring/thread_status_impl.cc \
monitoring/thread_status_updater.cc \
monitoring/thread_status_updater_debug.cc \
......@@ -433,6 +434,7 @@ TEST_MAIN_SOURCES = \
monitoring/histogram_test.cc \
monitoring/iostats_context_test.cc \
monitoring/statistics_test.cc \
monitoring/stats_dump_scheduler_test.cc \
monitoring/stats_history_test.cc \
options/options_settable_test.cc \
options/options_test.cc \
......
......@@ -42,23 +42,31 @@ class Timer {
running_(false),
executing_task_(false) {}
~Timer() {}
// Add a new function. If the fn_name already exists, overriding it,
// regardless if the function is pending removed (invalid) or not.
// repeat_every_us == 0 means do not repeat
void Add(std::function<void()> fn,
const std::string& fn_name,
uint64_t start_after_us,
uint64_t repeat_every_us) {
std::unique_ptr<FunctionInfo> fn_info(new FunctionInfo(
std::move(fn),
fn_name,
env_->NowMicros() + start_after_us,
repeat_every_us));
InstrumentedMutexLock l(&mutex_);
heap_.push(fn_info.get());
map_.emplace(std::make_pair(fn_name, std::move(fn_info)));
cond_var_.Signal();
std::unique_ptr<FunctionInfo> fn_info(
new FunctionInfo(std::move(fn), fn_name,
env_->NowMicros() + start_after_us, repeat_every_us));
{
InstrumentedMutexLock l(&mutex_);
auto it = map_.find(fn_name);
if (it == map_.end()) {
heap_.push(fn_info.get());
map_.emplace(std::make_pair(fn_name, std::move(fn_info)));
} else {
// If it already exists, overriding it.
it->second->fn = std::move(fn_info->fn);
it->second->valid = true;
it->second->next_run_time_us = env_->NowMicros() + start_after_us;
it->second->repeat_every_us = repeat_every_us;
}
}
cond_var_.SignalAll();
}
void Cancel(const std::string& fn_name) {
......@@ -119,6 +127,45 @@ class Timer {
return true;
}
bool HasPendingTask() const {
InstrumentedMutexLock l(&mutex_);
for (auto it = map_.begin(); it != map_.end(); it++) {
if (it->second->IsValid()) {
return true;
}
}
return false;
}
#ifndef NDEBUG
void TEST_WaitForRun(std::function<void()> callback = nullptr) {
InstrumentedMutexLock l(&mutex_);
while (!heap_.empty() &&
heap_.top()->next_run_time_us <= env_->NowMicros()) {
cond_var_.TimedWait(env_->NowMicros() + 1000);
}
if (callback != nullptr) {
callback();
}
cond_var_.SignalAll();
do {
cond_var_.TimedWait(env_->NowMicros() + 1000);
} while (!heap_.empty() &&
heap_.top()->next_run_time_us <= env_->NowMicros());
}
size_t TEST_GetPendingTaskNum() const {
InstrumentedMutexLock l(&mutex_);
size_t ret = 0;
for (auto it = map_.begin(); it != map_.end(); it++) {
if (it->second->IsValid()) {
ret++;
}
}
return ret;
}
#endif // NDEBUG
private:
void Run() {
......@@ -142,10 +189,13 @@ class Timer {
}
if (current_fn->next_run_time_us <= env_->NowMicros()) {
// make a copy of the function so it won't be changed after
// mutex_.unlock.
std::function<void()> fn = current_fn->fn;
executing_task_ = true;
mutex_.Unlock();
// Execute the work
current_fn->fn();
fn();
mutex_.Lock();
executing_task_ = false;
cond_var_.SignalAll();
......@@ -243,7 +293,7 @@ class Timer {
Env* const env_;
// This mutex controls both the heap_ and the map_. It needs to be held for
// making any changes in them.
InstrumentedMutex mutex_;
mutable InstrumentedMutex mutex_;
InstrumentedCondVar cond_var_;
std::unique_ptr<port::Thread> thread_;
bool running_;
......
......@@ -356,6 +356,39 @@ TEST_F(TimerTest, ShutdownRunningTask) {
delete value;
}
TEST_F(TimerTest, AddSameFuncNameTest) {
mock_env_->set_current_time(0);
Timer timer(mock_env_.get());
ASSERT_TRUE(timer.Start());
int func_counter1 = 0;
timer.Add([&] { func_counter1++; }, "duplicated_func", 1 * kSecond,
5 * kSecond);
int func2_counter = 0;
timer.Add([&] { func2_counter++; }, "func2", 1 * kSecond, 4 * kSecond);
// New function with the same name should override the existing one
int func_counter2 = 0;
timer.Add([&] { func_counter2++; }, "duplicated_func", 1 * kSecond,
5 * kSecond);
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(1); });
ASSERT_EQ(func_counter1, 0);
ASSERT_EQ(func2_counter, 1);
ASSERT_EQ(func_counter2, 1);
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(6); });
ASSERT_EQ(func_counter1, 0);
ASSERT_EQ(func2_counter, 2);
ASSERT_EQ(func_counter2, 2);
ASSERT_TRUE(timer.Shutdown());
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册