// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeMerge varstring varstring
//    kTypeDeletion varstring
// varstring :=
//    len: varint32
//    data: uint8[len]

17
#include "leveldb/write_batch.h"
J
jorlow@chromium.org 已提交
18

19
#include "leveldb/statistics.h"
J
jorlow@chromium.org 已提交
20
#include "db/dbformat.h"
21
#include "db/db_impl.h"
J
jorlow@chromium.org 已提交
22 23 24
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "util/coding.h"
25
#include <stdexcept>
J
jorlow@chromium.org 已提交
26 27 28

namespace leveldb {

29 30 31
// Size in bytes of the fixed WriteBatch header: an 8-byte sequence number
// followed by a 4-byte record count. Record data starts at this offset
// within rep_.
static const size_t kHeader = 12;

J
jorlow@chromium.org 已提交
32 33 34 35 36 37
// Builds an empty batch by resetting rep_ to a header-only buffer.
WriteBatch::WriteBatch() {
  this->Clear();
}

// No explicit cleanup is performed here.
WriteBatch::~WriteBatch() {
}

38 39
// Out-of-line definition of the Handler destructor; it does nothing itself.
WriteBatch::Handler::~Handler() {
}

40 41 42 43
// Default Merge callback: always throws std::runtime_error. Handlers that
// expect to see merge records must provide their own implementation.
void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) {
  const std::runtime_error not_implemented("Handler::Merge not implemented!");
  throw not_implemented;
}

J
jorlow@chromium.org 已提交
44 45
void WriteBatch::Clear() {
  rep_.clear();
46
  rep_.resize(kHeader);
J
jorlow@chromium.org 已提交
47 48
}

H
Haobo Xu 已提交
49 50 51 52
// Returns the record count stored in this batch's header.
int WriteBatch::Count() const {
  const int num_records = WriteBatchInternal::Count(this);
  return num_records;
}

53 54
// Walks every record in rep_ in order and dispatches it to `handler`
// (Put, Delete or Merge according to the record's tag byte).
//
// Returns Status::Corruption if the buffer is shorter than the header, a
// record cannot be decoded, a tag is unknown, or the number of records
// visited differs from the count stored in the header; otherwise
// Status::OK(). Records seen before a corruption is detected have already
// been delivered to the handler.
Status WriteBatch::Iterate(Handler* handler) const {
  Slice remaining(rep_);
  if (remaining.size() < kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }
  remaining.remove_prefix(kHeader);

  int num_records = 0;
  Slice key;
  Slice value;
  while (!remaining.empty()) {
    ++num_records;

    // Each record starts with a one-byte type tag.
    const char tag = remaining[0];
    remaining.remove_prefix(1);

    switch (tag) {
      case kTypeValue:
        if (!GetLengthPrefixedSlice(&remaining, &key) ||
            !GetLengthPrefixedSlice(&remaining, &value)) {
          return Status::Corruption("bad WriteBatch Put");
        }
        handler->Put(key, value);
        break;

      case kTypeDeletion:
        if (!GetLengthPrefixedSlice(&remaining, &key)) {
          return Status::Corruption("bad WriteBatch Delete");
        }
        handler->Delete(key);
        break;

      case kTypeMerge:
        if (!GetLengthPrefixedSlice(&remaining, &key) ||
            !GetLengthPrefixedSlice(&remaining, &value)) {
          return Status::Corruption("bad WriteBatch Merge");
        }
        handler->Merge(key, value);
        break;

      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }

  // The header count must agree with what was actually decoded.
  if (num_records != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  }
  return Status::OK();
}

J
jorlow@chromium.org 已提交
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
// Decodes the fixed32 record count stored at byte offset 8 of b's header.
int WriteBatchInternal::Count(const WriteBatch* b) {
  const char* const count_field = b->rep_.data() + 8;
  return DecodeFixed32(count_field);
}

// Overwrites the fixed32 record count at byte offset 8 of b's header with n.
void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  char* const count_field = &b->rep_[8];
  EncodeFixed32(count_field, n);
}

// Decodes the fixed64 sequence number stored in the first 8 bytes of b's
// header.
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  const char* const sequence_field = b->rep_.data();
  return SequenceNumber(DecodeFixed64(sequence_field));
}

