Commit f36394ff authored by akankshamahajan, committed by Facebook GitHub Bot

Fix seg fault in auto_readahead_size with async_io (#11769)

Summary:
Fix seg fault in auto_readahead_size with async_io when readahead_size = 0. If readahead_size is trimmed to 0, the buffer is not eligible for further prefetching and the function should return early.

The error occurred when the first buffer already contained the data and prefetching proceeded into the second buffer, leading to an assertion failure:

`assert(roundup_len1 >= alignment);`

because `roundup_len1 = length + readahead_size`, where length is 0 and readahead_size is also 0.
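As a concrete illustration, here is a minimal standalone sketch of the failing arithmetic; the `Rounddown`/`Roundup` helpers, the alignment value, and the offsets are illustrative stand-ins rather than RocksDB's actual internals:

```cpp
#include <cassert>
#include <cstdint>

// Round n down/up to a multiple of a power-of-two alignment, mirroring the
// kind of helpers used for aligned prefetch reads.
uint64_t Rounddown(uint64_t n, uint64_t alignment) {
  return (n / alignment) * alignment;
}
uint64_t Roundup(uint64_t n, uint64_t alignment) {
  return ((n + alignment - 1) / alignment) * alignment;
}

int main() {
  const uint64_t alignment = 4096;
  const uint64_t offset = 8192;       // hypothetical aligned read offset
  const uint64_t length = 0;          // first buffer already holds the data
  const uint64_t readahead_size = 0;  // trimmed to 0 by the upper bound

  uint64_t rounddown_start1 = Rounddown(offset, alignment);
  uint64_t roundup_end1 = Roundup(offset + length + readahead_size, alignment);
  uint64_t roundup_len1 = roundup_end1 - rounddown_start1;  // == 0 here

  // With both length and readahead_size equal to 0 at an aligned offset,
  // roundup_len1 is 0 and this precondition fires -- the failure the fix
  // avoids by returning early when readahead_size is trimmed to 0.
  assert(roundup_len1 >= alignment);
  return 0;
}
```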

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11769

Test Plan: Reproducible with db_stress with async_io enabled.

Reviewed By: anand1976

Differential Revision: D48743031

Pulled By: akankshamahajan15

fbshipit-source-id: 0e08c41f862f6287ca223fbfaf6cd42fc97b3c87
Parent 310a242c
@@ -545,7 +545,9 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts,
     assert(roundup_len1 >= chunk_len1);
     read_len1 = static_cast<size_t>(roundup_len1 - chunk_len1);
   }
-  {
+
+  // Prefetch in second buffer only if readahead_size_ > 0.
+  if (readahead_size_ > 0) {
     // offset and size alignment for second buffer for asynchronous
     // prefetching
     uint64_t rounddown_start2 = roundup_end1;
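For orientation, the following is a rough sketch (a hypothetical helper, not the function's real signature) of the second-buffer range computed below this guard: it starts where the first buffer's rounded-up range ends, so with `readahead_size_ == 0` the range is empty and there is nothing to prefetch into it:

```cpp
#include <cstdint>

struct Range {
  uint64_t start;
  uint64_t len;
};

// Hypothetical distillation of the second-buffer math: prefetch the
// readahead bytes that follow the first buffer's rounded-up end.
Range SecondBufferRange(uint64_t roundup_end1, uint64_t readahead_size,
                        uint64_t alignment) {
  uint64_t rounddown_start2 = roundup_end1;  // already a multiple of alignment
  uint64_t roundup_end2 =
      ((rounddown_start2 + readahead_size + alignment - 1) / alignment) *
      alignment;
  // When readahead_size == 0, roundup_end2 == rounddown_start2 and the
  // resulting range is empty, which is why the guard above skips it.
  return {rounddown_start2, roundup_end2 - rounddown_start2};
}
```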
@@ -733,7 +735,9 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
       (bufs_[curr_].async_read_in_progress_ ||
        offset + n >
            bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize())) {
-    if (readahead_size_ > 0) {
+    // In case readahead_size is trimmed (=0), we still want to poll the data
+    // submitted with explicit_prefetch_submitted_=true.
+    if (readahead_size_ > 0 || explicit_prefetch_submitted_) {
       Status s;
       assert(reader != nullptr);
       assert(max_readahead_size_ >= readahead_size_);
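Distilled into a standalone predicate (a sketch of the guard's intent, not RocksDB code): even when trimming has driven `readahead_size_` to 0, an explicitly submitted prefetch still has an async read in flight whose result must be polled:

```cpp
#include <cstddef>

// Sketch: should the async poll path still be taken? Yes if there is
// readahead left to do, or if an explicitly submitted prefetch
// (explicit_prefetch_submitted_) is already in flight.
bool ShouldPollAsyncRead(size_t readahead_size,
                         bool explicit_prefetch_submitted) {
  return readahead_size > 0 || explicit_prefetch_submitted;
}
```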
@@ -825,14 +829,12 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
   num_file_reads_ = 0;
   explicit_prefetch_submitted_ = false;
   bool is_eligible_for_prefetching = false;
-  UpdateReadAheadSizeForUpperBound(offset, n);
   if (readahead_size_ > 0 &&
       (!implicit_auto_readahead_ ||
        num_file_reads_ >= num_file_reads_for_auto_readahead_)) {
-    is_eligible_for_prefetching = true;
+    UpdateReadAheadSizeForUpperBound(offset, n);
+    // After trim, readahead size can be 0.
+    if (readahead_size_ > 0) {
+      is_eligible_for_prefetching = true;
+    }
   }
   // 1. Cancel any pending async read to make code simpler as buffers can be out
......
@@ -389,6 +389,12 @@ class FilePrefetchBuffer {
                        bufs_[second].offset_)) {
       return false;
     }
+
+    // Readahead size can be 0 because of trimming.
+    if (readahead_size_ == 0) {
+      return false;
+    }
+
     bufs_[second].buffer_.Clear();
     return true;
   }
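To see how `readahead_size_` can legitimately become 0, here is a minimal sketch of upper-bound trimming (a hypothetical stand-in for what `UpdateReadAheadSizeForUpperBound` does, not its real implementation): once the requested read already reaches the iterator's upper bound, nothing remains to prefetch:

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical stand-in: cap readahead at the bytes remaining between the
// end of the current read (offset + n) and the file offset corresponding to
// the iterate_upper_bound key.
uint64_t TrimReadaheadForUpperBound(uint64_t offset, uint64_t n,
                                    uint64_t upper_bound_offset,
                                    uint64_t readahead_size) {
  uint64_t remaining = upper_bound_offset > offset + n
                           ? upper_bound_offset - (offset + n)
                           : 0;
  // remaining == 0 (e.g. a Seek landing at the upper bound) trims the
  // readahead to 0 -- the case the new early return above handles.
  return std::min(readahead_size, remaining);
}
```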
......
@@ -2164,6 +2164,121 @@ TEST_P(PrefetchTest, IterReadAheadSizeWithUpperBound) {
}
}
// This test checks that readahead_size is trimmed when upper_bound is reached
// during Seek with async_io, and that Seek then goes for polling without any
// extra prefetching.
TEST_P(PrefetchTest, IterReadAheadSizeWithUpperBoundSeekOnly) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
// First param is whether the MockFS supports prefetch or not
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(FileSystem::Default(), false);
bool use_direct_io = false;
if (std::get<0>(GetParam())) {
use_direct_io = true;
}
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
Random rnd(309);
WriteBatch batch;
for (int i = 0; i < 26; i++) {
std::string key = "my_key_";
for (int j = 0; j < 10; j++) {
key += char('a' + i);
ASSERT_OK(batch.Put(key, rnd.RandomString(1000)));
}
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = "my_key_a";
std::string end_key = "my_key_";
for (int j = 0; j < 10; j++) {
end_key += char('a' + 25);
}
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
s = TryReopen(options);
ASSERT_OK(s);
int buff_count_with_tuning = 0;
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
[&](void*) { buff_count_with_tuning++; });
bool read_async_called = false;
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
[&](void* /*arg*/) { read_async_called = true; });
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ropts;
if (std::get<1>(GetParam())) {
ropts.readahead_size = 32768;
}
ropts.async_io = true;
Slice ub = Slice("my_key_aaa");
ropts.iterate_upper_bound = &ub;
Slice seek_key = Slice("my_key_aaa");
// With tuning readahead_size.
{
ASSERT_OK(options.statistics->Reset());
ropts.auto_readahead_size = true;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ropts));
iter->Seek(seek_key);
ASSERT_OK(iter->status());
// Verify results.
uint64_t readahead_trimmed =
options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED);
// Readahead got trimmed.
if (read_async_called) {
ASSERT_GT(readahead_trimmed, 0);
// Seek called PrefetchAsync to poll the data.
ASSERT_EQ(1, buff_count_with_tuning);
} else {
// async_io disabled.
ASSERT_GE(readahead_trimmed, 0);
ASSERT_EQ(0, buff_count_with_tuning);
}
}
}
Close();
}
namespace {
#ifdef GFLAGS
const int kMaxArgCount = 100;
......