//  Copyright (c) 2015, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.

#ifndef ROCKSDB_LITE

#include "utilities/transactions/optimistic_transaction_impl.h"

#include <algorithm>
#include <string>
#include <vector>

#include "db/column_family.h"
#include "db/db_impl.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "util/string_util.h"
#include "utilities/transactions/transaction_util.h"

namespace rocksdb {

struct WriteOptions;

// Constructs an optimistic transaction bound to `txn_db`.
//
// Reads and writes go through `txn_db`'s base DB. Writes are buffered in a
// WriteBatchWithIndex built from `txn_options.cmp` and only reach the DB when
// Commit() runs. When `txn_options.set_snapshot` is true, a snapshot is taken
// immediately, so keys recorded for conflict checking use that snapshot's
// sequence number.
OptimisticTransactionImpl::OptimisticTransactionImpl(
    OptimisticTransactionDB* txn_db, const WriteOptions& write_options,
    const OptimisticTransactionOptions& txn_options)
    : txn_db_(txn_db),
      db_(txn_db->GetBaseDB()),
      write_options_(write_options),
      cmp_(txn_options.cmp),
      // NOTE(review): the (cmp, 0, true) arguments are presumably
      // (comparator, reserved_bytes, overwrite_key) -- confirm against
      // write_batch_with_index.h.
      write_batch_(new WriteBatchWithIndex(txn_options.cmp, 0, true)) {
  const bool snapshot_requested = txn_options.set_snapshot;
  if (!snapshot_requested) {
    return;
  }
  SetSnapshot();
}

// No explicit teardown is performed; members are destroyed by their own
// destructors.
OptimisticTransactionImpl::~OptimisticTransactionImpl() = default;

void OptimisticTransactionImpl::Cleanup() {
A
agiardullo 已提交
44
  tracked_keys_.clear();
45 46
  save_points_.reset(nullptr);
  write_batch_->Clear();
A
agiardullo 已提交
47 48 49
}

void OptimisticTransactionImpl::SetSnapshot() {
50
  snapshot_.reset(new ManagedSnapshot(db_));
A
agiardullo 已提交
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
}

// Attempts to commit the buffered writes.
//
// The batch is handed to DBImpl::WriteWithCallback() together with an
// OptimisticTransactionCallback, which calls CheckTransactionForConflicts() to
// decide whether the commit is safe. On success, the transaction state is
// cleared via Cleanup(). On failure, the state is kept and the error returned.
// Returns InvalidArgument if the root DB is not a DBImpl.
Status OptimisticTransactionImpl::Commit() {
  OptimisticTransactionCallback conflict_check(this);

  auto* root_impl = dynamic_cast<DBImpl*>(db_->GetRootDB());
  if (!root_impl) {
    // Only reachable if transactions are created from a StackableDB whose
    // GetRootDB() has been overridden.
    return Status::InvalidArgument(
        "DB::GetRootDB() returned an unexpected DB class");
  }

  const Status write_status = root_impl->WriteWithCallback(
      write_options_, write_batch_->GetWriteBatch(), &conflict_check);
  if (!write_status.ok()) {
    return write_status;
  }

  Cleanup();
  return write_status;
}

void OptimisticTransactionImpl::Rollback() {
77
  Cleanup();
A
agiardullo 已提交
78 79 80
}

void OptimisticTransactionImpl::SetSavePoint() {
81 82
  if (save_points_ == nullptr) {
    save_points_.reset(new std::stack<std::shared_ptr<ManagedSnapshot>>());
A
agiardullo 已提交
83
  }
84 85
  save_points_->push(snapshot_);
  write_batch_->SetSavePoint();
A
agiardullo 已提交
86 87
}

88
Status OptimisticTransactionImpl::RollbackToSavePoint() {
A
agiardullo 已提交
89
  if (save_points_ != nullptr && save_points_->size() > 0) {
90 91
    // Restore saved snapshot
    snapshot_ = save_points_->top();
A
agiardullo 已提交
92 93
    save_points_->pop();

94 95 96
    // Rollback batch
    Status s = write_batch_->RollbackToSavePoint();
    assert(s.ok());
A
agiardullo 已提交
97

98
    return s;
A
agiardullo 已提交
99
  } else {
100 101
    assert(write_batch_->RollbackToSavePoint().IsNotFound());
    return Status::NotFound();
A
agiardullo 已提交
102
  }
A
agiardullo 已提交
103 104 105 106 107 108 109 110 111
}

// Records `key` in `column_family` so that it is checked for conflicts at
// commit time.
//
// The sequence number stored with the key is the snapshot's sequence number if
// a snapshot is set, otherwise db_'s latest sequence number at call time. If
// the key is already tracked, only the smaller (earlier) of the stored and the
// new sequence number is kept.
void OptimisticTransactionImpl::RecordOperation(
    ColumnFamilyHandle* column_family, const Slice& key) {
  uint32_t cfh_id = GetColumnFamilyID(column_family);

  SequenceNumber seq;
  if (snapshot_) {
    seq = snapshot_->snapshot()->GetSequenceNumber();
  } else {
    seq = db_->GetLatestSequenceNumber();
  }

  // Resolve the per-column-family map once rather than re-indexing
  // tracked_keys_ on every access.
  auto& cf_keys = tracked_keys_[cfh_id];

  // insert() leaves an existing entry untouched and returns an iterator to it,
  // so a single lookup covers both the "new key" and the "seen before" cases.
  auto inserted = cf_keys.insert({key.ToString(), seq});
  if (!inserted.second && seq < inserted.first->second) {
    // Snapshot has changed since we last saw this key; keep the earliest
    // seen sequence number.
    inserted.first->second = seq;
  }
}

// SliceParts overload of RecordOperation(). It joins the key's parts into one
// contiguous string and tracks that string exactly like a single-Slice key.
void OptimisticTransactionImpl::RecordOperation(
    ColumnFamilyHandle* column_family, const SliceParts& key) {
  size_t joined_size = 0;
  for (int part_idx = 0; part_idx < key.num_parts; ++part_idx) {
    const Slice& part = key.parts[part_idx];
    joined_size += part.size();
  }

  std::string joined;
  joined.reserve(joined_size);  // Avoid reallocations while appending.
  for (int part_idx = 0; part_idx < key.num_parts; ++part_idx) {
    const Slice& part = key.parts[part_idx];
    joined.append(part.data(), part.size());
  }

  RecordOperation(column_family, Slice(joined));
}

// Reads `key` from `column_family` via the write batch's GetFromBatchAndDB()
// over db_, so the lookup considers both this transaction's batch and the
// underlying DB. The key is NOT tracked for conflict checking.
Status OptimisticTransactionImpl::Get(const ReadOptions& read_options,
                                      ColumnFamilyHandle* column_family,
                                      const Slice& key, std::string* value) {
  WriteBatchWithIndex* batch = write_batch_.get();
  return batch->GetFromBatchAndDB(db_, read_options, column_family, key,
                                  value);
}

// Tracks `key` for conflict checking and, if `value` is non-null, reads it
// through Get(). With a null `value`, only the tracking happens and OK is
// returned.
Status OptimisticTransactionImpl::GetForUpdate(
    const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const Slice& key, std::string* value) {
  // The key is tracked whether or not the read below succeeds.
  RecordOperation(column_family, key);

  return value != nullptr ? Get(read_options, column_family, key, value)
                          : Status::OK();
}

// Reads each keys[i] from column_family[i] through Get(). Keys are NOT
// tracked for conflict checking; use MultiGetForUpdate() for that.
//
// `column_family` must have the same length as `keys`. On return `*values`
// holds keys.size() entries, with (*values)[i] filled for keys[i]. The
// returned vector holds the per-key Status. If `values` is nullptr, no reads
// are done and every key reports InvalidArgument (the previous code
// dereferenced it unconditionally).
std::vector<Status> OptimisticTransactionImpl::MultiGet(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  assert(column_family.size() == keys.size());
  size_t num_keys = keys.size();

  if (values == nullptr) {
    return std::vector<Status>(
        num_keys, Status::InvalidArgument("values must not be null"));
  }
  values->resize(num_keys);

  // TODO(agiardullo): optimize multiget?
  std::vector<Status> stat_list(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    stat_list[i] = Get(read_options, column_family[i], keys[i], &(*values)[i]);
  }

  return stat_list;
}

// Tracks every keys[i] (in column_family[i]) for conflict checking and, when
// `values` is non-null, reads each one through Get().
//
// `column_family` must have the same length as `keys`. If `values` is
// non-null, it is resized to keys.size() and (*values)[i] receives the value
// for keys[i]. If `values` is nullptr, the keys are only tracked and each
// status is OK, mirroring GetForUpdate() with a null value. (Previously a null
// `values` was dereferenced.)
std::vector<Status> OptimisticTransactionImpl::MultiGetForUpdate(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  assert(column_family.size() == keys.size());
  size_t num_keys = keys.size();
  if (values != nullptr) {
    values->resize(num_keys);
  }

  // TODO(agiardullo): optimize multiget?
  std::vector<Status> stat_list(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    // Regardless of whether the Get succeeds, track this key.
    RecordOperation(column_family[i], keys[i]);

    if (values == nullptr) {
      stat_list[i] = Status::OK();
    } else {
      stat_list[i] =
          Get(read_options, column_family[i], keys[i], &(*values)[i]);
    }
  }

  return stat_list;
}

A
agiardullo 已提交
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
Iterator* OptimisticTransactionImpl::GetIterator(
    const ReadOptions& read_options) {
  Iterator* db_iter = db_->NewIterator(read_options);
  assert(db_iter);

  return write_batch_->NewIteratorWithBase(db_iter);
}

// Column-family variant of GetIterator(). It wraps a DB iterator over
// `column_family` with the write batch's NewIteratorWithBase() for the same
// column family.
Iterator* OptimisticTransactionImpl::GetIterator(
    const ReadOptions& read_options, ColumnFamilyHandle* column_family) {
  Iterator* base_iterator = db_->NewIterator(read_options, column_family);
  assert(base_iterator != nullptr);
  return write_batch_->NewIteratorWithBase(column_family, base_iterator);
}

// Tracked Put: records `key` for commit-time conflict checking, then buffers
// the write in the batch. Always returns OK.
Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family,
                                      const Slice& key, const Slice& value) {
  RecordOperation(column_family, key);
  WriteBatchWithIndex* batch = write_batch_.get();
  batch->Put(column_family, key, value);
  return Status::OK();
}

