提交 ddc8b449 编写于 作者: D Dmitri Smirnov

Address code review comments both GH and internal

 Fix compilation issues on GCC/CLANG
 Address Windows Release test build issues due to Sync
上级 30e82d5c
...@@ -1884,6 +1884,8 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIterator2) { ...@@ -1884,6 +1884,8 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIterator2) {
ASSERT_EQ(db_iter_->value().ToString(), "4"); ASSERT_EQ(db_iter_->value().ToString(), "4");
} }
#if !(defined NDEBUG) || !defined(OS_WIN)
TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) { TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) {
// Test Prev() when one child iterator is at its end but more rows // Test Prev() when one child iterator is at its end but more rows
// are added. // are added.
...@@ -2234,6 +2236,7 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace8) { ...@@ -2234,6 +2236,7 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace8) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
#endif // #if !(defined NDEBUG) || !defined(OS_WIN)
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {
......
...@@ -395,7 +395,7 @@ class RandomAccessFile { ...@@ -395,7 +395,7 @@ class RandomAccessFile {
// Used by the file_reader_writer to decide if the ReadAhead wrapper // Used by the file_reader_writer to decide if the ReadAhead wrapper
// should simply forward the call and do not enact buffering or locking. // should simply forward the call and do not enact buffering or locking.
virtual bool ReaderWriterForward() const { virtual bool ShouldForwardRawRequest() const {
return false; return false;
} }
...@@ -448,18 +448,20 @@ class WritableFile { ...@@ -448,18 +448,20 @@ class WritableFile {
return true; return true;
} }
const size_t c_DefaultPageSize = 4 * 1024;
// This is needed when you want to allocate // This is needed when you want to allocate
// AlignedBuffer for use with file I/O classes // AlignedBuffer for use with file I/O classes
// Used for unbuffered file I/O when UseOSBuffer() returns false // Used for unbuffered file I/O when UseOSBuffer() returns false
virtual size_t GetRequiredBufferAlignment() const { virtual size_t GetRequiredBufferAlignment() const {
return 4 * 1024; return c_DefaultPageSize;
} }
virtual Status Append(const Slice& data) = 0; virtual Status Append(const Slice& data) = 0;
// Positional write for unbuffered access default forward // Positioned write for unbuffered access default forward
// to simple append as most of the tests are buffered by default // to simple append as most of the tests are buffered by default
virtual Status Append(const Slice& /* data */, uint64_t /* offset */) { virtual Status PositionedAppend(const Slice& /* data */, uint64_t /* offset */) {
return Status::NotSupported(); return Status::NotSupported();
} }
...@@ -467,7 +469,9 @@ class WritableFile { ...@@ -467,7 +469,9 @@ class WritableFile {
// before closing. It is not always possible to keep track of the file // before closing. It is not always possible to keep track of the file
// size due to whole pages writes. The behavior is undefined if called // size due to whole pages writes. The behavior is undefined if called
// with other writes to follow. // with other writes to follow.
virtual Status Truncate(uint64_t size) = 0; virtual Status Truncate(uint64_t size) {
return Status::OK();
}
virtual Status Close() = 0; virtual Status Close() = 0;
virtual Status Flush() = 0; virtual Status Flush() = 0;
virtual Status Sync() = 0; // sync data virtual Status Sync() = 0; // sync data
...@@ -868,6 +872,9 @@ class WritableFileWrapper : public WritableFile { ...@@ -868,6 +872,9 @@ class WritableFileWrapper : public WritableFile {
explicit WritableFileWrapper(WritableFile* t) : target_(t) { } explicit WritableFileWrapper(WritableFile* t) : target_(t) { }
Status Append(const Slice& data) override { return target_->Append(data); } Status Append(const Slice& data) override { return target_->Append(data); }
Status PositionedAppend(const Slice& data, uint64_t offset) override {
return target_->PositionedAppend(data, offset);
}
Status Truncate(uint64_t size) override { return target_->Truncate(size); } Status Truncate(uint64_t size) override { return target_->Truncate(size); }
Status Close() override { return target_->Close(); } Status Close() override { return target_->Close(); }
Status Flush() override { return target_->Flush(); } Status Flush() override { return target_->Flush(); }
......
...@@ -704,7 +704,7 @@ class WinRandomAccessFile : public RandomAccessFile { ...@@ -704,7 +704,7 @@ class WinRandomAccessFile : public RandomAccessFile {
// Unbuffered access, use internal buffer for reads // Unbuffered access, use internal buffer for reads
if (!use_os_buffer_) { if (!use_os_buffer_) {
buffer_.SetAlignment(alignment); buffer_.Alignment(alignment);
// Random read, no need in a big buffer // Random read, no need in a big buffer
// We read things in database blocks which are likely to be similar to // We read things in database blocks which are likely to be similar to
// the alignment we use. // the alignment we use.
...@@ -734,7 +734,7 @@ class WinRandomAccessFile : public RandomAccessFile { ...@@ -734,7 +734,7 @@ class WinRandomAccessFile : public RandomAccessFile {
// Let's see if at least some of the requested data is already // Let's see if at least some of the requested data is already
// in the buffer // in the buffer
if (offset >= buffered_start_ && if (offset >= buffered_start_ &&
offset < (buffered_start_ + buffer_.GetCurrentSize())) { offset < (buffered_start_ + buffer_.CurrentSize())) {
size_t buffer_offset = offset - buffered_start_; size_t buffer_offset = offset - buffered_start_;
r = buffer_.Read(dest, buffer_offset, left); r = buffer_.Read(dest, buffer_offset, left);
assert(r >= 0); assert(r >= 0);
...@@ -747,7 +747,7 @@ class WinRandomAccessFile : public RandomAccessFile { ...@@ -747,7 +747,7 @@ class WinRandomAccessFile : public RandomAccessFile {
// Still some left or none was buffered // Still some left or none was buffered
if (left > 0) { if (left > 0) {
// Figure out the start/end offset for reading and amount to read // Figure out the start/end offset for reading and amount to read
const size_t alignment = buffer_.GetAlignment(); const size_t alignment = buffer_.Alignment();
const size_t start_page_start = const size_t start_page_start =
TruncateToPageBoundary(alignment, offset); TruncateToPageBoundary(alignment, offset);
const size_t end_page_start = const size_t end_page_start =
...@@ -755,18 +755,18 @@ class WinRandomAccessFile : public RandomAccessFile { ...@@ -755,18 +755,18 @@ class WinRandomAccessFile : public RandomAccessFile {
const size_t actual_bytes_toread = const size_t actual_bytes_toread =
(end_page_start - start_page_start) + alignment; (end_page_start - start_page_start) + alignment;
if (buffer_.GetCapacity() < actual_bytes_toread) { if (buffer_.Capacity() < actual_bytes_toread) {
buffer_.AllocateNewBuffer(actual_bytes_toread); buffer_.AllocateNewBuffer(actual_bytes_toread);
} else { } else {
buffer_.Clear(); buffer_.Clear();
} }
SSIZE_T read = 0; SSIZE_T read = 0;
read = pread(hFile_, buffer_.GetDestination(), actual_bytes_toread, read = pread(hFile_, buffer_.Destination(), actual_bytes_toread,
start_page_start); start_page_start);
if (read > 0) { if (read > 0) {
buffer_.SetSize(read); buffer_.Size(read);
buffered_start_ = start_page_start; buffered_start_ = start_page_start;
// Let's figure out how much we read from the users standpoint // Let's figure out how much we read from the users standpoint
...@@ -797,7 +797,7 @@ class WinRandomAccessFile : public RandomAccessFile { ...@@ -797,7 +797,7 @@ class WinRandomAccessFile : public RandomAccessFile {
return s; return s;
} }
virtual bool ReaderWriterForward() const override { virtual bool ShouldForwardRawRequest() const override {
return true; return true;
} }
...@@ -880,7 +880,7 @@ class WinWritableFile : public WritableFile { ...@@ -880,7 +880,7 @@ class WinWritableFile : public WritableFile {
return s; return s;
} }
virtual Status Append(const Slice& data, uint64_t offset) override { virtual Status PositionedAppend(const Slice& data, uint64_t offset) override {
Status s; Status s;
SSIZE_T ret = pwrite(hFile_, data.data(), SSIZE_T ret = pwrite(hFile_, data.data(),
......
...@@ -25,7 +25,7 @@ inline size_t Roundup(size_t x, size_t y) { ...@@ -25,7 +25,7 @@ inline size_t Roundup(size_t x, size_t y) {
// This class is to manage an aligned user // This class is to manage an aligned user
// allocated buffer for unbuffered I/O purposes // allocated buffer for unbuffered I/O purposes
// though it does not make a difference if you need a buffer. // though can be used for any purpose.
class AlignedBuffer { class AlignedBuffer {
size_t alignment_; size_t alignment_;
std::unique_ptr<char[]> buf_; std::unique_ptr<char[]> buf_;
...@@ -58,19 +58,19 @@ public: ...@@ -58,19 +58,19 @@ public:
AlignedBuffer& operator=(const AlignedBuffer&) = delete; AlignedBuffer& operator=(const AlignedBuffer&) = delete;
size_t GetAlignment() const { size_t Alignment() const {
return alignment_; return alignment_;
} }
size_t GetCapacity() const { size_t Capacity() const {
return capacity_; return capacity_;
} }
size_t GetCurrentSize() const { size_t CurrentSize() const {
return cursize_; return cursize_;
} }
const char* GetBufferStart() const { const char* BufferStart() const {
return bufstart_; return bufstart_;
} }
...@@ -78,7 +78,7 @@ public: ...@@ -78,7 +78,7 @@ public:
cursize_ = 0; cursize_ = 0;
} }
void SetAlignment(size_t alignment) { void Alignment(size_t alignment) {
assert(alignment > 0); assert(alignment > 0);
assert((alignment & (alignment - 1)) == 0); assert((alignment & (alignment - 1)) == 0);
alignment_ = alignment; alignment_ = alignment;
...@@ -143,11 +143,11 @@ public: ...@@ -143,11 +143,11 @@ public:
} }
// Returns place to start writing // Returns place to start writing
char* GetDestination() { char* Destination() {
return bufstart_ + cursize_; return bufstart_ + cursize_;
} }
void SetSize(size_t cursize) { void Size(size_t cursize) {
cursize_ = cursize; cursize_ = cursize;
} }
}; };
......
...@@ -20,6 +20,11 @@ ...@@ -20,6 +20,11 @@
#include "util/sync_point.h" #include "util/sync_point.h"
namespace rocksdb { namespace rocksdb {
namespace {
const size_t c_OneMb = (1 << 20);
}
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
Status s = file_->Read(n, result, scratch); Status s = file_->Read(n, result, scratch);
IOSTATS_ADD(bytes_read, result->size()); IOSTATS_ADD(bytes_read, result->size());
...@@ -62,26 +67,26 @@ Status WritableFileWriter::Append(const Slice& data) { ...@@ -62,26 +67,26 @@ Status WritableFileWriter::Append(const Slice& data) {
// Flush only when I/O is buffered // Flush only when I/O is buffered
if (use_os_buffer_ && if (use_os_buffer_ &&
(buf_.GetCapacity() - buf_.GetCurrentSize()) < left) { (buf_.Capacity() - buf_.CurrentSize()) < left) {
if (buf_.GetCurrentSize() > 0) { if (buf_.CurrentSize() > 0) {
s = Flush(); s = Flush();
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
} }
if (buf_.GetCapacity() < (1 << 20)) { if (buf_.Capacity() < c_OneMb) {
size_t desiredCapacity = buf_.GetCapacity() * 2; size_t desiredCapacity = buf_.Capacity() * 2;
desiredCapacity = std::min(desiredCapacity, size_t(1 << 20)); desiredCapacity = std::min(desiredCapacity, c_OneMb);
buf_.AllocateNewBuffer(desiredCapacity); buf_.AllocateNewBuffer(desiredCapacity);
} }
assert(buf_.GetCurrentSize() == 0); assert(buf_.CurrentSize() == 0);
} }
// We never write directly to disk with unbuffered I/O on. // We never write directly to disk with unbuffered I/O on.
// or we simply use it for its original purpose to accumulate many small // or we simply use it for its original purpose to accumulate many small
// chunks // chunks
if (!use_os_buffer_ || (buf_.GetCapacity() >= left)) { if (!use_os_buffer_ || (buf_.Capacity() >= left)) {
while (left > 0) { while (left > 0) {
size_t appended = buf_.Append(src, left); size_t appended = buf_.Append(src, left);
left -= appended; left -= appended;
...@@ -96,16 +101,16 @@ Status WritableFileWriter::Append(const Slice& data) { ...@@ -96,16 +101,16 @@ Status WritableFileWriter::Append(const Slice& data) {
// We double the buffer here because // We double the buffer here because
// Flush calls do not keep up with the incoming bytes // Flush calls do not keep up with the incoming bytes
// This is the only place when buffer is changed with unbuffered I/O // This is the only place when buffer is changed with unbuffered I/O
if (buf_.GetCapacity() < (1 << 20)) { if (buf_.Capacity() < (1 << 20)) {
size_t desiredCapacity = buf_.GetCapacity() * 2; size_t desiredCapacity = buf_.Capacity() * 2;
desiredCapacity = std::min(desiredCapacity, size_t(1 << 20)); desiredCapacity = std::min(desiredCapacity, c_OneMb);
buf_.AllocateNewBuffer(desiredCapacity); buf_.AllocateNewBuffer(desiredCapacity);
} }
} }
} }
} else { } else {
// Writing directly to file bypassing the buffer // Writing directly to file bypassing the buffer
assert(buf_.GetCurrentSize() == 0); assert(buf_.CurrentSize() == 0);
s = WriteBuffered(src, left); s = WriteBuffered(src, left);
} }
...@@ -153,9 +158,9 @@ Status WritableFileWriter::Flush() { ...@@ -153,9 +158,9 @@ Status WritableFileWriter::Flush() {
Status s; Status s;
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2); TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
if (buf_.GetCurrentSize() > 0) { if (buf_.CurrentSize() > 0) {
if (use_os_buffer_) { if (use_os_buffer_) {
s = WriteBuffered(buf_.GetBufferStart(), buf_.GetCurrentSize()); s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
} else { } else {
s = WriteUnbuffered(); s = WriteUnbuffered();
} }
...@@ -260,7 +265,7 @@ size_t WritableFileWriter::RequestToken(size_t bytes, bool align) { ...@@ -260,7 +265,7 @@ size_t WritableFileWriter::RequestToken(size_t bytes, bool align) {
// Here we may actually require more than burst and block // Here we may actually require more than burst and block
// but we can not write less than one page at a time on unbuffered // but we can not write less than one page at a time on unbuffered
// thus we may want not to use ratelimiter s // thus we may want not to use ratelimiter s
size_t alignment = buf_.GetAlignment(); size_t alignment = buf_.Alignment();
bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
} }
rate_limiter_->Request(bytes, io_priority); rate_limiter_->Request(bytes, io_priority);
...@@ -294,7 +299,7 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { ...@@ -294,7 +299,7 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
left -= allowed; left -= allowed;
src += allowed; src += allowed;
} }
buf_.SetSize(0); buf_.Size(0);
return s; return s;
} }
...@@ -311,25 +316,25 @@ Status WritableFileWriter::WriteUnbuffered() { ...@@ -311,25 +316,25 @@ Status WritableFileWriter::WriteUnbuffered() {
Status s; Status s;
assert(!use_os_buffer_); assert(!use_os_buffer_);
const size_t alignment = buf_.GetAlignment(); const size_t alignment = buf_.Alignment();
assert((next_write_offset_ % alignment) == 0); assert((next_write_offset_ % alignment) == 0);
// Calculate whole page final file advance if all writes succeed // Calculate whole page final file advance if all writes succeed
size_t file_advance = size_t file_advance =
TruncateToPageBoundary(alignment, buf_.GetCurrentSize()); TruncateToPageBoundary(alignment, buf_.CurrentSize());
// Calculate the leftover tail, we write it here padded with zeros BUT we // Calculate the leftover tail, we write it here padded with zeros BUT we
// will write // will write
// it again in the future either on Close() OR when the current whole page // it again in the future either on Close() OR when the current whole page
// fills out // fills out
size_t leftover_tail = buf_.GetCurrentSize() - file_advance; size_t leftover_tail = buf_.CurrentSize() - file_advance;
// Round up and pad // Round up and pad
buf_.PadToAlignmentWith(0); buf_.PadToAlignmentWith(0);
const char* src = buf_.GetBufferStart(); const char* src = buf_.BufferStart();
uint64_t write_offset = next_write_offset_; uint64_t write_offset = next_write_offset_;
size_t left = buf_.GetCurrentSize(); size_t left = buf_.CurrentSize();
while (left > 0) { while (left > 0) {
// Check how much is allowed // Check how much is allowed
...@@ -339,9 +344,9 @@ Status WritableFileWriter::WriteUnbuffered() { ...@@ -339,9 +344,9 @@ Status WritableFileWriter::WriteUnbuffered() {
IOSTATS_TIMER_GUARD(write_nanos); IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
// Unbuffered writes must be positional // Unbuffered writes must be positional
s = writable_file_->Append(Slice(src, size), write_offset); s = writable_file_->PositionedAppend(Slice(src, size), write_offset);
if (!s.ok()) { if (!s.ok()) {
buf_.SetSize(file_advance + leftover_tail); buf_.Size(file_advance + leftover_tail);
return s; return s;
} }
} }
...@@ -375,7 +380,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { ...@@ -375,7 +380,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
size_t readahead_size) size_t readahead_size)
: file_(std::move(file)), : file_(std::move(file)),
readahead_size_(readahead_size), readahead_size_(readahead_size),
forward_calls_(file_->ReaderWriterForward()), forward_calls_(file_->ShouldForwardRawRequest()),
buffer_(new char[readahead_size_]), buffer_(new char[readahead_size_]),
buffer_offset_(0), buffer_offset_(0),
buffer_len_(0) {} buffer_len_(0) {}
...@@ -404,7 +409,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { ...@@ -404,7 +409,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
// if offset between [buffer_offset_, buffer_offset_ + buffer_len> // if offset between [buffer_offset_, buffer_offset_ + buffer_len>
if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) { if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
uint64_t offset_in_buffer = offset - buffer_offset_; uint64_t offset_in_buffer = offset - buffer_offset_;
copied = std::min(static_cast<uint64_t>(buffer_len_)-offset_in_buffer, copied = std::min(static_cast<uint64_t>(buffer_len_) - offset_in_buffer,
static_cast<uint64_t>(n)); static_cast<uint64_t>(n));
memcpy(scratch, buffer_.get() + offset_in_buffer, copied); memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
if (copied == n) { if (copied == n) {
...@@ -458,8 +463,9 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { ...@@ -458,8 +463,9 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile( std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) { std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
return std::make_unique<ReadaheadRandomAccessFile> ( std::unique_ptr<RandomAccessFile> result(
std::move(file), readahead_size); new ReadaheadRandomAccessFile(std::move(file), readahead_size));
return result;
} }
} // namespace rocksdb } // namespace rocksdb
...@@ -122,7 +122,7 @@ class WritableFileWriter { ...@@ -122,7 +122,7 @@ class WritableFileWriter {
bytes_per_sync_(options.bytes_per_sync), bytes_per_sync_(options.bytes_per_sync),
rate_limiter_(options.rate_limiter) { rate_limiter_(options.rate_limiter) {
buf_.SetAlignment(writable_file_->GetRequiredBufferAlignment()); buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
buf_.AllocateNewBuffer(65536); buf_.AllocateNewBuffer(65536);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册