//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeMerge varstring varstring
//    kTypeDeletion varstring
//    kTypeColumnFamilyValue varint32 varstring varstring
//    kTypeColumnFamilyMerge varint32 varstring varstring
//    kTypeColumnFamilyDeletion varint32 varstring
//    kTypeLogData varstring
// varstring :=
//    len: varint32
//    data: uint8[len]
//
// The varint32 in the kTypeColumnFamily* records is the column family id;
// records for the default column family (id 0) use the shorter tags that
// carry no id. kTypeLogData records are not included in `count`.

25 26
#include "rocksdb/write_batch.h"
#include "rocksdb/options.h"
27
#include "rocksdb/merge_operator.h"
J
jorlow@chromium.org 已提交
28
#include "db/dbformat.h"
29
#include "db/db_impl.h"
J
jorlow@chromium.org 已提交
30
#include "db/memtable.h"
31
#include "db/snapshot.h"
J
jorlow@chromium.org 已提交
32 33
#include "db/write_batch_internal.h"
#include "util/coding.h"
I
Igor Canadi 已提交
34
#include "util/statistics.h"
35
#include <stdexcept>
J
jorlow@chromium.org 已提交
36

37
namespace rocksdb {
J
jorlow@chromium.org 已提交
38

39 40 41
// Size in bytes of the fixed WriteBatch header: a fixed64 sequence number
// (8 bytes) followed by a fixed32 record count (4 bytes).
static const size_t kHeader = sizeof(uint64_t) + sizeof(uint32_t);

42 43
// Creates an empty batch. `reserved_bytes` is only a capacity hint for rep_;
// at least kHeader bytes are always reserved so the header itself fits.
WriteBatch::WriteBatch(size_t reserved_bytes) {
  size_t initial_capacity = kHeader;
  if (reserved_bytes > kHeader) {
    initial_capacity = reserved_bytes;
  }
  rep_.reserve(initial_capacity);
  Clear();
}

WriteBatch::~WriteBatch() = default;

WriteBatch::Handler::~Handler() = default;

51 52 53 54 55
namespace {
// Failure path shared by the default Handler callbacks that a subclass was
// expected to override (either the plain callback or its *CF variant).
void ThrowHandlerNotImplemented(const char* message) {
  throw std::runtime_error(message);
}
}  // namespace

void WriteBatch::Handler::Put(const Slice& key, const Slice& value) {
  // You need to implement either Put or PutCF.
  ThrowHandlerNotImplemented("Handler::Put not implemented!");
}

void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) {
  // Presumably, like Put/Delete, a subclass must implement Merge or MergeCF.
  ThrowHandlerNotImplemented("Handler::Merge not implemented!");
}

void WriteBatch::Handler::Delete(const Slice& key) {
  // You need to implement either Delete or DeleteCF.
  ThrowHandlerNotImplemented("Handler::Delete not implemented!");
}

void WriteBatch::Handler::LogData(const Slice& blob) {
  // Blobs are ignored unless the subclass overrides LogData.
}

bool WriteBatch::Handler::Continue() {
  // By default, Iterate() visits every record in the batch.
  return true;
}

J
jorlow@chromium.org 已提交
74 75
void WriteBatch::Clear() {
  rep_.clear();
76
  rep_.resize(kHeader);
J
jorlow@chromium.org 已提交
77 78
}

H
Haobo Xu 已提交
79 80 81 82
int WriteBatch::Count() const {
  return WriteBatchInternal::Count(this);
}

