diff --git a/db/write_controller.cc b/db/write_controller.cc index d46d8d3ddca84673e615ead238603528b6db5343..a5842d18be8af3bf4dcdd407f061989e43781f73 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -7,6 +7,7 @@ #include #include +#include #include "rocksdb/env.h" namespace rocksdb { @@ -56,7 +57,7 @@ uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) { } // The frequency to get time inside DB mutex is less than one per refill // interval. - auto time_now = env->NowMicros(); + auto time_now = NowMicrosMonotonic(env); uint64_t sleep_debt = 0; uint64_t time_since_last_refill = 0; @@ -103,6 +104,10 @@ uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) { return sleep_amount; } +uint64_t WriteController::NowMicrosMonotonic(Env* env) { + return env->NowNanos() / std::milli::den; +} + StopWriteToken::~StopWriteToken() { assert(controller_->total_stopped_ >= 1); --controller_->total_stopped_; diff --git a/db/write_controller.h b/db/write_controller.h index 36a90590262bfe501705e808c96898b2c3dd2255..b84092ca6f7d0ed7b6b71995e290dc23d2881429 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -77,6 +77,9 @@ class WriteController { uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; } + private: + uint64_t NowMicrosMonotonic(Env* env); + private: friend class WriteControllerToken; friend class StopWriteToken; diff --git a/db/write_controller_test.cc b/db/write_controller_test.cc index ae890467fb4b878e8a6e6b0b3b711b4e6eaac202..737386fce00087d0e5c8db7162ad7c59365556bf 100644 --- a/db/write_controller_test.cc +++ b/db/write_controller_test.cc @@ -3,6 +3,8 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. // +#include + #include "db/write_controller.h" #include "rocksdb/env.h" @@ -16,7 +18,7 @@ class TimeSetEnv : public EnvWrapper { public: explicit TimeSetEnv() : EnvWrapper(nullptr) {} uint64_t now_micros_ = 6666; - virtual uint64_t NowMicros() override { return now_micros_; } + virtual uint64_t NowNanos() override { return now_micros_ * std::milli::den; } }; TEST_F(WriteControllerTest, ChangeDelayRateTest) { diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index d90124146907efc5311dd5f1a572c01726ba72bd..9ebba61c3a4d0bde0cfbfd3acbb037ac06eb7c6e 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -318,15 +318,16 @@ class Env { virtual Status NewLogger(const std::string& fname, shared_ptr* result) = 0; - // Returns the number of micro-seconds since some fixed point in time. Only - // useful for computing deltas of time. - // However, it is often used as system time such as in GenericRateLimiter + // Returns the number of micro-seconds since some fixed point in time. + // It is often used as system time such as in GenericRateLimiter // and other places so a port needs to return system time in order to work. virtual uint64_t NowMicros() = 0; // Returns the number of nano-seconds since some fixed point in time. Only // useful for computing deltas of time in one run. - // Default implementation simply relies on NowMicros + // Default implementation simply relies on NowMicros. + // In platform-specific implementations, NowNanos() should return time points + // that are MONOTONIC. virtual uint64_t NowNanos() { return NowMicros() * 1000; } @@ -982,6 +983,7 @@ class EnvWrapper : public Env { return target_->NewLogger(fname, result); } uint64_t NowMicros() override { return target_->NowMicros(); } + void SleepForMicroseconds(int micros) override { target_->SleepForMicroseconds(micros); } diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 9f2a84e43616802ba7981a598b1b5b8fceadc7fb..f06c550cda735f9fb77a1e9052d8f6bb72051716 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -36,7 +36,7 @@ GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec, exit_cv_(&request_mutex_), requests_to_wait_(0), available_bytes_(0), - next_refill_us_(env_->NowMicros()), + next_refill_us_(NowMicrosMonotonic(env_)), fairness_(fairness > 100 ? 100 : fairness), rnd_((uint32_t)time(nullptr)), leader_(nullptr) { @@ -107,7 +107,14 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { (!queue_[Env::IO_LOW].empty() && &r == queue_[Env::IO_LOW].front()))) { leader_ = &r; - timedout = r.cv.TimedWait(next_refill_us_); + int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_); + delta = delta > 0 ? delta : 0; + if (delta == 0) { + timedout = true; + } else { + int64_t wait_until = env_->NowMicros() + delta; + timedout = r.cv.TimedWait(wait_until); + } } else { // Not at the front of queue or an leader has already been elected r.cv.Wait(); @@ -178,7 +185,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { void GenericRateLimiter::Refill() { TEST_SYNC_POINT("GenericRateLimiter::Refill"); - next_refill_us_ = env_->NowMicros() + refill_period_us_; + next_refill_us_ = NowMicrosMonotonic(env_) + refill_period_us_; // Carry over the left over quota from the last period auto refill_bytes_per_period = refill_bytes_per_period_.load(std::memory_order_relaxed); diff --git a/util/rate_limiter.h b/util/rate_limiter.h index ddeaeba10b6d6a4fc6f19286f77bcffe8c76925e..896dc595d97ad167468c204652fdfaef832b6c21 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -61,6 +61,9 @@ class GenericRateLimiter : public RateLimiter { private: void Refill(); int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); + uint64_t NowMicrosMonotonic(Env* env) { + return env->NowNanos() / std::milli::den; + } // This mutex guard all internal states mutable port::Mutex request_mutex_;