提交 6388e7f4 编写于 作者: S Siying Dong

Merge pull request #798 from yuslepukhin/readahead_buffermanagement

Implement smart buffer management in Windows Env.
......@@ -373,6 +373,9 @@ DEFINE_int32(new_table_reader_for_compaction_inputs, true,
DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
"Maximum windows randomaccess buffer size");
DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
" use default settings.");
DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. "
......@@ -2295,6 +2298,7 @@ class Benchmark {
options.new_table_reader_for_compaction_inputs =
FLAGS_new_table_reader_for_compaction_inputs;
options.compaction_readahead_size = FLAGS_compaction_readahead_size;
options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
options.statistics = dbstats;
if (FLAGS_enable_io_prio) {
FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
......
......@@ -88,6 +88,12 @@ struct EnvOptions {
// WAL writes
bool fallocate_with_keep_size = true;
// See DBOPtions doc
size_t compaction_readahead_size;
// See DBOPtions doc
size_t random_access_max_buffer_size;
// If not nullptr, write rate limiting is enabled for flush and compaction
RateLimiter* rate_limiter = nullptr;
};
......@@ -408,6 +414,11 @@ class RandomAccessFile {
return false;
}
// For cases when read-ahead is implemented in the platform dependent
// layer
virtual void EnableReadAhead() {
}
// Tries to get an unique ID for this file that will be the same each time
// the file is opened (and will stay the same while the file is open).
// Furthermore, it tries to make this ID at most "max_size" bytes. If such an
......
......@@ -1075,6 +1075,20 @@ struct DBOptions {
// Default: 0
size_t compaction_readahead_size;
// This is a maximum buffer size that is used by WinMmapReadableFile in
// unbuffered disk I/O mode. We need to maintain an aligned buffer for
// reads. We allow the buffer to grow until the specified value and then
// for bigger requests allocate one shot buffers. In unbuffered mode we
// always bypass read-ahead buffer at ReadaheadRandomAccessFile
// When read-ahead is required we then make use of compaction_readahead_size
// value and always try to read ahead. With read-ahead we always
// pre-allocate buffer to the size instead of growing it up to a limit.
//
// This option is currently honored only on Windows
//
// Default: 1 Mb
size_t random_access_max_buffer_size;
// Use adaptive mutex, which spins in the user space before resorting
// to kernel. This could reduce context switch when the mutex is not
// heavily contended. However, if the mutex is hot, we could end up
......
......@@ -688,28 +688,98 @@ class WinRandomAccessFile : public RandomAccessFile {
const std::string filename_;
HANDLE hFile_;
const bool use_os_buffer_;
bool read_ahead_;
const size_t compaction_readahead_size_;
const size_t random_access_max_buffer_size_;
mutable std::mutex buffer_mut_;
mutable AlignedBuffer buffer_;
mutable uint64_t
buffered_start_; // file offset set that is currently buffered
/*
* The function reads a requested amount of bytes into the specified aligned buffer
* Upon success the function sets the length of the buffer to the amount of bytes actually
* read even though it might be less than actually requested.
* It then copies the amount of bytes requested by the user (left) to the user supplied
* buffer (dest) and reduces left by the amount of bytes copied to the user buffer
*
* @user_offset [in] - offset on disk where the read was requested by the user
* @first_page_start [in] - actual page aligned disk offset that we want to read from
* @bytes_to_read [in] - total amount of bytes that will be read from disk which is generally
* greater or equal to the amount that the user has requested due to the
* either alignment requirements or read_ahead in effect.
* @left [in/out] total amount of bytes that needs to be copied to the user buffer. It is reduced
* by the amount of bytes that actually copied
* @buffer - buffer to use
* @dest - user supplied buffer
*/
SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start,
size_t bytes_to_read, size_t& left, AlignedBuffer& buffer, char* dest) const {
assert(buffer.CurrentSize() == 0);
assert(buffer.Capacity() >= bytes_to_read);
SSIZE_T read = pread(hFile_, buffer.Destination(), bytes_to_read,
first_page_start);
if (read > 0) {
buffer.Size(read);
// Let's figure out how much we read from the users standpoint
if ((first_page_start + buffer.CurrentSize()) > user_offset) {
assert(first_page_start <= user_offset);
size_t buffer_offset = user_offset - first_page_start;
read = buffer.Read(dest, buffer_offset, left);
} else {
read = 0;
}
left -= read;
}
return read;
}
SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start,
size_t bytes_to_read, size_t& left, char* dest) const {
AlignedBuffer bigBuffer;
bigBuffer.Alignment(buffer_.Alignment());
bigBuffer.AllocateNewBuffer(bytes_to_read);
return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left,
bigBuffer, dest);
}
SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset, uint64_t first_page_start,
size_t bytes_to_read, size_t& left, char* dest) const {
SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read,
left, buffer_, dest);
if (read > 0) {
buffered_start_ = first_page_start;
}
return read;
}
public:
WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
const EnvOptions& options)
: filename_(fname),
hFile_(hFile),
use_os_buffer_(options.use_os_buffer),
read_ahead_(false),
compaction_readahead_size_(options.compaction_readahead_size),
random_access_max_buffer_size_(options.random_access_max_buffer_size),
buffer_(),
buffered_start_(0) {
assert(!options.use_mmap_reads);
// Unbuffered access, use internal buffer for reads
if (!use_os_buffer_) {
// Do not allocate the buffer either until the first request or
// until there is a call to allocate a read-ahead buffer
buffer_.Alignment(alignment);
// Random read, no need in a big buffer
// We read things in database blocks which are likely to be similar to
// the alignment we use.
buffer_.AllocateNewBuffer(alignment * 2);
}
}
......@@ -719,6 +789,10 @@ class WinRandomAccessFile : public RandomAccessFile {
}
}
virtual void EnableReadAhead() override {
this->Hint(SEQUENTIAL);
}
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
Status s;
......@@ -730,7 +804,7 @@ class WinRandomAccessFile : public RandomAccessFile {
// - use our own aligned buffer
// - always read at the offset of that is a multiple of alignment
if (!use_os_buffer_) {
std::lock_guard<std::mutex> lg(buffer_mut_);
std::unique_lock<std::mutex> lock(buffer_mut_);
// Let's see if at least some of the requested data is already
// in the buffer
......@@ -749,40 +823,40 @@ class WinRandomAccessFile : public RandomAccessFile {
if (left > 0) {
// Figure out the start/end offset for reading and amount to read
const size_t alignment = buffer_.Alignment();
const size_t start_page_start =
TruncateToPageBoundary(alignment, offset);
const size_t end_page_start =
TruncateToPageBoundary(alignment, offset + left - 1);
const size_t actual_bytes_toread =
(end_page_start - start_page_start) + alignment;
const size_t first_page_start =
TruncateToPageBoundary(alignment, offset);
if (buffer_.Capacity() < actual_bytes_toread) {
buffer_.AllocateNewBuffer(actual_bytes_toread);
} else {
buffer_.Clear();
size_t bytes_requested = left;
if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
bytes_requested = compaction_readahead_size_;
}
SSIZE_T read = 0;
read = pread(hFile_, buffer_.Destination(), actual_bytes_toread,
start_page_start);
if (read > 0) {
buffer_.Size(read);
buffered_start_ = start_page_start;
const size_t last_page_start =
TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
const size_t actual_bytes_toread =
(last_page_start - first_page_start) + alignment;
// Let's figure out how much we read from the users standpoint
if ((buffered_start_ + uint64_t(read)) > offset) {
size_t buffer_offset = offset - buffered_start_;
r = buffer_.Read(dest, buffer_offset, left);
if (buffer_.Capacity() < actual_bytes_toread) {
// If we are in read-ahead mode or the requested size
// exceeds max buffer size then use one-shot
// big buffer otherwise reallocate main buffer
if (read_ahead_ ||
(actual_bytes_toread > random_access_max_buffer_size_)) {
// Unlock the mutex since we are not using instance buffer
lock.unlock();
r = ReadIntoOneShotBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
} else {
r = 0;
buffer_.AllocateNewBuffer(actual_bytes_toread);
r = ReadIntoInstanceBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
}
left -= r;
} else {
r = read;
buffer_.Clear();
r = ReadIntoInstanceBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
}
}
} else {
r = pread(hFile_, scratch, left, offset);
if (r > 0) {
......@@ -802,7 +876,23 @@ class WinRandomAccessFile : public RandomAccessFile {
return true;
}
virtual void Hint(AccessPattern pattern) override {}
virtual void Hint(AccessPattern pattern) override {
if (pattern == SEQUENTIAL &&
!use_os_buffer_ &&
compaction_readahead_size_ > 0) {
std::lock_guard<std::mutex> lg(buffer_mut_);
if (!read_ahead_) {
read_ahead_ = true;
// This would allocate read-ahead size + 2 alignments
// - one for memory alignment which added implicitly by AlignedBuffer
// - We add one more alignment because we will read one alignment more
// from disk
buffer_.AllocateNewBuffer(compaction_readahead_size_ + buffer_.Alignment());
}
}
}
virtual Status InvalidateCache(size_t offset, size_t length) override {
return Status::OK();
......
......@@ -292,6 +292,8 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
env_options->use_mmap_writes = options.allow_mmap_writes;
env_options->set_fd_cloexec = options.is_fd_close_on_exec;
env_options->bytes_per_sync = options.bytes_per_sync;
env_options->compaction_readahead_size = options.compaction_readahead_size;
env_options->random_access_max_buffer_size = options.random_access_max_buffer_size;
env_options->rate_limiter = options.rate_limiter.get();
env_options->allow_fallocate = options.allow_fallocate;
}
......
......@@ -386,9 +386,15 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
: file_(std::move(file)),
readahead_size_(readahead_size),
forward_calls_(file_->ShouldForwardRawRequest()),
buffer_(new char[readahead_size_]),
buffer_(),
buffer_offset_(0),
buffer_len_(0) {}
buffer_len_(0) {
if (!forward_calls_) {
buffer_.reset(new char[readahead_size_]);
} else if (readahead_size_ > 0) {
file_->EnableReadAhead();
}
}
ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
......
......@@ -250,6 +250,7 @@ DBOptions::DBOptions()
access_hint_on_compaction_start(NORMAL),
new_table_reader_for_compaction_inputs(false),
compaction_readahead_size(0),
random_access_max_buffer_size(1024 * 1024),
use_adaptive_mutex(false),
bytes_per_sync(0),
wal_bytes_per_sync(0),
......@@ -310,6 +311,7 @@ DBOptions::DBOptions(const Options& options)
new_table_reader_for_compaction_inputs(
options.new_table_reader_for_compaction_inputs),
compaction_readahead_size(options.compaction_readahead_size),
random_access_max_buffer_size(options.random_access_max_buffer_size),
use_adaptive_mutex(options.use_adaptive_mutex),
bytes_per_sync(options.bytes_per_sync),
wal_bytes_per_sync(options.wal_bytes_per_sync),
......@@ -403,6 +405,10 @@ void DBOptions::Dump(Logger* log) const {
" Options.compaction_readahead_size: %" ROCKSDB_PRIszt
"d",
compaction_readahead_size);
Header(log,
" Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt
"d",
random_access_max_buffer_size);
Header(log, " Options.use_adaptive_mutex: %d",
use_adaptive_mutex);
Header(log, " Options.rate_limiter: %p",
......
......@@ -180,6 +180,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"compaction_readahead_size",
{offsetof(struct DBOptions, compaction_readahead_size), OptionType::kSizeT,
OptionVerificationType::kNormal}},
{"random_access_max_buffer_size",
{ offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT,
OptionVerificationType::kNormal}},
{"use_adaptive_mutex",
{offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean,
OptionVerificationType::kNormal}},
......
......@@ -339,6 +339,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"use_adaptive_mutex", "false"},
{"new_table_reader_for_compaction_inputs", "true"},
{"compaction_readahead_size", "100"},
{"random_access_max_buffer_size", "3145728" },
{"bytes_per_sync", "47"},
{"wal_bytes_per_sync", "48"},
};
......@@ -449,6 +450,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728);
ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast<uint64_t>(47));
ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast<uint64_t>(48));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册