write_batch.cc 7.2 KB
Newer Older
1 2 3 4 5
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10 11 12 13 14
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
15 16
//    kTypeValue varstring varstring
//    kTypeMerge varstring varstring
J
jorlow@chromium.org 已提交
17 18 19 20 21
//    kTypeDeletion varstring
//    kTypeLogData varstring
// varstring :=
//    len: varint32
//    data: uint8[len]

22
#include "rocksdb/write_batch.h"
J
jorlow@chromium.org 已提交
23

24 25
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
J
jorlow@chromium.org 已提交
26
#include "db/dbformat.h"
27
#include "db/db_impl.h"
J
jorlow@chromium.org 已提交
28
#include "db/memtable.h"
29
#include "db/snapshot.h"
J
jorlow@chromium.org 已提交
30 31
#include "db/write_batch_internal.h"
#include "util/coding.h"
32
#include <stdexcept>
J
jorlow@chromium.org 已提交
33

34
namespace rocksdb {
J
jorlow@chromium.org 已提交
35

36 37 38
// Size in bytes of the fixed WriteBatch header: an 8-byte sequence number
// immediately followed by a 4-byte record count.
static const size_t kHeader = 8 + 4;

J
jorlow@chromium.org 已提交
39 40 41 42 43 44
// Constructs a batch and immediately resets it through Clear().
WriteBatch::WriteBatch() { this->Clear(); }

// The destructor body is intentionally empty.
WriteBatch::~WriteBatch() {
}

45 46
// Out-of-line definition of the Handler destructor; the body is intentionally
// empty.
WriteBatch::Handler::~Handler() {
}

47 48 49 50
// Default Merge callback: unconditionally throws std::runtime_error.
// Neither `key` nor `value` is inspected.
void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) {
  const char* const message = "Handler::Merge not implemented!";
  throw std::runtime_error(message);
}

J
Jim Paton 已提交
51 52 53 54 55
// Default LogData callback: a deliberate no-op, so a handler that does not
// provide its own version simply ignores blobs.
void WriteBatch::Handler::LogData(const Slice& blob) {
  static_cast<void>(blob);  // intentionally unused
}

56 57 58 59
// Default implementation: always returns true. WriteBatch::Iterate() consults
// this before decoding each record and stops once it returns false.
bool WriteBatch::Handler::Continue() { return true; }

J
jorlow@chromium.org 已提交
60 61
void WriteBatch::Clear() {
  rep_.clear();
62
  rep_.resize(kHeader);
J
jorlow@chromium.org 已提交
63 64
}

H
Haobo Xu 已提交
65 66 67 68
// Returns the record count of this batch as reported by
// WriteBatchInternal::Count().
int WriteBatch::Count() const {
  const int num_records = WriteBatchInternal::Count(this);
  return num_records;
}

69 70
// Decodes the records stored after the header, in storage order, and
// dispatches each one to `handler`.
//
// Before every record is decoded, handler->Continue() is consulted; once it
// returns false, iteration stops and the remaining records are not visited.
//
// Returns:
//   - Status::Corruption if rep_ is shorter than the header, a record cannot
//     be decoded, a tag is unknown, or the whole batch was walked and the
//     number of Put/Delete/Merge records differs from the count stored in
//     the header.
//   - Status::OK otherwise, including when the handler asked to stop early.
//     The header count is only checked after a complete walk, because a
//     partial tally cannot be compared against it meaningfully.
Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
  if (input.size() < kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(kHeader);
  Slice key, value, blob;
  // Number of Put/Delete/Merge records seen; LogData records are not counted.
  int found = 0;
  // Set when the handler declined to continue, i.e. some records may not
  // have been visited.
  bool stopped_by_handler = false;
  while (!input.empty()) {
    if (!handler->Continue()) {
      stopped_by_handler = true;
      break;
    }
    const char tag = input[0];
    input.remove_prefix(1);
    switch (tag) {
      case kTypeValue:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
          handler->Put(key, value);
          found++;
        } else {
          return Status::Corruption("bad WriteBatch Put");
        }
        break;
      case kTypeDeletion:
        if (GetLengthPrefixedSlice(&input, &key)) {
          handler->Delete(key);
          found++;
        } else {
          return Status::Corruption("bad WriteBatch Delete");
        }
        break;
      case kTypeMerge:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
          handler->Merge(key, value);
          found++;
        } else {
          return Status::Corruption("bad WriteBatch Merge");
        }
        break;
      case kTypeLogData:
        if (GetLengthPrefixedSlice(&input, &blob)) {
          handler->LogData(blob);
        } else {
          return Status::Corruption("bad WriteBatch Blob");
        }
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  // Only a complete walk can be validated against the header count.
  if (!stopped_by_handler && found != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  }
  return Status::OK();
}

J
jorlow@chromium.org 已提交
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
// Reads the 32-bit record count stored at byte offset 8 of the batch, i.e.
// right after the 8-byte sequence number.
int WriteBatchInternal::Count(const WriteBatch* b) {
  const char* const count_field = b->rep_.data() + 8;
  return DecodeFixed32(count_field);
}

// Overwrites the 32-bit record count stored at byte offset 8 of the batch
// with `n`.
void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  char* const count_field = &b->rep_[8];
  EncodeFixed32(count_field, n);
}

// Reads the 64-bit sequence number stored in the first 8 bytes of the batch.
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  const char* const sequence_field = b->rep_.data();
  return SequenceNumber(DecodeFixed64(sequence_field));
}

