diff --git a/db/db_impl.cc b/db/db_impl.cc index 9837ed3b4a0562dcd5c7e648ef74549e092dc85f..8d99ad1801fb5acad9d6d688924006be7f5a0a31 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3427,6 +3427,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } Status status; + bool callback_failed = false; + bool xfunc_attempted_write = false; XFUNC_TEST("transaction", "transaction_xftest_write_impl", xf_transaction_write1, xf_transaction_write, write_options, @@ -3580,6 +3582,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // If this write has a validation callback, check to see if this write // is able to be written. Must be called on the write thread. status = callback->Callback(this); + callback_failed = true; } } else { mutex_.Unlock(); @@ -3686,7 +3689,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, mutex_.Lock(); } - if (db_options_.paranoid_checks && !status.ok() && !status.IsTimedOut() && + if (db_options_.paranoid_checks && !status.ok() && !callback_failed && !status.IsBusy() && bg_error_.ok()) { bg_error_ = status; // stop compaction & fail any further writes } diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 888f212665e39fe11ccb5cc7ef9ff3ddeea37fb8..76e19069ccddd0912dcd4698911ee1d9c68e41fc 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -84,6 +84,14 @@ class Status { static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kTimedOut, msg, msg2); } + static Status Expired() { return Status(kExpired); } + static Status Expired(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kExpired, msg, msg2); + } + static Status TryAgain() { return Status(kTryAgain); } + static Status TryAgain(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kTryAgain, msg, msg2); + } // Returns true iff the status indicates success. bool ok() const { return code() == kOk; } @@ -120,6 +128,14 @@ class Status { // temporarily could not be acquired. bool IsBusy() const { return code() == kBusy; } + // Returns true iff the status indicated that the operation has Expired. + bool IsExpired() const { return code() == kExpired; } + + // Returns true iff the status indicates a TryAgain error. + // This usually means that the operation failed, but may succeed if + // re-attempted. + bool IsTryAgain() const { return code() == kTryAgain; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; @@ -137,6 +153,8 @@ class Status { kTimedOut = 9, kAborted = 10, kBusy = 11, + kExpired = 12, + kTryAgain = 13 }; Code code() const { diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 86345efeda7b9552034565737e5a774d6339bfcb..9b726822738e18493ec7f00f9194dc5ca4938f8f 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -75,9 +75,11 @@ class Transaction { // // If this transaction was created by an OptimisticTransactionDB(), // Status::Busy() may be returned if the transaction could not guarantee - // that there are no write conflicts. + // that there are no write conflicts. Status::TryAgain() may be returned + // if the memtable history size is not large enough + // (See max_write_buffer_number_to_maintain). // - // If this transaction was created by a TransactionDB(), Status::TimedOut() + // If this transaction was created by a TransactionDB(), Status::Expired() // may be returned if this transaction has lived for longer than // TransactionOptions.expiration. virtual Status Commit() = 0; @@ -136,10 +138,17 @@ class Transaction { // still ensure that this key cannot be written to by outside of this // transaction. // - // If this transaction was created by a TransactionDB, Status::Busy() may be - // returned. // If this transaction was created by an OptimisticTransaction, GetForUpdate() - // could cause commit() to later return Status::Busy(). + // could cause commit() to fail. Otherwise, it could return any error + // that could be returned by DB::Get(). + // + // If this transaction was created by a TransactionDB, it can return + // Status::OK() on success, + // Status::Busy() if there is a write conflict, + // Status::TimedOut() if a lock could not be acquired, + // Status::TryAgain() if the memtable history size is not large enough + // (See max_write_buffer_number_to_maintain) + // or other errors if this key could not be read. virtual Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) = 0; @@ -184,8 +193,15 @@ class Transaction { // // If this Transaction was created on an OptimisticTransactionDB, these // functions should always return Status::OK(). - // If this Transaction was created on a TransactionDB, the functions can - // return Status::Busy() if they could not acquire a lock. + // + // If this Transaction was created on a TransactionDB, the status returned + // can be: + // Status::OK() on success, + // Status::Busy() if there is a write conflict, + // Status::TimedOut() if a lock could not be acquired, + // Status::TryAgain() if the memtable history size is not large enough + // (See max_write_buffer_number_to_maintain) + // or other errors on unexpected failures. virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) = 0; virtual Status Put(const Slice& key, const Slice& value) = 0; @@ -211,8 +227,7 @@ class Transaction { // // If this Transaction was created on a TransactionDB, this function will // still acquire locks necessary to make sure this write doesn't cause - // conflicts in - // other transactions and may return Status::Busy(). + // conflicts in other transactions and may return Status::Busy(). virtual Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) = 0; virtual Status PutUntracked(const Slice& key, const Slice& value) = 0; diff --git a/util/status.cc b/util/status.cc index 3fe292dd384e43e7f2de14c989e9e047017bafe7..3935dc561f75cc805e63424d32bc1a9f9524293a 100644 --- a/util/status.cc +++ b/util/status.cc @@ -76,6 +76,12 @@ std::string Status::ToString() const { case kBusy: type = "Resource busy: "; break; + case kExpired: + type = "Operation expired: "; + break; + case kTryAgain: + type = "Operation failed. Try again.: "; + break; default: snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", static_cast(code())); diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index 09b2ee1d6f8797143bd7d9506a8f4b1fb190b3b4..114fbb811bba57b38700d7b9b3b648460ee8c612 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -91,7 +91,7 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest) { ASSERT_EQ(value, "barz"); s = txn->Commit(); - ASSERT_NOK(s); // Txn should not commit + ASSERT_TRUE(s.IsBusy()); // Txn should not commit // Verify that transaction did not write anything db->Get(read_options, "foo", &value); @@ -126,7 +126,7 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest2) { ASSERT_EQ(value, "barz"); s = txn->Commit(); - ASSERT_NOK(s); // Txn should not commit + ASSERT_TRUE(s.IsBusy()); // Txn should not commit // Verify that transaction did not write anything db->Get(read_options, "foo", &value); @@ -165,7 +165,7 @@ TEST_F(OptimisticTransactionTest, ReadConflictTest) { ASSERT_EQ(value, "barz"); s = txn->Commit(); - ASSERT_NOK(s); // Txn should not commit + ASSERT_TRUE(s.IsBusy()); // Txn should not commit // Verify that transaction did not write anything txn->GetForUpdate(read_options, "foo", &value); @@ -283,7 +283,7 @@ TEST_F(OptimisticTransactionTest, FlushTest2) { s = txn->Commit(); // txn should not commit since MemTableList History is not large enough - ASSERT_NOK(s); + ASSERT_TRUE(s.IsTryAgain()); db->Get(read_options, "foo", &value); ASSERT_EQ(value, "bar"); @@ -422,7 +422,7 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) { txn2->Put("ZZZ", "xxxxx"); s = txn2->Commit(); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); delete txn2; } @@ -510,7 +510,7 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) { // Verify txn did not commit s = txn2->Commit(); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); s = db->Get(read_options, handles[1], "AAAZZZ", &value); ASSERT_EQ(value, "barbar"); @@ -565,7 +565,7 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) { // Verify Txn Did not Commit s = txn2->Commit(); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); s = db->DropColumnFamily(handles[1]); ASSERT_OK(s); @@ -613,7 +613,7 @@ TEST_F(OptimisticTransactionTest, EmptyTest) { s = db->Put(write_options, "aaa", "xxx"); s = txn->Commit(); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); delete txn; } @@ -651,7 +651,7 @@ TEST_F(OptimisticTransactionTest, PredicateManyPreceders) { // should not commit since txn2 wrote a key txn has read s = txn1->Commit(); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); delete txn1; delete txn2; @@ -675,7 +675,7 @@ TEST_F(OptimisticTransactionTest, PredicateManyPreceders) { // txn2 cannot commit since txn1 changed "4" s = txn2->Commit(); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); delete txn1; delete txn2; @@ -701,7 +701,7 @@ TEST_F(OptimisticTransactionTest, LostUpdate) { ASSERT_OK(s); s = txn2->Commit(); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); delete txn1; delete txn2; @@ -720,7 +720,7 @@ TEST_F(OptimisticTransactionTest, LostUpdate) { ASSERT_OK(s); s = txn2->Commit(); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); delete txn1; delete txn2; @@ -737,7 +737,7 @@ TEST_F(OptimisticTransactionTest, LostUpdate) { txn2->Put("1", "6"); s = txn2->Commit(); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); delete txn1; delete txn2; @@ -822,7 +822,7 @@ TEST_F(OptimisticTransactionTest, UntrackedWrites) { s = db->Delete(write_options, "tracked"); s = txn->Commit(); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); s = db->Get(read_options, "untracked", &value); ASSERT_TRUE(s.IsNotFound()); diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index c30c9f1b79ab6b73694d3c39e6671bc0c5fc209d..e670f0fc32411a87e6902b5d1743545a13a44799 100644 --- a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -248,7 +248,7 @@ class TransactionCallback : public WriteCallback { Status Callback(DB* db) override { if (txn_->IsExpired()) { - return Status::TimedOut(); + return Status::Expired(); } else { return Status::OK(); } diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index b6cc9eb79a58ab69188afa7216ec28ade8b9c58a..c7a327202aed835ec52ec0deba8e59152cb7d63f 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -232,7 +232,7 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map, if (!locked) { // timeout acquiring mutex - return Status::Busy(); + return Status::TimedOut("Timeout Acquiring Mutex"); } // Acquire lock if we are able to @@ -240,7 +240,7 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map, Status result = AcquireLocked(lock_map, stripe, key, env, lock_info, &wait_time_us); - if (result.IsBusy() && timeout != 0) { + if (!result.ok() && timeout != 0) { // If we weren't able to acquire the lock, we will keep retrying as long // as the // timeout allows. @@ -282,7 +282,7 @@ Status TransactionLockMgr::AcquireWithTimeout(LockMap* lock_map, result = AcquireLocked(lock_map, stripe, key, env, lock_info, &wait_time_us); - } while (result.IsBusy() && !timed_out); + } while (!result.ok() && !timed_out); } stripe->stripe_mutex.unlock(); @@ -313,7 +313,7 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map, lock_info.expiration_time = txn_lock_info.expiration_time; // lock_cnt does not change } else { - result = Status::Busy(); + result = Status::TimedOut("lock held"); } } } else { // Lock not held. diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 8aef74ffd4669f4a108aeb15708892f09952f7c4..828462dcdcea80e620c44c20b3fc7dd3a89082ea 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -96,7 +96,7 @@ TEST_F(TransactionTest, WriteConflictTest) { // This Put outside of a transaction will conflict with the previous write s = db->Put(write_options, "foo", "xxx"); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsTimedOut()); s = db->Get(read_options, "foo", &value); ASSERT_EQ(value, "A"); @@ -134,7 +134,7 @@ TEST_F(TransactionTest, WriteConflictTest2) { s = txn->Put("foo", "bar2"); // Conflicts with write done after snapshot taken - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); s = txn->Put("foo3", "Y"); ASSERT_OK(s); @@ -180,7 +180,7 @@ TEST_F(TransactionTest, ReadConflictTest) { // This Put outside of a transaction will conflict with the previous read s = db->Put(write_options, "foo", "barz"); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsTimedOut()); s = db->Get(read_options, "foo", &value); ASSERT_EQ(value, "bar"); @@ -305,8 +305,8 @@ TEST_F(TransactionTest, FlushTest2) { db->Flush(flush_ops); s = txn->Put("X", "Y"); - ASSERT_NOK(s); // Put should fail since MemTableList History is not older - // than snapshot. + // Put should fail since MemTableList History is not older than the snapshot. + ASSERT_TRUE(s.IsTryAgain()); s = txn->Commit(); ASSERT_OK(s); @@ -456,7 +456,7 @@ TEST_F(TransactionTest, MultipleSnapshotTest) { // This will conflict since the snapshot is earlier than another write to ZZZ s = txn2->Put("ZZZ", "xxxxx"); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); s = txn2->Commit(); ASSERT_OK(s); @@ -554,7 +554,7 @@ TEST_F(TransactionTest, ColumnFamiliesTest) { // This write will cause a conflict with the earlier batch write s = txn2->Put(handles[1], SliceParts(key_slices, 3), SliceParts(&value_slice, 1)); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); s = txn2->Commit(); ASSERT_OK(s); @@ -608,10 +608,10 @@ TEST_F(TransactionTest, ColumnFamiliesTest) { results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh, multiget_keys, &values); // All results should fail since there was a conflict - ASSERT_NOK(results[0]); - ASSERT_NOK(results[1]); - ASSERT_NOK(results[2]); - ASSERT_NOK(results[3]); + ASSERT_TRUE(results[0].IsBusy()); + ASSERT_TRUE(results[1].IsBusy()); + ASSERT_TRUE(results[2].IsBusy()); + ASSERT_TRUE(results[3].IsBusy()); s = db->Get(read_options, handles[2], "foo", &value); ASSERT_EQ(value, "000"); @@ -661,7 +661,7 @@ TEST_F(TransactionTest, ColumnFamiliesTest2) { ASSERT_OK(s); s = txn2->Put(one, "X", "11"); - ASSERT_TRUE(s.IsBusy()); + ASSERT_TRUE(s.IsTimedOut()); s = txn1->Commit(); ASSERT_OK(s); @@ -743,7 +743,7 @@ TEST_F(TransactionTest, EmptyTest) { // Conflicts with previous GetForUpdate s = db->Put(write_options, "aaa", "xxx"); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsTimedOut()); // transaction expired! s = txn->Commit(); @@ -774,7 +774,7 @@ TEST_F(TransactionTest, PredicateManyPreceders) { ASSERT_TRUE(results[1].IsNotFound()); s = txn2->Put("2", "x"); // Conflict's with txn1's MultiGetForUpdate - ASSERT_NOK(s); + ASSERT_TRUE(s.IsTimedOut()); txn2->Rollback(); @@ -799,7 +799,7 @@ TEST_F(TransactionTest, PredicateManyPreceders) { ASSERT_OK(s); s = txn2->Delete("4"); // conflict - ASSERT_NOK(s); + ASSERT_TRUE(s.IsTimedOut()); s = txn1->Commit(); ASSERT_OK(s); @@ -830,7 +830,7 @@ TEST_F(TransactionTest, LostUpdate) { ASSERT_OK(s); s = txn2->Put("1", "2"); // conflict - ASSERT_NOK(s); + ASSERT_TRUE(s.IsTimedOut()); s = txn2->Commit(); ASSERT_OK(s); @@ -855,7 +855,7 @@ TEST_F(TransactionTest, LostUpdate) { s = txn1->Put("1", "3"); ASSERT_OK(s); s = txn2->Put("1", "4"); // conflict - ASSERT_NOK(s); + ASSERT_TRUE(s.IsTimedOut()); s = txn1->Commit(); ASSERT_OK(s); @@ -883,7 +883,7 @@ TEST_F(TransactionTest, LostUpdate) { ASSERT_OK(s); s = txn2->Put("1", "6"); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); s = txn2->Commit(); ASSERT_OK(s); @@ -972,7 +972,7 @@ TEST_F(TransactionTest, UntrackedWrites) { // Conflict s = txn->Put("untracked", "3"); - ASSERT_NOK(s); + ASSERT_TRUE(s.IsBusy()); s = txn->Commit(); ASSERT_OK(s); @@ -1017,7 +1017,7 @@ TEST_F(TransactionTest, ExpiredTransaction) { // txn1 should fail to commit since it is expired s = txn1->Commit(); - ASSERT_TRUE(s.IsTimedOut()); + ASSERT_TRUE(s.IsExpired()); s = db->Get(read_options, "Y", &value); ASSERT_TRUE(s.IsNotFound()); @@ -1047,7 +1047,7 @@ TEST_F(TransactionTest, Rollback) { // txn2 should not be able to write to X since txn1 has it locked s = txn2->Put("X", "2"); - ASSERT_TRUE(s.IsBusy()); + ASSERT_TRUE(s.IsTimedOut()); txn1->Rollback(); delete txn1; @@ -1117,9 +1117,9 @@ TEST_F(TransactionTest, LockLimitTest) { Transaction* txn2 = db->BeginTransaction(write_options); ASSERT_TRUE(txn2); - // lock limit reached + // "X" currently locked s = txn2->Put("X", "x"); - ASSERT_TRUE(s.IsBusy()); + ASSERT_TRUE(s.IsTimedOut()); // lock limit reached s = txn2->Put("M", "m"); @@ -1483,7 +1483,7 @@ TEST_F(TransactionTest, TimeoutTest) { ASSERT_OK(s); s = txn1->Commit(); - ASSERT_NOK(s); // expired! + ASSERT_TRUE(s.IsExpired()); // expired! s = db->Get(read_options, "aaa", &value); ASSERT_OK(s); @@ -1542,9 +1542,9 @@ TEST_F(TransactionTest, TimeoutTest) { s = txn1->Commit(); ASSERT_OK(s); - // txn2 should be timed out since txn1 waiting until its timeout expired. + // txn2 should be expired out since txn1 waiting until its timeout expired. s = txn2->Commit(); - ASSERT_TRUE(s.IsTimedOut()); + ASSERT_TRUE(s.IsExpired()); delete txn1; delete txn2; @@ -1558,7 +1558,7 @@ TEST_F(TransactionTest, TimeoutTest) { // txn2 has a smaller lock timeout than txn1's expiration, so it will time out s = txn2->Delete("asdf"); - ASSERT_TRUE(s.IsBusy()); + ASSERT_TRUE(s.IsTimedOut()); s = txn1->Commit(); ASSERT_OK(s); diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc index 086d650ae66f496bb59aa0a720b47d380f13521c..a84133e229325340a958a2c596845bb8d843681e 100644 --- a/utilities/transactions/transaction_util.cc +++ b/utilities/transactions/transaction_util.cc @@ -33,8 +33,8 @@ Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl, SuperVersion* sv = db_impl->GetAndRefSuperVersion(cfd); if (sv == nullptr) { - result = Status::Busy("Could not access column family " + - cfh->GetName()); + result = Status::InvalidArgument("Could not access column family " + + cfh->GetName()); } if (result.ok()) { @@ -66,7 +66,7 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, // the // Memtable should have a valid earliest sequence number except in some // corner cases (such as error cases during recovery). - result = Status::Busy( + result = Status::TryAgain( "Transaction ould not check for conflicts as the MemTable does not " "countain a long enough history to check write at SequenceNumber: ", ToString(key_seq)); @@ -85,13 +85,14 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, "frequency " "of this error.", key_seq, earliest_seq); - result = Status::Busy(msg); + result = Status::TryAgain(msg); } else { SequenceNumber seq = kMaxSequenceNumber; Status s = db_impl->GetLatestSequenceForKeyFromMemtable(sv, key, &seq); if (!s.ok()) { result = s; } else if (seq != kMaxSequenceNumber && seq > key_seq) { + // Write Conflict result = Status::Busy(); } } @@ -109,8 +110,8 @@ Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl, SuperVersion* sv = db_impl->GetAndRefSuperVersion(cf_id); if (sv == nullptr) { - result = - Status::Busy("Could not access column family " + ToString(cf_id)); + result = Status::InvalidArgument("Could not access column family " + + ToString(cf_id)); break; }