// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once

#ifndef ROCKSDB_LITE

#include <functional>
#include <map>
#include <memory>
#include <set>

#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"

namespace rocksdb {

class WriteUnpreparedTxnDB;
class WriteUnpreparedTxn;

// WriteUnprepared transactions need to be able to read their own uncommitted
// writes, and supporting this requires some careful consideration. Because
// writes in the current transaction may already have been flushed to the DB,
// we cannot rely on the contents of WriteBatchWithIndex to determine whether a
// key should be visible or not, so we have to remember to check the DB for any
// uncommitted keys that should be visible to us. First, we will need to change
// the seek-to-snapshot logic to seek to
// max_visible_seq = max(snap_seq, max_unprep_seq). Any key with a sequence
// number greater than max_visible_seq should not be visible, because it cannot
// have been unprepared by the current transaction and it is not in the
// snapshot.
//
// When we seek to max_visible_seq, one of these cases will happen:
// 1. We hit an unprepared key from the current transaction.
// 2. We hit an unprepared key from another transaction.
// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
// 4. We hit a committed key with seq <= snap_seq.
//
// IsVisibleFullCheck handles all cases correctly.
//
// Other notes:
// Note that max_visible_seq is only calculated once at iterator construction
// time, meaning if the same transaction is adding more unprep seqs through
// writes during iteration, these newer writes may not be visible. This is not a
// problem for MySQL though because it avoids modifying the index as it is
// scanning through it to avoid the Halloween Problem. Instead, it scans the
// index once up front, and modifies based on a temporary copy.
//
// In DBIter, there is a "reseek" optimization if the iterator skips over too
// many keys. However, this assumes that the reseek seeks exactly to the
// required key. In write unprepared, even after seeking directly to
// max_visible_seq, some iteration may be required before hitting a visible key,
// and special precautions must be taken to avoid performing another reseek,
// which could otherwise lead to an infinite loop.
//
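// As a minimal sketch (illustrative only, simplified from the case analysis
// above), the visibility decision for a key at sequence number seq looks
// roughly like:
//
//   if (seq > max_visible_seq) {
//     return false;  // neither our unprepared write nor in the snapshot
//   }
//   if (owned_by_current_txn(seq)) {
//     return true;   // case 1: our own unprepared write
//   }
//   if (unprepared_by_other_txn(seq)) {
//     return false;  // case 2: another transaction's unprepared write
//   }
//   return seq <= snap_seq;  // cases 3 and 4: committed keys
//
// owned_by_current_txn and unprepared_by_other_txn are hypothetical helpers
// standing in for checks against unprep_seqs_ and the prepare/commit state;
// the real logic lives in IsVisibleFullCheck.
//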
class WriteUnpreparedTxnReadCallback : public ReadCallback {
 public:
  WriteUnpreparedTxnReadCallback(
      WritePreparedTxnDB* db, SequenceNumber snapshot,
      SequenceNumber min_uncommitted,
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SnapshotBackup backed_by_snapshot)
      // Pass our last uncommitted seq as the snapshot to the parent class to
      // ensure that the parent will not prematurely filter out our own
      // writes. We will do the exact comparison against snapshots in the
      // IsVisibleFullCheck override.
      : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted),
        db_(db),
        unprep_seqs_(unprep_seqs),
        wup_snapshot_(snapshot),
        backed_by_snapshot_(backed_by_snapshot) {
    (void)backed_by_snapshot_;  // to silence unused private field warning
  }

  virtual ~WriteUnpreparedTxnReadCallback() {
    // If it is not backed by a snapshot, the caller must check validity
    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  }

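  // Returns whether a key at sequence number seq is visible to this
  // transaction, covering the four cases enumerated in the class comment.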
  virtual bool IsVisibleFullCheck(SequenceNumber seq) override;

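  // Returns whether the result of the read is valid, i.e. the backing
  // snapshot was not released mid-read. When the callback is not backed by a
  // DB snapshot, the caller must call this (see the destructor assertion).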
  inline bool valid() {
    valid_checked_ = true;
    return snap_released_ == false;
  }

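  // Advance the callback to a newer snapshot sequence number; note that
  // max_visible_seq_ only ever grows.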
  void Refresh(SequenceNumber seq) override {
    max_visible_seq_ = std::max(max_visible_seq_, seq);
    wup_snapshot_ = seq;
  }

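  // Returns the largest sequence number visible to this transaction: the
  // maximum of the snapshot seq and the last seq of the transaction's own
  // unprepared batches.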
  static SequenceNumber CalcMaxVisibleSeq(
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SequenceNumber snapshot_seq) {
    SequenceNumber max_unprepared = 0;
    if (!unprep_seqs.empty()) {
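      // The last unprepared batch begins at seq rbegin()->first and spans
      // rbegin()->second sequence numbers, so its final seq is
      // first + cnt - 1.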
      max_unprepared =
          unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
    }
    return std::max(max_unprepared, snapshot_seq);
  }

 private:
  WritePreparedTxnDB* db_;
  const std::map<SequenceNumber, size_t>& unprep_seqs_;
  SequenceNumber wup_snapshot_;
  // Whether max_visible_seq_ is backed by a snapshot
  const SnapshotBackup backed_by_snapshot_;
  bool snap_released_ = false;
  // Safety check to ensure that the caller has checked invalid statuses
  bool valid_checked_ = false;
};

class WriteUnpreparedTxn : public WritePreparedTxn {
 public:
  WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
                     const WriteOptions& write_options,
                     const TransactionOptions& txn_options);

  virtual ~WriteUnpreparedTxn();

  using TransactionBaseImpl::Put;
  virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value,
                     const bool assume_tracked = false) override;
  virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                     const SliceParts& value,
                     const bool assume_tracked = false) override;

  using TransactionBaseImpl::Merge;
  virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value,
                       const bool assume_tracked = false) override;

  using TransactionBaseImpl::Delete;
  virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
                        const bool assume_tracked = false) override;
  virtual Status Delete(ColumnFamilyHandle* column_family,
                        const SliceParts& key,
                        const bool assume_tracked = false) override;

  using TransactionBaseImpl::SingleDelete;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const Slice& key,
                              const bool assume_tracked = false) override;
  virtual Status SingleDelete(ColumnFamilyHandle* column_family,
                              const SliceParts& key,
                              const bool assume_tracked = false) override;

  // In WriteUnprepared, untracked writes will break snapshot validation logic.
  // Snapshot validation will only check the largest sequence number of a key to
  // see if it was committed or not. However, an untracked unprepared write will
  // hide smaller committed sequence numbers.
  //
  // TODO(lth): Investigate whether it is worth having snapshot validation
  // validate all values larger than snap_seq. Otherwise, we should return
  // Status::NotSupported for untracked writes.
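  //
  // For example (illustrative): with snap_seq = 5, if this transaction
  // blindly writes a key at unprep seq = 10 without tracking it, snapshot
  // validation would only see seq 10 (our own write) as the key's largest
  // sequence number, hiding a conflicting commit at seq = 7 by another
  // transaction.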

  virtual Status RebuildFromWriteBatch(WriteBatch*) override;

  virtual uint64_t GetLastLogNumber() const override {
    return last_log_number_;
  }

 protected:
  void Initialize(const TransactionOptions& txn_options) override;

  Status PrepareInternal() override;

  Status CommitWithoutPrepareInternal() override;
  Status CommitInternal() override;

  Status RollbackInternal() override;

  void Clear() override;

  void SetSavePoint() override;
  Status RollbackToSavePoint() override;
  Status PopSavePoint() override;

  // Get and GetIterator need to be overridden so that a ReadCallback that
  // handles read-your-own-writes is used.
  using Transaction::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;

  using Transaction::MultiGet;
  virtual void MultiGet(const ReadOptions& options,
                        ColumnFamilyHandle* column_family,
                        const size_t num_keys, const Slice* keys,
                        PinnableSlice* values, Status* statuses,
                        bool sorted_input = false) override;

  using Transaction::GetIterator;
  virtual Iterator* GetIterator(const ReadOptions& options) override;
  virtual Iterator* GetIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;

  virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
                                  const Slice& key,
                                  SequenceNumber* tracked_at_seq) override;

 private:
  friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
  friend class WriteUnpreparedTxnDB;

  const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();

  Status MaybeFlushWriteBatchToDB();
  Status FlushWriteBatchToDB(bool prepared);
  Status FlushWriteBatchToDBInternal(bool prepared);
  Status FlushWriteBatchWithSavePointToDB();
  Status RollbackToSavePointInternal();
  Status HandleWrite(std::function<Status()> do_write);

  // For write unprepared, we check on every write batch append to see if
  // write_batch_flush_threshold_ has been exceeded, and then call
  // FlushWriteBatchToDB if so. This logic is encapsulated in
  // MaybeFlushWriteBatchToDB, sketched below.
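  //
  // A minimal sketch of that check (an assumed shape, not the exact
  // implementation):
  //
  //   Status MaybeFlushWriteBatchToDB() {
  //     const bool over_threshold =
  //         write_batch_flush_threshold_ > 0 &&
  //         write_batch_.GetWriteBatch()->GetDataSize() >
  //             static_cast<size_t>(write_batch_flush_threshold_);
  //     return over_threshold ? FlushWriteBatchToDB(/*prepared=*/false)
  //                           : Status::OK();
  //   }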
  int64_t write_batch_flush_threshold_;
  WriteUnpreparedTxnDB* wupt_db_;

  // Ordered list of unprep_seq sequence numbers that we have already written
  // to DB.
  //
  // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
  // written by this transaction.
  //
  // Note that this contains both prepared and unprepared batches, since they
  // are treated similarly in the prepare heap/commit map, so it simplifies
  // the commit callbacks.
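  //
  // For example (illustrative): a transaction that has flushed two unprepared
  // batches at seqs 10 (cnt 2) and 15 (cnt 3) would hold {{10, 2}, {15, 3}},
  // and its max unprep seq would be 15 + 3 - 1 = 17 (see
  // WriteUnpreparedTxnReadCallback::CalcMaxVisibleSeq).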
  std::map<SequenceNumber, size_t> unprep_seqs_;

  uint64_t last_log_number_;

  // Recovered transactions have tracked_keys_ populated, but are not actually
  // locked for efficiency reasons. For recovered transactions, skip unlocking
  // keys when transaction ends.
  bool recovered_txn_;

  // Track the largest sequence number at which we performed snapshot
  // validation. If snapshot validation was skipped because no snapshot was set,
  // then this is set to kMaxSequenceNumber. This value is useful because it
  // means that for keys that have unprepared seqnos, we can guarantee that no
  // committed keys by other transactions can exist between
  // largest_validated_seq_ and max_unprep_seq. See
  // WriteUnpreparedTxnDB::NewIterator for an explanation of why this is
  // necessary for iterator Prev().
  //
  // Currently this value only increases during the lifetime of a transaction,
  // but in some cases, we should be able to restore the previously largest
  // value when calling RollbackToSavepoint.
  SequenceNumber largest_validated_seq_;

  struct SavePoint {
    // Record of unprep_seqs_ at this savepoint. The set of unprep_seq is
    // used during RollbackToSavePoint to determine visibility when restoring
    // old values.
    //
    // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
    // subsets, this can potentially be deduplicated by just storing set
    // difference. Investigate if this is worth it.
    std::map<SequenceNumber, size_t> unprep_seqs_;

    // This snapshot will be used to read keys at this savepoint if we call
    // RollbackToSavePoint.
    std::unique_ptr<ManagedSnapshot> snapshot_;

    SavePoint(const std::map<SequenceNumber, size_t>& seqs,
              ManagedSnapshot* snapshot)
        : unprep_seqs_(seqs), snapshot_(snapshot) {}
  };

  // We have 3 data structures holding savepoint information:
  // 1. TransactionBaseImpl::save_points_
  // 2. WriteUnpreparedTxn::flushed_save_points_
  // 3. WriteUnpreparedTxn::unflushed_save_points_
  //
  // TransactionBaseImpl::save_points_ holds information about all write
  // batches, including the current in-memory write_batch_, or unprepared
  // batches that have been written out. Its responsibility is just to track
  // which keys have been modified in every savepoint.
  //
  // WriteUnpreparedTxn::flushed_save_points_ holds information about savepoints
  // set on unprepared batches that have already flushed. It holds the snapshot
  // and unprep_seqs at that savepoint, so that the rollback process can
  // determine which keys were visible at that point in time.
  //
  // WriteUnpreparedTxn::unflushed_save_points_ holds information about
  // savepoints on the current in-memory write_batch_. It simply records the
  // size of the write batch at every savepoint.
  //
  // TODO(lth): Remove the redundancy between save_point_boundaries_ and
  // write_batch_.save_points_.
  //
  // Based on this information, here are some invariants:
  // size(unflushed_save_points_) = size(write_batch_.save_points_)
  // size(flushed_save_points_) + size(unflushed_save_points_)
  //   = size(save_points_)
  //
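  // For example (illustrative, assuming flushing migrates in-memory
  // savepoints into flushed_save_points_): two savepoints set before an
  // unprepared batch is flushed plus one set afterwards would give
  // size(flushed_save_points_) == 2, size(unflushed_save_points_) == 1, and
  // size(save_points_) == 3.
  //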
  std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
      flushed_save_points_;
  std::unique_ptr<autovector<size_t>> unflushed_save_points_;
};

}  // namespace rocksdb

#endif  // ROCKSDB_LITE