//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/memtable.h"

#include <memory>

#include "db/dbformat.h"
#include "db/merge_context.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/murmurhash.h"
#include "util/statistics.h"

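// Hash specialization for Slice so that MemTable::GetLock() below can map a
// user key onto one of the in-place-update lock stripes.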
namespace std {
template <>
struct hash<rocksdb::Slice> {
  size_t operator()(const rocksdb::Slice& slice) const {
    return MurmurHash(slice.data(), slice.size(), 0);
  }
};
}

namespace rocksdb {

MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
    : comparator_(cmp),
      refs_(0),
      arena_impl_(options.arena_block_size),
      table_(options.memtable_factory->CreateMemTableRep(comparator_,
                                                         &arena_impl_)),
      flush_in_progress_(false),
      flush_completed_(false),
      file_number_(0),
      first_seqno_(0),
      mem_next_logfile_number_(0),
      mem_logfile_number_(0),
      locks_(options.inplace_update_support ? options.inplace_update_num_locks
                                            : 0) {}

MemTable::~MemTable() {
  assert(refs_ == 0);
}

size_t MemTable::ApproximateMemoryUsage() {
  return arena_impl_.ApproximateMemoryUsage() +
         table_->ApproximateMemoryUsage();
}

int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
    const {
  // Internal keys are encoded as length-prefixed strings.
  Slice a = GetLengthPrefixedSlice(aptr);
  Slice b = GetLengthPrefixedSlice(bptr);
  return comparator.Compare(a, b);
}

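// Strip the 8-byte tag (sequence number and value type) from a
// length-prefixed internal key to recover the user key.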
Slice MemTableRep::UserKey(const char* key) const {
  Slice slice = GetLengthPrefixedSlice(key);
  return Slice(slice.data(), slice.size() - 8);
}

// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
static const char* EncodeKey(std::string* scratch, const Slice& target) {
  scratch->clear();
  PutVarint32(scratch, target.size());
  scratch->append(target.data(), target.size());
  return scratch->data();
}

class MemTableIterator: public Iterator {
 public:
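  // Pick the underlying MemTableRep iterator from the read options: a
  // fixed-prefix iterator when options.prefix is set, a dynamic prefix
  // iterator when options.prefix_seek is set, and a full iterator otherwise.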
  MemTableIterator(MemTableRep* table, const ReadOptions& options)
    : iter_() {
    if (options.prefix) {
      iter_.reset(table->GetPrefixIterator(*options.prefix));
    } else if (options.prefix_seek) {
      iter_.reset(table->GetDynamicPrefixIterator());
    } else {
      iter_.reset(table->GetIterator());
    }
  }

  virtual bool Valid() const { return iter_->Valid(); }
  virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); }
  virtual void SeekToFirst() { iter_->SeekToFirst(); }
  virtual void SeekToLast() { iter_->SeekToLast(); }
  virtual void Next() { iter_->Next(); }
  virtual void Prev() { iter_->Prev(); }
  virtual Slice key() const {
    return GetLengthPrefixedSlice(iter_->key());
  }
  virtual Slice value() const {
    Slice key_slice = GetLengthPrefixedSlice(iter_->key());
    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
  }

  virtual Status status() const { return Status::OK(); }

 private:
  std::unique_ptr<MemTableRep::Iterator> iter_;
  std::string tmp_;       // For passing to EncodeKey

  // No copying allowed
  MemTableIterator(const MemTableIterator&);
  void operator=(const MemTableIterator&);
};

Iterator* MemTable::NewIterator(const ReadOptions& options) {
  return new MemTableIterator(table_.get(), options);
}

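// Return the lock stripe that guards in-place updates of "key". Keys are
// mapped onto options.inplace_update_num_locks stripes via std::hash<Slice>.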
port::RWMutex* MemTable::GetLock(const Slice& key) {
  return &locks_[std::hash<Slice>()(key) % locks_.size()];
}

void MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key,
                   const Slice& value) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
  size_t key_size = key.size();
  size_t val_size = value.size();
  size_t internal_key_size = key_size + 8;
  const size_t encoded_len =
      VarintLength(internal_key_size) + internal_key_size +
      VarintLength(val_size) + val_size;
  char* buf = arena_impl_.Allocate(encoded_len);
  char* p = EncodeVarint32(buf, internal_key_size);
  memcpy(p, key.data(), key_size);
  p += key_size;
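  // The 8-byte tag packs the sequence number into the high 56 bits and the
  // value type into the low 8 bits.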
  EncodeFixed64(p, (s << 8) | type);
  p += 8;
  p = EncodeVarint32(p, val_size);
  memcpy(p, value.data(), val_size);
  assert((p + val_size) - buf == (unsigned)encoded_len);
  table_->Insert(buf);

  // The first sequence number inserted into the memtable
  assert(first_seqno_ == 0 || s > first_seqno_);
  if (first_seqno_ == 0) {
    first_seqno_ = s;
  }
}

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
                   MergeContext& merge_context, const Options& options) {
  Slice memkey = key.memtable_key();
  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetIterator(key.user_key()));
  iter->Seek(memkey.data());

  bool merge_in_progress = s->IsMergeInProgress();
  auto merge_operator = options.merge_operator.get();
  auto logger = options.info_log;
  std::string merge_result;

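  // Scan the entries for this user key, collecting merge operands until a
  // Put or Delete resolves the final value.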
  for (; iter->Valid(); iter->Next()) {
    // entry format is:
    //    klength  varint32
    //    userkey  char[klength-8]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
        Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
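          // When in-place updates are enabled, take the per-key stripe lock
          // so a concurrent Update() cannot modify the value while it is read.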
          if (options.inplace_update_support) {
            GetLock(key.user_key())->ReadLock();
          }
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
          *s = Status::OK();
          if (merge_in_progress) {
            assert(merge_operator);
            if (!merge_operator->FullMerge(key.user_key(), &v,
                                           merge_context.GetOperands(), value,
                                           logger.get())) {
              RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
              *s = Status::Corruption("Error: Could not perform merge.");
            }
          } else {
            value->assign(v.data(), v.size());
          }
          if (options.inplace_update_support) {
            GetLock(key.user_key())->Unlock();
          }
          return true;
        }
        case kTypeDeletion: {
          if (merge_in_progress) {
            assert(merge_operator);
            *s = Status::OK();
            if (!merge_operator->FullMerge(key.user_key(), nullptr,
                                           merge_context.GetOperands(), value,
                                           logger.get())) {
              RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
              *s = Status::Corruption("Error: Could not perform merge.");
            }
          } else {
            *s = Status::NotFound();
          }
          return true;
        }
        case kTypeMerge: {
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
          merge_in_progress = true;
          merge_context.PushOperand(v);
          while (merge_context.GetNumOperands() >= 2) {
            // Attempt an associative merge. (Returns true if successful.)
            if (merge_operator->PartialMerge(key.user_key(),
                                             merge_context.GetOperand(0),
                                             merge_context.GetOperand(1),
                                             &merge_result, logger.get())) {
              merge_context.PushPartialMergeResult(merge_result);
            } else {
              // Stack the operands because the user's merge operator can't
              // combine them associatively.
              break;
            }
          }
          break;
        }
        case kTypeLogData:
          assert(false);
          break;
      }
    } else {
      // exit loop if user key does not match
      break;
    }
  }

  // No change to value, since we have not yet found a Put/Delete

  if (merge_in_progress) {
    *s = Status::MergeInProgress("");
  }
  return false;
}

bool MemTable::Update(SequenceNumber seq, ValueType type,
                      const Slice& key,
                      const Slice& value) {
  LookupKey lkey(key, seq);
  Slice memkey = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetIterator(lkey.user_key()));
  iter->Seek(memkey.data());

  if (iter->Valid()) {
    // entry format is:
    //    klength  varint32
    //    userkey  char[klength-8]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
        Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          uint32_t vlength;
          GetVarint32Ptr(key_ptr + key_length,
                         key_ptr + key_length + 5, &vlength);
          // Update the value in place if the new value fits in the space
          // used by the current value.
          if (value.size() <= vlength) {
            char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
                                     value.size());
            WriteLock wl(GetLock(lkey.user_key()));
            memcpy(p, value.data(), value.size());
            assert(
              (p + value.size()) - entry ==
              (unsigned) (VarintLength(key_length) +
                          key_length +
                          VarintLength(value.size()) +
                          value.size())
            );
            return true;
          }
        }
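        // The new value is larger than the current one, so fall through and
        // return false; the caller will do a normal Add() instead.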
        default:
          // If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData
          // then we probably don't have enough space to update in-place
          // Maybe do something later
          // Return false, and do normal Add()
          return false;
      }
    }
  }

  // Key doesn't exist
  return false;
}

size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
  Slice memkey = key.memtable_key();

  // A total ordered iterator is costly for some memtablerep (prefix aware
  // reps). By passing in the user key, we allow efficient iterator creation.
  // The iterator only needs to be ordered within the same user key.
  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetIterator(key.user_key()));
  iter->Seek(memkey.data());

  size_t num_successive_merges = 0;

  for (; iter->Valid(); iter->Next()) {
    const char* entry = iter->key();
    uint32_t key_length;
    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
        Slice(iter_key_ptr, key_length - 8), key.user_key()) != 0) {
      break;
    }

    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
    if (static_cast<ValueType>(tag & 0xff) != kTypeMerge) {
      break;
    }

    ++num_successive_merges;
  }

  return num_successive_merges;
}

}  // namespace rocksdb