diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index d34a65c82b8bdad3765b49aba7fb942e0106585f..5c9c2797ca0f9a43593eb8dc391748eca373c284 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -545,7 +545,9 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(const IOOptions& opts,
     assert(roundup_len1 >= chunk_len1);
     read_len1 = static_cast<size_t>(roundup_len1 - chunk_len1);
   }
-  {
+
+  // Prefetch in second buffer only if readahead_size_ > 0.
+  if (readahead_size_ > 0) {
     // offset and size alignment for second buffer for asynchronous
     // prefetching
     uint64_t rounddown_start2 = roundup_end1;
@@ -733,7 +735,9 @@ bool FilePrefetchBuffer::TryReadFromCacheAsyncUntracked(
       (bufs_[curr_].async_read_in_progress_ ||
        offset + n >
           bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize())) {
-    if (readahead_size_ > 0) {
+    // In case readahead_size is trimmed (=0), we still want to poll the data
+    // submitted with explicit_prefetch_submitted_=true.
+    if (readahead_size_ > 0 || explicit_prefetch_submitted_) {
       Status s;
       assert(reader != nullptr);
       assert(max_readahead_size_ >= readahead_size_);
@@ -825,14 +829,12 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
   num_file_reads_ = 0;
   explicit_prefetch_submitted_ = false;
   bool is_eligible_for_prefetching = false;
+
+  UpdateReadAheadSizeForUpperBound(offset, n);
   if (readahead_size_ > 0 &&
       (!implicit_auto_readahead_ ||
        num_file_reads_ >= num_file_reads_for_auto_readahead_)) {
-    UpdateReadAheadSizeForUpperBound(offset, n);
-    // After trim, readahead size can be 0.
-    if (readahead_size_ > 0) {
     is_eligible_for_prefetching = true;
-    }
   }
 
   // 1. Cancel any pending async read to make code simpler as buffers can be out
diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h
index 334e32b6e4c1ddcb9ebf724a170453fb9b875272..8c2e8247668b261a229fd208692219d5ca9cd44e 100644
--- a/file/file_prefetch_buffer.h
+++ b/file/file_prefetch_buffer.h
@@ -389,6 +389,12 @@ class FilePrefetchBuffer {
          bufs_[second].offset_)) {
       return false;
     }
+
+    // Readahead size can be 0 because of trimming.
+    if (readahead_size_ == 0) {
+      return false;
+    }
+
     bufs_[second].buffer_.Clear();
     return true;
   }
diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc
index 14c7c28a5e99e4cc3b4b05e2f765100d0043b055..782751c8295cd530066ca03396965e60c987057c 100644
--- a/file/prefetch_test.cc
+++ b/file/prefetch_test.cc
@@ -2164,6 +2164,121 @@ TEST_P(PrefetchTest, IterReadAheadSizeWithUpperBound) {
   }
 }
 
+// This test checks if readahead_size is trimmed when upper_bound is reached
+// during Seek in async_io and it goes for polling without any extra
+// prefetching.
+TEST_P(PrefetchTest, IterReadAheadSizeWithUpperBoundSeekOnly) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+
+  // First param is if the mockFS support_prefetch or not
+  std::shared_ptr<MockFS> fs =
+      std::make_shared<MockFS>(FileSystem::Default(), false);
+
+  bool use_direct_io = false;
+  if (std::get<0>(GetParam())) {
+    use_direct_io = true;
+  }
+
+  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
+  Options options;
+  SetGenericOptions(env.get(), use_direct_io, options);
+  options.statistics = CreateDBStatistics();
+  BlockBasedTableOptions table_options;
+  SetBlockBasedTableOptions(table_options);
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  Status s = TryReopen(options);
+  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
+    // If direct IO is not supported, skip the test
+    return;
+  } else {
+    ASSERT_OK(s);
+  }
+
+  Random rnd(309);
+  WriteBatch batch;
+
+  for (int i = 0; i < 26; i++) {
+    std::string key = "my_key_";
+
+    for (int j = 0; j < 10; j++) {
+      key += char('a' + i);
+      ASSERT_OK(batch.Put(key, rnd.RandomString(1000)));
+    }
+  }
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+  std::string start_key = "my_key_a";
+
+  std::string end_key = "my_key_";
+  for (int j = 0; j < 10; j++) {
+    end_key += char('a' + 25);
+  }
+
+  Slice least(start_key.data(), start_key.size());
+  Slice greatest(end_key.data(), end_key.size());
+
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
+
+  s = TryReopen(options);
+  ASSERT_OK(s);
+
+  int buff_count_with_tuning = 0;
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
+      [&](void*) { buff_count_with_tuning++; });
+
+  bool read_async_called = false;
+  SyncPoint::GetInstance()->SetCallBack(
+      "UpdateResults::io_uring_result",
+      [&](void* /*arg*/) { read_async_called = true; });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ReadOptions ropts;
+  if (std::get<1>(GetParam())) {
+    ropts.readahead_size = 32768;
+  }
+  ropts.async_io = true;
+
+  Slice ub = Slice("my_key_aaa");
+  ropts.iterate_upper_bound = &ub;
+  Slice seek_key = Slice("my_key_aaa");
+
+  // With tuning readahead_size.
+  {
+    ASSERT_OK(options.statistics->Reset());
+    ropts.auto_readahead_size = true;
+
+    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ropts));
+
+    iter->Seek(seek_key);
+
+    ASSERT_OK(iter->status());
+
+    // Verify results.
+    uint64_t readahead_trimmed =
+        options.statistics->getAndResetTickerCount(READAHEAD_TRIMMED);
+    // Readahead got trimmed.
+    if (read_async_called) {
+      ASSERT_GT(readahead_trimmed, 0);
+      // Seek called PrefetchAsync to poll the data.
+      ASSERT_EQ(1, buff_count_with_tuning);
+    } else {
+      // async_io disabled.
+      ASSERT_GE(readahead_trimmed, 0);
+      ASSERT_EQ(0, buff_count_with_tuning);
+    }
+  }
+  Close();
+}
+
 namespace {
 #ifdef GFLAGS
 const int kMaxArgCount = 100;