From 09b6bf828a808264992cc8b1c562e45d10b6b07b Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Fri, 23 Mar 2018 12:12:15 -0700 Subject: [PATCH] InlineSkiplist: don't decode keys unnecessarily during comparisons Summary: Summary ======== `InlineSkipList<>::Insert` takes the `key` parameter as a C-string. Then, it performs multiple comparisons with it requiring the `GetLengthPrefixedSlice()` to be spawn in `MemTable::KeyComparator::operator()(const char* prefix_len_key1, const char* prefix_len_key2)` on the same data over and over. The patch tries to optimize that. Rough performance comparison ===== Big keys, no compression. ``` $ ./db_bench --writes 20000000 --benchmarks="fillrandom" --compression_type none -key_size 256 (...) fillrandom : 4.222 micros/op 236836 ops/sec; 80.4 MB/s ``` ``` $ ./db_bench --writes 20000000 --benchmarks="fillrandom" --compression_type none -key_size 256 (...) fillrandom : 4.064 micros/op 246059 ops/sec; 83.5 MB/s ``` TODO ====== In ~~a separated~~ this PR: - [x] Go outside the write path. Maybe even eradicate the C-string-taking variant of `KeyIsAfterNode` entirely. - [x] Try to cache the transformations applied by `KeyComparator` & friends in situations where we havy many comparisons with the same key. Closes https://github.com/facebook/rocksdb/pull/3516 Differential Revision: D7059300 Pulled By: ajkr fbshipit-source-id: 6f027dbb619a488129f79f79b5f7dbe566fb2dbb --- db/memtable.cc | 2 +- db/memtable.h | 2 +- include/rocksdb/memtablerep.h | 12 ++++++- memtable/inlineskiplist.h | 55 +++++++++++++++++++++++---------- memtable/inlineskiplist_test.cc | 16 ++++++++++ 5 files changed, 67 insertions(+), 20 deletions(-) diff --git a/db/memtable.cc b/db/memtable.cc index 384275c3d..467790135 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -229,7 +229,7 @@ int MemTable::KeyComparator::operator()(const char* prefix_len_key1, } int MemTable::KeyComparator::operator()(const char* prefix_len_key, - const Slice& key) + const KeyComparator::DecodedType& key) const { // Internal keys are encoded as length-prefixed strings. Slice a = GetLengthPrefixedSlice(prefix_len_key); diff --git a/db/memtable.h b/db/memtable.h index 43c959e3c..6082a8e6c 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -84,7 +84,7 @@ class MemTable { virtual int operator()(const char* prefix_len_key1, const char* prefix_len_key2) const override; virtual int operator()(const char* prefix_len_key, - const Slice& key) const override; + const DecodedType& key) const override; }; // MemTables are reference counted. The initial reference count diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index e14a66438..4b6e897a6 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -39,24 +39,34 @@ #include #include #include +#include namespace rocksdb { class Arena; class Allocator; class LookupKey; -class Slice; class SliceTransform; class Logger; typedef void* KeyHandle; +extern Slice GetLengthPrefixedSlice(const char* data); + class MemTableRep { public: // KeyComparator provides a means to compare keys, which are internal keys // concatenated with values. class KeyComparator { public: + typedef rocksdb::Slice DecodedType; + + virtual DecodedType decode_key(const char* key) const { + // The format of key is frozen and can be terated as a part of the API + // contract. Refer to MemTable::Add for details. + return GetLengthPrefixedSlice(key); + } + // Compare a and b. Return a negative value if a is less than b, 0 if they // are equal, and a positive value if a is greater than b virtual int operator()(const char* prefix_len_key1, diff --git a/memtable/inlineskiplist.h b/memtable/inlineskiplist.h index 702a7336d..efcb93c85 100644 --- a/memtable/inlineskiplist.h +++ b/memtable/inlineskiplist.h @@ -45,9 +45,12 @@ #include #include #include +#include #include "port/likely.h" #include "port/port.h" +#include "rocksdb/slice.h" #include "util/allocator.h" +#include "util/coding.h" #include "util/random.h" namespace rocksdb { @@ -59,6 +62,9 @@ class InlineSkipList { struct Splice; public: + using DecodedKey = \ + typename std::remove_reference::type::DecodedType; + static const uint16_t kMaxPossibleHeight = 32; // Create a new InlineSkipList object that will use "cmp" for comparing @@ -212,6 +218,7 @@ class InlineSkipList { // Return true if key is greater than the data stored in "n". Null n // is considered infinite. n should not be head_. bool KeyIsAfterNode(const char* key, Node* n) const; + bool KeyIsAfterNode(const DecodedKey& key, Node* n) const; // Returns the earliest node with a key >= key. // Return nullptr if there is no such node. @@ -241,12 +248,12 @@ class InlineSkipList { // a node that is after the key. after should be nullptr if a good after // node isn't conveniently available. template - void FindSpliceForLevel(const char* key, Node* before, Node* after, int level, + void FindSpliceForLevel(const DecodedKey& key, Node* before, Node* after, int level, Node** out_prev, Node** out_next); // Recomputes Splice levels from highest_level (inclusive) down to // lowest_level (inclusive). - void RecomputeSpliceLevels(const char* key, Splice* splice, + void RecomputeSpliceLevels(const DecodedKey& key, Splice* splice, int recompute_level); // No copying allowed @@ -435,6 +442,14 @@ bool InlineSkipList::KeyIsAfterNode(const char* key, return (n != nullptr) && (compare_(n->Key(), key) < 0); } +template +bool InlineSkipList::KeyIsAfterNode(const DecodedKey& key, + Node* n) const { + // nullptr n is considered infinite + assert(n != head_); + return (n != nullptr) && (compare_(n->Key(), key) < 0); +} + template typename InlineSkipList::Node* InlineSkipList::FindGreaterOrEqual(const char* key) const { @@ -446,6 +461,7 @@ InlineSkipList::FindGreaterOrEqual(const char* key) const { Node* x = head_; int level = GetMaxHeight() - 1; Node* last_bigger = nullptr; + const DecodedKey key_decoded = compare_.decode_key(key); while (true) { Node* next = x->Next(level); if (next != nullptr) { @@ -454,10 +470,10 @@ InlineSkipList::FindGreaterOrEqual(const char* key) const { // Make sure the lists are sorted assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x)); // Make sure we haven't overshot during our search - assert(x == head_ || KeyIsAfterNode(key, x)); + assert(x == head_ || KeyIsAfterNode(key_decoded, x)); int cmp = (next == nullptr || next == last_bigger) ? 1 - : compare_(next->Key(), key); + : compare_(next->Key(), key_decoded); if (cmp == 0 || (cmp > 0 && level == 0)) { return next; } else if (cmp < 0) { @@ -487,6 +503,7 @@ InlineSkipList::FindLessThan(const char* key, Node** prev, Node* x = root; // KeyIsAfter(key, last_not_after) is definitely false Node* last_not_after = nullptr; + const DecodedKey key_decoded = compare_.decode_key(key); while (true) { assert(x != nullptr); Node* next = x->Next(level); @@ -494,8 +511,8 @@ InlineSkipList::FindLessThan(const char* key, Node** prev, PREFETCH(next->Next(level), 0, 1); } assert(x == head_ || next == nullptr || KeyIsAfterNode(next->Key(), x)); - assert(x == head_ || KeyIsAfterNode(key, x)); - if (next != last_not_after && KeyIsAfterNode(key, next)) { + assert(x == head_ || KeyIsAfterNode(key_decoded, x)); + if (next != last_not_after && KeyIsAfterNode(key_decoded, next)) { // Keep searching in this list assert(next != nullptr); x = next; @@ -540,13 +557,14 @@ uint64_t InlineSkipList::EstimateCount(const char* key) const { Node* x = head_; int level = GetMaxHeight() - 1; + const DecodedKey key_decoded = compare_.decode_key(key); while (true) { - assert(x == head_ || compare_(x->Key(), key) < 0); + assert(x == head_ || compare_(x->Key(), key_decoded) < 0); Node* next = x->Next(level); if (next != nullptr) { PREFETCH(next->Next(level), 0, 1); } - if (next == nullptr || compare_(next->Key(), key) >= 0) { + if (next == nullptr || compare_(next->Key(), key_decoded) >= 0) { if (level == 0) { return count; } else { @@ -654,7 +672,7 @@ bool InlineSkipList::InsertWithHint(const char* key, void** hint) { template template -void InlineSkipList::FindSpliceForLevel(const char* key, +void InlineSkipList::FindSpliceForLevel(const DecodedKey& key, Node* before, Node* after, int level, Node** out_prev, Node** out_next) { @@ -682,7 +700,7 @@ void InlineSkipList::FindSpliceForLevel(const char* key, } template -void InlineSkipList::RecomputeSpliceLevels(const char* key, +void InlineSkipList::RecomputeSpliceLevels(const DecodedKey& key, Splice* splice, int recompute_level) { assert(recompute_level > 0); @@ -698,6 +716,7 @@ template bool InlineSkipList::Insert(const char* key, Splice* splice, bool allow_partial_splice_fix) { Node* x = reinterpret_cast(const_cast(key)) - 1; + const DecodedKey key_decoded = compare_.decode_key(key); int height = x->UnstashHeight(); assert(height >= 1 && height <= kMaxHeight_); @@ -765,7 +784,8 @@ bool InlineSkipList::Insert(const char* key, Splice* splice, // our chances of success. ++recompute_height; } else if (splice->prev_[recompute_height] != head_ && - !KeyIsAfterNode(key, splice->prev_[recompute_height])) { + !KeyIsAfterNode(key_decoded, + splice->prev_[recompute_height])) { // key is from before splice if (allow_partial_splice_fix) { // skip all levels with the same node without more comparisons @@ -777,7 +797,8 @@ bool InlineSkipList::Insert(const char* key, Splice* splice, // we're pessimistic, recompute everything recompute_height = max_height; } - } else if (KeyIsAfterNode(key, splice->next_[recompute_height])) { + } else if (KeyIsAfterNode(key_decoded, + splice->next_[recompute_height])) { // key is from after splice if (allow_partial_splice_fix) { Node* bad = splice->next_[recompute_height]; @@ -795,7 +816,7 @@ bool InlineSkipList::Insert(const char* key, Splice* splice, } assert(recompute_height <= max_height); if (recompute_height > 0) { - RecomputeSpliceLevels(key, splice, recompute_height); + RecomputeSpliceLevels(key_decoded, splice, recompute_height); } bool splice_is_valid = true; @@ -827,8 +848,8 @@ bool InlineSkipList::Insert(const char* key, Splice* splice, // search, because it should be unlikely that lots of nodes have // been inserted between prev[i] and next[i]. No point in using // next[i] as the after hint, because we know it is stale. - FindSpliceForLevel(key, splice->prev_[i], nullptr, i, &splice->prev_[i], - &splice->next_[i]); + FindSpliceForLevel(key_decoded, splice->prev_[i], nullptr, i, + &splice->prev_[i], &splice->next_[i]); // Since we've narrowed the bracket for level i, we might have // violated the Splice constraint between i and i-1. Make sure @@ -842,8 +863,8 @@ bool InlineSkipList::Insert(const char* key, Splice* splice, for (int i = 0; i < height; ++i) { if (i >= recompute_height && splice->prev_[i]->Next(i) != splice->next_[i]) { - FindSpliceForLevel(key, splice->prev_[i], nullptr, i, &splice->prev_[i], - &splice->next_[i]); + FindSpliceForLevel(key_decoded, splice->prev_[i], nullptr, i, + &splice->prev_[i], &splice->next_[i]); } // Checking for duplicate keys on the level 0 is sufficient if (UNLIKELY(i == 0 && splice->next_[i] != nullptr && diff --git a/memtable/inlineskiplist_test.cc b/memtable/inlineskiplist_test.cc index 70fd11a76..10667396c 100644 --- a/memtable/inlineskiplist_test.cc +++ b/memtable/inlineskiplist_test.cc @@ -32,6 +32,12 @@ static Key Decode(const char* key) { } struct TestComparator { + typedef Key DecodedType; + + static DecodedType decode_key(const char* b) { + return Decode(b); + } + int operator()(const char* a, const char* b) const { if (Decode(a) < Decode(b)) { return -1; @@ -41,6 +47,16 @@ struct TestComparator { return 0; } } + + int operator()(const char* a, const DecodedType b) const { + if (Decode(a) < b) { + return -1; + } else if (Decode(a) > b) { + return +1; + } else { + return 0; + } + } }; typedef InlineSkipList TestInlineSkipList; -- GitLab