//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/memtable.h"

#include <memory>
#include <algorithm>
#include <limits>

#include "db/dbformat.h"
#include "db/merge_context.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice_transform.h"
#include "table/merger.h"
#include "util/arena.h"
#include "util/coding.h"
#include "util/murmurhash.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/statistics.h"
#include "util/stop_watch.h"

namespace rocksdb {

MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
    : comparator_(cmp),
      refs_(0),
      kArenaBlockSize(OptimizeBlockSize(options.arena_block_size)),
      kWriteBufferSize(options.write_buffer_size),
      arena_(options.arena_block_size),
      table_(options.memtable_factory->CreateMemTableRep(
          comparator_, &arena_, options.prefix_extractor.get(),
          options.info_log.get())),
      num_entries_(0),
      flush_in_progress_(false),
      flush_completed_(false),
      file_number_(0),
      first_seqno_(0),
      mem_next_logfile_number_(0),
      locks_(options.inplace_update_support ? options.inplace_update_num_locks
                                            : 0),
      prefix_extractor_(options.prefix_extractor.get()),
      should_flush_(ShouldFlushNow()) {
  // if should_flush_ == true without an entry inserted, something must have
  // gone wrong already.
  assert(!should_flush_);
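  // Note: when configured, the prefix bloom filter below is populated in
  // Add() with the prefix of every inserted key, and consulted in Get() and
  // MemTableIterator::Seek() to skip lookups for prefixes that were never
  // inserted.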
  if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
    prefix_bloom_.reset(new DynamicBloom(
        &arena_,
        options.memtable_prefix_bloom_bits, options.bloom_locality,
        options.memtable_prefix_bloom_probes, nullptr,
        options.memtable_prefix_bloom_huge_page_tlb_size,
        options.info_log.get()));
  }
}

MemTable::~MemTable() {
  assert(refs_ == 0);
}

size_t MemTable::ApproximateMemoryUsage() {
  size_t arena_usage = arena_.ApproximateMemoryUsage();
  size_t table_usage = table_->ApproximateMemoryUsage();
  // Let MAX_USAGE = std::numeric_limits<size_t>::max(); if
  // arena_usage + table_usage >= MAX_USAGE, return MAX_USAGE.
  // The check below is written this way to avoid numeric overflow.
  if (arena_usage >= std::numeric_limits<size_t>::max() - table_usage) {
    return std::numeric_limits<size_t>::max();
  }
  // otherwise, return the actual usage
  return arena_usage + table_usage;
}

bool MemTable::ShouldFlushNow() const {
  // Most of the time we cannot allocate arena blocks that exactly match the
  // buffer size. Thus we have to decide whether to over-allocate or
  // under-allocate.
  // This constant can be interpreted as: if we still have more than
  // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to
  // over-allocate one more block.
  const double kAllowOverAllocationRatio = 0.6;

  // If the arena still has room for a new block allocation, we can safely say
  // it shouldn't flush.
  auto allocated_memory =
      table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes();

  // If we can still allocate one more block without exceeding the
  // over-allocation ratio, then we should not flush.
  if (allocated_memory + kArenaBlockSize <
      kWriteBufferSize + kArenaBlockSize * kAllowOverAllocationRatio) {
    return false;
  }

  // If the user keeps adding entries that exceed kWriteBufferSize, we need to
  // flush earlier even though we still have much available memory left.
  if (allocated_memory >
      kWriteBufferSize + kArenaBlockSize * kAllowOverAllocationRatio) {
    return true;
  }

  // In this code path, the arena has already allocated its "last block", which
  // means the total allocated memory size is either:
  //  (1) "moderately" over-allocated (by no more than `0.6 * arena block
  //      size`), or
  //  (2) less than the write buffer size, but we'll stop here anyway, since
  //      allocating a new arena block would over-allocate too much more (half
  //      of the arena block size) memory.
  //
  // In either case, to avoid over-allocation the last block stops accepting
  // new entries once its usage reaches a certain ratio. We carefully choose
  // "0.75 full" as the stop condition because it addresses the following
  // question with great simplicity: what if the next inserted entry's size is
  // bigger than AllocatedAndUnused()?
  //
  // The answer is: if the entry size is also bigger than 0.25 *
  // kArenaBlockSize, a dedicated block will be allocated for it; otherwise the
  // arena will anyway skip the AllocatedAndUnused() space and allocate a new,
  // empty, regular block. In either case we *overly* over-allocate.
  //
  // Therefore, setting the last block to be at most "0.75 full" avoids both
  // cases.
  //
  // NOTE: the average fraction of wasted space with this approach can be
  // estimated as "arena block size * 0.25 / write buffer size". Users who
  // specify a small write buffer size and/or a big arena block size may
  // suffer.
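  //
  // As a purely illustrative example (the numbers are hypothetical, not
  // defaults): with a 4 MB arena block and a 64 MB write buffer, the expected
  // waste is roughly 4 MB * 0.25 / 64 MB ~= 1.6%.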
  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
                                        const char* prefix_len_key2) const {
  // Internal keys are encoded as length-prefixed strings.
  Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
  Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
  return comparator.Compare(k1, k2);
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key,
                                        const Slice& key)
    const {
  // Internal keys are encoded as length-prefixed strings.
  Slice a = GetLengthPrefixedSlice(prefix_len_key);
  return comparator.Compare(a, key);
}

Slice MemTableRep::UserKey(const char* key) const {
  Slice slice = GetLengthPrefixedSlice(key);
  return Slice(slice.data(), slice.size() - 8);
}

KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
  *buf = arena_->Allocate(len);
  return static_cast<KeyHandle>(*buf);
}

// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
const char* EncodeKey(std::string* scratch, const Slice& target) {
  scratch->clear();
  PutVarint32(scratch, target.size());
  scratch->append(target.data(), target.size());
  return scratch->data();
}

class MemTableIterator: public Iterator {
 public:
  MemTableIterator(
      const MemTable& mem, const ReadOptions& options, Arena* arena)
      : bloom_(nullptr),
        prefix_extractor_(mem.prefix_extractor_),
        valid_(false),
        arena_mode_(arena != nullptr) {
    if (prefix_extractor_ != nullptr && !options.total_order_seek) {
      bloom_ = mem.prefix_bloom_.get();
      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
    } else {
      iter_ = mem.table_->GetIterator(arena);
    }
  }

  ~MemTableIterator() {
    if (arena_mode_) {
      iter_->~Iterator();
    } else {
      delete iter_;
    }
  }

  virtual bool Valid() const { return valid_; }
  virtual void Seek(const Slice& k) {
    if (bloom_ != nullptr &&
        !bloom_->MayContain(prefix_extractor_->Transform(ExtractUserKey(k)))) {
      valid_ = false;
      return;
    }
    iter_->Seek(k, nullptr);
    valid_ = iter_->Valid();
  }
  virtual void SeekToFirst() {
    iter_->SeekToFirst();
    valid_ = iter_->Valid();
  }
  virtual void SeekToLast() {
    iter_->SeekToLast();
    valid_ = iter_->Valid();
  }
  virtual void Next() {
    assert(Valid());
    iter_->Next();
    valid_ = iter_->Valid();
  }
  virtual void Prev() {
    assert(Valid());
    iter_->Prev();
    valid_ = iter_->Valid();
  }
  virtual Slice key() const {
    assert(Valid());
    return GetLengthPrefixedSlice(iter_->key());
  }
  virtual Slice value() const {
    assert(Valid());
    Slice key_slice = GetLengthPrefixedSlice(iter_->key());
    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
  }

  virtual Status status() const { return Status::OK(); }

 private:
  DynamicBloom* bloom_;
  const SliceTransform* const prefix_extractor_;
  MemTableRep::Iterator* iter_;
  bool valid_;
  bool arena_mode_;

  // No copying allowed
  MemTableIterator(const MemTableIterator&);
  void operator=(const MemTableIterator&);
};

Iterator* MemTable::NewIterator(const ReadOptions& options, Arena* arena) {
  if (arena == nullptr) {
    return new MemTableIterator(*this, options, nullptr);
  } else {
    auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
    return new (mem)
        MemTableIterator(*this, options, arena);
  }
}

port::RWMutex* MemTable::GetLock(const Slice& key) {
  static murmur_hash hash;
  return &locks_[hash(key) % locks_.size()];
}

void MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key, /* user key */
                   const Slice& value) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
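  //
  // Illustrative example (hypothetical key/value, not from this code): a Put
  // of user key "foo" with value "bar" at sequence number 5 is encoded as
  //   0x0b | "foo" | fixed64 tag ((5 << 8) | kTypeValue) | 0x03 | "bar"
  // where 0x0b is varint32(3 + 8), the internal key size including the
  // 8-byte tag.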
  size_t key_size = key.size();
  size_t val_size = value.size();
  size_t internal_key_size = key_size + 8;
  const size_t encoded_len =
      VarintLength(internal_key_size) + internal_key_size +
      VarintLength(val_size) + val_size;
  char* buf = nullptr;
  KeyHandle handle = table_->Allocate(encoded_len, &buf);
  assert(buf != nullptr);
  char* p = EncodeVarint32(buf, internal_key_size);
  memcpy(p, key.data(), key_size);
  p += key_size;
  EncodeFixed64(p, (s << 8) | type);
  p += 8;
  p = EncodeVarint32(p, val_size);
  memcpy(p, value.data(), val_size);
  assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
  table_->Insert(handle);
  num_entries_++;

  if (prefix_bloom_) {
    assert(prefix_extractor_);
    prefix_bloom_->Add(prefix_extractor_->Transform(key));
  }

  // The first sequence number inserted into the memtable
  assert(first_seqno_ == 0 || s > first_seqno_);
  if (first_seqno_ == 0) {
    first_seqno_ = s;
  }

  should_flush_ = ShouldFlushNow();
}

// Callback from MemTable::Get()
namespace {

struct Saver {
  Status* status;
  const LookupKey* key;
  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
  bool* merge_in_progress;
  std::string* value;
  const MergeOperator* merge_operator;
  // the merge operations encountered;
  MergeContext* merge_context;
  MemTable* mem;
  Logger* logger;
  Statistics* statistics;
  bool inplace_update_support;
};
}  // namespace

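// SaveValue() is the callback passed to MemTableRep::Get(); it is invoked on
// successive entries starting at the lookup position. Returning true asks the
// rep to keep scanning (e.g. to collect more merge operands); returning false
// stops the scan.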
static bool SaveValue(void* arg, const char* entry) {
  Saver* s = reinterpret_cast<Saver*>(arg);
  MergeContext* merge_context = s->merge_context;
  const MergeOperator* merge_operator = s->merge_operator;

  assert(s != nullptr && merge_context != nullptr);

  // entry format is:
  //    klength  varint32
  //    userkey  char[klength-8]
  //    tag      uint64
  //    vlength  varint32
  //    value    char[vlength]
  // Check that it belongs to same user key.  We do not check the
  // sequence number since the Seek() call above should have skipped
  // all entries with overly large sequence numbers.
  uint32_t key_length;
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  if (s->mem->GetInternalKeyComparator().user_comparator()->Compare(
          Slice(key_ptr, key_length - 8), s->key->user_key()) == 0) {
    // Correct user key
    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
    switch (static_cast<ValueType>(tag & 0xff)) {
      case kTypeValue: {
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadLock();
        }
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->status) = Status::OK();
        if (*(s->merge_in_progress)) {
          assert(merge_operator);
          if (!merge_operator->FullMerge(s->key->user_key(), &v,
                                         merge_context->GetOperands(), s->value,
                                         s->logger)) {
            RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
            *(s->status) =
                Status::Corruption("Error: Could not perform merge.");
          }
        } else {
          s->value->assign(v.data(), v.size());
        }
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadUnlock();
        }
        *(s->found_final_value) = true;
        return false;
      }
      case kTypeDeletion: {
        if (*(s->merge_in_progress)) {
          assert(merge_operator);
          *(s->status) = Status::OK();
          if (!merge_operator->FullMerge(s->key->user_key(), nullptr,
                                         merge_context->GetOperands(), s->value,
                                         s->logger)) {
            RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
            *(s->status) =
                Status::Corruption("Error: Could not perform merge.");
          }
        } else {
          *(s->status) = Status::NotFound();
        }
        *(s->found_final_value) = true;
        return false;
      }
      case kTypeMerge: {
        if (!merge_operator) {
          *(s->status) = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          // Normally we continue the loop (return true) when we see a merge
          // operand.  But in case of an error, we should stop the loop
          // immediately and pretend we have found the value to stop further
          // seek.  Otherwise, the later call will override this error status.
          *(s->found_final_value) = true;
          return false;
        }
        std::string merge_result;  // temporary area for merge results later
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->merge_in_progress) = true;
        merge_context->PushOperand(v);
        return true;
      }
      default:
        assert(false);
        return true;
    }
  }

  // The entry belongs to a different user key, so stop scanning.  At this
  // point s->status may still indicate corruption, an in-progress merge, or
  // not-found.
  return false;
}

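// Get() first consults the prefix bloom filter (when one is configured) and,
// if the prefix may be present, asks the rep to run SaveValue() over the
// entries for this key; found_final_value is set once a Put, a Delete, or an
// error resolves the lookup.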
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
                   MergeContext& merge_context, const Options& options) {
  // The sequence number is updated synchronously in version_set.h
  if (first_seqno_ == 0) {
    // Avoid recording stats for speed.
    return false;
  }
  PERF_TIMER_GUARD(get_from_memtable_time);

  Slice user_key = key.user_key();
  bool found_final_value = false;
  bool merge_in_progress = s->IsMergeInProgress();

  if (prefix_bloom_ &&
      !prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key))) {
    // Skip the lookup: the prefix bloom filter says the key's prefix was
    // never inserted.
  } else {
    Saver saver;
    saver.status = s;
    saver.found_final_value = &found_final_value;
    saver.merge_in_progress = &merge_in_progress;
    saver.key = &key;
    saver.value = value;
    saver.mem = this;
    saver.merge_context = &merge_context;
    saver.merge_operator = options.merge_operator.get();
    saver.logger = options.info_log.get();
    saver.inplace_update_support = options.inplace_update_support;
    saver.statistics = options.statistics.get();
    table_->Get(key, &saver, SaveValue);
  }

  // No change to value, since we have not yet found a Put/Delete
  if (!found_final_value && merge_in_progress) {
    *s = Status::MergeInProgress("");
  }
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
  return found_final_value;
}

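// Update() overwrites the stored value in place when the most recent entry
// for `key` is a Put whose value is at least as large as the new one;
// otherwise it falls back to Add().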
void MemTable::Update(SequenceNumber seq,
                      const Slice& key,
                      const Slice& value) {
  LookupKey lkey(key, seq);
  Slice mem_key = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), mem_key.data());

  if (iter->Valid()) {
    // entry format is:
    //    key_length  varint32
    //    userkey  char[klength-8]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
        Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
          uint32_t prev_size = prev_value.size();
          uint32_t new_size = value.size();

          // Update the value in place if the new value size <= the previous
          // value size.
          if (new_size <= prev_size) {
            char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
                                     new_size);
            WriteLock wl(GetLock(lkey.user_key()));
            memcpy(p, value.data(), value.size());
            assert((unsigned)((p + value.size()) - entry) ==
                   (unsigned)(VarintLength(key_length) + key_length +
                              VarintLength(value.size()) + value.size()));
            return;
          }
        }
        default:
          // If the latest entry is not a plain value (kTypeDeletion, kTypeMerge
          // or kTypeLogData), or the new value does not fit in place, fall
          // back to adding a new entry.
          Add(seq, kTypeValue, key, value);
          return;
      }
    }
  }

  // key doesn't exist
  Add(seq, kTypeValue, key, value);
}

bool MemTable::UpdateCallback(SequenceNumber seq,
                              const Slice& key,
                              const Slice& delta,
                              const Options& options) {
  LookupKey lkey(key, seq);
  Slice memkey = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), memkey.data());

  if (iter->Valid()) {
    // entry format is:
    //    key_length  varint32
    //    userkey  char[klength-8]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
        Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
          uint32_t prev_size = prev_value.size();

          char* prev_buffer = const_cast<char*>(prev_value.data());
          uint32_t new_prev_size = prev_size;

          std::string str_value;
          WriteLock wl(GetLock(lkey.user_key()));
          auto status = options.inplace_callback(prev_buffer, &new_prev_size,
                                                 delta, &str_value);
          if (status == UpdateStatus::UPDATED_INPLACE) {
            // Value already updated by callback.
            assert(new_prev_size <= prev_size);
            if (new_prev_size < prev_size) {
              // overwrite the new prev_size
              char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
                                       new_prev_size);
              if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
                // shift the value buffer as well.
                memcpy(p, prev_buffer, new_prev_size);
              }
            }
            RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED);
            should_flush_ = ShouldFlushNow();
            return true;
          } else if (status == UpdateStatus::UPDATED) {
            Add(seq, kTypeValue, key, Slice(str_value));
            RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN);
            should_flush_ = ShouldFlushNow();
            return true;
          } else if (status == UpdateStatus::UPDATE_FAILED) {
            // No action required. Return.
            should_flush_ = ShouldFlushNow();
            return true;
          }
        }
        default:
          break;
      }
    }
  }
  // If the latest value is not kTypeValue
  // or key doesn't exist
  return false;
}

size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
  Slice memkey = key.memtable_key();

  // A total ordered iterator is costly for some memtablerep (prefix aware
  // reps). By passing in the user key, we allow efficient iterator creation.
  // The iterator only needs to be ordered within the same user key.
  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(key.internal_key(), memkey.data());

  size_t num_successive_merges = 0;

  for (; iter->Valid(); iter->Next()) {
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
            Slice(iter_key_ptr, key_length - 8), key.user_key()) != 0) {
      break;
    }

    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
    if (static_cast<ValueType>(tag & 0xff) != kTypeMerge) {
      break;
    }

    ++num_successive_merges;
  }

  return num_successive_merges;
}

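// Default implementation used when a MemTableRep subclass does not provide a
// specialized Get(): seek to the lookup key and invoke the callback on each
// entry until it returns false.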
void MemTableRep::Get(const LookupKey& k, void* callback_args,
                      bool (*callback_func)(void* arg, const char* entry)) {
  auto iter = GetDynamicPrefixIterator();
  for (iter->Seek(k.internal_key(), k.memtable_key().data());
       iter->Valid() && callback_func(callback_args, iter->key());
       iter->Next()) {
  }
}

}  // namespace rocksdb