pessimistic_transaction.cc 20.2 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
S
Siying Dong 已提交
2 3 4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
A
agiardullo 已提交
5 6 7

#ifndef ROCKSDB_LITE

8
#include "utilities/transactions/pessimistic_transaction.h"
A
agiardullo 已提交
9 10 11 12 13 14 15 16 17 18

#include <map>
#include <set>
#include <string>
#include <vector>

#include "db/column_family.h"
#include "db/db_impl.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
19
#include "rocksdb/snapshot.h"
A
agiardullo 已提交
20 21
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
S
Siying Dong 已提交
22
#include "util/cast_util.h"
A
agiardullo 已提交
23
#include "util/string_util.h"
24
#include "util/sync_point.h"
M
Maysam Yabandeh 已提交
25
#include "utilities/transactions/pessimistic_transaction_db.h"
A
agiardullo 已提交
26 27 28 29 30 31
#include "utilities/transactions/transaction_util.h"

namespace rocksdb {

// Forward declaration; the transaction constructors below take WriteOptions
// by const reference.
struct WriteOptions;

32
// Counter shared by all PessimisticTransaction instances from which
// GenTxnID() draws transaction IDs. The first ID handed out is 1.
std::atomic<TransactionID> PessimisticTransaction::txn_id_counter_(1);
A
agiardullo 已提交
33

34
// Returns the next unused transaction ID: the counter's current value is
// handed out and the counter is atomically advanced by one.
TransactionID PessimisticTransaction::GenTxnID() {
  const TransactionID next_id = txn_id_counter_++;
  return next_id;
}

38 39 40
// Constructs a transaction bound to txn_db.
//
// txn_db:        owning transaction DB; its root DB is passed to the
//                TransactionBaseImpl base class.
// write_options: forwarded to the base class.
// txn_options:   per-transaction settings, applied by Initialize().
PessimisticTransaction::PessimisticTransaction(
    TransactionDB* txn_db, const WriteOptions& write_options,
    const TransactionOptions& txn_options)
    : TransactionBaseImpl(txn_db->GetRootDB(), write_options),
      txn_db_impl_(
          static_cast_with_check<PessimisticTransactionDB, TransactionDB>(
              txn_db)),
      expiration_time_(0),
      txn_id_(0),
      waiting_cf_id_(0),
      waiting_key_(nullptr),
      lock_timeout_(0),
      deadlock_detect_(false),
      deadlock_detect_depth_(0) {
  // db_ is not assigned in this constructor; presumably the base-class
  // constructor has already set it from the root DB.
  db_impl_ = static_cast_with_check<DBImpl, DB>(db_);
  Initialize(txn_options);
}

56
void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
57 58
  txn_id_ = GenTxnID();

R
Reid Horuff 已提交
59
  txn_state_ = STARTED;
60

M
Manuel Ung 已提交
61 62
  deadlock_detect_ = txn_options.deadlock_detect;
  deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
63
  write_batch_.SetMaxBytes(txn_options.max_write_batch_size);
M
Manuel Ung 已提交
64

65
  lock_timeout_ = txn_options.lock_timeout * 1000;
A
agiardullo 已提交
66 67
  if (lock_timeout_ < 0) {
    // Lock timeout not set, use default
A
agiardullo 已提交
68 69
    lock_timeout_ =
        txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;
A
agiardullo 已提交
70 71
  }

72 73 74 75 76 77
  if (txn_options.expiration >= 0) {
    expiration_time_ = start_time_ + txn_options.expiration * 1000;
  } else {
    expiration_time_ = 0;
  }

A
agiardullo 已提交
78 79 80
  if (txn_options.set_snapshot) {
    SetSnapshot();
  }
81

82 83 84
  if (expiration_time_ > 0) {
    txn_db_impl_->InsertExpirableTransaction(txn_id_, this);
  }
85 86
  use_only_the_last_commit_time_batch_for_recovery_ =
      txn_options.use_only_the_last_commit_time_batch_for_recovery;
A
agiardullo 已提交
87 88
}

