memtable.cc 11.2 KB
Newer Older
1 2 3 4 5
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/memtable.h"
J
Jim Paton 已提交
11 12 13

#include <memory>

J
jorlow@chromium.org 已提交
14
#include "db/dbformat.h"
15 16 17 18
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
J
jorlow@chromium.org 已提交
19
#include "util/coding.h"
20
#include "util/mutexlock.h"
J
Jim Paton 已提交
21
#include "util/murmurhash.h"
J
jorlow@chromium.org 已提交
22

23 24 25 26 27 28 29 30 31
namespace std {
// Makes rocksdb::Slice usable with std::hash (MemTable::GetLock uses it to
// pick a lock stripe). The slice's bytes are hashed with MurmurHash, seed 0.
template <>
struct hash<rocksdb::Slice> {
  size_t operator()(const rocksdb::Slice& s) const {
    const char* bytes = s.data();
    const size_t num_bytes = s.size();
    return MurmurHash(bytes, num_bytes, 0);
  }
};
}  // namespace std

32
namespace rocksdb {
J
jorlow@chromium.org 已提交
33

J
Jim Paton 已提交
34 35
// Constructs an empty memtable.
//   cmp            - internal key comparator; wrapped in comparator_, which is
//                    handed to the rep.
//   table_factory  - creates the MemTableRep, which is given this memtable's
//                    arena (arena_impl_).
//   numlevel       - forwarded to the constructor of edit_.
//   options        - supplies the arena block size and, when
//                    inplace_update_support is set, the number of striped
//                    locks (inplace_update_num_locks) used for in-place
//                    updates; otherwise no locks are allocated.
// Members are listed in the same order as the original initializer list.
MemTable::MemTable(const InternalKeyComparator& cmp,
                   std::shared_ptr<MemTableRepFactory> table_factory,
                   int numlevel,
                   const Options& options)
    : comparator_(cmp),
      refs_(0),
      arena_impl_(options.arena_block_size),
      table_(table_factory->CreateMemTableRep(comparator_, &arena_impl_)),
      flush_in_progress_(false),
      flush_completed_(false),
      file_number_(0),
      edit_(numlevel),
      first_seqno_(0),
      mem_next_logfile_number_(0),
      mem_logfile_number_(0),
      locks_(options.inplace_update_support ? options.inplace_update_num_locks
                                            : 0) {}
J
jorlow@chromium.org 已提交
52 53

// Every reference must have been released (refs_ back to zero) before the
// memtable is destroyed.
MemTable::~MemTable() { assert(refs_ == 0); }

J
Jim Paton 已提交
57
size_t MemTable::ApproximateMemoryUsage() {
J
Jim Paton 已提交
58 59
  return arena_impl_.ApproximateMemoryUsage() +
    table_->ApproximateMemoryUsage();
J
Jim Paton 已提交
60
}
J
jorlow@chromium.org 已提交
61 62 63 64 65 66 67 68 69

int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
    const {
  // Both arguments point at varint32-length-prefixed internal keys: strip the
  // prefixes and let the internal key comparator decide the order.
  return comparator.Compare(GetLengthPrefixedSlice(aptr),
                            GetLengthPrefixedSlice(bptr));
}

J
Jim Paton 已提交
70 71 72 73 74
// Returns the user-key portion of a length-prefixed internal key. The internal
// key ends with an 8-byte tag (sequence number and type, as written by
// MemTable::Add), so the user key is everything before it.
Slice MemTableRep::UserKey(const char* key) const {
  const Slice internal_key = GetLengthPrefixedSlice(key);
  const size_t user_key_len = internal_key.size() - 8;
  return Slice(internal_key.data(), user_key_len);
}

J
jorlow@chromium.org 已提交
75 76 77 78 79 80 81 82 83 84 85 86
// Encodes "target" as a varint32 length prefix followed by its bytes, the
// form the memtable rep's comparator expects. *scratch is cleared and used as
// the output buffer; the returned pointer refers into it and stays valid only
// until *scratch is next modified.
static const char* EncodeKey(std::string* scratch, const Slice& target) {
  const size_t len = target.size();
  scratch->clear();
  PutVarint32(scratch, len);
  scratch->append(target.data(), len);
  return scratch->data();
}

class MemTableIterator: public Iterator {
 public:
87 88 89 90 91 92 93 94 95 96
  MemTableIterator(MemTableRep* table, const ReadOptions& options)
    : iter_() {
    if (options.prefix) {
      iter_ = table->GetPrefixIterator(*options.prefix);
    } else if (options.prefix_seek) {
      iter_ = table->GetDynamicPrefixIterator();
    } else {
      iter_ = table->GetIterator();
    }
  }
J
Jim Paton 已提交
97

J
Jim Paton 已提交
98 99 100 101 102 103 104 105 106
  virtual bool Valid() const { return iter_->Valid(); }
  virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); }
  virtual void SeekToFirst() { iter_->SeekToFirst(); }
  virtual void SeekToLast() { iter_->SeekToLast(); }
  virtual void Next() { iter_->Next(); }
  virtual void Prev() { iter_->Prev(); }
  virtual Slice key() const {
    return GetLengthPrefixedSlice(iter_->key());
  }
