From 09899f0b51977366b92dc2e71cd77aa5fa6e9836 Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 3 Nov 2014 14:11:33 -0800 Subject: [PATCH] DB::Open() to automatically increase thread pool size if it is smaller than max number of parallel compactions or flushes Summary: With the patch, thread pool size will be automatically increased if DB's options ask for more parallelism of compactions or flushes. Too many users have been confused by the API. Change it to make it harder for users to make mistakes. Test Plan: Add two unit tests to cover the function. Reviewers: yhchiang, rven, igor, MarkCallaghan, ljin Reviewed By: ljin Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D27555 --- HISTORY.md | 2 +- db/db_impl.cc | 4 ++++ db/db_test.cc | 41 +++++++++++++++++++++++++++++++++++++++++ hdfs/env_hdfs.h | 6 +++++- include/rocksdb/env.h | 10 ++++++++++ util/env_posix.cc | 19 +++++++++++++++++-- util/env_test.cc | 38 ++++++++++++++++++++++++++++++++++++-- 7 files changed, 114 insertions(+), 6 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index b72bce080..7c0b5a9b8 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,7 +5,7 @@ ### Public API changes * Introduce 4 new convenient functions for converting Options from string: GetColumnFamilyOptionsFromMap(), GetColumnFamilyOptionsFromString(), GetDBOptionsFromMap(), GetDBOptionsFromString() * Remove WriteBatchWithIndex.Delete() overloads using SliceParts - +* When opening a DB, if options.max_background_compactions is larger than the existing low pri pool of options.env, it will enlarge it. Similarly, if options.max_background_flushes is larger than the existing high pri pool of options.env, it will enlarge it. 
## 3.6.0 (10/7/2014) ### Disk format changes diff --git a/db/db_impl.cc b/db/db_impl.cc index 2fbd40637..98caf98b5 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -118,6 +118,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.info_log = nullptr; } } + result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions, + Env::Priority::LOW); + result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes, + Env::Priority::HIGH); if (!result.rate_limiter) { if (result.bytes_per_sync == 0) { diff --git a/db/db_test.cc b/db/db_test.cc index 4807ef121..cd6cd5862 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -412,6 +412,8 @@ class DBTest { mem_env_(!getenv("MEM_ENV") ? nullptr : new MockEnv(Env::Default())), env_(new SpecialEnv(mem_env_ ? mem_env_ : Env::Default())) { + env_->SetBackgroundThreads(1, Env::LOW); + env_->SetBackgroundThreads(1, Env::HIGH); dbname_ = test::TmpDir(env_) + "/db_test"; auto options = CurrentOptions(); ASSERT_OK(DestroyDB(dbname_, options)); @@ -8193,6 +8195,45 @@ TEST(DBTest, TableOptionsSanitizeTest) { ASSERT_OK(TryReopen(options)); } +TEST(DBTest, SanitizeNumThreads) { + for (int attempt = 0; attempt < 2; attempt++) { + const size_t kTotalTasks = 8; + SleepingBackgroundTask sleeping_tasks[kTotalTasks]; + + Options options = CurrentOptions(); + if (attempt == 0) { + options.max_background_compactions = 3; + options.max_background_flushes = 2; + } + options.create_if_missing = true; + DestroyAndReopen(options); + + for (size_t i = 0; i < kTotalTasks; i++) { + // Insert 4 tasks to low priority queue and 4 tasks to high priority queue + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_tasks[i], + (i < 4) ? Env::Priority::LOW : Env::Priority::HIGH); + } + + // Wait 100 milliseconds for them to be scheduled. + env_->SleepForMicroseconds(100000); + + // pool size 3, total task 4. Queue size should be 1. 
+ ASSERT_EQ(1U, options.env->GetThreadPoolQueueLen(Env::Priority::LOW)); + // pool size 2, total task 4. Queue size should be 2. + ASSERT_EQ(2U, options.env->GetThreadPoolQueueLen(Env::Priority::HIGH)); + + for (size_t i = 0; i < kTotalTasks; i++) { + sleeping_tasks[i].WakeUp(); + sleeping_tasks[i].WaitUntilDone(); + } + + ASSERT_OK(Put("abc", "def")); + ASSERT_EQ("def", Get("abc")); + Flush(); + ASSERT_EQ("def", Get("abc")); + } +} + TEST(DBTest, DBIteratorBoundTest) { Options options = CurrentOptions(); options.env = env_; diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 5e7de77d3..82f317f73 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -145,6 +145,10 @@ class HdfsEnv : public Env { posixEnv->SetBackgroundThreads(number, pri); } + virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { + posixEnv->IncBackgroundThreadsIfNeeded(number, pri); + } + virtual std::string TimeToString(uint64_t number) { return posixEnv->TimeToString(number); } @@ -319,7 +323,7 @@ class HdfsEnv : public Env { std::string* outputpath) {return notsup;} virtual void SetBackgroundThreads(int number, Priority pri = LOW) {} - + virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) {} virtual std::string TimeToString(uint64_t number) { return "";} }; } diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 70244bb31..e002fede1 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -272,6 +272,11 @@ class Env { // default number: 1 virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0; + // Enlarge number of background worker threads of a specific thread pool + // for this environment if it is smaller than specified. 'pri' selects the + // pool and must be given explicitly. + virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) = 0; + // Lower IO priority for threads from the specified pool. 
virtual void LowerThreadPoolIOPriority(Priority pool = LOW) {} @@ -782,6 +787,11 @@ class EnvWrapper : public Env { void SetBackgroundThreads(int num, Priority pri) { return target_->SetBackgroundThreads(num, pri); } + + void IncBackgroundThreadsIfNeeded(int num, Priority pri) { + return target_->IncBackgroundThreadsIfNeeded(num, pri); + } + void LowerThreadPoolIOPriority(Priority pool = LOW) override { target_->LowerThreadPoolIOPriority(pool); } diff --git a/util/env_posix.cc b/util/env_posix.cc index e44ebc83e..b9987088c 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1422,6 +1422,12 @@ class PosixEnv : public Env { thread_pools_[pri].SetBackgroundThreads(num); } + // Allow increasing the number of worker threads. + virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) { + assert(pri >= Priority::LOW && pri <= Priority::HIGH); + thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); + } + virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override { assert(pool >= Priority::LOW && pool <= Priority::HIGH); #ifdef OS_LINUX @@ -1642,13 +1648,14 @@ class PosixEnv : public Env { PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); } - void SetBackgroundThreads(int num) { + void SetBackgroundThreadsInternal(int num, bool allow_reduce) { PthreadCall("lock", pthread_mutex_lock(&mu_)); if (exit_all_threads_) { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); return; } - if (num != total_threads_limit_) { + if (num > total_threads_limit_ || + (num < total_threads_limit_ && allow_reduce)) { total_threads_limit_ = num; WakeUpAllThreads(); StartBGThreads(); @@ -1657,6 +1664,14 @@ class PosixEnv : public Env { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } + void IncBackgroundThreadsIfNeeded(int num) { + SetBackgroundThreadsInternal(num, false); + } + + void SetBackgroundThreads(int num) { + SetBackgroundThreadsInternal(num, true); + } + void StartBGThreads() { // Start background thread if necessary while ((int)bgthreads_.size() 
< total_threads_limit_) { diff --git a/util/env_test.cc b/util/env_test.cc index 3bb4fb68c..54e52069a 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -144,7 +144,7 @@ TEST(EnvPosixTest, TwoPools) { std::cout << "Pool " << pool_name_ << ": " << num_running_ << " running threads.\n"; // make sure we don't have more than pool_size_ jobs running. - ASSERT_LE(num_running_, pool_size_); + ASSERT_LE(num_running_, pool_size_.load()); } // sleep for 1 sec @@ -162,11 +162,16 @@ TEST(EnvPosixTest, TwoPools) { return num_finished_; } + void Reset(int pool_size) { + pool_size_.store(pool_size); + num_finished_ = 0; + } + private: port::Mutex mu_; int num_running_; int num_finished_; - int pool_size_; + std::atomic<int> pool_size_; std::string pool_name_; }; @@ -205,6 +210,35 @@ TEST(EnvPosixTest, TwoPools) { ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + + // call IncBackgroundThreadsIfNeeded to two pools. One increasing and + // the other decreasing + env_->IncBackgroundThreadsIfNeeded(kLowPoolSize - 1, Env::Priority::LOW); + env_->IncBackgroundThreadsIfNeeded(kHighPoolSize + 1, Env::Priority::HIGH); + high_pool_job.Reset(kHighPoolSize + 1); + low_pool_job.Reset(kLowPoolSize); + + // schedule same number of jobs in each pool + for (int i = 0; i < kJobs; i++) { + env_->Schedule(&CB::Run, &low_pool_job); + env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH); + } + // Wait a short while for the jobs to be dispatched. 
+ Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), + env_->GetThreadPoolQueueLen()); + ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), + env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ((unsigned int)(kJobs - (kHighPoolSize + 1)), + env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + + // wait for all jobs to finish + while (low_pool_job.NumFinished() < kJobs || + high_pool_job.NumFinished() < kJobs) { + env_->SleepForMicroseconds(kDelayMicros); + } + + env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH); } TEST(EnvPosixTest, DecreaseNumBgThreads) { -- GitLab