83 84
// Replays the records of this batch, in order, against `handler`.
//
// Put/Merge/Delete records are dispatched to PutCF/MergeCF/DeleteCF; records
// written without an explicit column family id are reported with id 0.
// kTypeLogData blobs go to LogData() and are not part of the header count.
//
// handler->Continue() is consulted before each record. If it returns false,
// iteration stops and the header-count check is skipped: the remaining
// records were deliberately left unvisited, so a short tally is not evidence
// of corruption.
//
// Returns Corruption if the header is truncated, a record cannot be decoded,
// a tag is unknown, or (after a full pass) the number of Put/Merge/Delete
// records seen differs from the count stored in the header.
Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
  if (input.size() < kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(kHeader);
  Slice key, value, blob;
  int found = 0;
  bool stopped_by_handler = false;
  while (!input.empty()) {
    if (!handler->Continue()) {
      stopped_by_handler = true;
      break;
    }
    char tag = input[0];
    input.remove_prefix(1);
    uint32_t column_family = 0;  // default column family
    switch (tag) {
      case kTypeColumnFamilyValue:
        if (!GetVarint32(&input, &column_family)) {
          return Status::Corruption("bad WriteBatch Put");
        }
        // intentional fallthrough
      case kTypeValue:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
          handler->PutCF(column_family, key, value);
          found++;
        } else {
          return Status::Corruption("bad WriteBatch Put");
        }
        break;
      case kTypeColumnFamilyDeletion:
        if (!GetVarint32(&input, &column_family)) {
          return Status::Corruption("bad WriteBatch Delete");
        }
        // intentional fallthrough
      case kTypeDeletion:
        if (GetLengthPrefixedSlice(&input, &key)) {
          handler->DeleteCF(column_family, key);
          found++;
        } else {
          return Status::Corruption("bad WriteBatch Delete");
        }
        break;
      case kTypeColumnFamilyMerge:
        if (!GetVarint32(&input, &column_family)) {
          return Status::Corruption("bad WriteBatch Merge");
        }
        // intentional fallthrough
      case kTypeMerge:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
          handler->MergeCF(column_family, key, value);
          found++;
        } else {
          return Status::Corruption("bad WriteBatch Merge");
        }
        break;
      case kTypeLogData:
        // Log data is passed through but never counted in the header.
        if (GetLengthPrefixedSlice(&input, &blob)) {
          handler->LogData(blob);
        } else {
          return Status::Corruption("bad WriteBatch Blob");
        }
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (!stopped_by_handler && found != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  }
  return Status::OK();
}

J
jorlow@chromium.org 已提交
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
// Accessors for the two header fields of rep_: the fixed64 sequence number at
// byte offset 0 and the fixed32 record count at byte offset 8. Callers must
// ensure rep_ holds at least the full header.

int WriteBatchInternal::Count(const WriteBatch* b) {
  const char* count_field = b->rep_.data() + 8;
  return static_cast<int>(DecodeFixed32(count_field));
}

void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  char* count_field = &b->rep_[8];
  EncodeFixed32(count_field, n);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  const char* sequence_field = b->rep_.data();
  return DecodeFixed64(sequence_field);
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  char* sequence_field = &b->rep_[0];
  EncodeFixed64(sequence_field, seq);
}

172
void WriteBatch::Put(uint32_t column_family_id, const Slice& key,
173
                     const Slice& value) {
J
jorlow@chromium.org 已提交
174
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
175 176 177 178 179 180 181
  if (column_family_id == 0) {
    // save some data on disk by not writing default column family
    rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&rep_, column_family_id);
  }
J
jorlow@chromium.org 已提交
182 183 184 185
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

186 187
void WriteBatch::Put(uint32_t column_family_id, const SliceParts& key,
                     const SliceParts& value) {
188
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
189 190 191 192 193 194
  if (column_family_id == 0) {
    rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&rep_, column_family_id);
  }
195 196 197 198
  PutLengthPrefixedSliceParts(&rep_, key);
  PutLengthPrefixedSliceParts(&rep_, value);
}

199
void WriteBatch::Delete(uint32_t column_family_id, const Slice& key) {
J
jorlow@chromium.org 已提交
200
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
201 202 203 204 205 206
  if (column_family_id == 0) {
    rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&rep_, column_family_id);
  }
J
jorlow@chromium.org 已提交
207 208 209
  PutLengthPrefixedSlice(&rep_, key);
}

210 211
void WriteBatch::Merge(uint32_t column_family_id, const Slice& key,
                       const Slice& value) {
212
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
213 214 215 216 217 218
  if (column_family_id == 0) {
    rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&rep_, column_family_id);
  }
219 220 221 222
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

J
Jim Paton 已提交
223 224 225 226
void WriteBatch::PutLogData(const Slice& blob) {
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
}
227

228 229 230 231 232
namespace {
class MemTableInserter : public WriteBatch::Handler {
 public:
  SequenceNumber sequence_;
  MemTable* mem_;
233
  ColumnFamilyMemTables* cf_mems_;
234 235 236 237 238 239
  const Options* options_;
  DBImpl* db_;
  const bool filter_deletes_;

  MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts,
                   DB* db, const bool filter_deletes)
240 241 242 243 244 245
      : sequence_(sequence),
        mem_(mem),
        cf_mems_(nullptr),
        options_(opts),
        db_(reinterpret_cast<DBImpl*>(db)),
        filter_deletes_(filter_deletes) {
246 247 248 249 250 251
    assert(mem_);
    if (filter_deletes_) {
      assert(options_);
      assert(db_);
    }
  }
252

253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
  MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
                   const Options* opts, DB* db, const bool filter_deletes)
      : sequence_(sequence),
        mem_(nullptr),
        cf_mems_(cf_mems),
        options_(opts),
        db_(reinterpret_cast<DBImpl*>(db)),
        filter_deletes_(filter_deletes) {
    assert(cf_mems);
    if (filter_deletes_) {
      assert(options_);
      assert(db_);
    }
  }

  // returns nullptr if the update to the column family is not needed
  MemTable* GetMemTable(uint32_t column_family_id) {
    if (mem_ != nullptr) {
      return (column_family_id == 0) ? mem_ : nullptr;
    } else {
      return cf_mems_->GetMemTable(column_family_id);
    }
  }

