提交 e60bc99f 编写于 作者: Y Yueh-Hsuan Chiang

Allow GetThreadList to reflect flush activity.

Summary: Allow GetThreadList to reflect flush activity.

Test Plan:
Developed ThreadStatusFlush test and updated ThreadStatusMultiCompaction test.

./db_test  ./thread_list_test

Reviewers: sdong, rven, igor

Reviewed By: igor

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D32871
上级 b9a0213c
......@@ -275,9 +275,7 @@ Status CompactionJob::Run() {
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
ThreadStatusUtil::SetColumnFamily(cfd);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
#ifndef NDEBUG
ThreadStatusUtil::TEST_OperationDelay(ThreadStatus::OP_COMPACTION);
#endif
TEST_SYNC_POINT("CompactionJob::Run:Start");
const uint64_t start_micros = env_->NowMicros();
std::unique_ptr<Iterator> input(
......@@ -467,6 +465,7 @@ Status CompactionJob::Run() {
RecordCompactionIOStats();
LogFlush(db_options_.info_log);
TEST_SYNC_POINT("CompactionJob::Run:End");
ThreadStatusUtil::ResetThreadStatus();
return status;
}
......
......@@ -2216,9 +2216,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
// TODO(yhchiang): add op details for showing trivial-move.
ThreadStatusUtil::SetColumnFamily(c->column_family_data());
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
#ifndef NDEBUG
ThreadStatusUtil::TEST_OperationDelay(ThreadStatus::OP_COMPACTION);
#endif
// Move file to next level
assert(c->num_input_files(0) == 1);
......
......@@ -9653,6 +9653,21 @@ TEST(DBTest, DynamicMemtableOptions) {
}
#if ROCKSDB_USING_THREAD_STATUS
namespace {
void VerifyOperationCount(Env* env, ThreadStatus::OperationType op_type,
int expected_count) {
int op_count = 0;
std::vector<ThreadStatus> thread_list;
ASSERT_OK(env->GetThreadList(&thread_list));
for (auto thread : thread_list) {
if (thread.operation_type == op_type) {
op_count++;
}
}
ASSERT_EQ(op_count, expected_count);
}
} // namespace
TEST(DBTest, GetThreadStatus) {
Options options;
options.env = env_;
......@@ -9723,6 +9738,38 @@ TEST(DBTest, DisableThreadStatus) {
handles_, false);
}
TEST(DBTest, ThreadStatusFlush) {
Options options;
options.env = env_;
options.write_buffer_size = 100000; // Small write buffer
options.enable_thread_tracking = true;
options = CurrentOptions(options);
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"FlushJob::Run:Start", "DBTest::ThreadStatusFlush:1"},
{"DBTest::ThreadStatusFlush:2", "FlushJob::Run:End"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0);
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_EQ("v1", Get(1, "foo"));
VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0);
Put(1, "k1", std::string(100000, 'x')); // Fill memtable
VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 0);
Put(1, "k2", std::string(100000, 'y')); // Trigger flush
// wait for flush to be scheduled
env_->SleepForMicroseconds(250000);
TEST_SYNC_POINT("DBTest::ThreadStatusFlush:1");
VerifyOperationCount(env_, ThreadStatus::OP_FLUSH, 1);
TEST_SYNC_POINT("DBTest::ThreadStatusFlush:2");
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST(DBTest, ThreadStatusSingleCompaction) {
const int kTestKeySize = 16;
const int kTestValueSize = 984;
......@@ -9741,14 +9788,15 @@ TEST(DBTest, ThreadStatusSingleCompaction) {
options.enable_thread_tracking = true;
const int kNumL0Files = 4;
options.level0_file_num_compaction_trigger = kNumL0Files;
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"CompactionJob::Run:Start", "DBTest::ThreadStatusSingleCompaction:1"},
{"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run:End"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (int tests = 0; tests < 2; ++tests) {
TryReopen(options);
// Each compaction will run at least 2 seconds, which allows
// the test to capture the status of compaction with fewer
// false alarm.
const int kCompactionDelayMicro = 2000000;
ThreadStatusUtil::TEST_SetOperationDelay(
ThreadStatus::OP_COMPACTION, kCompactionDelayMicro);
Random rnd(301);
for (int key = kEntriesPerBuffer * kNumL0Files; key >= 0; --key) {
......@@ -9756,33 +9804,22 @@ TEST(DBTest, ThreadStatusSingleCompaction) {
}
// wait for compaction to be scheduled
env_->SleepForMicroseconds(500000);
// check how many threads are doing compaction using GetThreadList
std::vector<ThreadStatus> thread_list;
Status s = env_->GetThreadList(&thread_list);
ASSERT_OK(s);
int compaction_count = 0;
for (auto thread : thread_list) {
if (thread.operation_type == ThreadStatus::OP_COMPACTION) {
compaction_count++;
}
}
env_->SleepForMicroseconds(250000);
TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:1");
if (options.enable_thread_tracking) {
// expecting one single L0 to L1 compaction
ASSERT_EQ(compaction_count, 1);
VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 1);
} else {
// If thread tracking is not enabled, compaction count should be 0.
ASSERT_EQ(compaction_count, 0);
VerifyOperationCount(env_, ThreadStatus::OP_COMPACTION, 0);
}
ThreadStatusUtil::TEST_SetOperationDelay(
ThreadStatus::OP_COMPACTION, 0);
TEST_SYNC_POINT("DBTest::ThreadStatusSingleCompaction:2");
// repeat the test with disabling thread tracking.
options.enable_thread_tracking = false;
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST(DBTest, ThreadStatusMultipleCompaction) {
......@@ -9812,53 +9849,58 @@ TEST(DBTest, ThreadStatusMultipleCompaction) {
options.max_bytes_for_level_multiplier = 2;
options.max_background_compactions = kLowPriCount;
for (int tests = 0; tests < 2; ++tests) {
TryReopen(options);
Random rnd(301);
TryReopen(options);
Random rnd(301);
int max_compaction_count = 0;
std::vector<ThreadStatus> thread_list;
const int kCompactionDelayMicro = 20000;
ThreadStatusUtil::TEST_SetOperationDelay(
ThreadStatus::OP_COMPACTION, kCompactionDelayMicro);
// Make rocksdb busy
int key = 0;
for (int file = 0; file < 64 * kNumL0Files; ++file) {
for (int k = 0; k < kEntriesPerBuffer; ++k) {
ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize)));
}
std::vector<ThreadStatus> thread_list;
// Delay both flush and compaction
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"FlushJob::Run:Start",
"CompactionJob::Run:Start"},
{"CompactionJob::Run:Start",
"DBTest::ThreadStatusMultipleCompaction:GetThreadList"},
{"DBTest::ThreadStatusMultipleCompaction:VerifyStatus",
"CompactionJob::Run:End"},
{"CompactionJob::Run:End",
"FlushJob::Run:End"}});
// Make rocksdb busy
int key = 0;
int max_operation_count[ThreadStatus::NUM_OP_TYPES] = {0};
for (int file = 0; file < 64 * kNumL0Files; ++file) {
for (int k = 0; k < kEntriesPerBuffer; ++k) {
ASSERT_OK(Put(ToString(key++), RandomString(&rnd, kTestValueSize)));
}
// check how many threads are doing compaction using GetThreadList
int compaction_count = 0;
Status s = env_->GetThreadList(&thread_list);
for (auto thread : thread_list) {
if (thread.operation_type == ThreadStatus::OP_COMPACTION) {
compaction_count++;
}
}
// check how many threads are doing compaction using GetThreadList
int operation_count[ThreadStatus::NUM_OP_TYPES] = {0};
TEST_SYNC_POINT(
"DBTest::ThreadStatusMultipleCompaction:GetThreadList");
Status s = env_->GetThreadList(&thread_list);
TEST_SYNC_POINT(
"DBTest::ThreadStatusMultipleCompaction:VerifyStatus");
for (auto thread : thread_list) {
operation_count[thread.operation_type]++;
}
// Record the max number of compactions at a time.
if (max_compaction_count < compaction_count) {
max_compaction_count = compaction_count;
// Record the max number of compactions at a time.
for (int i = 0; i < ThreadStatus::NUM_OP_TYPES; ++i) {
if (max_operation_count[i] < operation_count[i]) {
max_operation_count[i] = operation_count[i];
}
}
if (options.enable_thread_tracking) {
// Expect rocksdb to at least utilize 60% of the compaction threads.
ASSERT_GE(1.0 * max_compaction_count,
0.6 * options.max_background_compactions);
} else {
// If thread tracking is not enabled, compaction count should be 0.
ASSERT_EQ(max_compaction_count, 0);
// Speed up the test
if (max_operation_count[ThreadStatus::OP_FLUSH] > 1 &&
max_operation_count[ThreadStatus::OP_COMPACTION] >
0.6 * options.max_background_compactions) {
break;
}
// repeat the test with disabling thread tracking.
options.enable_thread_tracking = false;
}
ThreadStatusUtil::TEST_SetOperationDelay(
ThreadStatus::OP_COMPACTION, 0);
ASSERT_GE(max_operation_count[ThreadStatus::OP_FLUSH], 1);
// Expect rocksdb to at least utilize 60% of the compaction threads.
ASSERT_GE(1.0 * max_operation_count[ThreadStatus::OP_COMPACTION],
0.6 * options.max_background_compactions);
}
#endif // ROCKSDB_USING_THREAD_STATUS
......
......@@ -48,6 +48,7 @@
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"
namespace rocksdb {
......@@ -88,6 +89,11 @@ Status FlushJob::Run(uint64_t* file_number) {
return Status::OK();
}
// Update the thread status to indicate flush.
ThreadStatusUtil::SetColumnFamily(cfd_);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
TEST_SYNC_POINT("FlushJob::Run:Start");
// entries mems are (implicitly) sorted in ascending order by their created
// time. We will use the first memtable's `edit` to keep the meta info for
// this flush.
......@@ -120,6 +126,9 @@ Status FlushJob::Run(uint64_t* file_number) {
if (s.ok() && file_number != nullptr) {
*file_number = fn;
}
TEST_SYNC_POINT("FlushJob::Run:End");
ThreadStatusUtil::ResetThreadStatus();
return s;
}
......
......@@ -60,16 +60,6 @@ class ThreadStatusUtil {
static void ResetThreadStatus();
#ifndef NDEBUG
static void TEST_SetOperationDelay(
const ThreadStatus::OperationType operation, int micro);
static void TEST_OperationDelay(
const ThreadStatus::OperationType operation);
static void TEST_SetStateDelay(
const ThreadStatus::StateType state, int micro);
static void TEST_StateDelay(const ThreadStatus::StateType state);
#endif
protected:
// Initialize the thread-local ThreadStatusUpdater when it finds
// the cached value is nullptr. Returns true if it has cached
......
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <atomic>
#include "rocksdb/env.h"
#include "util/thread_status_updater.h"
#include "util/thread_status_util.h"
namespace rocksdb {
#ifndef NDEBUG
// the delay for debugging purpose.
static std::atomic<int> operations_delay[ThreadStatus::NUM_OP_TYPES];
static std::atomic<int> states_delay[ThreadStatus::NUM_STATE_TYPES];
void ThreadStatusUtil::TEST_SetStateDelay(
const ThreadStatus::StateType state, int micro) {
states_delay[state].store(micro, std::memory_order_relaxed);
}
void ThreadStatusUtil::TEST_StateDelay(
const ThreadStatus::StateType state) {
Env::Default()->SleepForMicroseconds(
states_delay[state].load(std::memory_order_relaxed));
}
void ThreadStatusUtil::TEST_SetOperationDelay(
const ThreadStatus::OperationType operation, int micro) {
operations_delay[operation].store(micro, std::memory_order_relaxed);
}
void ThreadStatusUtil::TEST_OperationDelay(
const ThreadStatus::OperationType operation) {
Env::Default()->SleepForMicroseconds(
operations_delay[operation].load(std::memory_order_relaxed));
}
#endif // !NDEBUG
} // namespace rocksdb
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册