提交 57402867 编写于 作者: L Lei Jin

dynamic max_sequential_skip_in_iterations

Summary:
This is not a critical options. Making it dynamic so that we can remove
more reference to cfd->options()

Test Plan: unit test

Reviewers: yhchiang, sdong, igor

Reviewed By: igor

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D24957
上级 4b1786e9
......@@ -166,7 +166,7 @@ class ColumnFamilyData {
bool IsDropped() const { return dropped_; }
// thread-safe
int NumberLevels() const { return options_.num_levels; }
int NumberLevels() const { return ioptions_.num_levels; }
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
uint64_t GetLogNumber() const { return log_number_; }
......
......@@ -3828,16 +3828,16 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
// not supported in lite version
return nullptr;
#else
auto iter = new ForwardIterator(this, read_options, cfd);
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
auto iter = new ForwardIterator(this, read_options, cfd, sv);
return NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
cfd->options()->max_sequential_skip_in_iterations,
read_options.iterate_upper_bound);
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_options.iterate_upper_bound);
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
SuperVersion* sv = nullptr;
sv = cfd->GetReferencedSuperVersion(&mutex_);
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
auto snapshot =
read_options.snapshot != nullptr
......@@ -3889,7 +3889,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->ioptions(), cfd->user_comparator(),
snapshot, cfd->options()->max_sequential_skip_in_iterations,
snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_options.iterate_upper_bound);
Iterator* internal_iter =
......@@ -3908,19 +3908,6 @@ Status DBImpl::NewIterators(
std::vector<Iterator*>* iterators) {
iterators->clear();
iterators->reserve(column_families.size());
SequenceNumber latest_snapshot = 0;
std::vector<SuperVersion*> super_versions;
super_versions.reserve(column_families.size());
if (!read_options.tailing) {
mutex_.Lock();
latest_snapshot = versions_->LastSequence();
for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
super_versions.push_back(cfd->GetSuperVersion()->Ref());
}
mutex_.Unlock();
}
if (read_options.tailing) {
#ifdef ROCKSDB_LITE
......@@ -3929,17 +3916,21 @@ Status DBImpl::NewIterators(
#else
for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
auto iter = new ForwardIterator(this, read_options, cfd);
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
auto iter = new ForwardIterator(this, read_options, cfd, sv);
iterators->push_back(
NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
cfd->options()->max_sequential_skip_in_iterations));
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations));
}
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
for (size_t i = 0; i < column_families.size(); ++i) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_families[i]);
auto cfd = cfh->cfd();
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
column_families[i])->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
auto snapshot =
read_options.snapshot != nullptr
......@@ -3949,9 +3940,9 @@ Status DBImpl::NewIterators(
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
cfd->options()->max_sequential_skip_in_iterations);
sv->mutable_cf_options.max_sequential_skip_in_iterations);
Iterator* internal_iter = NewInternalIterator(
read_options, cfd, super_versions[i], db_iter->GetArena());
read_options, cfd, sv, db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter);
iterators->push_back(db_iter);
}
......
......@@ -53,7 +53,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
? reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_
: latest_snapshot),
cfd->options()->max_sequential_skip_in_iterations);
super_version->mutable_cf_options.max_sequential_skip_in_iterations);
auto internal_iter = NewInternalIterator(
read_options, cfd, super_version, db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter);
......@@ -72,16 +72,17 @@ Status DBImplReadOnly::NewIterators(
SequenceNumber latest_snapshot = versions_->LastSequence();
for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
auto db_iter = NewArenaWrappedDbIterator(
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
auto* sv = cfd->GetSuperVersion()->Ref();
auto* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->ioptions(), cfd->user_comparator(),
(read_options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_
: latest_snapshot),
cfd->options()->max_sequential_skip_in_iterations);
auto internal_iter = NewInternalIterator(
read_options, cfd, cfd->GetSuperVersion()->Ref(), db_iter->GetArena());
sv->mutable_cf_options.max_sequential_skip_in_iterations);
auto* internal_iter = NewInternalIterator(
read_options, cfd, sv, db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter);
iterators->push_back(db_iter);
}
......
......@@ -8889,6 +8889,56 @@ TEST(DBTest, DynamicCompactionOptions) {
ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok());
}
TEST(DBTest, DynamicMiscOptions) {
// Test max_sequential_skip_in_iterations
Options options;
options.env = env_;
options.create_if_missing = true;
options.max_sequential_skip_in_iterations = 16;
options.compression = kNoCompression;
options.statistics = rocksdb::CreateDBStatistics();
DestroyAndReopen(&options);
auto assert_reseek_count = [this, &options](int key_start, int num_reseek) {
int key0 = key_start;
int key1 = key_start + 1;
int key2 = key_start + 2;
Random rnd(301);
ASSERT_OK(Put(Key(key0), RandomString(&rnd, 8)));
for (int i = 0; i < 10; ++i) {
ASSERT_OK(Put(Key(key1), RandomString(&rnd, 8)));
}
ASSERT_OK(Put(Key(key2), RandomString(&rnd, 8)));
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
iter->Seek(Key(key1));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Key(key1)), 0);
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Key(key2)), 0);
ASSERT_EQ(num_reseek,
TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION));
};
// No reseek
assert_reseek_count(100, 0);
ASSERT_TRUE(dbfull()->SetOptions({
{"max_sequential_skip_in_iterations", "4"}
}));
// Clear memtable and make new option effective
dbfull()->TEST_FlushMemTable(true);
// Trigger reseek
assert_reseek_count(200, 1);
ASSERT_TRUE(dbfull()->SetOptions({
{"max_sequential_skip_in_iterations", "16"}
}));
// Clear memtable and make new option effective
dbfull()->TEST_FlushMemTable(true);
// No reseek
assert_reseek_count(300, 1);
}
} // namespace rocksdb
int main(int argc, char** argv) {
......
......@@ -114,25 +114,29 @@ class LevelIterator : public Iterator {
};
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
ColumnFamilyData* cfd)
ColumnFamilyData* cfd, SuperVersion* current_sv)
: db_(db),
read_options_(read_options),
cfd_(cfd),
prefix_extractor_(cfd->options()->prefix_extractor.get()),
user_comparator_(cfd->user_comparator()),
immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
sv_(nullptr),
sv_(current_sv),
mutable_iter_(nullptr),
current_(nullptr),
valid_(false),
is_prev_set_(false),
is_prev_inclusive_(false) {}
is_prev_inclusive_(false) {
if (sv_) {
RebuildIterators(false);
}
}
ForwardIterator::~ForwardIterator() {
Cleanup();
Cleanup(true);
}
void ForwardIterator::Cleanup() {
void ForwardIterator::Cleanup(bool release_sv) {
if (mutable_iter_ != nullptr) {
mutable_iter_->~Iterator();
}
......@@ -149,15 +153,17 @@ void ForwardIterator::Cleanup() {
}
level_iters_.clear();
if (sv_ != nullptr && sv_->Unref()) {
DBImpl::DeletionState deletion_state;
db_->mutex_.Lock();
sv_->Cleanup();
db_->FindObsoleteFiles(deletion_state, false, true);
db_->mutex_.Unlock();
delete sv_;
if (deletion_state.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(deletion_state);
if (release_sv) {
if (sv_ != nullptr && sv_->Unref()) {
DBImpl::DeletionState deletion_state;
db_->mutex_.Lock();
sv_->Cleanup();
db_->FindObsoleteFiles(deletion_state, false, true);
db_->mutex_.Unlock();
delete sv_;
if (deletion_state.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(deletion_state);
}
}
}
}
......@@ -169,7 +175,7 @@ bool ForwardIterator::Valid() const {
void ForwardIterator::SeekToFirst() {
if (sv_ == nullptr ||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
RebuildIterators();
RebuildIterators(true);
} else if (status_.IsIncomplete()) {
ResetIncompleteIterators();
}
......@@ -179,7 +185,7 @@ void ForwardIterator::SeekToFirst() {
void ForwardIterator::Seek(const Slice& internal_key) {
if (sv_ == nullptr ||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
RebuildIterators();
RebuildIterators(true);
} else if (status_.IsIncomplete()) {
ResetIncompleteIterators();
}
......@@ -188,6 +194,7 @@ void ForwardIterator::Seek(const Slice& internal_key) {
void ForwardIterator::SeekInternal(const Slice& internal_key,
bool seek_to_first) {
assert(mutable_iter_);
// mutable
seek_to_first ? mutable_iter_->SeekToFirst() :
mutable_iter_->Seek(internal_key);
......@@ -338,7 +345,7 @@ void ForwardIterator::Next() {
std::string current_key = key().ToString();
Slice old_key(current_key.data(), current_key.size());
RebuildIterators();
RebuildIterators(true);
SeekInternal(old_key, false);
if (!valid_ || key().compare(old_key) != 0) {
return;
......@@ -412,11 +419,13 @@ Status ForwardIterator::status() const {
return Status::OK();
}
void ForwardIterator::RebuildIterators() {
void ForwardIterator::RebuildIterators(bool refresh_sv) {
// Clean up
Cleanup();
// New
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
Cleanup(refresh_sv);
if (refresh_sv) {
// New
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
}
mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
const auto& l0_files = sv_->current->files_[0];
......
......@@ -51,7 +51,7 @@ typedef std::priority_queue<Iterator*,
class ForwardIterator : public Iterator {
public:
ForwardIterator(DBImpl* db, const ReadOptions& read_options,
ColumnFamilyData* cfd);
ColumnFamilyData* cfd, SuperVersion* current_sv = nullptr);
virtual ~ForwardIterator();
void SeekToLast() override {
......@@ -72,8 +72,8 @@ class ForwardIterator : public Iterator {
virtual Status status() const override;
private:
void Cleanup();
void RebuildIterators();
void Cleanup(bool release_sv);
void RebuildIterators(bool refresh_sv);
void ResetIncompleteIterators();
void SeekInternal(const Slice& internal_key, bool seek_to_first);
void UpdateCurrent();
......
......@@ -37,7 +37,9 @@ struct MutableCFOptions {
max_bytes_for_level_base(options.max_bytes_for_level_base),
max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier),
max_bytes_for_level_multiplier_additional(
options.max_bytes_for_level_multiplier_additional)
options.max_bytes_for_level_multiplier_additional),
max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations)
{
RefreshDerivedOptions(ioptions);
}
......@@ -62,7 +64,8 @@ struct MutableCFOptions {
target_file_size_base(0),
target_file_size_multiplier(0),
max_bytes_for_level_base(0),
max_bytes_for_level_multiplier(0)
max_bytes_for_level_multiplier(0),
max_sequential_skip_in_iterations(0)
{}
// Must be called after any change to MutableCFOptions
......@@ -106,6 +109,9 @@ struct MutableCFOptions {
int max_bytes_for_level_multiplier;
std::vector<int> max_bytes_for_level_multiplier_additional;
// Misc options
uint64_t max_sequential_skip_in_iterations;
// Derived options
// Per-level target file size.
std::vector<uint64_t> max_file_size;
......
......@@ -150,6 +150,17 @@ bool ParseCompactionOptions(const std::string& name, const std::string& value,
return true;
}
template<typename OptionsType>
bool ParseMiscOptions(const std::string& name, const std::string& value,
OptionsType* new_options) {
if (name == "max_sequential_skip_in_iterations") {
new_options->max_sequential_skip_in_iterations = ParseUint64(value);
} else {
return false;
}
return true;
}
bool GetMutableOptionsFromStrings(
const MutableCFOptions& base_options,
const std::unordered_map<std::string, std::string>& options_map,
......@@ -160,6 +171,7 @@ bool GetMutableOptionsFromStrings(
for (const auto& o : options_map) {
if (ParseMemtableOptions(o.first, o.second, new_options)) {
} else if (ParseCompactionOptions(o.first, o.second, new_options)) {
} else if (ParseMiscOptions(o.first, o.second, new_options)) {
} else {
return false;
}
......@@ -228,6 +240,7 @@ bool GetColumnFamilyOptionsFromMap(
try {
if (ParseMemtableOptions(o.first, o.second, new_options)) {
} else if (ParseCompactionOptions(o.first, o.second, new_options)) {
} else if (ParseMiscOptions(o.first, o.second, new_options)) {
} else if (o.first == "min_write_buffer_number_to_merge") {
new_options->min_write_buffer_number_to_merge = ParseInt(o.second);
} else if (o.first == "compression") {
......@@ -286,8 +299,6 @@ bool GetColumnFamilyOptionsFromMap(
} else if (o.first == "compaction_options_fifo") {
new_options->compaction_options_fifo.max_table_files_size
= ParseUint64(o.second);
} else if (o.first == "max_sequential_skip_in_iterations") {
new_options->max_sequential_skip_in_iterations = ParseUint64(o.second);
} else if (o.first == "inplace_update_support") {
new_options->inplace_update_support = ParseBoolean(o.first, o.second);
} else if (o.first == "inplace_update_num_locks") {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册