提交 2482d5fb 编写于 作者: A Aaron Gao

support Prev() in prefix seek mode

Summary: As title, make sure Prev() works as expected with Next() when the current iter->key() in the range of the same prefix in prefix seek mode

Test Plan: make all check -j64 (add prefix_test with PrefixSeekModePrev test case)

Reviewers: andrewkr, sdong, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: yoshinorim, andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D61419
上级 7541c7a7
......@@ -3718,7 +3718,8 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
InternalIterator* internal_iter;
assert(arena != nullptr);
// Need to create internal iterator from the arena.
MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena);
MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena,
cfd->ioptions()->prefix_extractor);
// Collect iterator for mutable mem
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(read_options, arena));
......
......@@ -21,6 +21,7 @@
#include "db/column_family.h"
#include "db/compaction_job.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
......
......@@ -116,6 +116,7 @@ class DBIter: public Iterator {
direction_(kForward),
valid_(false),
current_entry_is_merged_(false),
prefix_is_saved_(false),
statistics_(ioptions.statistics),
version_number_(version_number),
iterate_upper_bound_(iterate_upper_bound),
......@@ -203,6 +204,7 @@ class DBIter: public Iterator {
virtual void SeekToLast() override;
private:
void ReverseToForward();
void ReverseToBackward();
void PrevInternal();
void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
......@@ -255,6 +257,8 @@ class DBIter: public Iterator {
Direction direction_;
bool valid_;
bool current_entry_is_merged_;
// for prefix seek mode to support prev()
bool prefix_is_saved_;
Statistics* statistics_;
uint64_t max_skip_;
uint64_t version_number_;
......@@ -293,11 +297,7 @@ void DBIter::Next() {
// Release temporarily pinned blocks from last operation
ReleaseTempPinnedData();
if (direction_ == kReverse) {
FindNextUserKey();
direction_ = kForward;
if (!iter_->Valid()) {
iter_->SeekToFirst();
}
ReverseToForward();
} else if (iter_->Valid() && !current_entry_is_merged_) {
// If the current value is not a merge, the iter position is the
// current key, which is already returned. We can safely issue a
......@@ -510,7 +510,20 @@ void DBIter::Prev() {
}
}
void DBIter::ReverseToForward() {
FindNextUserKey();
direction_ = kForward;
if (!iter_->Valid()) {
iter_->SeekToFirst();
}
}
void DBIter::ReverseToBackward() {
if (prefix_extractor_ != nullptr) {
Slice prefix = prefix_extractor_->Transform(key());
iter_->ResetPrefix(&prefix);
prefix_is_saved_ = true;
}
if (current_entry_is_merged_) {
// Not placed in the same key. Need to call Prev() until finding the
// previous key.
......@@ -729,6 +742,11 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
// Don't use Seek(),
// because next user key will be very close
void DBIter::FindNextUserKey() {
if (prefix_extractor_ != nullptr) {
Slice prefix = prefix_extractor_->Transform(key());
iter_->ResetPrefix(&prefix);
prefix_is_saved_ = true;
}
if (!iter_->Valid()) {
return;
}
......@@ -792,7 +810,6 @@ void DBIter::Seek(const Slice& target) {
PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->Seek(saved_key_.GetKey());
}
RecordTick(statistics_, NUMBER_DB_SEEK);
if (iter_->Valid()) {
if (prefix_extractor_ && prefix_same_as_start_) {
......@@ -813,6 +830,11 @@ void DBIter::Seek(const Slice& target) {
} else {
valid_ = false;
}
// Need to reset prefix if change direction
if (prefix_is_saved_) {
iter_->ResetPrefix();
prefix_is_saved_ = false;
}
if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
prefix_start_buf_.SetKey(prefix_start_key_);
prefix_start_key_ = prefix_start_buf_.GetKey();
......
......@@ -21,14 +21,16 @@ int main() {
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/table.h"
#include "util/histogram.h"
#include "util/random.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/testharness.h"
#include "utilities/merge_operators.h"
using GFLAGS::ParseCommandLineFlags;
......@@ -46,6 +48,7 @@ DEFINE_int32(skiplist_height, 4, "");
DEFINE_double(memtable_prefix_bloom_size_ratio, 0.1, "");
DEFINE_int32(memtable_huge_page_size, 2 * 1024 * 1024, "");
DEFINE_int32(value_size, 40, "");
DEFINE_bool(enable_print, false, "Print options generated to console.");
// Path to the database on file system
const std::string kDbName = rocksdb::test::TmpDir() + "/prefix_test";
......@@ -106,6 +109,10 @@ class TestKeyComparator : public Comparator {
return 0;
}
bool operator()(const TestKey& a, const TestKey& b) const {
return Compare(TestKeyToSlice(a), TestKeyToSlice(b)) < 0;
}
virtual const char* Name() const override {
return "TestKeyComparator";
}
......@@ -124,6 +131,23 @@ void PutKey(DB* db, WriteOptions write_options, uint64_t prefix,
ASSERT_OK(db->Put(write_options, key, value));
}
void PutKey(DB* db, WriteOptions write_options, const TestKey& test_key,
const Slice& value) {
Slice key = TestKeyToSlice(test_key);
ASSERT_OK(db->Put(write_options, key, value));
}
void MergeKey(DB* db, WriteOptions write_options, const TestKey& test_key,
const Slice& value) {
Slice key = TestKeyToSlice(test_key);
ASSERT_OK(db->Merge(write_options, key, value));
}
void DeleteKey(DB* db, WriteOptions write_options, const TestKey& test_key) {
Slice key = TestKeyToSlice(test_key);
ASSERT_OK(db->Delete(write_options, key));
}
void SeekIterator(Iterator* iter, uint64_t prefix, uint64_t suffix) {
TestKey test_key(prefix, suffix);
Slice key = TestKeyToSlice(test_key);
......@@ -629,8 +653,115 @@ TEST_F(PrefixTest, DynamicPrefixIterator) {
}
}
TEST_F(PrefixTest, PrefixSeekModePrev) {
// Only for SkipListFactory
options.memtable_factory.reset(new SkipListFactory);
options.merge_operator = MergeOperators::CreatePutOperator();
options.write_buffer_size = 1024 * 1024;
Random rnd(1);
for (size_t m = 1; m < 100; m++) {
std::cout << "[" + std::to_string(m) + "]" + "*** Mem table: "
<< options.memtable_factory->Name() << std::endl;
DestroyDB(kDbName, Options());
auto db = OpenDb();
WriteOptions write_options;
ReadOptions read_options;
std::map<TestKey, std::string, TestKeyComparator> entry_maps[3], whole_map;
for (uint64_t i = 0; i < 10; i++) {
int div = i % 3 + 1;
for (uint64_t j = 0; j < 10; j++) {
whole_map[TestKey(i, j)] = entry_maps[rnd.Uniform(div)][TestKey(i, j)] =
'v' + std::to_string(i) + std::to_string(j);
}
}
std::map<TestKey, std::string, TestKeyComparator> type_map;
for (size_t i = 0; i < 3; i++) {
for (auto& kv : entry_maps[i]) {
if (rnd.OneIn(3)) {
PutKey(db.get(), write_options, kv.first, kv.second);
type_map[kv.first] = "value";
} else {
MergeKey(db.get(), write_options, kv.first, kv.second);
type_map[kv.first] = "merge";
}
}
if (i < 2) {
db->Flush(FlushOptions());
}
}
for (size_t i = 0; i < 2; i++) {
for (auto& kv : entry_maps[i]) {
if (rnd.OneIn(10)) {
whole_map.erase(kv.first);
DeleteKey(db.get(), write_options, kv.first);
entry_maps[2][kv.first] = "delete";
}
}
}
if (FLAGS_enable_print) {
for (size_t i = 0; i < 3; i++) {
for (auto& kv : entry_maps[i]) {
std::cout << "[" << i << "]" << kv.first.prefix << kv.first.sorted
<< " " << kv.second + " " + type_map[kv.first] << std::endl;
}
}
}
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
for (uint64_t prefix = 0; prefix < 10; prefix++) {
uint64_t start_suffix = rnd.Uniform(9);
SeekIterator(iter.get(), prefix, start_suffix);
auto it = whole_map.find(TestKey(prefix, start_suffix));
if (it == whole_map.end()) {
continue;
}
ASSERT_NE(it, whole_map.end());
ASSERT_TRUE(iter->Valid());
if (FLAGS_enable_print) {
std::cout << "round " << prefix
<< " iter: " << SliceToTestKey(iter->key())->prefix
<< SliceToTestKey(iter->key())->sorted
<< " | map: " << it->first.prefix << it->first.sorted << " | "
<< iter->value().ToString() << " " << it->second << std::endl;
}
ASSERT_EQ(iter->value(), it->second);
for (size_t k = 0; k < 9; k++) {
if (rnd.OneIn(2) || it == whole_map.begin()) {
iter->Next();
it++;
if (FLAGS_enable_print) {
std::cout << "Next >> ";
}
} else {
iter->Prev();
it--;
if (FLAGS_enable_print) {
std::cout << "Prev >> ";
}
}
if (!iter->Valid() || SliceToTestKey(iter->key())->prefix != prefix) {
break;
}
ASSERT_TRUE(iter->Valid());
ASSERT_NE(it, whole_map.end());
ASSERT_EQ(iter->value(), it->second);
if (FLAGS_enable_print) {
std::cout << "iter: " << SliceToTestKey(iter->key())->prefix
<< SliceToTestKey(iter->key())->sorted
<< " | map: " << it->first.prefix << it->first.sorted
<< " | " << iter->value().ToString() << " " << it->second
<< std::endl;
}
}
}
}
}
} // end namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
ParseCommandLineFlags(&argc, &argv, true);
......
......@@ -89,6 +89,8 @@ class InternalIterator : public Cleanable {
return Status::NotSupported("");
}
virtual void ResetPrefix(const Slice* prefix = nullptr) {}
private:
// No copying allowed
InternalIterator(const InternalIterator&) = delete;
......
......@@ -6,11 +6,12 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/merger.h"
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
......@@ -20,6 +21,7 @@
#include "table/iterator_wrapper.h"
#include "util/arena.h"
#include "util/autovector.h"
#include "util/coding.h"
#include "util/heap.h"
#include "util/perf_context_imp.h"
#include "util/stop_watch.h"
......@@ -37,12 +39,14 @@ const size_t kNumIterReserve = 4;
class MergingIterator : public InternalIterator {
public:
MergingIterator(const Comparator* comparator, InternalIterator** children,
int n, bool is_arena_mode)
int n, bool is_arena_mode,
const SliceTransform* const prefix_extractor)
: is_arena_mode_(is_arena_mode),
comparator_(comparator),
current_(nullptr),
direction_(kForward),
minHeap_(comparator_),
prefix_extractor_(prefix_extractor),
pinned_iters_mgr_(nullptr) {
children_.resize(n);
for (int i = 0; i < n; i++) {
......@@ -109,8 +113,8 @@ class MergingIterator : public InternalIterator {
PERF_TIMER_GUARD(seek_child_seek_time);
child.Seek(target);
}
PERF_COUNTER_ADD(seek_child_seek_count, 1);
PERF_COUNTER_ADD(seek_child_seek_count, 1);
if (child.Valid()) {
PERF_TIMER_GUARD(seek_min_heap_time);
minHeap_.push(&child);
......@@ -125,7 +129,6 @@ class MergingIterator : public InternalIterator {
virtual void Next() override {
assert(Valid());
// Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already
// true for all of the non-current children since current_ is
......@@ -136,13 +139,30 @@ class MergingIterator : public InternalIterator {
ClearHeaps();
for (auto& child : children_) {
if (&child != current_) {
child.Seek(key());
if (prefix_extractor_ == nullptr) {
child.Seek(key());
} else {
// only for prefix_seek_mode
// we should not call Seek() here
if (child.Valid()) {
child.Next();
} else {
child.SeekToFirst();
}
}
if (child.Valid() && comparator_->Equal(key(), child.key())) {
child.Next();
}
}
if (child.Valid()) {
minHeap_.push(&child);
bool skip_iter =
prefix_extractor_ != nullptr &&
prefix_extractor_->InDomain(ExtractUserKey(child.key())) &&
prefix_extractor_->Transform(ExtractUserKey(child.key())) !=
Slice(*prefix_);
if (&child == current_ || !skip_iter) {
minHeap_.push(&child);
}
}
}
direction_ = kForward;
......@@ -182,7 +202,12 @@ class MergingIterator : public InternalIterator {
InitMaxHeap();
for (auto& child : children_) {
if (&child != current_) {
child.Seek(key());
if (prefix_extractor_ == nullptr) {
child.Seek(key());
} else {
// only for prefix_seek_mode
// we should not call Seek() here
}
if (child.Valid()) {
// Child is at first entry >= key(). Step back one to be < key()
TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
......@@ -193,8 +218,16 @@ class MergingIterator : public InternalIterator {
child.SeekToLast();
}
}
if (child.Valid()) {
maxHeap_->push(&child);
bool skip_iter =
prefix_extractor_ != nullptr &&
prefix_extractor_->InDomain(ExtractUserKey(child.key())) &&
prefix_extractor_->Transform(ExtractUserKey(child.key())) !=
Slice(*prefix_);
if (&child == current_ || !skip_iter) {
maxHeap_->push(&child);
}
}
}
direction_ = kReverse;
......@@ -263,6 +296,17 @@ class MergingIterator : public InternalIterator {
current_->IsValuePinned();
}
virtual void ResetPrefix(const Slice* prefix) override {
if (prefix == nullptr) {
prefix_.reset();
return;
}
if (!prefix_) {
prefix_.reset(new std::string);
}
*prefix_ = prefix->ToString();
}
private:
// Clears heaps for both directions, used when changing direction or seeking
void ClearHeaps();
......@@ -288,7 +332,9 @@ class MergingIterator : public InternalIterator {
// Max heap is used for reverse iteration, which is way less common than
// forward. Lazily initialize it to save memory.
std::unique_ptr<MergerMaxIterHeap> maxHeap_;
const SliceTransform* const prefix_extractor_;
PinnedIteratorsManager* pinned_iters_mgr_;
std::unique_ptr<std::string> prefix_;
IteratorWrapper* CurrentForward() const {
assert(direction_ == kForward);
......@@ -315,9 +361,9 @@ void MergingIterator::InitMaxHeap() {
}
}
InternalIterator* NewMergingIterator(const Comparator* cmp,
InternalIterator** list, int n,
Arena* arena) {
InternalIterator* NewMergingIterator(
const Comparator* cmp, InternalIterator** list, int n, Arena* arena,
const SliceTransform* const prefix_extractor) {
assert(n >= 0);
if (n == 0) {
return NewEmptyInternalIterator(arena);
......@@ -325,20 +371,21 @@ InternalIterator* NewMergingIterator(const Comparator* cmp,
return list[0];
} else {
if (arena == nullptr) {
return new MergingIterator(cmp, list, n, false);
return new MergingIterator(cmp, list, n, false, prefix_extractor);
} else {
auto mem = arena->AllocateAligned(sizeof(MergingIterator));
return new (mem) MergingIterator(cmp, list, n, true);
return new (mem) MergingIterator(cmp, list, n, true, prefix_extractor);
}
}
}
MergeIteratorBuilder::MergeIteratorBuilder(const Comparator* comparator,
Arena* a)
MergeIteratorBuilder::MergeIteratorBuilder(
const Comparator* comparator, Arena* a,
const SliceTransform* const prefix_extractor)
: first_iter(nullptr), use_merging_iter(false), arena(a) {
auto mem = arena->AllocateAligned(sizeof(MergingIterator));
merge_iter = new (mem) MergingIterator(comparator, nullptr, 0, true);
merge_iter =
new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_extractor);
}
void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
......
......@@ -9,6 +9,7 @@
#pragma once
#include "rocksdb/slice_transform.h"
#include "rocksdb/types.h"
namespace rocksdb {
......@@ -26,9 +27,10 @@ class Arena;
// key is present in K child iterators, it will be yielded K times.
//
// REQUIRES: n >= 0
extern InternalIterator* NewMergingIterator(const Comparator* comparator,
InternalIterator** children, int n,
Arena* arena = nullptr);
extern InternalIterator* NewMergingIterator(
const Comparator* comparator, InternalIterator** children, int n,
Arena* arena = nullptr,
const SliceTransform* const prefix_extractor = nullptr);
class MergingIterator;
......@@ -37,7 +39,9 @@ class MergeIteratorBuilder {
public:
// comparator: the comparator used in merging comparator
// arena: where the merging iterator needs to be allocated from.
explicit MergeIteratorBuilder(const Comparator* comparator, Arena* arena);
explicit MergeIteratorBuilder(
const Comparator* comparator, Arena* arena,
const SliceTransform* const prefix_extractor = nullptr);
~MergeIteratorBuilder() {}
// Add iter to the merging iterator.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册