277
  virtual void PutCF(uint32_t column_family_id, const Slice& key,
278
                     const Slice& value) {
279 280 281 282 283 284
    MemTable* mem = GetMemTable(column_family_id);
    if (mem == nullptr) {
      return;
    }
    if (options_->inplace_update_support &&
        mem->Update(sequence_, kTypeValue, key, value)) {
285
      RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED);
286
    } else {
287
      mem->Add(sequence_, kTypeValue, key, value);
288
    }
289
    sequence_++;
J
jorlow@chromium.org 已提交
290
  }
291 292
  virtual void MergeCF(uint32_t column_family_id, const Slice& key,
                       const Slice& value) {
293 294 295 296
    MemTable* mem = GetMemTable(column_family_id);
    if (mem == nullptr) {
      return;
    }
297 298 299 300 301 302 303
    bool perform_merge = false;

    if (options_->max_successive_merges > 0 && db_ != nullptr) {
      LookupKey lkey(key, sequence_);

      // Count the number of successive merges at the head
      // of the key in the memtable
304
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343

      if (num_merges >= options_->max_successive_merges) {
        perform_merge = true;
      }
    }

    if (perform_merge) {
      // 1) Get the existing value
      std::string get_value;

      // Pass in the sequence number so that we also include previous merge
      // operations in the same batch.
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions read_options;
      read_options.snapshot = &read_from_snapshot;

      db_->Get(read_options, key, &get_value);
      Slice get_value_slice = Slice(get_value);

      // 2) Apply this merge
      auto merge_operator = options_->merge_operator.get();
      assert(merge_operator);

      std::deque<std::string> operands;
      operands.push_front(value.ToString());
      std::string new_value;
      if (!merge_operator->FullMerge(key,
                                     &get_value_slice,
                                     operands,
                                     &new_value,
                                     options_->info_log.get())) {
          // Failed to merge!
          RecordTick(options_->statistics.get(), NUMBER_MERGE_FAILURES);

          // Store the delta in memtable
          perform_merge = false;
      } else {
        // 3) Add value to memtable
344
        mem->Add(sequence_, kTypeValue, key, new_value);
345 346 347 348 349
      }
    }

    if (!perform_merge) {
      // Add merge operator to memtable
350
      mem->Add(sequence_, kTypeMerge, key, value);
351 352
    }

353 354
    sequence_++;
  }
355
  virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
356 357 358 359
    MemTable* mem = GetMemTable(column_family_id);
    if (mem == nullptr) {
      return;
    }
360 361 362 363 364 365 366
    if (filter_deletes_) {
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions ropts;
      ropts.snapshot = &read_from_snapshot;
      std::string value;
      if (!db_->KeyMayExist(ropts, key, &value)) {
367
        RecordTick(options_->statistics.get(), NUMBER_FILTERED_DELETES);
368 369
        return;
      }
370
    }
371
    mem->Add(sequence_, kTypeDeletion, key, Slice());
372
    sequence_++;
J
jorlow@chromium.org 已提交
373
  }
374
};
H
Hans Wennborg 已提交
375
}  // namespace
376

377 378 379 380 381
// Applies every record of `b` to the single memtable `mem`, numbering records
// from the batch's own sequence number. Only default-column-family (id 0)
// records reach `mem`; records for other column families are skipped.
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem,
                                      const Options* opts, DB* db,
                                      const bool filter_deletes) {
  MemTableInserter inserter(Sequence(b), mem, opts, db, filter_deletes);
  return b->Iterate(&inserter);
}

// Applies every record of `b` to the memtable that `memtables` returns for
// each record's column family, numbering records from the batch's own
// sequence number. Records whose column family has no memtable are skipped.
Status WriteBatchInternal::InsertInto(const WriteBatch* b,
                                      ColumnFamilyMemTables* memtables,
                                      const Options* opts, DB* db,
                                      const bool filter_deletes) {
  MemTableInserter inserter(Sequence(b), memtables, opts, db, filter_deletes);
  return b->Iterate(&inserter);
}

J
jorlow@chromium.org 已提交
394
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
395
  assert(contents.size() >= kHeader);
J
jorlow@chromium.org 已提交
396 397 398
  b->rep_.assign(contents.data(), contents.size());
}

399 400 401 402 403 404
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  SetCount(dst, Count(dst) + Count(src));
  assert(src->rep_.size() >= kHeader);
  dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}

405
}  // namespace rocksdb