提交 24a24f01 编写于 作者: Y Yi Wu

Enable configurable readahead for iterators

Summary:
Add an option `iterator_readahead_size` to `ReadOptions` to enable
configurable readahead for iterators similar to the corresponding
option for compaction.

Test Plan:
```
make commit_prereq
```

Reviewers: kumar.rangarajan, ott, igor, sdong

Reviewed By: sdong

Subscribers: yiwu, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D55419
上级 ff4b3fb5
......@@ -5,6 +5,8 @@
* Delete deprecated classes for creating backups (BackupableDB) and restoring from backups (RestoreBackupableDB). Now, BackupEngine should be used for creating backups, and BackupEngineReadOnly should be used for restorations. For more details, see https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F
* Expose estimate of per-level compression ratio via DB property: "rocksdb.compression-ratio-at-levelN".
* Added EventListener::OnTableFileCreationStarted. EventListener::OnTableFileCreated will be called on failure case. User can check creation status via TableFileCreationInfo::status.
### New Features
* Add ReadOptions::readahead_size. If non-zero, NewIterator will create a new table reader which performs reads of the given size.
## 4.7.0 (4/8/2016)
### Public API Change
......
......@@ -9,6 +9,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/perf_context.h"
namespace rocksdb {
......@@ -1525,6 +1526,75 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
ASSERT_EQ(TestGetTickerCount(options, ITER_BYTES_READ), total_bytes);
}
TEST_F(DBIteratorTest, ReadAhead) {
Options options;
auto env = new SpecialEnv(Env::Default());
env->count_random_reads_ = true;
options.env = env;
options.disable_auto_compactions = true;
options.write_buffer_size = 4 << 20;
options.statistics = rocksdb::CreateDBStatistics();
BlockBasedTableOptions table_options;
table_options.block_size = 1024;
table_options.no_block_cache = true;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
Reopen(options);
std::string value(1024, 'a');
for (int i = 0; i < 100; i++) {
Put(Key(i), value);
}
ASSERT_OK(Flush());
MoveFilesToLevel(2);
for (int i = 0; i < 100; i++) {
Put(Key(i), value);
}
ASSERT_OK(Flush());
MoveFilesToLevel(1);
for (int i = 0; i < 100; i++) {
Put(Key(i), value);
}
ASSERT_OK(Flush());
ASSERT_EQ("1,1,1", FilesPerLevel());
env->random_read_bytes_counter_ = 0;
options.statistics->setTickerCount(NO_FILE_OPENS, 0);
ReadOptions read_options;
auto* iter = db_->NewIterator(read_options);
iter->SeekToFirst();
int64_t num_file_opens = TestGetTickerCount(options, NO_FILE_OPENS);
int64_t bytes_read = env->random_read_bytes_counter_;
delete iter;
env->random_read_bytes_counter_ = 0;
options.statistics->setTickerCount(NO_FILE_OPENS, 0);
read_options.readahead_size = 1024 * 10;
iter = db_->NewIterator(read_options);
iter->SeekToFirst();
int64_t num_file_opens_readahead = TestGetTickerCount(options, NO_FILE_OPENS);
int64_t bytes_read_readahead = env->random_read_bytes_counter_;
delete iter;
ASSERT_EQ(num_file_opens + 3, num_file_opens_readahead);
ASSERT_GT(bytes_read_readahead, bytes_read);
ASSERT_GT(bytes_read_readahead, read_options.readahead_size * 3);
// Verify correctness.
iter = db_->NewIterator(read_options);
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(value, iter->value());
count++;
}
ASSERT_EQ(100, count);
for (int i = 0; i < 100; i++) {
iter->Seek(Key(i));
ASSERT_EQ(value, iter->value());
}
delete iter;
}
} // namespace rocksdb
int main(int argc, char** argv) {
......
......@@ -358,23 +358,30 @@ class SpecialEnv : public EnvWrapper {
class CountingFile : public RandomAccessFile {
public:
CountingFile(unique_ptr<RandomAccessFile>&& target,
anon::AtomicCounter* counter)
: target_(std::move(target)), counter_(counter) {}
anon::AtomicCounter* counter,
std::atomic<int64_t>* bytes_read)
: target_(std::move(target)),
counter_(counter),
bytes_read_(bytes_read) {}
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
counter_->Increment();
return target_->Read(offset, n, result, scratch);
Status s = target_->Read(offset, n, result, scratch);
*bytes_read_ += result->size();
return s;
}
private:
unique_ptr<RandomAccessFile> target_;
anon::AtomicCounter* counter_;
std::atomic<int64_t>* bytes_read_;
};
Status s = target()->NewRandomAccessFile(f, r, soptions);
random_file_open_counter_++;
if (s.ok() && count_random_reads_) {
r->reset(new CountingFile(std::move(*r), &random_read_counter_));
r->reset(new CountingFile(std::move(*r), &random_read_counter_,
&random_read_bytes_counter_));
}
return s;
}
......@@ -464,6 +471,7 @@ class SpecialEnv : public EnvWrapper {
bool count_random_reads_;
anon::AtomicCounter random_read_counter_;
std::atomic<int64_t> random_read_bytes_counter_;
std::atomic<int> random_file_open_counter_;
bool count_sequential_reads_;
......
......@@ -87,15 +87,16 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) {
Status TableCache::GetTableReader(
const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
unique_ptr<TableReader>* table_reader, bool skip_filters, int level) {
bool sequential_mode, size_t readahead, bool record_read_stats,
HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
bool skip_filters, int level) {
std::string fname =
TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
unique_ptr<RandomAccessFile> file;
Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options);
if (sequential_mode && ioptions_.compaction_readahead_size > 0) {
file = NewReadaheadRandomAccessFile(std::move(file),
ioptions_.compaction_readahead_size);
if (readahead > 0) {
file = NewReadaheadRandomAccessFile(std::move(file), readahead);
}
RecordTick(ioptions_.statistics, NO_FILE_OPENS);
if (s.ok()) {
......@@ -143,8 +144,9 @@ Status TableCache::FindTable(const EnvOptions& env_options,
}
unique_ptr<TableReader> table_reader;
s = GetTableReader(env_options, internal_comparator, fd,
false /* sequential mode */, record_read_stats,
file_read_hist, &table_reader, skip_filters, level);
false /* sequential mode */, 0 /* readahead */,
record_read_stats, file_read_hist, &table_reader,
skip_filters, level);
if (!s.ok()) {
assert(table_reader == nullptr);
RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
......@@ -175,13 +177,24 @@ InternalIterator* TableCache::NewIterator(
TableReader* table_reader = nullptr;
Cache::Handle* handle = nullptr;
bool create_new_table_reader =
(for_compaction && ioptions_.new_table_reader_for_compaction_inputs);
size_t readahead = 0;
bool create_new_table_reader = false;
if (for_compaction) {
if (ioptions_.new_table_reader_for_compaction_inputs) {
readahead = ioptions_.compaction_readahead_size;
create_new_table_reader = true;
}
} else {
readahead = options.readahead_size;
create_new_table_reader = readahead > 0;
}
if (create_new_table_reader) {
unique_ptr<TableReader> table_reader_unique_ptr;
Status s = GetTableReader(
env_options, icomparator, fd, /* sequential mode */ true,
/* record stats */ false, nullptr, &table_reader_unique_ptr,
env_options, icomparator, fd, true /* sequential_mode */, readahead,
!for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
false /* skip_filters */, level);
if (!s.ok()) {
return NewErrorInternalIterator(s, arena);
......
......@@ -111,7 +111,8 @@ class TableCache {
Status GetTableReader(const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, bool sequential_mode,
bool record_read_stats, HistogramImpl* file_read_hist,
size_t readahead, bool record_read_stats,
HistogramImpl* file_read_hist,
unique_ptr<TableReader>* table_reader,
bool skip_filters = false, int level = -1);
......
......@@ -1466,6 +1466,12 @@ struct ReadOptions {
// Default: false
bool pin_data;
// If non-zero, NewIterator will create a new table reader which
// performs reads of the given size. Using a large size (> 2MB) can
// improve the performance of forward iteration on spinning disks.
// Default: 0
size_t readahead_size;
ReadOptions();
ReadOptions(bool cksum, bool cache);
};
......
......@@ -794,7 +794,8 @@ ReadOptions::ReadOptions()
managed(false),
total_order_seek(false),
prefix_same_as_start(false),
pin_data(false) {
pin_data(false),
readahead_size(0) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}
......@@ -809,7 +810,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
managed(false),
total_order_seek(false),
prefix_same_as_start(false),
pin_data(false) {
pin_data(false),
readahead_size(0) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册