Commit 5f4166c9 authored by Igor Canadi

ReadaheadRandomAccessFile -- userspace readahead

Summary:
ReadaheadRandomAccessFile acts as a transparent layer on top of RandomAccessFile. When a Read() request is issued, it issues a much bigger request to the OS and caches the result. When a new request comes in and we already have the data cached, it doesn't have to issue any requests to the OS.

We add the ReadaheadRandomAccessFile layer only when a file is read during compactions.
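For illustration only (not part of this commit): a minimal sketch of how a user would opt into the new behavior through the options added below. The database path and the 2MB figure are arbitrary choices.

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Read compaction inputs in 2MB chunks. SanitizeOptions() (see the db_impl
  // hunk below) then forces new_table_reader_for_compaction_inputs to true,
  // so each compaction input gets its own ReadaheadRandomAccessFile.
  options.compaction_readahead_size = 2 * 1024 * 1024;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/readahead_demo", &db);
  // ... write enough data for compactions to run ...
  delete db;
  return s.ok() ? 0 : 1;
}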

D45105 was incorrectly closed by Phabricator because I committed it to a separate branch (not master), so I'm resubmitting the diff.

Test Plan: make check

Reviewers: MarkCallaghan, sdong

Reviewed By: sdong

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D45123
Parent 16ebe3a2
......
@@ -357,6 +357,11 @@ DEFINE_int32(open_files, rocksdb::Options().max_open_files,
              "Maximum number of files to keep open at the same time"
              " (use default if == 0)");
+DEFINE_int32(new_table_reader_for_compaction_inputs, true,
+             "If true, uses a separate file handle for compaction inputs");
+DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");
 DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
              " use default settings.");
 DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. "
......
@@ -2191,6 +2196,9 @@ class Benchmark {
     options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits;
     options.bloom_locality = FLAGS_bloom_locality;
     options.max_open_files = FLAGS_open_files;
+    options.new_table_reader_for_compaction_inputs =
+        FLAGS_new_table_reader_for_compaction_inputs;
+    options.compaction_readahead_size = FLAGS_compaction_readahead_size;
     options.statistics = dbstats;
     if (FLAGS_enable_io_prio) {
       FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
......
......
@@ -156,6 +156,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
     result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
   }
+  if (result.compaction_readahead_size > 0) {
+    result.new_table_reader_for_compaction_inputs = true;
+  }
   return result;
 }
......
......
@@ -85,15 +85,19 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) {
 Status TableCache::GetTableReader(
     const EnvOptions& env_options,
     const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
-    bool advise_random_on_open, bool record_read_stats,
-    HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader) {
+    bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
+    unique_ptr<TableReader>* table_reader) {
   std::string fname =
       TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
   unique_ptr<RandomAccessFile> file;
   Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options);
+  if (sequential_mode && ioptions_.compaction_readahead_size > 0) {
+    file = NewReadaheadRandomAccessFile(std::move(file),
+                                        ioptions_.compaction_readahead_size);
+  }
   RecordTick(ioptions_.statistics, NO_FILE_OPENS);
   if (s.ok()) {
-    if (advise_random_on_open) {
+    if (!sequential_mode && ioptions_.advise_random_on_open) {
       file->Hint(RandomAccessFile::RANDOM);
     }
     StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
......
@@ -128,7 +132,7 @@ Status TableCache::FindTable(const EnvOptions& env_options,
   }
   unique_ptr<TableReader> table_reader;
   s = GetTableReader(env_options, internal_comparator, fd,
-                     ioptions_.advise_random_on_open, record_read_stats,
+                     false /* sequential mode */, record_read_stats,
                      file_read_hist, &table_reader);
   if (!s.ok()) {
     assert(table_reader == nullptr);
......
@@ -162,8 +166,9 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
       (for_compaction && ioptions_.new_table_reader_for_compaction_inputs);
   if (create_new_table_reader) {
     unique_ptr<TableReader> table_reader_unique_ptr;
-    Status s = GetTableReader(env_options, icomparator, fd, false, false,
-                              nullptr, &table_reader_unique_ptr);
+    Status s = GetTableReader(
+        env_options, icomparator, fd, /* sequential mode */ true,
+        /* record stats */ false, nullptr, &table_reader_unique_ptr);
     if (!s.ok()) {
       return NewErrorIterator(s, arena);
     }
......
......
@@ -61,13 +61,6 @@ class TableCache {
   // Evict any entry for the specified file number
   static void Evict(Cache* cache, uint64_t file_number);
 
-  // Build a table reader
-  Status GetTableReader(const EnvOptions& env_options,
-                        const InternalKeyComparator& internal_comparator,
-                        const FileDescriptor& fd, bool advise_random_on_open,
-                        bool record_read_stats, HistogramImpl* file_read_hist,
-                        unique_ptr<TableReader>* table_reader);
-
   // Find table reader
   Status FindTable(const EnvOptions& toptions,
                    const InternalKeyComparator& internal_comparator,
......
@@ -101,6 +94,13 @@ class TableCache {
   void ReleaseHandle(Cache::Handle* handle);
 
  private:
+  // Build a table reader
+  Status GetTableReader(const EnvOptions& env_options,
+                        const InternalKeyComparator& internal_comparator,
+                        const FileDescriptor& fd, bool sequential_mode,
+                        bool record_read_stats, HistogramImpl* file_read_hist,
+                        unique_ptr<TableReader>* table_reader);
+
   const ImmutableCFOptions& ioptions_;
   const EnvOptions& env_options_;
   Cache* const cache_;
......
......
@@ -91,6 +91,8 @@ struct ImmutableCFOptions {
   bool new_table_reader_for_compaction_inputs;
 
+  size_t compaction_readahead_size;
+
   int num_levels;
 
   bool optimize_filters_for_hits;
......
......
@@ -1038,6 +1038,16 @@ struct DBOptions {
   // Default: false
   bool new_table_reader_for_compaction_inputs;
 
+  // If non-zero, we perform bigger reads when doing compaction. If you're
+  // running RocksDB on spinning disks, you should set this to at least 2MB.
+  // That way RocksDB's compaction is doing sequential instead of random reads.
+  //
+  // When non-zero, we also force new_table_reader_for_compaction_inputs to
+  // true.
+  //
+  // Default: 0
+  size_t compaction_readahead_size;
+
   // Use adaptive mutex, which spins in the user space before resorting
   // to kernel. This could reduce context switch when the mutex is not
   // heavily contended. However, if the mutex is hot, we could end up
......
......
@@ -231,6 +231,7 @@ Options DBTestBase::CurrentOptions(
     case kFullFilterWithNewTableReaderForCompactions:
       table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
       options.new_table_reader_for_compaction_inputs = true;
+      options.compaction_readahead_size = 10 * 1024 * 1024;
       break;
     case kUncompressed:
       options.compression = kNoCompression;
......
......
@@ -10,6 +10,8 @@
 #include "util/file_reader_writer.h"
+#include <algorithm>
+#include <mutex>
 #include "port/port.h"
 #include "util/histogram.h"
 #include "util/iostats_context_imp.h"
......
@@ -222,4 +224,86 @@ size_t WritableFileWriter::RequestToken(size_t bytes) {
   }
   return bytes;
 }
+
+namespace {
+class ReadaheadRandomAccessFile : public RandomAccessFile {
+ public:
+  ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile> file,
+                            size_t readahead_size)
+      : file_(std::move(file)),
+        readahead_size_(readahead_size),
+        buffer_(new char[readahead_size_]),
+        buffer_offset_(0),
+        buffer_len_(0) {}
+
+  virtual Status Read(uint64_t offset, size_t n, Slice* result,
+                      char* scratch) const override {
+    if (n >= readahead_size_) {
+      // The request is at least as large as the readahead window, so the
+      // cache cannot help; read directly into the caller's scratch buffer.
+      return file_->Read(offset, n, result, scratch);
+    }
+
+    std::unique_lock<std::mutex> lk(lock_);
+
+    size_t copied = 0;
+    // If offset is in [buffer_offset_, buffer_offset_ + buffer_len_), serve
+    // as much of the request as possible from the cached buffer.
+    if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
+      uint64_t offset_in_buffer = offset - buffer_offset_;
+      copied = std::min(static_cast<uint64_t>(buffer_len_) - offset_in_buffer,
+                        static_cast<uint64_t>(n));
+      memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
+      if (copied == n) {
+        // fully cached
+        *result = Slice(scratch, n);
+        return Status::OK();
+      }
+    }
+    // Cache miss (or partial hit): issue one readahead-sized read for the
+    // remainder, refilling the cache as a side effect.
+    Slice readahead_result;
+    Status s = file_->Read(offset + copied, readahead_size_, &readahead_result,
+                           buffer_.get());
+    if (!s.ok()) {
+      return s;
+    }
+    auto left_to_copy = std::min(readahead_result.size(), n - copied);
+    memcpy(scratch + copied, readahead_result.data(), left_to_copy);
+    *result = Slice(scratch, copied + left_to_copy);
+    if (readahead_result.data() == buffer_.get()) {
+      buffer_offset_ = offset + copied;
+      buffer_len_ = readahead_result.size();
+    } else {
+      // The underlying file served the read from its own memory (e.g. an
+      // mmap-based file), so our buffer holds no valid data.
+      buffer_len_ = 0;
+    }
+    return Status::OK();
+  }
+
+  virtual size_t GetUniqueId(char* id, size_t max_size) const override {
+    return file_->GetUniqueId(id, max_size);
+  }
+
+  virtual void Hint(AccessPattern pattern) override { file_->Hint(pattern); }
+
+  virtual Status InvalidateCache(size_t offset, size_t length) override {
+    return file_->InvalidateCache(offset, length);
+  }
+
+ private:
+  std::unique_ptr<RandomAccessFile> file_;
+  size_t readahead_size_;
+
+  mutable std::mutex lock_;
+  mutable std::unique_ptr<char[]> buffer_;
+  mutable uint64_t buffer_offset_;
+  mutable size_t buffer_len_;
+};
+}  // namespace
+
+std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
+    std::unique_ptr<RandomAccessFile> file, size_t readahead_size) {
+  std::unique_ptr<ReadaheadRandomAccessFile> wrapped_file(
+      new ReadaheadRandomAccessFile(std::move(file), readahead_size));
+  return std::move(wrapped_file);
+}
 }  // namespace rocksdb
......
@@ -14,6 +14,9 @@ namespace rocksdb {
 class Statistics;
 class HistogramImpl;
 
+std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
+    std::unique_ptr<RandomAccessFile> file, size_t readahead_size);
+
 class SequentialFileReader {
  private:
   std::unique_ptr<SequentialFile> file_;
......
......
@@ -71,6 +71,7 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
       access_hint_on_compaction_start(options.access_hint_on_compaction_start),
       new_table_reader_for_compaction_inputs(
           options.new_table_reader_for_compaction_inputs),
+      compaction_readahead_size(options.compaction_readahead_size),
       num_levels(options.num_levels),
       optimize_filters_for_hits(options.optimize_filters_for_hits),
       listeners(options.listeners),
......
@@ -241,6 +242,7 @@ DBOptions::DBOptions()
       db_write_buffer_size(0),
       access_hint_on_compaction_start(NORMAL),
       new_table_reader_for_compaction_inputs(false),
+      compaction_readahead_size(0),
       use_adaptive_mutex(false),
       bytes_per_sync(0),
       wal_bytes_per_sync(0),
......
@@ -294,6 +296,7 @@ DBOptions::DBOptions(const Options& options)
       access_hint_on_compaction_start(options.access_hint_on_compaction_start),
       new_table_reader_for_compaction_inputs(
           options.new_table_reader_for_compaction_inputs),
+      compaction_readahead_size(options.compaction_readahead_size),
       use_adaptive_mutex(options.use_adaptive_mutex),
       bytes_per_sync(options.bytes_per_sync),
       wal_bytes_per_sync(options.wal_bytes_per_sync),
......
@@ -371,6 +374,10 @@ void DBOptions::Dump(Logger* log) const {
        access_hints[access_hint_on_compaction_start]);
   Warn(log, " Options.new_table_reader_for_compaction_inputs: %d",
        new_table_reader_for_compaction_inputs);
+  Warn(log,
+       " Options.compaction_readahead_size: %" ROCKSDB_PRIszt,
+       compaction_readahead_size);
   Warn(log, " Options.use_adaptive_mutex: %d",
        use_adaptive_mutex);
   Warn(log, " Options.rate_limiter: %p",
......
......
@@ -85,6 +85,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
     {"new_table_reader_for_compaction_inputs",
      {offsetof(struct DBOptions, new_table_reader_for_compaction_inputs),
       OptionType::kBoolean}},
+    {"compaction_readahead_size",
+     {offsetof(struct DBOptions, compaction_readahead_size),
+      OptionType::kSizeT}},
     {"use_adaptive_mutex",
      {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean}},
     {"use_fsync",
......
......
@@ -172,9 +172,9 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
       {"advise_random_on_open", "true"},
       {"use_adaptive_mutex", "false"},
       {"new_table_reader_for_compaction_inputs", "true"},
+      {"compaction_readahead_size", "100"},
       {"bytes_per_sync", "47"},
-      {"wal_bytes_per_sync", "48"},
-  };
+      {"wal_bytes_per_sync", "48"}, };
 
   ColumnFamilyOptions base_cf_opt;
   ColumnFamilyOptions new_cf_opt;
......
@@ -279,6 +279,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
   ASSERT_EQ(new_db_opt.advise_random_on_open, true);
   ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
   ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
+  ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
   ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast<uint64_t>(47));
   ASSERT_EQ(new_db_opt.wal_bytes_per_sync, static_cast<uint64_t>(48));
 }
......
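As a closing illustration (a sketch, not part of the commit): NewReadaheadRandomAccessFile() is an internal factory declared in util/file_reader_writer.h, so it can be exercised directly against any Env. The file name below is hypothetical; with a 2MB window, the first small read triggers one 2MB pread, and the second is served from the cached buffer.

#include <cassert>
#include <memory>
#include "rocksdb/env.h"
#include "util/file_reader_writer.h"

void ReadaheadDemo() {
  std::unique_ptr<rocksdb::RandomAccessFile> raw;
  rocksdb::EnvOptions env_options;
  rocksdb::Status s = rocksdb::Env::Default()->NewRandomAccessFile(
      "/tmp/000123.sst" /* hypothetical file */, &raw, env_options);
  assert(s.ok());

  std::unique_ptr<rocksdb::RandomAccessFile> file =
      rocksdb::NewReadaheadRandomAccessFile(std::move(raw), 2 * 1024 * 1024);

  char scratch[100];
  rocksdb::Slice result;
  s = file->Read(0, 100, &result, scratch);    // one 2MB pread goes to the OS
  s = file->Read(100, 100, &result, scratch);  // served from the cached buffer
}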