J
jorlow@chromium.org 已提交
107
  virtual Slice value() const {
J
Jim Paton 已提交
108
    Slice key_slice = GetLengthPrefixedSlice(iter_->key());
J
jorlow@chromium.org 已提交
109 110 111 112 113 114
    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
  }

  virtual Status status() const { return Status::OK(); }

 private:
J
Jim Paton 已提交
115
  std::shared_ptr<MemTableRep::Iterator> iter_;
J
jorlow@chromium.org 已提交
116 117 118 119 120 121 122
  std::string tmp_;       // For passing to EncodeKey

  // No copying allowed
  MemTableIterator(const MemTableIterator&);
  void operator=(const MemTableIterator&);
};

123 124
// Returns a heap-allocated iterator over this memtable's rep, configured by
// options (see MemTableIterator). Presumably the caller owns and deletes it.
Iterator* MemTable::NewIterator(const ReadOptions& options) {
  MemTableRep* rep = table_.get();
  MemTableIterator* it = new MemTableIterator(rep, options);
  return it;
}

127 128 129 130
// Returns the striped reader/writer lock that guards in-place updates of
// `key`; keys are spread over locks_ by hashing.
//
// locks_ is empty unless the memtable was constructed with
// inplace_update_support and a non-zero inplace_update_num_locks. Calling
// this with no locks would take a modulo by zero (undefined behavior), so
// that precondition is asserted.
port::RWMutex* MemTable::GetLock(const Slice& key) {
  assert(locks_.size() > 0);
  return &locks_[std::hash<Slice>()(key) % locks_.size()];
}

J
jorlow@chromium.org 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143 144
// Inserts an entry mapping (key, sequence s, type) to value.
//
// The entry is encoded into arena memory as the concatenation of:
//   varint32  internal key length (key.size() + 8)
//   char[]    user key bytes
//   fixed64   tag = (s << 8) | type
//   varint32  value length
//   char[]    value bytes
// and a pointer to its first byte is inserted into the rep.
void MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key,
                   const Slice& value) {
  const size_t user_key_len = key.size();
  const size_t value_len = value.size();
  const size_t ikey_len = user_key_len + 8;  // user key plus 8-byte tag
  const size_t entry_len = VarintLength(ikey_len) + ikey_len +
                           VarintLength(value_len) + value_len;

  char* const entry = arena_impl_.Allocate(entry_len);
  char* cursor = EncodeVarint32(entry, ikey_len);
  memcpy(cursor, key.data(), user_key_len);
  cursor += user_key_len;
  EncodeFixed64(cursor, (s << 8) | type);
  cursor += 8;
  cursor = EncodeVarint32(cursor, value_len);
  memcpy(cursor, value.data(), value_len);
  assert((cursor + value_len) - entry == (unsigned)entry_len);

  table_->Insert(entry);

  // Record the sequence number of the first insertion; every later insert is
  // expected (in debug builds) to carry a strictly larger one.
  assert(first_seqno_ == 0 || s > first_seqno_);
  if (first_seqno_ == 0) {
    first_seqno_ = s;
  }
}

