diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index 73ef91b6a925c1c30addfb96ef6fc8e52d0772e3..90ec90831e407ff42930645d873a180dd6d5b297 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -1884,6 +1884,8 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIterator2) { ASSERT_EQ(db_iter_->value().ToString(), "4"); } +#if !(defined NDEBUG) || !defined(OS_WIN) + TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) { // Test Prev() when one child iterator is at its end but more rows // are added. @@ -2234,6 +2236,7 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace8) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +#endif // #if !(defined NDEBUG) || !defined(OS_WIN) } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index b346effbad960b4cdfbc81996232ecf5d8d7252a..6300a47034c8638735a16f5290bcced29e2dd4ed 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -395,7 +395,7 @@ class RandomAccessFile { // Used by the file_reader_writer to decide if the ReadAhead wrapper // should simply forward the call and do not enact buffering or locking. - virtual bool ReaderWriterForward() const { + virtual bool ShouldForwardRawRequest() const { return false; } @@ -448,18 +448,20 @@ class WritableFile { return true; } + const size_t c_DefaultPageSize = 4 * 1024; + // This is needed when you want to allocate // AlignedBuffer for use with file I/O classes // Used for unbuffered file I/O when UseOSBuffer() returns false virtual size_t GetRequiredBufferAlignment() const { - return 4 * 1024; + return c_DefaultPageSize; } virtual Status Append(const Slice& data) = 0; - // Positional write for unbuffered access default forward + // Positioned write for unbuffered access default forward // to simple append as most of the tests are buffered by default - virtual Status Append(const Slice& /* data */, uint64_t /* offset */) { + virtual Status PositionedAppend(const Slice& /* data */, uint64_t /* offset */) { return Status::NotSupported(); } @@ -467,7 +469,9 @@ class WritableFile { // before closing. It is not always possible to keep track of the file // size due to whole pages writes. The behavior is undefined if called // with other writes to follow. - virtual Status Truncate(uint64_t size) = 0; + virtual Status Truncate(uint64_t size) { + return Status::OK(); + } virtual Status Close() = 0; virtual Status Flush() = 0; virtual Status Sync() = 0; // sync data @@ -868,6 +872,9 @@ class WritableFileWrapper : public WritableFile { explicit WritableFileWrapper(WritableFile* t) : target_(t) { } Status Append(const Slice& data) override { return target_->Append(data); } + Status PositionedAppend(const Slice& data, uint64_t offset) override { + return target_->PositionedAppend(data, offset); + } Status Truncate(uint64_t size) override { return target_->Truncate(size); } Status Close() override { return target_->Close(); } Status Flush() override { return target_->Flush(); } diff --git a/port/win/env_win.cc b/port/win/env_win.cc index c6d0cb3caf766345ccc337356f437164d59e2b86..2668ad1d28f04a3650e858d0f4f45f107fe197d1 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -704,7 +704,7 @@ class WinRandomAccessFile : public RandomAccessFile { // Unbuffered access, use internal buffer for reads if (!use_os_buffer_) { - buffer_.SetAlignment(alignment); + buffer_.Alignment(alignment); // Random read, no need in a big buffer // We read things in database blocks which are likely to be similar to // the alignment we use. @@ -734,7 +734,7 @@ class WinRandomAccessFile : public RandomAccessFile { // Let's see if at least some of the requested data is already // in the buffer if (offset >= buffered_start_ && - offset < (buffered_start_ + buffer_.GetCurrentSize())) { + offset < (buffered_start_ + buffer_.CurrentSize())) { size_t buffer_offset = offset - buffered_start_; r = buffer_.Read(dest, buffer_offset, left); assert(r >= 0); @@ -747,7 +747,7 @@ class WinRandomAccessFile : public RandomAccessFile { // Still some left or none was buffered if (left > 0) { // Figure out the start/end offset for reading and amount to read - const size_t alignment = buffer_.GetAlignment(); + const size_t alignment = buffer_.Alignment(); const size_t start_page_start = TruncateToPageBoundary(alignment, offset); const size_t end_page_start = @@ -755,18 +755,18 @@ class WinRandomAccessFile : public RandomAccessFile { const size_t actual_bytes_toread = (end_page_start - start_page_start) + alignment; - if (buffer_.GetCapacity() < actual_bytes_toread) { + if (buffer_.Capacity() < actual_bytes_toread) { buffer_.AllocateNewBuffer(actual_bytes_toread); } else { buffer_.Clear(); } SSIZE_T read = 0; - read = pread(hFile_, buffer_.GetDestination(), actual_bytes_toread, + read = pread(hFile_, buffer_.Destination(), actual_bytes_toread, start_page_start); if (read > 0) { - buffer_.SetSize(read); + buffer_.Size(read); buffered_start_ = start_page_start; // Let's figure out how much we read from the users standpoint @@ -797,7 +797,7 @@ class WinRandomAccessFile : public RandomAccessFile { return s; } - virtual bool ReaderWriterForward() const override { + virtual bool ShouldForwardRawRequest() const override { return true; } @@ -880,7 +880,7 @@ class WinWritableFile : public WritableFile { return s; } - virtual Status Append(const Slice& data, uint64_t offset) override { + virtual Status PositionedAppend(const Slice& data, uint64_t offset) override { Status s; SSIZE_T ret = pwrite(hFile_, data.data(), diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h index 7fa80926cc8b1a85d4f94657dbbc0d7e87087d80..2244316feed2b44ba0b6be50fce61601f005160f 100644 --- a/util/aligned_buffer.h +++ b/util/aligned_buffer.h @@ -25,7 +25,7 @@ inline size_t Roundup(size_t x, size_t y) { // This class is to manage an aligned user // allocated buffer for unbuffered I/O purposes -// though it does not make a difference if you need a buffer. +// though can be used for any purpose. class AlignedBuffer { size_t alignment_; std::unique_ptr buf_; @@ -58,19 +58,19 @@ public: AlignedBuffer& operator=(const AlignedBuffer&) = delete; - size_t GetAlignment() const { + size_t Alignment() const { return alignment_; } - size_t GetCapacity() const { + size_t Capacity() const { return capacity_; } - size_t GetCurrentSize() const { + size_t CurrentSize() const { return cursize_; } - const char* GetBufferStart() const { + const char* BufferStart() const { return bufstart_; } @@ -78,7 +78,7 @@ public: cursize_ = 0; } - void SetAlignment(size_t alignment) { + void Alignment(size_t alignment) { assert(alignment > 0); assert((alignment & (alignment - 1)) == 0); alignment_ = alignment; @@ -143,11 +143,11 @@ public: } // Returns place to start writing - char* GetDestination() { + char* Destination() { return bufstart_ + cursize_; } - void SetSize(size_t cursize) { + void Size(size_t cursize) { cursize_ = cursize; } }; diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 453bb7461f20626ee15196299f7a54aaa5a5cc4a..86d70b62d608a0cd381b4fc1e06c8fa2e4c4e4d7 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -20,6 +20,11 @@ #include "util/sync_point.h" namespace rocksdb { + +namespace { + const size_t c_OneMb = (1 << 20); +} + Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status s = file_->Read(n, result, scratch); IOSTATS_ADD(bytes_read, result->size()); @@ -62,26 +67,26 @@ Status WritableFileWriter::Append(const Slice& data) { // Flush only when I/O is buffered if (use_os_buffer_ && - (buf_.GetCapacity() - buf_.GetCurrentSize()) < left) { - if (buf_.GetCurrentSize() > 0) { + (buf_.Capacity() - buf_.CurrentSize()) < left) { + if (buf_.CurrentSize() > 0) { s = Flush(); if (!s.ok()) { return s; } } - if (buf_.GetCapacity() < (1 << 20)) { - size_t desiredCapacity = buf_.GetCapacity() * 2; - desiredCapacity = std::min(desiredCapacity, size_t(1 << 20)); + if (buf_.Capacity() < c_OneMb) { + size_t desiredCapacity = buf_.Capacity() * 2; + desiredCapacity = std::min(desiredCapacity, c_OneMb); buf_.AllocateNewBuffer(desiredCapacity); } - assert(buf_.GetCurrentSize() == 0); + assert(buf_.CurrentSize() == 0); } // We never write directly to disk with unbuffered I/O on. // or we simply use it for its original purpose to accumulate many small // chunks - if (!use_os_buffer_ || (buf_.GetCapacity() >= left)) { + if (!use_os_buffer_ || (buf_.Capacity() >= left)) { while (left > 0) { size_t appended = buf_.Append(src, left); left -= appended; @@ -96,16 +101,16 @@ Status WritableFileWriter::Append(const Slice& data) { // We double the buffer here because // Flush calls do not keep up with the incoming bytes // This is the only place when buffer is changed with unbuffered I/O - if (buf_.GetCapacity() < (1 << 20)) { - size_t desiredCapacity = buf_.GetCapacity() * 2; - desiredCapacity = std::min(desiredCapacity, size_t(1 << 20)); + if (buf_.Capacity() < (1 << 20)) { + size_t desiredCapacity = buf_.Capacity() * 2; + desiredCapacity = std::min(desiredCapacity, c_OneMb); buf_.AllocateNewBuffer(desiredCapacity); } } } } else { // Writing directly to file bypassing the buffer - assert(buf_.GetCurrentSize() == 0); + assert(buf_.CurrentSize() == 0); s = WriteBuffered(src, left); } @@ -153,9 +158,9 @@ Status WritableFileWriter::Flush() { Status s; TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2); - if (buf_.GetCurrentSize() > 0) { + if (buf_.CurrentSize() > 0) { if (use_os_buffer_) { - s = WriteBuffered(buf_.GetBufferStart(), buf_.GetCurrentSize()); + s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); } else { s = WriteUnbuffered(); } @@ -260,7 +265,7 @@ size_t WritableFileWriter::RequestToken(size_t bytes, bool align) { // Here we may actually require more than burst and block // but we can not write less than one page at a time on unbuffered // thus we may want not to use ratelimiter s - size_t alignment = buf_.GetAlignment(); + size_t alignment = buf_.Alignment(); bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); } rate_limiter_->Request(bytes, io_priority); @@ -294,7 +299,7 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { left -= allowed; src += allowed; } - buf_.SetSize(0); + buf_.Size(0); return s; } @@ -311,25 +316,25 @@ Status WritableFileWriter::WriteUnbuffered() { Status s; assert(!use_os_buffer_); - const size_t alignment = buf_.GetAlignment(); + const size_t alignment = buf_.Alignment(); assert((next_write_offset_ % alignment) == 0); // Calculate whole page final file advance if all writes succeed size_t file_advance = - TruncateToPageBoundary(alignment, buf_.GetCurrentSize()); + TruncateToPageBoundary(alignment, buf_.CurrentSize()); // Calculate the leftover tail, we write it here padded with zeros BUT we // will write // it again in the future either on Close() OR when the current whole page // fills out - size_t leftover_tail = buf_.GetCurrentSize() - file_advance; + size_t leftover_tail = buf_.CurrentSize() - file_advance; // Round up and pad buf_.PadToAlignmentWith(0); - const char* src = buf_.GetBufferStart(); + const char* src = buf_.BufferStart(); uint64_t write_offset = next_write_offset_; - size_t left = buf_.GetCurrentSize(); + size_t left = buf_.CurrentSize(); while (left > 0) { // Check how much is allowed @@ -339,9 +344,9 @@ Status WritableFileWriter::WriteUnbuffered() { IOSTATS_TIMER_GUARD(write_nanos); TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); // Unbuffered writes must be positional - s = writable_file_->Append(Slice(src, size), write_offset); + s = writable_file_->PositionedAppend(Slice(src, size), write_offset); if (!s.ok()) { - buf_.SetSize(file_advance + leftover_tail); + buf_.Size(file_advance + leftover_tail); return s; } } @@ -375,7 +380,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { size_t readahead_size) : file_(std::move(file)), readahead_size_(readahead_size), - forward_calls_(file_->ReaderWriterForward()), + forward_calls_(file_->ShouldForwardRawRequest()), buffer_(new char[readahead_size_]), buffer_offset_(0), buffer_len_(0) {} @@ -404,7 +409,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { // if offset between [buffer_offset_, buffer_offset_ + buffer_len> if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) { uint64_t offset_in_buffer = offset - buffer_offset_; - copied = std::min(static_cast(buffer_len_)-offset_in_buffer, + copied = std::min(static_cast(buffer_len_) - offset_in_buffer, static_cast(n)); memcpy(scratch, buffer_.get() + offset_in_buffer, copied); if (copied == n) { @@ -458,8 +463,9 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { std::unique_ptr NewReadaheadRandomAccessFile( std::unique_ptr&& file, size_t readahead_size) { - return std::make_unique ( - std::move(file), readahead_size); + std::unique_ptr result( + new ReadaheadRandomAccessFile(std::move(file), readahead_size)); + return result; } } // namespace rocksdb diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index a6d52c88148b6d2ab8048ff9b03d8ba995779ea4..f33965dc6d6b7b0916991d4d7c7632979c6aef59 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -122,7 +122,7 @@ class WritableFileWriter { bytes_per_sync_(options.bytes_per_sync), rate_limiter_(options.rate_limiter) { - buf_.SetAlignment(writable_file_->GetRequiredBufferAlignment()); + buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); buf_.AllocateNewBuffer(65536); }