提交 eac6b6d0 编写于 作者: A anand76 提交者: Facebook GitHub Bot

Ignore async_io ReadOption if FileSystem doesn't support it (#11296)

Summary:
In PosixFileSystem, IO uring support is opt-in. If the support is not enabled by the user, then ignore the async_io ReadOption in MultiGet and iteration at the top, rather than follow the async_io codepath and transparently switch to sync IO at the FileSystem layer.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11296

Test Plan: Add new unit tests

Reviewed By: akankshamahajan15

Differential Revision: D44045776

Pulled By: anand1976

fbshipit-source-id: a0881bf763ca2fde50b84063d0068bb521edd8b9
上级 a72d55c9
......@@ -2,6 +2,7 @@
## Unreleased
### Behavior changes
* Compaction output file cutting logic now considers range tombstone start keys. For example, SST partitioner now may receive ParitionRequest for range tombstone start keys.
* If the async_io ReadOption is specified for MultiGet or NewIterator on a platform that doesn't support IO uring, the option is ignored and synchronous IO is used.
### Bug Fixes
* Fixed an issue for backward iteration when user defined timestamp is enabled in combination with BlobDB.
......
......@@ -47,6 +47,9 @@ void ArenaWrappedDBIter::Init(
read_options_ = read_options;
allow_refresh_ = allow_refresh;
memtable_range_tombstone_iter_ = nullptr;
if (!env->GetFileSystem()->use_async_io()) {
read_options_.async_io = false;
}
}
Status ArenaWrappedDBIter::Refresh() {
......
......@@ -2302,9 +2302,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
ASSERT_EQ(multiget_io_batch_size.count, 3);
}
#else // ROCKSDB_IOURING_PRESENT
if (GetParam()) {
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
}
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0);
#endif // ROCKSDB_IOURING_PRESENT
}
......@@ -2338,16 +2336,18 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
#ifdef ROCKSDB_IOURING_PRESENT
HistogramData multiget_io_batch_size;
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
#ifdef ROCKSDB_IOURING_PRESENT
// A batch of 3 async IOs is expected, one for each overlapping file in L1
ASSERT_EQ(multiget_io_batch_size.count, 1);
ASSERT_EQ(multiget_io_batch_size.max, 3);
#endif // ROCKSDB_IOURING_PRESENT
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
#else // ROCKSDB_IOURING_PRESENT
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0);
#endif // ROCKSDB_IOURING_PRESENT
}
#ifdef ROCKSDB_IOURING_PRESENT
......@@ -2531,8 +2531,12 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
ASSERT_EQ(values[0], "val_l2_" + std::to_string(19));
ASSERT_EQ(values[1], "val_l2_" + std::to_string(26));
#ifdef ROCKSDB_IOURING_PRESENT
// Bloom filters in L0/L1 will avoid the coroutine calls in those levels
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
#else // ROCKSDB_IOURING_PRESENT
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0);
#endif // ROCKSDB_IOURING_PRESENT
}
#ifdef ROCKSDB_IOURING_PRESENT
......@@ -2623,18 +2627,17 @@ TEST_P(DBMultiGetAsyncIOTest, GetNoIOUring) {
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data());
ASSERT_EQ(values.size(), 3);
ASSERT_EQ(statuses[0], Status::NotSupported());
ASSERT_EQ(statuses[1], Status::NotSupported());
ASSERT_EQ(statuses[2], Status::NotSupported());
ASSERT_EQ(statuses[0], Status::OK());
ASSERT_EQ(statuses[1], Status::OK());
ASSERT_EQ(statuses[2], Status::OK());
HistogramData multiget_io_batch_size;
HistogramData async_read_bytes;
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
statistics()->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
// A batch of 3 async IOs is expected, one for each overlapping file in L1
ASSERT_EQ(multiget_io_batch_size.count, 1);
ASSERT_EQ(multiget_io_batch_size.max, 3);
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0);
}
INSTANTIATE_TEST_CASE_P(DBMultiGetAsyncIOTest, DBMultiGetAsyncIOTest,
......
......@@ -238,6 +238,9 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
if (sv_) {
RebuildIterators(false);
}
if (!cfd_->ioptions()->env->GetFileSystem()->use_async_io()) {
read_options_.async_io = false;
}
// immutable_status_ is a local aggregation of the
// status of the immutable Iterators.
......
......@@ -122,7 +122,7 @@ class ForwardIterator : public InternalIterator {
void DeleteIterator(InternalIterator* iter, bool is_arena = false);
DBImpl* const db_;
const ReadOptions read_options_;
ReadOptions read_options_;
ColumnFamilyData* const cfd_;
const SliceTransform* const prefix_extractor_;
const Comparator* user_comparator_;
......
......@@ -2121,7 +2121,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
max_file_size_for_l0_meta_pin_(
MaxFileSizeForL0MetaPin(mutable_cf_options_)),
version_number_(version_number),
io_tracer_(io_tracer) {}
io_tracer_(io_tracer),
use_async_io_(env_->GetFileSystem()->use_async_io()) {}
Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
const Slice& blob_index_slice,
......@@ -2505,7 +2506,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
#if USE_COROUTINES
if (read_options.async_io && read_options.optimize_multiget_for_io &&
using_coroutines()) {
using_coroutines() && use_async_io_) {
s = MultiGetAsync(read_options, range, &blob_ctxs);
} else
#endif // USE_COROUTINES
......@@ -2531,7 +2532,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
// Avoid using the coroutine version if we're looking in a L0 file, since
// L0 files won't be parallelized anyway. The regular synchronous version
// is faster.
if (!read_options.async_io || !using_coroutines() ||
if (!read_options.async_io || !using_coroutines() || !use_async_io_ ||
fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) {
if (f) {
bool skip_filters =
......
......@@ -1075,6 +1075,7 @@ class Version {
// used for debugging and logging purposes only.
uint64_t version_number_;
std::shared_ptr<IOTracer> io_tracer_;
bool use_async_io_;
Version(ColumnFamilyData* cfd, VersionSet* vset, const FileOptions& file_opt,
MutableCFOptions mutable_cf_options,
......
......@@ -1183,6 +1183,14 @@ class PosixFileSystem : public FileSystem {
#endif
}
bool use_async_io() override {
#if defined(ROCKSDB_IOURING_PRESENT)
return IsIOUringEnabled();
#else
return false;
#endif
}
#if defined(ROCKSDB_IOURING_PRESENT)
// io_uring instance
std::unique_ptr<ThreadLocalPtr> thread_local_io_urings_;
......
......@@ -13,6 +13,11 @@
#endif
#include "util/random.h"
namespace {
static bool enable_io_uring = true;
extern "C" bool RocksDbIOUringEnable() { return enable_io_uring; }
} // namespace
namespace ROCKSDB_NAMESPACE {
class MockFS;
......@@ -1179,6 +1184,104 @@ TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) {
Close();
}
TEST_P(PrefetchTest, DBIterAsyncIONoIOUring) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 1000;
// Set options
bool use_direct_io = std::get<0>(GetParam());
bool is_adaptive_readahead = std::get<1>(GetParam());
Options options;
SetGenericOptions(Env::Default(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
enable_io_uring = false;
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
int total_keys = 0;
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
total_keys++;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
// Test - Iterate over the keys sequentially.
{
ReadOptions ro;
if (is_adaptive_readahead) {
ro.adaptive_readahead = true;
}
ro.async_io = true;
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_EQ(num_keys, total_keys);
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(options.statistics->getTickerCount(READ_ASYNC_MICROS), 0);
}
}
{
ReadOptions ro;
if (is_adaptive_readahead) {
ro.adaptive_readahead = true;
}
ro.async_io = true;
ro.tailing = true;
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_EQ(num_keys, total_keys);
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(options.statistics->getTickerCount(READ_ASYNC_MICROS), 0);
}
}
Close();
enable_io_uring = true;
}
class PrefetchTest1 : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
......@@ -1527,8 +1630,6 @@ TEST_P(PrefetchTest1, SeekParallelizationTest) {
Close();
}
extern "C" bool RocksDbIOUringEnable() { return true; }
namespace {
#ifdef GFLAGS
const int kMaxArgCount = 100;
......@@ -1647,7 +1748,8 @@ TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
} else {
// Not all platforms support iouring. In that case, ReadAsync in posix
// won't submit async requests.
ASSERT_EQ(iter->status(), Status::NotSupported());
ASSERT_EQ(num_keys, total_keys);
ASSERT_EQ(buff_prefetch_count, 0);
}
}
......@@ -1760,18 +1862,19 @@ TEST_P(PrefetchTest, MultipleSeekWithPosixFS) {
iter->Next();
}
if (read_async_called) {
ASSERT_OK(iter->status());
ASSERT_EQ(num_keys, num_keys_first_batch);
// Check stats to make sure async prefetch is done.
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (read_async_called) {
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(get_perf_context()->number_async_seek, 0);
} else {
// Not all platforms support iouring. In that case, ReadAsync in posix
// won't submit async requests.
ASSERT_EQ(iter->status(), Status::NotSupported());
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(get_perf_context()->number_async_seek, 0);
}
}
......@@ -1788,25 +1891,26 @@ TEST_P(PrefetchTest, MultipleSeekWithPosixFS) {
iter->Next();
}
if (read_async_called) {
ASSERT_OK(iter->status());
ASSERT_EQ(num_keys, num_keys_second_batch);
ASSERT_GT(buff_prefetch_count, 0);
// Check stats to make sure async prefetch is done.
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
HistogramData prefetched_bytes_discarded;
options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED,
&prefetched_bytes_discarded);
ASSERT_GT(prefetched_bytes_discarded.count, 0);
if (read_async_called) {
ASSERT_GT(buff_prefetch_count, 0);
// Check stats to make sure async prefetch is done.
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(get_perf_context()->number_async_seek, 0);
ASSERT_GT(prefetched_bytes_discarded.count, 0);
} else {
// Not all platforms support iouring. In that case, ReadAsync in posix
// won't submit async requests.
ASSERT_EQ(iter->status(), Status::NotSupported());
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(get_perf_context()->number_async_seek, 0);
}
}
}
......@@ -1885,9 +1989,6 @@ TEST_P(PrefetchTest, SeekParallelizationTestWithPosix) {
// Each block contains around 4 keys.
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->Seek(BuildKey(0)); // Prefetch data because of seek parallelization.
if (std::get<1>(GetParam()) && !read_async_called) {
ASSERT_EQ(iter->status(), Status::NotSupported());
} else {
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
......@@ -1910,13 +2011,10 @@ TEST_P(PrefetchTest, SeekParallelizationTestWithPosix) {
// Prefetch data.
iter->Next();
if (read_async_called) {
ASSERT_TRUE(iter->Valid());
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES,
&async_read_bytes);
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (read_async_called) {
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(get_perf_context()->number_async_seek, 0);
if (std::get<1>(GetParam())) {
......@@ -1924,12 +2022,11 @@ TEST_P(PrefetchTest, SeekParallelizationTestWithPosix) {
} else {
ASSERT_EQ(buff_prefetch_count, 2);
}
}
} else {
// Not all platforms support iouring. In that case, ReadAsync in posix
// won't submit async requests.
ASSERT_EQ(iter->status(), Status::NotSupported());
}
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(get_perf_context()->number_async_seek, 0);
}
}
Close();
......@@ -2023,17 +2120,17 @@ TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) {
ASSERT_OK(db_->EndIOTrace());
ASSERT_OK(env_->FileExists(trace_file_path));
if (read_async_called) {
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
// Check stats to make sure async prefetch is done.
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (read_async_called) {
ASSERT_GT(buff_prefetch_count, 0);
// Check stats to make sure async prefetch is done.
ASSERT_GT(async_read_bytes.count, 0);
} else {
// Not all platforms support iouring. In that case, ReadAsync in posix
// won't submit async requests.
ASSERT_EQ(iter->status(), Status::NotSupported());
ASSERT_EQ(async_read_bytes.count, 0);
}
// Check the file to see if ReadAsync is logged.
......
......@@ -682,6 +682,10 @@ class FileSystem : public Customizable {
return IOStatus::OK();
}
// Indicates to upper layers whether the FileSystem supports/uses async IO
// or not
virtual bool use_async_io() { return true; }
// If you're adding methods here, remember to add them to EnvWrapper too.
private:
......@@ -1522,6 +1526,8 @@ class FileSystemWrapper : public FileSystem {
return target_->AbortIO(io_handles);
}
virtual bool use_async_io() override { return target_->use_async_io(); }
protected:
std::shared_ptr<FileSystem> target_;
};
......
......@@ -227,6 +227,7 @@ class WinFileSystem : public FileSystem {
const FileOptions& file_options) const override;
FileOptions OptimizeForManifestWrite(
const FileOptions& file_options) const override;
bool use_async_io() override { return false; }
protected:
static uint64_t FileTimeToUnixTime(const FILETIME& ftTime);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册