提交 d916593e 编写于 作者: S Stanislau Hlebik

Add Prev() for merge operator

Summary: Implement Prev() with merge operator for DBIterator. Request from mongoDB. Task 4673663.

Test Plan: make all check

Reviewers: sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D19743
上级 0abaed2e
......@@ -10,6 +10,7 @@
#include "db/db_iter.h"
#include <stdexcept>
#include <deque>
#include <string>
#include "db/filename.h"
#include "db/dbformat.h"
......@@ -109,9 +110,14 @@ class DBIter: public Iterator {
virtual void SeekToLast();
private:
void PrevInternal();
void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
bool FindValueForCurrentKey();
bool FindValueForCurrentKeyUsingSeek();
void FindPrevUserKey();
void FindNextUserKey();
inline void FindNextUserEntry(bool skipping);
void FindNextUserEntryInternal(bool skipping);
void FindPrevUserEntry();
bool ParseKey(ParsedInternalKey* key);
void MergeValuesNewToOld();
......@@ -133,8 +139,8 @@ class DBIter: public Iterator {
SequenceNumber const sequence_;
Status status_;
IterKey saved_key_; // == current key when direction_==kReverse
std::string saved_value_; // == current raw value when direction_==kReverse
IterKey saved_key_;
std::string saved_value_;
Direction direction_;
bool valid_;
bool current_entry_is_merged_;
......@@ -160,20 +166,11 @@ inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
void DBIter::Next() {
assert(valid_);
if (direction_ == kReverse) { // Switch directions?
if (direction_ == kReverse) {
FindNextUserKey();
direction_ = kForward;
// iter_ is pointing just before the entries for this->key(),
// so advance into the range of entries for this->key() and then
// use the normal skipping code below.
if (!iter_->Valid()) {
iter_->SeekToFirst();
} else {
iter_->Next();
}
if (!iter_->Valid()) {
valid_ = false;
saved_key_.Clear();
return;
}
}
......@@ -185,7 +182,6 @@ void DBIter::Next() {
FindNextUserEntry(true /* skipping the current user key */);
}
// PRE: saved_key_ has the current user key if skipping
// POST: saved_key_ should have the next user key if valid_,
// if the current entry is a result of merge
......@@ -327,100 +323,228 @@ void DBIter::MergeValuesNewToOld() {
void DBIter::Prev() {
assert(valid_);
if (direction_ == kForward) {
FindPrevUserKey();
direction_ = kReverse;
}
PrevInternal();
}
// Throw an exception now if merge_operator is provided
// TODO: support backward iteration
if (user_merge_operator_) {
Log(logger_, "Prev not supported yet if merge_operator is provided");
throw std::logic_error("DBIter::Prev backward iteration not supported"
" if merge_operator is provided");
void DBIter::PrevInternal() {
if (!iter_->Valid()) {
valid_ = false;
return;
}
if (direction_ == kForward) { // Switch directions?
// iter_ is pointing at the current entry. Scan backwards until
// the key changes so we can use the normal reverse scanning code.
assert(iter_->Valid()); // Otherwise valid_ would have been false
ParsedInternalKey ikey;
while (iter_->Valid()) {
saved_key_.SetKey(ExtractUserKey(iter_->key()));
while (true) {
iter_->Prev();
if (FindValueForCurrentKey()) {
valid_ = true;
if (!iter_->Valid()) {
valid_ = false;
saved_key_.Clear();
ClearSavedValue();
return;
}
if (user_comparator_->Compare(ExtractUserKey(iter_->key()),
saved_key_.GetKey()) < 0) {
break;
FindParseableKey(&ikey, kReverse);
if (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) {
FindPrevUserKey();
}
return;
}
direction_ = kReverse;
}
if (!iter_->Valid()) {
break;
}
FindParseableKey(&ikey, kReverse);
if (user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) {
FindPrevUserEntry();
FindPrevUserKey();
}
}
// We haven't found any key - iterator is not valid
assert(!iter_->Valid());
valid_ = false;
}
void DBIter::FindPrevUserEntry() {
assert(direction_ == kReverse);
uint64_t num_skipped = 0;
// This function checks, if the entry with biggest sequence_number <= sequence_
// is non kTypeDeletion. If it's not, we save value in saved_value_
bool DBIter::FindValueForCurrentKey() {
assert(iter_->Valid());
// Contains operands for merge operator.
std::deque<std::string> operands;
// last entry before merge (could be kTypeDeletion or kTypeValue)
ValueType last_not_merge_type = kTypeDeletion;
ValueType last_key_entry_type = kTypeDeletion;
ValueType value_type = kTypeDeletion;
bool saved_key_valid = true;
if (iter_->Valid()) {
do {
ParsedInternalKey ikey;
if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
if ((value_type != kTypeDeletion) &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) < 0) {
// We encountered a non-deleted value in entries for previous keys,
break;
}
value_type = ikey.type;
if (value_type == kTypeDeletion) {
saved_key_.Clear();
ClearSavedValue();
saved_key_valid = false;
} else {
Slice raw_value = iter_->value();
if (saved_value_.capacity() > raw_value.size() + 1048576) {
std::string empty;
swap(empty, saved_value_);
}
saved_key_.SetKey(ExtractUserKey(iter_->key()));
saved_value_.assign(raw_value.data(), raw_value.size());
}
} else {
// In the case of ikey.sequence > sequence_, we might have already
// iterated to a different user key.
saved_key_valid = false;
}
num_skipped++;
// If we have sequentially iterated via numerous keys and still not
// found the prev user-key, then it is better to seek so that we can
// avoid too many key comparisons. We seek to the first occurence of
// our current key by looking for max sequence number.
if (saved_key_valid && num_skipped > max_skip_) {
num_skipped = 0;
std::string last_key;
AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(),
kMaxSequenceNumber,
kValueTypeForSeek));
iter_->Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse);
size_t num_skipped = 0;
while (iter_->Valid() && ikey.sequence <= sequence_ &&
(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0)) {
// We iterate too much: let's use Seek() to avoid too much key comparisons
if (num_skipped >= max_skip_) {
return FindValueForCurrentKeyUsingSeek();
}
last_key_entry_type = ikey.type;
switch (last_key_entry_type) {
case kTypeValue:
operands.clear();
saved_value_ = iter_->value().ToString();
last_not_merge_type = kTypeValue;
break;
case kTypeDeletion:
operands.clear();
last_not_merge_type = kTypeDeletion;
break;
case kTypeMerge:
assert(user_merge_operator_ != nullptr);
operands.push_back(iter_->value().ToString());
break;
default:
assert(false);
}
assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0);
iter_->Prev();
++num_skipped;
FindParseableKey(&ikey, kReverse);
}
switch (last_key_entry_type) {
case kTypeDeletion:
valid_ = false;
return false;
case kTypeMerge:
if (last_not_merge_type == kTypeDeletion) {
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
&saved_value_, logger_);
} else {
iter_->Prev();
assert(last_not_merge_type == kTypeValue);
std::string last_put_value = saved_value_;
Slice temp_slice(last_put_value);
user_merge_operator_->FullMerge(saved_key_.GetKey(), &temp_slice,
operands, &saved_value_, logger_);
}
} while (iter_->Valid());
break;
case kTypeValue:
// do nothing - we've already has value in saved_value_
break;
default:
assert(false);
break;
}
valid_ = true;
return true;
}
if (value_type == kTypeDeletion) {
// End
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
bool DBIter::FindValueForCurrentKeyUsingSeek() {
std::string last_key;
AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), sequence_,
kValueTypeForSeek));
iter_->Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
// assume there is at least one parseable key for this user key
ParsedInternalKey ikey;
FindParseableKey(&ikey, kForward);
if (ikey.type == kTypeValue || ikey.type == kTypeDeletion) {
if (ikey.type == kTypeValue) {
saved_value_ = iter_->value().ToString();
valid_ = true;
return true;
}
valid_ = false;
saved_key_.Clear();
ClearSavedValue();
direction_ = kForward;
} else {
return false;
}
// kTypeMerge. We need to collect all kTypeMerge values and save them
// in operands
std::deque<std::string> operands;
while (iter_->Valid() &&
(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) &&
ikey.type == kTypeMerge) {
operands.push_front(iter_->value().ToString());
iter_->Next();
FindParseableKey(&ikey, kForward);
}
if (!iter_->Valid() ||
(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) ||
ikey.type == kTypeDeletion) {
user_merge_operator_->FullMerge(saved_key_.GetKey(), nullptr, operands,
&saved_value_, logger_);
// Make iter_ valid and point to saved_key_
if (!iter_->Valid() ||
(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0)) {
iter_->Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
}
valid_ = true;
return true;
}
const Slice& value = iter_->value();
user_merge_operator_->FullMerge(saved_key_.GetKey(), &value, operands,
&saved_value_, logger_);
valid_ = true;
return true;
}
// Used in Next to change directions
// Go to next user key
// Don't use Seek(),
// because next user key will be very close
void DBIter::FindNextUserKey() {
if (!iter_->Valid()) {
return;
}
ParsedInternalKey ikey;
FindParseableKey(&ikey, kForward);
while (iter_->Valid() &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) {
iter_->Next();
FindParseableKey(&ikey, kForward);
}
}
// Go to previous user_key
void DBIter::FindPrevUserKey() {
if (!iter_->Valid()) {
return;
}
size_t num_skipped = 0;
ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse);
while (iter_->Valid() &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) {
if (num_skipped >= max_skip_) {
num_skipped = 0;
IterKey last_key;
last_key.SetInternalKey(ParsedInternalKey(
saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek));
iter_->Seek(last_key.GetKey());
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
}
iter_->Prev();
++num_skipped;
FindParseableKey(&ikey, kReverse);
}
}
// Skip all unparseable keys
void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
while (iter_->Valid() && !ParseKey(ikey)) {
if (direction == kReverse) {
iter_->Prev();
} else {
iter_->Next();
}
}
}
......@@ -454,20 +578,13 @@ void DBIter::SeekToFirst() {
}
void DBIter::SeekToLast() {
// Throw an exception for now if merge_operator is provided
// TODO: support backward iteration
if (user_merge_operator_) {
Log(logger_, "SeekToLast not supported yet if merge_operator is provided");
throw std::logic_error("DBIter::SeekToLast: backward iteration not"
" supported if merge_operator is provided");
}
direction_ = kReverse;
ClearSavedValue();
PERF_TIMER_AUTO(seek_internal_seek_time);
iter_->SeekToLast();
PERF_TIMER_STOP(seek_internal_seek_time);
FindPrevUserEntry();
PrevInternal();
}
Iterator* NewDBIterator(Env* env, const Options& options,
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册