A
agiardullo 已提交
234 235 236
Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family,
                                      const SliceParts& key,
                                      const SliceParts& value) {
A
agiardullo 已提交
237 238
  RecordOperation(column_family, key);

A
agiardullo 已提交
239 240 241
  write_batch_->Put(column_family, key, value);

  return Status::OK();
A
agiardullo 已提交
242 243
}

A
agiardullo 已提交
244 245
Status OptimisticTransactionImpl::Merge(ColumnFamilyHandle* column_family,
                                        const Slice& key, const Slice& value) {
A
agiardullo 已提交
246 247
  RecordOperation(column_family, key);

A
agiardullo 已提交
248 249 250
  write_batch_->Merge(column_family, key, value);

  return Status::OK();
A
agiardullo 已提交
251 252
}

A
agiardullo 已提交
253 254
Status OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family,
                                         const Slice& key) {
A
agiardullo 已提交
255 256
  RecordOperation(column_family, key);

A
agiardullo 已提交
257 258 259
  write_batch_->Delete(column_family, key);

  return Status::OK();
A
agiardullo 已提交
260 261
}

A
agiardullo 已提交
262 263
Status OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family,
                                         const SliceParts& key) {
A
agiardullo 已提交
264 265
  RecordOperation(column_family, key);

A
agiardullo 已提交
266 267 268
  write_batch_->Delete(column_family, key);

  return Status::OK();
A
agiardullo 已提交
269 270
}

