Commit 592f6bf7 authored by Siying Dong

Merge pull request #716 from yuslepukhin/refactor_file_reader_writer_win

Refactor to support file_reader_writer on Windows.
@@ -898,6 +898,7 @@ class ReportFileOpEnv : public EnvWrapper {
      return rv;
    }
+   Status Truncate(uint64_t size) override { return target_->Truncate(size); }
    Status Close() override { return target_->Close(); }
    Status Flush() override { return target_->Flush(); }
    Status Sync() override { return target_->Sync(); }
......
@@ -1884,6 +1884,8 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIterator2) {
   ASSERT_EQ(db_iter_->value().ToString(), "4");
 }
+#if !(defined NDEBUG) || !defined(OS_WIN)
 TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) {
   // Test Prev() when one child iterator is at its end but more rows
   // are added.
@@ -2234,6 +2236,7 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace8) {
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
 }
+#endif  // #if !(defined NDEBUG) || !defined(OS_WIN)
 }  // namespace rocksdb

 int main(int argc, char** argv) {
......
@@ -141,6 +141,7 @@ class TestWritableFile : public WritableFile {
                    FaultInjectionTestEnv* env);
   virtual ~TestWritableFile();
   virtual Status Append(const Slice& data) override;
+  virtual Status Truncate(uint64_t size) override { return target_->Truncate(size); }
   virtual Status Close() override;
   virtual Status Flush() override;
   virtual Status Sync() override;
......
@@ -393,6 +393,12 @@ class RandomAccessFile {
   virtual Status Read(uint64_t offset, size_t n, Slice* result,
                       char* scratch) const = 0;

+  // Used by the file_reader_writer to decide if the ReadAhead wrapper
+  // should simply forward the call and do not enact buffering or locking.
+  virtual bool ShouldForwardRawRequest() const {
+    return false;
+  }
+
   // Tries to get an unique ID for this file that will be the same each time
   // the file is opened (and will stay the same while the file is open).
   // Furthermore, it tries to make this ID at most "max_size" bytes. If such an
@@ -413,7 +419,6 @@ class RandomAccessFile {
     // compatibility.
   };

   enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED };

   virtual void Hint(AccessPattern pattern) {}
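The new ShouldForwardRawRequest() hook lets a file opt out of the readahead wrapper added later in this commit. Below is a minimal sketch, not from the commit itself; the class name and body are illustrative, and only the overridden virtuals come from the interface above:

#include "rocksdb/env.h"

// Hypothetical file whose raw reads are already positional and lock-free,
// so ReadaheadRandomAccessFile should forward Read() untouched instead of
// adding its own buffer and mutex.
class PassthroughRandomAccessFile : public rocksdb::RandomAccessFile {
 public:
  rocksdb::Status Read(uint64_t offset, size_t n, rocksdb::Slice* result,
                       char* scratch) const override {
    // A real implementation would issue a positional read here; the
    // sketch just reports zero bytes.
    *result = rocksdb::Slice(scratch, 0);
    return rocksdb::Status::OK();
  }

  // Returning true makes the readahead wrapper forward calls.
  bool ShouldForwardRawRequest() const override { return true; }
};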
@@ -438,7 +443,35 @@ class WritableFile {
   }
   virtual ~WritableFile();

+  // Indicates if the class makes use of unbuffered I/O
+  virtual bool UseOSBuffer() const {
+    return true;
+  }
+
+  const size_t c_DefaultPageSize = 4 * 1024;
+
+  // This is needed when you want to allocate
+  // AlignedBuffer for use with file I/O classes
+  // Used for unbuffered file I/O when UseOSBuffer() returns false
+  virtual size_t GetRequiredBufferAlignment() const {
+    return c_DefaultPageSize;
+  }
+
   virtual Status Append(const Slice& data) = 0;
+
+  // Positioned write for unbuffered access default forward
+  // to simple append as most of the tests are buffered by default
+  virtual Status PositionedAppend(const Slice& /* data */, uint64_t /* offset */) {
+    return Status::NotSupported();
+  }
+
+  // Truncate is necessary to trim the file to the correct size
+  // before closing. It is not always possible to keep track of the file
+  // size due to whole pages writes. The behavior is undefined if called
+  // with other writes to follow.
+  virtual Status Truncate(uint64_t size) {
+    return Status::OK();
+  }
+
   virtual Status Close() = 0;
   virtual Status Flush() = 0;
   virtual Status Sync() = 0;  // sync data
@@ -839,6 +872,10 @@ class WritableFileWrapper : public WritableFile {
   explicit WritableFileWrapper(WritableFile* t) : target_(t) { }

   Status Append(const Slice& data) override { return target_->Append(data); }
+  Status PositionedAppend(const Slice& data, uint64_t offset) override {
+    return target_->PositionedAppend(data, offset);
+  }
+  Status Truncate(uint64_t size) override { return target_->Truncate(size); }
   Status Close() override { return target_->Close(); }
   Status Flush() override { return target_->Flush(); }
   Status Sync() override { return target_->Sync(); }
......
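Taken together, UseOSBuffer(), GetRequiredBufferAlignment(), PositionedAppend() and Truncate() define the unbuffered-write contract that WritableFileWriter relies on below. A minimal sketch of a conforming sink, assuming a hypothetical in-memory class (none of these names are from the commit):

#include <string>
#include "rocksdb/env.h"

// Hypothetical sink that only accepts whole aligned pages at aligned
// offsets, the way a Windows unbuffered (FILE_FLAG_NO_BUFFERING-style)
// file would.
class SectorAlignedSink : public rocksdb::WritableFile {
 public:
  bool UseOSBuffer() const override { return false; }  // unbuffered I/O
  size_t GetRequiredBufferAlignment() const override { return 4096; }

  // Whole zero-padded pages arrive here at page-aligned offsets.
  rocksdb::Status PositionedAppend(const rocksdb::Slice& data,
                                   uint64_t offset) override {
    if (offset % 4096 != 0 || data.size() % 4096 != 0) {
      return rocksdb::Status::InvalidArgument("unaligned write");
    }
    size_t off = static_cast<size_t>(offset);
    if (contents_.size() < off + data.size()) {
      contents_.resize(off + data.size());
    }
    contents_.replace(off, data.size(), data.data(), data.size());
    return rocksdb::Status::OK();
  }

  // Trims the zero padding of the final page back to the real data size.
  rocksdb::Status Truncate(uint64_t size) override {
    contents_.resize(static_cast<size_t>(size));
    return rocksdb::Status::OK();
  }

  rocksdb::Status Append(const rocksdb::Slice&) override {
    return rocksdb::Status::NotSupported();
  }
  rocksdb::Status Close() override { return rocksdb::Status::OK(); }
  rocksdb::Status Flush() override { return rocksdb::Status::OK(); }
  rocksdb::Status Sync() override { return rocksdb::Status::OK(); }

 private:
  std::string contents_;
};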
This diff is collapsed.
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once

#include <algorithm>
#include "port/port.h"

namespace rocksdb {

inline size_t TruncateToPageBoundary(size_t page_size, size_t s) {
  s -= (s & (page_size - 1));
  assert((s % page_size) == 0);
  return s;
}

inline size_t Roundup(size_t x, size_t y) {
  return ((x + y - 1) / y) * y;
}

// This class is to manage an aligned user
// allocated buffer for unbuffered I/O purposes
// though can be used for any purpose.
class AlignedBuffer {
  size_t alignment_;
  std::unique_ptr<char[]> buf_;
  size_t capacity_;
  size_t cursize_;
  char* bufstart_;

 public:
  AlignedBuffer()
      : alignment_(),
        capacity_(0),
        cursize_(0),
        bufstart_(nullptr) {
  }

  AlignedBuffer(AlignedBuffer&& o) ROCKSDB_NOEXCEPT {
    *this = std::move(o);
  }

  AlignedBuffer& operator=(AlignedBuffer&& o) ROCKSDB_NOEXCEPT {
    alignment_ = std::move(o.alignment_);
    buf_ = std::move(o.buf_);
    capacity_ = std::move(o.capacity_);
    cursize_ = std::move(o.cursize_);
    bufstart_ = std::move(o.bufstart_);
    return *this;
  }

  AlignedBuffer(const AlignedBuffer&) = delete;
  AlignedBuffer& operator=(const AlignedBuffer&) = delete;

  size_t Alignment() const {
    return alignment_;
  }

  size_t Capacity() const {
    return capacity_;
  }

  size_t CurrentSize() const {
    return cursize_;
  }

  const char* BufferStart() const {
    return bufstart_;
  }

  void Clear() {
    cursize_ = 0;
  }

  void Alignment(size_t alignment) {
    assert(alignment > 0);
    assert((alignment & (alignment - 1)) == 0);
    alignment_ = alignment;
  }

  // Allocates a new buffer and sets bufstart_ to the aligned first byte
  void AllocateNewBuffer(size_t requestedCapacity) {
    assert(alignment_ > 0);
    assert((alignment_ & (alignment_ - 1)) == 0);

    size_t size = Roundup(requestedCapacity, alignment_);
    buf_.reset(new char[size + alignment_]);

    char* p = buf_.get();
    bufstart_ = reinterpret_cast<char*>(
        (reinterpret_cast<uintptr_t>(p) + (alignment_ - 1)) &
        ~static_cast<uintptr_t>(alignment_ - 1));
    capacity_ = size;
    cursize_ = 0;
  }

  // Used for write
  // Returns the number of bytes appended
  size_t Append(const char* src, size_t append_size) {
    size_t buffer_remaining = capacity_ - cursize_;
    size_t to_copy = std::min(append_size, buffer_remaining);

    if (to_copy > 0) {
      memcpy(bufstart_ + cursize_, src, to_copy);
      cursize_ += to_copy;
    }
    return to_copy;
  }

  size_t Read(char* dest, size_t offset, size_t read_size) const {
    assert(offset < cursize_);
    size_t to_read = std::min(cursize_ - offset, read_size);
    if (to_read > 0) {
      memcpy(dest, bufstart_ + offset, to_read);
    }
    return to_read;
  }

  // Pad to alignment
  void PadToAlignmentWith(int padding) {
    size_t total_size = Roundup(cursize_, alignment_);
    size_t pad_size = total_size - cursize_;

    if (pad_size > 0) {
      assert((pad_size + cursize_) <= capacity_);
      memset(bufstart_ + cursize_, padding, pad_size);
      cursize_ += pad_size;
    }
  }

  // After a partial flush move the tail to the beginning of the buffer
  void RefitTail(size_t tail_offset, size_t tail_size) {
    if (tail_size > 0) {
      memmove(bufstart_, bufstart_ + tail_offset, tail_size);
    }
    cursize_ = tail_size;
  }

  // Returns place to start writing
  char* Destination() {
    return bufstart_ + cursize_;
  }

  void Size(size_t cursize) {
    cursize_ = cursize;
  }
};
}  // namespace rocksdb
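A minimal usage sketch of the helpers above (assuming this header is on the include path; the numbers only illustrate the page arithmetic that the writer relies on):

#include <cassert>
#include <cstdint>
#include "util/aligned_buffer.h"

int main() {
  // Round down / round up to a 4KB page:
  assert(rocksdb::TruncateToPageBoundary(4096, 10000) == 8192);
  assert(rocksdb::Roundup(10000, 4096) == 12288);

  rocksdb::AlignedBuffer buf;
  buf.Alignment(4096);           // must be a power of two
  buf.AllocateNewBuffer(10000);  // capacity is rounded up to 12288

  assert(buf.Capacity() == 12288);
  assert(reinterpret_cast<uintptr_t>(buf.BufferStart()) % 4096 == 0);

  // Append() reports how many bytes fit; here everything does.
  char data[5000] = {};
  size_t copied = buf.Append(data, sizeof(data));
  assert(copied == 5000 && buf.CurrentSize() == 5000);
  return 0;
}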
@@ -148,6 +148,9 @@ class SpecialEnv : public EnvWrapper {
         return base_->Append(data);
       }
     }
+    Status Truncate(uint64_t size) override {
+      return base_->Truncate(size);
+    }
     Status Close() override {
       // SyncPoint is not supported in Released Windows Mode.
 #if !(defined NDEBUG) || !defined(OS_WIN)
@@ -185,6 +188,7 @@ class SpecialEnv : public EnvWrapper {
         return base_->Append(data);
       }
     }
+    Status Truncate(uint64_t size) override { return base_->Truncate(size); }
     Status Close() override { return base_->Close(); }
     Status Flush() override { return base_->Flush(); }
     Status Sync() override {
@@ -225,6 +229,7 @@ class SpecialEnv : public EnvWrapper {
 #endif
       return s;
     }
+    Status Truncate(uint64_t size) override { return base_->Truncate(size); }
     Status Close() override { return base_->Close(); }
     Status Flush() override { return base_->Flush(); }
     Status Sync() override {
......
@@ -494,6 +494,12 @@ class PosixMmapFile : public WritableFile {
     return Status::OK();
   }

+  // Means Close() will properly take care of truncate
+  // and it does not need any additional information
+  virtual Status Truncate(uint64_t size) override {
+    return Status::OK();
+  }
+
   virtual Status Close() override {
     Status s;
     size_t unused = limit_ - dst_;
@@ -624,6 +630,12 @@ class PosixWritableFile : public WritableFile {
     return Status::OK();
   }

+  // Means Close() will properly take care of truncate
+  // and it does not need any additional information
+  virtual Status Truncate(uint64_t size) override {
+    return Status::OK();
+  }
+
   virtual Status Close() override {
     Status s;
......
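The two no-op Truncate() overrides above appear deliberate: the POSIX writers track the exact number of bytes appended, and their Close() paths already trim any preallocated space, so no separate trim step is needed at this layer. (That reading is inferred from the comments in the hunk, not stated elsewhere in the diff.)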
@@ -989,6 +989,7 @@ TEST_F(EnvPosixTest, WritableFileWrapper) {
   }

   Status Append(const Slice& data) override { inc(1); return Status::OK(); }
+  Status Truncate(uint64_t size) override { return Status::OK(); }
   Status Close() override { inc(2); return Status::OK(); }
   Status Flush() override { inc(3); return Status::OK(); }
   Status Sync() override { inc(4); return Status::OK(); }
......
@@ -20,6 +20,11 @@
 #include "util/sync_point.h"

 namespace rocksdb {
+
+namespace {
+const size_t c_OneMb = (1 << 20);
+}
+
 Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
   Status s = file_->Read(n, result, scratch);
   IOSTATS_ADD(bytes_read, result->size());
@@ -59,81 +64,116 @@ Status WritableFileWriter::Append(const Slice& data) {
     TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
     writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left);
   }
-  // if there is no space in the cache, then flush
-  if (cursize_ + left > capacity_) {
-    s = Flush();
-    if (!s.ok()) {
-      return s;
-    }
-    // Increase the buffer size, but capped at 1MB
-    if (capacity_ < (1 << 20)) {
-      capacity_ *= 2;
-      buf_.reset(new char[capacity_]);
-    }
-    assert(cursize_ == 0);
-  }
-  // if the write fits into the cache, then write to cache
-  // otherwise do a write() syscall to write to OS buffers.
-  if (cursize_ + left <= capacity_) {
-    memcpy(buf_.get() + cursize_, src, left);
-    cursize_ += left;
-  } else {
-    while (left != 0) {
-      size_t size = RequestToken(left);
-      {
-        IOSTATS_TIMER_GUARD(write_nanos);
-        s = writable_file_->Append(Slice(src, size));
-        if (!s.ok()) {
-          return s;
-        }
-      }
-      IOSTATS_ADD(bytes_written, size);
-      TEST_KILL_RANDOM(rocksdb_kill_odds);
-      left -= size;
-      src += size;
-    }
-  }
+  // Flush only when I/O is buffered
+  if (use_os_buffer_ &&
+      (buf_.Capacity() - buf_.CurrentSize()) < left) {
+    if (buf_.CurrentSize() > 0) {
+      s = Flush();
+      if (!s.ok()) {
+        return s;
+      }
+    }
+
+    if (buf_.Capacity() < c_OneMb) {
+      size_t desiredCapacity = buf_.Capacity() * 2;
+      desiredCapacity = std::min(desiredCapacity, c_OneMb);
+      buf_.AllocateNewBuffer(desiredCapacity);
+    }
+    assert(buf_.CurrentSize() == 0);
+  }
+
+  // We never write directly to disk with unbuffered I/O on.
+  // or we simply use it for its original purpose to accumulate many small
+  // chunks
+  if (!use_os_buffer_ || (buf_.Capacity() >= left)) {
+    while (left > 0) {
+      size_t appended = buf_.Append(src, left);
+      left -= appended;
+      src += appended;
+
+      if (left > 0) {
+        s = Flush();
+        if (!s.ok()) {
+          break;
+        }
+
+        // We double the buffer here because
+        // Flush calls do not keep up with the incoming bytes
+        // This is the only place when buffer is changed with unbuffered I/O
+        if (buf_.Capacity() < (1 << 20)) {
+          size_t desiredCapacity = buf_.Capacity() * 2;
+          desiredCapacity = std::min(desiredCapacity, c_OneMb);
+          buf_.AllocateNewBuffer(desiredCapacity);
+        }
+      }
+    }
+  } else {
+    // Writing directly to file bypassing the buffer
+    assert(buf_.CurrentSize() == 0);
+    s = WriteBuffered(src, left);
+  }

   TEST_KILL_RANDOM(rocksdb_kill_odds);
   filesize_ += data.size();
   return Status::OK();
 }
 Status WritableFileWriter::Close() {
+  // Do not quit immediately on failure the file MUST be closed
   Status s;
-  s = Flush();  // flush cache to OS
-  if (!s.ok()) {
-    return s;
-  }
+
+  // Possible to close it twice now as we MUST close
+  // in __dtor, simply flushing is not enough
+  // Windows when pre-allocating does not fill with zeros
+  // also with unbuffered access we also set the end of data.
+  if (!writable_file_) {
+    return s;
+  }
+
+  s = Flush();  // flush cache to OS
+
+  // In unbuffered mode we write whole pages so
+  // we need to let the file know where data ends.
+  Status interim = writable_file_->Truncate(filesize_);
+  if (!interim.ok() && s.ok()) {
+    s = interim;
+  }

   TEST_KILL_RANDOM(rocksdb_kill_odds);
-  return writable_file_->Close();
+  interim = writable_file_->Close();
+  if (!interim.ok() && s.ok()) {
+    s = interim;
+  }
+
+  writable_file_.reset();
+
+  return s;
 }
 // write out the cached data to the OS cache
 Status WritableFileWriter::Flush() {
+  Status s;
   TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
-  size_t left = cursize_;
-  char* src = buf_.get();
-  while (left != 0) {
-    size_t size = RequestToken(left);
-    {
-      IOSTATS_TIMER_GUARD(write_nanos);
-      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
-      Status s = writable_file_->Append(Slice(src, size));
-      if (!s.ok()) {
-        return s;
-      }
-    }
-    IOSTATS_ADD(bytes_written, size);
-    TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
-    left -= size;
-    src += size;
-  }
-  cursize_ = 0;
-  writable_file_->Flush();
+
+  if (buf_.CurrentSize() > 0) {
+    if (use_os_buffer_) {
+      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
+    } else {
+      s = WriteUnbuffered();
+    }
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  s = writable_file_->Flush();
+  if (!s.ok()) {
+    return s;
+  }

   // sync OS cache to disk for every bytes_per_sync_
   // TODO: give log file and sst file different options (log
@@ -147,21 +187,21 @@ Status WritableFileWriter::Flush() {
   // Xfs does neighbor page flushing outside of the specified ranges. We
   // need to make sure sync range is far from the write offset.
   if (!direct_io_ && bytes_per_sync_) {
-    uint64_t kBytesNotSyncRange = 1024 * 1024;  // recent 1MB is not synced.
-    uint64_t kBytesAlignWhenSync = 4 * 1024;    // Align 4KB.
+    const uint64_t kBytesNotSyncRange = 1024 * 1024;  // recent 1MB is not synced.
+    const uint64_t kBytesAlignWhenSync = 4 * 1024;    // Align 4KB.
     if (filesize_ > kBytesNotSyncRange) {
       uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
       offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
       assert(offset_sync_to >= last_sync_size_);
       if (offset_sync_to > 0 &&
           offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
-        RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
+        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
         last_sync_size_ = offset_sync_to;
       }
     }
   }

-  return Status::OK();
+  return s;
 }
 Status WritableFileWriter::Sync(bool use_fsync) {
@@ -214,27 +254,140 @@ Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) {
   return writable_file_->RangeSync(offset, nbytes);
 }

-size_t WritableFileWriter::RequestToken(size_t bytes) {
+size_t WritableFileWriter::RequestToken(size_t bytes, bool align) {
   Env::IOPriority io_priority;
-  if (rate_limiter_&&(io_priority = writable_file_->GetIOPriority()) <
+  if (rate_limiter_ && (io_priority = writable_file_->GetIOPriority()) <
       Env::IO_TOTAL) {
-    bytes = std::min(bytes,
-        static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
+    bytes = std::min(
+        bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
+
+    if (align) {
+      // Here we may actually require more than burst and block
+      // but we can not write less than one page at a time on unbuffered
+      // thus we may want not to use ratelimiter
+      size_t alignment = buf_.Alignment();
+      bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
+    }
     rate_limiter_->Request(bytes, io_priority);
   }
   return bytes;
 }
+// This method writes to disk the specified data and makes use of the rate
+// limiter if available
+Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
+  Status s;
+  assert(use_os_buffer_);
+  const char* src = data;
+  size_t left = size;
+
+  while (left > 0) {
+    size_t allowed = RequestToken(left, false);
+    {
+      IOSTATS_TIMER_GUARD(write_nanos);
+      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
+      s = writable_file_->Append(Slice(src, allowed));
+      if (!s.ok()) {
+        return s;
+      }
+    }
+
+    IOSTATS_ADD(bytes_written, allowed);
+    TEST_KILL_RANDOM(rocksdb_kill_odds);
+
+    left -= allowed;
+    src += allowed;
+  }
+  buf_.Size(0);
+  return s;
+}
+
+// This flushes the accumulated data in the buffer. We pad data with zeros if
+// necessary to the whole page.
+// However, during automatic flushes padding would not be necessary.
+// We always use RateLimiter if available. We move (Refit) any buffer bytes
+// that are left over the whole number of pages to be written again on the
+// next flush because we can only write on aligned offsets.
+Status WritableFileWriter::WriteUnbuffered() {
+  Status s;
+
+  assert(!use_os_buffer_);
+  const size_t alignment = buf_.Alignment();
+  assert((next_write_offset_ % alignment) == 0);
+
+  // Calculate whole page final file advance if all writes succeed
+  size_t file_advance =
+      TruncateToPageBoundary(alignment, buf_.CurrentSize());
+
+  // Calculate the leftover tail, we write it here padded with zeros BUT we
+  // will write it again in the future either on Close() OR when the current
+  // whole page fills out
+  size_t leftover_tail = buf_.CurrentSize() - file_advance;
+
+  // Round up and pad
+  buf_.PadToAlignmentWith(0);
+
+  const char* src = buf_.BufferStart();
+  uint64_t write_offset = next_write_offset_;
+  size_t left = buf_.CurrentSize();
+
+  while (left > 0) {
+    // Check how much is allowed
+    size_t size = RequestToken(left, true);
+    {
+      IOSTATS_TIMER_GUARD(write_nanos);
+      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
+      // Unbuffered writes must be positional
+      s = writable_file_->PositionedAppend(Slice(src, size), write_offset);
+      if (!s.ok()) {
+        buf_.Size(file_advance + leftover_tail);
+        return s;
+      }
+    }
+
+    IOSTATS_ADD(bytes_written, size);
+    left -= size;
+    src += size;
+    write_offset += size;
+    assert((next_write_offset_ % alignment) == 0);
+  }
+
+  if (s.ok()) {
+    // Move the tail to the beginning of the buffer
+    // This never happens during normal Append but rather during
+    // explicit call to Flush()/Sync() or Close()
+    buf_.RefitTail(file_advance, leftover_tail);
+    // This is where we start writing next time which may or not be
+    // the actual file size on disk. They match if the buffer size
+    // is a multiple of whole pages otherwise filesize_ is leftover_tail
+    // behind
+    next_write_offset_ += file_advance;
+  }
+  return s;
+}
 namespace {
 class ReadaheadRandomAccessFile : public RandomAccessFile {
  public:
-  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile> file,
-                            size_t readahead_size)
-      : file_(std::move(file)),
-        readahead_size_(readahead_size),
-        buffer_(new char[readahead_size_]),
-        buffer_offset_(0),
-        buffer_len_(0) {}
+  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
+                            size_t readahead_size)
+      : file_(std::move(file)),
+        readahead_size_(readahead_size),
+        forward_calls_(file_->ShouldForwardRawRequest()),
+        buffer_(new char[readahead_size_]),
+        buffer_offset_(0),
+        buffer_len_(0) {}
+
+  ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
+  ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete;

   virtual Status Read(uint64_t offset, size_t n, Slice* result,
                       char* scratch) const override {
@@ -242,6 +395,14 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
       return file_->Read(offset, n, result, scratch);
     }

+    // On Windows in unbuffered mode this will lead to double buffering
+    // and double locking so we avoid that.
+    // In normal mode Windows caches so much data from disk that we do
+    // not need readahead.
+    if (forward_calls_) {
+      return file_->Read(offset, n, result, scratch);
+    }
+
     std::unique_lock<std::mutex> lk(lock_);

     size_t copied = 0;
@@ -249,7 +410,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
     if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
       uint64_t offset_in_buffer = offset - buffer_offset_;
       copied = std::min(static_cast<uint64_t>(buffer_len_) - offset_in_buffer,
                         static_cast<uint64_t>(n));
       memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
       if (copied == n) {
         // fully cached
@@ -259,7 +420,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
     }
     Slice readahead_result;
     Status s = file_->Read(offset + copied, readahead_size_, &readahead_result,
                            buffer_.get());
     if (!s.ok()) {
       return s;
     }
@@ -290,20 +451,21 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
  private:
   std::unique_ptr<RandomAccessFile> file_;
   size_t readahead_size_;
+  const bool forward_calls_;

   mutable std::mutex lock_;
   mutable std::unique_ptr<char[]> buffer_;
   mutable uint64_t buffer_offset_;
   mutable size_t buffer_len_;
 };
 }  // namespace

 std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
-    std::unique_ptr<RandomAccessFile> file, size_t readahead_size) {
-  std::unique_ptr<ReadaheadRandomAccessFile> wrapped_file(
-      new ReadaheadRandomAccessFile(std::move(file), readahead_size));
-  return std::move(wrapped_file);
+    std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
+  std::unique_ptr<RandomAccessFile> result(
+      new ReadaheadRandomAccessFile(std::move(file), readahead_size));
+  return result;
 }

 }  // namespace rocksdb
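To make the WriteUnbuffered() bookkeeping above concrete, here is a hedged worked example; a 4096-byte alignment and a 10000-byte buffer are assumed, and the values follow directly from the helpers in util/aligned_buffer.h:

// file_advance  = TruncateToPageBoundary(4096, 10000) = 8192
// leftover_tail = 10000 - 8192                        = 1808
//
// PadToAlignmentWith(0) pads the buffer up to Roundup(10000, 4096) = 12288
// bytes, so three whole pages go out through PositionedAppend() starting at
// next_write_offset_. RefitTail(8192, 1808) then moves the 1808-byte tail to
// the front of the buffer and next_write_offset_ advances by only 8192; the
// partial page is rewritten, re-padded, on the next flush or on Close(),
// which finally calls Truncate(filesize_) to drop the padding.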
@@ -8,6 +8,8 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 #pragma once
 #include "rocksdb/env.h"
+#include "util/aligned_buffer.h"
+#include "port/port.h"

 namespace rocksdb {
@@ -15,7 +17,7 @@ class Statistics;
 class HistogramImpl;

 std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
-    std::unique_ptr<RandomAccessFile> file, size_t readahead_size);
+    std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size);

 class SequentialFileReader {
  private:
@@ -24,6 +26,19 @@ class SequentialFileReader {
  public:
   explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file)
       : file_(std::move(_file)) {}
+
+  SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
+    *this = std::move(o);
+  }
+
+  SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
+    file_ = std::move(o.file_);
+    return *this;
+  }
+
+  SequentialFileReader(SequentialFileReader&) = delete;
+  SequentialFileReader& operator=(SequentialFileReader&) = delete;
+
   Status Read(size_t n, Slice* result, char* scratch);

   Status Skip(uint64_t n);
@@ -34,10 +49,10 @@ class SequentialFileReader {
 class RandomAccessFileReader : public RandomAccessFile {
  private:
   std::unique_ptr<RandomAccessFile> file_;
   Env* env_;
   Statistics* stats_;
   uint32_t hist_type_;
   HistogramImpl* file_read_hist_;

  public:
   explicit RandomAccessFileReader(std::unique_ptr<RandomAccessFile>&& raf,
@@ -51,6 +66,22 @@ class RandomAccessFileReader : public RandomAccessFile {
         hist_type_(hist_type),
         file_read_hist_(file_read_hist) {}

+  RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
+    *this = std::move(o);
+  }
+
+  RandomAccessFileReader& operator=(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
+    file_ = std::move(o.file_);
+    env_ = std::move(o.env_);
+    stats_ = std::move(o.stats_);
+    hist_type_ = std::move(o.hist_type_);
+    file_read_hist_ = std::move(o.file_read_hist_);
+    return *this;
+  }
+
+  RandomAccessFileReader(const RandomAccessFileReader&) = delete;
+  RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;
+
   Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const;

   RandomAccessFile* file() { return file_.get(); }
@@ -60,33 +91,47 @@ class RandomAccessFileReader : public RandomAccessFile {
 class WritableFileWriter {
  private:
   std::unique_ptr<WritableFile> writable_file_;
-  size_t cursize_;          // current size of cached data in buf_
-  size_t capacity_;         // max size of buf_
-  unique_ptr<char[]> buf_;  // a buffer to cache writes
+  AlignedBuffer buf_;
+  // Actually written data size can be used for truncate
+  // not counting padding data
   uint64_t filesize_;
+  // This is necessary when we use unbuffered access
+  // and writes must happen on aligned offsets
+  // so we need to go back and write that page again
+  uint64_t next_write_offset_;
   bool pending_sync_;
   bool pending_fsync_;
-  bool direct_io_;
+  const bool direct_io_;
+  const bool use_os_buffer_;
   uint64_t last_sync_size_;
   uint64_t bytes_per_sync_;
   RateLimiter* rate_limiter_;

  public:
-  explicit WritableFileWriter(std::unique_ptr<WritableFile>&& file,
-                              const EnvOptions& options)
+  WritableFileWriter(std::unique_ptr<WritableFile>&& file,
+                     const EnvOptions& options)
       : writable_file_(std::move(file)),
-        cursize_(0),
-        capacity_(65536),
-        buf_(new char[capacity_]),
+        buf_(),
         filesize_(0),
+        next_write_offset_(0),
         pending_sync_(false),
         pending_fsync_(false),
         direct_io_(writable_file_->UseDirectIO()),
+        use_os_buffer_(writable_file_->UseOSBuffer()),
         last_sync_size_(0),
         bytes_per_sync_(options.bytes_per_sync),
-        rate_limiter_(options.rate_limiter) {}
+        rate_limiter_(options.rate_limiter) {
+    buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
+    buf_.AllocateNewBuffer(65536);
+  }
+
+  WritableFileWriter(const WritableFileWriter&) = delete;
+
+  WritableFileWriter& operator=(const WritableFileWriter&) = delete;

-  ~WritableFileWriter() { Flush(); }
+  ~WritableFileWriter() { Close(); }

   Status Append(const Slice& data);

   Status Flush();
@@ -109,8 +154,13 @@ class WritableFileWriter {
   WritableFile* writable_file() const { return writable_file_.get(); }

  private:
+  // Used when os buffering is OFF and we are writing
+  // DMA such as in Windows unbuffered mode
+  Status WriteUnbuffered();
+  // Normal write
+  Status WriteBuffered(const char* data, size_t size);
   Status RangeSync(off_t offset, off_t nbytes);
-  size_t RequestToken(size_t bytes);
+  size_t RequestToken(size_t bytes, bool align);
   Status SyncInternal(bool use_fsync);
 };
 }  // namespace rocksdb
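A short usage sketch of the refactored writer (error handling trimmed; the helper name and file name are illustrative, not from the commit):

#include <memory>
#include <string>
#include "rocksdb/env.h"
#include "util/file_reader_writer.h"

rocksdb::Status WriteExample(rocksdb::Env* env, const std::string& fname) {
  rocksdb::EnvOptions env_options;
  std::unique_ptr<rocksdb::WritableFile> file;
  rocksdb::Status s = env->NewWritableFile(fname, &file, env_options);
  if (!s.ok()) {
    return s;
  }

  // The writer now configures itself from the file: UseOSBuffer() selects
  // the buffered or unbuffered path and GetRequiredBufferAlignment() sizes
  // the AlignedBuffer.
  rocksdb::WritableFileWriter writer(std::move(file), env_options);
  s = writer.Append(rocksdb::Slice("hello"));
  if (s.ok()) {
    s = writer.Close();  // flushes, truncates to the real size, then closes
  }
  return s;
}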
@@ -24,6 +24,9 @@ TEST_F(WritableFileWriterTest, RangeSync) {
       size_ += data.size();
       return Status::OK();
     }
+    virtual Status Truncate(uint64_t size) override {
+      return Status::OK();
+    }
     Status Close() override {
       EXPECT_GE(size_, last_synced_ + kMb);
       EXPECT_LT(size_, last_synced_ + 2 * kMb);
......
@@ -232,7 +232,9 @@ class WritableFileImpl : public WritableFile {
   virtual Status Append(const Slice& data) override {
     return file_->Append(data);
   }
+  virtual Status Truncate(uint64_t size) override {
+    return Status::OK();
+  }
   virtual Status Close() override { return Status::OK(); }
   virtual Status Flush() override { return Status::OK(); }
   virtual Status Sync() override { return Status::OK(); }
......
@@ -250,7 +250,9 @@ class MockWritableFile : public WritableFile {
     }
     return Status::OK();
   }
+  virtual Status Truncate(uint64_t size) override {
+    return Status::OK();
+  }
   virtual Status Close() override { return file_->Fsync(); }
   virtual Status Flush() override { return Status::OK(); }
......
@@ -184,6 +184,10 @@ class StringSink: public WritableFile {
   const std::string& contents() const { return contents_; }

+  virtual Status Truncate(uint64_t size) override {
+    contents_.resize(size);
+    return Status::OK();
+  }
   virtual Status Close() override { return Status::OK(); }
   virtual Status Flush() override {
     if (reader_contents_ != nullptr) {
......
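A hedged sketch of exercising the new StringSink::Truncate() in a gtest-style unit; the namespace, default constructor, and ASSERT_OK macro are assumed from util/testutil.h rather than shown in this diff:

TEST(StringSinkTest, TruncateTrimsContents) {
  rocksdb::test::StringSink sink;
  ASSERT_OK(sink.Append(rocksdb::Slice("0123456789")));
  ASSERT_OK(sink.Truncate(4));   // keeps only the first four bytes
  ASSERT_EQ("0123", sink.contents());
}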