// Overwrites the fixed64 sequence number in the first 8 bytes of b's header
// with seq.
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  char* const sequence_field = &b->rep_[0];
  EncodeFixed64(sequence_field, seq);
}

// Appends a kTypeValue record (tag, length-prefixed key, length-prefixed
// value) to the batch and bumps the header count by one.
void WriteBatch::Put(const Slice& key, const Slice& value) {
  const int updated_count = WriteBatchInternal::Count(this) + 1;
  WriteBatchInternal::SetCount(this, updated_count);

  const char tag = static_cast<char>(kTypeValue);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

// Appends a kTypeDeletion record (tag, length-prefixed key) to the batch and
// bumps the header count by one.
void WriteBatch::Delete(const Slice& key) {
  const int updated_count = WriteBatchInternal::Count(this) + 1;
  WriteBatchInternal::SetCount(this, updated_count);

  const char tag = static_cast<char>(kTypeDeletion);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, key);
}

130 131 132 133 134 135 136 137
// Appends a kTypeMerge record (tag, length-prefixed key, length-prefixed
// value) to the batch and bumps the header count by one.
void WriteBatch::Merge(const Slice& key, const Slice& value) {
  const int updated_count = WriteBatchInternal::Count(this) + 1;
  WriteBatchInternal::SetCount(this, updated_count);

  const char tag = static_cast<char>(kTypeMerge);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}


138 139 140 141 142
namespace {
class MemTableInserter : public WriteBatch::Handler {
 public:
  SequenceNumber sequence_;
  MemTable* mem_;
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
  const Options* options_;
  DBImpl* db_;
  const bool filter_deletes_;

  MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts,
                   DB* db, const bool filter_deletes)
    : sequence_(sequence),
      mem_(mem),
      options_(opts),
      db_(reinterpret_cast<DBImpl*>(db)),
      filter_deletes_(filter_deletes) {
    assert(mem_);
    if (filter_deletes_) {
      assert(options_);
      assert(db_);
    }
  }
160 161 162 163

  virtual void Put(const Slice& key, const Slice& value) {
    mem_->Add(sequence_, kTypeValue, key, value);
    sequence_++;
J
jorlow@chromium.org 已提交
164
  }
165 166 167 168
  virtual void Merge(const Slice& key, const Slice& value) {
    mem_->Add(sequence_, kTypeMerge, key, value);
    sequence_++;
  }
169
  virtual void Delete(const Slice& key) {
170 171 172 173
    if (filter_deletes_ && !db_->KeyMayExistImpl(key, sequence_)) {
      RecordTick(options_->statistics, NUMBER_FILTERED_DELETES);
      return;
    }
174 175
    mem_->Add(sequence_, kTypeDeletion, key, Slice());
    sequence_++;
J
jorlow@chromium.org 已提交
176
  }
177
};
H
Hans Wennborg 已提交
178
}  // namespace
179

180 181 182 183 184
// Applies every record of `b` to `mem`, numbering entries from the sequence
// number stored in b's header. `opts`, `db` and `filter_deletes` are passed
// through to the MemTableInserter. Returns the status of iterating the batch.
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem,
                                      const Options* opts, DB* db,
                                      const bool filter_deletes) {
  const SequenceNumber first_sequence = WriteBatchInternal::Sequence(b);
  MemTableInserter inserter(first_sequence, mem, opts, db, filter_deletes);
  return b->Iterate(&inserter);
}

// Replaces b's entire representation (header and records) with a copy of
// `contents`, which must be at least kHeader bytes long.
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  const size_t num_bytes = contents.size();
  assert(num_bytes >= kHeader);
  b->rep_.assign(contents.data(), num_bytes);
}

193 194 195 196 197 198
// Appends all records of `src` to `dst` and adds src's record count to the
// count in dst's header. Only the bytes after src's header are copied, so
// dst's sequence number is left unchanged.
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  // Check the precondition first: Count(src) reads bytes 8..11 of src->rep_,
  // which is only valid when the full header is present.
  assert(src->rep_.size() >= kHeader);
  SetCount(dst, Count(dst) + Count(src));
  dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}

H
Hans Wennborg 已提交
199
}  // namespace leveldb