write_unprepared_txn.h 5.0 KB
Newer Older
1 2 3 4 5 6 7 8 9
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once

#ifndef ROCKSDB_LITE

10 11
#include <map>
#include <set>
#include <string>
#include <vector>

#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
14 15 16

namespace rocksdb {

17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
// Forward declarations. WriteUnpreparedTxn is referenced by pointer in
// WriteUnpreparedTxnReadCallback below, ahead of its full definition.
class WriteUnpreparedTxnDB;
class WriteUnpreparedTxn;

class WriteUnpreparedTxnReadCallback : public ReadCallback {
 public:
  WriteUnpreparedTxnReadCallback(WritePreparedTxnDB* db,
                                 SequenceNumber snapshot,
                                 SequenceNumber min_uncommitted,
                                 WriteUnpreparedTxn* txn)
      : db_(db),
        snapshot_(snapshot),
        min_uncommitted_(min_uncommitted),
        txn_(txn) {}

  virtual bool IsVisible(SequenceNumber seq) override;
  virtual SequenceNumber MaxUnpreparedSequenceNumber() override;

 private:
  WritePreparedTxnDB* db_;
  SequenceNumber snapshot_;
  SequenceNumber min_uncommitted_;
  WriteUnpreparedTxn* txn_;
};

41
class WriteUnpreparedTxn : public WritePreparedTxn {
42 43 44 45 46
 public:
  WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
                     const WriteOptions& write_options,
                     const TransactionOptions& txn_options);

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
  virtual ~WriteUnpreparedTxn();

  using TransactionBaseImpl::Put;
  virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value) override;
  virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                     const SliceParts& value) override;

  using TransactionBaseImpl::Merge;
  virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) override;

  using TransactionBaseImpl::Delete;
  virtual Status Delete(ColumnFamilyHandle* column_family,
                        const Slice& key) override;
  virtual Status Delete(ColumnFamilyHandle* column_family,
                        const SliceParts& key) override;

  using TransactionBaseImpl::SingleDelete;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const Slice& key) override;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const SliceParts& key) override;

  virtual Status RebuildFromWriteBatch(WriteBatch*) override {
    // This function was only useful for recovering prepared transactions, but
    // is unused for write prepared because a transaction may consist of
    // multiple write batches.
    //
    // If there are use cases outside of recovery that can make use of this,
    // then support could be added.
    return Status::NotSupported("Not supported for WriteUnprepared");
  }

  const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();

  void UpdateWriteKeySet(uint32_t cfid, const Slice& key);

 protected:
  void Initialize(const TransactionOptions& txn_options) override;

  Status PrepareInternal() override;

  Status CommitWithoutPrepareInternal() override;
  Status CommitInternal() override;

  Status RollbackInternal() override;
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108

  // Get and GetIterator needs to be overridden so that a ReadCallback to
  // handle read-your-own-write is used.
  using Transaction::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  using Transaction::GetIterator;
  virtual Iterator* GetIterator(const ReadOptions& options) override;
  virtual Iterator* GetIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

 private:
  friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
109 110
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
111
  friend class WriteUnpreparedTxnDB;
112

113 114 115 116 117 118 119 120
  Status MaybeFlushWriteBatchToDB();
  Status FlushWriteBatchToDB(bool prepared);

  // For write unprepared, we check on every writebatch append to see if
  // max_write_batch_size_ has been exceeded, and then call
  // FlushWriteBatchToDB if so. This logic is encapsulated in
  // MaybeFlushWriteBatchToDB.
  size_t max_write_batch_size_;
121
  WriteUnpreparedTxnDB* wupt_db_;
122

123 124 125
  // Ordered list of unprep_seq sequence numbers that we have already written
  // to DB.
  //
126 127 128 129 130 131
  // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
  // written by this transaction.
  //
  // Note that this contains both prepared and unprepared batches, since they
  // are treated similarily in prepare heap/commit map, so it simplifies the
  // commit callbacks.
132
  std::map<SequenceNumber, size_t> unprep_seqs_;
133 134 135 136 137

  // Set of keys that have written to that have already been written to DB
  // (ie. not in write_batch_).
  //
  std::map<uint32_t, std::vector<std::string>> write_set_keys_;
138 139 140 141 142
};

}  // namespace rocksdb

#endif  // ROCKSDB_LITE