89
// Releases everything the transaction still holds in the transaction DB:
// its tracked key locks, its entry in the expirable-transaction table (if it
// had an expiration time), and its name registration (if it was named and
// did not reach COMMITED).
PessimisticTransaction::~PessimisticTransaction() {
  auto& held_keys = GetTrackedKeys();
  txn_db_impl_->UnLock(this, &held_keys);

  const bool can_expire = expiration_time_ > 0;
  if (can_expire) {
    txn_db_impl_->RemoveExpirableTransaction(txn_id_);
  }

  if (!(name_.empty() || txn_state_ == COMMITED)) {
    txn_db_impl_->UnregisterTransaction(this);
  }
}

99
void PessimisticTransaction::Clear() {
100
  txn_db_impl_->UnLock(this, &GetTrackedKeys());
A
agiardullo 已提交
101
  TransactionBaseImpl::Clear();
A
agiardullo 已提交
102 103
}

104 105 106
// Recycles this object for a new transaction on txn_db.
//
// A named transaction that did not reach COMMITED is unregistered first; then
// the base class is reinitialized against txn_db's root DB and the
// per-transaction state is rebuilt from txn_options via Initialize().
void PessimisticTransaction::Reinitialize(
    TransactionDB* txn_db, const WriteOptions& write_options,
    const TransactionOptions& txn_options) {
  if (!(name_.empty() || txn_state_ == COMMITED)) {
    txn_db_impl_->UnregisterTransaction(this);
  }

  DB* const root_db = txn_db->GetRootDB();
  TransactionBaseImpl::Reinitialize(root_db, write_options);
  Initialize(txn_options);
}

114
bool PessimisticTransaction::IsExpired() const {
A
agiardullo 已提交
115
  if (expiration_time_ > 0) {
A
agiardullo 已提交
116
    if (db_->GetEnv()->NowMicros() >= expiration_time_) {
A
agiardullo 已提交
117 118 119 120 121 122 123 124
      // Transaction is expired.
      return true;
    }
  }

  return false;
}

125 126 127
// Constructs a write-committed transaction; all arguments are forwarded
// unchanged to the PessimisticTransaction constructor.
WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
                                     const WriteOptions& write_options,
                                     const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options) {}
M
Maysam Yabandeh 已提交
129

130
// Locks every key touched by `batch`, writes the batch through
// CommitBatchInternal(), and then releases the locks taken for it.
//
// Returns:
//   - the LockBatch() failure, if the keys could not be locked;
//   - Status::Expired() if the transaction has expired or its locks were
//     stolen;
//   - Status::InvalidArgument() if the transaction is not in STARTED state;
//   - otherwise the result of CommitBatchInternal().
Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
  TransactionKeyMap keys_to_unlock;
  Status s = LockBatch(batch, &keys_to_unlock);

  if (!s.ok()) {
    // LockBatch() has already released anything it acquired.
    return s;
  }

  bool can_commit = false;

  if (IsExpired()) {
    s = Status::Expired();
  } else if (expiration_time_ > 0) {
    // The transaction can expire, so another thread may steal its locks and
    // change the state concurrently; claim the commit atomically.
    TransactionState expected = STARTED;
    can_commit = std::atomic_compare_exchange_strong(&txn_state_, &expected,
                                                     AWAITING_COMMIT);
  } else if (txn_state_ == STARTED) {
    // lock stealing is not a concern
    can_commit = true;
  }

  if (can_commit) {
    txn_state_.store(AWAITING_COMMIT);
    s = CommitBatchInternal(batch);
    if (s.ok()) {
      txn_state_.store(COMMITED);
    }
  } else if (txn_state_ == LOCKS_STOLEN) {
    s = Status::Expired();
  } else if (s.ok()) {
    // Only report a bad state when no more specific error (Expired) has
    // already been recorded above.
    s = Status::InvalidArgument("Transaction is not in state for commit.");
  }

  txn_db_impl_->UnLock(this, &keys_to_unlock);

  return s;
}

