From db2b4eb50ea4d371dc48a0d0ed88a594f7abe616 Mon Sep 17 00:00:00 2001 From: Aaron Gao Date: Thu, 16 Feb 2017 10:25:06 -0800 Subject: [PATCH] avoid direct io in rocksdb_lite Summary: fix lite bugs disable direct io in lite mode Closes https://github.com/facebook/rocksdb/pull/1870 Differential Revision: D4559866 Pulled By: yiwu-arbug fbshipit-source-id: 3761c51 --- db/db_flush_test.cc | 2 + db/repair_test.cc | 3 + db/version_set.cc | 5 +- include/rocksdb/options.h | 2 + util/env_posix.cc | 12 +++ util/env_test.cc | 138 +++++++++++++++++--------------- util/file_reader_writer.cc | 59 +++----------- util/file_reader_writer.h | 2 + util/file_reader_writer_test.cc | 2 + 9 files changed, 111 insertions(+), 114 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 3bacb513f..e7e000a8a 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -74,7 +74,9 @@ TEST_F(DBFlushTest, SyncFail) { TEST_SYNC_POINT("DBFlushTest::SyncFail:2"); fault_injection_env->SetFilesystemActive(true); dbfull()->TEST_WaitForFlushMemTable(); +#ifndef ROCKSDB_LITE ASSERT_EQ("", FilesPerLevel()); // flush failed. +#endif // ROCKSDB_LITE // Flush job should release ref count to current version. ASSERT_EQ(refs_before, cfd->current()->TEST_refs()); Destroy(options); diff --git a/db/repair_test.cc b/db/repair_test.cc index 9db9d1292..3d4bf11f5 100644 --- a/db/repair_test.cc +++ b/db/repair_test.cc @@ -19,6 +19,7 @@ namespace rocksdb { +#ifndef ROCKSDB_LITE class RepairTest : public DBTestBase { public: RepairTest() : DBTestBase("/repair_test") {} @@ -273,6 +274,8 @@ TEST_F(RepairTest, RepairColumnFamilyOptions) { } } } + +#endif // ROCKSDB_LITE } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index ffcd60241..61e49e771 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2361,9 +2361,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, // If we just created a new descriptor file, install it by writing a // new CURRENT file that points to it. if (s.ok() && new_descriptor_log) { - s = SetCurrentFile( - env_, dbname_, pending_manifest_file_number_, - db_directory); + s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_, + db_directory); } if (s.ok()) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 82a97fe75..800140dc1 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1121,10 +1121,12 @@ struct DBOptions { // Use O_DIRECT for reading file // Default: false + // Not supported in ROCKSDB_LITE mode! bool use_direct_reads = false; // Use O_DIRECT for writing file // Default: false + // Not supported in ROCKSDB_LITE mode! bool use_direct_writes = false; // If false, fallocate() calls are bypassed diff --git a/util/env_posix.cc b/util/env_posix.cc index 0103e9ed1..a913329a8 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -156,6 +156,9 @@ class PosixEnv : public Env { FILE* file = nullptr; if (options.use_direct_reads && !options.use_mmap_reads) { +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // !ROCKSDB_LITE #ifndef OS_MACOSX flags |= O_DIRECT; #endif @@ -200,6 +203,9 @@ class PosixEnv : public Env { int fd; int flags = O_RDONLY; if (options.use_direct_reads && !options.use_mmap_reads) { +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // !ROCKSDB_LITE #ifndef OS_MACOSX flags |= O_DIRECT; TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags); @@ -261,6 +267,9 @@ class PosixEnv : public Env { // appends data to the end of the file, regardless of the value of // offset. // More info here: https://linux.die.net/man/2/pwrite +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // !ROCKSDB_LITE flags |= O_WRONLY; #ifndef OS_MACOSX flags |= O_DIRECT; @@ -325,6 +334,9 @@ class PosixEnv : public Env { int flags = 0; // Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX) if (options.use_direct_writes && !options.use_mmap_writes) { +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // !ROCKSDB_LITE flags |= O_WRONLY; #ifndef OS_MACOSX flags |= O_DIRECT; diff --git a/util/env_test.cc b/util/env_test.cc index f5ca32621..643a94e94 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -88,13 +88,19 @@ class EnvPosixTest : public testing::Test { public: Env* env_; - EnvPosixTest() : env_(Env::Default()) { } + bool direct_io_; + EnvPosixTest() : env_(Env::Default()), direct_io_(false) {} }; -class EnvPosixTestWithParam : public EnvPosixTest, - public ::testing::WithParamInterface { +class EnvPosixTestWithParam + : public EnvPosixTest, + public ::testing::WithParamInterface> { public: - EnvPosixTestWithParam() { env_ = GetParam(); } + EnvPosixTestWithParam() { + std::pair param_pair = GetParam(); + env_ = param_pair.first; + direct_io_ = param_pair.second; + } void WaitThreadPoolsEmpty() { // Wait until the thread pools are empty. @@ -678,46 +684,48 @@ class IoctlFriendlyTmpdir { std::string dir_; }; -TEST_F(EnvPosixTest, PositionedAppend) { - unique_ptr writable_file; - - EnvOptions options; - options.use_direct_writes = true; - options.use_mmap_writes = false; - IoctlFriendlyTmpdir ift; - ASSERT_OK(env_->NewWritableFile(ift.name() + "/f", &writable_file, options)); - - const size_t kBlockSize = 512; - const size_t kPageSize = 4096; - const size_t kDataSize = kPageSize; - // Write a page worth of 'a' - auto data_ptr = NewAligned(kDataSize, 'a'); - Slice data_a(data_ptr.get(), kDataSize); - ASSERT_OK(writable_file->PositionedAppend(data_a, 0U)); - // Write a page worth of 'b' right after the first sector - data_ptr = NewAligned(kDataSize, 'b'); - Slice data_b(data_ptr.get(), kDataSize); - ASSERT_OK(writable_file->PositionedAppend(data_b, kBlockSize)); - ASSERT_OK(writable_file->Close()); - // The file now has 1 sector worth of a followed by a page worth of b - - // Verify the above - unique_ptr seq_file; - ASSERT_OK(env_->NewSequentialFile(ift.name() + "/f", &seq_file, options)); - char scratch[kPageSize * 2]; - Slice result; - ASSERT_OK(seq_file->Read(sizeof(scratch), &result, scratch)); - ASSERT_EQ(kPageSize + kBlockSize, result.size()); - ASSERT_EQ('a', result[kBlockSize - 1]); - ASSERT_EQ('b', result[kBlockSize]); +TEST_P(EnvPosixTestWithParam, PositionedAppend) { + if (direct_io_ && env_ == Env::Default()) { + unique_ptr writable_file; + EnvOptions options; + options.use_direct_writes = direct_io_; + options.use_mmap_writes = false; + IoctlFriendlyTmpdir ift; + ASSERT_OK( + env_->NewWritableFile(ift.name() + "/f", &writable_file, options)); + + const size_t kBlockSize = 512; + const size_t kPageSize = 4096; + const size_t kDataSize = kPageSize; + // Write a page worth of 'a' + auto data_ptr = NewAligned(kDataSize, 'a'); + Slice data_a(data_ptr.get(), kDataSize); + ASSERT_OK(writable_file->PositionedAppend(data_a, 0U)); + // Write a page worth of 'b' right after the first sector + data_ptr = NewAligned(kDataSize, 'b'); + Slice data_b(data_ptr.get(), kDataSize); + ASSERT_OK(writable_file->PositionedAppend(data_b, kBlockSize)); + ASSERT_OK(writable_file->Close()); + // The file now has 1 sector worth of a followed by a page worth of b + + // Verify the above + unique_ptr seq_file; + ASSERT_OK(env_->NewSequentialFile(ift.name() + "/f", &seq_file, options)); + char scratch[kPageSize * 2]; + Slice result; + ASSERT_OK(seq_file->Read(sizeof(scratch), &result, scratch)); + ASSERT_EQ(kPageSize + kBlockSize, result.size()); + ASSERT_EQ('a', result[kBlockSize - 1]); + ASSERT_EQ('b', result[kBlockSize]); + } } // Only works in linux platforms -TEST_F(EnvPosixTest, RandomAccessUniqueID) { - for (bool directio : {true, false}) { - // Create file. +TEST_P(EnvPosixTestWithParam, RandomAccessUniqueID) { + // Create file. + if (env_ == Env::Default()) { EnvOptions soptions; - soptions.use_direct_reads = soptions.use_direct_writes = directio; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; IoctlFriendlyTmpdir ift; std::string fname = ift.name() + "/testfile"; unique_ptr wfile; @@ -758,8 +766,8 @@ TEST_F(EnvPosixTest, RandomAccessUniqueID) { // only works in linux platforms #ifdef ROCKSDB_FALLOCATE_PRESENT -TEST_F(EnvPosixTest, AllocateTest) { - for (bool directio : {true, false}) { +TEST_P(EnvPosixTestWithParam, AllocateTest) { + if (env_ == Env::Default()) { IoctlFriendlyTmpdir ift; std::string fname = ift.name() + "/preallocate_testfile"; @@ -789,7 +797,7 @@ TEST_F(EnvPosixTest, AllocateTest) { EnvOptions soptions; soptions.use_mmap_writes = false; - soptions.use_direct_reads = soptions.use_direct_writes = directio; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; unique_ptr wfile; ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); @@ -846,11 +854,11 @@ bool HasPrefix(const std::unordered_set& ss) { } // Only works in linux and WIN platforms -TEST_F(EnvPosixTest, RandomAccessUniqueIDConcurrent) { - for (bool directio : {true, false}) { +TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDConcurrent) { + if (env_ == Env::Default()) { // Check whether a bunch of concurrently existing files have unique IDs. EnvOptions soptions; - soptions.use_direct_reads = soptions.use_direct_writes = directio; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; // Create the files IoctlFriendlyTmpdir ift; @@ -888,10 +896,10 @@ TEST_F(EnvPosixTest, RandomAccessUniqueIDConcurrent) { } // Only works in linux and WIN platforms -TEST_F(EnvPosixTest, RandomAccessUniqueIDDeletes) { - for (bool directio : {true, false}) { +TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDDeletes) { + if (env_ == Env::Default()) { EnvOptions soptions; - soptions.use_direct_reads = soptions.use_direct_writes = directio; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; IoctlFriendlyTmpdir ift; std::string fname = ift.name() + "/" + "testfile"; @@ -935,9 +943,8 @@ TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) { TEST_P(EnvPosixTestWithParam, InvalidateCache) { #endif rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - for (bool directio : {true, false}) { EnvOptions soptions; - soptions.use_direct_reads = soptions.use_direct_writes = directio; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; std::string fname = test::TmpDir(env_) + "/" + "testfile"; const size_t kSectorSize = 512; @@ -997,7 +1004,6 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) { } // Delete the file ASSERT_OK(env_->DeleteFile(fname)); - } rocksdb::SyncPoint::GetInstance()->ClearTrace(); } #endif // not TRAVIS @@ -1121,12 +1127,10 @@ TEST_P(EnvPosixTestWithParam, LogBufferMaxSizeTest) { TEST_P(EnvPosixTestWithParam, Preallocation) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - - for (bool directio : {true, false}) { const std::string src = test::TmpDir(env_) + "/" + "testfile"; unique_ptr srcfile; EnvOptions soptions; - soptions.use_direct_reads = soptions.use_direct_writes = directio; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; #if !defined(OS_MACOSX) && !defined(OS_WIN) if (soptions.use_direct_writes) { rocksdb::SyncPoint::GetInstance()->SetCallBack( @@ -1172,7 +1176,6 @@ TEST_P(EnvPosixTestWithParam, Preallocation) { srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); ASSERT_EQ(last_allocated_block, 7UL); } - } rocksdb::SyncPoint::GetInstance()->ClearTrace(); } @@ -1180,9 +1183,8 @@ TEST_P(EnvPosixTestWithParam, Preallocation) { // individually) behave consistently. TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - for (bool directio : {true, false}) { EnvOptions soptions; - soptions.use_direct_reads = soptions.use_direct_writes = directio; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; const int kNumChildren = 10; std::string data; @@ -1224,7 +1226,6 @@ TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) { ASSERT_EQ(size, 512 * i); ASSERT_EQ(size, file_attrs_iter->size_bytes); } - } rocksdb::SyncPoint::GetInstance()->ClearTrace(); } @@ -1462,13 +1463,24 @@ TEST_P(EnvPosixTestWithParam, PosixRandomRWFileRandomized) { env_->DeleteFile(path); } -INSTANTIATE_TEST_CASE_P(DefaultEnv, EnvPosixTestWithParam, - ::testing::Values(Env::Default())); +INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO, EnvPosixTestWithParam, + ::testing::Values(std::pair(Env::Default(), + false))); +#if !defined(ROCKSDB_LITE) +INSTANTIATE_TEST_CASE_P(DefaultEnvWithDirectIO, EnvPosixTestWithParam, + ::testing::Values(std::pair(Env::Default(), + true))); +#endif // !defined(ROCKSDB_LITE) + #if !defined(ROCKSDB_LITE) && !defined(OS_WIN) static unique_ptr chroot_env(NewChrootEnv(Env::Default(), test::TmpDir(Env::Default()))); -INSTANTIATE_TEST_CASE_P(ChrootEnv, EnvPosixTestWithParam, - ::testing::Values(chroot_env.get())); +INSTANTIATE_TEST_CASE_P( + ChrootEnvWithoutDirectIO, EnvPosixTestWithParam, + ::testing::Values(std::pair(chroot_env.get(), false))); +INSTANTIATE_TEST_CASE_P( + ChrootEnvWithDirectIO, EnvPosixTestWithParam, + ::testing::Values(std::pair(chroot_env.get(), true))); #endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN) } // namespace rocksdb diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 877d48b25..642864598 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -24,6 +24,7 @@ namespace rocksdb { Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status s; if (use_direct_io()) { +#ifndef ROCKSDB_LITE size_t offset = offset_.fetch_add(n); size_t alignment = file_->GetRequiredBufferAlignment(); size_t aligned_offset = TruncateToPageBoundary(alignment, offset); @@ -41,6 +42,7 @@ Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { std::min(tmp.size() - offset_advance, n)); } *result = Slice(scratch, r); +#endif // !ROCKSDB_LITE } else { s = file_->Read(n, result, scratch); } @@ -48,37 +50,15 @@ Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { return s; } -Status SequentialFileReader::DirectRead(size_t n, Slice* result, - char* scratch) { - size_t offset = offset_.fetch_add(n); - size_t alignment = file_->GetRequiredBufferAlignment(); - size_t aligned_offset = TruncateToPageBoundary(alignment, offset); - size_t offset_advance = offset - aligned_offset; - size_t size = Roundup(offset + n, alignment) - aligned_offset; - AlignedBuffer buf; - buf.Alignment(alignment); - buf.AllocateNewBuffer(size); - Slice tmp; - Status s = - file_->PositionedRead(aligned_offset, size, &tmp, buf.BufferStart()); - if (s.ok()) { - buf.Size(tmp.size()); - size_t r = buf.Read(scratch, offset_advance, - tmp.size() <= offset_advance - ? 0 - : std::min(tmp.size() - offset_advance, n)); - *result = Slice(scratch, r); - } - return s; -} Status SequentialFileReader::Skip(uint64_t n) { +#ifndef ROCKSDB_LITE if (use_direct_io()) { offset_ += n; return Status::OK(); - } else { - return file_->Skip(n); } +#endif // !ROCKSDB_LITE + return file_->Skip(n); } Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, @@ -90,6 +70,7 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, (stats_ != nullptr) ? &elapsed : nullptr); IOSTATS_TIMER_GUARD(read_nanos); if (use_direct_io()) { +#ifndef ROCKSDB_LITE size_t alignment = file_->GetRequiredBufferAlignment(); size_t aligned_offset = TruncateToPageBoundary(alignment, offset); size_t offset_advance = offset - aligned_offset; @@ -106,6 +87,7 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, std::min(tmp.size() - offset_advance, n)); } *result = Slice(scratch, r); +#endif // !ROCKSDB_LITE } else { s = file_->Read(offset, n, result, scratch); } @@ -117,28 +99,6 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, return s; } -Status RandomAccessFileReader::DirectRead(uint64_t offset, size_t n, - Slice* result, char* scratch) const { - size_t alignment = file_->GetRequiredBufferAlignment(); - size_t aligned_offset = TruncateToPageBoundary(alignment, offset); - size_t offset_advance = offset - aligned_offset; - size_t size = Roundup(offset + n, alignment) - aligned_offset; - AlignedBuffer buf; - buf.Alignment(alignment); - buf.AllocateNewBuffer(size); - Slice tmp; - Status s = file_->Read(aligned_offset, size, &tmp, buf.BufferStart()); - if (s.ok()) { - buf.Size(tmp.size()); - size_t r = buf.Read(scratch, offset_advance, - tmp.size() <= offset_advance - ? 0 - : std::min(tmp.size() - offset_advance, n)); - *result = Slice(scratch, r); - } - return s; -} - Status WritableFileWriter::Append(const Slice& data) { const char* src = data.data(); size_t left = data.size(); @@ -252,7 +212,9 @@ Status WritableFileWriter::Flush() { if (buf_.CurrentSize() > 0) { if (direct_io_) { +#ifndef ROCKSDB_LITE s = WriteDirect(); +#endif // !ROCKSDB_LITE } else { s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); } @@ -401,6 +363,7 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { // whole number of pages to be written again on the next flush because we can // only write on aligned // offsets. +#ifndef ROCKSDB_LITE Status WritableFileWriter::WriteDirect() { Status s; @@ -460,7 +423,7 @@ Status WritableFileWriter::WriteDirect() { } return s; } - +#endif // !ROCKSDB_LITE namespace { class ReadaheadRandomAccessFile : public RandomAccessFile { diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index de9f2bf2e..4d2b65dac 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -170,7 +170,9 @@ class WritableFileWriter { private: // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode +#ifndef ROCKSDB_LITE Status WriteDirect(); +#endif // !ROCKSDB_LITE // Normal write Status WriteBuffered(const char* data, size_t size); Status RangeSync(uint64_t offset, uint64_t nbytes); diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index bcaad0f48..af6b825f6 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -85,6 +85,7 @@ TEST_F(WritableFileWriterTest, RangeSync) { writer->Close(); } +#ifndef ROCKSDB_LITE TEST_F(WritableFileWriterTest, AppendStatusReturn) { class FakeWF : public WritableFile { public: @@ -124,6 +125,7 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) { dynamic_cast(writer->writable_file())->SetIOError(true); ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b'))); } +#endif } // namespace rocksdb -- GitLab