// Overwrites the 64-bit sequence number stored in the first 8 bytes of the
// batch with `seq`.
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  char* const sequence_field = &b->rep_[0];
  EncodeFixed64(sequence_field, seq);
}

// Appends a kTypeValue record (tag byte, length-prefixed key, length-prefixed
// value) to the batch and increments the header count by one.
void WriteBatch::Put(const Slice& key, const Slice& value) {
  const int count_before = WriteBatchInternal::Count(this);
  WriteBatchInternal::SetCount(this, count_before + 1);

  const char tag = static_cast<char>(kTypeValue);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

149 150 151 152 153 154 155
// SliceParts overload of Put: appends a kTypeValue record whose key and value
// are each written via PutLengthPrefixedSliceParts, and increments the header
// count by one.
void WriteBatch::Put(const SliceParts& key, const SliceParts& value) {
  const int count_before = WriteBatchInternal::Count(this);
  WriteBatchInternal::SetCount(this, count_before + 1);

  const char tag = static_cast<char>(kTypeValue);
  rep_.push_back(tag);
  PutLengthPrefixedSliceParts(&rep_, key);
  PutLengthPrefixedSliceParts(&rep_, value);
}

J
jorlow@chromium.org 已提交
156 157 158 159 160 161
// Appends a kTypeDeletion record (tag byte, length-prefixed key) to the batch
// and increments the header count by one.
void WriteBatch::Delete(const Slice& key) {
  const int count_before = WriteBatchInternal::Count(this);
  WriteBatchInternal::SetCount(this, count_before + 1);

  const char tag = static_cast<char>(kTypeDeletion);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, key);
}

162 163 164 165 166 167 168
// Appends a kTypeMerge record (tag byte, length-prefixed key, length-prefixed
// value) to the batch and increments the header count by one.
void WriteBatch::Merge(const Slice& key, const Slice& value) {
  const int count_before = WriteBatchInternal::Count(this);
  WriteBatchInternal::SetCount(this, count_before + 1);

  const char tag = static_cast<char>(kTypeMerge);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

J
Jim Paton 已提交
169 170 171 172
// Appends a kTypeLogData record (tag byte, length-prefixed blob) to the batch.
// Unlike Put/Delete/Merge, the header count is left untouched.
void WriteBatch::PutLogData(const Slice& blob) {
  const char tag = static_cast<char>(kTypeLogData);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, blob);
}
173

174 175 176 177 178
namespace {
class MemTableInserter : public WriteBatch::Handler {
 public:
  SequenceNumber sequence_;
  MemTable* mem_;
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
  const Options* options_;
  DBImpl* db_;
  const bool filter_deletes_;

  MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts,
                   DB* db, const bool filter_deletes)
    : sequence_(sequence),
      mem_(mem),
      options_(opts),
      db_(reinterpret_cast<DBImpl*>(db)),
      filter_deletes_(filter_deletes) {
    assert(mem_);
    if (filter_deletes_) {
      assert(options_);
      assert(db_);
    }
  }
196 197

  virtual void Put(const Slice& key, const Slice& value) {
198 199 200 201 202 203
    if (options_->inplace_update_support
        && mem_->Update(sequence_, kTypeValue, key, value)) {
      RecordTick(options_->statistics, NUMBER_KEYS_UPDATED);
    } else {
      mem_->Add(sequence_, kTypeValue, key, value);
    }
204
    sequence_++;
J
jorlow@chromium.org 已提交
205
  }
206 207 208 209
  virtual void Merge(const Slice& key, const Slice& value) {
    mem_->Add(sequence_, kTypeMerge, key, value);
    sequence_++;
  }
210
  virtual void Delete(const Slice& key) {
211 212 213 214 215 216 217 218 219 220
    if (filter_deletes_) {
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions ropts;
      ropts.snapshot = &read_from_snapshot;
      std::string value;
      if (!db_->KeyMayExist(ropts, key, &value)) {
        RecordTick(options_->statistics, NUMBER_FILTERED_DELETES);
        return;
      }
221
    }
222 223
    mem_->Add(sequence_, kTypeDeletion, key, Slice());
    sequence_++;
J
jorlow@chromium.org 已提交
224
  }
225
};
H
Hans Wennborg 已提交
226
}  // namespace
227

228 229 230 231 232
// Applies every record of `b` to `mem` through a MemTableInserter that starts
// at the batch's stored sequence number, and returns the Status produced by
// WriteBatch::Iterate(). `opts`, `db` and `filter_deletes` are forwarded to
// the inserter unchanged.
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem,
                                      const Options* opts, DB* db,
                                      const bool filter_deletes) {
  const SequenceNumber first_sequence = WriteBatchInternal::Sequence(b);
  MemTableInserter inserter(first_sequence, mem, opts, db, filter_deletes);
  return b->Iterate(&inserter);
}

// Replaces the entire representation of `b` with a copy of `contents`.
// `contents` must be at least kHeader bytes long (checked by assert only).
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  const size_t num_bytes = contents.size();
  assert(num_bytes >= kHeader);
  b->rep_.assign(contents.data(), num_bytes);
}

241 242 243 244 245 246
// Appends all records of `src` to `dst`: the header count of `dst` grows by
// the count of `src`, and the bytes of `src` that follow its header are
// copied to the end of `dst`. The sequence number of `dst` is not modified.
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  // Validate src before reading from it: Count(src) decodes bytes 8..11 of
  // its header, so the size check has to come first.
  assert(src->rep_.size() >= kHeader);
  SetCount(dst, Count(dst) + Count(src));
  dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}

247
}  // namespace rocksdb