168
// First phase of a two-phase commit.
//
// Requires a named, unexpired transaction in STARTED state. On success the
// prepare section has been written by PrepareInternal(), its log has been
// marked as containing a prepared section, the expiration time is cleared,
// and the state is PREPARED.
//
// Returns Status::InvalidArgument() for an unnamed transaction or one in the
// wrong state, Status::Expired() if the transaction expired or its locks were
// stolen, otherwise the result of PrepareInternal().
Status PessimisticTransaction::Prepare() {
  Status s;

  if (name_.empty()) {
    return Status::InvalidArgument(
        "Cannot prepare a transaction that has not been named.");
  }

  if (IsExpired()) {
    return Status::Expired();
  }

  bool can_prepare = false;

  if (expiration_time_ > 0) {
    // must concern ourselves with expiration and/or lock stealing
    // need to compare/exchange bc locks could be stolen under us here
    TransactionState expected = STARTED;
    can_prepare = std::atomic_compare_exchange_strong(&txn_state_, &expected,
                                                      AWAITING_PREPARE);
  } else if (txn_state_ == STARTED) {
    // expiration and lock stealing is not possible
    can_prepare = true;
  }

  if (can_prepare) {
    txn_state_.store(AWAITING_PREPARE);
    // transaction can't expire after preparation
    expiration_time_ = 0;
    s = PrepareInternal();
    if (s.ok()) {
      assert(log_number_ != 0);
      dbimpl_->MarkLogAsContainingPrepSection(log_number_);
      txn_state_.store(PREPARED);
    }
  } else if (txn_state_ == LOCKS_STOLEN) {
    s = Status::Expired();
  } else if (txn_state_ == PREPARED) {
    s = Status::InvalidArgument("Transaction has already been prepared.");
  } else if (txn_state_ == COMMITED) {
    s = Status::InvalidArgument("Transaction has already been committed.");
  } else if (txn_state_ == ROLLEDBACK) {
    s = Status::InvalidArgument("Transaction has already been rolledback.");
  } else {
    s = Status::InvalidArgument("Transaction is not in state for prepare.");
  }

  return s;
}

218 219 220 221 222 223 224 225 226 227 228 229
// Writes the prepare section: appends the end-of-prepare marker (tagged with
// the transaction name) to the transaction's write batch and writes it with
// the WAL forced on and the memtable insert disabled. The log number of the
// write is stored in log_number_.
Status WriteCommittedTxn::PrepareInternal() {
  auto* const prepared_batch = GetWriteBatch()->GetWriteBatch();
  WriteBatchInternal::MarkEndPrepare(prepared_batch, name_);

  // Use a copy of the caller's options with the WAL explicitly enabled.
  WriteOptions wal_write_options = write_options_;
  wal_write_options.disableWAL = false;

  return db_impl_->WriteImpl(wal_write_options, prepared_batch,
                             /*callback*/ nullptr, &log_number_,
                             /*log ref*/ 0, /* disable_memtable*/ true);
}

