提交 c91cdd59 编写于 作者: Y Yueh-Hsuan Chiang

Allow GetThreadList() to indicate a thread is doing Compaction.

Summary: Allow GetThreadList() to indicate a thread is doing Compaction.

Test Plan:
export ROCKSDB_TESTS=ThreadStatus
./db_test

Reviewers: ljin, igor, sdong

Reviewed By: sdong

Subscribers: leveldb, dhruba, jonahcohen, rven

Differential Revision: https://reviews.facebook.net/D30105
上级 402c1152
......@@ -50,6 +50,7 @@
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"
namespace rocksdb {
......@@ -270,6 +271,11 @@ void CompactionJob::Prepare() {
Status CompactionJob::Run() {
log_buffer_->FlushBufferToLog();
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
ThreadStatusUtil::SetColumnFamily(cfd);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
#ifndef NDEBUG
ThreadStatusUtil::TEST_OperationDelay(ThreadStatus::OP_COMPACTION);
#endif
const uint64_t start_micros = env_->NowMicros();
std::unique_ptr<Iterator> input(
......@@ -459,6 +465,7 @@ Status CompactionJob::Run() {
RecordCompactionIOStats();
LogFlush(db_options_.info_log);
ThreadStatusUtil::ResetThreadStatus();
return status;
}
......
......@@ -2145,6 +2145,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
c->ReleaseCompactionFiles(status);
*madeProgress = true;
} else if (!is_manual && c->IsTrivialMove()) {
// Instrument for event update
// TODO(yhchiang): add op details for showing trivial-move.
ThreadStatusUtil::SetColumnFamily(c->column_family_data());
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
#ifndef NDEBUG
ThreadStatusUtil::TEST_OperationDelay(ThreadStatus::OP_COMPACTION);
#endif
// Move file to next level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
......@@ -2171,6 +2179,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
c->ReleaseCompactionFiles(status);
*madeProgress = true;
// Clear Instrument
ThreadStatusUtil::ResetThreadStatus();
} else {
auto yield_callback = [&]() {
return CallFlushDuringCompaction(c->column_family_data(),
......
......@@ -234,6 +234,7 @@ class DBImpl : public DB {
uint64_t TEST_max_total_in_memory_state() {
return max_total_in_memory_state_;
}
#endif // ROCKSDB_LITE
// Returns the list of live files in 'live' and the list
......
......@@ -10,6 +10,7 @@
#ifndef ROCKSDB_LITE
#include "db/db_impl.h"
#include "util/thread_status_updater.h"
namespace rocksdb {
......
......@@ -52,7 +52,7 @@
#include "util/testutil.h"
#include "util/mock_env.h"
#include "util/string_util.h"
#include "util/thread_status_updater.h"
#include "util/thread_status_util.h"
namespace rocksdb {
......@@ -9414,7 +9414,7 @@ TEST(DBTest, DynamicMemtableOptions) {
}
#if ROCKSDB_USING_THREAD_STATUS
TEST(DBTest, GetThreadList) {
TEST(DBTest, GetThreadStatus) {
Options options;
options.env = env_;
options.enable_thread_tracking = true;
......@@ -9472,7 +9472,7 @@ TEST(DBTest, GetThreadList) {
handles_, true);
}
TEST(DBTest, DisableThreadList) {
TEST(DBTest, DisableThreadStatus) {
Options options;
options.env = env_;
options.enable_thread_tracking = false;
......@@ -9482,6 +9482,146 @@ TEST(DBTest, DisableThreadList) {
env_->GetThreadStatusUpdater()->TEST_VerifyColumnFamilyInfoMap(
handles_, false);
}
TEST(DBTest, ThreadStatusSingleCompaction) {
const int kTestKeySize = 16;
const int kTestValueSize = 984;
const int kEntrySize = kTestKeySize + kTestValueSize;
const int kEntriesPerBuffer = 100;
Options options;
options.create_if_missing = true;
options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
options.compaction_style = kCompactionStyleLevel;
options.target_file_size_base = options.write_buffer_size;
options.max_bytes_for_level_base = options.target_file_size_base * 2;
options.max_bytes_for_level_multiplier = 2;
options.compression = kNoCompression;
options = CurrentOptions(options);
options.env = env_;
options.enable_thread_tracking = true;
const int kNumL0Files = 4;
options.level0_file_num_compaction_trigger = kNumL0Files;
for (int tests = 0; tests < 2; ++tests) {
TryReopen(options);
// Each compaction will run at least 2 seconds, which allows
// the test to capture the status of compaction with fewer
// false alarm.
const int kCompactionDelayMicro = 2000000;
ThreadStatusUtil::TEST_SetOperationDelay(
ThreadStatus::OP_COMPACTION, kCompactionDelayMicro);
Random rnd(301);
for (int key = kEntriesPerBuffer * kNumL0Files; key >= 0; --key) {
ASSERT_OK(Put(ToString(key), RandomString(&rnd, kTestValueSize)));
}
// wait for compaction to be scheduled
env_->SleepForMicroseconds(500000);
// check how many threads are doing compaction using GetThreadList
std::vector<ThreadStatus> thread_list;
Status s = env_->GetThreadList(&thread_list);
ASSERT_OK(s);
int compaction_count = 0;
for (auto thread : thread_list) {
if (thread.operation_type == ThreadStatus::OP_COMPACTION) {
compaction_count++;
}
}
if (options.enable_thread_tracking) {
// expecting one single L0 to L1 compaction
ASSERT_EQ(compaction_count, 1);
} else {
// If thread tracking is not enabled, compaction count should be 0.
ASSERT_EQ(compaction_count, 0);
}
ThreadStatusUtil::TEST_SetOperationDelay(
ThreadStatus::OP_COMPACTION, 0);
// repeat the test with disabling thread tracking.
options.enable_thread_tracking = false;
}
}
TEST(DBTest, ThreadStatusMultipleCompaction) {
const int kTestKeySize = 16;
const int kTestValueSize = 984;
const int kEntrySize = kTestKeySize + kTestValueSize;
const int kEntriesPerBuffer = 10;
const int kNumL0Files = 4;
const int kHighPriCount = 3;
const int kLowPriCount = 5;
env_->SetBackgroundThreads(kHighPriCount, Env::HIGH);
env_->SetBackgroundThreads(kLowPriCount, Env::LOW);
Options options;
options.create_if_missing = true;
options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
options.compaction_style = kCompactionStyleLevel;
options.target_file_size_base = options.write_buffer_size;
options.max_bytes_for_level_base =
options.target_file_size_base * kNumL0Files;
options.compression = kNoCompression;
options = CurrentOptions(options);
options.env = env_;
options.enable_thread_tracking = true;
options.level0_file_num_compaction_trigger = kNumL0Files;
options.max_bytes_for_level_multiplier = 2;
options.max_background_compactions = kLowPriCount;
for (int tests = 0; tests < 2; ++tests) {
TryReopen(options);
Random rnd(301);
int max_compaction_count = 0;
std::vector<ThreadStatus> thread_list;
const int kCompactionDelayMicro = 10000;
ThreadStatusUtil::TEST_SetOperationDelay(
ThreadStatus::OP_COMPACTION, kCompactionDelayMicro);
// Make rocksdb busy
int key = 0;
for (int file = 0; file < 64 * kNumL0Files; ++file) {
for (int k = 0; k < kEntriesPerBuffer; ++k) {
ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize)));
}
// check how many threads are doing compaction using GetThreadList
int compaction_count = 0;
Status s = env_->GetThreadList(&thread_list);
for (auto thread : thread_list) {
if (thread.operation_type == ThreadStatus::OP_COMPACTION) {
compaction_count++;
}
}
// Record the max number of compactions at a time.
if (max_compaction_count < compaction_count) {
max_compaction_count = compaction_count;
}
}
if (options.enable_thread_tracking) {
// Expect rocksdb max-out the concurrent compaction jobs.
ASSERT_EQ(max_compaction_count, options.max_background_compactions);
} else {
// If thread tracking is not enabled, compaction count should be 0.
ASSERT_EQ(max_compaction_count, 0);
}
// repeat the test with disabling thread tracking.
options.enable_thread_tracking = false;
}
ThreadStatusUtil::TEST_SetOperationDelay(
ThreadStatus::OP_COMPACTION, 0);
}
#endif // ROCKSDB_USING_THREAD_STATUS
TEST(DBTest, DynamicCompactionOptions) {
......
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file defines the structures for thread event and operation.
// Thread events are used to describe high level action of a
// thread such as doing compaction or flush, while thread operation
// are used to describe lower-level action such as reading /
// writing a file or waiting for a mutex. Events and operations
// are designed to be independent. Typically, a thread usually involves
// in one event and one operation at any specific point in time.
#pragma once
#include "include/rocksdb/thread_status.h"
#include <string>
namespace rocksdb {
#if ROCKSDB_USING_THREAD_STATUS
// The structure that describes a major thread event.
struct EventInfo {
const ThreadStatus::EventType code;
const std::string name;
};
// The global event table.
//
// When updating a status of a thread, the pointer of the EventInfo
// of the current ThreadStatusData will be pointing to one of the
// rows in this global table.
//
// Note that it's not designed to be constant as in the future we
// might consider adding global count to the EventInfo.
static EventInfo global_event_table[] = {
{ThreadStatus::EVENT_UNKNOWN, ""},
{ThreadStatus::EVENT_COMPACTION, "Compaction"},
{ThreadStatus::EVENT_FLUSH, "Flush"}
};
// The structure that describes a operation.
struct OperationInfo {
const ThreadStatus::OperationType code;
const std::string name;
};
// The global operation table.
//
// When updating a status of a thread, the pointer of the OperationInfo
// of the current ThreadStatusData will be pointing to one of the
// rows in this global table.
static OperationInfo global_operation_table[] = {
{ThreadStatus::OPERATION_UNKNOWN, ""},
{ThreadStatus::OPERATION_WRITE_FILE, "Writing SST file"},
{ThreadStatus::OPERATION_READ_FILE, "Reaing SST file"},
{ThreadStatus::OPERATION_WAIT_DB_MUTEX, "Waiting DB Mutex"}
};
#else
struct EventInfo {
};
struct OperationInfo {
};
#endif // ROCKSDB_USING_THREAD_STATUS
} // namespace rocksdb
......@@ -29,20 +29,46 @@ void ThreadStatusUpdater::SetThreadType(
data->thread_type.store(ttype, std::memory_order_relaxed);
}
void ThreadStatusUpdater::ResetThreadStatus() {
ClearThreadState();
ClearThreadOperation();
SetColumnFamilyInfoKey(nullptr);
}
void ThreadStatusUpdater::SetColumnFamilyInfoKey(
const void* cf_key) {
auto* data = InitAndGet();
// set the tracking flag based on whether cf_key is non-null or not.
// If enable_thread_tracking is set to false, the input cf_key
// would be nullptr.
data->enable_tracking = (cf_key != nullptr);
data->cf_key.store(cf_key, std::memory_order_relaxed);
}
const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
auto* data = InitAndGet();
if (data->enable_tracking == false) {
return nullptr;
}
return data->cf_key.load(std::memory_order_relaxed);
}
void ThreadStatusUpdater::SetThreadOperation(
const ThreadStatus::OperationType type) {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return;
}
data->operation_type.store(type, std::memory_order_relaxed);
}
void ThreadStatusUpdater::ClearThreadOperation() {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return;
}
data->operation_type.store(
ThreadStatus::OP_UNKNOWN, std::memory_order_relaxed);
}
......@@ -50,11 +76,19 @@ void ThreadStatusUpdater::ClearThreadOperation() {
void ThreadStatusUpdater::SetThreadState(
const ThreadStatus::StateType type) {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return;
}
data->state_type.store(type, std::memory_order_relaxed);
}
void ThreadStatusUpdater::ClearThreadState() {
auto* data = InitAndGet();
if (!data->enable_tracking) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return;
}
data->state_type.store(
ThreadStatus::STATE_UNKNOWN, std::memory_order_relaxed);
}
......@@ -176,6 +210,9 @@ void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
void ThreadStatusUpdater::UnregisterThread() {
}
void ThreadStatusUpdater::ResetThreadStatus() {
}
void ThreadStatusUpdater::SetThreadType(
ThreadStatus::ThreadType ttype) {
}
......
......@@ -64,13 +64,24 @@ struct ConstantColumnFamilyInfo {
// status of a thread using a set of atomic pointers.
struct ThreadStatusData {
#if ROCKSDB_USING_THREAD_STATUS
explicit ThreadStatusData() : thread_id(0) {
explicit ThreadStatusData() : thread_id(0), enable_tracking(false) {
thread_type.store(ThreadStatus::USER);
cf_key.store(0);
cf_key.store(nullptr);
operation_type.store(ThreadStatus::OP_UNKNOWN);
state_type.store(ThreadStatus::STATE_UNKNOWN);
}
uint64_t thread_id;
// A flag to indicate whether the thread tracking is enabled
// in the current thread. This value will be updated based on whether
// the associated Options::enable_thread_tracking is set to true
// in ThreadStatusUtil::SetColumnFamily().
//
// If set to false, then SetThreadOperation and SetThreadState
// will be no-op.
bool enable_tracking;
std::atomic<ThreadStatus::ThreadType> thread_type;
std::atomic<const void*> cf_key;
std::atomic<ThreadStatus::OperationType> operation_type;
......@@ -96,6 +107,10 @@ class ThreadStatusUpdater {
// Unregister the current thread.
void UnregisterThread();
// Reset the status of the current thread. This includes resetting
// ColumnFamilyInfoKey, ThreadOperation, and ThreadState.
void ResetThreadStatus();
// Set the thread type of the current thread.
void SetThreadType(ThreadStatus::ThreadType ttype);
......@@ -103,6 +118,9 @@ class ThreadStatusUpdater {
// its thread-local pointer of ThreadStateInfo to the correct entry.
void SetColumnFamilyInfoKey(const void* cf_key);
// returns the column family info key.
const void* GetColumnFamilyInfoKey();
// Update the thread operation of the current thread.
void SetThreadOperation(const ThreadStatus::OperationType type);
......@@ -143,7 +161,6 @@ class ThreadStatusUpdater {
bool check_exist);
protected:
#if ROCKSDB_USING_THREAD_STATUS
// The thread-local variable for storing thread status.
static __thread ThreadStatusData* thread_status_data_;
......@@ -169,6 +186,7 @@ class ThreadStatusUpdater {
// associated to the same db_key faster.
std::unordered_map<
const void*, std::unordered_set<const void*>> db_key_map_;
#else
static ThreadStatusData* thread_status_data_;
#endif // ROCKSDB_USING_THREAD_STATUS
......
......@@ -7,9 +7,11 @@
#include "util/thread_status_updater.h"
#include "db/column_family.h"
#if ROCKSDB_USING_THREAD_STATUS
namespace rocksdb {
#ifndef NDEBUG
#if ROCKSDB_USING_THREAD_STATUS
void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap(
const std::vector<ColumnFamilyHandle*>& handles,
bool check_exist) {
......@@ -29,5 +31,16 @@ void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap(
}
}
}
} // namespace rocksdb
#else
void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap(
const std::vector<ColumnFamilyHandle*>& handles,
bool check_exist) {
}
#endif // ROCKSDB_USING_THREAD_STATUS
#endif // !NDEBUG
} // namespace rocksdb
......@@ -9,6 +9,7 @@
namespace rocksdb {
#if ROCKSDB_USING_THREAD_STATUS
__thread ThreadStatusUpdater*
ThreadStatusUtil::thread_updater_local_cache_ = nullptr;
......@@ -36,7 +37,41 @@ void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
return;
}
assert(thread_updater_local_cache_);
thread_updater_local_cache_->SetColumnFamilyInfoKey(cfd);
if (cfd != nullptr && cfd->options()->enable_thread_tracking) {
thread_updater_local_cache_->SetColumnFamilyInfoKey(cfd);
} else {
// When cfd == nullptr or enable_thread_tracking == false, we set
// ColumnFamilyInfoKey to nullptr, which makes SetThreadOperation
// and SetThreadState become no-op.
thread_updater_local_cache_->SetColumnFamilyInfoKey(nullptr);
}
}
void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) {
if (thread_updater_local_cache_ == nullptr) {
// thread_updater_local_cache_ must be set in SetColumnFamily
// or other ThreadStatusUtil functions.
return;
}
thread_updater_local_cache_->SetThreadOperation(op);
}
void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) {
if (thread_updater_local_cache_ == nullptr) {
// thread_updater_local_cache_ must be set in SetColumnFamily
// or other ThreadStatusUtil functions.
return;
}
thread_updater_local_cache_->SetThreadState(state);
}
void ThreadStatusUtil::ResetThreadStatus() {
if (thread_updater_local_cache_ == nullptr) {
return;
}
thread_updater_local_cache_->ResetThreadStatus();
}
void ThreadStatusUtil::NewColumnFamilyInfo(
......@@ -86,6 +121,12 @@ bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) {
void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd) {
}
void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) {
}
void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) {
}
void ThreadStatusUtil::NewColumnFamilyInfo(
const DB* db, const ColumnFamilyData* cfd) {
}
......@@ -97,6 +138,9 @@ void ThreadStatusUtil::EraseColumnFamilyInfo(
void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) {
}
void ThreadStatusUtil::ResetThreadStatus() {
}
#endif // ROCKSDB_USING_THREAD_STATUS
} // namespace rocksdb
......@@ -12,6 +12,7 @@
namespace rocksdb {
// The static utility class for updating thread-local status.
//
// The thread-local status is updated via the thread-local cached
......@@ -52,6 +53,22 @@ class ThreadStatusUtil {
// something related to the specified column family.
static void SetColumnFamily(const ColumnFamilyData* cfd);
static void SetThreadOperation(ThreadStatus::OperationType type);
static void SetThreadState(ThreadStatus::StateType type);
static void ResetThreadStatus();
#ifndef NDEBUG
static void TEST_SetOperationDelay(
const ThreadStatus::OperationType operation, int micro);
static void TEST_OperationDelay(
const ThreadStatus::OperationType operation);
static void TEST_SetStateDelay(
const ThreadStatus::StateType state, int micro);
static void TEST_StateDelay(const ThreadStatus::StateType state);
#endif
protected:
// Initialize the thread-local ThreadStatusUpdater when it finds
// the cached value is nullptr. Returns true if it has cached
......
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/env.h"
#include "util/thread_status_updater.h"
#include "util/thread_status_util.h"
namespace rocksdb {
#ifndef NDEBUG
// the delay for debugging purpose.
static int operations_delay[ThreadStatus::NUM_OP_TYPES] ={0};
static int states_delay[ThreadStatus::NUM_STATE_TYPES] = {0};
void ThreadStatusUtil::TEST_SetStateDelay(
const ThreadStatus::StateType state, int micro) {
states_delay[state] = micro;
}
void ThreadStatusUtil::TEST_StateDelay(
const ThreadStatus::StateType state) {
Env::Default()->SleepForMicroseconds(
states_delay[state]);
}
void ThreadStatusUtil::TEST_SetOperationDelay(
const ThreadStatus::OperationType operation, int micro) {
operations_delay[operation] = micro;
}
void ThreadStatusUtil::TEST_OperationDelay(
const ThreadStatus::OperationType operation) {
Env::Default()->SleepForMicroseconds(
operations_delay[operation]);
}
#endif // !NDEBUG
} // namespace rocksdb
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册