diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 9b726822738e18493ec7f00f9194dc5ca4938f8f..4127e9b2b98e3835aca4204e139f2929a45b9965 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -93,12 +93,10 @@ class Transaction { virtual void SetSavePoint() = 0; // Undo all operations in this transaction (Put, Merge, Delete, PutLogData) - // since the - // most recent call to SetSavePoint() and removes the most recent + // since the most recent call to SetSavePoint() and removes the most recent // SetSavePoint(). - // If there is no previous call to SetSavePoint(), behaves the same as - // Rollback() - virtual void RollbackToSavePoint() = 0; + // If there is no previous call to SetSavePoint(), returns Status::NotFound() + virtual Status RollbackToSavePoint() = 0; // This function is similar to DB::Get() except it will also read pending // changes in this transaction. diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc index 1defd32a6dea51215512839032e0bea996753e6b..f3f184d3e12fcee904f60e2dfca549d8ebcb0f6e 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction_impl.cc @@ -30,30 +30,24 @@ OptimisticTransactionImpl::OptimisticTransactionImpl( : txn_db_(txn_db), db_(txn_db->GetBaseDB()), write_options_(write_options), - snapshot_(nullptr), cmp_(txn_options.cmp), write_batch_(new WriteBatchWithIndex(txn_options.cmp, 0, true)) { if (txn_options.set_snapshot) { SetSnapshot(); - } else { - start_sequence_number_ = db_->GetLatestSequenceNumber(); } } OptimisticTransactionImpl::~OptimisticTransactionImpl() { +} + +void OptimisticTransactionImpl::Cleanup() { tracked_keys_.clear(); - if (snapshot_ != nullptr) { - db_->ReleaseSnapshot(snapshot_); - } + save_points_.reset(nullptr); + write_batch_->Clear(); } void OptimisticTransactionImpl::SetSnapshot() { - if (snapshot_ != nullptr) { - db_->ReleaseSnapshot(snapshot_); - } - - snapshot_ = db_->GetSnapshot(); - start_sequence_number_ = snapshot_->GetSequenceNumber(); + snapshot_.reset(new ManagedSnapshot(db_)); } Status OptimisticTransactionImpl::Commit() { @@ -73,66 +67,38 @@ Status OptimisticTransactionImpl::Commit() { write_options_, write_batch_->GetWriteBatch(), &callback); if (s.ok()) { - tracked_keys_.clear(); - write_batch_->Clear(); - num_entries_ = 0; + Cleanup(); } return s; } void OptimisticTransactionImpl::Rollback() { - tracked_keys_.clear(); - write_batch_->Clear(); - num_entries_ = 0; + Cleanup(); } void OptimisticTransactionImpl::SetSavePoint() { - if (num_entries_ > 0) { - // If transaction is empty, no need to record anything. - - if (save_points_ == nullptr) { - save_points_.reset(new std::stack()); - } - save_points_->push(num_entries_); + if (save_points_ == nullptr) { + save_points_.reset(new std::stack>()); } + save_points_->push(snapshot_); + write_batch_->SetSavePoint(); } -void OptimisticTransactionImpl::RollbackToSavePoint() { - size_t savepoint_entries = 0; - +Status OptimisticTransactionImpl::RollbackToSavePoint() { if (save_points_ != nullptr && save_points_->size() > 0) { - savepoint_entries = save_points_->top(); + // Restore saved snapshot + snapshot_ = save_points_->top(); save_points_->pop(); - } - assert(savepoint_entries <= num_entries_); + // Rollback batch + Status s = write_batch_->RollbackToSavePoint(); + assert(s.ok()); - if (savepoint_entries == num_entries_) { - // No changes to rollback - } else if (savepoint_entries == 0) { - // Rollback everything - Rollback(); + return s; } else { - DBImpl* db_impl = dynamic_cast(db_->GetRootDB()); - assert(db_impl); - - WriteBatchWithIndex* new_batch = new WriteBatchWithIndex(cmp_, 0, true); - Status s = TransactionUtil::CopyFirstN( - savepoint_entries, write_batch_.get(), new_batch, db_impl); - - if (!s.ok()) { - // TODO: Should we change this function to return a Status or should we - // somehow make it - // so RollbackToSavePoint() can never fail?? - // Consider moving this functionality into WriteBatchWithIndex - fprintf(stderr, "STATUS: %s \n", s.ToString().c_str()); - delete new_batch; - } else { - write_batch_.reset(new_batch); - } - - num_entries_ = savepoint_entries; + assert(write_batch_->RollbackToSavePoint().IsNotFound()); + return Status::NotFound(); } } @@ -143,7 +109,7 @@ void OptimisticTransactionImpl::RecordOperation( SequenceNumber seq; if (snapshot_) { - seq = start_sequence_number_; + seq = snapshot_->snapshot()->GetSequenceNumber(); } else { seq = db_->GetLatestSequenceNumber(); } @@ -261,7 +227,6 @@ Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family, RecordOperation(column_family, key); write_batch_->Put(column_family, key, value); - num_entries_++; return Status::OK(); } @@ -272,7 +237,6 @@ Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family, RecordOperation(column_family, key); write_batch_->Put(column_family, key, value); - num_entries_++; return Status::OK(); } @@ -307,7 +271,6 @@ Status OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family, Status OptimisticTransactionImpl::PutUntracked( ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { write_batch_->Put(column_family, key, value); - num_entries_++; return Status::OK(); } @@ -316,7 +279,6 @@ Status OptimisticTransactionImpl::PutUntracked( ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value) { write_batch_->Put(column_family, key, value); - num_entries_++; return Status::OK(); } @@ -324,7 +286,6 @@ Status OptimisticTransactionImpl::PutUntracked( Status OptimisticTransactionImpl::MergeUntracked( ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { write_batch_->Merge(column_family, key, value); - num_entries_++; return Status::OK(); } @@ -332,7 +293,6 @@ Status OptimisticTransactionImpl::MergeUntracked( Status OptimisticTransactionImpl::DeleteUntracked( ColumnFamilyHandle* column_family, const Slice& key) { write_batch_->Delete(column_family, key); - num_entries_++; return Status::OK(); } @@ -340,14 +300,12 @@ Status OptimisticTransactionImpl::DeleteUntracked( Status OptimisticTransactionImpl::DeleteUntracked( ColumnFamilyHandle* column_family, const SliceParts& key) { write_batch_->Delete(column_family, key); - num_entries_++; return Status::OK(); } void OptimisticTransactionImpl::PutLogData(const Slice& blob) { write_batch_->PutLogData(blob); - num_entries_++; } WriteBatchWithIndex* OptimisticTransactionImpl::GetWriteBatch() { diff --git a/utilities/transactions/optimistic_transaction_impl.h b/utilities/transactions/optimistic_transaction_impl.h index faf6a57948f9c754668c63f0fa56970a5bcb7085..c8f84c387ef38eaa16f25e5d923889f864ba2d36 100644 --- a/utilities/transactions/optimistic_transaction_impl.h +++ b/utilities/transactions/optimistic_transaction_impl.h @@ -15,6 +15,7 @@ #include "db/write_callback.h" #include "rocksdb/db.h" #include "rocksdb/slice.h" +#include "rocksdb/snapshot.h" #include "rocksdb/status.h" #include "rocksdb/types.h" #include "rocksdb/utilities/transaction.h" @@ -38,7 +39,7 @@ class OptimisticTransactionImpl : public Transaction { void SetSavePoint() override; - void RollbackToSavePoint() override; + Status RollbackToSavePoint() override; Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) override; @@ -147,7 +148,9 @@ class OptimisticTransactionImpl : public Transaction { const TransactionKeyMap* GetTrackedKeys() const { return &tracked_keys_; } - const Snapshot* GetSnapshot() const override { return snapshot_; } + const Snapshot* GetSnapshot() const override { + return snapshot_ ? snapshot_->snapshot() : nullptr; + } void SetSnapshot() override; @@ -157,8 +160,7 @@ class OptimisticTransactionImpl : public Transaction { OptimisticTransactionDB* const txn_db_; DB* db_; const WriteOptions write_options_; - const Snapshot* snapshot_; - SequenceNumber start_sequence_number_; + std::shared_ptr snapshot_; const Comparator* cmp_; std::unique_ptr write_batch_; @@ -169,13 +171,9 @@ class OptimisticTransactionImpl : public Transaction { // not changed since this sequence number. TransactionKeyMap tracked_keys_; - // Records the number of entries currently in the WriteBatch including calls - // to - // Put, Merge, Delete, and PutLogData() - size_t num_entries_ = 0; - - // Stack of number of entries in write_batch at each save point - std::unique_ptr> save_points_; + // Stack of the Snapshot saved at each save point. Saved snapshots may be + // nullptr if there was no snapshot at the time SetSavePoint() was called. + std::unique_ptr>> save_points_; friend class OptimisticTransactionCallback; @@ -190,6 +188,8 @@ class OptimisticTransactionImpl : public Transaction { void RecordOperation(ColumnFamilyHandle* column_family, const SliceParts& key); + void Cleanup(); + // No copying allowed OptimisticTransactionImpl(const OptimisticTransactionImpl&); void operator=(const OptimisticTransactionImpl&); diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index 114fbb811bba57b38700d7b9b3b648460ee8c612..1efaa252d612f19ec883fc808863131000aee7ad 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -955,12 +955,14 @@ TEST_F(OptimisticTransactionTest, SavepointTest) { Transaction* txn = txn_db->BeginTransaction(write_options); ASSERT_TRUE(txn); - txn->RollbackToSavePoint(); + s = txn->RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); txn->SetSavePoint(); // 1 - txn->RollbackToSavePoint(); // Rollback to beginning of txn - txn->RollbackToSavePoint(); + ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn + s = txn->RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); s = txn->Put("B", "b"); ASSERT_OK(s); @@ -996,7 +998,7 @@ TEST_F(OptimisticTransactionTest, SavepointTest) { s = txn->Put("D", "d"); ASSERT_OK(s); - txn->RollbackToSavePoint(); // Rollback to 2 + ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2 s = txn->Get(read_options, "A", &value); ASSERT_OK(s); @@ -1019,7 +1021,10 @@ TEST_F(OptimisticTransactionTest, SavepointTest) { s = txn->Put("E", "e"); ASSERT_OK(s); - txn->RollbackToSavePoint(); // Rollback to beginning of txn + // Rollback to beginning of txn + s = txn->RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + txn->Rollback(); s = txn->Get(read_options, "A", &value); ASSERT_TRUE(s.IsNotFound()); @@ -1065,7 +1070,7 @@ TEST_F(OptimisticTransactionTest, SavepointTest) { s = txn->Get(read_options, "B", &value); ASSERT_TRUE(s.IsNotFound()); - txn->RollbackToSavePoint(); // Rollback to 3 + ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3 s = txn->Get(read_options, "F", &value); ASSERT_OK(s); diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 1bbdfcac243c2804124d2775644a2e2604e55c86..d4a197e2af667db7cf3fb01789c8f4b39d7dc450 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -16,6 +16,7 @@ #include "db/db_impl.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" +#include "rocksdb/snapshot.h" #include "rocksdb/status.h" #include "rocksdb/utilities/transaction_db.h" #include "util/string_util.h" @@ -39,7 +40,6 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db, txn_db_impl_(nullptr), txn_id_(GenTxnID()), write_options_(write_options), - snapshot_(nullptr), cmp_(GetColumnFamilyUserComparator(txn_db->DefaultColumnFamily())), write_batch_(new WriteBatchWithIndex(cmp_, 0, true)), start_time_( @@ -62,24 +62,15 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db, } TransactionImpl::~TransactionImpl() { - Cleanup(); - - if (snapshot_ != nullptr) { - db_->ReleaseSnapshot(snapshot_); - } + txn_db_impl_->UnLock(this, &tracked_keys_); } void TransactionImpl::SetSnapshot() { - if (snapshot_ != nullptr) { - db_->ReleaseSnapshot(snapshot_); - } - - snapshot_ = db_->GetSnapshot(); + snapshot_.reset(new ManagedSnapshot(db_)); } void TransactionImpl::Cleanup() { write_batch_->Clear(); - num_entries_ = 0; txn_db_impl_->UnLock(this, &tracked_keys_); tracked_keys_.clear(); save_points_.reset(nullptr); @@ -145,53 +136,27 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) { void TransactionImpl::Rollback() { Cleanup(); } void TransactionImpl::SetSavePoint() { - if (num_entries_ > 0) { - // If transaction is empty, no need to record anything. - - if (save_points_ == nullptr) { - save_points_.reset(new std::stack()); - } - save_points_->push(num_entries_); + if (save_points_ == nullptr) { + save_points_.reset(new std::stack>()); } + save_points_->push(snapshot_); + write_batch_->SetSavePoint(); } -void TransactionImpl::RollbackToSavePoint() { - size_t savepoint_entries = 0; - +Status TransactionImpl::RollbackToSavePoint() { if (save_points_ != nullptr && save_points_->size() > 0) { - savepoint_entries = save_points_->top(); + // Restore saved snapshot + snapshot_ = save_points_->top(); save_points_->pop(); - } - assert(savepoint_entries <= num_entries_); + // Rollback batch + Status s = write_batch_->RollbackToSavePoint(); + assert(s.ok()); - if (savepoint_entries == num_entries_) { - // No changes to rollback - } else if (savepoint_entries == 0) { - // Rollback everything - Rollback(); + return s; } else { - assert(dynamic_cast(db_->GetBaseDB()) != nullptr); - auto db_impl = reinterpret_cast(db_->GetBaseDB()); - - WriteBatchWithIndex* new_batch = new WriteBatchWithIndex(cmp_, 0, true); - Status s = TransactionUtil::CopyFirstN( - savepoint_entries, write_batch_.get(), new_batch, db_impl); - if (!s.ok()) { - // TODO: Should we change this function to return a Status or should we - // somehow make it so RollbackToSavePoint() can never fail?? Not easy to - // handle the case where a client accesses a column family that's been - // dropped. - // After chatting with Siying, I'm going to send a diff that adds - // savepoint support in WriteBatchWithIndex and let reviewers decide which - // approach is cleaner. - fprintf(stderr, "STATUS: %s \n", s.ToString().c_str()); - delete new_batch; - } else { - write_batch_.reset(new_batch); - } - - num_entries_ = savepoint_entries; + assert(write_batch_->RollbackToSavePoint().IsNotFound()); + return Status::NotFound(); } } @@ -331,7 +296,8 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, // If the key has been previous validated at a sequence number earlier // than the curent snapshot's sequence number, we already know it has not // been modified. - bool already_validated = iter->second <= snapshot_->GetSequenceNumber(); + SequenceNumber seq = snapshot_->snapshot()->GetSequenceNumber(); + bool already_validated = iter->second <= seq; if (!already_validated) { s = CheckKeySequence(column_family, key); @@ -339,7 +305,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, if (s.ok()) { // Record that there have been no writes to this key after this // sequence. - iter->second = snapshot_->GetSequenceNumber(); + iter->second = seq; } else { // Failed to validate key if (!previously_locked) { @@ -369,7 +335,7 @@ Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family, result = TransactionUtil::CheckKeyForConflicts( db_impl, cfh, key.ToString(), - snapshot_->GetSequenceNumber()); + snapshot_->snapshot()->GetSequenceNumber()); } return result; @@ -457,7 +423,6 @@ Status TransactionImpl::Put(ColumnFamilyHandle* column_family, const Slice& key, if (s.ok()) { write_batch_->Put(column_family, key, value); - num_entries_++; } return s; @@ -469,7 +434,6 @@ Status TransactionImpl::Put(ColumnFamilyHandle* column_family, if (s.ok()) { write_batch_->Put(column_family, key, value); - num_entries_++; } return s; @@ -481,7 +445,6 @@ Status TransactionImpl::Merge(ColumnFamilyHandle* column_family, if (s.ok()) { write_batch_->Merge(column_family, key, value); - num_entries_++; } return s; @@ -493,7 +456,6 @@ Status TransactionImpl::Delete(ColumnFamilyHandle* column_family, if (s.ok()) { write_batch_->Delete(column_family, key); - num_entries_++; } return s; @@ -505,7 +467,6 @@ Status TransactionImpl::Delete(ColumnFamilyHandle* column_family, if (s.ok()) { write_batch_->Delete(column_family, key); - num_entries_++; } return s; @@ -525,7 +486,6 @@ Status TransactionImpl::PutUntracked(ColumnFamilyHandle* column_family, if (s.ok()) { write_batch_->Put(column_family, key, value); - num_entries_++; } return s; @@ -539,7 +499,6 @@ Status TransactionImpl::PutUntracked(ColumnFamilyHandle* column_family, if (s.ok()) { write_batch_->Put(column_family, key, value); - num_entries_++; } return s; @@ -552,7 +511,6 @@ Status TransactionImpl::MergeUntracked(ColumnFamilyHandle* column_family, if (s.ok()) { write_batch_->Merge(column_family, key, value); - num_entries_++; } return s; @@ -565,7 +523,6 @@ Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family, if (s.ok()) { write_batch_->Delete(column_family, key); - num_entries_++; } return s; @@ -578,7 +535,6 @@ Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family, if (s.ok()) { write_batch_->Delete(column_family, key); - num_entries_++; } return s; @@ -586,7 +542,6 @@ Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family, void TransactionImpl::PutLogData(const Slice& blob) { write_batch_->PutLogData(blob); - num_entries_++; } WriteBatchWithIndex* TransactionImpl::GetWriteBatch() { diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index e670f0fc32411a87e6902b5d1743545a13a44799..06d5903e2331ad5c9be67c9b323d754b8c09b0e1 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -16,6 +16,7 @@ #include "db/write_callback.h" #include "rocksdb/db.h" #include "rocksdb/slice.h" +#include "rocksdb/snapshot.h" #include "rocksdb/status.h" #include "rocksdb/types.h" #include "rocksdb/utilities/transaction.h" @@ -44,7 +45,7 @@ class TransactionImpl : public Transaction { void SetSavePoint() override; - void RollbackToSavePoint() override; + Status RollbackToSavePoint() override; Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) override; @@ -151,7 +152,9 @@ class TransactionImpl : public Transaction { void PutLogData(const Slice& blob) override; - const Snapshot* GetSnapshot() const override { return snapshot_; } + const Snapshot* GetSnapshot() const override { + return snapshot_ ? snapshot_->snapshot() : nullptr; + } void SetSnapshot() override; @@ -190,7 +193,7 @@ class TransactionImpl : public Transaction { // If snapshot_ is set, all keys that locked must also have not been written // since this snapshot - const Snapshot* snapshot_; + std::shared_ptr snapshot_; const Comparator* cmp_; @@ -214,12 +217,9 @@ class TransactionImpl : public Transaction { // stored. TransactionKeyMap tracked_keys_; - // Records the number of entries currently in the WriteBatch include calls to - // PutLogData() - size_t num_entries_ = 0; - - // Stack of number of entries in write_batch at each save point - std::unique_ptr> save_points_; + // Stack of the Snapshot saved at each save point. Saved snapshots may be + // nullptr if there was no snapshot at the time SetSavePoint() was called. + std::unique_ptr>> save_points_; Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool check_snapshot = true); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 828462dcdcea80e620c44c20b3fc7dd3a89082ea..0be31716e88f5b408a9849812b285dbda0ccd948 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -1299,12 +1299,14 @@ TEST_F(TransactionTest, SavepointTest) { Transaction* txn = db->BeginTransaction(write_options); ASSERT_TRUE(txn); - txn->RollbackToSavePoint(); + s = txn->RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); txn->SetSavePoint(); // 1 - txn->RollbackToSavePoint(); // Rollback to beginning of txn - txn->RollbackToSavePoint(); + ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn + s = txn->RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); s = txn->Put("B", "b"); ASSERT_OK(s); @@ -1340,7 +1342,7 @@ TEST_F(TransactionTest, SavepointTest) { s = txn->Put("D", "d"); ASSERT_OK(s); - txn->RollbackToSavePoint(); // Rollback to 2 + ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2 s = txn->Get(read_options, "A", &value); ASSERT_OK(s); @@ -1363,7 +1365,10 @@ TEST_F(TransactionTest, SavepointTest) { s = txn->Put("E", "e"); ASSERT_OK(s); - txn->RollbackToSavePoint(); // Rollback to beginning of txn + // Rollback to beginning of txn + s = txn->RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + txn->Rollback(); s = txn->Get(read_options, "A", &value); ASSERT_TRUE(s.IsNotFound()); @@ -1409,7 +1414,7 @@ TEST_F(TransactionTest, SavepointTest) { s = txn->Get(read_options, "B", &value); ASSERT_TRUE(s.IsNotFound()); - txn->RollbackToSavePoint(); // Rollback to 3 + ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3 s = txn->Get(read_options, "F", &value); ASSERT_OK(s); diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc index a84133e229325340a958a2c596845bb8d843681e..699e8fd830b4b0404bc960c783457b615c02a502 100644 --- a/utilities/transactions/transaction_util.cc +++ b/utilities/transactions/transaction_util.cc @@ -141,125 +141,6 @@ Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl, return result; } -Status TransactionUtil::CopyFirstN(size_t num, WriteBatchWithIndex* batch, - WriteBatchWithIndex* new_batch, - DBImpl* db_impl) { - // Handler for iterating through batch and copying entries to new_batch - class Handler : public WriteBatch::Handler { - public: - WriteBatchWithIndex* batch; - const size_t limit; - DBImpl* db_impl; - size_t seen = 0; - std::unordered_map super_versions; - std::unordered_map handles; - - Handler(WriteBatchWithIndex* dest, size_t new_limit, DBImpl* db) - : batch(dest), limit(new_limit), db_impl(db) {} - - ~Handler() { - for (auto& iter : super_versions) { - db_impl->ReturnAndCleanupSuperVersionUnlocked(iter.first, iter.second); - } - } - - Status GetColumnFamily(uint32_t column_family_id, - ColumnFamilyHandle** cfh) { - // Need to look up ColumnFamilyHandle for this column family id. Since - // doing this requires grabbing a mutex, lets only do it once per column - // family and cache it. - // In order to ensure that the ColumnFamilyHandle is still valid, we need - // to hold the superversion. - const auto& iter = handles.find(column_family_id); - if (iter == handles.end()) { - // Don't have ColumnFamilyHandle cached, look it up from the db. - SuperVersion* sv = - db_impl->GetAndRefSuperVersionUnlocked(column_family_id); - if (sv == nullptr) { - return Status::InvalidArgument( - "Could not find column family for ID " + - ToString(column_family_id)); - } - super_versions.insert({column_family_id, sv}); - - *cfh = db_impl->GetColumnFamilyHandleUnlocked(column_family_id); - if (*cfh == nullptr) { - return Status::InvalidArgument( - "Could not find column family handle for ID " + - ToString(column_family_id)); - } - handles.insert({column_family_id, *cfh}); - } else { - *cfh = iter->second; - } - - return Status::OK(); - } - - virtual Status PutCF(uint32_t column_family_id, const Slice& key, - const Slice& value) override { - if (seen >= limit) { - // Found the first N entries, return Aborted to stop the Iteration. - return Status::Aborted(); - } - ColumnFamilyHandle* cfh = nullptr; - Status s = GetColumnFamily(column_family_id, &cfh); - if (s.ok()) { - batch->Put(cfh, key, value); - } - seen++; - return s; - } - virtual Status MergeCF(uint32_t column_family_id, const Slice& key, - const Slice& value) override { - if (seen >= limit) { - // Found the first N entries, return Aborted to stop the Iteration. - return Status::Aborted(); - } - ColumnFamilyHandle* cfh = nullptr; - Status s = GetColumnFamily(column_family_id, &cfh); - if (s.ok()) { - batch->Merge(cfh, key, value); - } - seen++; - return s; - } - virtual Status DeleteCF(uint32_t column_family_id, - const Slice& key) override { - if (seen >= limit) { - // Found the first N entries, return Aborted to stop the Iteration. - return Status::Aborted(); - } - ColumnFamilyHandle* cfh = nullptr; - Status s = GetColumnFamily(column_family_id, &cfh); - if (s.ok()) { - batch->Delete(cfh, key); - } - seen++; - return s; - } - - virtual void LogData(const Slice& blob) override { - if (seen < limit) { - batch->PutLogData(blob); - } - seen++; - } - }; - - // Iterating on this handler will add all keys in this batch into a new batch - // up to - // the limit. - Handler handler(new_batch, num, db_impl); - Status s = batch->GetWriteBatch()->Iterate(&handler); - - if (s.IsAborted()) { - // Handler returns Aborted when it is done copying to stop the iteration. - s = Status::OK(); - } - - return s; -} } // namespace rocksdb diff --git a/utilities/transactions/transaction_util.h b/utilities/transactions/transaction_util.h index 21f69a022c80b60462607e08492525777fa151cc..620dbdbecbcfac46a9d4b59b3574b5a58df8d53f 100644 --- a/utilities/transactions/transaction_util.h +++ b/utilities/transactions/transaction_util.h @@ -48,12 +48,6 @@ class TransactionUtil { // mutex is held. static Status CheckKeysForConflicts(DBImpl* db_impl, TransactionKeyMap* keys); - // Copies the first num entries from batch into new_batch (including Put, - // Merge, Delete, and PutLogData). - // Returns non-OK on error. - static Status CopyFirstN(size_t num, WriteBatchWithIndex* batch, - WriteBatchWithIndex* new_batch, DBImpl* db_impl); - private: static Status CheckKey(DBImpl* db_impl, SuperVersion* sv, SequenceNumber earliest_seq, SequenceNumber key_seq,