提交 181191a1 编写于 作者: Y Yueh-Hsuan Chiang

Add a counter for collecting the wait time on db mutex.

Summary:
Add a counter for collecting the wait time on db mutex.
Also add MutexWrapper and CondVarWrapper for measuring wait time.

Test Plan:
./db_test
export ROCKSDB_TESTS=MutexWaitStats
./db_test

verify stats output using db_bench
make clean
make release
./db_bench --statistics=1 --benchmarks=fillseq,readwhilewriting --num=10000 --threads=10

Sample output:
    rocksdb.db.mutex.wait.micros COUNT : 7546866

Reviewers: MarkCallaghan, rven, sdong, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D32787
上级 f36d394a
......@@ -66,7 +66,7 @@ uint64_t SlowdownAmount(int n, double bottom, double top) {
} // namespace
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
ColumnFamilyData* column_family_data, DBImpl* db, port::Mutex* mutex)
ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
: cfd_(column_family_data), db_(db), mutex_(mutex) {
if (cfd_ != nullptr) {
cfd_->Ref();
......@@ -482,7 +482,7 @@ Compaction* ColumnFamilyData::CompactRange(
}
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
port::Mutex* db_mutex) {
InstrumentedMutex* db_mutex) {
SuperVersion* sv = nullptr;
sv = GetThreadLocalSuperVersion(db_mutex);
sv->Ref();
......@@ -493,7 +493,7 @@ SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
}
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
port::Mutex* db_mutex) {
InstrumentedMutex* db_mutex) {
SuperVersion* sv = nullptr;
// The SuperVersion is cached in thread local storage to avoid acquiring
// mutex when SuperVersion does not change since the last use. When a new
......@@ -599,13 +599,13 @@ void ColumnFamilyData::NotifyOnFlushCompleted(
}
SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, port::Mutex* db_mutex) {
SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
db_mutex->AssertHeld();
return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
}
SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, port::Mutex* db_mutex,
SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options) {
new_superversion->db_mutex = db_mutex;
new_superversion->mutable_cf_options = mutable_cf_options;
......
......@@ -21,9 +21,10 @@
#include "db/write_batch_internal.h"
#include "db/write_controller.h"
#include "db/table_cache.h"
#include "util/thread_local.h"
#include "db/flush_scheduler.h"
#include "util/instrumented_mutex.h"
#include "util/mutable_cf_options.h"
#include "util/thread_local.h"
namespace rocksdb {
......@@ -38,6 +39,8 @@ class InternalStats;
class ColumnFamilyData;
class DBImpl;
class LogBuffer;
class InstrumentedMutex;
class InstrumentedMutexLock;
// ColumnFamilyHandleImpl is the class that clients use to access different
// column families. It has non-trivial destructor, which gets called when client
......@@ -45,7 +48,8 @@ class LogBuffer;
class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
public:
// create while holding the mutex
ColumnFamilyHandleImpl(ColumnFamilyData* cfd, DBImpl* db, port::Mutex* mutex);
ColumnFamilyHandleImpl(
ColumnFamilyData* cfd, DBImpl* db, InstrumentedMutex* mutex);
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
......@@ -57,7 +61,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
private:
ColumnFamilyData* cfd_;
DBImpl* db_;
port::Mutex* mutex_;
InstrumentedMutex* mutex_;
};
// Does not ref-count ColumnFamilyData
......@@ -91,7 +95,7 @@ struct SuperVersion {
autovector<MemTable*> to_delete;
// Version number of the current SuperVersion
uint64_t version_number;
port::Mutex* db_mutex;
InstrumentedMutex* db_mutex;
// should be called outside the mutex
SuperVersion() = default;
......@@ -235,11 +239,11 @@ class ColumnFamilyData {
SuperVersion* GetSuperVersion() { return super_version_; }
// thread-safe
// Return a already referenced SuperVersion to be used safely.
SuperVersion* GetReferencedSuperVersion(port::Mutex* db_mutex);
SuperVersion* GetReferencedSuperVersion(InstrumentedMutex* db_mutex);
// thread-safe
// Get SuperVersion stored in thread local storage. If it does not exist,
// get a reference from a current SuperVersion.
SuperVersion* GetThreadLocalSuperVersion(port::Mutex* db_mutex);
SuperVersion* GetThreadLocalSuperVersion(InstrumentedMutex* db_mutex);
// Try to return SuperVersion back to thread local storage. Retrun true on
// success and false on failure. It fails when the thread local storage
// contains anything other than SuperVersion::kSVInUse flag.
......@@ -254,10 +258,10 @@ class ColumnFamilyData {
// the clients to allocate SuperVersion outside of mutex.
// IMPORTANT: Only call this from DBImpl::InstallSuperVersion()
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
port::Mutex* db_mutex,
InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options);
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
port::Mutex* db_mutex);
InstrumentedMutex* db_mutex);
void ResetThreadLocalSuperVersions();
......
......@@ -470,7 +470,7 @@ Status CompactionJob::Run() {
return status;
}
void CompactionJob::Install(Status* status, port::Mutex* db_mutex) {
void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
db_mutex->AssertHeld();
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
cfd->internal_stats()->AddCompactionStats(
......@@ -955,7 +955,7 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
return s;
}
Status CompactionJob::InstallCompactionResults(port::Mutex* db_mutex) {
Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) {
db_mutex->AssertHeld();
// paranoia: verify that the files that we started with
......
......@@ -75,7 +75,7 @@ class CompactionJob {
Status Run();
// REQUIRED: mutex held
// status is the return of Run()
void Install(Status* status, port::Mutex* db_mutex);
void Install(Status* status, InstrumentedMutex* db_mutex);
private:
void AllocateCompactionOutputFileNumbers();
......@@ -86,7 +86,7 @@ class CompactionJob {
// Call compaction_filter_v2->Filter() on kv-pairs in compact
void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2);
Status FinishCompactionOutputFile(Iterator* input);
Status InstallCompactionResults(port::Mutex* db_mutex);
Status InstallCompactionResults(InstrumentedMutex* db_mutex);
SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
......
......@@ -130,7 +130,7 @@ class CompactionJobTest {
ColumnFamilyOptions cf_options_;
WriteBuffer write_buffer_;
std::unique_ptr<VersionSet> versions_;
port::Mutex mutex_;
InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_;
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
};
......
......@@ -31,7 +31,7 @@
namespace rocksdb {
Status DBImpl::DisableFileDeletions() {
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
++disable_delete_obsolete_files_;
if (disable_delete_obsolete_files_ == 1) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
......@@ -48,7 +48,7 @@ Status DBImpl::EnableFileDeletions(bool force) {
JobContext job_context;
bool should_purge_files = false;
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
if (force) {
// if force, we need to enable file deletions right away
disable_delete_obsolete_files_ = 0;
......
......@@ -197,7 +197,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
db_options_(SanitizeOptions(dbname, options)),
stats_(db_options_.statistics.get()),
db_lock_(nullptr),
mutex_(options.use_adaptive_mutex),
mutex_(stats_, env_,
DB_MUTEX_WAIT_MICROS,
options.use_adaptive_mutex),
shutting_down_(false),
bg_cv_(&mutex_),
logfile_number_(0),
......@@ -411,7 +413,7 @@ void DBImpl::MaybeDumpStats() {
GetPropertyType("rocksdb.dbstats", &tmp1, &tmp2);
std::string stats;
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->internal_stats()->GetStringProperty(cf_property_type,
"rocksdb.cfstats", &stats);
......@@ -1225,7 +1227,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
int max_level_with_files = 0;
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
Version* base = cfd->current();
for (int level = 1; level < cfd->NumberLevels(); level++) {
if (base->storage_info()->OverlapInLevel(level, begin, end)) {
......@@ -1258,7 +1260,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
LogFlush(db_options_.info_log);
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
// an automatic compaction that has been scheduled might have been
// preempted by the manual compactions. Need to schedule it back.
MaybeScheduleFlushOrCompaction();
......@@ -1276,7 +1278,7 @@ Status DBImpl::CompactFiles(
// not supported in lite version
return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
if (column_family == nullptr) {
return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
}
......@@ -1471,7 +1473,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
MutableCFOptions new_options;
Status s;
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
s = cfd->SetOptions(options_map);
if (s.ok()) {
new_options = *cfd->GetLatestMutableCFOptions();
......@@ -1607,14 +1609,14 @@ int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
return cfh->cfd()->GetSuperVersion()->
mutable_cf_options.max_mem_compaction_level;
}
int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
return cfh->cfd()->GetSuperVersion()->
mutable_cf_options.level0_stop_writes_trigger;
}
......@@ -1662,7 +1664,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
manual.end = &end_storage;
}
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
// When a manual compaction arrives, temporarily disable scheduling of
// non-manual compactions and wait until the number of scheduled compaction
......@@ -1717,7 +1719,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
Status s;
{
WriteContext context;
MutexLock guard_lock(&mutex_);
InstrumentedMutexLock guard_lock(&mutex_);
if (cfd->imm()->size() == 0 && cfd->mem()->IsEmpty()) {
// Nothing to flush
......@@ -1750,7 +1752,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
Status s;
// Wait until the compaction completes
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
while (cfd->imm()->size() > 0 && bg_error_.ok()) {
bg_cv_.Wait();
}
......@@ -1917,7 +1919,7 @@ void DBImpl::BackgroundCallFlush() {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
......@@ -1985,7 +1987,7 @@ void DBImpl::BackgroundCallCompaction() {
MaybeDumpStats();
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
......@@ -2352,11 +2354,11 @@ uint64_t DBImpl::CallFlushDuringCompaction(
namespace {
struct IterState {
IterState(DBImpl* _db, port::Mutex* _mu, SuperVersion* _super_version)
IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version)
: db(_db), mu(_mu), super_version(_super_version) {}
DBImpl* db;
port::Mutex* mu;
InstrumentedMutex* mu;
SuperVersion* super_version;
};
......@@ -2643,7 +2645,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
Status s;
*handle = nullptr;
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
nullptr) {
......@@ -2691,7 +2693,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
"Creating column family [%s] FAILED -- %s",
column_family_name.c_str(), s.ToString().c_str());
}
} // MutexLock l(&mutex_)
} // InstrumentedMutexLock l(&mutex_)
// this is outside the mutex
if (s.ok()) {
......@@ -2716,7 +2718,7 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
Status s;
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
if (cfd->IsDropped()) {
s = Status::InvalidArgument("Column family already dropped!\n");
}
......@@ -2919,14 +2921,14 @@ const Snapshot* DBImpl::GetSnapshot() {
int64_t unix_time = 0;
env_->GetCurrentTime(&unix_time); // Ignore error
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) return nullptr;
return snapshots_.New(versions_->LastSequence(), unix_time);
}
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
}
......@@ -3377,7 +3379,7 @@ bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
} else {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
return cfd->internal_stats()->GetStringProperty(property_type, property,
value);
}
......@@ -3403,7 +3405,7 @@ bool DBImpl::GetIntPropertyInternal(ColumnFamilyHandle* column_family,
auto cfd = cfh->cfd();
if (!need_out_of_mutex) {
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
return cfd->internal_stats()->GetIntProperty(property_type, value, this);
} else {
SuperVersion* sv = GetAndRefSuperVersion(cfd);
......@@ -3430,7 +3432,7 @@ void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
// Release SuperVersion
if (sv->Unref()) {
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
sv->Cleanup();
}
delete sv;
......@@ -3447,7 +3449,7 @@ void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
v = cfd->current();
v->Ref();
}
......@@ -3462,7 +3464,7 @@ void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,
}
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
v->Unref();
}
}
......@@ -3530,7 +3532,7 @@ Status DBImpl::DeleteFile(std::string name) {
VersionEdit edit;
JobContext job_context(true);
{
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
if (!status.ok()) {
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
......@@ -3589,7 +3591,7 @@ Status DBImpl::DeleteFile(std::string name) {
}
void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
versions_->GetLiveFilesMetaData(metadata);
}
......
......@@ -36,6 +36,7 @@
#include "util/thread_local.h"
#include "util/scoped_arena_iterator.h"
#include "util/hash.h"
#include "util/instrumented_mutex.h"
#include "db/internal_stats.h"
#include "db/write_controller.h"
#include "db/flush_scheduler.h"
......@@ -412,7 +413,7 @@ class DBImpl : public DB {
FileLock* db_lock_;
// State below is protected by mutex_
port::Mutex mutex_;
InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_;
// This condition variable is signaled on these conditions:
// * whenever bg_compaction_scheduled_ goes down to 0
......@@ -422,7 +423,7 @@ class DBImpl : public DB {
// * whenever bg_flush_scheduled_ value decreases (i.e. whenever a flush is
// done, even if it didn't make any progress)
// * whenever there is an error in background flush or compaction
port::CondVar bg_cv_;
InstrumentedCondVar bg_cv_;
uint64_t logfile_number_;
unique_ptr<log::Writer> log_;
bool log_dir_synced_;
......
......@@ -15,7 +15,7 @@
namespace rocksdb {
uint64_t DBImpl::TEST_GetLevel0TotalSize() {
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
}
......@@ -45,7 +45,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
cfd = cfh->cfd();
}
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
return cfd->current()->storage_info()->MaxNextLevelOverlappingBytes();
}
......@@ -54,7 +54,7 @@ void DBImpl::TEST_GetFilesMetaData(
std::vector<std::vector<FileMetaData>>* metadata) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
metadata->resize(NumberLevels());
for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files =
......@@ -113,7 +113,7 @@ Status DBImpl::TEST_WaitForCompact() {
// wait for compact. It actually waits for scheduled compaction
// OR flush to finish.
MutexLock l(&mutex_);
InstrumentedMutexLock l(&mutex_);
while ((bg_compaction_scheduled_ || bg_flush_scheduled_) && bg_error_.ok()) {
bg_cv_.Wait();
}
......
......@@ -10298,6 +10298,21 @@ TEST(DBTest, EncodeDecompressedBlockSizeTest) {
}
}
TEST(DBTest, MutexWaitStats) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
CreateAndReopenWithCF({"pikachu"}, options);
const int64_t kMutexWaitDelay = 100;
ThreadStatusUtil::TEST_SetStateDelay(
ThreadStatus::STATE_MUTEX_WAIT, kMutexWaitDelay);
ASSERT_OK(Put("hello", "rocksdb"));
ASSERT_GE(TestGetTickerCount(
options, DB_MUTEX_WAIT_MICROS), kMutexWaitDelay);
ThreadStatusUtil::TEST_SetStateDelay(
ThreadStatus::STATE_MUTEX_WAIT, 0);
}
} // namespace rocksdb
int main(int argc, char** argv) {
......
......@@ -55,7 +55,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
SequenceNumber newest_snapshot, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory,
......
......@@ -28,6 +28,7 @@
#include "rocksdb/memtablerep.h"
#include "rocksdb/transaction_log.h"
#include "util/autovector.h"
#include "util/instrumented_mutex.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"
#include "util/scoped_arena_iterator.h"
......@@ -54,7 +55,7 @@ class FlushJob {
const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
SequenceNumber newest_snapshot, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory, CompressionType output_compression,
......@@ -72,7 +73,7 @@ class FlushJob {
const MutableCFOptions& mutable_cf_options_;
const EnvOptions& env_options_;
VersionSet* versions_;
port::Mutex* db_mutex_;
InstrumentedMutex* db_mutex_;
std::atomic<bool>* shutting_down_;
SequenceNumber newest_snapshot_;
JobContext* job_context_;
......
......@@ -75,7 +75,7 @@ class FlushJobTest {
WriteBuffer write_buffer_;
ColumnFamilyOptions cf_options_;
std::unique_ptr<VersionSet> versions_;
port::Mutex mutex_;
InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_;
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
};
......
......@@ -31,8 +31,8 @@ void BM_LogAndApply(int iters, int num_base_files) {
WriteController wc;
ColumnFamilyData* default_cfd;
uint64_t fnum = 1;
port::Mutex mu;
MutexLock l(&mu);
InstrumentedMutex mu;
InstrumentedMutexLock l(&mu);
BENCHMARK_SUSPEND {
std::string dbname = test::TmpDir() + "/rocksdb_test_benchmark";
......
......@@ -164,7 +164,7 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems, VersionSet* vset, port::Mutex* mu,
const autovector<MemTable*>& mems, VersionSet* vset, InstrumentedMutex* mu,
uint64_t file_number, autovector<MemTable*>* to_delete,
Directory* db_directory, LogBuffer* log_buffer) {
mu->AssertHeld();
......
......@@ -22,13 +22,14 @@
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/autovector.h"
#include "util/instrumented_mutex.h"
#include "util/log_buffer.h"
namespace rocksdb {
class ColumnFamilyData;
class InternalKeyComparator;
class Mutex;
class InstrumentedMutex;
class MergeIteratorBuilder;
// keeps a list of immutable memtables in a vector. the list is immutable
......@@ -113,7 +114,7 @@ class MemTableList {
// Commit a successful flush in the manifest file
Status InstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& m, VersionSet* vset, port::Mutex* mu,
const autovector<MemTable*>& m, VersionSet* vset, InstrumentedMutex* mu,
uint64_t file_number, autovector<MemTable*>* to_delete,
Directory* db_directory, LogBuffer* log_buffer);
......
......@@ -1487,11 +1487,11 @@ std::string Version::DebugString(bool hex) const {
struct VersionSet::ManifestWriter {
Status status;
bool done;
port::CondVar cv;
InstrumentedCondVar cv;
ColumnFamilyData* cfd;
VersionEdit* edit;
explicit ManifestWriter(port::Mutex* mu, ColumnFamilyData* _cfd,
explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
VersionEdit* e)
: done(false), cv(mu), cfd(_cfd), edit(e) {}
};
......@@ -1556,7 +1556,7 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
const MutableCFOptions& mutable_cf_options,
VersionEdit* edit, port::Mutex* mu,
VersionEdit* edit, InstrumentedMutex* mu,
Directory* db_directory, bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options) {
mu->AssertHeld();
......@@ -1824,7 +1824,7 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
VersionBuilder* builder, Version* v,
VersionEdit* edit, port::Mutex* mu) {
VersionEdit* edit, InstrumentedMutex* mu) {
mu->AssertHeld();
assert(!edit->IsColumnFamilyManipulation());
......@@ -2275,8 +2275,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
MutableCFOptions mutable_cf_options(*options, ImmutableCFOptions(*options));
VersionEdit ve;
port::Mutex dummy_mutex;
MutexLock l(&dummy_mutex);
InstrumentedMutex dummy_mutex;
InstrumentedMutexLock l(&dummy_mutex);
return versions.LogAndApply(
versions.GetColumnFamilySet()->GetDefault(),
mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
......
......@@ -36,6 +36,7 @@
#include "db/log_reader.h"
#include "db/file_indexer.h"
#include "db/write_controller.h"
#include "util/instrumented_mutex.h"
namespace rocksdb {
......@@ -485,7 +486,7 @@ class VersionSet {
Status LogAndApply(
ColumnFamilyData* column_family_data,
const MutableCFOptions& mutable_cf_options, VersionEdit* edit,
port::Mutex* mu, Directory* db_directory = nullptr,
InstrumentedMutex* mu, Directory* db_directory = nullptr,
bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options = nullptr);
......@@ -656,7 +657,7 @@ class VersionSet {
void LogAndApplyCFHelper(VersionEdit* edit);
void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, Version* v,
VersionEdit* edit, port::Mutex* mu);
VersionEdit* edit, InstrumentedMutex* mu);
};
} // namespace rocksdb
......@@ -12,6 +12,7 @@
#include "db/write_batch_internal.h"
#include "util/autovector.h"
#include "port/port.h"
#include "util/instrumented_mutex.h"
namespace rocksdb {
......@@ -27,9 +28,9 @@ class WriteThread {
bool in_batch_group;
bool done;
uint64_t timeout_hint_us;
port::CondVar cv;
InstrumentedCondVar cv;
explicit Writer(port::Mutex* mu)
explicit Writer(InstrumentedMutex* mu)
: batch(nullptr),
sync(false),
disableWAL(false),
......
......@@ -81,6 +81,8 @@ enum Tickers : uint32_t {
STALL_L0_NUM_FILES_MICROS,
// Writer has to wait for compaction or flush to finish.
STALL_MICROS,
// The wait time for db mutex.
DB_MUTEX_WAIT_MICROS,
RATE_LIMIT_DELAY_MILLIS,
NO_ITERATORS, // number of iterators currently open
......@@ -163,6 +165,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{STALL_MEMTABLE_COMPACTION_MICROS, "rocksdb.memtable.compaction.micros"},
{STALL_L0_NUM_FILES_MICROS, "rocksdb.l0.num.files.stall.micros"},
{STALL_MICROS, "rocksdb.stall.micros"},
{DB_MUTEX_WAIT_MICROS, "rocksdb.db.mutex.wait.micros"},
{RATE_LIMIT_DELAY_MILLIS, "rocksdb.rate.limit.delay.millis"},
{NO_ITERATORS, "rocksdb.num.iterators"},
{NUMBER_MULTIGET_CALLS, "rocksdb.number.multiget.get"},
......
......@@ -53,6 +53,7 @@ struct ThreadStatus {
// such as reading / writing a file or waiting for a mutex.
enum StateType : int {
STATE_UNKNOWN = 0,
STATE_MUTEX_WAIT = 1,
NUM_STATE_TYPES
};
......
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "util/instrumented_mutex.h"
#include "util/thread_status_util.h"
namespace rocksdb {
void InstrumentedMutex::Lock() {
uint64_t wait_time_micros = 0;
if (env_ != nullptr && stats_ != nullptr) {
{
StopWatch sw(env_, nullptr, 0, &wait_time_micros);
LockInternal();
}
RecordTick(stats_, stats_code_, wait_time_micros);
} else {
LockInternal();
}
}
void InstrumentedMutex::LockInternal() {
#ifndef NDEBUG
ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT);
#endif
mutex_.Lock();
}
void InstrumentedCondVar::Wait() {
uint64_t wait_time_micros = 0;
if (env_ != nullptr && stats_ != nullptr) {
{
StopWatch sw(env_, nullptr, 0, &wait_time_micros);
WaitInternal();
}
RecordTick(stats_, stats_code_, wait_time_micros);
} else {
WaitInternal();
}
}
void InstrumentedCondVar::WaitInternal() {
#ifndef NDEBUG
ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT);
#endif
cond_.Wait();
}
bool InstrumentedCondVar::TimedWait(uint64_t abs_time_us) {
uint64_t wait_time_micros = 0;
bool result = false;
if (env_ != nullptr && stats_ != nullptr) {
{
StopWatch sw(env_, nullptr, 0, &wait_time_micros);
result = TimedWaitInternal(abs_time_us);
}
RecordTick(stats_, stats_code_, wait_time_micros);
} else {
result = TimedWaitInternal(abs_time_us);
}
return result;
}
bool InstrumentedCondVar::TimedWaitInternal(uint64_t abs_time_us) {
#ifndef NDEBUG
ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT);
#endif
return cond_.TimedWait(abs_time_us);
}
} // namespace rocksdb
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/thread_status.h"
#include "util/statistics.h"
#include "util/stop_watch.h"
namespace rocksdb {
class InstrumentedCondVar;
// A wrapper class for port::Mutex that provides additional layer
// for collecting stats and instrumentation.
class InstrumentedMutex {
public:
explicit InstrumentedMutex(bool adaptive = false)
: mutex_(adaptive), stats_(nullptr), env_(nullptr),
stats_code_(0) {}
InstrumentedMutex(
Statistics* stats, Env* env,
int stats_code, bool adaptive = false)
: mutex_(adaptive), stats_(stats), env_(env),
stats_code_(stats_code) {}
void Lock();
void Unlock() {
mutex_.Unlock();
}
void AssertHeld() {
mutex_.AssertHeld();
}
private:
void LockInternal();
friend class InstrumentedCondVar;
port::Mutex mutex_;
Statistics* stats_;
Env* env_;
int stats_code_;
};
// A wrapper class for port::Mutex that provides additional layer
// for collecting stats and instrumentation.
class InstrumentedMutexLock {
public:
explicit InstrumentedMutexLock(InstrumentedMutex* mutex) : mutex_(mutex) {
mutex_->Lock();
}
~InstrumentedMutexLock() {
mutex_->Unlock();
}
private:
InstrumentedMutex* const mutex_;
InstrumentedMutexLock(const InstrumentedMutexLock&) = delete;
void operator=(const InstrumentedMutexLock&) = delete;
};
class InstrumentedCondVar {
public:
explicit InstrumentedCondVar(InstrumentedMutex* instrumented_mutex)
: cond_(&(instrumented_mutex->mutex_)),
stats_(instrumented_mutex->stats_),
env_(instrumented_mutex->env_),
stats_code_(instrumented_mutex->stats_code_) {}
void Wait();
bool TimedWait(uint64_t abs_time_us);
void Signal() {
cond_.Signal();
}
void SignalAll() {
cond_.SignalAll();
}
private:
void WaitInternal();
bool TimedWaitInternal(uint64_t abs_time_us);
port::CondVar cond_;
Statistics* stats_;
Env* env_;
int stats_code_;
};
} // namespace rocksdb
......@@ -11,6 +11,7 @@
#include "util/thread_status_updater.h"
namespace rocksdb {
class ColumnFamilyData;
// The static utility class for updating thread-local status.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册