diff --git a/CMakeLists.txt b/CMakeLists.txt
index 997f57cc6d38c76270437b92cfaeeee5214f5987..51749bae0d42c3fcf07854d267b60dd921048c29 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -326,6 +326,7 @@ set(TESTS
   db/file_indexer_test.cc
   db/filename_test.cc
   db/flush_job_test.cc
+  db/inlineskiplist_test.cc
   db/listener_test.cc
   db/log_test.cc
   db/manual_compaction_test.cc
diff --git a/Makefile b/Makefile
index bca757b38b52f0aec68ade1b693b4fc12f9ca88f..2ef49fb3c56ff2a97f04228ff7ea810dd7876024 100644
--- a/Makefile
+++ b/Makefile
@@ -274,6 +274,7 @@ TESTS = \
 	block_based_filter_block_test \
 	full_filter_block_test \
 	histogram_test \
+	inlineskiplist_test \
 	log_test \
 	manual_compaction_test \
 	memenv_test \
@@ -607,7 +608,7 @@ asan_crash_test:
 	$(MAKE) clean
 
 valgrind_check: $(TESTS)
-	for t in $(filter-out skiplist_test,$(TESTS)); do \
+	for t in $(filter-out %skiplist_test,$(TESTS)); do \
 		$(VALGRIND_VER) $(VALGRIND_OPTS) ./$$t; \
 		ret_code=$$?; \
 		if [ $$ret_code -ne 0 ]; then \
@@ -864,6 +865,9 @@ table_test: table/table_test.o $(LIBOBJECTS) $(TESTHARNESS)
 block_test: table/block_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
+inlineskiplist_test: db/inlineskiplist_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
+
 skiplist_test: db/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
diff --git a/db/inlineskiplist.h b/db/inlineskiplist.h
new file mode 100644
index 0000000000000000000000000000000000000000..dad87b37304113271309412228e8e1550cdcc58e
--- /dev/null
+++ b/db/inlineskiplist.h
@@ -0,0 +1,482 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// Thread safety
+// -------------
+//
+// Writes require external synchronization, most likely a mutex.  Reads
+// require a guarantee that the InlineSkipList will not be destroyed while
+// the read is in progress.  Apart from that, reads progress without any
+// internal locking or synchronization.
+//
+// Invariants:
+//
+// (1) Allocated nodes are never deleted until the InlineSkipList is
+// destroyed.  This is trivially guaranteed by the code since we never
+// delete any skip list nodes.
+//
+// (2) The contents of a Node except for the next/prev pointers are
+// immutable after the Node has been linked into the InlineSkipList.
+// Only Insert() modifies the list, and it is careful to initialize a
+// node and use release-stores to publish the nodes in one or more lists.
+//
+// ... prev vs. next pointer ordering ...
+//
+
+#pragma once
+#include <assert.h>
+#include <atomic>
+#include "port/port.h"
+#include "util/allocator.h"
+#include "util/random.h"
+
+namespace rocksdb {
+
+template <typename Key, class Comparator>
+class InlineSkipList {
+ private:
+  struct Node;
+
+ public:
+  // Create a new InlineSkipList object that will use "cmp" for comparing
+  // keys, and will allocate memory using "*allocator".  Objects allocated
+  // in the allocator must remain allocated for the lifetime of the
+  // skiplist object.
+  explicit InlineSkipList(Comparator cmp, Allocator* allocator,
+                          int32_t max_height = 12,
+                          int32_t branching_factor = 4);
+
+  // Insert key into the list.
+  // REQUIRES: nothing that compares equal to key is currently in the list.
+  void Insert(const Key& key);
+
+  // Returns true iff an entry that compares equal to key is in the list.
+  bool Contains(const Key& key) const;
+
+  // Return estimated number of entries smaller than `key`.
+  uint64_t EstimateCount(const Key& key) const;
+
+  // Iteration over the contents of a skip list
+  class Iterator {
+   public:
+    // Initialize an iterator over the specified list.
+    // The returned iterator is not valid.
+    explicit Iterator(const InlineSkipList* list);
+
+    // Change the underlying skiplist used for this iterator.
+    // This enables us to reuse the iterator for a different list without
+    // deallocating the old one and then allocating a new one.
+    void SetList(const InlineSkipList* list);
+
+    // Returns true iff the iterator is positioned at a valid node.
+    bool Valid() const;
+
+    // Returns the key at the current position.
+    // REQUIRES: Valid()
+    const Key& key() const;
+
+    // Advances to the next position.
+    // REQUIRES: Valid()
+    void Next();
+
+    // Advances to the previous position.
+    // REQUIRES: Valid()
+    void Prev();
+
+    // Advance to the first entry with a key >= target
+    void Seek(const Key& target);
+
+    // Position at the first entry in list.
+    // Final state of iterator is Valid() iff list is not empty.
+    void SeekToFirst();
+
+    // Position at the last entry in list.
+    // Final state of iterator is Valid() iff list is not empty.
+    void SeekToLast();
+
+   private:
+    const InlineSkipList* list_;
+    Node* node_;
+    // Intentionally copyable
+  };
+
+ private:
+  const uint16_t kMaxHeight_;
+  const uint16_t kBranching_;
+  const uint32_t kScaledInverseBranching_;
+
+  // Immutable after construction
+  Comparator const compare_;
+  Allocator* const allocator_;  // Allocator used for allocations of nodes
+
+  Node* const head_;
+
+  // Modified only by Insert().  Read racily by readers, but stale
+  // values are ok.
+  std::atomic<int> max_height_;  // Height of the entire list
+
+  // Used for optimizing sequential insert patterns.  Tricky.  prev_[i] for
+  // i up to max_height_ is the predecessor of prev_[0] and prev_height_
+  // is the height of prev_[0].  prev_[0] can only be equal to head before
+  // insertion, in which case max_height_ and prev_height_ are 1.
+  Node** prev_;
+  int32_t prev_height_;
+
+  inline int GetMaxHeight() const {
+    return max_height_.load(std::memory_order_relaxed);
+  }
+
+  Node* NewNode(const Key& key, int height);
+  int RandomHeight();
+  bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }
+
+  // Return true if key is greater than the data stored in "n"
+  bool KeyIsAfterNode(const Key& key, Node* n) const;
+
+  // Returns the earliest node with a key >= key.
+  // Return nullptr if there is no such node.
+  Node* FindGreaterOrEqual(const Key& key) const;
+
+  // Return the latest node with a key < key.
+  // Return head_ if there is no such node.
+  // Fills prev[level] with pointer to previous node at "level" for every
+  // level in [0..max_height_-1], if prev is non-null.
+  Node* FindLessThan(const Key& key, Node** prev = nullptr) const;
+
+  // Return the last node in the list.
+  // Return head_ if list is empty.
+  Node* FindLast() const;
+
+  // No copying allowed
+  InlineSkipList(const InlineSkipList&);
+  void operator=(const InlineSkipList&);
+};
+
+// Implementation details follow
+template <typename Key, class Comparator>
+struct InlineSkipList<Key, Comparator>::Node {
+  explicit Node(const Key& k) : key(k) {}
+
+  Key const key;
+
+  // Accessors/mutators for links.  Wrapped in methods so we can
+  // add the appropriate barriers as necessary.
+  Node* Next(int n) {
+    assert(n >= 0);
+    // Use an 'acquire load' so that we observe a fully initialized
+    // version of the returned Node.
+    return (next_[n].load(std::memory_order_acquire));
+  }
+  void SetNext(int n, Node* x) {
+    assert(n >= 0);
+    // Use a 'release store' so that anybody who reads through this
+    // pointer observes a fully initialized version of the inserted node.
+    next_[n].store(x, std::memory_order_release);
+  }
+
+  // No-barrier variants that can be safely used in a few locations.
+  Node* NoBarrier_Next(int n) {
+    assert(n >= 0);
+    return next_[n].load(std::memory_order_relaxed);
+  }
+  void NoBarrier_SetNext(int n, Node* x) {
+    assert(n >= 0);
+    next_[n].store(x, std::memory_order_relaxed);
+  }
+
+ private:
+  // Array of length equal to the node height.  next_[0] is lowest level link.
+  std::atomic<Node*> next_[1];
+};
+
+template <typename Key, class Comparator>
+typename InlineSkipList<Key, Comparator>::Node*
+InlineSkipList<Key, Comparator>::NewNode(const Key& key, int height) {
+  char* mem = allocator_->AllocateAligned(
+      sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
+  return new (mem) Node(key);
+}
+
+template <typename Key, class Comparator>
+inline InlineSkipList<Key, Comparator>::Iterator::Iterator(
+    const InlineSkipList* list) {
+  SetList(list);
+}
+
+template <typename Key, class Comparator>
+inline void InlineSkipList<Key, Comparator>::Iterator::SetList(
+    const InlineSkipList* list) {
+  list_ = list;
+  node_ = nullptr;
+}
+
+template <typename Key, class Comparator>
+inline bool InlineSkipList<Key, Comparator>::Iterator::Valid() const {
+  return node_ != nullptr;
+}
+
+template <typename Key, class Comparator>
+inline const Key& InlineSkipList<Key, Comparator>::Iterator::key() const {
+  assert(Valid());
+  return node_->key;
+}
+
+template <typename Key, class Comparator>
+inline void InlineSkipList<Key, Comparator>::Iterator::Next() {
+  assert(Valid());
+  node_ = node_->Next(0);
+}
+
+template <typename Key, class Comparator>
+inline void InlineSkipList<Key, Comparator>::Iterator::Prev() {
+  // Instead of using explicit "prev" links, we just search for the
+  // last node that falls before key.
+  assert(Valid());
+  node_ = list_->FindLessThan(node_->key);
+  if (node_ == list_->head_) {
+    node_ = nullptr;
+  }
+}
+
+template <typename Key, class Comparator>
+inline void InlineSkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
+  node_ = list_->FindGreaterOrEqual(target);
+}
+
+template <typename Key, class Comparator>
+inline void InlineSkipList<Key, Comparator>::Iterator::SeekToFirst() {
+  node_ = list_->head_->Next(0);
+}
+
+template <typename Key, class Comparator>
+inline void InlineSkipList<Key, Comparator>::Iterator::SeekToLast() {
+  node_ = list_->FindLast();
+  if (node_ == list_->head_) {
+    node_ = nullptr;
+  }
+}
+
+template <typename Key, class Comparator>
+int InlineSkipList<Key, Comparator>::RandomHeight() {
+  auto rnd = Random::GetTLSInstance();
+
+  // Increase height with probability 1 in kBranching
+  int height = 1;
+  while (height < kMaxHeight_ && rnd->Next() < kScaledInverseBranching_) {
+    height++;
+  }
+  assert(height > 0);
+  assert(height <= kMaxHeight_);
+  return height;
+}
+
+template <typename Key, class Comparator>
+bool InlineSkipList<Key, Comparator>::KeyIsAfterNode(const Key& key,
+                                                     Node* n) const {
+  // nullptr n is considered infinite
+  return (n != nullptr) && (compare_(n->key, key) < 0);
+}
+
+template <typename Key, class Comparator>
+typename InlineSkipList<Key, Comparator>::Node*
+InlineSkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key) const {
+  // Note: It looks like we could reduce duplication by implementing
+  // this function as FindLessThan(key)->Next(0), but we wouldn't be able
+  // to exit early on equality and the result wouldn't even be correct.
+  // A concurrent insert might occur after FindLessThan(key) but before
+  // we get a chance to call Next(0).
+  Node* x = head_;
+  int level = GetMaxHeight() - 1;
+  Node* last_bigger = nullptr;
+  while (true) {
+    Node* next = x->Next(level);
+    // Make sure the lists are sorted
+    assert(x == head_ || next == nullptr || KeyIsAfterNode(next->key, x));
+    // Make sure we haven't overshot during our search
+    assert(x == head_ || KeyIsAfterNode(key, x));
+    int cmp =
+        (next == nullptr || next == last_bigger) ? 1 : compare_(next->key, key);
+    if (cmp == 0 || (cmp > 0 && level == 0)) {
+      return next;
+    } else if (cmp < 0) {
+      // Keep searching in this list
+      x = next;
+    } else {
+      // Switch to next list, reuse compare_() result
+      last_bigger = next;
+      level--;
+    }
+  }
+}
+
+template <typename Key, class Comparator>
+typename InlineSkipList<Key, Comparator>::Node*
+InlineSkipList<Key, Comparator>::FindLessThan(const Key& key,
+                                              Node** prev) const {
+  Node* x = head_;
+  int level = GetMaxHeight() - 1;
+  // KeyIsAfter(key, last_not_after) is definitely false
+  Node* last_not_after = nullptr;
+  while (true) {
+    Node* next = x->Next(level);
+    assert(x == head_ || next == nullptr || KeyIsAfterNode(next->key, x));
+    assert(x == head_ || KeyIsAfterNode(key, x));
+    if (next != last_not_after && KeyIsAfterNode(key, next)) {
+      // Keep searching in this list
+      x = next;
+    } else {
+      if (prev != nullptr) {
+        prev[level] = x;
+      }
+      if (level == 0) {
+        return x;
+      } else {
+        // Switch to next list, reuse KeyIsAfterNode() result
+        last_not_after = next;
+        level--;
+      }
+    }
+  }
+}
+
+template <typename Key, class Comparator>
+typename InlineSkipList<Key, Comparator>::Node*
+InlineSkipList<Key, Comparator>::FindLast() const {
+  Node* x = head_;
+  int level = GetMaxHeight() - 1;
+  while (true) {
+    Node* next = x->Next(level);
+    if (next == nullptr) {
+      if (level == 0) {
+        return x;
+      } else {
+        // Switch to next list
+        level--;
+      }
+    } else {
+      x = next;
+    }
+  }
+}
+
+template <typename Key, class Comparator>
+uint64_t InlineSkipList<Key, Comparator>::EstimateCount(const Key& key) const {
+  uint64_t count = 0;
+
+  Node* x = head_;
+  int level = GetMaxHeight() - 1;
+  while (true) {
+    assert(x == head_ || compare_(x->key, key) < 0);
+    Node* next = x->Next(level);
+    if (next == nullptr || compare_(next->key, key) >= 0) {
+      if (level == 0) {
+        return count;
+      } else {
+        // Switch to next list
+        count *= kBranching_;
+        level--;
+      }
+    } else {
+      x = next;
+      count++;
+    }
+  }
+}
+
+template <typename Key, class Comparator>
+InlineSkipList<Key, Comparator>::InlineSkipList(const Comparator cmp,
+                                                Allocator* allocator,
+                                                int32_t max_height,
+                                                int32_t branching_factor)
+    : kMaxHeight_(max_height),
+      kBranching_(branching_factor),
+      kScaledInverseBranching_((Random::kMaxNext + 1) / kBranching_),
+      compare_(cmp),
+      allocator_(allocator),
+      head_(NewNode(0 /* any key will do */, max_height)),
+      max_height_(1),
+      prev_height_(1) {
+  assert(max_height > 0 && kMaxHeight_ == static_cast<uint32_t>(max_height));
+  assert(branching_factor > 0 &&
+         kBranching_ == static_cast<uint32_t>(branching_factor));
+  assert(kScaledInverseBranching_ > 0);
+  // Allocate the prev_ Node* array, directly from the passed-in allocator.
+  // prev_ does not need to be freed, as its life cycle is tied up with
+  // the allocator as a whole.
+  prev_ = reinterpret_cast<Node**>(
+      allocator_->AllocateAligned(sizeof(Node*) * kMaxHeight_));
+  for (int i = 0; i < kMaxHeight_; i++) {
+    head_->SetNext(i, nullptr);
+    prev_[i] = head_;
+  }
+}
+
+template <typename Key, class Comparator>
+void InlineSkipList<Key, Comparator>::Insert(const Key& key) {
+  // fast path for sequential insertion
+  if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) &&
+      (prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) {
+    assert(prev_[0] != head_ || (prev_height_ == 1 && GetMaxHeight() == 1));
+
+    // Outside of this method prev_[1..max_height_] is the predecessor
+    // of prev_[0], and prev_height_ refers to prev_[0].  Inside Insert
+    // prev_[0..max_height - 1] is the predecessor of key.  Switch from
+    // the external state to the internal
+    for (int i = 1; i < prev_height_; i++) {
+      prev_[i] = prev_[0];
+    }
+  } else {
+    // TODO(opt): we could use a NoBarrier predecessor search as an
+    // optimization for architectures where memory_order_acquire needs
+    // a synchronization instruction.  Doesn't matter on x86
+    FindLessThan(key, prev_);
+  }
+
+  // Our data structure does not allow duplicate insertion
+  assert(prev_[0]->Next(0) == nullptr || !Equal(key, prev_[0]->Next(0)->key));
+
+  int height = RandomHeight();
+  if (height > GetMaxHeight()) {
+    for (int i = GetMaxHeight(); i < height; i++) {
+      prev_[i] = head_;
+    }
+    // fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
+
+    // It is ok to mutate max_height_ without any synchronization
+    // with concurrent readers.  A concurrent reader that observes
+    // the new value of max_height_ will see either the old value of
+    // new level pointers from head_ (nullptr), or a new value set in
+    // the loop below.  In the former case the reader will
+    // immediately drop to the next level since nullptr sorts after all
+    // keys.  In the latter case the reader will use the new node.
+    max_height_.store(height, std::memory_order_relaxed);
+  }
+
+  Node* x = NewNode(key, height);
+  for (int i = 0; i < height; i++) {
+    // NoBarrier_SetNext() suffices since we will add a barrier when
+    // we publish a pointer to "x" in prev[i].
+    x->NoBarrier_SetNext(i, prev_[i]->NoBarrier_Next(i));
+    prev_[i]->SetNext(i, x);
+  }
+  prev_[0] = x;
+  prev_height_ = height;
+}
+
+template <typename Key, class Comparator>
+bool InlineSkipList<Key, Comparator>::Contains(const Key& key) const {
+  Node* x = FindGreaterOrEqual(key);
+  if (x != nullptr && Equal(key, x->key)) {
+    return true;
+  } else {
+    return false;
+  }
+}
+
+}  // namespace rocksdb
diff --git a/db/inlineskiplist_test.cc b/db/inlineskiplist_test.cc
new file mode 100644
index 0000000000000000000000000000000000000000..39fa7b4df91758f0d14a2ba4be9f041b17a9c4db
--- /dev/null
+++ b/db/inlineskiplist_test.cc
@@ -0,0 +1,373 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/inlineskiplist.h"
+#include <set>
+#include "rocksdb/env.h"
+#include "util/arena.h"
+#include "util/hash.h"
+#include "util/random.h"
+#include "util/testharness.h"
+
+namespace rocksdb {
+
+typedef uint64_t Key;
+
+struct TestComparator {
+  int operator()(const Key& a, const Key& b) const {
+    if (a < b) {
+      return -1;
+    } else if (a > b) {
+      return +1;
+    } else {
+      return 0;
+    }
+  }
+};
+
+class InlineSkipTest : public testing::Test {};
+
+TEST_F(InlineSkipTest, Empty) {
+  Arena arena;
+  TestComparator cmp;
+  InlineSkipList<Key, TestComparator> list(cmp, &arena);
+  ASSERT_TRUE(!list.Contains(10));
+
+  InlineSkipList<Key, TestComparator>::Iterator iter(&list);
+  ASSERT_TRUE(!iter.Valid());
+  iter.SeekToFirst();
+  ASSERT_TRUE(!iter.Valid());
+  iter.Seek(100);
+  ASSERT_TRUE(!iter.Valid());
+  iter.SeekToLast();
+  ASSERT_TRUE(!iter.Valid());
+}
+
+TEST_F(InlineSkipTest, InsertAndLookup) {
+  const int N = 2000;
+  const int R = 5000;
+  Random rnd(1000);
+  std::set<Key> keys;
+  Arena arena;
+  TestComparator cmp;
+  InlineSkipList<Key, TestComparator> list(cmp, &arena);
+  for (int i = 0; i < N; i++) {
+    Key key = rnd.Next() % R;
+    if (keys.insert(key).second) {
+      list.Insert(key);
+    }
+  }
+
+  for (int i = 0; i < R; i++) {
+    if (list.Contains(i)) {
+      ASSERT_EQ(keys.count(i), 1U);
+    } else {
+      ASSERT_EQ(keys.count(i), 0U);
+    }
+  }
+
+  // Simple iterator tests
+  {
+    InlineSkipList<Key, TestComparator>::Iterator iter(&list);
+    ASSERT_TRUE(!iter.Valid());
+
+    iter.Seek(0);
+    ASSERT_TRUE(iter.Valid());
+    ASSERT_EQ(*(keys.begin()), iter.key());
+
+    iter.SeekToFirst();
+    ASSERT_TRUE(iter.Valid());
+    ASSERT_EQ(*(keys.begin()), iter.key());
+
+    iter.SeekToLast();
+    ASSERT_TRUE(iter.Valid());
+    ASSERT_EQ(*(keys.rbegin()), iter.key());
+  }
+
+  // Forward iteration test
+  for (int i = 0; i < R; i++) {
+    InlineSkipList<Key, TestComparator>::Iterator iter(&list);
+    iter.Seek(i);
+
+    // Compare against model iterator
+    std::set<Key>::iterator model_iter = keys.lower_bound(i);
+    for (int j = 0; j < 3; j++) {
+      if (model_iter == keys.end()) {
+        ASSERT_TRUE(!iter.Valid());
+        break;
+      } else {
+        ASSERT_TRUE(iter.Valid());
+        ASSERT_EQ(*model_iter, iter.key());
+        ++model_iter;
+        iter.Next();
+      }
+    }
+  }
+
+  // Backward iteration test
+  {
+    InlineSkipList<Key, TestComparator>::Iterator iter(&list);
+    iter.SeekToLast();
+
+    // Compare against model iterator
+    for (std::set<Key>::reverse_iterator model_iter = keys.rbegin();
+         model_iter != keys.rend(); ++model_iter) {
+      ASSERT_TRUE(iter.Valid());
+      ASSERT_EQ(*model_iter, iter.key());
+      iter.Prev();
+    }
+    ASSERT_TRUE(!iter.Valid());
+  }
+}
+
+// We want to make sure that with a single writer and multiple
+// concurrent readers (with no synchronization other than when a
+// reader's iterator is created), the reader always observes all the
+// data that was present in the skip list when the iterator was
+// constructed.  Because insertions are happening concurrently, we may
+// also observe new values that were inserted since the iterator was
+// constructed, but we should never miss any values that were present
+// at iterator construction time.
+//
+// We generate multi-part keys:
+//     <key,gen,hash>
+// where:
+//     key is in range [0..K-1]
+//     gen is a generation number for key
+//     hash is hash(key,gen)
+//
+// The insertion code picks a random key, sets gen to be 1 + the last
+// generation number inserted for that key, and sets hash to Hash(key,gen).
+//
+// At the beginning of a read, we snapshot the last inserted
+// generation number for each key.  We then iterate, including random
+// calls to Next() and Seek().  For every key we encounter, we
+// check that it is either expected given the initial snapshot or has
+// been concurrently added since the iterator started.
+class ConcurrentTest {
+ private:
+  static const uint32_t K = 4;
+
+  static uint64_t key(Key key) { return (key >> 40); }
+  static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
+  static uint64_t hash(Key key) { return key & 0xff; }
+
+  static uint64_t HashNumbers(uint64_t k, uint64_t g) {
+    uint64_t data[2] = {k, g};
+    return Hash(reinterpret_cast<char*>(data), sizeof(data), 0);
+  }
+
+  static Key MakeKey(uint64_t k, uint64_t g) {
+    assert(sizeof(Key) == sizeof(uint64_t));
+    assert(k <= K);  // We sometimes pass K to seek to the end of the skiplist
+    assert(g <= 0xffffffffu);
+    return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff));
+  }
+
+  static bool IsValidKey(Key k) {
+    return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff);
+  }
+
+  static Key RandomTarget(Random* rnd) {
+    switch (rnd->Next() % 10) {
+      case 0:
+        // Seek to beginning
+        return MakeKey(0, 0);
+      case 1:
+        // Seek to end
+        return MakeKey(K, 0);
+      default:
+        // Seek to middle
+        return MakeKey(rnd->Next() % K, 0);
+    }
+  }
+
+  // Per-key generation
+  struct State {
+    std::atomic<int> generation[K];
+    void Set(int k, int v) {
+      generation[k].store(v, std::memory_order_release);
+    }
+    int Get(int k) { return generation[k].load(std::memory_order_acquire); }
+
+    State() {
+      for (unsigned int k = 0; k < K; k++) {
+        Set(k, 0);
+      }
+    }
+  };
+
+  // Current state of the test
+  State current_;
+
+  Arena arena_;
+
+  // InlineSkipList is not protected by mu_.  We just use a single writer
+  // thread to modify it.
+  InlineSkipList<Key, TestComparator> list_;
+
+ public:
+  ConcurrentTest() : list_(TestComparator(), &arena_) {}
+
+  // REQUIRES: External synchronization
+  void WriteStep(Random* rnd) {
+    const uint32_t k = rnd->Next() % K;
+    const int g = current_.Get(k) + 1;
+    const Key new_key = MakeKey(k, g);
+    list_.Insert(new_key);
+    current_.Set(k, g);
+  }
+
+  void ReadStep(Random* rnd) {
+    // Remember the initial committed state of the skiplist.
+    State initial_state;
+    for (unsigned int k = 0; k < K; k++) {
+      initial_state.Set(k, current_.Get(k));
+    }
+
+    Key pos = RandomTarget(rnd);
+    InlineSkipList<Key, TestComparator>::Iterator iter(&list_);
+    iter.Seek(pos);
+    while (true) {
+      Key current;
+      if (!iter.Valid()) {
+        current = MakeKey(K, 0);
+      } else {
+        current = iter.key();
+        ASSERT_TRUE(IsValidKey(current)) << current;
+      }
+      ASSERT_LE(pos, current) << "should not go backwards";
+
+      // Verify that everything in [pos,current) was not present in
+      // initial_state.
+      while (pos < current) {
+        ASSERT_LT(key(pos), K) << pos;
+
+        // Note that generation 0 is never inserted, so it is ok if
+        // <*,0,*> is missing.
+        ASSERT_TRUE((gen(pos) == 0U) ||
+                    (gen(pos) > static_cast<uint64_t>(initial_state.Get(
+                                    static_cast<int>(key(pos))))))
+            << "key: " << key(pos) << "; gen: " << gen(pos)
+            << "; initgen: " << initial_state.Get(static_cast<int>(key(pos)));
+
+        // Advance to next key in the valid key space
+        if (key(pos) < key(current)) {
+          pos = MakeKey(key(pos) + 1, 0);
+        } else {
+          pos = MakeKey(key(pos), gen(pos) + 1);
+        }
+      }
+
+      if (!iter.Valid()) {
+        break;
+      }
+
+      if (rnd->Next() % 2) {
+        iter.Next();
+        pos = MakeKey(key(pos), gen(pos) + 1);
+      } else {
+        Key new_target = RandomTarget(rnd);
+        if (new_target > pos) {
+          pos = new_target;
+          iter.Seek(new_target);
+        }
+      }
+    }
+  }
+};
+const uint32_t ConcurrentTest::K;
+
+// Simple test that does single-threaded testing of the ConcurrentTest
+// scaffolding.
+TEST_F(InlineSkipTest, ConcurrentWithoutThreads) {
+  ConcurrentTest test;
+  Random rnd(test::RandomSeed());
+  for (int i = 0; i < 10000; i++) {
+    test.ReadStep(&rnd);
+    test.WriteStep(&rnd);
+  }
+}
+
+class TestState {
+ public:
+  ConcurrentTest t_;
+  int seed_;
+  std::atomic<bool> quit_flag_;
+
+  enum ReaderState { STARTING, RUNNING, DONE };
+
+  explicit TestState(int s)
+      : seed_(s), quit_flag_(false), state_(STARTING), state_cv_(&mu_) {}
+
+  void Wait(ReaderState s) {
+    mu_.Lock();
+    while (state_ != s) {
+      state_cv_.Wait();
+    }
+    mu_.Unlock();
+  }
+
+  void Change(ReaderState s) {
+    mu_.Lock();
+    state_ = s;
+    state_cv_.Signal();
+    mu_.Unlock();
+  }
+
+ private:
+  port::Mutex mu_;
+  ReaderState state_;
+  port::CondVar state_cv_;
+};
+
+static void ConcurrentReader(void* arg) {
+  TestState* state = reinterpret_cast<TestState*>(arg);
+  Random rnd(state->seed_);
+  int64_t reads = 0;
+  state->Change(TestState::RUNNING);
+  while (!state->quit_flag_.load(std::memory_order_acquire)) {
+    state->t_.ReadStep(&rnd);
+    ++reads;
+  }
+  state->Change(TestState::DONE);
+}
+
+static void RunConcurrent(int run) {
+  const int seed = test::RandomSeed() + (run * 100);
+  Random rnd(seed);
+  const int N = 1000;
+  const int kSize = 1000;
+  for (int i = 0; i < N; i++) {
+    if ((i % 100) == 0) {
+      fprintf(stderr, "Run %d of %d\n", i, N);
+    }
+    TestState state(seed + 1);
+    Env::Default()->Schedule(ConcurrentReader, &state);
+    state.Wait(TestState::RUNNING);
+    for (int k = 0; k < kSize; k++) {
+      state.t_.WriteStep(&rnd);
+    }
+    state.quit_flag_.store(true, std::memory_order_release);
+    state.Wait(TestState::DONE);
+  }
+}
+
+TEST_F(InlineSkipTest, Concurrent1) { RunConcurrent(1); }
+TEST_F(InlineSkipTest, Concurrent2) { RunConcurrent(2); }
+TEST_F(InlineSkipTest, Concurrent3) { RunConcurrent(3); }
+TEST_F(InlineSkipTest, Concurrent4) { RunConcurrent(4); }
+TEST_F(InlineSkipTest, Concurrent5) { RunConcurrent(5); }
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/src.mk b/src.mk
index b6cc81cac81f6aa85220fff2133cd50e9b02e170..05c084ac5137332ef1c18bf5283ce8b275f579cb 100644
--- a/src.mk
+++ b/src.mk
@@ -201,6 +201,7 @@ TEST_BENCH_SOURCES = \
   db/file_indexer_test.cc \
   db/filename_test.cc \
   db/flush_job_test.cc \
+  db/inlineskiplist_test.cc \
   db/listener_test.cc \
   db/log_and_apply_bench.cc \
   db/log_test.cc \
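
For reviewers, a minimal usage sketch of the new API (not part of the patch). It mirrors
what db/inlineskiplist_test.cc does above: keys are plain uint64_t, the comparator is a
small copyable functor, and all node memory comes from an Arena that must outlive the
list. The Uint64Comparator and ExampleUsage names below are illustrative only.

    #include "db/inlineskiplist.h"
    #include "util/arena.h"

    namespace rocksdb {

    struct Uint64Comparator {
      int operator()(const uint64_t& a, const uint64_t& b) const {
        return a < b ? -1 : (a > b ? +1 : 0);
      }
    };

    void ExampleUsage() {
      Arena arena;  // owns all node allocations; freed as a whole with the list
      InlineSkipList<uint64_t, Uint64Comparator> list(Uint64Comparator(), &arena);

      // Writes require external synchronization; a single thread is fine here.
      list.Insert(20);
      list.Insert(10);

      // Readers only need the list (and arena) to stay alive while reading.
      InlineSkipList<uint64_t, Uint64Comparator>::Iterator iter(&list);
      for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
        // visits 10, then 20
      }
    }

    }  // namespace rocksdb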