提交 0a97143d 编写于 作者: N noobpwnftw 提交者: 奏之章

Fix Patricia Trie memtable

上级 5b9ad139
......@@ -991,6 +991,7 @@ if(WITH_TESTS)
env/mock_env_test.cc
memtable/inlineskiplist_test.cc
memtable/skiplist_test.cc
memtable/terark_zip_memtable_test.cc
memtable/write_buffer_manager_test.cc
monitoring/histogram_test.cc
monitoring/iostats_context_test.cc
......
......@@ -65,7 +65,8 @@ DEFINE_string(memtablerep, "skiplist",
"\tvector -- backed by an std::vector\n"
"\thashskiplist -- backed by a hash skip list\n"
"\thashlinklist -- backed by a hash linked list\n"
"\tcuckoo -- backed by a cuckoo hash table");
"\tcuckoo -- backed by a cuckoo hash table\n"
"\tpatricia_trie -- backed by a patricia trie\n");
DEFINE_int64(bucket_count, 1000000,
"bucket_count parameter to pass into NewHashSkiplistRepFactory or "
......@@ -247,24 +248,13 @@ class FillBenchmarkThread : public BenchmarkThread {
num_ops, read_hits) {}
void FillOne() {
char* buf = nullptr;
auto internal_key_size = 16;
auto encoded_len =
FLAGS_item_size + VarintLength(internal_key_size) + internal_key_size;
KeyHandle handle = table_->Allocate(encoded_len, &buf);
assert(buf != nullptr);
char* p = EncodeVarint32(buf, internal_key_size);
std::string user_key;
auto key = key_gen_->Next();
EncodeFixed64(p, key);
p += 8;
EncodeFixed64(p, ++(*sequence_));
p += 8;
PutFixed64(&user_key, key);
InternalKey internal_key(user_key, ++(*sequence_), kTypeValue);
Slice bytes = generator_.Generate(FLAGS_item_size);
memcpy(p, bytes.data(), FLAGS_item_size);
p += FLAGS_item_size;
assert(p == buf + encoded_len);
table_->Insert(handle);
*bytes_written_ += encoded_len;
table_->InsertKeyValue(internal_key.Encode(), bytes);
*bytes_written_ += MemTableRep::EncodeKeyValueSize(internal_key.Encode(), bytes);
}
void operator()() override {
......@@ -287,9 +277,7 @@ class ConcurrentFillBenchmarkThread : public FillBenchmarkThread {
}
void operator()() override {
// # of read threads will be total threads - write threads (always 1). Loop
// while all reads complete.
while ((*threads_done_).load() < (FLAGS_num_threads - 1)) {
for (unsigned int i = 0; i < num_ops_; ++i) {
FillOne();
}
}
......@@ -591,6 +579,8 @@ int main(int argc, char** argv) {
if (FLAGS_memtablerep == "skiplist") {
factory.reset(new rocksdb::SkipListFactory);
#ifndef ROCKSDB_LITE
} else if (FLAGS_memtablerep == "patricia_trie") {
factory.reset(rocksdb::NewPatriciaTrieRepFactory());
} else if (FLAGS_memtablerep == "vector") {
factory.reset(new rocksdb::VectorRepFactory);
} else if (FLAGS_memtablerep == "hashskiplist") {
......
......@@ -41,8 +41,8 @@ namespace rocksdb {
namespace details = terark_memtable_details;
static const uint32_t LOCK_FLAG = 1u << 31;
static const uint32_t SIZE_MASK = ~LOCK_FLAG;
static const uint64_t LOCK_FLAG = 1ULL << 63;
static const uint32_t SIZE_MASK = INT32_MAX;
bool MemWriterToken::init_value(void* valptr, size_t valsize) noexcept {
assert(valsize == sizeof(uint32_t));
......@@ -67,8 +67,7 @@ bool MemWriterToken::init_value(void* valptr, size_t valsize) noexcept {
data->loc = (uint32_t)value_loc;
data->tag = tag_;
auto* vector = (details::tag_vector_t*)trie->mem_get(vector_loc);
vector->loc.store((uint32_t)data_loc, std::memory_order_release);
vector->size.store(1, std::memory_order_release);
vector->size_loc.store((1ULL << 32) + data_loc, std::memory_order_release);
*(uint32_t*)valptr = (uint32_t)vector_loc;
return true;
} while (false);
......@@ -164,10 +163,9 @@ bool PatriciaTrieRep::Contains(const Slice& internal_key) const {
if (trie->lookup(find_key, token)) {
auto vector =
(details::tag_vector_t*)trie->mem_get(*(uint32_t*)token->value());
size_t size = vector->size.load(std::memory_order_relaxed) & SIZE_MASK;
auto data = (details::tag_vector_t::data_t*)trie->mem_get(
vector->loc.load(std::memory_order_relaxed));
bool ret = terark::binary_search_0(data, size, tag);
uint64_t size_loc = vector->size_loc.load(std::memory_order_relaxed);
auto data = (details::tag_vector_t::data_t*)trie->mem_get((uint32_t)size_loc);
bool ret = terark::binary_search_0(data, (size_loc >> 32) & SIZE_MASK, tag);
token->idle();
return ret;
}
......@@ -200,14 +198,14 @@ void PatriciaTrieRep::Get(const LookupKey& k, void* callback_args,
Slice internal_key = k.internal_key();
auto find_key = terark::fstring(internal_key.data(), internal_key.size() - 8);
uint64_t tag = DecodeFixed64(find_key.end());
auto tag = ExtractInternalKeyFooter(internal_key);
auto do_callback = [&](HeapItem* heap) -> bool {
build_key(find_key, heap->tag, buffer);
auto trie = heap->trie();
auto vector = (details::tag_vector_t*)trie->mem_get(heap->loc);
auto data = (details::tag_vector_t::data_t*)trie->mem_get(
vector->loc.load(std::memory_order_relaxed));
(uint32_t)vector->size_loc.load(std::memory_order_relaxed));
auto value = (const char*)trie->mem_get(data[heap->idx].loc);
return callback_func(callback_args, Slice(buffer->data(), buffer->size()),
value);
......@@ -225,10 +223,9 @@ void PatriciaTrieRep::Get(const LookupKey& k, void* callback_args,
if (trie_vec_[i]->lookup(find_key, token)) {
uint32_t loc = token->value_of<uint32_t>();
auto vector = (details::tag_vector_t*)trie->mem_get(loc);
size_t size = vector->size.load(std::memory_order_relaxed) & SIZE_MASK;
auto data = (details::tag_vector_t::data_t*)trie->mem_get(
vector->loc.load(std::memory_order_relaxed));
size_t idx = terark::upper_bound_0(data, size, tag) - 1;
uint64_t size_loc = vector->size_loc.load(std::memory_order_relaxed);
auto data = (details::tag_vector_t::data_t*)trie->mem_get((uint32_t)size_loc);
size_t idx = terark::upper_bound_0(data, (size_loc >> 32) & SIZE_MASK, tag) - 1;
if (idx != size_t(-1)) {
heap.emplace_back(HeapItem{(uint32_t)idx, loc, data[idx].tag, token});
continue;
......@@ -260,7 +257,7 @@ void PatriciaTrieRep::Get(const LookupKey& k, void* callback_args,
auto idx = --item.idx;
auto trie = item.trie();
auto vector = (details::tag_vector_t*)(trie->mem_get(item.loc));
auto dataloc = vector->loc.load(std::memory_order_relaxed);
auto dataloc = (uint32_t)vector->size_loc.load(std::memory_order_relaxed);
auto data = (details::tag_vector_t::data_t*)(trie->mem_get(dataloc));
item.tag = data[idx].tag;
terark::adjust_heap_top(heap.begin(), heap.size(), heap_comp);
......@@ -315,22 +312,22 @@ bool PatriciaTrieRep::InsertKeyValue(const Slice& internal_key,
auto valptr = (char*)trie->mem_get(value_loc);
valptr = EncodeVarint32(valptr, (uint32_t)value.size());
memcpy(valptr, value.data(), value.size());
uint32_t size;
uint64_t size_loc;
// row lock: infinite spin on LOCK_FLAG
do {
do {
size = vector->size.load(std::memory_order_relaxed);
} while (size & LOCK_FLAG);
size = vector->size.fetch_or(LOCK_FLAG, std::memory_order_acq_rel);
} while (size & LOCK_FLAG);
uint32_t data_loc = vector->loc.load(std::memory_order_relaxed);
auto* data = (details::tag_vector_t::data_t*)trie->mem_get(data_loc);
size_loc = vector->size_loc.load(std::memory_order_relaxed);
} while (size_loc & LOCK_FLAG);
size_loc = vector->size_loc.fetch_or(LOCK_FLAG, std::memory_order_acq_rel);
} while (size_loc & LOCK_FLAG);
auto* data = (details::tag_vector_t::data_t*)trie->mem_get((uint32_t)size_loc);
uint32_t size = (size_loc >> 32);
assert(size > 0);
size_t insert_pos = terark::lower_bound_ex_n(
data, 0, size, tag >> 8,
[](details::tag_vector_t::data_t& item) { return item.tag >> 8; });
if (insert_pos < size && (tag >> 8) == (data[insert_pos].tag >> 8)) {
vector->size.store(size, std::memory_order_release);
vector->size_loc.store(size_loc, std::memory_order_release);
trie->mem_free(value_loc, value_size);
return details::InsertResult::Duplicated;
}
......@@ -339,14 +336,15 @@ bool PatriciaTrieRep::InsertKeyValue(const Slice& internal_key,
data[size].tag = tag;
// update 'size' and unlock
vector->size.store(size + 1, std::memory_order_release);
vector->size_loc.store(size_loc + (1ULL << 32), std::memory_order_release);
return details::InsertResult::Success;
}
size_t old_data_cap = sizeof(details::tag_vector_t::data_t) * size;
size_t old_data_cap = sizeof(details::tag_vector_t::data_t) *
(1u << (32 - details::tag_vector_t::full(size) - fast_clz32(size)));
size_t cow_data_loc = trie->mem_alloc(
old_data_cap * (1 + details::tag_vector_t::full(size)));
if (cow_data_loc == MainPatricia::mem_alloc_fail) {
vector->size.store(size, std::memory_order_release);
vector->size_loc.store(size_loc, std::memory_order_release);
trie->mem_free(value_loc, value_size);
return details::InsertResult::Fail;
}
......@@ -356,11 +354,10 @@ bool PatriciaTrieRep::InsertKeyValue(const Slice& internal_key,
sizeof(details::tag_vector_t::data_t) * insert_pos);
cow_data[insert_pos].loc = (uint32_t)value_loc;
cow_data[insert_pos].tag = tag;
memcpy(cow_data + insert_pos, data + insert_pos + 1,
memcpy(cow_data + insert_pos + 1, data + insert_pos,
sizeof(details::tag_vector_t::data_t) * (size - insert_pos));
vector->loc.store((uint32_t)cow_data_loc, std::memory_order_relaxed);
vector->size.store(size + 1, std::memory_order_release);
trie->mem_lazy_free(data_loc, old_data_cap);
vector->size_loc.store((uint64_t(size + 1) << 32) + cow_data_loc, std::memory_order_release);
trie->mem_lazy_free((uint32_t)size_loc, old_data_cap);
return details::InsertResult::Success;
} else if (token->value() != nullptr) {
const auto token_value_loc = token->value_of<uint32_t>();
......@@ -385,7 +382,6 @@ bool PatriciaTrieRep::InsertKeyValue(const Slice& internal_key,
// tool lambda fn end
// function start
if (handle_duplicate_) {
uint64_t tag = DecodeFixed64(key.end());
for (size_t i = 0; i < trie_vec_size_; ++i) {
auto* trie = trie_vec_[i];
auto token = trie->tls_reader_token();
......@@ -394,10 +390,9 @@ bool PatriciaTrieRep::InsertKeyValue(const Slice& internal_key,
if (trie->lookup(key, token)) {
auto vector =
(details::tag_vector_t*)trie->mem_get(token->value_of<uint32_t>());
size_t size = vector->size.load(std::memory_order_relaxed) & SIZE_MASK;
auto data = (details::tag_vector_t::data_t*)trie->mem_get(
vector->loc.load(std::memory_order_relaxed));
if (terark::binary_search_0(data, size, tag)) {
uint64_t size_loc = vector->size_loc.load(std::memory_order_relaxed);
auto data = (details::tag_vector_t::data_t*)trie->mem_get((uint32_t)size_loc);
if (terark::binary_search_0(data, (size_loc >> 32) & SIZE_MASK, tag)) {
return false;
}
}
......@@ -430,10 +425,10 @@ PatriciaRepIterator<heap_mode>::HeapItem::GetVector() {
auto trie = static_cast<terark::MainPatricia*>(handle->trie());
auto vectorloc = handle->value_of<uint32_t>();
auto vector = (details::tag_vector_t*)trie->mem_get(vectorloc);
auto size = vector->size.load(std::memory_order_relaxed) & SIZE_MASK;
auto dataloc = vector->loc.load(std::memory_order_relaxed);
uint64_t size_loc = vector->size_loc.load(std::memory_order_relaxed);
auto dataloc = (uint32_t)size_loc;
auto data = (details::tag_vector_t::data_t*)trie->mem_get(dataloc);
return {size, data};
return {(size_loc >> 32) & SIZE_MASK, data};
}
template <bool heap_mode>
......@@ -441,7 +436,8 @@ uint32_t PatriciaRepIterator<heap_mode>::HeapItem::GetValue() const {
auto trie = static_cast<terark::MainPatricia*>(handle->trie());
auto vectorloc = handle->value_of<uint32_t>();
auto vector = (details::tag_vector_t*)trie->mem_get(vectorloc);
auto dataloc = vector->loc.load(std::memory_order_relaxed);
uint64_t size_loc = vector->size_loc.load(std::memory_order_relaxed);
auto dataloc = (uint32_t)size_loc;
auto data = (details::tag_vector_t::data_t*)trie->mem_get(dataloc);
return data[index].loc;
}
......@@ -626,7 +622,7 @@ void PatriciaRepIterator<heap_mode>::Next() {
if (heap_mode) {
if (direction_ != 1) {
terark::fstring find_key(buffer_.data(), buffer_.size() - 8);
uint64_t tag = DecodeFixed64(find_key.end());
auto tag = ExtractInternalKeyFooter(buffer_);
Rebuild<1>([&](HeapItem* item) {
item->Seek(find_key, tag);
return item->index != size_t(-1);
......@@ -661,7 +657,7 @@ void PatriciaRepIterator<heap_mode>::Prev() {
if (heap_mode) {
if (direction_ != -1) {
terark::fstring find_key(buffer_.data(), buffer_.size() - 8);
uint64_t tag = DecodeFixed64(find_key.end());
auto tag = ExtractInternalKeyFooter(buffer_);
Rebuild<-1>([&](HeapItem* item) {
item->SeekForPrev(find_key, tag);
return item->index != size_t(-1);
......@@ -695,13 +691,15 @@ template <bool heap_mode>
void PatriciaRepIterator<heap_mode>::Seek(const Slice& user_key,
const char* memtable_key) {
terark::fstring find_key;
uint64_t tag;
if (memtable_key != nullptr) {
Slice internal_key = GetLengthPrefixedSlice(memtable_key);
find_key = terark::fstring(internal_key.data(), internal_key.size() - 8);
tag = ExtractInternalKeyFooter(internal_key);
} else {
find_key = terark::fstring(user_key.data(), user_key.size() - 8);
tag = ExtractInternalKeyFooter(user_key);
}
uint64_t tag = DecodeFixed64(find_key.end());
if (heap_mode) {
Rebuild<1>([&](HeapItem* item) {
......@@ -727,13 +725,15 @@ template <bool heap_mode>
void PatriciaRepIterator<heap_mode>::SeekForPrev(const Slice& user_key,
const char* memtable_key) {
terark::fstring find_key;
uint64_t tag;
if (memtable_key != nullptr) {
Slice internal_key = GetLengthPrefixedSlice(memtable_key);
find_key = terark::fstring(internal_key.data(), internal_key.size() - 8);
tag = ExtractInternalKeyFooter(internal_key);
} else {
find_key = terark::fstring(user_key.data(), user_key.size() - 8);
tag = ExtractInternalKeyFooter(user_key);
}
uint64_t tag = DecodeFixed64(find_key.end());
if (heap_mode) {
Rebuild<-1>([&](HeapItem* item) {
......
......@@ -55,14 +55,13 @@ enum class InsertResult { Success, Duplicated, Fail };
#pragma pack(push)
#pragma pack(4)
struct tag_vector_t {
std::atomic<uint32_t> size;
std::atomic<uint32_t> loc;
std::atomic<uint64_t> size_loc;
struct data_t {
uint64_t tag;
uint32_t loc;
operator uint64_t() const { return tag; }
};
static bool full(uint32_t size) { return ((size - 1) & size) == 0; }
static bool full(uint32_t size) { return !((size - 1) & size); }
};
#pragma pack(pop)
......
// Copyright (c) 2020-present, Bytedance Inc. All rights reserved.
// This source code is licensed under Apache 2.0 License.
#include "terark_zip_memtable.h"
#include <inttypes.h>
#include <atomic>
#include <chrono>
#include <string>
#include <memory>
#include "db/dbformat.h"
#include "gtest/gtest.h"
namespace rocksdb {
class TerarkZipMemtableTest : public testing::Test {};
TEST_F(TerarkZipMemtableTest, SimpleTest) {
std::shared_ptr<MemTable> mem_;
Options options;
options.memtable_factory =
std::shared_ptr<MemTableRepFactory>(NewPatriciaTrieRepFactory());
InternalKeyComparator cmp(BytewiseComparator());
ImmutableCFOptions ioptions(options);
WriteBufferManager wb(options.db_write_buffer_size);
mem_ = std::shared_ptr<MemTable>(
new MemTable(cmp, ioptions, MutableCFOptions(options),
/* needs_dup_key_check */ true, &wb, kMaxSequenceNumber,
0 /* column_family_id */));
// Run some basic tests
SequenceNumber seq = 123;
bool res;
res = mem_->Add(seq, kTypeValue, "key", "value2");
ASSERT_TRUE(res);
res = mem_->Add(seq, kTypeValue, "key", "value2");
ASSERT_FALSE(res);
// Changing the type should still cause the duplicatae key
res = mem_->Add(seq, kTypeMerge, "key", "value2");
ASSERT_FALSE(res);
// Changing the seq number will make the key fresh
res = mem_->Add(seq + 1, kTypeMerge, "key", "value2");
ASSERT_TRUE(res);
// Test with different types for duplicate keys
res = mem_->Add(seq, kTypeDeletion, "key", "");
ASSERT_FALSE(res);
res = mem_->Add(seq, kTypeSingleDeletion, "key", "");
ASSERT_FALSE(res);
}
// Test multi-threading insertion
TEST_F(TerarkZipMemtableTest, MultiThreadingTest) {
MemTable* mem_;
Options options;
options.memtable_factory =
std::shared_ptr<MemTableRepFactory>(NewPatriciaTrieRepFactory());
InternalKeyComparator cmp(BytewiseComparator());
ImmutableCFOptions ioptions(options);
WriteBufferManager wb(options.db_write_buffer_size);
mem_ = new MemTable(cmp, ioptions, MutableCFOptions(options),
/* needs_dup_key_check */ true, &wb, kMaxSequenceNumber,
0 /* column_family_id */);
size_t records = 1 << 20;
int thread_cnt = 10;
SequenceNumber seq = 0;
// Single Thread CSPPTrie
auto start = std::chrono::system_clock::now();
for (size_t i = 0; i < records; ++i) {
std::string key("key " + std::to_string(i));
std::string value("value " + std::to_string(i));
auto ret = mem_->Add(seq, kTypeValue, key, value);
ASSERT_TRUE(ret);
seq++;
}
auto end = std::chrono::system_clock::now();
auto dur =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
printf("[CSPPTrie] Single-Thread Time Cost: %" PRId64 ", mem_->size = %" PRId64 "\n",
dur, mem_->num_entries());
delete mem_;
// Single thread SkipList
options.memtable_factory =
std::shared_ptr<MemTableRepFactory>(new SkipListFactory());
ImmutableCFOptions ioptions2(options);
WriteBufferManager wb2(options.db_write_buffer_size);
mem_ = new MemTable(cmp, ioptions2, MutableCFOptions(options),true, &wb2, kMaxSequenceNumber, 0);
start = std::chrono::system_clock::now();
seq = 0;
for (size_t i = 0; i < records; ++i) {
std::string key("key " + std::to_string(i));
std::string value("value " + std::to_string(i));
auto ret = mem_->Add(seq, kTypeValue, key, value);
ASSERT_TRUE(ret);
seq++;
}
end = std::chrono::system_clock::now();
dur = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
printf("[SkipList] Single-Thread Time Cost: %" PRId64 ", mem_->size = %" PRId64 "\n",
dur, mem_->num_entries());
delete mem_;
// Multi Thread CSPPTrie
options.allow_concurrent_memtable_write = true;
options.memtable_factory =
std::shared_ptr<MemTableRepFactory>(NewPatriciaTrieRepFactory());
ImmutableCFOptions ioptions3(options);
WriteBufferManager wb3(options.db_write_buffer_size);
mem_ = new MemTable(cmp, ioptions3, MutableCFOptions(options), true, &wb3, kMaxSequenceNumber, 0);
std::vector<std::thread> threads;
// Each thread should has its own post_process_info
std::vector<MemTablePostProcessInfo> infos(thread_cnt);
std::atomic<SequenceNumber> atomic_seq{0};
start = std::chrono::system_clock::now();
for (int t = 0; t < thread_cnt; ++t) {
threads.emplace_back(std::thread([&, t]() {
int start = (records / thread_cnt) * t;
int end = (records / thread_cnt) * (t + 1);
for (size_t i = start; i < end; ++i) {
std::string key("key " + std::to_string(i));
std::string value("value " + std::to_string(i));
auto ret = mem_->Add(atomic_seq++, kTypeValue, key, value, true, &infos[t]);
ASSERT_TRUE(ret);
}
}));
}
for (auto& thread : threads) {
thread.join();
}
end = std::chrono::system_clock::now();
dur = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
uint64_t total_size = 0;
for(auto info : infos) {
total_size += info.num_entries;
}
printf("[CSPPTrie] Multi-Thread Time Cost: %" PRId64 ", mem_->size = %" PRId64 "\n", dur, total_size);
delete mem_;
// Multi Thread SkipList
options.allow_concurrent_memtable_write = true;
options.memtable_factory =
std::shared_ptr<MemTableRepFactory>(new SkipListFactory());
ImmutableCFOptions ioptions4(options);
WriteBufferManager wb4(options.db_write_buffer_size);
mem_ = new MemTable(cmp, ioptions4, MutableCFOptions(options), true, &wb4, kMaxSequenceNumber, 0);
threads.clear();
infos.clear();
infos.resize(thread_cnt);
atomic_seq = {0};
start = std::chrono::system_clock::now();
for (int t = 0; t < thread_cnt; ++t) {
threads.emplace_back(std::thread([&, t]() {
int start = (records / thread_cnt) * t;
int end = (records / thread_cnt) * (t + 1);
for (size_t i = start; i < end; ++i) {
std::string key("key " + std::to_string(i));
std::string value("value " + std::to_string(i));
auto ret = mem_->Add(atomic_seq++, kTypeValue, key, value, true, &infos[t]);
ASSERT_TRUE(ret);
}
}));
}
for (auto& thread : threads) {
thread.join();
}
end = std::chrono::system_clock::now();
dur = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
total_size = 0;
for(auto info : infos) {
total_size += info.num_entries;
}
printf("[SkipList] Multi-Thread Time Cost: %" PRId64 ", mem_->size = %" PRId64 "\n", dur, total_size);
delete mem_;
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册