提交 30701dc1 编写于 作者: R rockeet

memtable.cc: Refactory & minor bugfix

上级 dae84261
......@@ -36,6 +36,7 @@
#include "util/memory_usage.h"
#include "util/murmurhash.h"
#include "util/mutexlock.h"
#include "util/util.h"
namespace rocksdb {
......@@ -117,10 +118,10 @@ MemTableRep* MemTableRepFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& key_cmp, Allocator* allocator,
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
uint32_t /* column_family_id */) {
uint32_t column_family_id) {
return CreateMemTableRep(key_cmp, allocator,
mutable_cf_options.prefix_extractor.get(),
ioptions.info_log);
ioptions.info_log, column_family_id);
}
MemTable::~MemTable() {
......@@ -679,7 +680,7 @@ static bool SaveValue(void* arg, const MemTableRep::KeyValuePair* pair) {
*(s->found_final_value) = true;
return false;
}
// intentional fallthrough
FALLTHROUGH_INTENDED;
case kTypeValue: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
......@@ -819,7 +820,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
return found_final_value;
}
void MemTable::Update(SequenceNumber seq, const Slice& key,
void MemTable::Update(SequenceNumber seq,
const Slice& key,
const Slice& value) {
LookupKey lkey(key, seq);
Slice mem_key = lkey.memtable_key();
......@@ -831,8 +833,8 @@ void MemTable::Update(SequenceNumber seq, const Slice& key,
if (iter->Valid()) {
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
Slice internal_key, prev_value;
std::tie(internal_key, prev_value) = iter->GetKeyValue();
Slice internal_key, old_value;
std::tie(internal_key, old_value) = iter->GetKeyValue();
if (comparator_.comparator.user_comparator()->Equal(
ExtractUserKey(internal_key), lkey.user_key())) {
// Correct user key
......@@ -842,13 +844,13 @@ void MemTable::Update(SequenceNumber seq, const Slice& key,
SequenceNumber unused;
UnPackSequenceAndType(tag, &unused, &type);
if (type == kTypeValue) {
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
uint32_t old_size = static_cast<uint32_t>(old_value.size());
uint32_t new_size = static_cast<uint32_t>(value.size());
// Update value, if new value size <= previous value size
if (new_size <= prev_size) {
// Update value, if new value size <= old value size
if (new_size <= old_size) {
char* p =
const_cast<char*>(prev_value.data()) - VarintLength(prev_size);
const_cast<char*>(old_value.data()) - VarintLength(old_size);
p = EncodeVarint32(p, new_size);
WriteLock wl(GetLock(lkey.user_key()));
memcpy(p, value.data(), value.size());
......@@ -863,7 +865,8 @@ void MemTable::Update(SequenceNumber seq, const Slice& key,
Add(seq, kTypeValue, key, value);
}
bool MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
bool MemTable::UpdateCallback(SequenceNumber seq,
const Slice& key,
const Slice& delta) {
LookupKey lkey(key, seq);
Slice memkey = lkey.memtable_key();
......@@ -876,8 +879,8 @@ bool MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
Slice internal_key, prev_value;
std::tie(internal_key, prev_value) = iter->GetKeyValue();
Slice internal_key, old_value;
std::tie(internal_key, old_value) = iter->GetKeyValue();
if (comparator_.comparator.user_comparator()->Equal(
ExtractUserKey(internal_key), lkey.user_key())) {
// Correct user key
......@@ -888,33 +891,33 @@ bool MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
UnPackSequenceAndType(tag, &unused, &type);
switch (type) {
case kTypeValue: {
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
uint32_t old_size = static_cast<uint32_t>(old_value.size());
char* prev_buffer = const_cast<char*>(prev_value.data());
uint32_t new_prev_size = prev_size;
char* old_buffer = const_cast<char*>(old_value.data());
uint32_t new_inplace_size = old_size;
std::string str_value;
std::string merged_value;
WriteLock wl(GetLock(lkey.user_key()));
auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
delta, &str_value);
auto status = moptions_.inplace_callback(old_buffer, &new_inplace_size,
delta, &merged_value);
if (status == UpdateStatus::UPDATED_INPLACE) {
// Value already updated by callback.
assert(new_prev_size <= prev_size);
if (new_prev_size < prev_size) {
// overwrite the new prev_size
assert(new_inplace_size <= old_size);
if (new_inplace_size < old_size) {
// overwrite the new inplace size
char* p =
const_cast<char*>(prev_value.data()) - VarintLength(prev_size);
p = EncodeVarint32(p, new_prev_size);
if (p < prev_buffer) {
const_cast<char*>(old_value.data()) - VarintLength(old_size);
p = EncodeVarint32(p, new_inplace_size);
if (p < old_buffer) {
// shift the value buffer as well.
memmove(p, prev_buffer, new_prev_size);
memmove(p, old_buffer, new_inplace_size);
}
}
RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
UpdateFlushState();
return true;
} else if (status == UpdateStatus::UPDATED) {
Add(seq, kTypeValue, key, Slice(str_value));
Add(seq, kTypeValue, key, Slice(merged_value));
RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
UpdateFlushState();
return true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册