提交 832b056a 编写于 作者: A anand76 提交者: Facebook GitHub Bot

Enable IO timeouts for iterators (#7161)

Summary:
Introduce io_timeout in ReadOptions and enabled deadline/io_timeout for
Iterators.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7161

Test Plan: New unit tests in db_basic_test

Reviewed By: riversand963

Differential Revision: D22687352

Pulled By: anand1976

fbshipit-source-id: 67bbb0e6d7ae80b256589244468494292538c6ec
上级 b79f13b2
......@@ -2829,12 +2829,11 @@ class DeadlineFS;
class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper {
public:
DeadlineRandomAccessFile(DeadlineFS& fs, SpecialEnv* env,
DeadlineRandomAccessFile(DeadlineFS& fs,
std::unique_ptr<FSRandomAccessFile>& file)
: FSRandomAccessFileWrapper(file.get()),
fs_(fs),
file_(std::move(file)),
env_(env) {}
file_(std::move(file)) {}
IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
Slice* result, char* scratch,
......@@ -2846,18 +2845,22 @@ class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper {
private:
DeadlineFS& fs_;
std::unique_ptr<FSRandomAccessFile> file_;
SpecialEnv* env_;
};
class DeadlineFS : public FileSystemWrapper {
public:
explicit DeadlineFS(SpecialEnv* env)
// The error_on_delay parameter specifies whether a IOStatus::TimedOut()
// status should be returned after delaying the IO to exceed the timeout,
// or to simply delay but return success anyway. The latter mimics the
// behavior of PosixFileSystem, which does not enforce any timeout
explicit DeadlineFS(SpecialEnv* env, bool error_on_delay)
: FileSystemWrapper(FileSystem::Default()),
delay_idx_(0),
deadline_(std::chrono::microseconds::zero()),
io_timeout_(std::chrono::microseconds::zero()),
env_(env),
timedout_(false),
ignore_deadline_(false) {}
ignore_deadline_(false),
error_on_delay_(error_on_delay) {}
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& opts,
......@@ -2867,100 +2870,111 @@ class DeadlineFS : public FileSystemWrapper {
IOStatus s;
s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
result->reset(new DeadlineRandomAccessFile(*this, env_, file));
result->reset(new DeadlineRandomAccessFile(*this, file));
int delay;
const std::chrono::microseconds deadline = GetDeadline();
if (deadline.count()) {
AssertDeadline(deadline, opts.io_options);
const std::chrono::microseconds io_timeout = GetIOTimeout();
if (deadline.count() || io_timeout.count()) {
AssertDeadline(deadline, io_timeout, opts.io_options);
}
if (ShouldDelay(&delay, &s)) {
env_->SleepForMicroseconds(delay);
}
return s;
return ShouldDelay(opts.io_options);
}
// Set a vector of {IO counter, delay in microseconds, return status} tuples
// that control when to inject a delay and duration of the delay
void SetDelaySequence(
const std::chrono::microseconds deadline,
const std::vector<std::tuple<int, int, IOStatus>>&& seq) {
int total_delay = 0;
for (auto& seq_iter : seq) {
// Ensure no individual delay is > 500ms
ASSERT_LT(std::get<1>(seq_iter), 500000);
total_delay += std::get<1>(seq_iter);
}
// ASSERT total delay is < 1s. This is mainly to keep the test from
// timing out in CI test frameworks
ASSERT_LT(total_delay, 1000000);
delay_seq_ = seq;
delay_idx_ = 0;
void SetDelayTrigger(const std::chrono::microseconds deadline,
const std::chrono::microseconds io_timeout,
const int trigger) {
delay_trigger_ = trigger;
io_count_ = 0;
deadline_ = deadline;
io_timeout_ = io_timeout;
timedout_ = false;
}
// Increment the IO counter and return a delay in microseconds
bool ShouldDelay(int* delay, IOStatus* s) {
if (!ignore_deadline_ && delay_idx_ < delay_seq_.size() &&
std::get<0>(delay_seq_[delay_idx_]) == io_count_++) {
*delay = std::get<1>(delay_seq_[delay_idx_]);
*s = std::get<2>(delay_seq_[delay_idx_]);
delay_idx_++;
IOStatus ShouldDelay(const IOOptions& opts) {
if (!deadline_.count() && !io_timeout_.count()) {
return IOStatus::OK();
}
if (!ignore_deadline_ && delay_trigger_ == io_count_++) {
env_->SleepForMicroseconds(static_cast<int>(opts.timeout.count() + 1));
timedout_ = true;
return true;
if (error_on_delay_) {
return IOStatus::TimedOut();
}
}
*s = IOStatus::OK();
return false;
return IOStatus::OK();
}
const std::chrono::microseconds GetDeadline() {
return ignore_deadline_ ? std::chrono::microseconds::zero() : deadline_;
}
const std::chrono::microseconds GetIOTimeout() {
return ignore_deadline_ ? std::chrono::microseconds::zero() : io_timeout_;
}
bool TimedOut() { return timedout_; }
void IgnoreDeadline(bool ignore) { ignore_deadline_ = ignore; }
void AssertDeadline(const std::chrono::microseconds deadline,
const std::chrono::microseconds io_timeout,
const IOOptions& opts) const {
// Give a leeway of +- 10us as it can take some time for the Get/
// MultiGet call to reach here, in order to avoid false alarms
std::chrono::microseconds now =
std::chrono::microseconds(env_->NowMicros());
if (deadline - now != opts.timeout) {
ASSERT_EQ(deadline - now, opts.timeout);
std::chrono::microseconds timeout;
if (deadline.count()) {
timeout = deadline - now;
if (io_timeout.count()) {
timeout = std::min(timeout, io_timeout);
}
} else {
timeout = io_timeout;
}
if (opts.timeout != timeout) {
ASSERT_EQ(timeout, opts.timeout);
}
}
private:
std::vector<std::tuple<int, int, IOStatus>> delay_seq_;
size_t delay_idx_;
// The number of IOs to trigger the delay after
int delay_trigger_;
// Current IO count
int io_count_;
// ReadOptions deadline for the Get/MultiGet/Iterator
std::chrono::microseconds deadline_;
// ReadOptions io_timeout for the Get/MultiGet/Iterator
std::chrono::microseconds io_timeout_;
SpecialEnv* env_;
// Flag to indicate whether we injected a delay
bool timedout_;
// Temporarily ignore deadlines/timeouts
bool ignore_deadline_;
// Return IOStatus::TimedOut() or IOStatus::OK()
bool error_on_delay_;
};
IOStatus DeadlineRandomAccessFile::Read(uint64_t offset, size_t len,
const IOOptions& opts, Slice* result,
char* scratch,
IODebugContext* dbg) const {
int delay;
const std::chrono::microseconds deadline = fs_.GetDeadline();
const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();
IOStatus s;
if (deadline.count()) {
fs_.AssertDeadline(deadline, opts);
}
if (fs_.ShouldDelay(&delay, &s)) {
env_->SleepForMicroseconds(delay);
if (deadline.count() || io_timeout.count()) {
fs_.AssertDeadline(deadline, io_timeout, opts);
}
if (s.ok()) {
s = FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch,
dbg);
}
if (s.ok()) {
s = fs_.ShouldDelay(opts);
}
return s;
}
......@@ -2968,23 +2982,23 @@ IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs,
size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) {
int delay;
const std::chrono::microseconds deadline = fs_.GetDeadline();
const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();
IOStatus s;
if (deadline.count()) {
fs_.AssertDeadline(deadline, options);
}
if (fs_.ShouldDelay(&delay, &s)) {
env_->SleepForMicroseconds(delay);
if (deadline.count() || io_timeout.count()) {
fs_.AssertDeadline(deadline, io_timeout, options);
}
if (s.ok()) {
s = FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg);
}
if (s.ok()) {
s = fs_.ShouldDelay(options);
}
return s;
}
// A test class for intercepting random reads and injecting artificial
// delays. Used for testing the deadline/timeout feature
// delays. Used for testing the MultiGet deadline feature
class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
public:
DBBasicTestMultiGetDeadline()
......@@ -3000,14 +3014,16 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
if (i < num_ok) {
EXPECT_OK(statuses[i]);
} else {
EXPECT_EQ(statuses[i], Status::TimedOut());
if (statuses[i] != Status::TimedOut()) {
EXPECT_EQ(statuses[i], Status::TimedOut());
}
}
}
}
};
TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_);
std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions();
env_->SetTimeElapseOnlySleep(&options);
......@@ -3037,9 +3053,8 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
ReadOptions ro;
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
// Delay the first IO by 200ms
fs->SetDelaySequence(
ro.deadline, {std::tuple<int, int, IOStatus>{0, 20000, IOStatus::OK()}});
// Delay the first IO
fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0);
std::vector<Status> statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
// The first key is successful because we check after the lookup, but
......@@ -3064,8 +3079,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
keys[i] = Slice(key_str[i].data(), key_str[i].size());
}
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(
ro.deadline, {std::tuple<int, int, IOStatus>{1, 20000, IOStatus::OK()}});
fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 1);
statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
CheckStatus(statuses, 3);
......@@ -3079,8 +3093,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
statuses.clear();
statuses.resize(keys.size());
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(
ro.deadline, {std::tuple<int, int, IOStatus>{0, 20000, IOStatus::OK()}});
fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0);
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
CheckStatus(statuses, 2);
......@@ -3095,8 +3108,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
statuses.clear();
statuses.resize(keys.size());
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(
ro.deadline, {std::tuple<int, int, IOStatus>{2, 20000, IOStatus::OK()}});
fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 2);
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
CheckStatus(statuses, 6);
......@@ -3110,8 +3122,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
statuses.clear();
statuses.resize(keys.size());
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(
ro.deadline, {std::tuple<int, int, IOStatus>{3, 20000, IOStatus::OK()}});
fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 3);
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
CheckStatus(statuses, 8);
......@@ -3137,8 +3148,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
statuses.clear();
statuses.resize(keys.size());
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(
ro.deadline, {std::tuple<int, int, IOStatus>{1, 20000, IOStatus::OK()}});
fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 1);
dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(),
pin_values.data(), statuses.data());
CheckStatus(statuses, 64);
......@@ -3172,9 +3182,17 @@ TEST_F(DBBasicTest, ManifestWriteFailure) {
Reopen(options);
}
TEST_F(DBBasicTest, PointLookupDeadline) {
std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_);
// A test class for intercepting random reads and injecting artificial
// delays. Used for testing the deadline/timeout feature
class DBBasicTestDeadline
: public DBBasicTest,
public testing::WithParamInterface<std::tuple<bool, bool>> {};
TEST_P(DBBasicTestDeadline, PointLookupDeadline) {
std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, true);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool set_deadline = std::get<0>(GetParam());
bool set_timeout = std::get<1>(GetParam());
// Since we call SetTimeElapseOnlySleep, Close() later on may not work
// properly for the DB that's opened by the DBTestBase constructor.
......@@ -3241,10 +3259,13 @@ TEST_F(DBBasicTest, PointLookupDeadline) {
// and cause the Get() to fail.
while (timedout) {
ReadOptions ro;
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(
ro.deadline, {std::tuple<int, int, IOStatus>{
io_deadline_trigger, 20000, IOStatus::TimedOut()}});
if (set_deadline) {
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
}
if (set_timeout) {
ro.io_timeout = std::chrono::microseconds{5000};
}
fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger);
block_cache->SetCapacity(0);
block_cache->SetCapacity(1048576);
......@@ -3260,11 +3281,112 @@ TEST_F(DBBasicTest, PointLookupDeadline) {
io_deadline_trigger++;
}
// Reset the delay sequence in order to avoid false alarms during Reopen
fs->SetDelaySequence(std::chrono::microseconds::zero(), {});
fs->SetDelayTrigger(std::chrono::microseconds::zero(),
std::chrono::microseconds::zero(), 0);
}
Close();
}
TEST_P(DBBasicTestDeadline, IteratorDeadline) {
std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, true);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool set_deadline = std::get<0>(GetParam());
bool set_timeout = std::get<1>(GetParam());
// Since we call SetTimeElapseOnlySleep, Close() later on may not work
// properly for the DB that's opened by the DBTestBase constructor.
Close();
for (int option_config = kDefault; option_config < kEnd; ++option_config) {
if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) {
continue;
}
Options options = CurrentOptions();
if (options.use_direct_reads) {
continue;
}
options.env = env.get();
options.disable_auto_compactions = true;
Cache* block_cache = nullptr;
env_->SetTimeElapseOnlySleep(&options);
// DB open will create table readers unless we reduce the table cache
// capacity.
// SanitizeOptions will set max_open_files to minimum of 20. Table cache
// is allocated with max_open_files - 10 as capacity. So override
// max_open_files to 11 so table cache capacity will become 1. This will
// prevent file open during DB open and force the file to be opened
// during MultiGet
SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
int* max_open_files = (int*)arg;
*max_open_files = 11;
});
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
if (options.table_factory &&
!strcmp(options.table_factory->Name(),
BlockBasedTableFactory::kName.c_str())) {
BlockBasedTableFactory* bbtf =
static_cast<BlockBasedTableFactory*>(options.table_factory.get());
block_cache = bbtf->table_options().block_cache.get();
}
Random rnd(301);
for (int i = 0; i < 400; ++i) {
std::string key = "k" + ToString(i);
Put(key, rnd.RandomString(100));
}
Flush();
bool timedout = true;
// A timeout will be forced when the IO counter reaches this value
int io_deadline_trigger = 0;
// Keep incrementing io_deadline_trigger and call Get() until there is an
// iteration that doesn't cause a timeout. This ensures that we cover
// all file reads in the point lookup path that can potentially timeout
while (timedout) {
ReadOptions ro;
if (set_deadline) {
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
}
if (set_timeout) {
ro.io_timeout = std::chrono::microseconds{5000};
}
fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger);
block_cache->SetCapacity(0);
block_cache->SetCapacity(1048576);
Iterator* iter = dbfull()->NewIterator(ro);
int count = 0;
iter->Seek("k50");
while (iter->Valid() && count++ < 100) {
iter->Next();
}
if (fs->TimedOut()) {
ASSERT_FALSE(iter->Valid());
ASSERT_EQ(iter->status(), Status::TimedOut());
} else {
timedout = false;
ASSERT_OK(iter->status());
}
delete iter;
io_deadline_trigger++;
}
// Reset the delay sequence in order to avoid false alarms during Reopen
fs->SetDelayTrigger(std::chrono::microseconds::zero(),
std::chrono::microseconds::zero(), 0);
}
Close();
}
// Param 0: If true, set read_options.deadline
// Param 1: If true, set read_options.io_timeout
INSTANTIATE_TEST_CASE_P(DBBasicTestDeadline, DBBasicTestDeadline,
::testing::Values(std::make_tuple(true, false),
std::make_tuple(false, true),
std::make_tuple(true, true)));
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
......
......@@ -2708,10 +2708,6 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
return NewErrorIterator(
Status::NotSupported("Managed iterator is not supported anymore."));
}
if (read_options.deadline != std::chrono::microseconds::zero()) {
return NewErrorIterator(
Status::NotSupported("ReadOptions deadline is not supported"));
}
Iterator* result = nullptr;
if (read_options.read_tier == kPersistedTier) {
return NewErrorIterator(Status::NotSupported(
......
......@@ -45,11 +45,18 @@ inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, Env* env,
if (ro.deadline.count()) {
std::chrono::microseconds now = std::chrono::microseconds(env->NowMicros());
if (now > ro.deadline) {
// Ensure there is atleast 1us available. We don't want to pass a value of
// 0 as that means no timeout
if (now >= ro.deadline) {
return IOStatus::TimedOut("Deadline exceeded");
}
opts.timeout = ro.deadline - now;
}
if (ro.io_timeout.count() &&
(!opts.timeout.count() || ro.io_timeout < opts.timeout)) {
opts.timeout = ro.io_timeout;
}
return IOStatus::OK();
}
......
......@@ -1347,7 +1347,8 @@ struct ReadOptions {
const Slice* timestamp;
const Slice* iter_start_ts;
// Deadline for completing the read request (only Get/MultiGet for now) in us.
// Deadline for completing an API call (Get/MultiGet/Seek/Next for now)
// in microseconds.
// It should be set to microseconds since epoch, i.e, gettimeofday or
// equivalent plus allowed duration in microseconds. The best way is to use
// env->NowMicros() + some timeout.
......@@ -1357,6 +1358,12 @@ struct ReadOptions {
// processing a batch
std::chrono::microseconds deadline;
// A timeout in microseconds to be passed to the underlying FileSystem for
// reads. As opposed to deadline, this determines the timeout for each
// individual file read request. If a MultiGet/Get/Seek/Next etc call
// results in multiple reads, each read can last upto io_timeout us.
std::chrono::microseconds io_timeout;
// It limits the maximum cumulative value size of the keys in batch while
// reading through MultiGet. Once the cumulative value size exceeds this
// soft limit then all the remaining keys are returned with status Aborted.
......
......@@ -613,6 +613,7 @@ ReadOptions::ReadOptions()
timestamp(nullptr),
iter_start_ts(nullptr),
deadline(std::chrono::microseconds::zero()),
io_timeout(std::chrono::microseconds::zero()),
value_size_soft_limit(std::numeric_limits<uint64_t>::max()) {}
ReadOptions::ReadOptions(bool cksum, bool cache)
......@@ -636,6 +637,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
timestamp(nullptr),
iter_start_ts(nullptr),
deadline(std::chrono::microseconds::zero()),
io_timeout(std::chrono::microseconds::zero()),
value_size_soft_limit(std::numeric_limits<uint64_t>::max()) {}
} // namespace ROCKSDB_NAMESPACE
......@@ -595,12 +595,14 @@ Status BlockBasedTable::Open(
Footer footer;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
// Only retain read_options.deadline. In future, we may retain more
// Only retain read_options.deadline and read_options.io_timeout.
// In future, we may retain more
// options. Specifically, w ignore verify_checksums and default to
// checksum verification anyway when creating the index and filter
// readers.
ReadOptions ro;
ro.deadline = read_options.deadline;
ro.io_timeout = read_options.io_timeout;
// prefetch both index and filters, down to all partitions
const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
......
......@@ -78,6 +78,7 @@ InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
ReadOptions ro;
ro.fill_cache = read_options.fill_cache;
ro.deadline = read_options.deadline;
ro.io_timeout = read_options.io_timeout;
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册