//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/memtable.h"

#include <algorithm>
#include <limits>
#include <memory>

#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/port.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "table/merging_iterator.h"
#include "util/arena.h"
#include "util/autovector.h"
#include "util/coding.h"
#include "util/memory_usage.h"
#include "util/murmurhash.h"
#include "util/mutexlock.h"
#include "util/util.h"

namespace rocksdb {

ImmutableMemTableOptions::ImmutableMemTableOptions(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options)
    : arena_block_size(mutable_cf_options.arena_block_size),
      memtable_prefix_bloom_bits(
          static_cast<uint32_t>(
              static_cast<double>(mutable_cf_options.write_buffer_size) *
              mutable_cf_options.memtable_prefix_bloom_size_ratio) *
          8u),
      memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size),
      inplace_update_support(ioptions.inplace_update_support),
      inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
      inplace_callback(ioptions.inplace_callback),
      max_successive_merges(mutable_cf_options.max_successive_merges),
      statistics(ioptions.statistics),
      merge_operator(ioptions.merge_operator),
      info_log(ioptions.info_log) {}

MemTable::MemTable(const InternalKeyComparator& cmp,
                   const ImmutableCFOptions& ioptions,
                   const MutableCFOptions& mutable_cf_options,
                   bool needs_dup_key_check,
                   WriteBufferManager* write_buffer_manager,
                   SequenceNumber latest_seq, uint32_t column_family_id)
    : comparator_(cmp),
      moptions_(ioptions, mutable_cf_options),
      refs_(0),
      kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
      mem_tracker_(write_buffer_manager),
      arena_(moptions_.arena_block_size,
             (write_buffer_manager != nullptr &&
              (write_buffer_manager->enabled() ||
               write_buffer_manager->cost_to_cache()))
                 ? &mem_tracker_
                 : nullptr,
             mutable_cf_options.memtable_huge_page_size),
      table_(mutable_cf_options.memtable_factory->CreateMemTableRep(
          comparator_, needs_dup_key_check, &arena_,
          mutable_cf_options.prefix_extractor.get(), ioptions.info_log,
          column_family_id)),
      range_del_table_(SkipListFactory().CreateMemTableRep(
          comparator_, needs_dup_key_check, &arena_, nullptr /* transform */,
          ioptions.info_log, column_family_id)),
      data_size_(0),
      num_entries_(0),
      num_deletes_(0),
      num_range_del_(0),
      write_buffer_size_(mutable_cf_options.write_buffer_size),
      flush_in_progress_(false),
      flush_completed_(false),
      is_range_del_slow_(false),
      file_number_(0),
      first_seqno_(0),
      earliest_seqno_(latest_seq),
      creation_seq_(latest_seq),
      mem_next_logfile_number_(0),
      min_prep_log_referenced_(0),
      locks_(moptions_.inplace_update_support
                 ? moptions_.inplace_update_num_locks
                 : 0),
      prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      flush_state_(FLUSH_NOT_REQUESTED),
      env_(ioptions.env),
      insert_with_hint_prefix_extractor_(
          ioptions.memtable_insert_with_hint_prefix_extractor),
      oldest_key_time_(std::numeric_limits<uint64_t>::max()),
      atomic_flush_seqno_(kMaxSequenceNumber) {
  UpdateFlushState();
  // something went wrong if we need to flush before inserting anything
  assert(!ShouldScheduleFlush());

  if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) {
    prefix_bloom_.reset(new DynamicBloom(
        &arena_, moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality,
        6 /* hard coded 6 probes */, nullptr, moptions_.memtable_huge_page_size,
        ioptions.info_log));
  }
}

MemTableRep* MemTableRepFactory::CreateMemTableRep(
    const MemTableRep::KeyComparator& key_cmp, bool needs_dup_key_check,
    Allocator* allocator, const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options, uint32_t column_family_id) {
  return CreateMemTableRep(key_cmp, needs_dup_key_check, allocator,
                           mutable_cf_options.prefix_extractor.get(),
                           ioptions.info_log, column_family_id);
}

MemTable::~MemTable() {
  mem_tracker_.FreeMem();
  assert(refs_ == 0);
}

size_t MemTable::ApproximateMemoryUsage() {
  autovector<size_t> usages = {arena_.ApproximateMemoryUsage(),
                               table_->ApproximateMemoryUsage(),
                               range_del_table_->ApproximateMemoryUsage(),
                               rocksdb::ApproximateMemoryUsage(insert_hints_)};
  size_t total_usage = 0;
  for (size_t usage : usages) {
    // If usage + total_usage >= kMaxSizet, return kMaxSizet.
    // The following variation is to avoid numeric overflow.
    if (usage >= port::kMaxSizet - total_usage) {
      return port::kMaxSizet;
    }
    total_usage += usage;
  }
  // otherwise, return the actual usage
  return total_usage;
}

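// Decides whether this memtable should be considered "full". It is consulted
// after every insert via UpdateFlushState(); the subtle part is accounting
// for arena block over-allocation, which the comments below walk through.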
bool MemTable::ShouldFlushNow() const {
  if (is_range_del_slow_) {
    return true;
  }
  size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
  // Often we cannot allocate arena blocks that exactly match the buffer size,
  // so we have to decide whether to over-allocate or under-allocate.
  // This constant can be interpreted as: if we still have more than
  // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to
  // over-allocate one more block.
  const double kAllowOverAllocationRatio = 0.6;

  // If the arena still has room for a new block allocation, we can safely say
  // it shouldn't flush.
  auto allocated_memory = table_->ApproximateMemoryUsage() +
                          range_del_table_->ApproximateMemoryUsage() +
                          arena_.MemoryAllocatedBytes();

  // If we can still allocate one more block without exceeding the
  // over-allocation ratio, then we should not flush.
  if (allocated_memory + kArenaBlockSize <
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return false;
  }

  // If the user keeps adding entries that exceed write_buffer_size, we need
  // to flush earlier even though we still have much available memory left.
  if (allocated_memory >
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return true;
  }

  // In this code path, the arena has already allocated its "last block",
  // which means the total allocated memory size is either:
  //  (1) "moderately" over-allocated (by no more than `0.6 * arena block
  // size`). Or,
  //  (2) less than the write buffer size, but we'll stop here since if we
  // allocate a new arena block, we'll over-allocate too much more (half of
  // the arena block size) memory.
  //
  // In either case, to avoid over-allocating, the last block will stop
  // allocation when its usage reaches a certain ratio, which we carefully
  // choose "0.75 full" as the stop condition because it addresses the
  // following issue with great simplicity: What if the next inserted entry's
  // size is bigger than AllocatedAndUnused()?
  //
  // The answer is: if the entry size is also bigger than 0.25 *
  // kArenaBlockSize, a dedicated block will be allocated for it; otherwise
  // arena will anyway skip the AllocatedAndUnused() and allocate a new, empty
  // and regular block. In either case, we *overly* over-allocated.
  //
  // Therefore, setting the last block to be at most "0.75 full" avoids both
  // cases.
  //
  // NOTE: the average percentage of wasted space of this approach can be
  // computed as: "arena block size * 0.25 / write buffer size". Users who
  // specify a small write buffer size and/or a big arena block size may
  // suffer.
  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}

void MemTable::UpdateFlushState() {
  auto state = flush_state_.load(std::memory_order_relaxed);
  if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
    // ignore CAS failure, because that means somebody else requested
    // a flush
    flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
                                         std::memory_order_relaxed,
                                         std::memory_order_relaxed);
  }
}

void MemTable::UpdateOldestKeyTime() {
  uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed);
  if (oldest_key_time == std::numeric_limits<uint64_t>::max()) {
    int64_t current_time = 0;
    auto s = env_->GetCurrentTime(&current_time);
    if (s.ok()) {
      assert(current_time >= 0);
      // If fail, the timestamp is already set.
      oldest_key_time_.compare_exchange_strong(
          oldest_key_time, static_cast<uint64_t>(current_time),
          std::memory_order_relaxed, std::memory_order_relaxed);
    }
  }
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
                                        const char* prefix_len_key2) const {
  // Internal keys are encoded as length-prefixed strings.
  Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
  Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
  return comparator.CompareKeySeq(k1, k2);
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key,
                                        const Slice& key) const {
  // Internal keys are encoded as length-prefixed strings.
  Slice a = GetLengthPrefixedSlice(prefix_len_key);
  return comparator.CompareKeySeq(a, key);
}

// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
const char* EncodeKey(std::string* scratch, const Slice& target) {
  scratch->clear();
  PutVarint32(scratch, static_cast<uint32_t>(target.size()));
  scratch->append(target.data(), target.size());
  return scratch->data();
}

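// Shared base for iterators over the memtable. TValue is the value type the
// iterator exposes: LazyBuffer for the main table and Slice for the range
// tombstone table, so MemTableIterator and MemTableTombstoneIterator can
// both reuse the seek and prefix-bloom logic below.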
template <class TValue>
class MemTableIteratorBase : public InternalIteratorBase<TValue> {
 public:
  MemTableIteratorBase(const MemTable& mem, const ReadOptions& read_options,
                       Arena* arena, bool use_range_del_table = false)
      : bloom_(nullptr),
        prefix_extractor_(mem.prefix_extractor_),
        comparator_(mem.comparator_),
        valid_(false),
        arena_mode_(arena != nullptr),
        value_pinned_(
            !mem.GetImmutableMemTableOptions()->inplace_update_support) {
    if (use_range_del_table) {
      iter_ = mem.range_del_table_->GetIterator(arena);
    } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
      bloom_ = mem.prefix_bloom_.get();
      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
    } else {
      iter_ = mem.table_->GetIterator(arena);
    }
    is_seek_for_prev_supported_ = iter_->IsSeekForPrevSupported();
  }

  ~MemTableIteratorBase() {
    if (arena_mode_) {
      iter_->~Iterator();
    } else {
      delete iter_;
    }
  }

  virtual bool Valid() const override { return valid_; }
  virtual void Seek(const Slice& k) override {
    PERF_TIMER_GUARD(seek_on_memtable_time);
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
    if (bloom_ != nullptr) {
      if (!bloom_->MayContain(
              prefix_extractor_->Transform(ExtractUserKey(k)))) {
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
        valid_ = false;
        return;
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
    }
    iter_->Seek(k, nullptr);
    valid_ = iter_->Valid();
  }
  virtual void SeekForPrev(const Slice& k) override {
    PERF_TIMER_GUARD(seek_on_memtable_time);
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
    if (bloom_ != nullptr) {
      if (!bloom_->MayContain(
              prefix_extractor_->Transform(ExtractUserKey(k)))) {
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
        valid_ = false;
        return;
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
    }
    if (is_seek_for_prev_supported_) {
      iter_->SeekForPrev(k, nullptr);
      valid_ = iter_->Valid();
    } else {
      iter_->Seek(k, nullptr);
      valid_ = iter_->Valid();
      if (!Valid()) {
        SeekToLast();
      }
      while (Valid() && comparator_.comparator.Compare(k, key()) < 0) {
        Prev();
      }
    }
  }
  virtual void SeekToFirst() override {
    iter_->SeekToFirst();
    valid_ = iter_->Valid();
  }
  virtual void SeekToLast() override {
    iter_->SeekToLast();
    valid_ = iter_->Valid();
  }
  virtual void Next() override {
    PERF_COUNTER_ADD(next_on_memtable_count, 1);
    assert(valid_);
    iter_->Next();
    valid_ = iter_->Valid();
  }
  virtual void Prev() override {
    PERF_COUNTER_ADD(prev_on_memtable_count, 1);
    assert(valid_);
    iter_->Prev();
    valid_ = iter_->Valid();
  }
  virtual Slice key() const override {
    assert(valid_);
    return iter_->key();
  }

  virtual Status status() const override { return Status::OK(); }

 protected:
  DynamicBloom* bloom_;
  const SliceTransform* const prefix_extractor_;
  const MemTable::KeyComparator comparator_;
  MemTableRep::Iterator* iter_;
  bool valid_;
  bool arena_mode_;
  bool value_pinned_;
  bool is_seek_for_prev_supported_;

  // No copying allowed
  MemTableIteratorBase(const MemTableIteratorBase&);
  void operator=(const MemTableIteratorBase&);
};

class MemTableTombstoneIterator : public MemTableIteratorBase<Slice> {
  using Base = MemTableIteratorBase<Slice>;
  using Base::iter_;
  using Base::valid_;
  using Base::value_pinned_;

 public:
  using Base::Base;

  virtual Slice value() const override {
    assert(valid_);
    LazyBuffer v = iter_->value();
    assert(v.fetch().ok());
    return v.fetch().ok() ? v.slice() : Slice::Invalid();
  }
};

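// Iterator over the main table. It doubles as a LazyBufferState so that
// value() can hand out a LazyBuffer backed by this iterator, deferring the
// actual fetch/pin/dump work until a caller really needs the bytes.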
class MemTableIterator
    : public MemTableIteratorBase<LazyBuffer>, public LazyBufferState {
  using Base = MemTableIteratorBase<LazyBuffer>;
  using Base::iter_;
  using Base::valid_;
  using Base::value_pinned_;

 public:
  using Base::Base;

  void destroy(LazyBuffer* /*buffer*/) const override {}

  void pin_buffer(LazyBuffer* buffer) const override {
    if (!value_pinned_ || !iter_->IsValuePinned()) {
      LazyBufferState::pin_buffer(buffer);
    } else {
      *buffer = iter_->value();
      buffer->pin();
    }
  }

  Status dump_buffer(LazyBuffer* buffer, LazyBuffer* target) const override {
    *buffer = iter_->value();
    return std::move(*buffer).dump(*target);
  }

  Status fetch_buffer(LazyBuffer* buffer) const override {
    *buffer = iter_->value();
    return buffer->fetch();
  }

  virtual LazyBuffer value() const override {
    assert(valid_);
    return LazyBuffer(this, {});
  }
};

InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
                                        Arena* arena) {
  assert(arena != nullptr);
  auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
  return new (mem) MemTableIterator(*this, read_options, arena);
}

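// Returns a fragmented view of the range tombstones in this memtable, or
// nullptr if there are none (or range deletions are ignored). The fragmented
// list is cached in fragmented_range_dels_ and tagged with the tombstone
// count at build time (user_tag()), so it is only rebuilt after new range
// deletions arrive; a rebuild that takes longer than 10ms sets
// is_range_del_slow_, which in turn makes ShouldFlushNow() return true.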
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
    const ReadOptions& read_options, SequenceNumber read_seq) {
  size_t num_range_del = num_range_del_.load(std::memory_order_relaxed);
  if (read_options.ignore_range_deletions || num_range_del == 0) {
    return nullptr;
  }
  auto fragmented_tombstone_list = fragmented_range_dels_;

  if (!fragmented_range_dels_ ||
      fragmented_range_dels_->user_tag() != num_range_del) {
    auto* unfragmented_iter = new MemTableTombstoneIterator(
        *this, read_options, nullptr /* arena */,
        true /* use_range_del_table */);
    if (unfragmented_iter == nullptr) {
      return nullptr;
    }
    StopWatchNano timer(env_, true);
    fragmented_tombstone_list = std::make_shared<FragmentedRangeTombstoneList>(
        std::unique_ptr<InternalIteratorBase<Slice>>(unfragmented_iter),
        comparator_.comparator, false /* for_compaction */,
        std::vector<SequenceNumber>() /* snapshots */, num_range_del);
    if (timer.ElapsedNanos() > 10000000ULL) {
      is_range_del_slow_ = true;
    }
    if (num_range_del == num_range_del_.load(std::memory_order_relaxed)) {
      fragmented_range_dels_ = fragmented_tombstone_list;
    }
  }

  auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
      fragmented_tombstone_list, comparator_.comparator, read_seq);
  return fragmented_iter;
}

port::RWMutex* MemTable::GetLock(const Slice& key) {
  static murmur_hash hash;
  return &locks_[hash(key) % locks_.size()];
}

MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
                                                   const Slice& end_ikey) {
  uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
  entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
  if (entry_count == 0) {
    return {0, 0};
  }
  uint64_t n = num_entries_.load(std::memory_order_relaxed);
  if (n == 0) {
    return {0, 0};
  }
  if (entry_count > n) {
    // (range_del_)table_->ApproximateNumEntries() is just an estimate, so it
    // can be larger than the actual number of entries; cap it to the entries
    // we have to limit the inaccuracy.
    entry_count = n;
  }
  uint64_t data_size = data_size_.load(std::memory_order_relaxed);
  return {entry_count * (data_size / n), entry_count};
}

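// Inserts a key/value (or tombstone) entry. Range deletions go to
// range_del_table_, everything else to table_. Returns false when the
// underlying MemTableRep rejects the insert (e.g. a duplicate key when
// needs_dup_key_check is set). A sketch of a direct call follows (real
// writers go through WriteBatch and the write path, which also picks the
// sequence number):
//
//   MemTable* mem = ...;
//   SequenceNumber seq = ...;
//   bool ok = mem->Add(seq, kTypeValue, "user_key", "value",
//                      false /* allow_concurrent */, nullptr);
//   assert(ok);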
bool MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key, /* user key */
                   const Slice& value, bool allow_concurrent,
                   MemTablePostProcessInfo* post_process_info) {
  std::unique_ptr<MemTableRep>& table =
      type == kTypeRangeDeletion ? range_del_table_ : table_;

  InternalKey internal_key(key, s, type);
  size_t encoded_len =
      MemTableRep::EncodeKeyValueSize(internal_key.Encode(), value);
  if (!allow_concurrent) {
    // Extract prefix for insert with hint.
    if (insert_with_hint_prefix_extractor_ != nullptr &&
        insert_with_hint_prefix_extractor_->InDomain(internal_key.user_key())) {
      Slice prefix = insert_with_hint_prefix_extractor_->Transform(
          internal_key.user_key());
      bool res = table->InsertKeyValueWithHint(internal_key.Encode(), value,
                                               &insert_hints_[prefix]);
      if (UNLIKELY(!res)) {
        return res;
      }
    } else {
      bool res = table->InsertKeyValue(internal_key.Encode(), value);
      if (UNLIKELY(!res)) {
        return res;
      }
    }

    // this is a bit ugly, but is the way to avoid locked instructions
    // when incrementing an atomic
    num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1,
                       std::memory_order_relaxed);
    data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
                     std::memory_order_relaxed);
    if (type == kTypeDeletion) {
      num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
                         std::memory_order_relaxed);
    }

    if (prefix_bloom_) {
      assert(prefix_extractor_);
      prefix_bloom_->Add(prefix_extractor_->Transform(key));
    }

    // The first sequence number inserted into the memtable
    assert(first_seqno_ == 0 || s >= first_seqno_);
    if (first_seqno_ == 0) {
      first_seqno_.store(s, std::memory_order_relaxed);

      if (earliest_seqno_ == kMaxSequenceNumber) {
        earliest_seqno_.store(GetFirstSequenceNumber(),
                              std::memory_order_relaxed);
      }
      assert(first_seqno_.load() >= earliest_seqno_.load());
    }
    assert(post_process_info == nullptr);
    UpdateFlushState();
  } else {
    bool res = table->InsertKeyValueConcurrently(internal_key.Encode(), value);
    if (UNLIKELY(!res)) {
      return res;
    }

    assert(post_process_info != nullptr);
    post_process_info->num_entries++;
    post_process_info->data_size += encoded_len;
    if (type == kTypeDeletion) {
      post_process_info->num_deletes++;
    }

    if (prefix_bloom_) {
      assert(prefix_extractor_);
      prefix_bloom_->AddConcurrently(prefix_extractor_->Transform(key));
    }

    // atomically update first_seqno_ and earliest_seqno_.
    uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed);
    while ((cur_seq_num == 0 || s < cur_seq_num) &&
           !first_seqno_.compare_exchange_weak(cur_seq_num, s)) {
    }
    uint64_t cur_earliest_seqno =
        earliest_seqno_.load(std::memory_order_relaxed);
    while (
        (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) &&
        !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
    }
  }
  if (type == kTypeRangeDeletion) {
    num_range_del_.store(num_range_del_.load(std::memory_order_relaxed) + 1,
                         std::memory_order_relaxed);
  }
  UpdateOldestKeyTime();
  return true;
}

// Callback from MemTable::Get()
namespace {

struct Saver {
  Status* status;
  const LookupKey* key;
  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
  bool* merge_in_progress;
  LazyBuffer* value;
  SequenceNumber seq;
  const MergeOperator* merge_operator;
  // the merge operations encountered;
  MergeContext* merge_context;
  SequenceNumber max_covering_tombstone_seq;
  MemTable* mem;
  Logger* logger;
  Statistics* statistics;
  bool inplace_update_support;
  Env* env_;
  ReadCallback* callback_;

  bool CheckCallback(SequenceNumber _seq) {
    if (callback_) {
      return callback_->IsVisible(_seq);
    }
    return true;
  }
};
}  // namespace

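// Visitor passed to MemTableRep::Get(). It is invoked once per matching
// entry; it returns true to keep scanning (e.g. after collecting a merge
// operand) and false to stop, either because the final value was found or
// because the entry belongs to a different user key.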
static bool SaveValue(void* arg, const Slice& internal_key,
                      LazyBuffer&& value) {
  Saver* s = reinterpret_cast<Saver*>(arg);
  MergeContext* merge_context = s->merge_context;
  SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
  const MergeOperator* merge_operator = s->merge_operator;

  assert(merge_context != nullptr);

  // Check that it belongs to same user key.  We do not check the
  // sequence number since the Seek() call above should have skipped
  // all entries with overly large sequence numbers.
  if (s->mem->GetInternalKeyComparator().user_comparator()->Equal(
          ExtractUserKey(internal_key), s->key->user_key())) {
    // Correct user key
    const uint64_t tag =
        DecodeFixed64(internal_key.data() + internal_key.size() - 8);
    ValueType type;
    SequenceNumber seq;
    UnPackSequenceAndType(tag, &seq, &type);
    // If the value is not in the snapshot, skip it
    if (!s->CheckCallback(seq)) {
      return true;  // to continue to the next seq
    }

    s->seq = seq;

    if ((type == kTypeValue || type == kTypeMerge || type == kTypeValueIndex ||
         type == kTypeMergeIndex) && max_covering_tombstone_seq > seq) {
      type = kTypeRangeDeletion;
    }
    switch (type) {
      case kTypeValueIndex:
        assert(false);
        FALLTHROUGH_INTENDED;
      case kTypeValue: {
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadLock();
        }
        *s->status = Status::OK();
        if (*s->merge_in_progress) {
          if (s->value != nullptr) {
            *s->status = MergeHelper::TimedFullMerge(
                merge_operator, s->key->user_key(), &value,
                merge_context->GetOperands(), s->value, s->logger,
                s->statistics, s->env_, true);
            if (s->status->ok()) {
              s->value->pin();
            }
          }
        } else if (s->value != nullptr) {
          *s->status = std::move(value).dump(*s->value);
          if (!s->status->ok()) {
            return false;
          }
        }
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadUnlock();
        }
        *s->found_final_value = true;
        return false;
      }
      case kTypeDeletion:
      case kTypeSingleDeletion:
      case kTypeRangeDeletion: {
        if (*s->merge_in_progress) {
          if (s->value != nullptr) {
            *s->status = MergeHelper::TimedFullMerge(
                merge_operator, s->key->user_key(), nullptr,
                merge_context->GetOperands(), s->value, s->logger,
                s->statistics, s->env_, true);
            if (s->status->ok()) {
              s->value->pin();
            }
          }
        } else {
          *s->status = Status::NotFound();
        }
        *s->found_final_value = true;
        return false;
      }
      case kTypeMergeIndex:
        assert(false);
        FALLTHROUGH_INTENDED;
      case kTypeMerge: {
        if (!merge_operator) {
          *s->status = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          // Normally we continue the loop (return true) when we see a merge
          // operand.  But in case of an error, we should stop the loop
          // immediately and pretend we have found the value to stop further
          // seek.  Otherwise, the later call will override this error status.
          *s->found_final_value = true;
          return false;
        }
        *s->merge_in_progress = true;
        merge_context->PushOperand(std::move(value));
        if (merge_operator->ShouldMerge(
                merge_context->GetOperandsDirectionBackward())) {
          *s->status = MergeHelper::TimedFullMerge(
              merge_operator, s->key->user_key(), nullptr,
              merge_context->GetOperands(), s->value, s->logger, s->statistics,
              s->env_, true);
          if (s->status->ok()) {
            s->value->pin();
          }
          *s->found_final_value = true;
          return false;
        }
        return true;
      }
      default:
        assert(false);
        return true;
    }
  }

  // s->state could be Corrupt, merge or notfound
  return false;
}

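// Point lookup. The flow: consult the range tombstone list to raise
// *max_covering_tombstone_seq, skip the table entirely on a prefix bloom
// miss, and otherwise let SaveValue fold matching entries (values,
// deletions, merge operands) into *value / *s. Returns true iff a final
// value or deletion was found; a partial merge leaves *s as
// MergeInProgress().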
bool MemTable::Get(const LookupKey& key, LazyBuffer* value, Status* s,
                   MergeContext* merge_context,
                   SequenceNumber* max_covering_tombstone_seq,
                   SequenceNumber* seq, const ReadOptions& read_opts,
                   ReadCallback* callback) {
  // The sequence number is updated synchronously in version_set.h
  if (IsEmpty()) {
    // Avoiding recording stats for speed.
    return false;
  }
  PERF_TIMER_GUARD(get_from_memtable_time);

  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
      NewRangeTombstoneIterator(read_opts,
                                GetInternalKeySeqno(key.internal_key())));
  if (range_del_iter != nullptr) {
    *max_covering_tombstone_seq =
        std::max(*max_covering_tombstone_seq,
                 range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()));
  }

  Slice user_key = key.user_key();
  bool found_final_value = false;
  bool merge_in_progress = s->IsMergeInProgress();
  bool const may_contain =
      nullptr == prefix_bloom_
          ? false
          : prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key));
  if (prefix_bloom_ && !may_contain) {
    // iter is null if prefix bloom says the key does not exist
    PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
    *seq = kMaxSequenceNumber;
  } else {
    if (prefix_bloom_) {
      PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
    }
    Saver saver;
    saver.status = s;
    saver.found_final_value = &found_final_value;
    saver.merge_in_progress = &merge_in_progress;
    saver.key = &key;
    saver.value = value;
    saver.seq = kMaxSequenceNumber;
    saver.mem = this;
    saver.merge_context = merge_context;
    saver.max_covering_tombstone_seq = *max_covering_tombstone_seq;
    saver.merge_operator = moptions_.merge_operator;
    saver.logger = moptions_.info_log;
    saver.inplace_update_support = moptions_.inplace_update_support;
    saver.statistics = moptions_.statistics;
    saver.env_ = env_;
    saver.callback_ = callback;
    table_->Get(key, &saver, SaveValue);

    *seq = saver.seq;
  }

  // No change to value, since we have not yet found a Put/Delete
  if (!found_final_value && merge_in_progress) {
    *s = Status::MergeInProgress();
  }
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
  return found_final_value;
}

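// In-place update used when inplace_update_support is on: if the latest
// entry for this key is a plain value at least as large as the new one, the
// value bytes (and their varint32 length prefix) are overwritten under the
// per-key striped lock; otherwise a fresh entry is appended via Add().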
void MemTable::Update(SequenceNumber seq,
                      const Slice& key,
                      const Slice& value) {
  LookupKey lkey(key, seq);
  Slice mem_key = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), mem_key.data());

  if (iter->Valid()) {
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    if (comparator_.comparator.user_comparator()->Equal(
            ExtractUserKey(iter->key()), lkey.user_key())) {
      // Correct user key
      const uint64_t tag = ExtractInternalKeyFooter(iter->key());
      ValueType type;
      SequenceNumber unused;
      UnPackSequenceAndType(tag, &unused, &type);
      LazyBuffer old_value = iter->value();
      if (type == kTypeValue && old_value.valid()) {
        uint32_t old_size = static_cast<uint32_t>(old_value.size());
        uint32_t new_size = static_cast<uint32_t>(value.size());

        // Update value, if new value size <= old value size
        if (new_size <= old_size) {
          char* p =
              const_cast<char*>(old_value.data()) - VarintLength(old_size);
          p = EncodeVarint32(p, new_size);
          WriteLock wl(GetLock(lkey.user_key()));
          memcpy(p, value.data(), value.size());
          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
          return;
        }
      }
    }
  }

  // key doesn't exist
  bool add_res __attribute__((__unused__));
  add_res = Add(seq, kTypeValue, key, value);
  // We already checked unused != seq above. In that case, Add should not fail.
  assert(add_res);
}

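// Like Update(), but delegates merging `delta` into the existing value to
// moptions_.inplace_callback. The callback may shrink the value in place
// (UPDATED_INPLACE), produce a new merged value to append (UPDATED), or give
// up (UPDATE_FAILED). Returns false when no usable existing value was found,
// so the caller can fall back to a normal write.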
bool MemTable::UpdateCallback(SequenceNumber seq,
                              const Slice& key,
                              const Slice& delta) {
  LookupKey lkey(key, seq);
  Slice memkey = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), memkey.data());

  if (iter->Valid()) {
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    if (comparator_.comparator.user_comparator()->Equal(
            ExtractUserKey(iter->key()), lkey.user_key())) {
      // Correct user key
      const uint64_t tag = ExtractInternalKeyFooter(iter->key());
      ValueType type;
      uint64_t unused;
      UnPackSequenceAndType(tag, &unused, &type);
      LazyBuffer old_value = iter->value();
      if (type == kTypeValue && old_value.valid()) {
        uint32_t old_size = static_cast<uint32_t>(old_value.size());

        char* old_buffer = const_cast<char*>(old_value.data());
        uint32_t new_inplace_size = old_size;

        std::string merged_value;
        WriteLock wl(GetLock(lkey.user_key()));
        auto status = moptions_.inplace_callback(old_buffer, &new_inplace_size,
                                                 delta, &merged_value);
        if (status == UpdateStatus::UPDATED_INPLACE) {
          // Value already updated by callback.
          assert(new_inplace_size <= old_size);
          if (new_inplace_size < old_size) {
            // overwrite the new inplace size
            char* p = old_buffer - VarintLength(old_size);
            p = EncodeVarint32(p, new_inplace_size);
            if (p < old_buffer) {
              // shift the value buffer as well.
              memmove(p, old_buffer, new_inplace_size);
            }
          }
          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
          UpdateFlushState();
          return true;
        } else if (status == UpdateStatus::UPDATED) {
          Add(seq, kTypeValue, key, Slice(merged_value));
          RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
          UpdateFlushState();
          return true;
        } else if (status == UpdateStatus::UPDATE_FAILED) {
          // No action required. Return.
          UpdateFlushState();
          return true;
        }
      }
    }
  }
  // If the latest value is not kTypeValue
  // or key doesn't exist
  return false;
}

size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
  Slice memkey = key.memtable_key();

  // A total ordered iterator is costly for some memtablerep (prefix aware
  // reps). By passing in the user key, we allow efficient iterator creation.
  // The iterator only needs to be ordered within the same user key.
  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(key.internal_key(), memkey.data());

  size_t num_successive_merges = 0;
  for (; iter->Valid(); iter->Next()) {
    Slice internal_key = iter->key();
    if (!comparator_.comparator.user_comparator()->Equal(
            ExtractUserKey(internal_key), key.user_key())) {
      break;
    }

    const uint64_t tag =
        DecodeFixed64(internal_key.data() + internal_key.size() - 8);
    ValueType type;
    uint64_t unused;
    UnPackSequenceAndType(tag, &unused, &type);
    if (type != kTypeMerge && type != kTypeMergeIndex) {
      break;
    }

    ++num_successive_merges;
  }

  return num_successive_merges;
}

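// Tracks the minimum WAL number containing a prepared (but uncommitted)
// two-phase-commit section referenced by this memtable. A CAS loop lets
// concurrent writers converge on the smallest log number (0 means unset).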
void MemTable::RefLogContainingPrepSection(uint64_t log) {
  assert(log > 0);
  auto cur = min_prep_log_referenced_.load();
  while ((log < cur || cur == 0) &&
         !min_prep_log_referenced_.compare_exchange_strong(cur, log)) {
    cur = min_prep_log_referenced_.load();
  }
}

uint64_t MemTable::GetMinLogContainingPrepSection() {
  return min_prep_log_referenced_.load();
}

}  // namespace rocksdb