// Commits the transaction, either directly from STARTED (no prepare phase) or
// as the second phase after Prepare().
//
// Returns Status::Expired() if the transaction expired or had its locks
// stolen, Status::InvalidArgument() if it is in a state that cannot be
// committed (or a commit-time batch was supplied without a prepare phase),
// otherwise the result of the underlying write.
Status PessimisticTransaction::Commit() {
  if (IsExpired()) {
    return Status::Expired();
  }

  bool skip_prepare_phase = false;
  bool finish_prepared = false;

  if (expiration_time_ > 0) {
    // An expirable transaction can have its locks stolen by another thread,
    // which changes txn_state_ underneath us, so the transition has to be an
    // atomic compare-and-exchange. STARTED is the only state to expect here:
    // Prepare() clears expiration_time_, so a PREPARED transaction never
    // takes this branch.
    TransactionState expected = STARTED;
    skip_prepare_phase = std::atomic_compare_exchange_strong(
        &txn_state_, &expected, AWAITING_COMMIT);
    TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");
  } else if (txn_state_ == PREPARED) {
    // No expiration, so lock stealing cannot happen.
    finish_prepared = true;
  } else if (txn_state_ == STARTED) {
    // No expiration, so lock stealing cannot happen.
    skip_prepare_phase = true;
    // TODO(myabandeh): what if the user mistakenly forgets prepare? We should
    // add an option so that the user explicitly expresses the intention of
    // skipping the prepare phase.
  }

  if (skip_prepare_phase) {
    assert(!finish_prepared);
    if (WriteBatchInternal::Count(GetCommitTimeWriteBatch()) > 0) {
      return Status::InvalidArgument(
          "Commit-time batch contains values that will not be committed.");
    }

    txn_state_.store(AWAITING_COMMIT);
    Status write_status = CommitWithoutPrepareInternal();
    // Locks and tracked state are dropped whether or not the write succeeded.
    Clear();
    if (write_status.ok()) {
      txn_state_.store(COMMITED);
    }
    return write_status;
  }

  if (finish_prepared) {
    txn_state_.store(AWAITING_COMMIT);

    Status write_status = CommitInternal();
    if (!write_status.ok()) {
      ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                     "Commit write failed");
      return write_status;
    }

    // FindObsoleteFiles must now look to the memtables
    // to determine what prep logs must be kept around,
    // not the prep section heap.
    assert(log_number_ > 0);
    dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_);
    txn_db_impl_->UnregisterTransaction(this);

    Clear();
    txn_state_.store(COMMITED);
    return write_status;
  }

  // Neither commit path applies; translate the current state into an error.
  if (txn_state_ == LOCKS_STOLEN) {
    return Status::Expired();
  }
  if (txn_state_ == COMMITED) {
    return Status::InvalidArgument("Transaction has already been committed.");
  }
  if (txn_state_ == ROLLEDBACK) {
    return Status::InvalidArgument("Transaction has already been rolledback.");
  }
  return Status::InvalidArgument("Transaction is not in state for commit.");
}

305 306 307 308 309
// Writes the transaction's accumulated write batch to the DB in one step,
// using the transaction's write options, and returns the write's status.
Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
  auto* const txn_batch = GetWriteBatch()->GetWriteBatch();
  return db_->Write(write_options_, txn_batch);
}

M
Maysam Yabandeh 已提交
310 311 312 313 314
// Writes the caller-supplied batch to the DB with the transaction's write
// options and returns the write's status.
Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch) {
  return db_->Write(write_options_, batch);
}

315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
// Second phase of a two-phase commit: writes the commit marker and applies
// the previously prepared batch.
Status WriteCommittedTxn::CommitInternal() {
  // Start from the commit-time batch and append the Commit marker to it.
  // The Memtable ignores the Commit marker outside of recovery.
  WriteBatch* const commit_batch = GetCommitTimeWriteBatch();
  WriteBatchInternal::MarkCommit(commit_batch, name_);

  // Everything appended after this point is left out of the WAL.
  commit_batch->MarkWalTerminationPoint();

  // Append the prepared batch so it reaches the Memtable only (it is already
  // in the WAL from the prepare phase). Outside of recovery the Memtable
  // skips the BeginPrepare/EndPrepare markers and just inserts the values.
  WriteBatchInternal::Append(commit_batch, GetWriteBatch()->GetWriteBatch());

  return db_impl_->WriteImpl(write_options_, commit_batch, nullptr, nullptr,
                             log_number_);
}

