write_batch.cc 8.0 KB
Newer Older
1 2 3 4 5
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10 11 12 13 14
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
15 16
//    kTypeValue varstring varstring
//    kTypeMerge varstring varstring
J
jorlow@chromium.org 已提交
17 18 19 20 21
//    kTypeDeletion varstring
// varstring :=
//    len: varint32
//    data: uint8[len]

22 23
#include "rocksdb/write_batch.h"
#include "rocksdb/options.h"
J
jorlow@chromium.org 已提交
24
#include "db/dbformat.h"
25
#include "db/db_impl.h"
J
jorlow@chromium.org 已提交
26
#include "db/memtable.h"
27
#include "db/snapshot.h"
J
jorlow@chromium.org 已提交
28 29
#include "db/write_batch_internal.h"
#include "util/coding.h"
30
#include "util/statistics_imp.h"
31
#include <stdexcept>
J
jorlow@chromium.org 已提交
32

33
namespace rocksdb {
J
jorlow@chromium.org 已提交
34

35 36 37
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12;

J
jorlow@chromium.org 已提交
38 39 40 41 42 43
WriteBatch::WriteBatch() {
  Clear();
}

WriteBatch::~WriteBatch() { }

44 45
WriteBatch::Handler::~Handler() { }

46 47 48 49 50
void WriteBatch::Handler::Put(const Slice& key, const Slice& value) {
  // you need to either implement Put or PutCF
  throw std::runtime_error("Handler::Put not implemented!");
}

51 52 53 54
void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) {
  throw std::runtime_error("Handler::Merge not implemented!");
}

55 56 57 58 59
void WriteBatch::Handler::Delete(const Slice& key) {
  // you need to either implement Delete or DeleteCF
  throw std::runtime_error("Handler::Delete not implemented!");
}

J
Jim Paton 已提交
60 61 62 63 64
void WriteBatch::Handler::LogData(const Slice& blob) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

65 66 67 68
bool WriteBatch::Handler::Continue() {
  return true;
}

J
jorlow@chromium.org 已提交
69 70
void WriteBatch::Clear() {
  rep_.clear();
71
  rep_.resize(kHeader);
J
jorlow@chromium.org 已提交
72 73
}

H
Haobo Xu 已提交
74 75 76 77
int WriteBatch::Count() const {
  return WriteBatchInternal::Count(this);
}

78 79
Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
80
  if (input.size() < kHeader) {
81 82 83
    return Status::Corruption("malformed WriteBatch (too small)");
  }

84
  input.remove_prefix(kHeader);
J
Jim Paton 已提交
85
  Slice key, value, blob;
86
  int found = 0;
87
  while (!input.empty() && handler->Continue()) {
88 89 90 91 92 93
    char tag = input[0];
    input.remove_prefix(1);
    switch (tag) {
      case kTypeValue:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
94
          handler->PutCF(default_column_family, key, value);
J
Jim Paton 已提交
95
          found++;
96 97 98 99 100 101
        } else {
          return Status::Corruption("bad WriteBatch Put");
        }
        break;
      case kTypeDeletion:
        if (GetLengthPrefixedSlice(&input, &key)) {
102
          handler->DeleteCF(default_column_family, key);
J
Jim Paton 已提交
103
          found++;
104 105 106 107
        } else {
          return Status::Corruption("bad WriteBatch Delete");
        }
        break;
108 109 110
      case kTypeMerge:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
111
          handler->MergeCF(default_column_family, key, value);
J
Jim Paton 已提交
112
          found++;
113 114 115 116
        } else {
          return Status::Corruption("bad WriteBatch Merge");
        }
        break;
J
Jim Paton 已提交
117 118 119 120 121 122 123
      case kTypeLogData:
        if (GetLengthPrefixedSlice(&input, &blob)) {
          handler->LogData(blob);
        } else {
          return Status::Corruption("bad WriteBatch Blob");
        }
        break;
