提交 7646cf66 编写于 作者: W wangyi.ywq 提交者: 奏之章

[cherry-pick] merge cherry-pick

上级 fb86a227
......@@ -604,6 +604,7 @@ set(SOURCES
monitoring/perf_context.cc
monitoring/perf_level.cc
monitoring/statistics.cc
monitoring/stats_dump_scheduler.cc
monitoring/thread_status_impl.cc
monitoring/thread_status_updater.cc
monitoring/thread_status_util.cc
......@@ -1010,7 +1011,6 @@ if(WITH_TESTS)
table/block_based_table_builder_ttl_test.cc
util/timer_test.cc
)
if(WITH_LIBRADOS)
list(APPEND TESTS utilities/env_librados_test.cc)
endif()
......
# Rocksdb Change Log
## 6.0.0 (2/19/2019)
### New Features
* Enabled checkpoint on readonly db (DBImplReadOnly).
......
此差异已折叠。
......@@ -159,6 +159,7 @@ cpp_library(
"monitoring/perf_context.cc",
"monitoring/perf_level.cc",
"monitoring/statistics.cc",
"monitoring/stats_dump_scheduler.cc",
"monitoring/thread_status_impl.cc",
"monitoring/thread_status_updater.cc",
"monitoring/thread_status_updater_debug.cc",
......@@ -982,6 +983,22 @@ ROCKS_TESTS = [
"statistics_test",
"monitoring/statistics_test.cc",
"serial",
[],
[],
],
[
"stats_dump_scheduler_test",
"monitoring/stats_dump_scheduler_test.cc",
"serial",
[],
[],
],
[
"stats_history_test",
"monitoring/stats_history_test.cc",
"serial",
[],
[],
],
[
"stringappend_test",
......
......@@ -208,7 +208,7 @@ Status CompactedDBImpl::Open(const Options& options, const std::string& dbname,
std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
Status s = db->Init(options);
if (s.ok()) {
db->StartTimedTasks();
db->StartStatsDumpScheduler();
ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
"Opened the db as fully compacted mode");
LogFlush(db->immutable_db_options_.info_log);
......
......@@ -58,6 +58,7 @@
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/stats_dump_scheduler.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "options/cf_options.h"
......@@ -256,6 +257,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
bg_compaction_paused_(0),
refitting_level_(false),
opened_successfully_(false),
#ifndef ROCKSDB_LITE
stats_dump_scheduler_(nullptr),
#endif // ROCKSDB_LITE
two_write_queues_(options.two_write_queues),
manual_wal_flush_(options.manual_wal_flush),
seq_per_batch_(seq_per_batch),
......@@ -503,14 +507,12 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Shutdown: canceling all background work");
if (thread_dump_stats_ != nullptr) {
thread_dump_stats_->cancel();
thread_dump_stats_.reset();
}
if (thread_persist_stats_ != nullptr) {
thread_persist_stats_->cancel();
thread_persist_stats_.reset();
#ifndef ROCKSDB_LITE
if (stats_dump_scheduler_ != nullptr) {
stats_dump_scheduler_->Unregister(this);
}
#endif // !ROCKSDB_LITE
InstrumentedMutexLock l(&mutex_);
if (!shutting_down_.load(std::memory_order_acquire) &&
has_unpersisted_data_.load(std::memory_order_relaxed) &&
......@@ -759,28 +761,19 @@ void DBImpl::PrintStatistics() {
}
}
void DBImpl::StartTimedTasks() {
unsigned int stats_dump_period_sec = 0;
unsigned int stats_persist_period_sec = 0;
void DBImpl::StartStatsDumpScheduler() {
#ifndef ROCKSDB_LITE
{
InstrumentedMutexLock l(&mutex_);
stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec;
if (stats_dump_period_sec > 0) {
if (!thread_dump_stats_) {
thread_dump_stats_.reset(new TERARKDB_NAMESPACE::RepeatableThread(
[this]() { DBImpl::DumpStats(); }, "dump_st", env_,
stats_dump_period_sec * 1000000));
}
}
stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec;
if (stats_persist_period_sec > 0) {
if (!thread_persist_stats_) {
thread_persist_stats_.reset(new rocksdb::RepeatableThread(
[this]() { DBImpl::PersistStats(); }, "pst_st", env_,
stats_persist_period_sec * 1000000));
}
}
stats_dump_scheduler_ = StatsDumpScheduler::Default();
TEST_SYNC_POINT_CALLBACK("DBImpl::StartStatsDumpScheduler:Init",
&stats_dump_scheduler_);
}
stats_dump_scheduler_->Register(this,
mutable_db_options_.stats_dump_period_sec,
mutable_db_options_.stats_persist_period_sec);
#endif // !ROCKSDB_LITE
}
// esitmate the total size of stats_history_
......@@ -806,7 +799,8 @@ void DBImpl::PersistStats() {
if (shutdown_initiated_) {
return;
}
uint64_t now_micros = env_->NowMicros();
TEST_SYNC_POINT("DBImpl::PersistStats:StartRunning");
uint64_t now_seconds = env_->NowMicros() / kMicrosInSecond;
Statistics* statistics = immutable_db_options_.statistics.get();
if (!statistics) {
return;
......@@ -845,7 +839,8 @@ void DBImpl::PersistStats() {
purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit;
}
}
// TODO: persist stats to disk
TEST_SYNC_POINT("DBImpl::PersistStats:End");
#endif // !ROCKSDB_LITE
}
......@@ -870,7 +865,8 @@ bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
}
}
Status DBImpl::GetStatsHistory(uint64_t start_time, uint64_t end_time,
Status DBImpl::GetStatsHistory(
uint64_t start_time, uint64_t end_time,
std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
if (!stats_iterator) {
return Status::InvalidArgument("stats_iterator not preallocated.");
......@@ -891,19 +887,18 @@ void DBImpl::ScheduleGCTTL() {
};
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Start ScheduleGCTTL");
for (auto cfd : *versions_->GetColumnFamilySet()) {
if(cfd->GetLatestCFOptions().ttl_extractor_factory == nullptr) continue;
if (cfd->GetLatestCFOptions().ttl_extractor_factory == nullptr) continue;
if (cfd->initialized()) {
VersionStorageInfo* vsi = cfd->current()->storage_info();
for (int l = 0; l < vsi->num_levels(); l++) {
for (auto sst : vsi->LevelFiles(l)) {
if (sst->marked_for_compaction) marked_count++;
if (!sst->marked_for_compaction)
sst->marked_for_compaction = should_marked_for_compacted(
sst->prop.ratio_expire_time, sst->prop.scan_gap_expire_time,
nowSeconds);
if (sst->marked_for_compaction) {
std::cout << sst->marked_for_compaction << std::endl ;
std::cout << sst->marked_for_compaction << std::endl;
TEST_SYNC_POINT("DBImpl:ScheduleGCTTL-mark");
mark_count++;
}
......@@ -911,8 +906,8 @@ void DBImpl::ScheduleGCTTL() {
}
}
}
ROCKS_LOG_INFO(immutable_db_options_.info_log, "marked for compact SST: %d,%d",
marked_count,mark_count);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"marked for compact SST: %d,%d", marked_count, mark_count);
if (mark_count > 0) {
InstrumentedMutexLock l(&mutex_);
MaybeScheduleFlushOrCompaction();
......@@ -934,6 +929,7 @@ void DBImpl::DumpStats() {
}
std::vector<ColumnFamilyData*> cfd_vec;
std::string stats;
TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
{
InstrumentedMutexLock l(&mutex_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
......@@ -1124,34 +1120,22 @@ Status DBImpl::SetDBOptions(
MaybeScheduleFlushOrCompaction();
}
if (new_options.stats_dump_period_sec !=
mutable_db_options_.stats_dump_period_sec) {
if (thread_dump_stats_) {
mutable_db_options_.stats_dump_period_sec ||
new_options.stats_persist_period_sec !=
mutable_db_options_.stats_persist_period_sec) {
if (stats_dump_scheduler_) {
mutex_.Unlock();
thread_dump_stats_->cancel();
stats_dump_scheduler_->Unregister(this);
mutex_.Lock();
}
if (new_options.stats_dump_period_sec > 0) {
thread_dump_stats_.reset(new TERARKDB_NAMESPACE::RepeatableThread(
[this]() { DBImpl::DumpStats(); }, "dump_st", env_,
new_options.stats_dump_period_sec * 1000000));
} else {
thread_dump_stats_.reset();
}
}
if (new_options.stats_persist_period_sec !=
mutable_db_options_.stats_persist_period_sec) {
if (thread_persist_stats_) {
if (new_options.stats_dump_period_sec > 0 ||
new_options.stats_persist_period_sec > 0) {
mutex_.Unlock();
thread_persist_stats_->cancel();
stats_dump_scheduler_->Register(this,
new_options.stats_dump_period_sec,
new_options.stats_persist_period_sec);
mutex_.Lock();
}
if (new_options.stats_persist_period_sec > 0) {
thread_persist_stats_.reset(new rocksdb::RepeatableThread(
[this]() { DBImpl::PersistStats(); }, "pst_st", env_,
new_options.stats_persist_period_sec * 1000000));
} else {
thread_persist_stats_.reset();
}
}
write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate);
......@@ -4155,7 +4139,8 @@ Status DBImpl::VerifyChecksum() {
const auto& fd = vstorage->LevelFiles(i)[j]->fd;
std::string fname = TableFileName(cfd->ioptions()->cf_paths,
fd.GetNumber(), fd.GetPathId());
s = TERARKDB_NAMESPACE::VerifySstFileChecksum(opts, env_options_, fname);
s = TERARKDB_NAMESPACE::VerifySstFileChecksum(opts, env_options_,
fname);
}
}
if (!s.ok()) {
......
......@@ -68,6 +68,12 @@ class Arena;
class ArenaWrappedDBIter;
class InMemoryStatsHistoryIterator;
class MemTable;
class PersistentStatsHistoryIterator;
class StatsDumpScheduler;
#ifndef NDEBUG
class StatsDumpTestScheduler;
#endif // !NDEBUG
class TableCache;
class Version;
class VersionEdit;
......@@ -752,6 +758,35 @@ class DBImpl : public DB {
uint64_t* new_time,
std::map<std::string, uint64_t>* stats_map);
// Print information of all tombstones of all iterators to the std::string
// This is only used by ldb. The output might be capped. Tombstones
// printed out are not guaranteed to be in any order.
Status TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
int max_entries_to_print,
std::string* out_str);
#ifndef NDEBUG
Status TEST_FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_opts);
// Flush (multiple) ColumnFamilyData without using ColumnFamilyHandle. This
// is because in certain cases, we can flush column families, wait for the
// flush to complete, but delete the column family handle before the wait
// finishes. For example in CompactRange.
Status TEST_AtomicFlushMemTables(const autovector<ColumnFamilyData*>& cfds,
const FlushOptions& flush_opts);
void TEST_WaitForStatsDumpRun(std::function<void()> callback) const;
size_t TEST_EstimateInMemoryStatsHistorySize() const;
VersionSet* TEST_GetVersionSet() const { return versions_.get(); }
#ifndef ROCKSDB_LITE
StatsDumpTestScheduler* TEST_GetStatsDumpScheduler() const;
#endif // !ROCKSDB_LITE
#endif // NDEBUG
protected:
Env* const env_;
const std::string dbname_;
......@@ -1159,7 +1194,7 @@ class DBImpl : public DB {
bool* sfm_bookkeeping, LogBuffer* log_buffer);
// Schedule background tasks
void StartTimedTasks();
void StartStatsDumpScheduler();
void PrintStatistics();
......@@ -1616,14 +1651,6 @@ class DBImpl : public DB {
// Only to be set during initialization
std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
// handle for scheduling stats dumping at fixed intervals
// REQUIRES: mutex locked
std::unique_ptr<TERARKDB_NAMESPACE::RepeatableThread> thread_dump_stats_;
// handle for scheduling stats snapshoting at fixed intervals
// REQUIRES: mutex locked
std::unique_ptr<rocksdb::RepeatableThread> thread_persist_stats_;
// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);
......@@ -1666,6 +1693,12 @@ class DBImpl : public DB {
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; }
#ifndef ROCKSDB_LITE
// Scheduler to run DumpStats() and PersistStats(). Currently, it always use
// a global instance from StatsDumpScheduler::Default(). Only in unittest, it
// can be overrided by StatsDumpTestSchduler.
StatsDumpScheduler* stats_dump_scheduler_;
#endif
// When set, we use a separate queue for writes that dont write to memtable.
// In 2PC these are the writes at Prepare phase.
......
......@@ -11,6 +11,7 @@
#include "db/db_impl.h"
#include "db/error_handler.h"
#include "monitoring/stats_dump_scheduler.h"
#include "monitoring/thread_status_updater.h"
#include "rocksdb/terark_namespace.h"
......@@ -251,21 +252,18 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize(
return GetWalPreallocateBlockSize(write_buffer_size);
}
void DBImpl::TEST_WaitForDumpStatsRun(std::function<void()> callback) const {
if (thread_dump_stats_ != nullptr) {
thread_dump_stats_->TEST_WaitForRun(callback);
#ifndef ROCKSDB_LITE
void DBImpl::TEST_WaitForStatsDumpRun(std::function<void()> callback) const {
if (stats_dump_scheduler_ != nullptr) {
static_cast<StatsDumpTestScheduler*>(stats_dump_scheduler_)
->TEST_WaitForRun(callback);
}
}
void DBImpl::TEST_WaitForPersistStatsRun(std::function<void()> callback) const {
if (thread_persist_stats_ != nullptr) {
thread_persist_stats_->TEST_WaitForRun(callback);
}
}
bool DBImpl::TEST_IsPersistentStatsEnabled() const {
return thread_persist_stats_ && thread_persist_stats_->IsRunning();
StatsDumpTestScheduler* DBImpl::TEST_GetStatsDumpScheduler() const {
return static_cast<StatsDumpTestScheduler*>(stats_dump_scheduler_);
}
#endif // !ROCKSDB_LITE
size_t DBImpl::TEST_EstiamteStatsHistorySize() const {
return EstiamteStatsHistorySize();
......
......@@ -1381,6 +1381,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
sfm->ReserveDiskBuffer(max_write_buffer_size,
impl->immutable_db_options_.db_paths[0].path);
}
#endif // !ROCKSDB_LITE
if (s.ok()) {
......@@ -1396,9 +1397,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
}
if (s.ok()) {
impl->StartTimedTasks();
}
if (!s.ok()) {
impl->StartStatsDumpScheduler();
} else {
for (auto* h : *handles) {
delete h;
}
......
......@@ -981,4 +981,28 @@ class DBTestBase : public testing::Test {
}
};
class SafeMockTimeEnv : public MockTimeEnv {
public:
explicit SafeMockTimeEnv(Env* base) : MockTimeEnv(base) {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
#if defined(OS_MACOSX) && !defined(NDEBUG)
// This is an alternate way (vs. SpecialEnv) of dealing with the fact
// that on some platforms, pthread_cond_timedwait does not appear to
// release the lock for other threads to operate if the deadline time
// is already passed. (TimedWait calls are currently a bad abstraction
// because the deadline parameter is usually computed from Env time,
// but is interpreted in real clock time.)
SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < this->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = this->RealNowMicros() + 1000;
}
});
#endif // OS_MACOSX && !NDEBUG
SyncPoint::GetInstance()->EnableProcessing();
}
};
} // namespace TERARKDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "monitoring/stats_dump_scheduler.h"
#include "db/db_impl/db_impl.h"
#include "util/cast_util.h"
#ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE {
StatsDumpScheduler::StatsDumpScheduler(Env* env) {
timer = std::unique_ptr<Timer>(new Timer(env));
}
void StatsDumpScheduler::Register(DBImpl* dbi,
unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec) {
static std::atomic<uint64_t> initial_delay(0);
if (stats_dump_period_sec > 0) {
timer->Start();
timer->Add([dbi]() { dbi->DumpStats(); }, GetTaskName(dbi, "dump_st"),
initial_delay.fetch_add(1) %
static_cast<uint64_t>(stats_dump_period_sec) *
kMicrosInSecond,
static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond);
}
if (stats_persist_period_sec > 0) {
timer->Start();
timer->Add(
[dbi]() { dbi->PersistStats(); }, GetTaskName(dbi, "pst_st"),
initial_delay.fetch_add(1) %
static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond,
static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond);
}
}
void StatsDumpScheduler::Unregister(DBImpl* dbi) {
timer->Cancel(GetTaskName(dbi, "dump_st"));
timer->Cancel(GetTaskName(dbi, "pst_st"));
if (!timer->HasPendingTask()) {
timer->Shutdown();
}
}
StatsDumpScheduler* StatsDumpScheduler::Default() {
// Always use the default Env for the scheduler, as we only use the NowMicros
// which is the same for all env.
// The Env could only be overridden in test.
static StatsDumpScheduler scheduler(Env::Default());
return &scheduler;
}
std::string StatsDumpScheduler::GetTaskName(DBImpl* dbi,
const std::string& func_name) {
std::string db_session_id;
dbi->GetDbSessionId(db_session_id);
return db_session_id + ":" + func_name;
}
#ifndef NDEBUG
// Get the static scheduler. For a new env, it needs to re-create the internal
// timer, so only re-create it when there's no running task. Otherwise, return
// the existing scheduler. Which means if the unittest needs to update MockEnv,
// Close all db instances and then re-open them.
StatsDumpTestScheduler* StatsDumpTestScheduler::Default(Env* env) {
static StatsDumpTestScheduler scheduler(env);
static port::Mutex mutex;
{
MutexLock l(&mutex);
if (scheduler.timer.get() != nullptr &&
scheduler.timer->TEST_GetPendingTaskNum() == 0) {
scheduler.timer->Shutdown();
scheduler.timer.reset(new Timer(env));
}
}
return &scheduler;
}
void StatsDumpTestScheduler::TEST_WaitForRun(
std::function<void()> callback) const {
if (timer != nullptr) {
timer->TEST_WaitForRun(callback);
}
}
size_t StatsDumpTestScheduler::TEST_GetValidTaskNum() const {
if (timer != nullptr) {
return timer->TEST_GetPendingTaskNum();
}
return 0;
}
StatsDumpTestScheduler::StatsDumpTestScheduler(Env* env)
: StatsDumpScheduler(env) {}
#endif // !NDEBUG
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include "db/db_impl.h"
#include "util/timer.h"
namespace rocksdb {
// StatsDumpScheduler is a singleton object, which is scheduling/running
// DumpStats() and PersistStats() for all DB instances. All DB instances uses
// the same object from `Default()`.
// Internally, it uses a single threaded timer object to run the stats dump
// functions. Timer thread won't be started if there's no function needs to run,
// for example, option.stats_dump_period_sec and option.stats_persist_period_sec
// are set to 0.
class StatsDumpScheduler {
public:
static StatsDumpScheduler* Default();
StatsDumpScheduler() = delete;
StatsDumpScheduler(const StatsDumpScheduler&) = delete;
StatsDumpScheduler(StatsDumpScheduler&&) = delete;
StatsDumpScheduler& operator=(const StatsDumpScheduler&) = delete;
StatsDumpScheduler& operator=(StatsDumpScheduler&&) = delete;
void Register(DBImpl* dbi, unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec);
void Unregister(DBImpl* dbi);
protected:
std::unique_ptr<Timer> timer;
explicit StatsDumpScheduler(Env* env);
private:
std::string GetTaskName(DBImpl* dbi, const std::string& func_name);
};
#ifndef NDEBUG
// StatsDumpTestScheduler is for unittest, which can specify the Env like
// SafeMockTimeEnv. It also contains functions for unittest.
class StatsDumpTestScheduler : public StatsDumpScheduler {
public:
static StatsDumpTestScheduler* Default(Env* env);
void TEST_WaitForRun(std::function<void()> callback) const;
size_t TEST_GetValidTaskNum() const;
private:
explicit StatsDumpTestScheduler(Env* env);
};
#endif // !NDEBUG
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "monitoring/stats_dump_scheduler.h"
#include "db/db_test_util.h"
namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE
class StatsDumpSchedulerTest : public DBTestBase {
public:
StatsDumpSchedulerTest()
: DBTestBase("/stats_dump_scheduler_test"),
mock_env_(new SafeMockTimeEnv(Env::Default())) {}
protected:
std::unique_ptr<SafeMockTimeEnv> mock_env_;
void SetUp() override {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::StartStatsDumpScheduler:Init", [&](void* arg) {
auto* stats_dump_scheduler_ptr =
reinterpret_cast<StatsDumpScheduler**>(arg);
*stats_dump_scheduler_ptr =
StatsDumpTestScheduler::Default(mock_env_.get());
});
}
};
TEST_F(StatsDumpSchedulerTest, Basic) {
constexpr int kPeriodSec = 5;
Close();
Options options;
options.stats_dump_period_sec = kPeriodSec;
options.stats_persist_period_sec = kPeriodSec;
options.create_if_missing = true;
int mock_time_sec = 0;
mock_env_->set_current_time(mock_time_sec);
options.env = mock_env_.get();
int dump_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:StartRunning",
[&](void*) { dump_st_counter++; });
int pst_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
[&](void*) { pst_st_counter++; });
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec);
mock_time_sec += kPeriodSec - 1;
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
auto scheduler = dbfull()->TEST_GetStatsDumpScheduler();
ASSERT_NE(nullptr, scheduler);
ASSERT_EQ(2, scheduler->TEST_GetValidTaskNum());
ASSERT_EQ(1, dump_st_counter);
ASSERT_EQ(1, pst_st_counter);
mock_time_sec += kPeriodSec;
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(2, dump_st_counter);
ASSERT_EQ(2, pst_st_counter);
mock_time_sec += kPeriodSec;
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(3, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
// Disable scheduler with SetOption
ASSERT_OK(dbfull()->SetDBOptions(
{{"stats_dump_period_sec", "0"}, {"stats_persist_period_sec", "0"}}));
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
scheduler = dbfull()->TEST_GetStatsDumpScheduler();
ASSERT_EQ(0u, scheduler->TEST_GetValidTaskNum());
// Re-enable one task
ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "5"}}));
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
scheduler = dbfull()->TEST_GetStatsDumpScheduler();
ASSERT_NE(nullptr, scheduler);
ASSERT_EQ(1, scheduler->TEST_GetValidTaskNum());
dump_st_counter = 0;
mock_time_sec += kPeriodSec;
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(1, dump_st_counter);
Close();
}
TEST_F(StatsDumpSchedulerTest, MultiInstances) {
constexpr int kPeriodSec = 5;
const int kInstanceNum = 10;
Close();
Options options;
options.stats_dump_period_sec = kPeriodSec;
options.stats_persist_period_sec = kPeriodSec;
options.create_if_missing = true;
int mock_time_sec = 0;
mock_env_->set_current_time(mock_time_sec);
options.env = mock_env_.get();
int dump_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:2",
[&](void*) { dump_st_counter++; });
int pst_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
[&](void*) { pst_st_counter++; });
SyncPoint::GetInstance()->EnableProcessing();
auto dbs = std::vector<DB*>(kInstanceNum);
for (int i = 0; i < kInstanceNum; i++) {
ASSERT_OK(
DB::Open(options, test::PerThreadDBPath(std::to_string(i)), &(dbs[i])));
}
auto dbi = static_cast_with_check<DBImpl>(dbs[kInstanceNum - 1]);
auto scheduler = dbi->TEST_GetStatsDumpScheduler();
ASSERT_EQ(kInstanceNum * 2, scheduler->TEST_GetValidTaskNum());
int expected_run = kInstanceNum;
mock_time_sec += kPeriodSec - 1;
dbi->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
expected_run += kInstanceNum;
mock_time_sec += kPeriodSec;
dbi->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
expected_run += kInstanceNum;
mock_time_sec += kPeriodSec;
dbi->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
int half = kInstanceNum / 2;
for (int i = 0; i < half; i++) {
delete dbs[i];
}
expected_run += (kInstanceNum - half) * 2;
mock_time_sec += kPeriodSec;
dbi->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
mock_time_sec += kPeriodSec;
dbi->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(expected_run, dump_st_counter);
ASSERT_EQ(expected_run, pst_st_counter);
for (int i = half; i < kInstanceNum; i++) {
dbs[i]->Close();
delete dbs[i];
}
}
TEST_F(StatsDumpSchedulerTest, MultiEnv) {
constexpr int kDumpPeriodSec = 5;
constexpr int kPersistPeriodSec = 10;
Close();
Options options1;
options1.stats_dump_period_sec = kDumpPeriodSec;
options1.stats_persist_period_sec = kPersistPeriodSec;
options1.create_if_missing = true;
mock_env_->set_current_time(0);
options1.env = mock_env_.get();
Reopen(options1);
std::unique_ptr<MockTimeEnv> mock_env2(new MockTimeEnv(Env::Default()));
Options options2;
options2.stats_dump_period_sec = kDumpPeriodSec;
options2.stats_persist_period_sec = kPersistPeriodSec;
options2.create_if_missing = true;
mock_env2->set_current_time(0);
options1.env = mock_env2.get();
std::string dbname = test::PerThreadDBPath("multi_env_test");
DB* db;
ASSERT_OK(DB::Open(options2, dbname, &db));
DBImpl* dbi = static_cast_with_check<DBImpl>(db);
ASSERT_EQ(dbi->TEST_GetStatsDumpScheduler(),
dbfull()->TEST_GetStatsDumpScheduler());
db->Close();
delete db;
Close();
}
#endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
此差异已折叠。
......@@ -87,6 +87,7 @@ LIB_SOURCES = \
monitoring/perf_context.cc \
monitoring/perf_level.cc \
monitoring/statistics.cc \
monitoring/stats_dump_scheduler.cc \
monitoring/thread_status_impl.cc \
monitoring/thread_status_updater.cc \
monitoring/thread_status_updater_debug.cc \
......@@ -369,6 +370,9 @@ MAIN_SOURCES = \
monitoring/histogram_test.cc \
monitoring/iostats_context_test.cc \
monitoring/statistics_test.cc \
monitoring/stats_dump_scheduler_test.cc \
monitoring/stats_history_test.cc \
options/options_settable_test.cc
options/options_test.cc \
table/block_based_filter_block_test.cc \
table/block_test.cc \
......
......@@ -42,20 +42,32 @@ class Timer {
executing_task_(false) {}
~Timer() {}
// Add a new function. If the fn_name already exists, overriding it,
// regardless if the function is pending removed (invalid) or not.
// repeat_every_us == 0 means do not repeat
void Add(std::function<void()> fn,
const std::string& fn_name,
uint64_t start_after_us,
uint64_t repeat_every_us) {
std::unique_ptr<FunctionInfo> fn_info(new FunctionInfo(
std::move(fn),
fn_name,
env_->NowMicros() + start_after_us,
repeat_every_us));
InstrumentedMutexLock l(&mutex_);
heap_.push(fn_info.get());
map_.emplace(std::make_pair(fn_name, std::move(fn_info)));
std::unique_ptr<FunctionInfo> fn_info(
new FunctionInfo(std::move(fn), fn_name,
env_->NowMicros() + start_after_us, repeat_every_us));
{
InstrumentedMutexLock l(&mutex_);
auto it = map_.find(fn_name);
if (it == map_.end()) {
heap_.push(fn_info.get());
map_.emplace(std::make_pair(fn_name, std::move(fn_info)));
} else {
// If it already exists, overriding it.
it->second->fn = std::move(fn_info->fn);
it->second->valid = true;
it->second->next_run_time_us = env_->NowMicros() + start_after_us;
it->second->repeat_every_us = repeat_every_us;
}
}
cond_var_.SignalAll();
}
void Cancel(const std::string& fn_name) {
......@@ -116,6 +128,45 @@ class Timer {
return true;
}
bool HasPendingTask() const {
InstrumentedMutexLock l(&mutex_);
for (auto it = map_.begin(); it != map_.end(); it++) {
if (it->second->IsValid()) {
return true;
}
}
return false;
}
#ifndef NDEBUG
void TEST_WaitForRun(std::function<void()> callback = nullptr) {
InstrumentedMutexLock l(&mutex_);
while (!heap_.empty() &&
heap_.top()->next_run_time_us <= env_->NowMicros()) {
cond_var_.TimedWait(env_->NowMicros() + 1000);
}
if (callback != nullptr) {
callback();
}
cond_var_.SignalAll();
do {
cond_var_.TimedWait(env_->NowMicros() + 1000);
} while (!heap_.empty() &&
heap_.top()->next_run_time_us <= env_->NowMicros());
}
size_t TEST_GetPendingTaskNum() const {
InstrumentedMutexLock l(&mutex_);
size_t ret = 0;
for (auto it = map_.begin(); it != map_.end(); it++) {
if (it->second->IsValid()) {
ret++;
}
}
return ret;
}
#endif // NDEBUG
private:
void Run() {
......@@ -138,14 +189,16 @@ class Timer {
}
if (current_fn->next_run_time_us <= env_->NowMicros()) {
// make a copy of the function so it won't be changed after
// mutex_.unlock.
std::function<void()> fn = current_fn->fn;
executing_task_ = true;
mutex_.Unlock();
// Execute the work
current_fn->fn();
fn();
mutex_.Lock();
executing_task_ = false;
cond_var_.SignalAll();
// Remove the work from the heap once it is done executing.
// Note that we are just removing the pointer from the heap. Its
// memory is still managed in the map (as it holds a unique ptr).
......@@ -239,7 +292,7 @@ class Timer {
Env* const env_;
// This mutex controls both the heap_ and the map_. It needs to be held for
// making any changes in them.
InstrumentedMutex mutex_;
mutable InstrumentedMutex mutex_;
InstrumentedCondVar cond_var_;
std::unique_ptr<port::Thread> thread_;
bool running_;
......
......@@ -354,6 +354,41 @@ TEST_F(TimerTest, ShutdownRunningTask) {
control_thr.join();
delete value;
}
<<<<<<< Updated upstream
=======
TEST_F(TimerTest, AddSameFuncNameTest) {
mock_env_->set_current_time(0);
Timer timer(mock_env_.get());
ASSERT_TRUE(timer.Start());
int func_counter1 = 0;
timer.Add([&] { func_counter1++; }, "duplicated_func", 1 * kSecond,
5 * kSecond);
int func2_counter = 0;
timer.Add([&] { func2_counter++; }, "func2", 1 * kSecond, 4 * kSecond);
// New function with the same name should override the existing one
int func_counter2 = 0;
timer.Add([&] { func_counter2++; }, "duplicated_func", 1 * kSecond,
5 * kSecond);
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(1); });
ASSERT_EQ(func_counter1, 0);
ASSERT_EQ(func2_counter, 1);
ASSERT_EQ(func_counter2, 1);
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(6); });
ASSERT_EQ(func_counter1, 0);
ASSERT_EQ(func2_counter, 2);
ASSERT_EQ(func_counter2, 2);
ASSERT_TRUE(timer.Shutdown());
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册