提交 67412e05 编写于 作者: Z ZhaoMing

Enable fiber compaction

上级 40c09e40
......@@ -2473,8 +2473,6 @@ void rocksdb_options_set_max_background_jobs(rocksdb_options_t* opt, int n) {
}
void rocksdb_options_set_max_task_per_thread(rocksdb_options_t* opt, int n) {
// NOT released
n = 1;
opt->rep.max_task_per_thread = n;
}
......
......@@ -1332,8 +1332,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
size_t yield_count = 0;
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
if (max_task_per_thread_ > 1 && ++yield_count % 128 == 0) {
// NOT released
//boost::this_fiber::yield();
boost::this_fiber::yield();
}
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
// returns true.
......@@ -1652,8 +1651,7 @@ void CompactionJob::ProcessGarbageCollection(SubcompactionState* sub_compact) {
while (status.ok() && !cfd->IsDropped() && input->Valid()) {
++counter.input;
if (max_task_per_thread_ > 1 && counter.input % 32 == 0) {
// NOT released
//boost::this_fiber::yield();
boost::this_fiber::yield();
}
Slice curr_key = input->key();
uint64_t curr_file_number = uint64_t(-1);
......
......@@ -912,8 +912,6 @@ Status DBImpl::SetDBOptions(
s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
&new_options);
if (s.ok()) {
// NOT released
new_options.max_task_per_thread = 1;
auto bg_job_limits = DBImpl::GetBGJobLimits(
immutable_db_options_.max_background_flushes,
new_options.max_background_compactions,
......
......@@ -70,8 +70,6 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
Env::Priority::HIGH);
// NOT released
result.max_task_per_thread = 1;
if (result.max_task_per_thread < 1) {
result.max_task_per_thread = 1;
}
......@@ -218,10 +216,6 @@ static Status ValidateOptions(
"More than four DB paths are not supported yet. ");
}
if (db_options.max_task_per_thread != 1) {
return Status::InvalidArgument("NOT released");
}
if (db_options.max_task_per_thread > 4 ||
db_options.max_task_per_thread < 1) {
return Status::InvalidArgument(
......
......@@ -547,7 +547,7 @@ struct DBOptions {
int max_background_jobs = 2;
// Max task per BG threads
// valid [1 , 4]
// valid [1 , 8]
int max_task_per_thread = 1;
// NOT SUPPORTED ANYMORE: RocksDB automatically decides this based on the
......
......@@ -31,7 +31,7 @@ void InstrumentedMutex::LockInternal() {
#ifndef NDEBUG
ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT);
#endif
mutex_.Lock();
mutex_.lock();
}
void InstrumentedCondVar::Wait() {
......@@ -45,7 +45,8 @@ void InstrumentedCondVar::WaitInternal() {
#ifndef NDEBUG
ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT);
#endif
cond_.Wait();
std::unique_lock<boost::fibers::mutex> lock(*mutex_);
cond_.wait(lock);
}
bool InstrumentedCondVar::TimedWait(uint64_t abs_time_us) {
......@@ -63,7 +64,9 @@ bool InstrumentedCondVar::TimedWaitInternal(uint64_t abs_time_us) {
TEST_SYNC_POINT_CALLBACK("InstrumentedCondVar::TimedWaitInternal",
&abs_time_us);
return cond_.TimedWait(abs_time_us);
std::unique_lock<boost::fibers::mutex> lock(*mutex_);
return cond_.wait_for(lock, std::chrono::microseconds(abs_time_us)) ==
boost::fibers::cv_status::timeout;
}
} // namespace rocksdb
......@@ -6,7 +6,7 @@
#pragma once
#include "monitoring/statistics.h"
#include "port/port.h"
#include <boost/fiber/all.hpp>
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/thread_status.h"
......@@ -19,30 +19,30 @@ class InstrumentedCondVar;
// for collecting stats and instrumentation.
class InstrumentedMutex {
public:
explicit InstrumentedMutex(bool adaptive = false)
: mutex_(adaptive), stats_(nullptr), env_(nullptr),
explicit InstrumentedMutex(bool /*adaptive*/ = false)
: mutex_(), stats_(nullptr), env_(nullptr),
stats_code_(0) {}
InstrumentedMutex(
Statistics* stats, Env* env,
int stats_code, bool adaptive = false)
: mutex_(adaptive), stats_(stats), env_(env),
int stats_code, bool /*adaptive*/ = false)
: mutex_(), stats_(stats), env_(env),
stats_code_(stats_code) {}
void Lock();
void Unlock() {
mutex_.Unlock();
mutex_.unlock();
}
void AssertHeld() {
mutex_.AssertHeld();
assert(!mutex_.try_lock());
}
private:
void LockInternal();
friend class InstrumentedCondVar;
port::Mutex mutex_;
boost::fibers::mutex mutex_;
Statistics* stats_;
Env* env_;
int stats_code_;
......@@ -69,7 +69,8 @@ class InstrumentedMutexLock {
class InstrumentedCondVar {
public:
explicit InstrumentedCondVar(InstrumentedMutex* instrumented_mutex)
: cond_(&(instrumented_mutex->mutex_)),
: cond_(),
mutex_(&instrumented_mutex->mutex_),
stats_(instrumented_mutex->stats_),
env_(instrumented_mutex->env_),
stats_code_(instrumented_mutex->stats_code_) {}
......@@ -79,17 +80,18 @@ class InstrumentedCondVar {
bool TimedWait(uint64_t abs_time_us);
void Signal() {
cond_.Signal();
cond_.notify_one();
}
void SignalAll() {
cond_.SignalAll();
cond_.notify_all();
}
private:
void WaitInternal();
bool TimedWaitInternal(uint64_t abs_time_us);
port::CondVar cond_;
boost::fibers::condition_variable cond_;
boost::fibers::mutex* mutex_;
Statistics* stats_;
Env* env_;
int stats_code_;
......
......@@ -363,8 +363,7 @@ void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(
return;
}
if (max_task_per_thread > 0) {
// NOT released
//max_task_per_thread_ = max_task_per_thread;
max_task_per_thread_ = max_task_per_thread;
}
if (num >= 0 && (num > total_threads_limit_ ||
(num < total_threads_limit_ && allow_reduce))) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册