提交 1ae27113 编写于 作者: N Nathan Bronson

reduce comparisons by skiplist

Summary:
Key comparison is the single largest CPU user for CPU-bound
workloads. This diff reduces the number of comparisons in two ways.

The first is that it moves predecessor array gathering from
FindGreaterOrEqual to FindLessThan, so that FindGreaterOrEqual can
return immediately if compare_ returns 0.  As part of this change I
moved the sequential insertion optimization into Insert, to remove the
undocumented (and smelly) requirement that prev must be equal to prev_
if it is non-null.

The second optimization is that all of the search functions skip calling
compare_ when moving to a lower level that has the same Next pointer.
With a branching factor of 4 we would expect this to happen 1/4 of
the time.

On a single-threaded CPU-bound workload (-benchmarks=fillrandom -threads=1
-batch_size=1 -memtablerep=skip_list -value_size=0 --num=1600000
-level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999
-disable_auto_compactions --max_write_buffer_number=8
-max_background_flushes=8 --disable_wal --write_buffer_size=160000000)
on my dev server this is good for a 7% perf win.

Test Plan: unit tests

Reviewers: rven, ljin, yhchiang, sdong, igor

Reviewed By: igor

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D43233
上级 b47d65b3
......@@ -120,7 +120,10 @@ class SkipList {
// values are ok.
std::atomic<int> max_height_; // Height of the entire list
// Used for optimizing sequential insert patterns
// Used for optimizing sequential insert patterns. Tricky. prev_[i] for
// i up to max_height_ is the predecessor of prev_[0] and prev_height_
// is the height of prev_[0]. prev_[0] can only be equal to head before
// insertion, in which case max_height_ and prev_height_ are 1.
Node** prev_;
int32_t prev_height_;
......@@ -138,16 +141,15 @@ class SkipList {
// Return true if key is greater than the data stored in "n"
bool KeyIsAfterNode(const Key& key, Node* n) const;
// Return the earliest node that comes at or after key.
// Returns the earliest node with a key >= key.
// Return nullptr if there is no such node.
//
// If prev is non-nullptr, fills prev[level] with pointer to previous
// node at "level" for every level in [0..max_height_-1].
Node* FindGreaterOrEqual(const Key& key, Node** prev) const;
Node* FindGreaterOrEqual(const Key& key) const;
// Return the latest node with a key < key.
// Return head_ if there is no such node.
Node* FindLessThan(const Key& key) const;
// Fills prev[level] with pointer to previous node at "level" for every
// level in [0..max_height_-1], if prev is non-null.
Node* FindLessThan(const Key& key, Node** prev = nullptr) const;
// Return the last node in the list.
// Return head_ if list is empty.
......@@ -244,7 +246,7 @@ inline void SkipList<Key, Comparator>::Iterator::Prev() {
template<typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, nullptr);
node_ = list_->FindGreaterOrEqual(target);
}
template<typename Key, class Comparator>
......@@ -280,59 +282,61 @@ bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
template<typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::
FindGreaterOrEqual(const Key& key, Node** prev) const {
// Use prev as an optimization hint and fallback to slow path
if (prev && !KeyIsAfterNode(key, prev[0]->Next(0))) {
Node* x = prev[0];
Node* next = x->Next(0);
if ((x == head_) || KeyIsAfterNode(key, x)) {
// Adjust all relevant insertion points to the previous entry
for (int i = 1; i < prev_height_; i++) {
prev[i] = x;
}
return next;
}
}
// Normal lookup
FindGreaterOrEqual(const Key& key) const {
// Note: It looks like we could reduce duplication by implementing
// this function as FindLessThan(key)->Next(0), but we wouldn't be able
// to exit early on equality and the result wouldn't even be correct.
// A concurrent insert might occur after FindLessThan(key) but before
// we get a chance to call Next(0).
Node* x = head_;
int level = GetMaxHeight() - 1;
Node* last_bigger = nullptr;
while (true) {
Node* next = x->Next(level);
// Make sure the lists are sorted.
// If x points to head_ or next points nullptr, it is trivially satisfied.
assert((x == head_) || (next == nullptr) || KeyIsAfterNode(next->key, x));
if (KeyIsAfterNode(key, next)) {
// Make sure the lists are sorted
assert(x == head_ || next == nullptr || KeyIsAfterNode(next->key, x));
// Make sure we haven't overshot during our search
assert(x == head_ || KeyIsAfterNode(key, x));
int cmp = (next == nullptr || next == last_bigger)
? 1 : compare_(next->key, key);
if (cmp == 0 || (cmp > 0 && level == 0)) {
return next;
} else if (cmp < 0) {
// Keep searching in this list
x = next;
} else {
if (prev != nullptr) prev[level] = x;
if (level == 0) {
return next;
} else {
// Switch to next list
level--;
}
// Switch to next list, reuse compare_() result
last_bigger = next;
level--;
}
}
}
template<typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
SkipList<Key, Comparator>::FindLessThan(const Key& key, Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
// KeyIsAfter(key, last_not_after) is definitely false
Node* last_not_after = nullptr;
while (true) {
assert(x == head_ || compare_(x->key, key) < 0);
Node* next = x->Next(level);
if (next == nullptr || compare_(next->key, key) >= 0) {
assert(x == head_ || next == nullptr || KeyIsAfterNode(next->key, x));
assert(x == head_ || KeyIsAfterNode(key, x));
if (next != last_not_after && KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
} else {
if (prev != nullptr) {
prev[level] = x;
}
if (level == 0) {
return x;
} else {
// Switch to next list
// Switch to next list, reuse KeyIUsAfterNode() result
last_not_after = next;
level--;
}
} else {
x = next;
}
}
}
......@@ -408,12 +412,27 @@ SkipList<Key, Comparator>::SkipList(const Comparator cmp, Allocator* allocator,
template<typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* x = FindGreaterOrEqual(key, prev_);
// fast path for sequential insertion
if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) &&
(prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) {
assert(prev_[0] != head_ || (prev_height_ == 1 && GetMaxHeight() == 1));
// Outside of this method prev_[1..max_height_] is the predecessor
// of prev_[0], and prev_height_ refers to prev_[0]. Inside Insert
// prev_[0..max_height - 1] is the predecessor of key. Switch from
// the external state to the internal
for (int i = 1; i < prev_height_; i++) {
prev_[i] = prev_[0];
}
} else {
// TODO(opt): we could use a NoBarrier predecessor search as an
// optimization for architectures where memory_order_acquire needs
// a synchronization instruction. Doesn't matter on x86
FindLessThan(key, prev_);
}
// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));
assert(prev_[0]->Next(0) == nullptr || !Equal(key, prev_[0]->Next(0)->key));
int height = RandomHeight();
if (height > GetMaxHeight()) {
......@@ -432,7 +451,7 @@ void SkipList<Key, Comparator>::Insert(const Key& key) {
max_height_.store(height, std::memory_order_relaxed);
}
x = NewNode(key, height);
Node* x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
......@@ -445,7 +464,7 @@ void SkipList<Key, Comparator>::Insert(const Key& key) {
template<typename Key, class Comparator>
bool SkipList<Key, Comparator>::Contains(const Key& key) const {
Node* x = FindGreaterOrEqual(key, nullptr);
Node* x = FindGreaterOrEqual(key);
if (x != nullptr && Equal(key, x->key)) {
return true;
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册