M
Maysam Yabandeh 已提交
334
// Abandons the transaction.
//
// From PREPARED: writes a rollback marker via RollbackInternal() and, on
// success, releases the prepared section, clears the transaction and moves to
// ROLLEDBACK. From STARTED: simply clears the transaction. Any other state
// yields Status::InvalidArgument().
Status PessimisticTransaction::Rollback() {
  if (txn_state_ == PREPARED) {
    txn_state_.store(AWAITING_ROLLBACK);

    Status rollback_status = RollbackInternal();
    if (rollback_status.ok()) {
      // The prepared section no longer has to be kept around.
      assert(log_number_ > 0);
      dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_);
      Clear();
      txn_state_.store(ROLLEDBACK);
    }
    return rollback_status;
  }

  if (txn_state_ == STARTED) {
    // Nothing was prepared, so there is nothing to undo in the log.
    Clear();
    return Status();
  }

  if (txn_state_ == COMMITED) {
    return Status::InvalidArgument(
        "This transaction has already been committed.");
  }

  return Status::InvalidArgument(
      "Two phase transaction is not in state for rollback.");
}
M
Maysam Yabandeh 已提交
360 361 362 363 364 365 366

// Writes a batch containing only the rollback marker for this transaction's
// name and returns the status of that write.
Status WriteCommittedTxn::RollbackInternal() {
  WriteBatch marker_batch;
  WriteBatchInternal::MarkRollback(&marker_batch, name_);
  return db_impl_->WriteImpl(write_options_, &marker_batch);
}
A
agiardullo 已提交
367

368
// Rolls the transaction back to its most recent savepoint. Only allowed while
// the transaction is still STARTED; otherwise returns
// Status::InvalidArgument().
Status PessimisticTransaction::RollbackToSavePoint() {
  if (txn_state_ != STARTED) {
    return Status::InvalidArgument("Transaction is beyond state for rollback.");
  }

  // Release the locks on keys tracked since the savepoint before the base
  // class discards the corresponding writes.
  const auto& keys_since_savepoint = GetTrackedKeysSinceSavePoint();
  if (keys_since_savepoint != nullptr) {
    txn_db_impl_->UnLock(this, keys_since_savepoint.get());
  }

  return TransactionBaseImpl::RollbackToSavePoint();
}

A
agiardullo 已提交
384 385
// Lock all keys in this batch.
// On success, caller should unlock keys_to_unlock
386
Status PessimisticTransaction::LockBatch(WriteBatch* batch,
387
                                         TransactionKeyMap* keys_to_unlock) {
A
agiardullo 已提交
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408
  class Handler : public WriteBatch::Handler {
   public:
    // Sorted map of column_family_id to sorted set of keys.
    // Since LockBatch() always locks keys in sorted order, it cannot deadlock
    // with itself.  We're not using a comparator here since it doesn't matter
    // what the sorting is as long as it's consistent.
    std::map<uint32_t, std::set<std::string>> keys_;

    Handler() {}

    void RecordKey(uint32_t column_family_id, const Slice& key) {
      std::string key_str = key.ToString();

      auto iter = (keys_)[column_family_id].find(key_str);
      if (iter == (keys_)[column_family_id].end()) {
        // key not yet seen, store it.
        (keys_)[column_family_id].insert({std::move(key_str)});
      }
    }

    virtual Status PutCF(uint32_t column_family_id, const Slice& key,
409
                         const Slice& /* unused */) override {
A
agiardullo 已提交
410 411 412 413
      RecordKey(column_family_id, key);
      return Status::OK();
    }
    virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
414
                           const Slice& /* unused */) override {
A
agiardullo 已提交
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438
      RecordKey(column_family_id, key);
      return Status::OK();
    }
    virtual Status DeleteCF(uint32_t column_family_id,
                            const Slice& key) override {
      RecordKey(column_family_id, key);
      return Status::OK();
    }
  };

  // Iterating on this handler will add all keys in this batch into keys
  Handler handler;
  batch->Iterate(&handler);

  Status s;

  // Attempt to lock all keys
  for (const auto& cf_iter : handler.keys_) {
    uint32_t cfh_id = cf_iter.first;
    auto& cfh_keys = cf_iter.second;

    for (const auto& key_iter : cfh_keys) {
      const std::string& key = key_iter;

M
Manuel Ung 已提交
439
      s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */);
A
agiardullo 已提交
440 441 442
      if (!s.ok()) {
        break;
      }
A
agiardullo 已提交
443
      TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,
M
Manuel Ung 已提交
444
               false, true /* exclusive */);