A
agiardullo 已提交
271 272 273 274 275
Status OptimisticTransactionImpl::PutUntracked(
    ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) {
  write_batch_->Put(column_family, key, value);

  return Status::OK();
A
agiardullo 已提交
276 277
}

A
agiardullo 已提交
278 279 280 281 282 283
Status OptimisticTransactionImpl::PutUntracked(
    ColumnFamilyHandle* column_family, const SliceParts& key,
    const SliceParts& value) {
  write_batch_->Put(column_family, key, value);

  return Status::OK();
A
agiardullo 已提交
284 285
}

A
agiardullo 已提交
286
Status OptimisticTransactionImpl::MergeUntracked(
A
agiardullo 已提交
287
    ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) {
A
agiardullo 已提交
288 289 290
  write_batch_->Merge(column_family, key, value);

  return Status::OK();
A
agiardullo 已提交
291 292
}

A
agiardullo 已提交
293
Status OptimisticTransactionImpl::DeleteUntracked(
A
agiardullo 已提交
294
    ColumnFamilyHandle* column_family, const Slice& key) {
A
agiardullo 已提交
295 296 297
  write_batch_->Delete(column_family, key);

  return Status::OK();
A
agiardullo 已提交
298 299
}

A
agiardullo 已提交
300
Status OptimisticTransactionImpl::DeleteUntracked(
A
agiardullo 已提交
301
    ColumnFamilyHandle* column_family, const SliceParts& key) {
A
agiardullo 已提交
302 303 304
  write_batch_->Delete(column_family, key);

  return Status::OK();
A
agiardullo 已提交
305 306 307
}

void OptimisticTransactionImpl::PutLogData(const Slice& blob) {
A
agiardullo 已提交
308
  write_batch_->PutLogData(blob);
A
agiardullo 已提交
309 310 311
}

// Exposes the transaction's buffered write batch. The returned pointer is
// non-owning; the batch stays owned by this transaction.
WriteBatchWithIndex* OptimisticTransactionImpl::GetWriteBatch() {
  WriteBatchWithIndex* const batch = write_batch_.get();
  return batch;
}

// Returns OK if it is safe to commit this transaction.  Returns Status::Busy
// if there are read or write conflicts that would prevent us from committing OR
// if we can not determine whether there would be any such conflicts.
//
A
agiardullo 已提交
319 320 321
// Should only be called on writer thread in order to avoid any race conditions
// in detecting
// write conflicts.
A
agiardullo 已提交
322 323 324 325 326 327
Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) {
  Status result;

  assert(dynamic_cast<DBImpl*>(db) != nullptr);
  auto db_impl = reinterpret_cast<DBImpl*>(db);

A
agiardullo 已提交
328
  return TransactionUtil::CheckKeysForConflicts(db_impl, &tracked_keys_);
A
agiardullo 已提交
329 330 331 332 333
}

}  // namespace rocksdb

#endif  // ROCKSDB_LITE