163
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
164
                   std::deque<std::string>* operands, const Options& options) {
165
  Slice memkey = key.memtable_key();
J
Jim Paton 已提交
166 167
  std::shared_ptr<MemTableRep::Iterator> iter(
    table_->GetIterator(key.user_key()));
J
Jim Paton 已提交
168
  iter->Seek(memkey.data());
169

170 171
  // It is the caller's responsibility to allocate/delete operands list
  assert(operands != nullptr);
172

173
  bool merge_in_progress = s->IsMergeInProgress();
174
  auto merge_operator = options.merge_operator.get();
175
  auto logger = options.info_log;
176 177
  std::string merge_result;

178
  for (; iter->Valid(); iter->Next()) {
179 180
    // entry format is:
    //    klength  varint32
181
    //    userkey  char[klength-8]
182 183 184 185 186 187
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
J
Jim Paton 已提交
188
    const char* entry = iter->key();
189
    uint32_t key_length;
190
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
191
    if (comparator_.comparator.user_comparator()->Compare(
192
        Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
193 194 195 196
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
197 198 199
          if (options.inplace_update_support) {
            GetLock(key.user_key())->ReadLock();
          }
200
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
201
          *s = Status::OK();
202
          if (merge_in_progress) {
203
            assert(merge_operator);
D
Deon Nicholas 已提交
204 205
            if (!merge_operator->FullMerge(key.user_key(), &v, *operands,
                                           value, logger.get())) {
M
Mayank Agarwal 已提交
206
              RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
207 208
              *s = Status::Corruption("Error: Could not perform merge.");
            }
209 210 211
          } else {
            value->assign(v.data(), v.size());
          }
212 213 214
          if (options.inplace_update_support) {
            GetLock(key.user_key())->Unlock();
          }
215
          return true;
216
        }
217 218
        case kTypeDeletion: {
          if (merge_in_progress) {
219 220
            assert(merge_operator);
            *s = Status::OK();
D
Deon Nicholas 已提交
221 222
            if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands,
                                           value, logger.get())) {
M
Mayank Agarwal 已提交
223
              RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
224 225
              *s = Status::Corruption("Error: Could not perform merge.");
            }
226 227 228
          } else {
            *s = Status::NotFound(Slice());
          }
229
          return true;
230
        }
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
        case kTypeMerge: {
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
          merge_in_progress = true;
          operands->push_front(v.ToString());
          while(operands->size() >= 2) {
            // Attempt to associative merge. (Returns true if successful)
            if (merge_operator->PartialMerge(key.user_key(),
                                             Slice((*operands)[0]),
                                             Slice((*operands)[1]),
                                             &merge_result,
                                             logger.get())) {
              operands->pop_front();
              swap(operands->front(), merge_result);
            } else {
              // Stack them because user can't associative merge
              break;
            }
          }
          break;
        }
J
Jim Paton 已提交
251 252 253
        case kTypeLogData:
          assert(false);
          break;
254
      }
255 256 257
    } else {
      // exit loop if user key does not match
      break;
258 259
    }
  }
260

261 262
  // No change to value, since we have not yet found a Put/Delete

263
  if (merge_in_progress) {
264 265
    *s = Status::MergeInProgress("");
  }
266
  return false;
267 268
}

269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
// Tries to overwrite, in place, the value of the newest entry for `key` found
// by seeking to (key, seq).
//
// Returns true only when that entry is a kTypeValue whose stored value is at
// least as long as `value`. In that case the new length prefix and bytes are
// written over the old ones while holding the key's write lock.
// Returns false when the key is absent, the newest entry is not a plain
// value, or the new value does not fit; the caller then does a normal Add().
//
// `type` is not used. GetLock() is called unconditionally, so this requires
// locks_ to be non-empty (in-place update support enabled).
bool MemTable::Update(SequenceNumber seq, ValueType type,
                      const Slice& key,
                      const Slice& value) {
  LookupKey lkey(key, seq);
  Slice memkey = lkey.memtable_key();

  std::shared_ptr<MemTableRep::Iterator> iter(
    table_->GetIterator(lkey.user_key()));
  iter->Seek(memkey.data());

  if (!iter->Valid()) {
    // Key doesn't exist
    return false;
  }

  // entry format is:
  //    klength  varint32
  //    userkey  char[klength-8]
  //    tag      uint64
  //    vlength  varint32
  //    value    char[vlength]
  // Check that it belongs to same user key.  We do not check the
  // sequence number since the Seek() call above should have skipped
  // all entries with overly large sequence numbers.
  const char* entry = iter->key();
  uint32_t key_length;
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  if (comparator_.comparator.user_comparator()->Compare(
      Slice(key_ptr, key_length - 8), lkey.user_key()) != 0) {
    // Key doesn't exist
    return false;
  }

  const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
  if (static_cast<ValueType>(tag & 0xff) != kTypeValue) {
    // If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData
    // then we probably don't have enough space to update in-place
    // Maybe do something later
    // Return false, and do normal Add()
    return false;
  }

  uint32_t vlength;
  GetVarint32Ptr(key_ptr + key_length,
                 key_ptr + key_length + 5, &vlength);
  // Update value, if newValue size  <= curValue size
  if (value.size() > vlength) {
    return false;
  }

  // Take the write lock BEFORE touching any byte of the entry. Get() reads
  // both the length prefix and the value bytes under the matching read lock;
  // rewriting the prefix outside the lock would let a concurrent reader see
  // the new length paired with the old bytes.
  WriteLock wl(GetLock(lkey.user_key()));
  char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
                           value.size());
  memcpy(p, value.data(), value.size());
  assert(
    (p + value.size()) - entry ==
    (unsigned) (VarintLength(key_length) +
                key_length +
                VarintLength(value.size()) +
                value.size())
  );
  return true;
}
330
}  // namespace rocksdb