From f36394ff20b8cb3476deb19c141fae2651ebb641 Mon Sep 17 00:00:00 2001
From: akankshamahajan
Date: Mon, 28 Aug 2023 17:08:28 -0700
Subject: [PATCH] Fix seg fault in auto_readahead_size with async_io (#11769)

Summary:
Fix seg fault in auto_readahead_size with async_io when readahead_size = 0.
If readahead_size is trimmed to 0, the buffer is not eligible for further
prefetching and the call should return early.

The error occurred when the first buffer already contains data and
prefetching proceeds in the second buffer, leading to the assertion failure
`assert(roundup_len1 >= alignment);`
because roundup_len1 = length + readahead_size, where both length and
readahead_size are 0.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11769

Test Plan: Reproducible with db_stress with async_io enabled.

Reviewed By: anand1976

Differential Revision: D48743031

Pulled By: akankshamahajan15

fbshipit-source-id: 0e08c41f862f6287ca223fbfaf6cd42fc97b3c87
---
 file/file_prefetch_buffer.cc |  14 +++--
 file/file_prefetch_buffer.h  |   6 ++
 file/prefetch_test.cc        | 115 +++++++++++++++++++++++++++++++++++
 3 files changed, 129 insertions(+), 6 deletions(-)

diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index d34a65c82..5c9c2797c 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -545,7 +545,9 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts,
     assert(roundup_len1 >= chunk_len1);
     read_len1 = static_cast<size_t>(roundup_len1 - chunk_len1);
   }
-  {
+
+  // Prefetch in second buffer only if readahead_size_ > 0.
+  if (readahead_size_ > 0) {
     // offset and size alignment for second buffer for asynchronous
     // prefetching
     uint64_t rounddown_start2 = roundup_end1;
@@ -733,7 +735,9 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
       (bufs_[curr_].async_read_in_progress_ ||
        offset + n >
            bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize())) {
-    if (readahead_size_ > 0) {
+    // In case readahead_size is trimmed (=0), we still want to poll the data
+    // submitted with explicit_prefetch_submitted_=true.
+    if (readahead_size_ > 0 || explicit_prefetch_submitted_) {
       Status s;
       assert(reader != nullptr);
       assert(max_readahead_size_ >= readahead_size_);
@@ -825,14 +829,12 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
   num_file_reads_ = 0;
   explicit_prefetch_submitted_ = false;
   bool is_eligible_for_prefetching = false;
+
+  UpdateReadAheadSizeForUpperBound(offset, n);
   if (readahead_size_ > 0 &&
       (!implicit_auto_readahead_ ||
        num_file_reads_ >= num_file_reads_for_auto_readahead_)) {
-    UpdateReadAheadSizeForUpperBound(offset, n);
-    // After trim, readahead size can be 0.
-    if (readahead_size_ > 0) {
       is_eligible_for_prefetching = true;
-    }
   }
 
   // 1. Cancel any pending async read to make code simpler as buffers can be out
diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h
index 334e32b6e..8c2e82476 100644
--- a/file/file_prefetch_buffer.h
+++ b/file/file_prefetch_buffer.h
@@ -389,6 +389,12 @@ class FilePrefetchBuffer {
                          bufs_[second].offset_)) {
       return false;
     }
+
+    // Readahead size can be 0 because of trimming.
+    if (readahead_size_ == 0) {
+      return false;
+    }
+
     bufs_[second].buffer_.Clear();
     return true;
   }
diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc
index 14c7c28a5..782751c82 100644
--- a/file/prefetch_test.cc
+++ b/file/prefetch_test.cc
@@ -2164,6 +2164,121 @@ TEST_P(PrefetchTest, IterReadAheadSizeWithUpperBound) {
   }
 }
 
+// This test checks that readahead_size is trimmed when upper_bound is
+// reached during Seek with async_io, and that Seek goes for polling
+// without any extra prefetching.
+TEST_P(PrefetchTest, IterReadAheadSizeWithUpperBoundSeekOnly) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+
+  // First param indicates if the MockFS supports prefetch or not
+  std::shared_ptr<MockFS> fs =
+      std::make_shared<MockFS>(FileSystem::Default(), false);
+
+  bool use_direct_io = false;
+  if (std::get<0>(GetParam())) {
+    use_direct_io = true;
+  }
+
+  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
+  options.statistics = CreateDBStatistics();
+  BlockBasedTableOptions table_options;
+  SetBlockBasedTableOptions(table_options);
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  Status s = TryReopen(options);
+  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
+    // If direct IO is not supported, skip the test
+    return;
+  } else {
+    ASSERT_OK(s);
+  }
+
+  Random rnd(309);
+  WriteBatch batch;
+
+  for (int i = 0; i < 26; i++) {
+    std::string key = "my_key_";
+
+    for (int j = 0; j < 10; j++) {
+      key += char('a' + i);
+      ASSERT_OK(batch.Put(key, rnd.RandomString(1000)));
+    }
+  }
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+  std::string start_key = "my_key_a";
+
+  std::string end_key = "my_key_";
+  for (int j = 0; j < 10; j++) {
+    end_key += char('a' + 25);
+  }
+
+  Slice least(start_key.data(), start_key.size());
+  Slice greatest(end_key.data(), end_key.size());
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
+
+  s = TryReopen(options);
+  ASSERT_OK(s);
+
+  int buff_count_with_tuning = 0;
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
+      [&](void*) { buff_count_with_tuning++; });
+
+  bool read_async_called = false;
+  SyncPoint::GetInstance()->SetCallBack(
+      "UpdateResults::io_uring_result",
+      [&](void* /*arg*/) { read_async_called = true; });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ReadOptions ropts;
+  if (std::get<1>(GetParam())) {
+    ropts.readahead_size = 32768;
+  }
+  ropts.async_io = true;
+
+  Slice ub = Slice("my_key_aaa");
+  ropts.iterate_upper_bound = &ub;
+  Slice seek_key = Slice("my_key_aaa");
+
+  // With tuning readahead_size.
+  {
+    ASSERT_OK(options.statistics->Reset());
+    ropts.auto_readahead_size = true;
+
+    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ropts));
+
+    iter->Seek(seek_key);
+
+    ASSERT_OK(iter->status());
+
+    // Verify results.
+    uint64_t readahead_trimmed =
+        options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED);
+    // Readahead got trimmed.
+    if (read_async_called) {
+      ASSERT_GT(readahead_trimmed, 0);
+      // Seek called PrefetchAsync to poll the data.
+      ASSERT_EQ(1, buff_count_with_tuning);
+    } else {
+      // async_io disabled.
+      ASSERT_GE(readahead_trimmed, 0);
+      ASSERT_EQ(0, buff_count_with_tuning);
+    }
+  }
+  Close();
+}
+
 namespace {
 #ifdef GFLAGS
 const int kMaxArgCount = 100;
--
GitLab
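
To make the failure mode in the summary concrete: below is a minimal standalone sketch, not RocksDB code. `Rounddown`/`Roundup` and all constants are simplified stand-ins for the helpers and values used in `FilePrefetchBuffer::PrefetchAsyncInternal`. It shows how a readahead size trimmed to 0, combined with a zero remaining read length at an already-aligned offset, collapses the rounded prefetch range to 0 bytes and violates the old `assert(roundup_len1 >= alignment)` precondition.

```cpp
#include <cassert>
#include <cstdint>
#include <iostream>

// Round x down / up to a multiple of y (y models the direct-IO page size;
// plain integer math keeps the sketch self-contained).
std::uint64_t Rounddown(std::uint64_t x, std::uint64_t y) { return (x / y) * y; }
std::uint64_t Roundup(std::uint64_t x, std::uint64_t y) {
  return ((x + y - 1) / y) * y;
}

int main() {
  const std::uint64_t alignment = 4096;     // example direct-IO page size
  const std::uint64_t offset = 8192;        // read offset, already page-aligned
  const std::uint64_t length = 0;           // first buffer already holds the data
  const std::uint64_t readahead_size = 0;   // trimmed to 0 by the upper-bound check

  // Mirrors the range arithmetic described in the summary:
  // roundup_len1 is derived from length + readahead_size.
  const std::uint64_t rounddown_start = Rounddown(offset, alignment);
  const std::uint64_t roundup_end =
      Roundup(offset + length + readahead_size, alignment);
  const std::uint64_t roundup_len = roundup_end - rounddown_start;

  std::cout << "roundup_len = " << roundup_len << "\n";  // prints 0

  // The pre-fix code asserted `roundup_len >= alignment`, which fails here;
  // the patch instead returns early when readahead_size_ == 0.
  assert(roundup_len < alignment);  // demonstrates the violated precondition
  return 0;
}
```

If this model is right, the early return added in file_prefetch_buffer.h keeps the second-buffer path from ever reaching this arithmetic, while the `explicit_prefetch_submitted_` check in `TryReadFromCacheAsyncUntracked` still lets a Seek poll data from an async read that was submitted before the trim.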