124 125 126 127 128 129 130 131 132 133 134
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (found != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

J
jorlow@chromium.org 已提交
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
int WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  EncodeFixed32(&b->rep_[8], n);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

151 152
void WriteBatch::Put(const ColumnFamilyHandle& column_family, const Slice& key,
                     const Slice& value) {
J
jorlow@chromium.org 已提交
153 154 155 156 157 158
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

159 160
void WriteBatch::Put(const ColumnFamilyHandle& column_family,
                     const SliceParts& key, const SliceParts& value) {
161 162 163 164 165 166
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixedSliceParts(&rep_, key);
  PutLengthPrefixedSliceParts(&rep_, value);
}

167 168
void WriteBatch::Delete(const ColumnFamilyHandle& column_family,
                        const Slice& key) {
J
jorlow@chromium.org 已提交
169 170 171 172 173
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeDeletion));
  PutLengthPrefixedSlice(&rep_, key);
}

174 175
void WriteBatch::Merge(const ColumnFamilyHandle& column_family,
                       const Slice& key, const Slice& value) {
176 177 178 179 180 181
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeMerge));
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

J
Jim Paton 已提交
182 183 184 185
void WriteBatch::PutLogData(const Slice& blob) {
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
}
186

187 188 189 190 191
namespace {
class MemTableInserter : public WriteBatch::Handler {
 public:
  SequenceNumber sequence_;
  MemTable* mem_;
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
  const Options* options_;
  DBImpl* db_;
  const bool filter_deletes_;

  MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts,
                   DB* db, const bool filter_deletes)
    : sequence_(sequence),
      mem_(mem),
      options_(opts),
      db_(reinterpret_cast<DBImpl*>(db)),
      filter_deletes_(filter_deletes) {
    assert(mem_);
    if (filter_deletes_) {
      assert(options_);
      assert(db_);
    }
  }
209

210 211
  virtual void PutCF(const ColumnFamilyHandle& column_family, const Slice& key,
                     const Slice& value) {
212 213
    if (options_->inplace_update_support
        && mem_->Update(sequence_, kTypeValue, key, value)) {
214
      RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED);
215 216 217
    } else {
      mem_->Add(sequence_, kTypeValue, key, value);
    }
218
    sequence_++;
J
jorlow@chromium.org 已提交
219
  }
220 221
  virtual void MergeCF(const ColumnFamilyHandle& column_family,
                       const Slice& key, const Slice& value) {
222 223 224
    mem_->Add(sequence_, kTypeMerge, key, value);
    sequence_++;
  }
225 226
  virtual void DeleteCF(const ColumnFamilyHandle& column_family,
                        const Slice& key) {
227 228 229 230 231 232 233
    if (filter_deletes_) {
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions ropts;
      ropts.snapshot = &read_from_snapshot;
      std::string value;
      if (!db_->KeyMayExist(ropts, key, &value)) {
234
        RecordTick(options_->statistics.get(), NUMBER_FILTERED_DELETES);
235 236
        return;
      }
237
    }
238 239
    mem_->Add(sequence_, kTypeDeletion, key, Slice());
    sequence_++;
J
jorlow@chromium.org 已提交
240
  }
241
};
H
Hans Wennborg 已提交
242
}  // namespace
243

244 245 246 247 248
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem,
                                      const Options* opts, DB* db,
                                      const bool filter_deletes) {
  MemTableInserter inserter(WriteBatchInternal::Sequence(b), mem, opts, db,
                            filter_deletes);
249
  return b->Iterate(&inserter);
J
jorlow@chromium.org 已提交
250 251 252
}

void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
253
  assert(contents.size() >= kHeader);
J
jorlow@chromium.org 已提交
254 255 256
  b->rep_.assign(contents.data(), contents.size());
}

257 258 259 260 261 262
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  SetCount(dst, Count(dst) + Count(src));
  assert(src->rep_.size() >= kHeader);
  dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}

263
}  // namespace rocksdb