提交 1026e794 编写于 作者: A Andrew Kryczka 提交者: Facebook Github Bot

rate limit auto-tuning

Summary:
Dynamic adjustment of rate limit according to demand for background I/O. It increases by a factor when limiter is drained too frequently, and decreases by the same factor when limiter is not drained frequently enough. The parameters for this behavior are fixed in `GenericRateLimiter::Tune`. Other changes:

- make rate limiter's `Env*` configurable for testing
- track num drain intervals in RateLimiter so we don't have to rely on stats, which may be shared across different DB instances from the ones that share the RateLimiter.
Closes https://github.com/facebook/rocksdb/pull/2899

Differential Revision: D5858704

Pulled By: ajkr

fbshipit-source-id: cc2bac30f85e7f6fd63655d0a6732ef9ed7403b1
上级 75f7f42d
......@@ -5,6 +5,8 @@
### New Features
* `DBOptions::bytes_per_sync` and `DBOptions::wal_bytes_per_sync` can now be changed dynamically, `DBOptions::wal_bytes_per_sync` will flush all memtables and switch to a new WAL file.
* Support dynamic adjustment of rate limit according to demand for background I/O. It can be enabled by passing `true` to the `auto_tuned` parameter in `NewGenericRateLimiter()`. The value passed as `rate_bytes_per_sec` will still be respected as an upper-bound.
### Bug Fixes
* Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery.
......
......@@ -1272,7 +1272,7 @@ env_test: env/env_test.o $(LIBOBJECTS) $(TESTHARNESS)
fault_injection_test: db/fault_injection_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
rate_limiter_test: util/rate_limiter_test.o $(LIBOBJECTS) $(TESTHARNESS)
rate_limiter_test: util/rate_limiter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
delete_scheduler_test: util/delete_scheduler_test.o $(LIBOBJECTS) $(TESTHARNESS)
......
......@@ -127,9 +127,13 @@ class RateLimiter {
// 1/fairness chance even though high-pri requests exist to avoid starvation.
// You should be good by leaving it at default 10.
// @mode: Mode indicates which types of operations count against the limit.
// @auto_tuned: Enables dynamic adjustment of rate limit within the range
// `[rate_bytes_per_sec / 20, rate_bytes_per_sec]`, according to
// the recent demand for background I/O.
extern RateLimiter* NewGenericRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us = 100 * 1000,
int32_t fairness = 10,
RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly);
RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly,
bool auto_tuned = false);
} // namespace rocksdb
......@@ -826,6 +826,10 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
DEFINE_bool(rate_limiter_auto_tuned, false,
"Enable dynamic adjustment of rate limit according to demand for "
"background I/O");
DEFINE_bool(rate_limit_bg_reads, false,
"Use options.rate_limiter on compaction reads");
......@@ -3257,7 +3261,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */,
10 /* fairness */,
FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
: RateLimiter::Mode::kWritesOnly));
: RateLimiter::Mode::kWritesOnly,
FLAGS_rate_limiter_auto_tuned));
}
if (FLAGS_num_multi_db <= 1) {
......
......@@ -45,13 +45,15 @@ struct GenericRateLimiter::Req {
GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
int64_t refill_period_us,
int32_t fairness, RateLimiter::Mode mode)
int32_t fairness, RateLimiter::Mode mode,
Env* env, bool auto_tuned)
: RateLimiter(mode),
refill_period_us_(refill_period_us),
rate_bytes_per_sec_(rate_bytes_per_sec),
rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
: rate_bytes_per_sec),
refill_bytes_per_period_(
CalculateRefillBytesPerPeriod(rate_bytes_per_sec)),
env_(Env::Default()),
CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)),
env_(env),
stop_(false),
exit_cv_(&request_mutex_),
requests_to_wait_(0),
......@@ -59,7 +61,12 @@ GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
next_refill_us_(NowMicrosMonotonic(env_)),
fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)),
leader_(nullptr) {
leader_(nullptr),
auto_tuned_(auto_tuned),
num_drains_(0),
prev_num_drains_(0),
max_bytes_per_sec_(rate_bytes_per_sec),
tuned_time_(NowMicrosMonotonic(env_)) {
total_requests_[0] = 0;
total_requests_[1] = 0;
total_bytes_through_[0] = 0;
......@@ -98,6 +105,16 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
&rate_bytes_per_sec_);
MutexLock g(&request_mutex_);
if (auto_tuned_) {
static const int kRefillsPerTune = 100;
std::chrono::microseconds now(NowMicrosMonotonic(env_));
if (now - tuned_time_ >=
kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
Tune();
}
}
if (stop_) {
return;
}
......@@ -138,6 +155,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
} else {
int64_t wait_until = env_->NowMicros() + delta;
RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
++num_drains_;
timedout = r.cv.TimedWait(wait_until);
}
} else {
......@@ -256,15 +274,66 @@ int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
}
}
Status GenericRateLimiter::Tune() {
const int kLowWatermarkPct = 50;
const int kHighWatermarkPct = 90;
const int kAdjustFactorPct = 5;
// computed rate limit will be in
// `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`.
const int kAllowedRangeFactor = 20;
std::chrono::microseconds prev_tuned_time = tuned_time_;
tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_));
int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
std::chrono::microseconds(refill_period_us_) -
std::chrono::microseconds(1)) /
std::chrono::microseconds(refill_period_us_);
// We tune every kRefillsPerTune intervals, so the overflow and division-by-
// zero conditions should never happen.
assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100);
assert(elapsed_intervals > 0);
int64_t drained_pct =
(num_drains_ - prev_num_drains_) * 100 / elapsed_intervals;
int64_t prev_bytes_per_sec = GetBytesPerSecond();
int64_t new_bytes_per_sec;
if (drained_pct == 0) {
new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor;
} else if (drained_pct < kLowWatermarkPct) {
// sanitize to prevent overflow
int64_t sanitized_prev_bytes_per_sec =
std::min(prev_bytes_per_sec, port::kMaxInt64 / 100);
new_bytes_per_sec =
std::max(max_bytes_per_sec_ / kAllowedRangeFactor,
sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
} else if (drained_pct > kHighWatermarkPct) {
// sanitize to prevent overflow
int64_t sanitized_prev_bytes_per_sec = std::min(
prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct));
new_bytes_per_sec =
std::min(max_bytes_per_sec_,
sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
} else {
new_bytes_per_sec = prev_bytes_per_sec;
}
if (new_bytes_per_sec != prev_bytes_per_sec) {
SetBytesPerSecond(new_bytes_per_sec);
}
num_drains_ = prev_num_drains_;
return Status::OK();
}
RateLimiter* NewGenericRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
int32_t fairness /* = 10 */,
RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */) {
RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
bool auto_tuned /* = false */) {
assert(rate_bytes_per_sec > 0);
assert(refill_period_us > 0);
assert(fairness > 0);
return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
mode);
mode, Env::Default(), auto_tuned);
}
} // namespace rocksdb
......@@ -11,20 +11,21 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <deque>
#include "port/port.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h"
#include "util/mutexlock.h"
#include "util/random.h"
namespace rocksdb {
class GenericRateLimiter : public RateLimiter {
public:
GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us,
int32_t fairness,
RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly);
int32_t fairness, RateLimiter::Mode mode, Env* env,
bool auto_tuned);
virtual ~GenericRateLimiter();
......@@ -68,6 +69,8 @@ class GenericRateLimiter : public RateLimiter {
private:
void Refill();
int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
Status Tune();
uint64_t NowMicrosMonotonic(Env* env) {
return env->NowNanos() / std::milli::den;
}
......@@ -99,6 +102,12 @@ class GenericRateLimiter : public RateLimiter {
struct Req;
Req* leader_;
std::deque<Req*> queue_[Env::IO_TOTAL];
bool auto_tuned_;
int64_t num_drains_;
int64_t prev_num_drains_;
const int64_t max_bytes_per_sec_;
std::chrono::microseconds tuned_time_;
};
} // namespace rocksdb
......@@ -12,8 +12,12 @@
#endif
#include "util/rate_limiter.h"
#include <inttypes.h>
#include <chrono>
#include <limits>
#include "db/db_test_util.h"
#include "rocksdb/env.h"
#include "util/random.h"
#include "util/sync_point.h"
......@@ -25,7 +29,9 @@ namespace rocksdb {
class RateLimiterTest : public testing::Test {};
TEST_F(RateLimiterTest, OverflowRate) {
GenericRateLimiter limiter(port::kMaxInt64, 1000, 10);
GenericRateLimiter limiter(port::kMaxInt64, 1000, 10,
RateLimiter::Mode::kWritesOnly, Env::Default(),
false /* auto_tuned */);
ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll);
}
......@@ -36,9 +42,9 @@ TEST_F(RateLimiterTest, StartStop) {
TEST_F(RateLimiterTest, Modes) {
for (auto mode : {RateLimiter::Mode::kWritesOnly,
RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
GenericRateLimiter limiter(2000 /* rate_bytes_per_sec */,
1000 * 1000 /* refill_period_us */,
10 /* fairness */, mode);
GenericRateLimiter limiter(
2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
10 /* fairness */, mode, Env::Default(), false /* auto_tuned */);
limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
if (mode == RateLimiter::Mode::kWritesOnly) {
......@@ -147,7 +153,9 @@ TEST_F(RateLimiterTest, LimitChangeTest) {
// refill per second
for (int iter = 0; iter < 2; iter++) {
std::shared_ptr<RateLimiter> limiter =
std::make_shared<GenericRateLimiter>(target, refill_period, 10);
std::make_shared<GenericRateLimiter>(
target, refill_period, 10, RateLimiter::Mode::kWritesOnly,
Env::Default(), false /* auto_tuned */);
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"GenericRateLimiter::Request",
"RateLimiterTest::LimitChangeTest:changeLimitStart"},
......@@ -172,6 +180,57 @@ TEST_F(RateLimiterTest, LimitChangeTest) {
}
}
TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
const std::chrono::seconds kTimePerRefill(1);
const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc
SpecialEnv special_env(Env::Default());
special_env.no_slowdown_ = true;
special_env.time_elapse_only_sleep_ = true;
auto stats = CreateDBStatistics();
std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter(
1000 /* rate_bytes_per_sec */,
std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */,
RateLimiter::Mode::kWritesOnly, &special_env, true /* auto_tuned */));
// Use callback to advance time because we need to advance (1) after Request()
// has determined the bytes are not available; and (2) before Refill()
// computes the next refill time (ensuring refill time in the future allows
// the next request to drain the rate limiter).
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"GenericRateLimiter::Refill", [&](void* arg) {
special_env.SleepForMicroseconds(static_cast<int>(
std::chrono::microseconds(kTimePerRefill).count()));
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// verify rate limit increases after a sequence of periods where rate limiter
// is always drained
int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
RateLimiter::OpType::kWrite);
while (std::chrono::microseconds(special_env.NowMicros()) <=
kRefillsPerTune * kTimePerRefill) {
rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
RateLimiter::OpType::kWrite);
}
int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// decreases after a sequence of periods where rate limiter is not drained
orig_bytes_per_sec = new_bytes_per_sec;
special_env.SleepForMicroseconds(static_cast<int>(
kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count()));
// make a request so tuner can be triggered
rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(),
RateLimiter::OpType::kWrite);
new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec);
}
} // namespace rocksdb
int main(int argc, char** argv) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册