提交 182b940e 编写于 作者: M Maysam Yabandeh 提交者: Facebook Github Bot

Add WriteOptions.no_slowdown

Summary:
If the WriteOptions.no_slowdown flag is set AND we need to wait or sleep for
the write request, then fail immediately with Status::Incomplete().
Closes https://github.com/facebook/rocksdb/pull/1527

Differential Revision: D4191405

Pulled By: maysamyabandeh

fbshipit-source-id: 7f3ce3f
上级 4118e133
......@@ -4739,7 +4739,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// for previous one. It might create a fairness issue that expiration
// might happen for smaller writes but larger writes can go through.
// Can optimize it if it is an issue.
status = DelayWrite(last_batch_group_size_);
status = DelayWrite(last_batch_group_size_, write_options);
PERF_TIMER_START(write_pre_and_post_process_time);
}
......@@ -4749,6 +4749,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
bool need_log_sync = !write_options.disableWAL && write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
bool logs_getting_synced = false;
if (status.ok()) {
if (need_log_sync) {
while (logs_.front().getting_synced) {
......@@ -4758,6 +4759,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
assert(!log.getting_synced);
log.getting_synced = true;
}
logs_getting_synced = true;
}
// Add to log and apply to memtable. We can release the lock
......@@ -4977,7 +4979,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_START(write_pre_and_post_process_time);
if (immutable_db_options_.paranoid_checks && !status.ok() &&
!w.CallbackFailed() && !status.IsBusy()) {
!w.CallbackFailed() && !status.IsBusy() && !status.IsIncomplete()) {
mutex_.Lock();
if (bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes
......@@ -4985,7 +4987,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
mutex_.Unlock();
}
if (need_log_sync) {
if (logs_getting_synced) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
mutex_.Unlock();
......@@ -5040,13 +5042,17 @@ uint64_t DBImpl::GetMaxTotalWalSize() const {
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::DelayWrite(uint64_t num_bytes) {
Status DBImpl::DelayWrite(uint64_t num_bytes,
const WriteOptions& write_options) {
uint64_t time_delayed = 0;
bool delayed = false;
{
StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
auto delay = write_controller_.GetDelay(env_, num_bytes);
if (delay > 0) {
if (write_options.no_slowdown) {
return Status::Incomplete();
}
mutex_.Unlock();
delayed = true;
TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
......@@ -5056,11 +5062,15 @@ Status DBImpl::DelayWrite(uint64_t num_bytes) {
}
while (bg_error_.ok() && write_controller_.IsStopped()) {
if (write_options.no_slowdown) {
return Status::Incomplete();
}
delayed = true;
TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
bg_cv_.Wait();
}
}
assert(!delayed || !write_options.no_slowdown);
if (delayed) {
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_STALL_MICROS,
time_delayed);
......
......@@ -648,7 +648,7 @@ class DBImpl : public DB {
// num_bytes: for slowdown case, delay time is calculated based on
// `num_bytes` going through.
Status DelayWrite(uint64_t num_bytes);
Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);
Status ScheduleFlushes(WriteContext* context);
......
......@@ -33,7 +33,7 @@ TEST_F(DBIOFailureTest, DropWrites) {
// Force out-of-space errors
env_->drop_writes_.store(true, std::memory_order_release);
env_->sleep_counter_.Reset();
env_->no_sleep_ = true;
env_->no_slowdown_ = true;
for (int i = 0; i < 5; i++) {
if (option_config_ != kUniversalCompactionMultiLevel &&
option_config_ != kUniversalSubcompactions) {
......
......@@ -322,7 +322,7 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
env_->no_sleep_ = true;
env_->no_slowdown_ = true;
env_->time_elapse_only_sleep_ = true;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
......
......@@ -215,6 +215,50 @@ TEST_F(DBTest, WriteEmptyBatch) {
ASSERT_EQ("bar", Get(1, "foo"));
}
TEST_F(DBTest, SkipDelay) {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 100000;
CreateAndReopenWithCF({"pikachu"}, options);
for (bool sync : {true, false}) {
for (bool disableWAL : {true, false}) {
// Use a small number to ensure a large delay that is still effective
// when we do Put
// TODO(myabandeh): this is time dependent and could potentially make
// the test flaky
auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
std::atomic<int> sleep_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Sleep",
[&](void* arg) { sleep_count.fetch_add(1); });
std::atomic<int> wait_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Wait",
[&](void* arg) { wait_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wo;
wo.sync = sync;
wo.disableWAL = disableWAL;
wo.no_slowdown = true;
dbfull()->Put(wo, "foo", "bar");
// We need the 2nd write to trigger delay. This is because delay is
// estimated based on the last write size which is 0 for the first write.
ASSERT_NOK(dbfull()->Put(wo, "foo2", "bar2"));
ASSERT_GE(sleep_count.load(), 0);
ASSERT_GE(wait_count.load(), 0);
token.reset();
token = dbfull()->TEST_write_controler().GetDelayToken(1000000000);
wo.no_slowdown = false;
ASSERT_OK(dbfull()->Put(wo, "foo3", "bar3"));
ASSERT_GE(sleep_count.load(), 1);
token.reset();
}
}
}
#ifndef ROCKSDB_LITE
TEST_F(DBTest, ReadOnlyDB) {
ASSERT_OK(Put("foo", "v1"));
......@@ -4894,7 +4938,7 @@ TEST_F(DBTest, MergeTestTime) {
SetPerfLevel(kEnableTime);
this->env_->addon_time_.store(0);
this->env_->time_elapse_only_sleep_ = true;
this->env_->no_sleep_ = true;
this->env_->no_slowdown_ = true;
Options options = CurrentOptions();
options.statistics = rocksdb::CreateDBStatistics();
options.merge_operator.reset(new DelayedMergeOperator(this));
......@@ -5339,7 +5383,7 @@ TEST_F(DBTest, DelayedWriteRate) {
Options options = CurrentOptions();
env_->SetBackgroundThreads(1, Env::LOW);
options.env = env_;
env_->no_sleep_ = true;
env_->no_slowdown_ = true;
options.write_buffer_size = 100000000;
options.max_write_buffer_number = 256;
options.max_background_compactions = 1;
......@@ -5393,7 +5437,7 @@ TEST_F(DBTest, DelayedWriteRate) {
ASSERT_LT(env_->addon_time_.load(),
static_cast<int64_t>(estimated_sleep_time * 2));
env_->no_sleep_ = false;
env_->no_slowdown_ = false;
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
......
......@@ -19,7 +19,7 @@ SpecialEnv::SpecialEnv(Env* base)
sleep_counter_(this),
addon_time_(0),
time_elapse_only_sleep_(false),
no_sleep_(false) {
no_slowdown_(false) {
delay_sstable_sync_.store(false, std::memory_order_release);
drop_writes_.store(false, std::memory_order_release);
no_space_.store(false, std::memory_order_release);
......
......@@ -432,10 +432,10 @@ class SpecialEnv : public EnvWrapper {
virtual void SleepForMicroseconds(int micros) override {
sleep_counter_.Increment();
if (no_sleep_ || time_elapse_only_sleep_) {
if (no_slowdown_ || time_elapse_only_sleep_) {
addon_time_.fetch_add(micros);
}
if (!no_sleep_) {
if (!no_slowdown_) {
target()->SleepForMicroseconds(micros);
}
}
......@@ -524,7 +524,7 @@ class SpecialEnv : public EnvWrapper {
bool time_elapse_only_sleep_;
bool no_sleep_;
bool no_slowdown_;
std::atomic<bool> is_wal_sync_thread_safe_{true};
};
......
......@@ -268,6 +268,12 @@ size_t WriteThread::EnterAsBatchGroupLeader(
break;
}
if (w->no_slowdown != leader->no_slowdown) {
// Do not mix writes that are ok with delays with the ones that
// request fail on delays.
break;
}
if (!w->disableWAL && leader->disableWAL) {
// Do not include a write that needs WAL into a batch that has
// WAL disabled.
......
......@@ -79,6 +79,7 @@ class WriteThread {
struct Writer {
WriteBatch* batch;
bool sync;
bool no_slowdown;
bool disableWAL;
bool disable_memtable;
uint64_t log_used; // log number that this batch was inserted into
......@@ -99,6 +100,7 @@ class WriteThread {
Writer()
: batch(nullptr),
sync(false),
no_slowdown(false),
disableWAL(false),
disable_memtable(false),
log_used(0),
......
......@@ -1593,11 +1593,16 @@ struct WriteOptions {
// Default: false
bool ignore_missing_column_families;
// If true and we need to wait or sleep for the write request, fails
// immediately with Status::Incomplete().
bool no_slowdown;
WriteOptions()
: sync(false),
disableWAL(false),
timeout_hint_us(0),
ignore_missing_column_families(false) {}
ignore_missing_column_families(false),
no_slowdown(false) {}
};
// Options that control flush operations
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册