write_batch.cc 7.2 KB
Newer Older
1 2 3 4 5
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10 11 12 13 14
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeMerge varstring varstring
//    kTypeDeletion varstring
//    kTypeLogData varstring
// varstring :=
//    len: varint32
//    data: uint8[len]
//
// Note: count covers only kTypeValue, kTypeMerge and kTypeDeletion records;
// kTypeLogData records are stored in data but are not included in count.

22 23
#include "rocksdb/write_batch.h"
#include "rocksdb/options.h"
J
jorlow@chromium.org 已提交
24
#include "db/dbformat.h"
25
#include "db/db_impl.h"
J
jorlow@chromium.org 已提交
26
#include "db/memtable.h"
27
#include "db/snapshot.h"
J
jorlow@chromium.org 已提交
28 29
#include "db/write_batch_internal.h"
#include "util/coding.h"
30
#include "util/statistics_imp.h"
31
#include <stdexcept>
J
jorlow@chromium.org 已提交
32

33
namespace rocksdb {
J
jorlow@chromium.org 已提交
34

35 36 37
// Size in bytes of the fixed WriteBatch header that precedes the records:
// an 8-byte sequence number followed by a 4-byte record count.
static const size_t kHeader = 8 /* sequence */ + 4 /* count */;

J
jorlow@chromium.org 已提交
38 39 40 41 42 43
// Constructs an empty batch by delegating to Clear().
WriteBatch::WriteBatch() { Clear(); }

// Nothing to release explicitly; members clean up after themselves.
WriteBatch::~WriteBatch() {}

44 45
// Out-of-line definition of the Handler destructor; it has no work to do.
WriteBatch::Handler::~Handler() {}

46 47 48 49
// Default Merge callback. Handlers that do not override it cannot process
// merge records, so reaching this implementation throws std::runtime_error.
void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) {
  const char* const what = "Handler::Merge not implemented!";
  throw std::runtime_error(what);
}

J
Jim Paton 已提交
50 51 52 53 54
// Default LogData callback: intentionally a no-op, so blobs are dropped
// unless a handler overrides this method to do something with them.
void WriteBatch::Handler::LogData(const Slice& blob) {
}

55 56 57 58
// Default Continue callback: never asks the iteration to stop.
bool WriteBatch::Handler::Continue() { return true; }

J
jorlow@chromium.org 已提交
59 60
void WriteBatch::Clear() {
  rep_.clear();
61
  rep_.resize(kHeader);
J
jorlow@chromium.org 已提交
62 63
}

H
Haobo Xu 已提交
64 65 66 67
// Returns the record count stored in this batch's header, as read by
// WriteBatchInternal::Count.
int WriteBatch::Count() const {
  const int num_records = WriteBatchInternal::Count(this);
  return num_records;
}

68 69
// Replays the records stored in this batch against `handler`, in the order
// in which they appear in rep_.
//
// The kHeader-byte header is skipped, then each record is decoded from its
// leading tag byte and forwarded to the matching callback (Put, Delete,
// Merge or LogData). handler->Continue() is consulted before every record;
// when it returns false the replay stops at that point.
//
// Returns:
//   Status::OK()         - every record was replayed and the number of
//                          Put/Delete/Merge records equals the header count,
//                          or the handler asked to stop early.
//   Status::Corruption() - rep_ is shorter than the header, a record is
//                          truncated, a tag is unknown, or a complete replay
//                          disagrees with the header count.
Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
  if (input.size() < kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(kHeader);
  Slice key, value, blob;
  int found = 0;  // Put/Delete/Merge records seen; LogData is not counted.
  bool stopped_by_handler = false;
  while (!input.empty()) {
    if (!handler->Continue()) {
      stopped_by_handler = true;
      break;
    }
    char tag = input[0];
    input.remove_prefix(1);
    switch (tag) {
      case kTypeValue:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
          handler->Put(key, value);
          found++;
        } else {
          return Status::Corruption("bad WriteBatch Put");
        }
        break;
      case kTypeDeletion:
        if (GetLengthPrefixedSlice(&input, &key)) {
          handler->Delete(key);
          found++;
        } else {
          return Status::Corruption("bad WriteBatch Delete");
        }
        break;
      case kTypeMerge:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
          handler->Merge(key, value);
          found++;
        } else {
          return Status::Corruption("bad WriteBatch Merge");
        }
        break;
      case kTypeLogData:
        if (GetLengthPrefixedSlice(&input, &blob)) {
          handler->LogData(blob);
        } else {
          return Status::Corruption("bad WriteBatch Blob");
        }
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (stopped_by_handler) {
    // Only part of the batch was replayed on the handler's request, so the
    // header count cannot be validated; a short tally here is not corruption.
    return Status::OK();
  }
  if (found != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  }
  return Status::OK();
}

J
jorlow@chromium.org 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
// Reads the record count from the batch header: a fixed32 stored at byte
// offset 8, right after the 8-byte sequence number.
int WriteBatchInternal::Count(const WriteBatch* b) {
  const char* count_field = b->rep_.data() + 8;
  return DecodeFixed32(count_field);
}

// Overwrites the record count in the batch header (fixed32 at byte offset 8)
// with n.
void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  char* count_field = &b->rep_[8];
  EncodeFixed32(count_field, n);
}

// Reads the sequence number from the batch header: a fixed64 stored at the
// very start of rep_.
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  const char* seq_field = b->rep_.data();
  return static_cast<SequenceNumber>(DecodeFixed64(seq_field));
}

