diff --git a/db/db_bench.cc b/db/db_bench.cc index ca05b2b207b569ebe2fc607122789000c2edb68b..a98e7c7d3a24e4ea5a62365d3083f1f99e84b824 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -898,6 +898,7 @@ class ReportFileOpEnv : public EnvWrapper { return rv; } + Status Truncate(uint64_t size) override { return target_->Truncate(size); } Status Close() override { return target_->Close(); } Status Flush() override { return target_->Flush(); } Status Sync() override { return target_->Sync(); } diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index 73ef91b6a925c1c30addfb96ef6fc8e52d0772e3..90ec90831e407ff42930645d873a180dd6d5b297 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -1884,6 +1884,8 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIterator2) { ASSERT_EQ(db_iter_->value().ToString(), "4"); } +#if !(defined NDEBUG) || !defined(OS_WIN) + TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) { // Test Prev() when one child iterator is at its end but more rows // are added. @@ -2234,6 +2236,7 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace8) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +#endif // #if !(defined NDEBUG) || !defined(OS_WIN) } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index 87028e381b29cacc763b57aa637be34a70e479f1..7ce18c8e71bdc4d4bcf4f33031f73e646ac79d0f 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -141,6 +141,7 @@ class TestWritableFile : public WritableFile { FaultInjectionTestEnv* env); virtual ~TestWritableFile(); virtual Status Append(const Slice& data) override; + virtual Status Truncate(uint64_t size) override { return target_->Truncate(size); } virtual Status Close() override; virtual Status Flush() override; virtual Status Sync() override; diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 093292f66d2e45f29db5fc581c98f04ad0f56bda..6300a47034c8638735a16f5290bcced29e2dd4ed 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -393,6 +393,12 @@ class RandomAccessFile { virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const = 0; + // Used by the file_reader_writer to decide if the ReadAhead wrapper + // should simply forward the call and do not enact buffering or locking. + virtual bool ShouldForwardRawRequest() const { + return false; + } + // Tries to get an unique ID for this file that will be the same each time // the file is opened (and will stay the same while the file is open). // Furthermore, it tries to make this ID at most "max_size" bytes. If such an @@ -413,7 +419,6 @@ class RandomAccessFile { // compatibility. }; - enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED }; virtual void Hint(AccessPattern pattern) {} @@ -438,7 +443,35 @@ class WritableFile { } virtual ~WritableFile(); + // Indicates if the class makes use of unbuffered I/O + virtual bool UseOSBuffer() const { + return true; + } + + const size_t c_DefaultPageSize = 4 * 1024; + + // This is needed when you want to allocate + // AlignedBuffer for use with file I/O classes + // Used for unbuffered file I/O when UseOSBuffer() returns false + virtual size_t GetRequiredBufferAlignment() const { + return c_DefaultPageSize; + } + virtual Status Append(const Slice& data) = 0; + + // Positioned write for unbuffered access default forward + // to simple append as most of the tests are buffered by default + virtual Status PositionedAppend(const Slice& /* data */, uint64_t /* offset */) { + return Status::NotSupported(); + } + + // Truncate is necessary to trim the file to the correct size + // before closing. It is not always possible to keep track of the file + // size due to whole pages writes. The behavior is undefined if called + // with other writes to follow. + virtual Status Truncate(uint64_t size) { + return Status::OK(); + } virtual Status Close() = 0; virtual Status Flush() = 0; virtual Status Sync() = 0; // sync data @@ -839,6 +872,10 @@ class WritableFileWrapper : public WritableFile { explicit WritableFileWrapper(WritableFile* t) : target_(t) { } Status Append(const Slice& data) override { return target_->Append(data); } + Status PositionedAppend(const Slice& data, uint64_t offset) override { + return target_->PositionedAppend(data, offset); + } + Status Truncate(uint64_t size) override { return target_->Truncate(size); } Status Close() override { return target_->Close(); } Status Flush() override { return target_->Flush(); } Status Sync() override { return target_->Sync(); } diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 70c68b8fbc1177e7306798ddceacecf03425da9b..2668ad1d28f04a3650e858d0f4f45f107fe197d1 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -30,6 +30,7 @@ #include "util/iostats_context_imp.h" #include "util/rate_limiter.h" #include "util/sync_point.h" +#include "util/aligned_buffer.h" #include "util/thread_status_updater.h" #include "util/thread_status_util.h" @@ -161,15 +162,6 @@ inline int fsync(HANDLE hFile) { return 0; } -inline size_t TruncateToPageBoundary(size_t page_size, size_t s) { - s -= (s & (page_size - 1)); - assert((s % page_size) == 0); - return s; -} - -// Roundup x to a multiple of y -inline size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; } - // SetFileInformationByHandle() is capable of fast pre-allocates. // However, this does not change the file end position unless the file is // truncated and the pre-allocated space is not considered filled with zeros. @@ -492,7 +484,6 @@ class WinMmapFile : public WritableFile { size_t n = std::min(left, avail); memcpy(dst_, src, n); - IOSTATS_ADD(bytes_written, n); dst_ += n; src += n; left -= n; @@ -502,6 +493,12 @@ class WinMmapFile : public WritableFile { return Status::OK(); } + // Means Close() will properly take care of truncate + // and it does not need any additional information + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } + virtual Status Close() override { Status s; @@ -612,94 +609,6 @@ class WinMmapFile : public WritableFile { } }; -// This class is to manage an aligned user -// allocated buffer for unbuffered I/O purposes -// though it does not make a difference if you need a buffer. -class AlignedBuffer { - const size_t alignment_; - std::unique_ptr buf_; - size_t capacity_; - size_t cursize_; - char* bufstart_; - - public: - explicit AlignedBuffer(size_t alignment) - : alignment_(alignment), capacity_(0), cursize_(0), bufstart_(nullptr) { - assert(alignment > 0); - assert((alignment & (alignment - 1)) == 0); - } - - size_t GetAlignment() const { return alignment_; } - - size_t GetCapacity() const { return capacity_; } - - size_t GetCurrentSize() const { return cursize_; } - - const char* GetBufferStart() const { return bufstart_; } - - void Clear() { cursize_ = 0; } - - // Allocates a new buffer and sets bufstart_ to the aligned first byte - void AllocateNewBuffer(size_t requestedCapacity) { - size_t size = Roundup(requestedCapacity, alignment_); - buf_.reset(new char[size + alignment_]); - - char* p = buf_.get(); - bufstart_ = reinterpret_cast( - (reinterpret_cast(p) + (alignment_ - 1)) & - ~static_cast(alignment_ - 1)); - capacity_ = size; - cursize_ = 0; - } - - // Used for write - // Returns the number of bytes appended - size_t Append(const char* src, size_t append_size) { - size_t buffer_remaining = capacity_ - cursize_; - size_t to_copy = std::min(append_size, buffer_remaining); - - if (to_copy > 0) { - memcpy(bufstart_ + cursize_, src, to_copy); - cursize_ += to_copy; - } - return to_copy; - } - - size_t Read(char* dest, size_t offset, size_t read_size) const { - assert(offset < cursize_); - size_t to_read = std::min(cursize_ - offset, read_size); - if (to_read > 0) { - memcpy(dest, bufstart_ + offset, to_read); - } - return to_read; - } - - /// Pad to alignment - void PadToAlignmentWith(int padding) { - size_t total_size = Roundup(cursize_, alignment_); - size_t pad_size = total_size - cursize_; - - if (pad_size > 0) { - assert((pad_size + cursize_) <= capacity_); - memset(bufstart_ + cursize_, padding, pad_size); - cursize_ += pad_size; - } - } - - // After a partial flush move the tail to the beginning of the buffer - void RefitTail(size_t tail_offset, size_t tail_size) { - if (tail_size > 0) { - memmove(bufstart_, bufstart_ + tail_offset, tail_size); - } - cursize_ = tail_size; - } - - // Returns place to start writing - char* GetDestination() { return bufstart_ + cursize_; } - - void SetSize(size_t cursize) { cursize_ = cursize; } -}; - class WinSequentialFile : public SequentialFile { private: const std::string filename_; @@ -734,7 +643,7 @@ class WinSequentialFile : public SequentialFile { // Windows ReadFile API accepts a DWORD. // While it is possible to read in a loop if n is > UINT_MAX // it is a highly unlikely case. - if (n > UINT_MAX) { + if (n > UINT_MAX) { return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER); } @@ -747,8 +656,6 @@ class WinSequentialFile : public SequentialFile { return IOErrorFromWindowsError(filename_, GetLastError()); } - IOSTATS_ADD(bytes_read, r); - *result = Slice(scratch, r); return s; @@ -791,12 +698,13 @@ class WinRandomAccessFile : public RandomAccessFile { : filename_(fname), hFile_(hFile), use_os_buffer_(options.use_os_buffer), - buffer_(alignment), + buffer_(), buffered_start_(0) { assert(!options.use_mmap_reads); // Unbuffered access, use internal buffer for reads if (!use_os_buffer_) { + buffer_.Alignment(alignment); // Random read, no need in a big buffer // We read things in database blocks which are likely to be similar to // the alignment we use. @@ -826,7 +734,7 @@ class WinRandomAccessFile : public RandomAccessFile { // Let's see if at least some of the requested data is already // in the buffer if (offset >= buffered_start_ && - offset < (buffered_start_ + buffer_.GetCurrentSize())) { + offset < (buffered_start_ + buffer_.CurrentSize())) { size_t buffer_offset = offset - buffered_start_; r = buffer_.Read(dest, buffer_offset, left); assert(r >= 0); @@ -839,7 +747,7 @@ class WinRandomAccessFile : public RandomAccessFile { // Still some left or none was buffered if (left > 0) { // Figure out the start/end offset for reading and amount to read - const size_t alignment = buffer_.GetAlignment(); + const size_t alignment = buffer_.Alignment(); const size_t start_page_start = TruncateToPageBoundary(alignment, offset); const size_t end_page_start = @@ -847,21 +755,18 @@ class WinRandomAccessFile : public RandomAccessFile { const size_t actual_bytes_toread = (end_page_start - start_page_start) + alignment; - if (buffer_.GetCapacity() < actual_bytes_toread) { + if (buffer_.Capacity() < actual_bytes_toread) { buffer_.AllocateNewBuffer(actual_bytes_toread); } else { buffer_.Clear(); } SSIZE_T read = 0; - { - IOSTATS_TIMER_GUARD(read_nanos); - read = pread(hFile_, buffer_.GetDestination(), actual_bytes_toread, - start_page_start); - } + read = pread(hFile_, buffer_.Destination(), actual_bytes_toread, + start_page_start); if (read > 0) { - buffer_.SetSize(read); + buffer_.Size(read); buffered_start_ = start_page_start; // Let's figure out how much we read from the users standpoint @@ -884,7 +789,6 @@ class WinRandomAccessFile : public RandomAccessFile { } } - IOSTATS_ADD_IF_POSITIVE(bytes_read, n - left); *result = Slice(scratch, (r < 0) ? 0 : n - left); if (r < 0) { @@ -893,6 +797,10 @@ class WinRandomAccessFile : public RandomAccessFile { return s; } + virtual bool ShouldForwardRawRequest() const override { + return true; + } + virtual void Hint(AccessPattern pattern) override {} virtual Status InvalidateCache(size_t offset, size_t length) override { @@ -915,33 +823,23 @@ class WinRandomAccessFile : public RandomAccessFile { class WinWritableFile : public WritableFile { private: const std::string filename_; - HANDLE hFile_; - AlignedBuffer buffer_; - - uint64_t filesize_; // How much data is actually written disk - uint64_t reservedsize_; // how far we have reserved space - - bool pending_sync_; - - RateLimiter* rate_limiter_; - - const bool use_os_buffer_; // Used to indicate unbuffered access, the file - // must be opened as unbuffered if false + HANDLE hFile_; + const bool use_os_buffer_; // Used to indicate unbuffered access, the file + const uint64_t alignment_; + // must be opened as unbuffered if false + uint64_t filesize_; // How much data is actually written disk + uint64_t reservedsize_; // how far we have reserved space public: WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, size_t capacity, const EnvOptions& options) : filename_(fname), hFile_(hFile), - buffer_(alignment), + use_os_buffer_(options.use_os_buffer), + alignment_(alignment), filesize_(0), - reservedsize_(0), - pending_sync_(false), - rate_limiter_(options.rate_limiter), - use_os_buffer_(options.use_os_buffer) { + reservedsize_(0) { assert(!options.use_mmap_writes); - - buffer_.AllocateNewBuffer(capacity); } ~WinWritableFile() { @@ -950,106 +848,84 @@ class WinWritableFile : public WritableFile { } } - virtual Status Append(const Slice& data) override { - const char* src = data.data(); + // Indicates if the class makes use of unbuffered I/O + virtual bool UseOSBuffer() const override { + return use_os_buffer_; + } - assert(data.size() < INT_MAX); + virtual size_t GetRequiredBufferAlignment() const override { + return alignment_; + } - size_t left = data.size(); - Status s; - pending_sync_ = true; + virtual Status Append(const Slice& data) override { - // This would call Alloc() if we are out of blocks - PrepareWrite(GetFileSize(), left); + // Used for buffered access ONLY + assert(use_os_buffer_); + assert(data.size() < std::numeric_limits::max()); - // Flush only when I/O is buffered - if (use_os_buffer_ && - (buffer_.GetCapacity() - buffer_.GetCurrentSize()) < left) { - if (buffer_.GetCurrentSize() > 0) { - s = Flush(); - if (!s.ok()) { - return s; - } - } + Status s; - if (buffer_.GetCapacity() < c_OneMB) { - size_t desiredCapacity = buffer_.GetCapacity() * 2; - desiredCapacity = std::min(desiredCapacity, c_OneMB); - buffer_.AllocateNewBuffer(desiredCapacity); - } + DWORD bytesWritten = 0; + if (!WriteFile(hFile_, data.data(), + data.size(), &bytesWritten, NULL)) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to WriteFile: " + filename_, + lastError); + } else { + assert(size_t(bytesWritten) == data.size()); + filesize_ += data.size(); } - // We always use the internal buffer for the unbuffered I/O - // or we simply use it for its original purpose to accumulate many small - // chunks - if (!use_os_buffer_ || (buffer_.GetCapacity() >= left)) { - while (left > 0) { - size_t appended = buffer_.Append(src, left); - left -= appended; - src += appended; + return s; + } - if (left > 0) { - s = Flush(); - if (!s.ok()) { - break; - } + virtual Status PositionedAppend(const Slice& data, uint64_t offset) override { + Status s; - size_t cursize = buffer_.GetCurrentSize(); - size_t capacity = buffer_.GetCapacity(); + SSIZE_T ret = pwrite(hFile_, data.data(), + data.size(), offset); - // We double the buffer here because - // Flush calls do not keep up with the incoming bytes - // This is the only place when buffer is changed with unbuffered I/O - if (cursize == 0 && capacity < c_OneMB) { - size_t desiredCapacity = capacity * 2; - desiredCapacity = std::min(desiredCapacity, c_OneMB); - buffer_.AllocateNewBuffer(desiredCapacity); - } - } - } + // Error break + if (ret < 0) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to pwrite for: " + filename_, lastError); } else { - // Writing directly to file bypassing what is in the buffer - assert(buffer_.GetCurrentSize() == 0); - // Use rate limiter for normal I/O very large request if available - s = WriteBuffered(src, left); + // With positional write it is not clear at all + // if this actually extends the filesize + assert(size_t(ret) == data.size()); + filesize_ += data.size(); } + return s; + } + // Need to implement this so the file is truncated correctly + // when buffered and unbuffered mode + virtual Status Truncate(uint64_t size) override { + Status s = ftruncate(filename_, hFile_, size); + if (s.ok()) { + filesize_ = size; + } return s; } virtual Status Close() override { - Status s; - // If there is any data in the cache not written we need to deal with it - const size_t cursize = buffer_.GetCurrentSize(); - const uint64_t final_size = filesize_ + cursize; - - if (cursize > 0) { - // If OS buffering is on, we just flush the remainder, otherwise need - if (!use_os_buffer_) { - s = WriteUnbuffered(); - } else { - s = WriteBuffered(buffer_.GetBufferStart(), cursize); - } - } + Status s; - if (s.ok()) { - s = ftruncate(filename_, hFile_, final_size); - } + assert(INVALID_HANDLE_VALUE != hFile_); - // Sync data if buffer was flushed - if (s.ok() && (cursize > 0) && fsync(hFile_) < 0) { + if (fsync(hFile_) < 0) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("fsync failed at Close() for: " + filename_, - lastError); + lastError); } if (FALSE == ::CloseHandle(hFile_)) { - if (s.ok()) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_, - lastError); - } + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_, + lastError); } hFile_ = INVALID_HANDLE_VALUE; @@ -1057,36 +933,18 @@ class WinWritableFile : public WritableFile { } // write out the cached data to the OS cache + // This is now taken care of the WritableFileWriter virtual Status Flush() override { - Status status; - - if (buffer_.GetCurrentSize() > 0) { - if (!use_os_buffer_) { - status = WriteUnbuffered(); - } else { - status = - WriteBuffered(buffer_.GetBufferStart(), buffer_.GetCurrentSize()); - if (status.ok()) { - buffer_.SetSize(0); - } - } - } - return status; + return Status::OK(); } virtual Status Sync() override { - Status s = Flush(); - if (!s.ok()) { - return s; - } - + Status s; // Calls flush buffers - if (pending_sync_ && fsync(hFile_) < 0) { + if (fsync(hFile_) < 0) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("fsync failed at Sync() for: " + filename_, lastError); - } else { - pending_sync_ = false; } return s; } @@ -1094,7 +952,12 @@ class WinWritableFile : public WritableFile { virtual Status Fsync() override { return Sync(); } virtual uint64_t GetFileSize() override { - return filesize_ + buffer_.GetCurrentSize(); + // Double accounting now here with WritableFileWriter + // and this size will be wrong when unbuffered access is used + // but tests implement their own writable files and do not use WritableFileWrapper + // so we need to squeeze a square peg through + // a round hole here. + return filesize_; } virtual Status Allocate(off_t offset, off_t len) override { @@ -1104,7 +967,7 @@ class WinWritableFile : public WritableFile { // Make sure that we reserve an aligned amount of space // since the reservation block size is driven outside so we want // to check if we are ok with reservation here - size_t spaceToReserve = Roundup(offset + len, buffer_.GetAlignment()); + size_t spaceToReserve = Roundup(offset + len, alignment_); // Nothing to do if (spaceToReserve <= reservedsize_) { return status; @@ -1117,133 +980,6 @@ class WinWritableFile : public WritableFile { } return status; } - - private: - // This method writes to disk the specified data and makes use of the rate - // limiter - // if available - Status WriteBuffered(const char* data, size_t size) { - Status s; - assert(use_os_buffer_); - const char* src = data; - size_t left = size; - - size_t actually_written = 0; - - while (left > 0) { - size_t bytes_allowed = RequestToken(left, false); - - DWORD bytesWritten = 0; - if (!WriteFile(hFile_, src, bytes_allowed, &bytesWritten, NULL)) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError( - "Failed to write buffered via rate_limiter: " + filename_, - lastError); - break; - } else { - actually_written += bytesWritten; - src += bytesWritten; - left -= bytesWritten; - } - } - - IOSTATS_ADD(bytes_written, actually_written); - filesize_ += actually_written; - - return s; - } - - // This flushes the accumulated data in the buffer. We pad data with zeros if - // necessary to the whole page. - // However, during automatic flushes padding would not be necessary. - // We always use RateLimiter if available. We move (Refit) any buffer bytes - // that are left over the - // whole number of pages to be written again on the next flush because we can - // only write on aligned - // offsets. - Status WriteUnbuffered() { - Status s; - - assert(!use_os_buffer_); - size_t alignment = buffer_.GetAlignment(); - assert((filesize_ % alignment) == 0); - - // Calculate whole page final file advance if all writes succeed - size_t file_advance = - TruncateToPageBoundary(alignment, buffer_.GetCurrentSize()); - - // Calculate the leftover tail, we write it here padded with zeros BUT we - // will write - // it again in the future either on Close() OR when the current whole page - // fills out - size_t leftover_tail = buffer_.GetCurrentSize() - file_advance; - - // Round up and pad - buffer_.PadToAlignmentWith(0); - - const char* src = buffer_.GetBufferStart(); - size_t left = buffer_.GetCurrentSize(); - uint64_t file_offset = filesize_; - size_t actually_written = 0; - - while (left > 0) { - // Request how much is allowed. If this is less than one alignment we may - // be blocking a lot on every write - // because we can not write less than one alignment (page) unit thus check - // the configuration. - size_t bytes_allowed = RequestToken(left, true); - SSIZE_T ret = pwrite(hFile_, buffer_.GetBufferStart() + actually_written, - bytes_allowed, file_offset); - - // Error break - if (ret < 0) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError( - "Failed to pwrite for unbuffered: " + filename_, lastError); - buffer_.SetSize(file_advance + leftover_tail); - break; - } - actually_written += ret; - file_offset += ret; - left -= ret; - } - - IOSTATS_ADD(bytes_written, actually_written); - - if (s.ok()) { - // Move the tail to the beginning of the buffer - // This never happens during normal Append but rather during - // explicit call to Flush()/Sync() or Close() - buffer_.RefitTail(file_advance, leftover_tail); - // This is where we start writing next time which may or not be - // the actual file size on disk. They match if the buffer size - // is a multiple of whole pages otherwise filesize_ is leftover_tail - // behind - filesize_ += file_advance; - } - return s; - } - - // This truncates the request to a single burst bytes - // and then goes through the request to make sure we are - // satisfied in the order of the I/O priority - size_t RequestToken(size_t bytes, bool align) const { - if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) { - bytes = std::min( - bytes, static_cast(rate_limiter_->GetSingleBurstBytes())); - - if (align) { - // Here we may actually require more than burst and block - // but we can not write less than one page at a time on unbuffered - // thus we may want not to use ratelimiter s - size_t alignment = buffer_.GetAlignment(); - bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); - } - - rate_limiter_->Request(bytes, io_priority_); - } - return bytes; - } }; class WinDirectory : public Directory { @@ -2092,7 +1828,7 @@ class WinEnv : public Env { ThreadPool* thread_pool_; size_t thread_id_; // Thread count in the thread. - explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) + BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) : thread_pool_(thread_pool), thread_id_(thread_id) {} }; diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h new file mode 100644 index 0000000000000000000000000000000000000000..2244316feed2b44ba0b6be50fce61601f005160f --- /dev/null +++ b/util/aligned_buffer.h @@ -0,0 +1,154 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include +#include "port/port.h" + +namespace rocksdb { + +inline size_t TruncateToPageBoundary(size_t page_size, size_t s) { + s -= (s & (page_size - 1)); + assert((s % page_size) == 0); + return s; +} + +inline size_t Roundup(size_t x, size_t y) { + return ((x + y - 1) / y) * y; +} + +// This class is to manage an aligned user +// allocated buffer for unbuffered I/O purposes +// though can be used for any purpose. +class AlignedBuffer { + size_t alignment_; + std::unique_ptr buf_; + size_t capacity_; + size_t cursize_; + char* bufstart_; + +public: + AlignedBuffer() + : alignment_(), + capacity_(0), + cursize_(0), + bufstart_(nullptr) { + } + + AlignedBuffer(AlignedBuffer&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + AlignedBuffer& operator=(AlignedBuffer&& o) ROCKSDB_NOEXCEPT { + alignment_ = std::move(o.alignment_); + buf_ = std::move(o.buf_); + capacity_ = std::move(o.capacity_); + cursize_ = std::move(o.cursize_); + bufstart_ = std::move(o.bufstart_); + return *this; + } + + AlignedBuffer(const AlignedBuffer&) = delete; + + AlignedBuffer& operator=(const AlignedBuffer&) = delete; + + size_t Alignment() const { + return alignment_; + } + + size_t Capacity() const { + return capacity_; + } + + size_t CurrentSize() const { + return cursize_; + } + + const char* BufferStart() const { + return bufstart_; + } + + void Clear() { + cursize_ = 0; + } + + void Alignment(size_t alignment) { + assert(alignment > 0); + assert((alignment & (alignment - 1)) == 0); + alignment_ = alignment; + } + + // Allocates a new buffer and sets bufstart_ to the aligned first byte + void AllocateNewBuffer(size_t requestedCapacity) { + + assert(alignment_ > 0); + assert((alignment_ & (alignment_ - 1)) == 0); + + size_t size = Roundup(requestedCapacity, alignment_); + buf_.reset(new char[size + alignment_]); + + char* p = buf_.get(); + bufstart_ = reinterpret_cast( + (reinterpret_cast(p)+(alignment_ - 1)) & + ~static_cast(alignment_ - 1)); + capacity_ = size; + cursize_ = 0; + } + // Used for write + // Returns the number of bytes appended + size_t Append(const char* src, size_t append_size) { + size_t buffer_remaining = capacity_ - cursize_; + size_t to_copy = std::min(append_size, buffer_remaining); + + if (to_copy > 0) { + memcpy(bufstart_ + cursize_, src, to_copy); + cursize_ += to_copy; + } + return to_copy; + } + + size_t Read(char* dest, size_t offset, size_t read_size) const { + assert(offset < cursize_); + size_t to_read = std::min(cursize_ - offset, read_size); + if (to_read > 0) { + memcpy(dest, bufstart_ + offset, to_read); + } + return to_read; + } + + /// Pad to alignment + void PadToAlignmentWith(int padding) { + size_t total_size = Roundup(cursize_, alignment_); + size_t pad_size = total_size - cursize_; + + if (pad_size > 0) { + assert((pad_size + cursize_) <= capacity_); + memset(bufstart_ + cursize_, padding, pad_size); + cursize_ += pad_size; + } + } + + // After a partial flush move the tail to the beginning of the buffer + void RefitTail(size_t tail_offset, size_t tail_size) { + if (tail_size > 0) { + memmove(bufstart_, bufstart_ + tail_offset, tail_size); + } + cursize_ = tail_size; + } + + // Returns place to start writing + char* Destination() { + return bufstart_ + cursize_; + } + + void Size(size_t cursize) { + cursize_ = cursize; + } +}; +} diff --git a/util/db_test_util.h b/util/db_test_util.h index bb1bfa7cb2fc5678ba360f96ae6c555f24897285..dfb2b3e6d42f8ae6da73627ec855198d3099e681 100644 --- a/util/db_test_util.h +++ b/util/db_test_util.h @@ -148,6 +148,9 @@ class SpecialEnv : public EnvWrapper { return base_->Append(data); } } + Status Truncate(uint64_t size) override { + return base_->Truncate(size); + } Status Close() override { // SyncPoint is not supported in Released Windows Mode. #if !(defined NDEBUG) || !defined(OS_WIN) @@ -185,6 +188,7 @@ class SpecialEnv : public EnvWrapper { return base_->Append(data); } } + Status Truncate(uint64_t size) override { return base_->Truncate(size); } Status Close() override { return base_->Close(); } Status Flush() override { return base_->Flush(); } Status Sync() override { @@ -225,6 +229,7 @@ class SpecialEnv : public EnvWrapper { #endif return s; } + Status Truncate(uint64_t size) override { return base_->Truncate(size); } Status Close() override { return base_->Close(); } Status Flush() override { return base_->Flush(); } Status Sync() override { diff --git a/util/env_posix.cc b/util/env_posix.cc index af2ab8e5dedb4bd2add11d0d648b1e61f92d0455..b856f1d7089f464fa1000117de24e87284d31f21 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -494,6 +494,12 @@ class PosixMmapFile : public WritableFile { return Status::OK(); } + // Means Close() will properly take care of truncate + // and it does not need any additional information + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } + virtual Status Close() override { Status s; size_t unused = limit_ - dst_; @@ -624,6 +630,12 @@ class PosixWritableFile : public WritableFile { return Status::OK(); } + // Means Close() will properly take care of truncate + // and it does not need any additional information + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } + virtual Status Close() override { Status s; diff --git a/util/env_test.cc b/util/env_test.cc index 0e4feabfdd60724cef0f9eab14d3a5ce57eefaa1..6fe8ed80bc156900bb25c5f0334dbb9b4ee77d71 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -989,6 +989,7 @@ TEST_F(EnvPosixTest, WritableFileWrapper) { } Status Append(const Slice& data) override { inc(1); return Status::OK(); } + Status Truncate(uint64_t size) override { return Status::OK(); } Status Close() override { inc(2); return Status::OK(); } Status Flush() override { inc(3); return Status::OK(); } Status Sync() override { inc(4); return Status::OK(); } diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 140969843794bb848c50df732bc92e0b1449f878..86d70b62d608a0cd381b4fc1e06c8fa2e4c4e4d7 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -20,6 +20,11 @@ #include "util/sync_point.h" namespace rocksdb { + +namespace { + const size_t c_OneMb = (1 << 20); +} + Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status s = file_->Read(n, result, scratch); IOSTATS_ADD(bytes_read, result->size()); @@ -59,81 +64,116 @@ Status WritableFileWriter::Append(const Slice& data) { TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); writable_file_->PrepareWrite(static_cast(GetFileSize()), left); } - // if there is no space in the cache, then flush - if (cursize_ + left > capacity_) { - s = Flush(); - if (!s.ok()) { - return s; + + // Flush only when I/O is buffered + if (use_os_buffer_ && + (buf_.Capacity() - buf_.CurrentSize()) < left) { + if (buf_.CurrentSize() > 0) { + s = Flush(); + if (!s.ok()) { + return s; + } } - // Increase the buffer size, but capped at 1MB - if (capacity_ < (1 << 20)) { - capacity_ *= 2; - buf_.reset(new char[capacity_]); + + if (buf_.Capacity() < c_OneMb) { + size_t desiredCapacity = buf_.Capacity() * 2; + desiredCapacity = std::min(desiredCapacity, c_OneMb); + buf_.AllocateNewBuffer(desiredCapacity); } - assert(cursize_ == 0); + assert(buf_.CurrentSize() == 0); } - // if the write fits into the cache, then write to cache - // otherwise do a write() syscall to write to OS buffers. - if (cursize_ + left <= capacity_) { - memcpy(buf_.get() + cursize_, src, left); - cursize_ += left; - } else { - while (left != 0) { - size_t size = RequestToken(left); - { - IOSTATS_TIMER_GUARD(write_nanos); - s = writable_file_->Append(Slice(src, size)); + // We never write directly to disk with unbuffered I/O on. + // or we simply use it for its original purpose to accumulate many small + // chunks + if (!use_os_buffer_ || (buf_.Capacity() >= left)) { + while (left > 0) { + size_t appended = buf_.Append(src, left); + left -= appended; + src += appended; + + if (left > 0) { + s = Flush(); if (!s.ok()) { - return s; + break; } - } - IOSTATS_ADD(bytes_written, size); - TEST_KILL_RANDOM(rocksdb_kill_odds); - left -= size; - src += size; + // We double the buffer here because + // Flush calls do not keep up with the incoming bytes + // This is the only place when buffer is changed with unbuffered I/O + if (buf_.Capacity() < (1 << 20)) { + size_t desiredCapacity = buf_.Capacity() * 2; + desiredCapacity = std::min(desiredCapacity, c_OneMb); + buf_.AllocateNewBuffer(desiredCapacity); + } + } } + } else { + // Writing directly to file bypassing the buffer + assert(buf_.CurrentSize() == 0); + s = WriteBuffered(src, left); } + TEST_KILL_RANDOM(rocksdb_kill_odds); filesize_ += data.size(); return Status::OK(); } Status WritableFileWriter::Close() { + + // Do not quit immediately on failure the file MUST be closed Status s; - s = Flush(); // flush cache to OS - if (!s.ok()) { + + // Possible to close it twice now as we MUST close + // in __dtor, simply flushing is not enough + // Windows when pre-allocating does not fill with zeros + // also with unbuffered access we also set the end of data. + if (!writable_file_) { return s; } + s = Flush(); // flush cache to OS + + // In unbuffered mode we write whole pages so + // we need to let the file know where data ends. + Status interim = writable_file_->Truncate(filesize_); + if (!interim.ok() && s.ok()) { + s = interim; + } + TEST_KILL_RANDOM(rocksdb_kill_odds); - return writable_file_->Close(); + interim = writable_file_->Close(); + if (!interim.ok() && s.ok()) { + s = interim; + } + + writable_file_.reset(); + + return s; } + // write out the cached data to the OS cache Status WritableFileWriter::Flush() { + Status s; TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2); - size_t left = cursize_; - char* src = buf_.get(); - while (left != 0) { - size_t size = RequestToken(left); - { - IOSTATS_TIMER_GUARD(write_nanos); - TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); - Status s = writable_file_->Append(Slice(src, size)); - if (!s.ok()) { - return s; - } + + if (buf_.CurrentSize() > 0) { + if (use_os_buffer_) { + s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); + } else { + s = WriteUnbuffered(); + } + if (!s.ok()) { + return s; } - IOSTATS_ADD(bytes_written, size); - TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2); - left -= size; - src += size; } - cursize_ = 0; - writable_file_->Flush(); + s = writable_file_->Flush(); + + if (!s.ok()) { + return s; + } // sync OS cache to disk for every bytes_per_sync_ // TODO: give log file and sst file different options (log @@ -147,21 +187,21 @@ Status WritableFileWriter::Flush() { // Xfs does neighbor page flushing outside of the specified ranges. We // need to make sure sync range is far from the write offset. if (!direct_io_ && bytes_per_sync_) { - uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced. - uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB. + const uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced. + const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB. if (filesize_ > kBytesNotSyncRange) { uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange; offset_sync_to -= offset_sync_to % kBytesAlignWhenSync; assert(offset_sync_to >= last_sync_size_); if (offset_sync_to > 0 && offset_sync_to - last_sync_size_ >= bytes_per_sync_) { - RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); + s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); last_sync_size_ = offset_sync_to; } } } - return Status::OK(); + return s; } Status WritableFileWriter::Sync(bool use_fsync) { @@ -214,27 +254,140 @@ Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) { return writable_file_->RangeSync(offset, nbytes); } -size_t WritableFileWriter::RequestToken(size_t bytes) { +size_t WritableFileWriter::RequestToken(size_t bytes, bool align) { Env::IOPriority io_priority; - if (rate_limiter_&&(io_priority = writable_file_->GetIOPriority()) < + if (rate_limiter_ && (io_priority = writable_file_->GetIOPriority()) < Env::IO_TOTAL) { - bytes = std::min(bytes, - static_cast(rate_limiter_->GetSingleBurstBytes())); + bytes = std::min( + bytes, static_cast(rate_limiter_->GetSingleBurstBytes())); + + if (align) { + // Here we may actually require more than burst and block + // but we can not write less than one page at a time on unbuffered + // thus we may want not to use ratelimiter s + size_t alignment = buf_.Alignment(); + bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); + } rate_limiter_->Request(bytes, io_priority); } return bytes; } +// This method writes to disk the specified data and makes use of the rate +// limiter if available +Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { + Status s; + assert(use_os_buffer_); + const char* src = data; + size_t left = size; + + while (left > 0) { + size_t allowed = RequestToken(left, false); + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + s = writable_file_->Append(Slice(src, allowed)); + if (!s.ok()) { + return s; + } + } + + IOSTATS_ADD(bytes_written, allowed); + TEST_KILL_RANDOM(rocksdb_kill_odds); + + left -= allowed; + src += allowed; + } + buf_.Size(0); + return s; +} + + +// This flushes the accumulated data in the buffer. We pad data with zeros if +// necessary to the whole page. +// However, during automatic flushes padding would not be necessary. +// We always use RateLimiter if available. We move (Refit) any buffer bytes +// that are left over the +// whole number of pages to be written again on the next flush because we can +// only write on aligned +// offsets. +Status WritableFileWriter::WriteUnbuffered() { + Status s; + + assert(!use_os_buffer_); + const size_t alignment = buf_.Alignment(); + assert((next_write_offset_ % alignment) == 0); + + // Calculate whole page final file advance if all writes succeed + size_t file_advance = + TruncateToPageBoundary(alignment, buf_.CurrentSize()); + + // Calculate the leftover tail, we write it here padded with zeros BUT we + // will write + // it again in the future either on Close() OR when the current whole page + // fills out + size_t leftover_tail = buf_.CurrentSize() - file_advance; + + // Round up and pad + buf_.PadToAlignmentWith(0); + + const char* src = buf_.BufferStart(); + uint64_t write_offset = next_write_offset_; + size_t left = buf_.CurrentSize(); + + while (left > 0) { + // Check how much is allowed + size_t size = RequestToken(left, true); + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + // Unbuffered writes must be positional + s = writable_file_->PositionedAppend(Slice(src, size), write_offset); + if (!s.ok()) { + buf_.Size(file_advance + leftover_tail); + return s; + } + } + + IOSTATS_ADD(bytes_written, size); + left -= size; + src += size; + write_offset += size; + assert((next_write_offset_ % alignment) == 0); + } + + if (s.ok()) { + // Move the tail to the beginning of the buffer + // This never happens during normal Append but rather during + // explicit call to Flush()/Sync() or Close() + buf_.RefitTail(file_advance, leftover_tail); + // This is where we start writing next time which may or not be + // the actual file size on disk. They match if the buffer size + // is a multiple of whole pages otherwise filesize_ is leftover_tail + // behind + next_write_offset_ += file_advance; + } + return s; +} + + namespace { class ReadaheadRandomAccessFile : public RandomAccessFile { public: - ReadaheadRandomAccessFile(std::unique_ptr file, - size_t readahead_size) - : file_(std::move(file)), - readahead_size_(readahead_size), - buffer_(new char[readahead_size_]), - buffer_offset_(0), - buffer_len_(0) {} + ReadaheadRandomAccessFile(std::unique_ptr&& file, + size_t readahead_size) + : file_(std::move(file)), + readahead_size_(readahead_size), + forward_calls_(file_->ShouldForwardRawRequest()), + buffer_(new char[readahead_size_]), + buffer_offset_(0), + buffer_len_(0) {} + + ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete; + + ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete; virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const override { @@ -242,6 +395,14 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { return file_->Read(offset, n, result, scratch); } + // On Windows in unbuffered mode this will lead to double buffering + // and double locking so we avoid that. + // In normal mode Windows caches so much data from disk that we do + // not need readahead. + if (forward_calls_) { + return file_->Read(offset, n, result, scratch); + } + std::unique_lock lk(lock_); size_t copied = 0; @@ -249,7 +410,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) { uint64_t offset_in_buffer = offset - buffer_offset_; copied = std::min(static_cast(buffer_len_) - offset_in_buffer, - static_cast(n)); + static_cast(n)); memcpy(scratch, buffer_.get() + offset_in_buffer, copied); if (copied == n) { // fully cached @@ -259,7 +420,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { } Slice readahead_result; Status s = file_->Read(offset + copied, readahead_size_, &readahead_result, - buffer_.get()); + buffer_.get()); if (!s.ok()) { return s; } @@ -290,20 +451,21 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { private: std::unique_ptr file_; - size_t readahead_size_; + size_t readahead_size_; + const bool forward_calls_; - mutable std::mutex lock_; + mutable std::mutex lock_; mutable std::unique_ptr buffer_; - mutable uint64_t buffer_offset_; - mutable size_t buffer_len_; + mutable uint64_t buffer_offset_; + mutable size_t buffer_len_; }; } // namespace std::unique_ptr NewReadaheadRandomAccessFile( - std::unique_ptr file, size_t readahead_size) { - std::unique_ptr wrapped_file( - new ReadaheadRandomAccessFile(std::move(file), readahead_size)); - return std::move(wrapped_file); + std::unique_ptr&& file, size_t readahead_size) { + std::unique_ptr result( + new ReadaheadRandomAccessFile(std::move(file), readahead_size)); + return result; } } // namespace rocksdb diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 616d174a23356e8b024475cc3af48606cc4c8cba..f33965dc6d6b7b0916991d4d7c7632979c6aef59 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once #include "rocksdb/env.h" +#include "util/aligned_buffer.h" +#include "port/port.h" namespace rocksdb { @@ -15,7 +17,7 @@ class Statistics; class HistogramImpl; std::unique_ptr NewReadaheadRandomAccessFile( - std::unique_ptr file, size_t readahead_size); + std::unique_ptr&& file, size_t readahead_size); class SequentialFileReader { private: @@ -24,6 +26,19 @@ class SequentialFileReader { public: explicit SequentialFileReader(std::unique_ptr&& _file) : file_(std::move(_file)) {} + + SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { + file_ = std::move(o.file_); + return *this; + } + + SequentialFileReader(SequentialFileReader&) = delete; + SequentialFileReader& operator=(SequentialFileReader&) = delete; + Status Read(size_t n, Slice* result, char* scratch); Status Skip(uint64_t n); @@ -34,10 +49,10 @@ class SequentialFileReader { class RandomAccessFileReader : public RandomAccessFile { private: std::unique_ptr file_; - Env* env_; - Statistics* stats_; - uint32_t hist_type_; - HistogramImpl* file_read_hist_; + Env* env_; + Statistics* stats_; + uint32_t hist_type_; + HistogramImpl* file_read_hist_; public: explicit RandomAccessFileReader(std::unique_ptr&& raf, @@ -51,6 +66,22 @@ class RandomAccessFileReader : public RandomAccessFile { hist_type_(hist_type), file_read_hist_(file_read_hist) {} + RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + RandomAccessFileReader& operator=(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT{ + file_ = std::move(o.file_); + env_ = std::move(o.env_); + stats_ = std::move(o.stats_); + hist_type_ = std::move(o.hist_type_); + file_read_hist_ = std::move(o.file_read_hist_); + return *this; + } + + RandomAccessFileReader(const RandomAccessFileReader&) = delete; + RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete; + Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const; RandomAccessFile* file() { return file_.get(); } @@ -60,33 +91,47 @@ class RandomAccessFileReader : public RandomAccessFile { class WritableFileWriter { private: std::unique_ptr writable_file_; - size_t cursize_; // current size of cached data in buf_ - size_t capacity_; // max size of buf_ - unique_ptr buf_; // a buffer to cache writes - uint64_t filesize_; - bool pending_sync_; - bool pending_fsync_; - bool direct_io_; - uint64_t last_sync_size_; - uint64_t bytes_per_sync_; - RateLimiter* rate_limiter_; + AlignedBuffer buf_; + // Actually written data size can be used for truncate + // not counting padding data + uint64_t filesize_; + // This is necessary when we use unbuffered access + // and writes must happen on aligned offsets + // so we need to go back and write that page again + uint64_t next_write_offset_; + bool pending_sync_; + bool pending_fsync_; + const bool direct_io_; + const bool use_os_buffer_; + uint64_t last_sync_size_; + uint64_t bytes_per_sync_; + RateLimiter* rate_limiter_; public: - explicit WritableFileWriter(std::unique_ptr&& file, - const EnvOptions& options) + WritableFileWriter(std::unique_ptr&& file, + const EnvOptions& options) : writable_file_(std::move(file)), - cursize_(0), - capacity_(65536), - buf_(new char[capacity_]), + buf_(), filesize_(0), + next_write_offset_(0), pending_sync_(false), pending_fsync_(false), direct_io_(writable_file_->UseDirectIO()), + use_os_buffer_(writable_file_->UseOSBuffer()), last_sync_size_(0), bytes_per_sync_(options.bytes_per_sync), - rate_limiter_(options.rate_limiter) {} + rate_limiter_(options.rate_limiter) { + + buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); + buf_.AllocateNewBuffer(65536); + } + + WritableFileWriter(const WritableFileWriter&) = delete; + + WritableFileWriter& operator=(const WritableFileWriter&) = delete; + + ~WritableFileWriter() { Close(); } - ~WritableFileWriter() { Flush(); } Status Append(const Slice& data); Status Flush(); @@ -109,8 +154,13 @@ class WritableFileWriter { WritableFile* writable_file() const { return writable_file_.get(); } private: + // Used when os buffering is OFF and we are writing + // DMA such as in Windows unbuffered mode + Status WriteUnbuffered(); + // Normal write + Status WriteBuffered(const char* data, size_t size); Status RangeSync(off_t offset, off_t nbytes); - size_t RequestToken(size_t bytes); + size_t RequestToken(size_t bytes, bool align); Status SyncInternal(bool use_fsync); }; } // namespace rocksdb diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 924c171b70c2e7cc35b4397d3a989648c16a2a5a..d1f0dcbec7fc6b95922f899b2d7b3aa68e834db1 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -24,6 +24,9 @@ TEST_F(WritableFileWriterTest, RangeSync) { size_ += data.size(); return Status::OK(); } + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } Status Close() override { EXPECT_GE(size_, last_synced_ + kMb); EXPECT_LT(size_, last_synced_ + 2 * kMb); diff --git a/util/memenv.cc b/util/memenv.cc index 72fdbd2421d4f0503c154121419f3576ee09cd76..57373702309a1cdbec8077a99bc7321da21eb0bf 100644 --- a/util/memenv.cc +++ b/util/memenv.cc @@ -232,7 +232,9 @@ class WritableFileImpl : public WritableFile { virtual Status Append(const Slice& data) override { return file_->Append(data); } - + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } virtual Status Close() override { return Status::OK(); } virtual Status Flush() override { return Status::OK(); } virtual Status Sync() override { return Status::OK(); } diff --git a/util/mock_env.cc b/util/mock_env.cc index 088675071cbcb37703690b1ecf258b87371e1e4e..409e16e3afb18b3f2db10eabaea5e019cf7f6308 100644 --- a/util/mock_env.cc +++ b/util/mock_env.cc @@ -250,7 +250,9 @@ class MockWritableFile : public WritableFile { } return Status::OK(); } - + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } virtual Status Close() override { return file_->Fsync(); } virtual Status Flush() override { return Status::OK(); } diff --git a/util/testutil.h b/util/testutil.h index 67a2aafad7ab3b1a060a115076473352bc7fb9bf..990a3ba818048d4980000d08adf565688b0a887e 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -184,6 +184,10 @@ class StringSink: public WritableFile { const std::string& contents() const { return contents_; } + virtual Status Truncate(uint64_t size) override { + contents_.resize(size); + return Status::OK(); + } virtual Status Close() override { return Status::OK(); } virtual Status Flush() override { if (reader_contents_ != nullptr) {