提交 5b3b6549 编写于 作者: L Lei Jin

use super_version in NewIterator() and MultiGet() function

Summary:
Use super_version insider NewIterator to avoid Ref() each component
separately under mutex
The new added bench shows NewIterator QPS increases from 515K to 719K
No meaningful improvement for multiget I guess due to its relatively small
cost comparing to 90 keys fetch in the test.

Test Plan: unit test and db_bench

Reviewers: igor, sdong

Reviewed By: igor

CC: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D15609
上级 5c6ef561
......@@ -99,6 +99,7 @@ DEFINE_string(benchmarks,
"Must be used with merge_operator\n"
"\treadrandommergerandom -- perform N random read-or-merge "
"operations. Must be used with merge_operator\n"
"\tnewiterator -- repeated iterator creation\n"
"\tseekrandom -- N random seeks\n"
"\tcrc32c -- repeated crc32c of 4K of data\n"
"\tacquireload -- load N*1000 times\n"
......@@ -1089,6 +1090,8 @@ class Benchmark {
method = &Benchmark::ReadRandom;
} else if (name == Slice("readmissing")) {
method = &Benchmark::ReadMissing;
} else if (name == Slice("newiterator")) {
method = &Benchmark::IteratorCreation;
} else if (name == Slice("seekrandom")) {
method = &Benchmark::SeekRandom;
} else if (name == Slice("readhot")) {
......@@ -1877,6 +1880,16 @@ class Benchmark {
thread->stats.AddMessage(msg);
}
void IteratorCreation(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
while (!duration.Done(1)) {
Iterator* iter = db_->NewIterator(options);
delete iter;
thread->stats.FinishedSingleOp(db_);
}
}
void SeekRandom(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
......
......@@ -2668,34 +2668,29 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
namespace {
struct IterState {
IterState(DBImpl* db, port::Mutex* mu, DBImpl::SuperVersion* super_version)
: db(db), mu(mu), super_version(super_version) {}
DBImpl* db;
port::Mutex* mu;
Version* version = nullptr;
MemTable* mem = nullptr;
MemTableListVersion* imm = nullptr;
DBImpl *db;
DBImpl::SuperVersion* super_version;
};
static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
DBImpl::DeletionState deletion_state(state->db->GetOptions().
max_write_buffer_number);
state->mu->Lock();
if (state->mem) { // not set for immutable iterator
MemTable* m = state->mem->Unref();
if (m != nullptr) {
deletion_state.memtables_to_free.push_back(m);
}
}
if (state->version) { // not set for memtable-only iterator
state->version->Unref();
}
if (state->imm) { // not set for memtable-only iterator
state->imm->Unref(&deletion_state.memtables_to_free);
bool need_cleanup = state->super_version->Unref();
if (need_cleanup) {
state->mu->Lock();
state->super_version->Cleanup();
state->db->FindObsoleteFiles(deletion_state, false, true);
state->mu->Unlock();
delete state->super_version;
state->db->PurgeObsoleteFiles(deletion_state);
}
// fast path FindObsoleteFiles
state->db->FindObsoleteFiles(deletion_state, false, true);
state->mu->Unlock();
state->db->PurgeObsoleteFiles(deletion_state);
delete state;
}
......@@ -2703,36 +2698,23 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot) {
IterState* cleanup = new IterState;
MemTable* mutable_mem;
MemTableListVersion* immutable_mems;
Version* version;
// Collect together all needed child iterators for mem
mutex_.Lock();
*latest_snapshot = versions_->LastSequence();
mem_->Ref();
mutable_mem = mem_;
// Collect together all needed child iterators for imm_
immutable_mems = imm_.current();
immutable_mems->Ref();
versions_->current()->Ref();
version = versions_->current();
SuperVersion* super_version = super_version_->Ref();
mutex_.Unlock();
std::vector<Iterator*> iterator_list;
iterator_list.push_back(mutable_mem->NewIterator(options));
cleanup->mem = mutable_mem;
cleanup->imm = immutable_mems;
// Collect iterator for mutable mem
iterator_list.push_back(super_version->mem->NewIterator(options));
// Collect all needed child iterators for immutable memtables
immutable_mems->AddIterators(options, &iterator_list);
super_version->imm->AddIterators(options, &iterator_list);
// Collect iterators for files in L0 - Ln
version->AddIterators(options, storage_options_, &iterator_list);
super_version->current->AddIterators(options, storage_options_,
&iterator_list);
Iterator* internal_iter = NewMergingIterator(
&internal_comparator_, &iterator_list[0], iterator_list.size());
cleanup->version = version;
cleanup->mu = &mutex_;
cleanup->db = this;
IterState* cleanup = new IterState(this, &mutex_, super_version);
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
return internal_iter;
......@@ -2747,53 +2729,36 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
const ReadOptions& options,
uint64_t* superversion_number) {
MemTable* mutable_mem;
MemTableListVersion* immutable_mems;
Version* version;
// get all child iterators and bump their refcounts under lock
mutex_.Lock();
mutable_mem = mem_;
mutable_mem->Ref();
immutable_mems = imm_.current();
immutable_mems->Ref();
version = versions_->current();
version->Ref();
SuperVersion* super_version = super_version_->Ref();
if (superversion_number != nullptr) {
*superversion_number = CurrentVersionNumber();
}
mutex_.Unlock();
Iterator* mutable_iter = mutable_mem->NewIterator(options);
IterState* mutable_cleanup = new IterState();
mutable_cleanup->mem = mutable_mem;
mutable_cleanup->db = this;
mutable_cleanup->mu = &mutex_;
mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr);
Iterator* mutable_iter = super_version->mem->NewIterator(options);
// create a DBIter that only uses memtable content; see NewIterator()
mutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(),
mutable_iter, kMaxSequenceNumber);
Iterator* immutable_iter;
IterState* immutable_cleanup = new IterState();
std::vector<Iterator*> list;
immutable_mems->AddIterators(options, &list);
immutable_cleanup->imm = immutable_mems;
version->AddIterators(options, storage_options_, &list);
immutable_cleanup->version = version;
immutable_cleanup->db = this;
immutable_cleanup->mu = &mutex_;
immutable_iter =
super_version->imm->AddIterators(options, &list);
super_version->current->AddIterators(options, storage_options_, &list);
Iterator* immutable_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup,
nullptr);
// create a DBIter that only uses memtable content; see NewIterator()
immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(),
immutable_iter, kMaxSequenceNumber);
// register cleanups
mutable_iter->RegisterCleanup(CleanupIteratorState,
new IterState(this, &mutex_, super_version), nullptr);
// bump the ref one more time since it will be Unref'ed twice
immutable_iter->RegisterCleanup(CleanupIteratorState,
new IterState(this, &mutex_, super_version->Ref()), nullptr);
return std::make_pair(mutable_iter, immutable_iter);
}
......@@ -2924,7 +2889,6 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false);
SequenceNumber snapshot;
std::vector<MemTable*> to_delete;
mutex_.Lock();
if (options.snapshot != nullptr) {
......@@ -2933,16 +2897,9 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
snapshot = versions_->LastSequence();
}
MemTable* mem = mem_;
MemTableListVersion* imm = imm_.current();
Version* current = versions_->current();
mem->Ref();
imm->Ref();
current->Ref();
// Unlock while reading from files and memtables
SuperVersion* get_version = super_version_->Ref();
mutex_.Unlock();
bool have_stat_update = false;
Version::GetStats stats;
......@@ -2967,12 +2924,14 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
std::string* value = &(*values)[i];
LookupKey lkey(keys[i], snapshot);
if (mem->Get(lkey, value, &s, merge_context, options_)) {
if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) {
// Done
} else if (imm->Get(lkey, value, &s, merge_context, options_)) {
} else if (get_version->imm->Get(lkey, value, &s, merge_context,
options_)) {
// Done
} else {
current->Get(options, lkey, value, &s, &merge_context, &stats, options_);
get_version->current->Get(options, lkey, value, &s, &merge_context,
&stats, options_);
have_stat_update = true;
}
......@@ -2981,20 +2940,28 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
}
}
// Post processing (decrement reference counts and record statistics)
mutex_.Lock();
if (!options_.disable_seek_compaction &&
have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleFlushOrCompaction();
bool delete_get_version = false;
if (!options_.disable_seek_compaction && have_stat_update) {
mutex_.Lock();
if (get_version->current->UpdateStats(stats)) {
MaybeScheduleFlushOrCompaction();
}
if (get_version->Unref()) {
get_version->Cleanup();
delete_get_version = true;
}
mutex_.Unlock();
} else {
if (get_version->Unref()) {
mutex_.Lock();
get_version->Cleanup();
mutex_.Unlock();
delete_get_version = true;
}
}
if (delete_get_version) {
delete get_version;
}
MemTable* m = mem->Unref();
imm->Unref(&to_delete);
current->Unref();
mutex_.Unlock();
// free up all obsolete memtables outside the mutex
delete m;
for (MemTable* v: to_delete) delete v;
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS);
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys);
......
......@@ -249,8 +249,8 @@ class DBImpl : public DB {
return internal_comparator_.user_comparator();
}
MemTable* GetMemTable() {
return mem_;
SuperVersion* GetSuperVersion() {
return super_version_;
}
Iterator* NewInternalIterator(const ReadOptions&,
......
......@@ -56,15 +56,15 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
Status s;
MemTable* mem = GetMemTable();
Version* current = versions_->current();
SequenceNumber snapshot = versions_->LastSequence();
SuperVersion* super_version = GetSuperVersion();
MergeContext merge_context;
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s, merge_context, options_)) {
if (super_version->mem->Get(lkey, value, &s, merge_context, options_)) {
} else {
Version::GetStats stats;
current->Get(options, lkey, value, &s, &merge_context, &stats, options_);
super_version->current->Get(options, lkey, value, &s, &merge_context,
&stats, options_);
}
return s;
}
......@@ -87,6 +87,9 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
DBImplReadOnly* impl = new DBImplReadOnly(options, dbname);
impl->mutex_.Lock();
Status s = impl->Recover(true /* read only */, error_if_log_file_exist);
if (s.ok()) {
delete impl->InstallSuperVersion(new DBImpl::SuperVersion());
}
impl->mutex_.Unlock();
if (s.ok()) {
*dbptr = impl;
......
......@@ -421,6 +421,10 @@ class DBTest {
return DB::Open(*options, dbname_, db);
}
Status ReadOnlyReopen(Options* options) {
return DB::OpenForReadOnly(*options, dbname_, &db_);
}
Status TryReopen(Options* options = nullptr) {
delete db_;
db_ = nullptr;
......@@ -727,6 +731,26 @@ TEST(DBTest, ReadWrite) {
} while (ChangeOptions());
}
TEST(DBTest, ReadOnlyDB) {
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Put("bar", "v2"));
ASSERT_OK(Put("foo", "v3"));
Close();
Options options;
ASSERT_OK(ReadOnlyReopen(&options));
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
Iterator* iter = db_->NewIterator(ReadOptions());
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
++count;
}
ASSERT_EQ(count, 2);
delete iter;
}
// Make sure that when options.block_cache is set, after a new table is
// created its index/filter blocks are added to block cache.
TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册