//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/memtable.h"

#include <memory>
#include <algorithm>
#include <limits>

#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/writebuffer.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice_transform.h"
#include "table/merger.h"
#include "util/arena.h"
#include "util/coding.h"
#include "util/murmurhash.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/statistics.h"
#include "util/stop_watch.h"

namespace rocksdb {

MemTableOptions::MemTableOptions(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options)
  : write_buffer_size(mutable_cf_options.write_buffer_size),
    arena_block_size(mutable_cf_options.arena_block_size),
    memtable_prefix_bloom_bits(mutable_cf_options.memtable_prefix_bloom_bits),
    memtable_prefix_bloom_probes(
        mutable_cf_options.memtable_prefix_bloom_probes),
    memtable_prefix_bloom_huge_page_tlb_size(
        mutable_cf_options.memtable_prefix_bloom_huge_page_tlb_size),
    inplace_update_support(ioptions.inplace_update_support),
    inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
    inplace_callback(ioptions.inplace_callback),
    max_successive_merges(mutable_cf_options.max_successive_merges),
    filter_deletes(mutable_cf_options.filter_deletes),
    statistics(ioptions.statistics),
    merge_operator(ioptions.merge_operator),
    info_log(ioptions.info_log) {}

MemTable::MemTable(const InternalKeyComparator& cmp,
                   const ImmutableCFOptions& ioptions,
                   const MutableCFOptions& mutable_cf_options,
                   WriteBuffer* write_buffer)
    : comparator_(cmp),
      moptions_(ioptions, mutable_cf_options),
      refs_(0),
      kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
      arena_(moptions_.arena_block_size),
      allocator_(&arena_, write_buffer),
      table_(ioptions.memtable_factory->CreateMemTableRep(
          comparator_, &allocator_, ioptions.prefix_extractor,
          ioptions.info_log)),
      num_entries_(0),
      num_deletes_(0),
      flush_in_progress_(false),
      flush_completed_(false),
      file_number_(0),
      first_seqno_(0),
      mem_next_logfile_number_(0),
      locks_(moptions_.inplace_update_support
                 ? moptions_.inplace_update_num_locks
                 : 0),
      prefix_extractor_(ioptions.prefix_extractor),
      should_flush_(ShouldFlushNow()),
      flush_scheduled_(false),
      env_(ioptions.env) {
  // if should_flush_ == true without an entry inserted, something must have
  // gone wrong already.
  assert(!should_flush_);
  if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) {
    prefix_bloom_.reset(new DynamicBloom(
        &allocator_,
        moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality,
        moptions_.memtable_prefix_bloom_probes, nullptr,
        moptions_.memtable_prefix_bloom_huge_page_tlb_size,
        ioptions.info_log));
  }
}

MemTable::~MemTable() { assert(refs_ == 0); }

size_t MemTable::ApproximateMemoryUsage() {
  size_t arena_usage = arena_.ApproximateMemoryUsage();
  size_t table_usage = table_->ApproximateMemoryUsage();
  // let MAX_USAGE = std::numeric_limits<size_t>::max()
  // then if arena_usage + table_usage >= MAX_USAGE, return MAX_USAGE.
  // the following variation is to avoid numeric overflow.
  if (arena_usage >= std::numeric_limits<size_t>::max() - table_usage) {
    return std::numeric_limits<size_t>::max();
  }
  // otherwise, return the actual usage
  return arena_usage + table_usage;
}

bool MemTable::ShouldFlushNow() const {
  // Often we cannot allocate arena blocks that exactly match the buffer size,
  // so we have to decide whether to over-allocate or under-allocate.
  // This constant can be interpreted as: if we still have more than
  // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to
  // over-allocate one more block.
  const double kAllowOverAllocationRatio = 0.6;

  // If the arena still has room for a new block allocation, we can safely say
  // it shouldn't flush.
  auto allocated_memory =
      table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes();

  // if we can still allocate one more block without exceeding the
  // over-allocation ratio, then we should not flush.
  if (allocated_memory + kArenaBlockSize <
      moptions_.write_buffer_size +
      kArenaBlockSize * kAllowOverAllocationRatio) {
    return false;
  }

  // if the user keeps adding entries that exceed moptions.write_buffer_size,
  // we need to flush earlier even though we still have much available
  // memory left.
  if (allocated_memory > moptions_.write_buffer_size +
      kArenaBlockSize * kAllowOverAllocationRatio) {
    return true;
  }

  // In this code path, Arena has already allocated its "last block", which
  // means the total allocated memory size is either:
  //  (1) "moderately" over-allocated the memory (by no more than `0.6 * arena
  // block size`). Or,
  //  (2) the allocated memory is less than the write buffer size, but we'll
  // stop here since if we allocate a new arena block, we'll over-allocate too
  // much more (half of the arena block size) memory.
  //
  // In either case, to avoid over-allocating, the last block will stop
  // allocation when its usage reaches a certain ratio; we carefully choose
  // "0.75 full" as the stop condition because it addresses the following
  // issue with great simplicity: What if the next inserted entry's size is
  // bigger than AllocatedAndUnused()?
  //
  // The answer is: if the entry size is also bigger than 0.25 *
  // kArenaBlockSize, a dedicated block will be allocated for it; otherwise
  // the arena will skip the AllocatedAndUnused() space anyway and allocate a
  // new, empty regular block. In either case, we *overly* over-allocate.
  //
  // Therefore, setting the last block to be at most "0.75 full" avoids both
  // cases.
  //
  // NOTE: the average percentage of wasted space of this approach can be
  // estimated as: "arena block size * 0.25 / write buffer size". Users who
  // specify a small write buffer size and/or a big arena block size may
  // suffer.
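  //
  // A quick worked example with hypothetical numbers: with a 4 MB write
  // buffer and 512 KB arena blocks, the expected waste is bounded by roughly
  // 128 KB (0.25 * 512 KB), i.e. about 3% of the buffer; with a 64 KB write
  // buffer and the same block size it could reach 200%.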
  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
                                        const char* prefix_len_key2) const {
  // Internal keys are encoded as length-prefixed strings.
  Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
  Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
  return comparator.Compare(k1, k2);
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key,
                                        const Slice& key)
    const {
  // Internal keys are encoded as length-prefixed strings.
  Slice a = GetLengthPrefixedSlice(prefix_len_key);
  return comparator.Compare(a, key);
}

Slice MemTableRep::UserKey(const char* key) const {
  Slice slice = GetLengthPrefixedSlice(key);
  return Slice(slice.data(), slice.size() - 8);
}

KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
  *buf = allocator_->Allocate(len);
  return static_cast<KeyHandle>(*buf);
}

// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
const char* EncodeKey(std::string* scratch, const Slice& target) {
  scratch->clear();
  PutVarint32(scratch, static_cast<uint32_t>(target.size()));
  scratch->append(target.data(), target.size());
  return scratch->data();
}

class MemTableIterator: public Iterator {
 public:
  MemTableIterator(
      const MemTable& mem, const ReadOptions& read_options, Arena* arena)
      : bloom_(nullptr),
        prefix_extractor_(mem.prefix_extractor_),
        valid_(false),
        arena_mode_(arena != nullptr) {
    if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
      bloom_ = mem.prefix_bloom_.get();
      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
    } else {
      iter_ = mem.table_->GetIterator(arena);
    }
  }

  ~MemTableIterator() {
    if (arena_mode_) {
      iter_->~Iterator();
    } else {
      delete iter_;
    }
  }

  virtual bool Valid() const override { return valid_; }
  virtual void Seek(const Slice& k) override {
    PERF_TIMER_GUARD(seek_on_memtable_time);
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
    if (bloom_ != nullptr &&
        !bloom_->MayContain(prefix_extractor_->Transform(ExtractUserKey(k)))) {
      valid_ = false;
      return;
    }
    iter_->Seek(k, nullptr);
    valid_ = iter_->Valid();
  }
  virtual void SeekToFirst() override {
    iter_->SeekToFirst();
    valid_ = iter_->Valid();
  }
  virtual void SeekToLast() override {
    iter_->SeekToLast();
    valid_ = iter_->Valid();
  }
  virtual void Next() override {
    assert(Valid());
    iter_->Next();
    valid_ = iter_->Valid();
  }
  virtual void Prev() override {
    assert(Valid());
    iter_->Prev();
    valid_ = iter_->Valid();
  }
  virtual Slice key() const override {
    assert(Valid());
    return GetLengthPrefixedSlice(iter_->key());
  }
  virtual Slice value() const override {
    assert(Valid());
    Slice key_slice = GetLengthPrefixedSlice(iter_->key());
    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
  }

  virtual Status status() const override { return Status::OK(); }

 private:
  DynamicBloom* bloom_;
  const SliceTransform* const prefix_extractor_;
  MemTableRep::Iterator* iter_;
  bool valid_;
  bool arena_mode_;

  // No copying allowed
  MemTableIterator(const MemTableIterator&);
  void operator=(const MemTableIterator&);
};

Iterator* MemTable::NewIterator(const ReadOptions& read_options, Arena* arena) {
  assert(arena != nullptr);
  auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
  return new (mem) MemTableIterator(*this, read_options, arena);
}
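
// Usage sketch (hypothetical caller, not part of this file): the iterator is
// placement-new'ed into the arena, so it must be destroyed explicitly rather
// than deleted:
//
//   Arena arena;
//   Iterator* iter = mem->NewIterator(ReadOptions(), &arena);
//   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { /* ... */ }
//   iter->~Iterator();  // the arena owns the memory; do not call delete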

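// Locks are striped: keys that hash to the same slot share one RWMutex,
// bounding memory while still letting in-place updates of unrelated keys
// proceed in parallel.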
port::RWMutex* MemTable::GetLock(const Slice& key) {
  static murmur_hash hash;
  return &locks_[hash(key) % locks_.size()];
}

void MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key, /* user key */
                   const Slice& value) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
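  //
  // For example (hypothetical values): user key "abc" with sequence number 7,
  // type kTypeValue and value "xy" is encoded as
  //   varint32(11) "abc" fixed64((7 << 8) | kTypeValue) varint32(2) "xy"
  // where 11 = 3 (user key bytes) + 8 (packed sequence/type tag).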
  uint32_t key_size = static_cast<uint32_t>(key.size());
  uint32_t val_size = static_cast<uint32_t>(value.size());
  uint32_t internal_key_size = key_size + 8;
  const uint32_t encoded_len = VarintLength(internal_key_size) +
                               internal_key_size + VarintLength(val_size) +
                               val_size;
  char* buf = nullptr;
  KeyHandle handle = table_->Allocate(encoded_len, &buf);
  assert(buf != nullptr);
  char* p = EncodeVarint32(buf, internal_key_size);
  memcpy(p, key.data(), key_size);
  p += key_size;
  EncodeFixed64(p, (s << 8) | type);
  p += 8;
  p = EncodeVarint32(p, val_size);
  memcpy(p, value.data(), val_size);
  assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
  table_->Insert(handle);
  num_entries_++;
  if (type == kTypeDeletion) {
    num_deletes_++;
  }

  if (prefix_bloom_) {
    assert(prefix_extractor_);
    prefix_bloom_->Add(prefix_extractor_->Transform(key));
  }

  // The first sequence number inserted into the memtable
  assert(first_seqno_ == 0 || s > first_seqno_);
  if (first_seqno_ == 0) {
    first_seqno_ = s;
  }

  should_flush_ = ShouldFlushNow();
}

// Callback from MemTable::Get()
namespace {

struct Saver {
  Status* status;
  const LookupKey* key;
  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
  bool* merge_in_progress;
  std::string* value;
  const MergeOperator* merge_operator;
  // the merge operations encountered;
  MergeContext* merge_context;
  MemTable* mem;
  Logger* logger;
  Statistics* statistics;
  bool inplace_update_support;
  Env* env_;
};
}  // namespace

static bool SaveValue(void* arg, const char* entry) {
  Saver* s = reinterpret_cast<Saver*>(arg);
  assert(s != nullptr);
  MergeContext* merge_context = s->merge_context;
  const MergeOperator* merge_operator = s->merge_operator;

  assert(merge_context != nullptr);

  // entry format is:
  //    klength  varint32
  //    userkey  char[klength-8]
  //    tag      uint64
  //    vlength  varint32
  //    value    char[vlength]
  // Check that it belongs to same user key.  We do not check the
  // sequence number since the Seek() call above should have skipped
  // all entries with overly large sequence numbers.
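  // The 8-byte tag packs the sequence number and value type as
  // (sequence << 8) | type (see MemTable::Add()), so "tag & 0xff" below
  // recovers the ValueType.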
  uint32_t key_length;
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  if (s->mem->GetInternalKeyComparator().user_comparator()->Compare(
          Slice(key_ptr, key_length - 8), s->key->user_key()) == 0) {
    // Correct user key
    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
    switch (static_cast<ValueType>(tag & 0xff)) {
      case kTypeValue: {
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadLock();
        }
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->status) = Status::OK();
        if (*(s->merge_in_progress)) {
          assert(merge_operator);
          bool merge_success = false;
          {
            StopWatchNano timer(s->env_, s->statistics != nullptr);
            PERF_TIMER_GUARD(merge_operator_time_nanos);
            merge_success = merge_operator->FullMerge(
                s->key->user_key(), &v, merge_context->GetOperands(), s->value,
                s->logger);
            RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME,
                       timer.ElapsedNanos());
          }
          if (!merge_success) {
            RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
            *(s->status) =
                Status::Corruption("Error: Could not perform merge.");
          }
        } else {
          s->value->assign(v.data(), v.size());
        }
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadUnlock();
        }
        *(s->found_final_value) = true;
        return false;
      }
      case kTypeDeletion: {
        if (*(s->merge_in_progress)) {
          assert(merge_operator);
          *(s->status) = Status::OK();
          bool merge_success = false;
          {
            StopWatchNano timer(s->env_, s->statistics != nullptr);
            PERF_TIMER_GUARD(merge_operator_time_nanos);
            merge_success = merge_operator->FullMerge(
                s->key->user_key(), nullptr, merge_context->GetOperands(),
                s->value, s->logger);
            RecordTick(s->statistics, MERGE_OPERATION_TOTAL_TIME,
                       timer.ElapsedNanos());
          }
          if (!merge_success) {
            RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
            *(s->status) =
                Status::Corruption("Error: Could not perform merge.");
          }
        } else {
          *(s->status) = Status::NotFound();
        }
        *(s->found_final_value) = true;
        return false;
      }
      case kTypeMerge: {
        if (!merge_operator) {
          *(s->status) = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          // Normally we continue the loop (return true) when we see a merge
          // operand.  But in case of an error, we should stop the loop
          // immediately and pretend we have found the value to stop further
          // seek.  Otherwise, the later call will override this error status.
          *(s->found_final_value) = true;
          return false;
        }
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->merge_in_progress) = true;
        merge_context->PushOperand(v);
        return true;
      }
      default:
        assert(false);
        return true;
    }
  }

  // s->state could be Corrupt, merge or notfound
  return false;
}

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
                   MergeContext* merge_context) {
  // The sequence number is updated synchronously in version_set.h
  if (IsEmpty()) {
    // Avoid recording stats, for speed.
    return false;
  }
  PERF_TIMER_GUARD(get_from_memtable_time);

  Slice user_key = key.user_key();
  bool found_final_value = false;
  bool merge_in_progress = s->IsMergeInProgress();

  if (prefix_bloom_ &&
      !prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key))) {
    // iter is null if prefix bloom says the key does not exist
  } else {
    Saver saver;
    saver.status = s;
    saver.found_final_value = &found_final_value;
    saver.merge_in_progress = &merge_in_progress;
    saver.key = &key;
    saver.value = value;
    saver.mem = this;
    saver.merge_context = merge_context;
    saver.merge_operator = moptions_.merge_operator;
    saver.logger = moptions_.info_log;
    saver.inplace_update_support = moptions_.inplace_update_support;
    saver.statistics = moptions_.statistics;
    saver.env_ = env_;
    table_->Get(key, &saver, SaveValue);
  }

  // No change to value, since we have not yet found a Put/Delete
  if (!found_final_value && merge_in_progress) {
    *s = Status::MergeInProgress("");
  }
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
  return found_final_value;
}

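// In-place update: overwrite the existing value when the new one fits in the
// old slot (new size <= previous size); otherwise fall back to a normal
// Add(), which inserts a newer entry for the key.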
void MemTable::Update(SequenceNumber seq,
                      const Slice& key,
                      const Slice& value) {
  LookupKey lkey(key, seq);
  Slice mem_key = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), mem_key.data());

  if (iter->Valid()) {
    // entry format is:
    //    key_length  varint32
    //    userkey  char[klength-8]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
        Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
          uint32_t new_size = static_cast<uint32_t>(value.size());

          // Update value, if new value size <= previous value size
          if (new_size <= prev_size) {
            char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
                                     new_size);
            WriteLock wl(GetLock(lkey.user_key()));
            memcpy(p, value.data(), value.size());
            assert((unsigned)((p + value.size()) - entry) ==
                   (unsigned)(VarintLength(key_length) + key_length +
                              VarintLength(value.size()) + value.size()));
            return;
          }
        }
        default:
          // If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData
          // we don't have enough space for an in-place update
          Add(seq, kTypeValue, key, value);
          return;
      }
    }
  }

  // key doesn't exist
  Add(seq, kTypeValue, key, value);
}

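// Like Update(), but delegates the decision to moptions_.inplace_callback:
// UPDATED_INPLACE means the callback rewrote the value in place (possibly
// shrinking it), UPDATED means it produced a new value that is Add()ed, and
// UPDATE_FAILED means there is nothing more to do; all three return true.
// Returns false when no existing kTypeValue entry is found for the key.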
bool MemTable::UpdateCallback(SequenceNumber seq,
                              const Slice& key,
                              const Slice& delta) {
  LookupKey lkey(key, seq);
  Slice memkey = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), memkey.data());

  if (iter->Valid()) {
    // entry format is:
    //    key_length  varint32
    //    userkey  char[klength-8]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
        Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());

          char* prev_buffer = const_cast<char*>(prev_value.data());
          uint32_t new_prev_size = prev_size;

          std::string str_value;
          WriteLock wl(GetLock(lkey.user_key()));
          auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
                                                   delta, &str_value);
          if (status == UpdateStatus::UPDATED_INPLACE) {
            // Value already updated by callback.
            assert(new_prev_size <= prev_size);
            if (new_prev_size < prev_size) {
              // overwrite the new prev_size
              char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
                                       new_prev_size);
              if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
                // shift the value buffer as well.
                memcpy(p, prev_buffer, new_prev_size);
              }
            }
            RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
            should_flush_ = ShouldFlushNow();
            return true;
          } else if (status == UpdateStatus::UPDATED) {
            Add(seq, kTypeValue, key, Slice(str_value));
            RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
            should_flush_ = ShouldFlushNow();
            return true;
          } else if (status == UpdateStatus::UPDATE_FAILED) {
            // No action required. Return.
            should_flush_ = ShouldFlushNow();
            return true;
          }
        }
        default:
          break;
      }
    }
  }
  // If the latest value is not kTypeValue
  // or key doesn't exist
  return false;
}

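// Counts how many consecutive kTypeMerge entries sit at the head of this
// key's history; presumably used by the write path to decide when a run of
// merge operands is long enough to collapse (cf. max_successive_merges in
// MemTableOptions).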
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
  Slice memkey = key.memtable_key();

  // A totally ordered iterator is costly for some memtable reps (prefix-aware
  // reps). By passing in the user key, we allow efficient iterator creation.
  // The iterator only needs to be ordered within the same user key.
  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(key.internal_key(), memkey.data());

  size_t num_successive_merges = 0;

  for (; iter->Valid(); iter->Next()) {
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
            Slice(iter_key_ptr, key_length - 8), key.user_key()) != 0) {
      break;
    }

    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
    if (static_cast<ValueType>(tag & 0xff) != kTypeMerge) {
      break;
    }

    ++num_successive_merges;
  }

  return num_successive_merges;
}

void MemTableRep::Get(const LookupKey& k, void* callback_args,
                      bool (*callback_func)(void* arg, const char* entry)) {
  auto iter = GetDynamicPrefixIterator();
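  // Scan matching entries and hand each one to the callback; the callback
  // returns false to stop the scan (e.g. once SaveValue has found a final
  // Put/Delete).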
  for (iter->Seek(k.internal_key(), k.memtable_key().data());
       iter->Valid() && callback_func(callback_args, iter->key());
       iter->Next()) {
  }
}

}  // namespace rocksdb