A
agiardullo 已提交
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463
    }

    if (!s.ok()) {
      break;
    }
  }

  if (!s.ok()) {
    txn_db_impl_->UnLock(this, keys_to_unlock);
  }

  return s;
}

// Attempt to lock this key.
// Returns OK if the key has been successfully locked.  Non-ok, otherwise.
// If check_shapshot is true and this transaction has a snapshot set,
// this key will only be locked if there have been no writes to this key since
// the snapshot time.
464
Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
465
                                       const Slice& key, bool read_only,
466
                                       bool exclusive, bool skip_validate) {
A
agiardullo 已提交
467 468 469
  uint32_t cfh_id = GetColumnFamilyID(column_family);
  std::string key_str = key.ToString();
  bool previously_locked;
M
Manuel Ung 已提交
470
  bool lock_upgrade = false;
A
agiardullo 已提交
471 472
  Status s;

473
  // lock this key if this transactions hasn't already locked it
474
  SequenceNumber tracked_at_seq = kMaxSequenceNumber;
475 476 477 478

  const auto& tracked_keys = GetTrackedKeys();
  const auto tracked_keys_cf = tracked_keys.find(cfh_id);
  if (tracked_keys_cf == tracked_keys.end()) {
A
agiardullo 已提交
479
    previously_locked = false;
480 481 482 483 484
  } else {
    auto iter = tracked_keys_cf->second.find(key_str);
    if (iter == tracked_keys_cf->second.end()) {
      previously_locked = false;
    } else {
M
Manuel Ung 已提交
485 486 487
      if (!iter->second.exclusive && exclusive) {
        lock_upgrade = true;
      }
488
      previously_locked = true;
489
      tracked_at_seq = iter->second.seq;
490 491
    }
  }
A
agiardullo 已提交
492

M
Manuel Ung 已提交
493 494 495
  // Lock this key if this transactions hasn't already locked it or we require
  // an upgrade.
  if (!previously_locked || lock_upgrade) {
M
Manuel Ung 已提交
496
    s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
A
agiardullo 已提交
497 498
  }

499 500 501 502 503 504 505 506
  SetSnapshotIfNeeded();

  // Even though we do not care about doing conflict checking for this write,
  // we still need to take a lock to make sure we do not cause a conflict with
  // some other write.  However, we do not need to check if there have been
  // any writes since this transaction's snapshot.
  // TODO(agiardullo): could optimize by supporting shared txn locks in the
  // future
507
  if (skip_validate || snapshot_ == nullptr) {
508 509 510 511
    // Need to remember the earliest sequence number that we know that this
    // key has not been modified after.  This is useful if this same
    // transaction
    // later tries to lock this key again.
512
    if (tracked_at_seq == kMaxSequenceNumber) {
513 514
      // Since we haven't checked a snapshot, we only know this key has not
      // been modified since after we locked it.
515 516 517 518 519 520
      // Note: when allocate_seq_only_for_data_==false this is less than the
      // latest allocated seq but it is ok since i) this is just a heuristic
      // used only as a hint to avoid actual check for conflicts, ii) this would
      // cause a false positive only if the snapthot is taken right after the
      // lock, which would be an unusual sequence.
      tracked_at_seq = db_->GetLatestSequenceNumber();
521 522
    }
  } else {
A
agiardullo 已提交
523 524
    // If a snapshot is set, we need to make sure the key hasn't been modified
    // since the snapshot.  This must be done after we locked the key.
525 526
    // If we already have validated an earilier snapshot it must has been
    // reflected in tracked_at_seq and ValidateSnapshot will return OK.
527
    if (s.ok()) {
528
      s = ValidateSnapshot(column_family, key, &tracked_at_seq);
529 530 531 532 533

      if (!s.ok()) {
        // Failed to validate key
        if (!previously_locked) {
          // Unlock key we just locked
M
Manuel Ung 已提交
534 535 536 537 538 539 540
          if (lock_upgrade) {
            s = txn_db_impl_->TryLock(this, cfh_id, key_str,
                                      false /* exclusive */);
            assert(s.ok());
          } else {
            txn_db_impl_->UnLock(this, cfh_id, key.ToString());
          }
A
agiardullo 已提交
541 542 543 544 545
        }
      }
    }
  }

546
  if (s.ok()) {
547 548 549 550 551
    // We must track all the locked keys so that we can unlock them later. If
    // the key is already locked, this func will update some stats on the
    // tracked key. It could also update the tracked_at_seq if it is lower than
    // the existing trackey seq.
    TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
552 553
  }

A
agiardullo 已提交
554 555 556 557 558
  return s;
}