// Overwrites the sequence number in the batch header (fixed64 at byte
// offset 0) with seq.
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  char* seq_field = &b->rep_[0];
  EncodeFixed64(seq_field, seq);
}

// Appends a kTypeValue record (tag, length-prefixed key, length-prefixed
// value) to the batch and bumps the header count by one.
void WriteBatch::Put(const Slice& key, const Slice& value) {
  const int new_count = WriteBatchInternal::Count(this) + 1;
  WriteBatchInternal::SetCount(this, new_count);

  const char tag = static_cast<char>(kTypeValue);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

148 149 150 151 152 153 154
// SliceParts overload of Put: appends a kTypeValue record whose key and
// value are each written via PutLengthPrefixedSliceParts, and bumps the
// header count by one.
void WriteBatch::Put(const SliceParts& key, const SliceParts& value) {
  const int new_count = WriteBatchInternal::Count(this) + 1;
  WriteBatchInternal::SetCount(this, new_count);

  const char tag = static_cast<char>(kTypeValue);
  rep_.push_back(tag);
  PutLengthPrefixedSliceParts(&rep_, key);
  PutLengthPrefixedSliceParts(&rep_, value);
}

J
jorlow@chromium.org 已提交
155 156 157 158 159 160
// Appends a kTypeDeletion record (tag, length-prefixed key) to the batch and
// bumps the header count by one.
void WriteBatch::Delete(const Slice& key) {
  const int new_count = WriteBatchInternal::Count(this) + 1;
  WriteBatchInternal::SetCount(this, new_count);

  const char tag = static_cast<char>(kTypeDeletion);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, key);
}

161 162 163 164 165 166 167
// Appends a kTypeMerge record (tag, length-prefixed key, length-prefixed
// value) to the batch and bumps the header count by one.
void WriteBatch::Merge(const Slice& key, const Slice& value) {
  const int new_count = WriteBatchInternal::Count(this) + 1;
  WriteBatchInternal::SetCount(this, new_count);

  const char tag = static_cast<char>(kTypeMerge);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

J
Jim Paton 已提交
168 169 170 171
// Appends a kTypeLogData record (tag, length-prefixed blob) to the batch.
// Unlike Put/Delete/Merge, this does not change the header count.
void WriteBatch::PutLogData(const Slice& blob) {
  const char tag = static_cast<char>(kTypeLogData);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, blob);
}
172

173 174 175 176 177
namespace {
class MemTableInserter : public WriteBatch::Handler {
 public:
  SequenceNumber sequence_;
  MemTable* mem_;
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
  const Options* options_;
  DBImpl* db_;
  const bool filter_deletes_;

  MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts,
                   DB* db, const bool filter_deletes)
    : sequence_(sequence),
      mem_(mem),
      options_(opts),
      db_(reinterpret_cast<DBImpl*>(db)),
      filter_deletes_(filter_deletes) {
    assert(mem_);
    if (filter_deletes_) {
      assert(options_);
      assert(db_);
    }
  }
195 196

  virtual void Put(const Slice& key, const Slice& value) {
197 198
    if (options_->inplace_update_support
        && mem_->Update(sequence_, kTypeValue, key, value)) {
199
      RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED);
200 201 202
    } else {
      mem_->Add(sequence_, kTypeValue, key, value);
    }
203
    sequence_++;
J
jorlow@chromium.org 已提交
204
  }
205 206 207 208
  virtual void Merge(const Slice& key, const Slice& value) {
    mem_->Add(sequence_, kTypeMerge, key, value);
    sequence_++;
  }
209
  virtual void Delete(const Slice& key) {
210 211 212 213 214 215 216
    if (filter_deletes_) {
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions ropts;
      ropts.snapshot = &read_from_snapshot;
      std::string value;
      if (!db_->KeyMayExist(ropts, key, &value)) {
217
        RecordTick(options_->statistics.get(), NUMBER_FILTERED_DELETES);
218 219
        return;
      }
220
    }
221 222
    mem_->Add(sequence_, kTypeDeletion, key, Slice());
    sequence_++;
J
jorlow@chromium.org 已提交
223
  }
224
};
H
Hans Wennborg 已提交
225
}  // namespace
226

227 228 229 230 231
// Applies every record of batch `b` to `mem` by replaying the batch through
// a MemTableInserter that starts at the batch's stored sequence number.
// Returns whatever status WriteBatch::Iterate reports for the replay.
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem,
                                      const Options* opts, DB* db,
                                      const bool filter_deletes) {
  const SequenceNumber first_sequence = WriteBatchInternal::Sequence(b);
  MemTableInserter inserter(first_sequence, mem, opts, db, filter_deletes);
  return b->Iterate(&inserter);
}

// Replaces the batch's serialized representation with a copy of `contents`.
// `contents` must be at least kHeader bytes long (checked with assert).
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  const size_t num_bytes = contents.size();
  assert(num_bytes >= kHeader);
  b->rep_.assign(contents.data(), num_bytes);
}

240 241 242 243 244 245
// Appends all records of `src` to `dst`: dst's header count grows by src's
// count and src's record bytes (everything after its header) are copied to
// the end of dst's representation. dst's sequence number is left untouched.
//
// `src` must contain at least a full kHeader-byte header.
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  // Check the precondition before Count(src) reads the header bytes, so a
  // malformed source trips the assert instead of being read out of bounds.
  assert(src->rep_.size() >= kHeader);
  SetCount(dst, Count(dst) + Count(src));
  dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}

246
}  // namespace rocksdb