//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "utilities/transactions/write_prepared_txn_db.h"

#include <inttypes.h>
#include <algorithm>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"

namespace rocksdb {

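// Re-registers the prepare sequence numbers of transactions recovered from the
// WAL (one entry per sub-batch), advances max_evicted_seq_ to the latest
// sequence number, and installs the WritePrepared snapshot checker before
// delegating to PessimisticTransactionDB::Initialize.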
Status WritePreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
  assert(dbimpl != nullptr);
  auto rtxns = dbimpl->recovered_transactions();
  for (auto rtxn : rtxns) {
    auto cnt = rtxn.second->batch_cnt_ ? rtxn.second->batch_cnt_ : 1;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(rtxn.second->seq_ + i);
    }
  }
  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));

  auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
                                                handles);
  return s;
}

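// A WritePrepared transaction may write its whole batch, including duplicate
// keys split into sub-batches (see SubBatchCounter below), so the memtable
// returned by the factory must be able to handle duplicated keys.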
Status WritePreparedTxnDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
  if (!s.ok()) {
    return s;
  }
  if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
    return Status::InvalidArgument(
        "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
        "WritePrpeared transactions");
  }
  return Status::OK();
}

Transaction* WritePreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WritePreparedTxn(this, write_options, txn_options);
  }
}

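// When the caller guarantees its own concurrency control, Write() below can
// skip row locking and, optionally, the duplicate-key scan. A minimal caller
// sketch (hypothetical usage; txn_db, write_options, and batch are assumed to
// exist):
//
//   TransactionDBWriteOptimizations optimizations;
//   optimizations.skip_concurrency_control = true;  // keys already locked by the caller
//   optimizations.skip_duplicate_key_check = true;  // batch known to be duplicate-free
//   Status s = txn_db->Write(write_options, optimizations, &batch);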
Status WritePreparedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  if (optimizations.skip_concurrency_control) {
    // Skip locking the rows
    const size_t UNKNOWN_BATCH_CNT = 0;
    const size_t ONE_BATCH_CNT = 1;
    const size_t batch_cnt = optimizations.skip_duplicate_key_check
                                 ? ONE_BATCH_CNT
                                 : UNKNOWN_BATCH_CNT;
    WritePreparedTxn* NO_TXN = nullptr;
    return WriteInternal(opts, updates, batch_cnt, NO_TXN);
  } else {
    // TODO(myabandeh): Make use of skip_duplicate_key_check hint
    // Fall back to unoptimized version
    return PessimisticTransactionDB::Write(opts, updates);
  }
}

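// Writes and commits a batch in a single round: with one write queue the commit
// map is updated via a pre-release callback during the same write; with
// two_write_queues a second, memtable-skipping write publishes the commit
// sequence number to readers.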
Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                                         WriteBatch* batch, size_t batch_cnt,
                                         WritePreparedTxn* txn) {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal");
  if (batch->Count() == 0) {
    // Otherwise our 1 seq per batch logic will break since no seq is
    // increased for this batch.
    return Status::OK();
  }
  if (batch_cnt == 0) {  // not provided, then compute it
    // TODO(myabandeh): add an option to allow user skipping this cost
    SubBatchCounter counter(*GetCFComparatorMap());
    auto s = batch->Iterate(&counter);
    assert(s.ok());
    batch_cnt = counter.BatchCount();
    // TODO(myabandeh): replace me with a stat
    ROCKS_LOG_WARN(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
                   static_cast<uint64_t>(batch_cnt));
  }
  assert(batch_cnt);

  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  WriteOptions write_options(write_options_orig);
  bool sync = write_options.sync;
  if (!do_one_write) {
    // No need to sync on the first write
    write_options.sync = false;
  }
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(batch);
  const bool DISABLE_MEMTABLE = true;
  const uint64_t no_log_ref = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ZERO_PREPARES = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
  auto s = db_impl_->WriteImpl(
      write_options, batch, nullptr, nullptr, no_log_ref, !DISABLE_MEMTABLE,
      &seq_used, batch_cnt, do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  uint64_t& prepare_seq = seq_used;
  if (txn != nullptr) {
    txn->SetId(prepare_seq);
  }
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    return s;
  }  // else do the 2nd write for commit
  // Set the original value of sync
  write_options.sync = sync;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // TODO(myabandeh): What if max advances past prepare_seq_ in the meantime and
  // readers treat the prepared data as committed? Almost zero probability.

  // Commit the batch by writing an empty batch to the 2nd queue that will
  // release the commit sequence number to readers.
  const size_t ZERO_COMMITS = 0;
  const bool PREP_HEAP_SKIPPED = true;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS, PREP_HEAP_SKIPPED);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  const size_t ONE_BATCH = 1;
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  return s;
}

Status WritePreparedTxnDB::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  // We are fine with the latest committed value. This could be done by
  // specifying the snapshot as kMaxSequenceNumber.
  SequenceNumber seq = kMaxSequenceNumber;
  if (options.snapshot != nullptr) {
    seq = options.snapshot->GetSequenceNumber();
  }
  WritePreparedTxnReadCallback callback(this, seq);
  bool* dont_care = nullptr;
  // Note: no need to specify a snapshot for read options as no specific
  // snapshot is requested by the user.
  return db_impl_->GetImpl(options, column_family, key, value, dont_care,
                           &callback);
}

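// Publishes a fresh column-family-id to comparator map. The pointer is swapped
// in atomically so readers (e.g., the duplicate-key detection in WriteInternal)
// can use it without locking; cf_map_gc_ keeps the latest map alive until it is
// replaced or the DB is destroyed.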
void WritePreparedTxnDB::UpdateCFComparatorMap(
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto cf_map = new std::map<uint32_t, const Comparator*>();
  for (auto h : handles) {
    auto id = h->GetID();
    const Comparator* comparator = h->GetComparator();
    (*cf_map)[id] = comparator;
  }
  cf_map_.store(cf_map);
  cf_map_gc_.reset(cf_map);
}

void WritePreparedTxnDB::UpdateCFComparatorMap(
    const ColumnFamilyHandle* h) {
  auto old_cf_map_ptr = cf_map_.load();
  assert(old_cf_map_ptr);
  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
  auto id = h->GetID();
  const Comparator* comparator = h->GetComparator();
  (*cf_map)[id] = comparator;
  cf_map_.store(cf_map);
  cf_map_gc_.reset(cf_map);
}


std::vector<Status> WritePreparedTxnDB::MultiGet(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  assert(values);
  size_t num_keys = keys.size();
  values->resize(num_keys);

  std::vector<Status> stat_list(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    std::string* value = values ? &(*values)[i] : nullptr;
    stat_list[i] = this->Get(options, column_family[i], keys[i], value);
  }
  return stat_list;
}

// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s)
      : callback(txn_db, sequence), snapshot(s) {}

  WritePreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
  } else {
    auto* snapshot = db_impl_->GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  assert(snapshot_seq != kMaxSequenceNumber);
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  auto* state = new IteratorState(this, snapshot_seq, own_snapshot);
  auto* db_iter =
      db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                !ALLOW_BLOB, !ALLOW_REFRESH);
  db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

Status WritePreparedTxnDB::NewIterators(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  constexpr bool ALLOW_BLOB = true;
  constexpr bool ALLOW_REFRESH = true;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  if (options.snapshot != nullptr) {
    snapshot_seq = options.snapshot->GetSequenceNumber();
  } else {
    auto* snapshot = db_impl_->GetSnapshot();
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    snapshot_seq = snapshot->GetSequenceNumber();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  for (auto* column_family : column_families) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
    auto* state = new IteratorState(this, snapshot_seq, own_snapshot);
    auto* db_iter =
        db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
                                  !ALLOW_BLOB, !ALLOW_REFRESH);
    db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
    iterators->push_back(db_iter);
  }
  return Status::OK();
}

void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
  // Advance max_evicted_seq_ no more than 100 times before the cache wraps
  // around.
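  // For example, if COMMIT_CACHE_SIZE were 2^21 entries (the actual value is
  // defined in the header), max_evicted_seq_ would advance in steps of about
  // 20971 sequence numbers.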
  INC_STEP_FOR_MAX_EVICTED =
      std::max(COMMIT_CACHE_SIZE / 100, static_cast<size_t>(1));
  snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
      new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
  commit_cache_ = unique_ptr<std::atomic<CommitEntry64b>[]>(
      new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}

// Returns true if commit_seq <= snapshot_seq
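// The checks are ordered from cheapest to most expensive: sequence number zero
// and snapshots older than the prepare are resolved immediately, then
// delayed_prepared_, the commit cache, max_evicted_seq_, and finally
// old_commit_map_ are consulted.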
bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
                                      uint64_t snapshot_seq) const {
  // Here we try to infer the return value without looking into prepare list.
  // This helps avoid synchronization over a shared map.
  // TODO(myabandeh): optimize this. This sequence of checks must be correct
  // but is not necessarily efficient.
  if (prep_seq == 0) {
    // Compaction will output keys to bottom-level with sequence number 0 if
    // it is visible to the earliest snapshot.
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 1);
    return true;
  }
  if (snapshot_seq < prep_seq) {
    // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 0);
    return false;
  }
  if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
    // We should not normally reach here
    ReadLock rl(&prepared_mutex_);
    // TODO(myabandeh): also add a stat
    ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
                   static_cast<uint64_t>(delayed_prepared_.size()));
    if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
      // Then it is not committed yet
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 0);
      return false;
    }
  }
  auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
  CommitEntry64b dont_care;
  CommitEntry cached;
  bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
  if (exist && prep_seq == cached.prep_seq) {
    // It is committed and also not evicted from commit cache
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
    return cached.commit_seq <= snapshot_seq;
  }
  // else it could be committed but not inserted in the map which could happen
  // after recovery, or it could be committed and evicted by another commit, or
  // never committed.

  // At this point we don't know if it was committed or it is still prepared
  auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
  // The max_evicted_seq_ at the time we did GetCommitEntry <= max_evicted_seq now
  if (max_evicted_seq < prep_seq) {
    // Not evicted from cache and also not present, so must be still prepared
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 0);
    return false;
  }
  // When advancing max_evicted_seq_, we move older entries from prepared to
  // delayed_prepared_. Also we move evicted entries from commit cache to
  // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
  // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
  // old_commit_map_, iii) committed with no conflict with any snapshot. Case
  // (i), delayed_prepared_, is checked above.
  if (max_evicted_seq < snapshot_seq) {  // then (ii) cannot be the case
    // only (iii) is the case: committed
    // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
    // snapshot_seq
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 1);
    return true;
  }
  // else (ii) might be the case: check the commit data saved for this snapshot.
  // If there was no overlapping commit entry, then it is committed with a
  // commit_seq lower than any live snapshot, including snapshot_seq.
  if (old_commit_map_empty_.load(std::memory_order_acquire)) {
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 1);
    return true;
  }
  {
    // We should not normally reach here unless snapshot_seq is old. This is a
    // rare case and it is ok to pay the cost of mutex ReadLock for such old
    // reading transactions.
    // TODO(myabandeh): also add a stat
    ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
    ReadLock rl(&old_commit_map_mutex_);
    auto prep_set_entry = old_commit_map_.find(snapshot_seq);
    bool found = prep_set_entry != old_commit_map_.end();
    if (found) {
      auto& vec = prep_set_entry->second;
      found = std::binary_search(vec.begin(), vec.end(), prep_seq);
    }
    if (!found) {
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
  }
  // (ii) is the case: it is committed, but after the snapshot_seq
  ROCKS_LOG_DETAILS(info_log_,
                    "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
                    prep_seq, snapshot_seq, 0);
  return false;
}

void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
  ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq);
  assert(seq > max_evicted_seq_);
  if (seq <= max_evicted_seq_) {
    throw std::runtime_error(
        "Added prepare_seq is larger than max_evicted_seq_: " + ToString(seq) +
        " <= " + ToString(max_evicted_seq_.load()));
  }
  WriteLock wl(&prepared_mutex_);
  prepared_txns_.push(seq);
}

void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
                                          uint64_t rollback_seq) {
  ROCKS_LOG_DETAILS(
      info_log_, "Txn %" PRIu64 " rolling back with rollback seq of " PRIu64 "",
      prep_seq, rollback_seq);
  std::vector<SequenceNumber> snapshots =
      GetSnapshotListFromDB(kMaxSequenceNumber);
  // TODO(myabandeh): currently we are assuming that there is no snapshot taken
  // when a transaction is rolled back. This is the case with the way MySQL does
  // rollback, which is after recovery. We should extend it to be able to
  // roll back txns that overlap with existing snapshots.
  assert(snapshots.size() == 0);
  if (snapshots.size()) {
    throw std::runtime_error(
        "Rollback reqeust while there are live snapshots.");
  }
  WriteLock wl(&prepared_mutex_);
  prepared_txns_.erase(prep_seq);
  bool was_empty = delayed_prepared_.empty();
  if (!was_empty) {
    delayed_prepared_.erase(prep_seq);
    bool is_empty = delayed_prepared_.empty();
    if (was_empty != is_empty) {
      delayed_prepared_empty_.store(is_empty, std::memory_order_release);
    }
  }
}

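// Registers (prepare_seq, commit_seq) in the commit cache. If a valid entry is
// evicted, max_evicted_seq_ is advanced in steps of INC_STEP_FOR_MAX_EVICTED
// and the evicted entry is checked against live snapshots. A lost race on the
// cache slot is resolved by retrying, bounded by loop_cnt.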
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                                      bool prepare_skipped, uint8_t loop_cnt) {
  ROCKS_LOG_DETAILS(info_log_,
                    "Txn %" PRIu64 " Committing with %" PRIu64
                    "(prepare_skipped=%d)",
                    prepare_seq, commit_seq, prepare_skipped);
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
  auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
  CommitEntry64b evicted_64b;
  CommitEntry evicted;
  bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
  if (LIKELY(to_be_evicted)) {
    assert(evicted.prep_seq != prepare_seq);
    auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
    ROCKS_LOG_DETAILS(info_log_,
                      "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
                      evicted.prep_seq, evicted.commit_seq, prev_max);
    if (prev_max < evicted.commit_seq) {
      // Inc max in larger steps to avoid frequent updates
      auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED;
      AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
    }
    // After each eviction from commit cache, check if the commit entry should
    // be kept around because it overlaps with a live snapshot.
    CheckAgainstSnapshots(evicted);
  }
  bool succ =
      ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
  if (UNLIKELY(!succ)) {
    ROCKS_LOG_ERROR(info_log_,
                    "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
                    ",%" PRIu64 " retrying...",
                    indexed_seq, prepare_seq, commit_seq);
    // A very rare event, in which the commit entry is updated before we do.
    // Here we apply a very simple solution of retrying.
    if (loop_cnt > 100) {
      throw std::runtime_error("Infinite loop in AddCommitted!");
    }
    AddCommitted(prepare_seq, commit_seq, prepare_skipped, ++loop_cnt);
    return;
  }
  if (!prepare_skipped) {
    WriteLock wl(&prepared_mutex_);
    prepared_txns_.erase(prepare_seq);
    bool was_empty = delayed_prepared_.empty();
    if (!was_empty) {
      delayed_prepared_.erase(prepare_seq);
      bool is_empty = delayed_prepared_.empty();
      if (was_empty != is_empty) {
        delayed_prepared_empty_.store(is_empty, std::memory_order_release);
      }
    }
  }
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}

bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
                                        CommitEntry64b* entry_64b,
                                        CommitEntry* entry) const {
  *entry_64b = commit_cache_[indexed_seq].load(std::memory_order_acquire);
  bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
                                        const CommitEntry& new_entry,
                                        CommitEntry* evicted_entry) {
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  CommitEntry64b evicted_entry_64b = commit_cache_[indexed_seq].exchange(
      new_entry_64b, std::memory_order_acq_rel);
  bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
  return valid;
}

bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
                                             CommitEntry64b& expected_entry_64b,
                                             const CommitEntry& new_entry) {
  auto& atomic_entry = commit_cache_[indexed_seq];
  CommitEntry64b new_entry_64b(new_entry, FORMAT);
  bool succ = atomic_entry.compare_exchange_strong(
      expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
      std::memory_order_acquire);
  return succ;
}

void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                                              const SequenceNumber& new_max) {
  ROCKS_LOG_DETAILS(info_log_,
                    "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
                    prev_max, new_max);
  // When max_evicted_seq_ advances, move older entries from prepared_txns_
  // to delayed_prepared_. This guarantees that if a seq is lower than max,
  // then it is not in prepared_txns_, saving an expensive, synchronized
  // lookup from a shared set. delayed_prepared_ is expected to be empty in
  // normal cases.
  {
    WriteLock wl(&prepared_mutex_);
    while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
      auto to_be_popped = prepared_txns_.top();
      delayed_prepared_.insert(to_be_popped);
      // TODO(myabandeh): also add a stat
      ROCKS_LOG_WARN(info_log_,
                     "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
                     " new_max=%" PRIu64 " oldmax=%" PRIu64,
                     static_cast<uint64_t>(delayed_prepared_.size()),
                     to_be_popped, new_max, prev_max);
      prepared_txns_.pop();
      delayed_prepared_empty_.store(false, std::memory_order_release);
    }
  }

  // With each change to max_evicted_seq_ fetch the live snapshots behind it.
  // We use max as the version of snapshots to identify how fresh the
  // snapshot list is. This works because the snapshots are between 0 and
  // max, so the larger the max, the more complete they are.
  SequenceNumber new_snapshots_version = new_max;
  std::vector<SequenceNumber> snapshots;
  bool update_snapshots = false;
  if (new_snapshots_version > snapshots_version_) {
    // This is to avoid updating snapshots_ if it has already been updated
    // with a more recent version by a concurrent thread
    update_snapshots = true;
    // We only care about snapshots lower than max
    snapshots = GetSnapshotListFromDB(new_max);
  }
  if (update_snapshots) {
    UpdateSnapshots(snapshots, new_snapshots_version);
  }
  auto updated_prev_max = prev_max;
  while (updated_prev_max < new_max &&
         !max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed)) {
  };
}

const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
    SequenceNumber max) {
  ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
  // Hold the db mutex while reading the snapshot list.
  InstrumentedMutexLock dblock(db_impl_->mutex());
  return db_impl_->snapshots().GetAll(nullptr, max);
}

void WritePreparedTxnDB::ReleaseSnapshot(const Snapshot* snapshot) {
  auto snap_seq = snapshot->GetSequenceNumber();
  ReleaseSnapshotInternal(snap_seq);
  db_impl_->ReleaseSnapshot(snapshot);
}

void WritePreparedTxnDB::ReleaseSnapshotInternal(
    const SequenceNumber snap_seq) {
  // relaxed is enough since max increases monotonically, i.e., if snap_seq <
  // old_max => snap_seq < new_max as well.
  if (snap_seq < max_evicted_seq_.load(std::memory_order_relaxed)) {
    // Then this is a rare case in which a transaction did not finish before max
    // advanced. It is expected for a few read-only backup snapshots. For such
    // snapshots we might have kept around a couple of entries in the
    // old_commit_map_. Check and do garbage collection if that is the case.
    bool need_gc = false;
    {
      // TODO(myabandeh): also add a stat
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snap_seq);
      need_gc = prep_set_entry != old_commit_map_.end();
    }
    if (need_gc) {
      // TODO(myabandeh): also add a stat
      ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
      WriteLock wl(&old_commit_map_mutex_);
      old_commit_map_.erase(snap_seq);
      old_commit_map_empty_.store(old_commit_map_.empty(),
                                  std::memory_order_release);
    }
  }
}

void WritePreparedTxnDB::UpdateSnapshots(
    const std::vector<SequenceNumber>& snapshots,
    const SequenceNumber& version) {
  ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
                    version);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  // TODO(myabandeh): replace me with a stat
  ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
  WriteLock wl(&snapshots_mutex_);
  snapshots_version_ = version;
  // We update the list concurrently with the readers.
  // Both new and old lists are sorted and the new list is a subset of the
  // previous list plus some new items. Thus if a snapshot repeats in
  // both new and old lists, it will appear upper in the new list. So if
  // we simply insert the new snapshots in order, an overwritten item that
  // is still valid in the new list is either written to the same place in
  // the array or to a higher place before it gets
  // overwritten by another item. This guarantees that a reader that reads the
  // list bottom-up will eventually see a snapshot that repeats in the
  // update, either before it gets overwritten by the writer or
  // afterwards.
  size_t i = 0;
  auto it = snapshots.begin();
  for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) {
    snapshot_cache_[i].store(*it, std::memory_order_release);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#ifndef NDEBUG
  // Release the remaining sync points since they are useless given that the
  // reader would also use a lock to access snapshots
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
  }
#endif
  snapshots_.clear();
  for (; it != snapshots.end(); it++) {
    // Insert them into a vector that is less efficient to access
    // concurrently
    snapshots_.push_back(*it);
  }
  // Update the size at the end. Otherwise a parallel reader might read
  // items that are not set yet.
  snapshots_total_.store(snapshots.size(), std::memory_order_release);
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
}

void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
#ifndef NDEBUG
  size_t sync_i = 0;
#endif
  // First check the snapshot cache that is efficient for concurrent access
  auto cnt = snapshots_total_.load(std::memory_order_acquire);
  // The list might get updated concurrently as we are reading from it. The
  // reader should be able to read all the snapshots that are still valid
  // after the update. Since the surviving snapshots are written to a higher
  // place before they get overwritten, a reader that reads bottom-up will
  // eventually see them.
  const bool next_is_larger = true;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
  for (; 0 < ip1; ip1--) {
    snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
                        ++sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
    if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                 snapshot_seq, !next_is_larger)) {
      break;
    }
  }
#ifndef NDEBUG
  // Release the remaining sync points before acquiring the lock
  for (++sync_i; sync_i <= 10; ++sync_i) {
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
    TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
  }
#endif
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
  TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
  if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
               snapshot_seq < evicted.prep_seq)) {
    // Then access the less efficient list of snapshots_
    // TODO(myabandeh): also add a stat
    ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
    ReadLock rl(&snapshots_mutex_);
    // Items could have moved from snapshots_ to snapshot_cache_ before
    // acquiring the lock. To make sure that we do not miss a valid snapshot,
    // read snapshot_cache_ again while holding the lock.
    for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
      snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq, next_is_larger)) {
        break;
      }
    }
    for (auto snapshot_seq_2 : snapshots_) {
      if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
                                   snapshot_seq_2, next_is_larger)) {
        break;
      }
    }
  }
}

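// Keeps the evicted commit entry around in old_commit_map_ when it overlaps a
// live snapshot, i.e., prep_seq <= snapshot_seq < commit_seq. A worked example
// with hypothetical numbers: for an evicted entry (prep_seq=10, commit_seq=20)
// and snapshot_seq=15, the entry overlaps the snapshot and 10 is recorded under
// old_commit_map_[15]; with snapshot_seq=25 the commit is already visible to
// the snapshot and nothing is stored. The return value tells the caller whether
// to keep scanning further snapshots.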
bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
    const uint64_t& prep_seq, const uint64_t& commit_seq,
    const uint64_t& snapshot_seq, const bool next_is_larger = true) {
  // If we do not store an entry in old_commit_map_ we assume it is committed in
  // all snapshots. If commit_seq <= snapshot_seq, it is considered already in
  // the snapshot, so we need not keep the entry around for this snapshot.
  if (commit_seq <= snapshot_seq) {
    // continue the search if the next snapshot could be smaller than commit_seq
    return !next_is_larger;
  }
  // then snapshot_seq < commit_seq
  if (prep_seq <= snapshot_seq) {  // overlapping range
    ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
    // TODO(myabandeh): also add a stat
    WriteLock wl(&old_commit_map_mutex_);
    old_commit_map_empty_.store(false, std::memory_order_release);
    auto& vec = old_commit_map_[snapshot_seq];
    vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
    // We need to store it once for each overlapping snapshot. Return true to
    // continue the search in case there are more overlapping snapshots.
    return true;
  }
  // continue the search if the next snapshot could be larger than prep_seq
  return next_is_larger;
}

WritePreparedTxnDB::~WritePreparedTxnDB() {
  // At this point there could be running compaction/flush holding a
  // SnapshotChecker, which holds a pointer back to WritePreparedTxnDB.
  // Make sure those jobs finished before destructing WritePreparedTxnDB.
  db_impl_->CancelAllBackgroundWork(true /*wait*/);
}

void SubBatchCounter::InitWithComp(const uint32_t cf) {
  auto cmp = comparators_[cf];
  keys_[cf] = CFKeys(SetComparator(cmp));
}

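// Counts how many sub-batches a write batch must be split into: each time a key
// repeats within the current sub-batch, the counter is bumped and a new
// sub-batch is started. For example, a hypothetical batch Put(a), Put(b),
// Put(a), Put(c) hits one duplicate at the second Put(a) and is therefore
// counted as two sub-batches (assuming batches_ starts at 1 in the header).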
void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
  CFKeys& cf_keys = keys_[cf];
  if (cf_keys.size() == 0) {  // just inserted
    InitWithComp(cf);
  }
  auto it = cf_keys.insert(key);
  if (it.second == false) {  // second is false if an element already existed.
    batches_++;
    keys_.clear();
    InitWithComp(cf);
    keys_[cf].insert(key);
  }
}

}  //  namespace rocksdb
#endif  // ROCKSDB_LITE