提交 fe93bf9b 编写于 作者: A agiardullo

Transaction::UndoGetForUpdate

Summary: MyRocks wants to be able to un-lock a key that was just locked by GetForUpdate().  To do this safely, I am now keeping track of the number of reads(for update) and writes for each key in a transaction.  UndoGetForUpdate() will only unlock a key if it hasn't been written and the read count reaches 0.

Test Plan: more unit tests

Reviewers: igor, rven, yhchiang, spetrunia, sdong

Reviewed By: spetrunia, sdong

Subscribers: spetrunia, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D47043
上级 2608219c
......@@ -356,6 +356,28 @@ class Transaction {
// Reset the WriteOptions that will be used during Commit().
virtual void SetWriteOptions(const WriteOptions& write_options) = 0;
// If this key was previously fetched in this transaction using
// GetForUpdate/MultigetForUpdate(), calling UndoGetForUpdate will tell
// the transaction that it no longer needs to do any conflict checking
// for this key.
//
// If a key has been fetched N times via GetForUpdate/MultigetForUpdate(),
// then UndoGetForUpdate will only have an effect if it is also called N
// times. If this key has been written to in this transaction,
// UndoGetForUpdate() will have no effect.
//
// If SetSavePoint() has been called after the GetForUpdate(),
// UndoGetForUpdate() will not have any effect.
//
// If this Transaction was created by an OptimisticTransactionDB,
// calling UndoGetForUpdate can affect whether this key is conflict checked
// at commit time.
// If this Transaction was created by a TransactionDB,
// calling UndoGetForUpdate may release any held locks for this key.
virtual void UndoGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) = 0;
virtual void UndoGetForUpdate(const Slice& key) = 0;
protected:
explicit Transaction(const TransactionDB* db) {}
Transaction() {}
......
......@@ -67,7 +67,8 @@ void OptimisticTransactionImpl::Rollback() { Clear(); }
// Record this key so that we can check it for conflicts at commit time.
Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool untracked) {
const Slice& key, bool read_only,
bool untracked) {
if (untracked) {
return Status::OK();
}
......@@ -84,7 +85,7 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
std::string key_str = key.ToString();
TrackKey(cfh_id, key_str, seq);
TrackKey(cfh_id, key_str, seq, read_only);
// Always return OK. Confilct checking will happen at commit time.
return Status::OK();
......
......@@ -40,7 +40,7 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
protected:
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool untracked = false) override;
bool read_only, bool untracked = false) override;
private:
OptimisticTransactionDB* const txn_db_;
......@@ -56,6 +56,11 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
void Clear() override;
void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) override {
// Nothing to unlock.
}
// No copying allowed
OptimisticTransactionImpl(const OptimisticTransactionImpl&);
void operator=(const OptimisticTransactionImpl&);
......
......@@ -1114,6 +1114,159 @@ TEST_F(OptimisticTransactionTest, SavepointTest) {
delete txn;
}
TEST_F(OptimisticTransactionTest, UndoGetForUpdateTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
OptimisticTransactionOptions txn_options;
string value;
Status s;
db->Put(write_options, "A", "");
Transaction* txn1 = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn1);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
Transaction* txn2 = txn_db->BeginTransaction(write_options);
txn2->Put("A", "x");
s = txn2->Commit();
ASSERT_OK(s);
delete txn2;
// Verify that txn1 can commit since A isn't conflict checked
s = txn1->Commit();
ASSERT_OK(s);
delete txn1;
txn1 = txn_db->BeginTransaction(write_options);
txn1->Put("A", "a");
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn2 = txn_db->BeginTransaction(write_options);
txn2->Put("A", "x");
s = txn2->Commit();
ASSERT_OK(s);
delete txn2;
// Verify that txn1 cannot commit since A will still be conflict checked
s = txn1->Commit();
ASSERT_TRUE(s.IsBusy());
delete txn1;
txn1 = txn_db->BeginTransaction(write_options);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn2 = txn_db->BeginTransaction(write_options);
txn2->Put("A", "x");
s = txn2->Commit();
ASSERT_OK(s);
delete txn2;
// Verify that txn1 cannot commit since A will still be conflict checked
s = txn1->Commit();
ASSERT_TRUE(s.IsBusy());
delete txn1;
txn1 = txn_db->BeginTransaction(write_options);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn1->UndoGetForUpdate("A");
txn2 = txn_db->BeginTransaction(write_options);
txn2->Put("A", "x");
s = txn2->Commit();
ASSERT_OK(s);
delete txn2;
// Verify that txn1 can commit since A isn't conflict checked
s = txn1->Commit();
ASSERT_OK(s);
delete txn1;
txn1 = txn_db->BeginTransaction(write_options);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
txn1->SetSavePoint();
txn1->UndoGetForUpdate("A");
txn2 = txn_db->BeginTransaction(write_options);
txn2->Put("A", "x");
s = txn2->Commit();
ASSERT_OK(s);
delete txn2;
// Verify that txn1 cannot commit since A will still be conflict checked
s = txn1->Commit();
ASSERT_TRUE(s.IsBusy());
delete txn1;
txn1 = txn_db->BeginTransaction(write_options);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
txn1->SetSavePoint();
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn2 = txn_db->BeginTransaction(write_options);
txn2->Put("A", "x");
s = txn2->Commit();
ASSERT_OK(s);
delete txn2;
// Verify that txn1 cannot commit since A will still be conflict checked
s = txn1->Commit();
ASSERT_TRUE(s.IsBusy());
delete txn1;
txn1 = txn_db->BeginTransaction(write_options);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
txn1->SetSavePoint();
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn1->RollbackToSavePoint();
txn1->UndoGetForUpdate("A");
txn2 = txn_db->BeginTransaction(write_options);
txn2->Put("A", "x");
s = txn2->Commit();
ASSERT_OK(s);
delete txn2;
// Verify that txn1 can commit since A isn't conflict checked
s = txn1->Commit();
ASSERT_OK(s);
delete txn1;
}
} // namespace rocksdb
int main(int argc, char** argv) {
......
......@@ -66,7 +66,8 @@ void TransactionBaseImpl::SetSnapshotIfNeeded() {
}
Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
const SliceParts& key, bool untracked) {
const SliceParts& key, bool read_only,
bool untracked) {
size_t key_size = 0;
for (int i = 0; i < key.num_parts; ++i) {
key_size += key.parts[i].size();
......@@ -79,7 +80,7 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
str.append(key.parts[i].data(), key.parts[i].size());
}
return TryLock(column_family, str, untracked);
return TryLock(column_family, str, read_only, untracked);
}
void TransactionBaseImpl::SetSavePoint() {
......@@ -107,15 +108,35 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
assert(s.ok());
// Rollback any keys that were tracked since the last savepoint
const TransactionKeyMap* key_map = GetTrackedKeysSinceSavePoint();
assert(key_map);
for (auto& key_map_iter : *key_map) {
const TransactionKeyMap& key_map = save_point.new_keys_;
for (const auto& key_map_iter : key_map) {
uint32_t column_family_id = key_map_iter.first;
auto& keys = key_map_iter.second;
for (auto& key_iter : keys) {
auto& cf_tracked_keys = tracked_keys_[column_family_id];
for (const auto& key_iter : keys) {
const std::string& key = key_iter.first;
tracked_keys_[column_family_id].erase(key);
uint32_t num_reads = key_iter.second.num_reads;
uint32_t num_writes = key_iter.second.num_writes;
auto tracked_keys_iter = cf_tracked_keys.find(key);
assert(tracked_keys_iter != cf_tracked_keys.end());
// Decrement the total reads/writes of this key by the number of
// reads/writes done since the last SavePoint.
if (num_reads > 0) {
assert(tracked_keys_iter->second.num_reads >= num_reads);
tracked_keys_iter->second.num_reads -= num_reads;
}
if (num_writes > 0) {
assert(tracked_keys_iter->second.num_writes >= num_writes);
tracked_keys_iter->second.num_writes -= num_writes;
}
if (tracked_keys_iter->second.num_reads == 0 &&
tracked_keys_iter->second.num_writes == 0) {
tracked_keys_[column_family_id].erase(tracked_keys_iter);
}
}
}
......@@ -138,7 +159,7 @@ Status TransactionBaseImpl::Get(const ReadOptions& read_options,
Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
Status s = TryLock(column_family, key);
Status s = TryLock(column_family, key, true /* read_only */);
if (s.ok() && value != nullptr) {
s = Get(read_options, column_family, key, value);
......@@ -172,7 +193,7 @@ std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
// Lock all keys
for (size_t i = 0; i < num_keys; ++i) {
Status s = TryLock(column_family[i], keys[i]);
Status s = TryLock(column_family[i], keys[i], true /* read_only */);
if (!s.ok()) {
// Fail entire multiget if we cannot lock all keys
return std::vector<Status>(num_keys, s);
......@@ -206,7 +227,7 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s = TryLock(column_family, key);
Status s = TryLock(column_family, key, false /* read_only */);
if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value);
......@@ -219,7 +240,7 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
Status s = TryLock(column_family, key);
Status s = TryLock(column_family, key, false /* read_only */);
if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value);
......@@ -231,7 +252,7 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s = TryLock(column_family, key);
Status s = TryLock(column_family, key, false /* read_only */);
if (s.ok()) {
GetBatchForWrite()->Merge(column_family, key, value);
......@@ -243,7 +264,7 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s = TryLock(column_family, key);
Status s = TryLock(column_family, key, false /* read_only */);
if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key);
......@@ -255,7 +276,7 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s = TryLock(column_family, key);
Status s = TryLock(column_family, key, false /* read_only */);
if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key);
......@@ -267,7 +288,7 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s = TryLock(column_family, key);
Status s = TryLock(column_family, key, false /* read_only */);
if (s.ok()) {
GetBatchForWrite()->SingleDelete(column_family, key);
......@@ -279,7 +300,7 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s = TryLock(column_family, key);
Status s = TryLock(column_family, key, false /* read_only */);
if (s.ok()) {
GetBatchForWrite()->SingleDelete(column_family, key);
......@@ -291,8 +312,8 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
bool untracked = true;
Status s = TryLock(column_family, key, untracked);
Status s =
TryLock(column_family, key, false /* read_only */, true /* untracked */);
if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value);
......@@ -305,8 +326,8 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
bool untracked = true;
Status s = TryLock(column_family, key, untracked);
Status s =
TryLock(column_family, key, false /* read_only */, true /* untracked */);
if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value);
......@@ -319,8 +340,8 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
const Slice& key,
const Slice& value) {
bool untracked = true;
Status s = TryLock(column_family, key, untracked);
Status s =
TryLock(column_family, key, false /* read_only */, true /* untracked */);
if (s.ok()) {
GetBatchForWrite()->Merge(column_family, key, value);
......@@ -332,8 +353,8 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) {
bool untracked = true;
Status s = TryLock(column_family, key, untracked);
Status s =
TryLock(column_family, key, false /* read_only */, true /* untracked */);
if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key);
......@@ -345,8 +366,8 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) {
bool untracked = true;
Status s = TryLock(column_family, key, untracked);
Status s =
TryLock(column_family, key, false /* read_only */, true /* untracked */);
if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key);
......@@ -387,26 +408,73 @@ uint64_t TransactionBaseImpl::GetNumKeys() const {
}
void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
SequenceNumber seq) {
auto iter = tracked_keys_[cfh_id].find(key);
if (iter == tracked_keys_[cfh_id].end()) {
tracked_keys_[cfh_id].insert({key, seq});
if (save_points_ != nullptr && !save_points_->empty()) {
// Aren't tracking this key, add it.
save_points_->top().new_keys_[cfh_id][key] = seq;
}
} else if (seq < iter->second) {
SequenceNumber seq, bool read_only) {
// Update map of all tracked keys for this transaction
TrackKey(&tracked_keys_, cfh_id, key, seq, read_only);
if (save_points_ != nullptr && !save_points_->empty()) {
// Update map of tracked keys in this SavePoint
TrackKey(&save_points_->top().new_keys_, cfh_id, key, seq, read_only);
}
}
// Add a key to the given TransactionKeyMap
void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
const std::string& key, SequenceNumber seq,
bool read_only) {
auto& cf_key_map = (*key_map)[cfh_id];
auto iter = cf_key_map.find(key);
if (iter == cf_key_map.end()) {
auto result = cf_key_map.insert({key, TransactionKeyMapInfo(seq)});
iter = result.first;
} else if (seq < iter->second.seq) {
// Now tracking this key with an earlier sequence number
iter->second = seq;
iter->second.seq = seq;
}
if (read_only) {
iter->second.num_reads++;
} else {
iter->second.num_writes++;
}
}
const TransactionKeyMap* TransactionBaseImpl::GetTrackedKeysSinceSavePoint() {
std::unique_ptr<TransactionKeyMap>
TransactionBaseImpl::GetTrackedKeysSinceSavePoint() {
if (save_points_ != nullptr && !save_points_->empty()) {
return &save_points_->top().new_keys_;
// Examine the number of reads/writes performed on all keys written
// since the last SavePoint and compare to the total number of reads/writes
// for each key.
TransactionKeyMap* result = new TransactionKeyMap();
for (const auto& key_map_iter : save_points_->top().new_keys_) {
uint32_t column_family_id = key_map_iter.first;
auto& keys = key_map_iter.second;
auto& cf_tracked_keys = tracked_keys_[column_family_id];
for (const auto& key_iter : keys) {
const std::string& key = key_iter.first;
uint32_t num_reads = key_iter.second.num_reads;
uint32_t num_writes = key_iter.second.num_writes;
auto total_key_info = cf_tracked_keys.find(key);
assert(total_key_info != cf_tracked_keys.end());
assert(total_key_info->second.num_reads >= num_reads);
assert(total_key_info->second.num_writes >= num_writes);
if (total_key_info->second.num_reads == num_reads &&
total_key_info->second.num_writes == num_writes) {
// All the reads/writes to this key were done in the last savepoint.
bool read_only = (num_writes == 0);
TrackKey(result, column_family_id, key, key_iter.second.seq,
read_only);
}
}
}
return std::unique_ptr<TransactionKeyMap>(result);
}
// No SavePoint
return nullptr;
}
......@@ -428,6 +496,60 @@ void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
db->ReleaseSnapshot(snapshot);
}
void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) {
uint32_t column_family_id = GetColumnFamilyID(column_family);
auto& cf_tracked_keys = tracked_keys_[column_family_id];
std::string key_str = key.ToString();
bool can_decrement = false;
bool can_unlock = false;
if (save_points_ != nullptr && !save_points_->empty()) {
// Check if this key was fetched ForUpdate in this SavePoint
auto& cf_savepoint_keys = save_points_->top().new_keys_[column_family_id];
auto savepoint_iter = cf_savepoint_keys.find(key_str);
if (savepoint_iter != cf_savepoint_keys.end()) {
if (savepoint_iter->second.num_reads > 0) {
savepoint_iter->second.num_reads--;
can_decrement = true;
if (savepoint_iter->second.num_reads == 0 &&
savepoint_iter->second.num_writes == 0) {
// No other GetForUpdates or write on this key in this SavePoint
cf_savepoint_keys.erase(savepoint_iter);
can_unlock = true;
}
}
}
} else {
// No SavePoint set
can_decrement = true;
can_unlock = true;
}
// We can only decrement the read count for this key if we were able to
// decrement the read count in the current SavePoint, OR if there is no
// SavePoint set.
if (can_decrement) {
auto key_iter = cf_tracked_keys.find(key_str);
if (key_iter != cf_tracked_keys.end()) {
if (key_iter->second.num_reads > 0) {
key_iter->second.num_reads--;
if (key_iter->second.num_reads == 0 &&
key_iter->second.num_writes == 0) {
// No other GetForUpdates or writes on this key
assert(can_unlock);
cf_tracked_keys.erase(key_iter);
UnlockGetForUpdate(column_family, key);
}
}
}
}
}
} // namespace rocksdb
#endif // ROCKSDB_LITE
......@@ -37,7 +37,7 @@ class TransactionBaseImpl : public Transaction {
// untracked will be true if called from PutUntracked, DeleteUntracked, or
// MergeUntracked.
virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool untracked = false) = 0;
bool read_only, bool untracked = false) = 0;
void SetSavePoint() override;
......@@ -192,6 +192,12 @@ class TransactionBaseImpl : public Transaction {
uint64_t GetNumKeys() const override;
void UndoGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) override;
void UndoGetForUpdate(const Slice& key) override {
return UndoGetForUpdate(nullptr, key);
};
// Get list of keys in this transaction that must not have any conflicts
// with writes in other transactions.
const TransactionKeyMap& GetTrackedKeys() const { return tracked_keys_; }
......@@ -207,10 +213,22 @@ class TransactionBaseImpl : public Transaction {
protected:
// Add a key to the list of tracked keys.
//
// seqno is the earliest seqno this key was involved with this transaction.
void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno);
// readonly should be set to true if no data was written for this key
void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno,
bool readonly);
// Helper function to add a key to the given TransactionKeyMap
static void TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
const std::string& key, SequenceNumber seqno,
bool readonly);
// Called when UndoGetForUpdate determines that this key can be unlocked.
virtual void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) = 0;
const TransactionKeyMap* GetTrackedKeysSinceSavePoint();
std::unique_ptr<TransactionKeyMap> GetTrackedKeysSinceSavePoint();
// Sets a snapshot if SetSnapshotOnNextOperation() has been called.
void SetSnapshotIfNeeded();
......@@ -285,7 +303,7 @@ class TransactionBaseImpl : public Transaction {
std::shared_ptr<TransactionNotifier> snapshot_notifier_ = nullptr;
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
bool untracked = false);
bool read_only, bool untracked = false);
WriteBatchBase* GetBatchForWrite();
};
......
......@@ -143,9 +143,11 @@ void TransactionImpl::Rollback() { Clear(); }
Status TransactionImpl::RollbackToSavePoint() {
// Unlock any keys locked since last transaction
const TransactionKeyMap* keys = GetTrackedKeysSinceSavePoint();
const std::unique_ptr<TransactionKeyMap>& keys =
GetTrackedKeysSinceSavePoint();
if (keys) {
txn_db_impl_->UnLock(this, keys);
txn_db_impl_->UnLock(this, keys.get());
}
return TransactionBaseImpl::RollbackToSavePoint();
......@@ -210,7 +212,8 @@ Status TransactionImpl::LockBatch(WriteBatch* batch,
if (!s.ok()) {
break;
}
(*keys_to_unlock)[cfh_id].insert({std::move(key), kMaxSequenceNumber});
TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,
false);
}
if (!s.ok()) {
......@@ -231,7 +234,8 @@ Status TransactionImpl::LockBatch(WriteBatch* batch,
// this key will only be locked if there have been no writes to this key since
// the snapshot time.
Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool untracked) {
const Slice& key, bool read_only,
bool untracked) {
uint32_t cfh_id = GetColumnFamilyID(column_family);
std::string key_str = key.ToString();
bool previously_locked;
......@@ -251,7 +255,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
previously_locked = false;
} else {
previously_locked = true;
current_seqno = iter->second;
current_seqno = iter->second.seq;
}
}
......@@ -298,7 +302,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
if (s.ok()) {
// Let base class know we've conflict checked this key.
TrackKey(cfh_id, key_str, new_seqno);
TrackKey(cfh_id, key_str, new_seqno, read_only);
}
return s;
......@@ -340,6 +344,11 @@ bool TransactionImpl::TryStealingLocks() {
LOCKS_STOLEN);
}
void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) {
txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
}
} // namespace rocksdb
#endif // ROCKSDB_LITE
......@@ -71,7 +71,7 @@ class TransactionImpl : public TransactionBaseImpl {
protected:
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool untracked = false) override;
bool read_only, bool untracked = false) override;
private:
enum ExecutionStatus { STARTED, COMMITTING, LOCKS_STOLEN };
......@@ -105,6 +105,9 @@ class TransactionImpl : public TransactionBaseImpl {
void RollbackLastN(size_t num);
void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) override;
// No copying allowed
TransactionImpl(const TransactionImpl&);
void operator=(const TransactionImpl&);
......
......@@ -1717,9 +1717,8 @@ TEST_F(TransactionTest, SavepointTest) {
TEST_F(TransactionTest, SavepointTest2) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
Status s;
txn_options.lock_timeout = 1; // 1 ms
......@@ -1814,6 +1813,356 @@ TEST_F(TransactionTest, SavepointTest2) {
delete txn2;
}
TEST_F(TransactionTest, UndoGetForUpdateTest) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
Status s;
txn_options.lock_timeout = 1; // 1 ms
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn1);
txn1->UndoGetForUpdate("A");
s = txn1->Commit();
ASSERT_OK(s);
delete txn1;
txn1 = db->BeginTransaction(write_options, txn_options);
txn1->UndoGetForUpdate("A");
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_TRUE(s.IsNotFound());
// Verify that A is locked
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
s = txn2->Put("A", "a");
ASSERT_TRUE(s.IsTimedOut());
txn1->UndoGetForUpdate("A");
// Verify that A is now unlocked
s = txn2->Put("A", "a2");
ASSERT_OK(s);
txn2->Commit();
delete txn2;
s = db->Get(read_options, "A", &value);
ASSERT_OK(s);
ASSERT_EQ("a2", value);
s = txn1->Delete("A");
ASSERT_OK(s);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn1->Put("B", "b3");
ASSERT_OK(s);
s = txn1->GetForUpdate(read_options, "B", &value);
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn1->UndoGetForUpdate("B");
// Verify that A and B are still locked
txn2 = db->BeginTransaction(write_options, txn_options);
s = txn2->Put("A", "a4");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("B", "b4");
ASSERT_TRUE(s.IsTimedOut());
txn1->Rollback();
delete txn1;
// Verify that A and B are no longer locked
s = txn2->Put("A", "a5");
ASSERT_OK(s);
s = txn2->Put("B", "b5");
ASSERT_OK(s);
s = txn2->Commit();
delete txn2;
ASSERT_OK(s);
txn1 = db->BeginTransaction(write_options, txn_options);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
s = txn1->GetForUpdate(read_options, "C", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
s = txn1->GetForUpdate(read_options, "C", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn1->GetForUpdate(read_options, "B", &value);
ASSERT_OK(s);
s = txn1->Put("B", "b5");
s = txn1->GetForUpdate(read_options, "B", &value);
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn1->UndoGetForUpdate("B");
txn1->UndoGetForUpdate("C");
txn1->UndoGetForUpdate("X");
// Verify A,B,C are locked
txn2 = db->BeginTransaction(write_options, txn_options);
s = txn2->Put("A", "a6");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Delete("B");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("C", "c6");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("X", "x6");
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn1->UndoGetForUpdate("B");
txn1->UndoGetForUpdate("C");
txn1->UndoGetForUpdate("X");
// Verify A,B are locked and C is not
s = txn2->Put("A", "a6");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Delete("B");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("C", "c6");
ASSERT_OK(s);
s = txn2->Put("X", "x6");
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn1->UndoGetForUpdate("B");
txn1->UndoGetForUpdate("C");
txn1->UndoGetForUpdate("X");
// Verify B is locked and A and C are not
s = txn2->Put("A", "a7");
ASSERT_OK(s);
s = txn2->Delete("B");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("C", "c7");
ASSERT_OK(s);
s = txn2->Put("X", "x7");
ASSERT_OK(s);
s = txn2->Commit();
ASSERT_OK(s);
delete txn2;
s = txn1->Commit();
ASSERT_OK(s);
delete txn1;
}
TEST_F(TransactionTest, UndoGetForUpdateTest2) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
string value;
Status s;
s = db->Put(write_options, "A", "");
ASSERT_OK(s);
txn_options.lock_timeout = 1; // 1 ms
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn1);
s = txn1->GetForUpdate(read_options, "A", &value);
ASSERT_OK(s);
s = txn1->GetForUpdate(read_options, "B", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn1->Put("F", "f");
ASSERT_OK(s);
txn1->SetSavePoint(); // 1
txn1->UndoGetForUpdate("A");
s = txn1->GetForUpdate(read_options, "C", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn1->GetForUpdate(read_options, "D", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn1->Put("E", "e");
ASSERT_OK(s);
s = txn1->GetForUpdate(read_options, "E", &value);
ASSERT_OK(s);
s = txn1->GetForUpdate(read_options, "F", &value);
ASSERT_OK(s);
// Verify A,B,C,D,E,F are still locked
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
s = txn2->Put("A", "a1");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("B", "b1");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("C", "c1");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("D", "d1");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("E", "e1");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("F", "f1");
ASSERT_TRUE(s.IsTimedOut());
txn1->UndoGetForUpdate("C");
txn1->UndoGetForUpdate("E");
// Verify A,B,D,E,F are still locked and C is not.
s = txn2->Put("A", "a2");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("B", "b2");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("D", "d2");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("E", "e2");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("F", "f2");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("C", "c2");
ASSERT_OK(s);
txn1->SetSavePoint(); // 2
s = txn1->Put("H", "h");
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn1->UndoGetForUpdate("B");
txn1->UndoGetForUpdate("C");
txn1->UndoGetForUpdate("D");
txn1->UndoGetForUpdate("E");
txn1->UndoGetForUpdate("F");
txn1->UndoGetForUpdate("G");
txn1->UndoGetForUpdate("H");
// Verify A,B,D,E,F,H are still locked and C,G are not.
s = txn2->Put("A", "a3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("B", "b3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("D", "d3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("E", "e3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("F", "f3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("H", "h3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("C", "c3");
ASSERT_OK(s);
s = txn2->Put("G", "g3");
ASSERT_OK(s);
txn1->RollbackToSavePoint(); // rollback to 2
// Verify A,B,D,E,F are still locked and C,G,H are not.
s = txn2->Put("A", "a3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("B", "b3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("D", "d3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("E", "e3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("F", "f3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("C", "c3");
ASSERT_OK(s);
s = txn2->Put("G", "g3");
ASSERT_OK(s);
s = txn2->Put("H", "h3");
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn1->UndoGetForUpdate("B");
txn1->UndoGetForUpdate("C");
txn1->UndoGetForUpdate("D");
txn1->UndoGetForUpdate("E");
txn1->UndoGetForUpdate("F");
txn1->UndoGetForUpdate("G");
txn1->UndoGetForUpdate("H");
// Verify A,B,E,F are still locked and C,D,G,H are not.
s = txn2->Put("A", "a3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("B", "b3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("E", "e3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("F", "f3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("C", "c3");
ASSERT_OK(s);
s = txn2->Put("D", "d3");
ASSERT_OK(s);
s = txn2->Put("G", "g3");
ASSERT_OK(s);
s = txn2->Put("H", "h3");
ASSERT_OK(s);
txn1->RollbackToSavePoint(); // rollback to 1
// Verify A,B,F are still locked and C,D,E,G,H are not.
s = txn2->Put("A", "a3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("B", "b3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("F", "f3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("C", "c3");
ASSERT_OK(s);
s = txn2->Put("D", "d3");
ASSERT_OK(s);
s = txn2->Put("E", "e3");
ASSERT_OK(s);
s = txn2->Put("G", "g3");
ASSERT_OK(s);
s = txn2->Put("H", "h3");
ASSERT_OK(s);
txn1->UndoGetForUpdate("A");
txn1->UndoGetForUpdate("B");
txn1->UndoGetForUpdate("C");
txn1->UndoGetForUpdate("D");
txn1->UndoGetForUpdate("E");
txn1->UndoGetForUpdate("F");
txn1->UndoGetForUpdate("G");
txn1->UndoGetForUpdate("H");
// Verify F is still locked and A,B,C,D,E,G,H are not.
s = txn2->Put("F", "f3");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Put("A", "a3");
ASSERT_OK(s);
s = txn2->Put("B", "b3");
ASSERT_OK(s);
s = txn2->Put("C", "c3");
ASSERT_OK(s);
s = txn2->Put("D", "d3");
ASSERT_OK(s);
s = txn2->Put("E", "e3");
ASSERT_OK(s);
s = txn2->Put("G", "g3");
ASSERT_OK(s);
s = txn2->Put("H", "h3");
ASSERT_OK(s);
s = txn1->Commit();
ASSERT_OK(s);
s = txn2->Commit();
ASSERT_OK(s);
delete txn1;
delete txn2;
}
TEST_F(TransactionTest, TimeoutTest) {
WriteOptions write_options;
ReadOptions read_options;
......
......@@ -137,7 +137,7 @@ Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl,
// written to this key since the start of the transaction.
for (const auto& key_iter : keys) {
const auto& key = key_iter.first;
const SequenceNumber key_seq = key_iter.second;
const SequenceNumber key_seq = key_iter.second.seq;
result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only);
......
......@@ -17,9 +17,20 @@
namespace rocksdb {
struct TransactionKeyMapInfo {
// Earliest sequence number that is relevant to this transaction for this key
SequenceNumber seq;
uint32_t num_writes;
uint32_t num_reads;
explicit TransactionKeyMapInfo(SequenceNumber seq_no)
: seq(seq_no), num_writes(0), num_reads(0) {}
};
using TransactionKeyMap =
std::unordered_map<uint32_t,
std::unordered_map<std::string, SequenceNumber>>;
std::unordered_map<std::string, TransactionKeyMapInfo>>;
class DBImpl;
struct SuperVersion;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册