// Return OK() if this key has not been modified more recently than the
// transaction snapshot_.
559 560
// tracked_at_seq is the global seq at which we either locked the key or already
// have done ValidateSnapshot.
561 562
Status PessimisticTransaction::ValidateSnapshot(
    ColumnFamilyHandle* column_family, const Slice& key,
563
    SequenceNumber* tracked_at_seq) {
564 565
  assert(snapshot_);

566 567 568 569 570
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previous validated (or locked) at a sequence number
    // earlier than the current snapshot's sequence number, we already know it
    // has not been modified aftter snap_seq either.
571 572
    return Status::OK();
  }
573 574 575 576 577
  // Otherwise we have either
  // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
  // 2: snap_seq < tracked_at_seq: last time we lock the key was via
  // skip_validate option which means we had skipped ValidateSnapshot. In both
  // cases we should do ValidateSnapshot now.
A
agiardullo 已提交
578

579
  *tracked_at_seq = snap_seq;
A
agiardullo 已提交
580

581
  ColumnFamilyHandle* cfh =
M
Maysam Yabandeh 已提交
582
      column_family ? column_family : db_impl_->DefaultColumnFamily();
A
agiardullo 已提交
583

584 585
  return TransactionUtil::CheckKeyForConflicts(
      db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */);
A
agiardullo 已提交
586 587
}

588
bool PessimisticTransaction::TryStealingLocks() {
589
  assert(IsExpired());
R
Reid Horuff 已提交
590 591
  TransactionState expected = STARTED;
  return std::atomic_compare_exchange_strong(&txn_state_, &expected,
592 593 594
                                             LOCKS_STOLEN);
}

595 596
// Releases this transaction's lock on `key` in `column_family`.
void PessimisticTransaction::UnlockGetForUpdate(
    ColumnFamilyHandle* column_family, const Slice& key) {
  const uint32_t cfh_id = GetColumnFamilyID(column_family);
  txn_db_impl_->UnLock(this, cfh_id, key.ToString());
}

600
// Assigns a name to the transaction and registers it with the transaction DB.
//
// Returns Status::InvalidArgument() when the transaction is no longer in
// STARTED state, already has a name, the name is already used by another
// transaction, or the name is not between 1 and 512 characters long.
Status PessimisticTransaction::SetName(const TransactionName& name) {
  if (txn_state_ != STARTED) {
    return Status::InvalidArgument("Transaction is beyond state for naming.");
  }

  if (!name_.empty()) {
    return Status::InvalidArgument("Transaction has already been named.");
  }

  if (txn_db_impl_->GetTransactionByName(name) != nullptr) {
    return Status::InvalidArgument("Transaction name must be unique.");
  }

  if (name.empty() || name.length() > 512) {
    return Status::InvalidArgument(
        "Transaction name length must be between 1 and 512 chars.");
  }

  name_ = name;
  txn_db_impl_->RegisterTransaction(this);
  return Status();
}

A
agiardullo 已提交
620 621 622
}  // namespace rocksdb

#endif  // ROCKSDB_LITE