提交 2005c88a 编写于 作者: M Manuel Ung 提交者: Facebook Github Bot

Implement non-exclusive locks

Summary:
This is an implementation of non-exclusive locks for pessimistic transactions. It is relatively simple and does not prevent starvation (ie. it's possible that request for exclusive access will never be granted if there are always threads holding shared access). It is done by changing `KeyLockInfo` to hold an set a transaction ids, instead of just one, and adding a flag specifying whether this lock is currently held with exclusive access or not.

Some implementation notes:
- Some lock diagnostic functions had to be updated to return a set of transaction ids for a given lock, eg. `GetWaitingTxn` and `GetLockStatusData`.
- Deadlock detection is a bit more complicated since a transaction can now wait on multiple other transactions. A BFS is done in this case, and deadlock detection depth is now just a limit on the number of transactions we visit.
- Expirable transactions do not work efficiently with shared locks at the moment, but that's okay for now.
Closes https://github.com/facebook/rocksdb/pull/1573

Differential Revision: D4239097

Pulled By: lth

fbshipit-source-id: da7c074
上级 0b0f2357
......@@ -209,10 +209,11 @@ class Transaction {
// or other errors if this key could not be read.
virtual Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) = 0;
const Slice& key, std::string* value,
bool exclusive = true) = 0;
virtual Status GetForUpdate(const ReadOptions& options, const Slice& key,
std::string* value) = 0;
std::string* value, bool exclusive = true) = 0;
virtual std::vector<Status> MultiGetForUpdate(
const ReadOptions& options,
......@@ -401,10 +402,10 @@ class Transaction {
virtual bool IsDeadlockDetect() const { return false; }
virtual TransactionID GetWaitingTxn(uint32_t* column_family_id,
const std::string** key) const {
virtual std::vector<TransactionID> GetWaitingTxns(uint32_t* column_family_id,
std::string* key) const {
assert(false);
return 0;
return std::vector<TransactionID>();
}
enum TransactionState {
......
......@@ -104,7 +104,8 @@ struct TransactionOptions {
struct KeyLockInfo {
std::string key;
TransactionID id;
std::vector<TransactionID> ids;
bool exclusive;
};
class TransactionDB : public StackableDB {
......
......@@ -283,11 +283,6 @@ class autovector {
autovector& operator=(const autovector& other) { return assign(other); }
// move operation are disallowed since it is very hard to make sure both
// autovectors are allocated from the same function stack.
autovector& operator=(autovector&& other) = delete;
autovector(autovector&& other) = delete;
// -- Iterator Operations
iterator begin() { return iterator(this, 0); }
......
......@@ -86,9 +86,11 @@ Status OptimisticTransactionImpl::Rollback() {
}
// Record this key so that we can check it for conflicts at commit time.
//
// 'exclusive' is unused for OptimisticTransaction.
Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only,
bool untracked) {
bool exclusive, bool untracked) {
if (untracked) {
return Status::OK();
}
......
......@@ -48,7 +48,8 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
protected:
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool read_only, bool untracked = false) override;
bool read_only, bool exclusive,
bool untracked = false) override;
private:
OptimisticTransactionDB* const txn_db_;
......
......@@ -96,7 +96,7 @@ void TransactionBaseImpl::SetSnapshotIfNeeded() {
Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
const SliceParts& key, bool read_only,
bool untracked) {
bool exclusive, bool untracked) {
size_t key_size = 0;
for (int i = 0; i < key.num_parts; ++i) {
key_size += key.parts[i].size();
......@@ -109,7 +109,7 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
str.append(key.parts[i].data(), key.parts[i].size());
}
return TryLock(column_family, str, read_only, untracked);
return TryLock(column_family, str, read_only, exclusive, untracked);
}
void TransactionBaseImpl::SetSavePoint() {
......@@ -187,8 +187,9 @@ Status TransactionBaseImpl::Get(const ReadOptions& read_options,
Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
Status s = TryLock(column_family, key, true /* read_only */);
const Slice& key, std::string* value,
bool exclusive) {
Status s = TryLock(column_family, key, true /* read_only */, exclusive);
if (s.ok() && value != nullptr) {
s = Get(read_options, column_family, key, value);
......@@ -222,7 +223,8 @@ std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
// Lock all keys
for (size_t i = 0; i < num_keys; ++i) {
Status s = TryLock(column_family[i], keys[i], true /* read_only */);
Status s = TryLock(column_family[i], keys[i], true /* read_only */,
true /* exclusive */);
if (!s.ok()) {
// Fail entire multiget if we cannot lock all keys
return std::vector<Status>(num_keys, s);
......@@ -256,7 +258,8 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s = TryLock(column_family, key, false /* read_only */);
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value);
......@@ -269,7 +272,8 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
Status s = TryLock(column_family, key, false /* read_only */);
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value);
......@@ -281,7 +285,8 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s = TryLock(column_family, key, false /* read_only */);
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) {
GetBatchForWrite()->Merge(column_family, key, value);
......@@ -293,7 +298,8 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s = TryLock(column_family, key, false /* read_only */);
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key);
......@@ -305,7 +311,8 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s = TryLock(column_family, key, false /* read_only */);
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key);
......@@ -317,7 +324,8 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s = TryLock(column_family, key, false /* read_only */);
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) {
GetBatchForWrite()->SingleDelete(column_family, key);
......@@ -329,7 +337,8 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s = TryLock(column_family, key, false /* read_only */);
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) {
GetBatchForWrite()->SingleDelete(column_family, key);
......@@ -341,8 +350,8 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* untracked */);
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* untracked */);
if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value);
......@@ -355,8 +364,8 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* untracked */);
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* untracked */);
if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value);
......@@ -369,8 +378,8 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
const Slice& key,
const Slice& value) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* untracked */);
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* untracked */);
if (s.ok()) {
GetBatchForWrite()->Merge(column_family, key, value);
......@@ -382,8 +391,8 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* untracked */);
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* untracked */);
if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key);
......@@ -395,8 +404,8 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* untracked */);
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* untracked */);
if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key);
......
......@@ -39,7 +39,8 @@ class TransactionBaseImpl : public Transaction {
// untracked will be true if called from PutUntracked, DeleteUntracked, or
// MergeUntracked.
virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool read_only, bool untracked = false) = 0;
bool read_only, bool exclusive,
bool untracked = false) = 0;
void SetSavePoint() override;
......@@ -55,11 +56,12 @@ class TransactionBaseImpl : public Transaction {
Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override;
std::string* value, bool exclusive) override;
Status GetForUpdate(const ReadOptions& options, const Slice& key,
std::string* value) override {
return GetForUpdate(options, db_->DefaultColumnFamily(), key, value);
std::string* value, bool exclusive) override {
return GetForUpdate(options, db_->DefaultColumnFamily(), key, value,
exclusive);
}
std::vector<Status> MultiGet(
......@@ -315,7 +317,7 @@ class TransactionBaseImpl : public Transaction {
std::shared_ptr<TransactionNotifier> snapshot_notifier_ = nullptr;
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
bool read_only, bool untracked = false);
bool read_only, bool exclusive, bool untracked = false);
WriteBatchBase* GetBatchForWrite();
......
......@@ -266,8 +266,8 @@ Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
}
Status TransactionDBImpl::TryLock(TransactionImpl* txn, uint32_t cfh_id,
const std::string& key) {
return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv());
const std::string& key, bool exclusive) {
return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
}
void TransactionDBImpl::UnLock(TransactionImpl* txn,
......
......@@ -63,7 +63,8 @@ class TransactionDBImpl : public TransactionDB {
using StackableDB::DropColumnFamily;
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
Status TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key);
Status TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key,
bool exclusive);
void UnLock(TransactionImpl* txn, const TransactionKeyMap* keys);
void UnLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key);
......
......@@ -40,7 +40,6 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
: TransactionBaseImpl(txn_db->GetRootDB(), write_options),
txn_db_impl_(nullptr),
txn_id_(0),
waiting_txn_id_(0),
waiting_cf_id_(0),
waiting_key_(nullptr),
expiration_time_(0),
......@@ -395,7 +394,7 @@ Status TransactionImpl::LockBatch(WriteBatch* batch,
for (const auto& key_iter : cfh_keys) {
const std::string& key = key_iter;
s = txn_db_impl_->TryLock(this, cfh_id, key);
s = txn_db_impl_->TryLock(this, cfh_id, key, true /* exclusive */);
if (!s.ok()) {
break;
}
......@@ -422,7 +421,7 @@ Status TransactionImpl::LockBatch(WriteBatch* batch,
// the snapshot time.
Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only,
bool untracked) {
bool exclusive, bool untracked) {
uint32_t cfh_id = GetColumnFamilyID(column_family);
std::string key_str = key.ToString();
bool previously_locked;
......@@ -448,7 +447,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
// lock this key if this transactions hasn't already locked it
if (!previously_locked) {
s = txn_db_impl_->TryLock(this, cfh_id, key_str);
s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
}
SetSnapshotIfNeeded();
......
......@@ -7,6 +7,7 @@
#ifndef ROCKSDB_LITE
#include <algorithm>
#include <atomic>
#include <mutex>
#include <stack>
......@@ -23,6 +24,7 @@
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/autovector.h"
#include "utilities/transactions/transaction_base.h"
#include "utilities/transactions/transaction_util.h"
......@@ -57,22 +59,31 @@ class TransactionImpl : public TransactionBaseImpl {
TransactionID GetID() const override { return txn_id_; }
TransactionID GetWaitingTxn(uint32_t* column_family_id,
const std::string** key) const override {
std::vector<TransactionID> GetWaitingTxns(uint32_t* column_family_id,
std::string* key) const override {
std::lock_guard<std::mutex> lock(wait_mutex_);
if (key) *key = waiting_key_;
std::vector<TransactionID> ids(waiting_txn_ids_.size());
if (key) *key = waiting_key_ ? *waiting_key_ : "";
if (column_family_id) *column_family_id = waiting_cf_id_;
return waiting_txn_id_;
std::copy(waiting_txn_ids_.begin(), waiting_txn_ids_.end(), ids.begin());
return ids;
}
void SetWaitingTxn(TransactionID id, uint32_t column_family_id,
void SetWaitingTxn(autovector<TransactionID> ids, uint32_t column_family_id,
const std::string* key) {
std::lock_guard<std::mutex> lock(wait_mutex_);
waiting_txn_id_ = id;
waiting_txn_ids_ = ids;
waiting_cf_id_ = column_family_id;
waiting_key_ = key;
}
void ClearWaitingTxn() {
std::lock_guard<std::mutex> lock(wait_mutex_);
waiting_txn_ids_.clear();
waiting_cf_id_ = 0;
waiting_key_ = nullptr;
}
// Returns the time (in microseconds according to Env->GetMicros())
// that this transaction will be expired. Returns 0 if this transaction does
// not expire.
......@@ -97,7 +108,8 @@ class TransactionImpl : public TransactionBaseImpl {
protected:
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool read_only, bool untracked = false) override;
bool read_only, bool exclusive,
bool untracked = false) override;
private:
TransactionDBImpl* txn_db_impl_;
......@@ -109,10 +121,10 @@ class TransactionImpl : public TransactionBaseImpl {
// Unique ID for this transaction
TransactionID txn_id_;
// ID for the transaction that is blocking the current transaction.
// IDs for the transactions that are blocking the current transaction.
//
// 0 if current transaction is not waiting.
TransactionID waiting_txn_id_;
// empty if current transaction is not waiting.
autovector<TransactionID> waiting_txn_ids_;
// The following two represents the (cf, key) that a transaction is waiting
// on.
......@@ -125,7 +137,7 @@ class TransactionImpl : public TransactionBaseImpl {
uint32_t waiting_cf_id_;
const std::string* waiting_key_;
// Mutex protecting waiting_txn_id_, waiting_cf_id_ and waiting_key_.
// Mutex protecting waiting_txn_ids_, waiting_cf_id_ and waiting_key_.
mutable std::mutex wait_mutex_;
// If non-zero, this transaction should not be committed after this time (in
......
......@@ -22,7 +22,6 @@
#include "rocksdb/slice.h"
#include "rocksdb/utilities/transaction_db_mutex.h"
#include "util/autovector.h"
#include "util/murmurhash.h"
#include "util/sync_point.h"
#include "util/thread_local.h"
......@@ -31,15 +30,20 @@
namespace rocksdb {
struct LockInfo {
TransactionID txn_id;
bool exclusive;
autovector<TransactionID> txn_ids;
// Transaction locks are not valid after this time in us
uint64_t expiration_time;
LockInfo(TransactionID id, uint64_t time)
: txn_id(id), expiration_time(time) {}
LockInfo(TransactionID id, uint64_t time, bool ex)
: exclusive(ex), expiration_time(time) {
txn_ids.push_back(id);
}
LockInfo(const LockInfo& lock_info)
: txn_id(lock_info.txn_id), expiration_time(lock_info.expiration_time) {}
: exclusive(lock_info.exclusive),
txn_ids(lock_info.txn_ids),
expiration_time(lock_info.expiration_time) {}
};
struct LockMapStripe {
......@@ -192,7 +196,8 @@ std::shared_ptr<LockMap> TransactionLockMgr::GetLockMap(
// transaction.
// If false, sets *expire_time to the expiration time of the lock according
// to Env->GetMicros() or 0 if no expiration.
bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env,
bool TransactionLockMgr::IsLockExpired(TransactionID txn_id,
const LockInfo& lock_info, Env* env,
uint64_t* expire_time) {
auto now = env->NowMicros();
......@@ -203,12 +208,18 @@ bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env,
// return how many microseconds until lock will be expired
*expire_time = lock_info.expiration_time;
} else {
bool success =
txn_db_impl_->TryStealingExpiredTransactionLocks(lock_info.txn_id);
if (!success) {
expired = false;
for (auto id : lock_info.txn_ids) {
if (txn_id == id) {
continue;
}
bool success = txn_db_impl_->TryStealingExpiredTransactionLocks(id);
if (!success) {
expired = false;
break;
}
*expire_time = 0;
}
*expire_time = 0;
}
return expired;
......@@ -216,7 +227,8 @@ bool TransactionLockMgr::IsLockExpired(const LockInfo& lock_info, Env* env,
Status TransactionLockMgr::TryLock(TransactionImpl* txn,
uint32_t column_family_id,
const std::string& key, Env* env) {
const std::string& key, Env* env,
bool exclusive) {
// Lookup lock map for this column family id
std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
LockMap* lock_map = lock_map_ptr.get();
......@@ -233,7 +245,7 @@ Status TransactionLockMgr::TryLock(TransactionImpl* txn,
assert(lock_map->lock_map_stripes_.size() > stripe_num);
LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
LockInfo lock_info(txn->GetID(), txn->GetExpirationTime());
LockInfo lock_info(txn->GetID(), txn->GetExpirationTime(), exclusive);
int64_t timeout = txn->GetLockTimeout();
return AcquireWithTimeout(txn, lock_map, stripe, column_family_id, key, env,
......@@ -268,9 +280,9 @@ Status TransactionLockMgr::AcquireWithTimeout(
// Acquire lock if we are able to
uint64_t expire_time_hint = 0;
TransactionID wait_id = 0;
autovector<TransactionID> wait_ids;
result = AcquireLocked(lock_map, stripe, key, env, lock_info,
&expire_time_hint, &wait_id);
&expire_time_hint, &wait_ids);
if (!result.ok() && timeout != 0) {
// If we weren't able to acquire the lock, we will keep retrying as long
......@@ -289,19 +301,19 @@ Status TransactionLockMgr::AcquireWithTimeout(
cv_end_time = end_time;
}
assert(result.IsBusy() || wait_id != 0);
assert(result.IsBusy() || wait_ids.size() != 0);
// We are dependent on a transaction to finish, so perform deadlock
// detection.
if (wait_id != 0) {
if (wait_ids.size() != 0) {
if (txn->IsDeadlockDetect()) {
if (IncrementWaiters(txn, wait_id)) {
if (IncrementWaiters(txn, wait_ids)) {
result = Status::Busy(Status::SubCode::kDeadlock);
stripe->stripe_mutex->UnLock();
return result;
}
}
txn->SetWaitingTxn(wait_id, column_family_id, &key);
txn->SetWaitingTxn(wait_ids, column_family_id, &key);
}
TEST_SYNC_POINT("TransactionLockMgr::AcquireWithTimeout:WaitingTxn");
......@@ -316,10 +328,10 @@ Status TransactionLockMgr::AcquireWithTimeout(
}
}
if (wait_id != 0) {
txn->SetWaitingTxn(0, 0, nullptr);
if (wait_ids.size() != 0) {
txn->ClearWaitingTxn();
if (txn->IsDeadlockDetect()) {
DecrementWaiters(txn, wait_id);
DecrementWaiters(txn, wait_ids);
}
}
......@@ -332,7 +344,7 @@ Status TransactionLockMgr::AcquireWithTimeout(
if (result.ok() || result.IsTimedOut()) {
result = AcquireLocked(lock_map, stripe, key, env, lock_info,
&expire_time_hint, &wait_id);
&expire_time_hint, &wait_ids);
}
} while (!result.ok() && !timed_out);
}
......@@ -342,35 +354,40 @@ Status TransactionLockMgr::AcquireWithTimeout(
return result;
}
void TransactionLockMgr::DecrementWaiters(const TransactionImpl* txn,
TransactionID wait_id) {
void TransactionLockMgr::DecrementWaiters(
const TransactionImpl* txn, const autovector<TransactionID>& wait_ids) {
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
DecrementWaitersImpl(txn, wait_id);
DecrementWaitersImpl(txn, wait_ids);
}
void TransactionLockMgr::DecrementWaitersImpl(const TransactionImpl* txn,
TransactionID wait_id) {
void TransactionLockMgr::DecrementWaitersImpl(
const TransactionImpl* txn, const autovector<TransactionID>& wait_ids) {
auto id = txn->GetID();
assert(wait_txn_map_.Contains(id));
wait_txn_map_.Delete(id);
rev_wait_txn_map_.Get(wait_id)--;
if (rev_wait_txn_map_.Get(wait_id) == 0) {
rev_wait_txn_map_.Delete(wait_id);
for (auto wait_id : wait_ids) {
rev_wait_txn_map_.Get(wait_id)--;
if (rev_wait_txn_map_.Get(wait_id) == 0) {
rev_wait_txn_map_.Delete(wait_id);
}
}
}
bool TransactionLockMgr::IncrementWaiters(const TransactionImpl* txn,
TransactionID wait_id) {
bool TransactionLockMgr::IncrementWaiters(
const TransactionImpl* txn, const autovector<TransactionID>& wait_ids) {
auto id = txn->GetID();
std::vector<TransactionID> queue(txn->GetDeadlockDetectDepth());
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
assert(!wait_txn_map_.Contains(id));
wait_txn_map_.Insert(id, wait_id);
wait_txn_map_.Insert(id, wait_ids);
if (rev_wait_txn_map_.Contains(wait_id)) {
rev_wait_txn_map_.Get(wait_id)++;
} else {
rev_wait_txn_map_.Insert(wait_id, 1);
for (auto wait_id : wait_ids) {
if (rev_wait_txn_map_.Contains(wait_id)) {
rev_wait_txn_map_.Get(wait_id)++;
} else {
rev_wait_txn_map_.Insert(wait_id, 1);
}
}
// No deadlock if nobody is waiting on self.
......@@ -378,20 +395,36 @@ bool TransactionLockMgr::IncrementWaiters(const TransactionImpl* txn,
return false;
}
TransactionID next = wait_id;
for (int i = 0; i < txn->GetDeadlockDetectDepth(); i++) {
const auto* next_ids = &wait_ids;
for (int tail = 0, head = 0; head < txn->GetDeadlockDetectDepth(); head++) {
uint i = 0;
if (next_ids) {
for (; i < next_ids->size() && tail + i < txn->GetDeadlockDetectDepth();
i++) {
queue[tail + i] = (*next_ids)[i];
}
tail += i;
}
// No more items in the list, meaning no deadlock.
if (tail == head) {
return false;
}
auto next = queue[head];
if (next == id) {
DecrementWaitersImpl(txn, wait_id);
DecrementWaitersImpl(txn, wait_ids);
return true;
} else if (!wait_txn_map_.Contains(next)) {
return false;
next_ids = nullptr;
continue;
} else {
next = wait_txn_map_.Get(next);
next_ids = &wait_txn_map_.Get(next);
}
}
// Wait cycle too big, just assume deadlock.
DecrementWaitersImpl(txn, wait_id);
DecrementWaitersImpl(txn, wait_ids);
return true;
}
......@@ -404,24 +437,47 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
const std::string& key, Env* env,
const LockInfo& txn_lock_info,
uint64_t* expire_time,
TransactionID* txn_id) {
autovector<TransactionID>* txn_ids) {
assert(txn_lock_info.txn_ids.size() == 1);
Status result;
// Check if this key is already locked
if (stripe->keys.find(key) != stripe->keys.end()) {
// Lock already held
LockInfo& lock_info = stripe->keys.at(key);
if (lock_info.txn_id != txn_lock_info.txn_id) {
// locked by another txn. Check if it's expired
if (IsLockExpired(lock_info, env, expire_time)) {
// lock is expired, can steal it
lock_info.txn_id = txn_lock_info.txn_id;
assert(lock_info.txn_ids.size() == 1 || !lock_info.exclusive);
if (lock_info.exclusive || txn_lock_info.exclusive) {
if (lock_info.txn_ids.size() == 1 &&
lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) {
// The list contains one txn and we're it, so just take it.
lock_info.exclusive = txn_lock_info.exclusive;
lock_info.expiration_time = txn_lock_info.expiration_time;
// lock_cnt does not change
} else {
result = Status::TimedOut(Status::SubCode::kLockTimeout);
*txn_id = lock_info.txn_id;
// Check if it's expired. Skips over txn_lock_info.txn_ids[0] in case
// it's there for a shared lock with multiple holders which was not
// caught in the first case.
if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env,
expire_time)) {
// lock is expired, can steal it
lock_info.txn_ids = txn_lock_info.txn_ids;
lock_info.exclusive = txn_lock_info.exclusive;
lock_info.expiration_time = txn_lock_info.expiration_time;
// lock_cnt does not change
} else {
result = Status::TimedOut(Status::SubCode::kLockTimeout);
*txn_ids = lock_info.txn_ids;
}
}
} else {
// We are requesting shared access to a shared lock, so just grant it.
lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]);
// Using std::max means that expiration time never goes down even when
// a transaction is removed from the list. The correct solution would be
// to track expiry for every transaction, but this would also work for
// now.
lock_info.expiration_time =
std::max(lock_info.expiration_time, txn_lock_info.expiration_time);
}
} else { // Lock not held.
// Check lock limit
......@@ -442,6 +498,42 @@ Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,
return result;
}
void TransactionLockMgr::UnLockKey(const TransactionImpl* txn,
const std::string& key,
LockMapStripe* stripe, LockMap* lock_map,
Env* env) {
TransactionID txn_id = txn->GetID();
auto stripe_iter = stripe->keys.find(key);
if (stripe_iter != stripe->keys.end()) {
auto& txns = stripe_iter->second.txn_ids;
auto txn_it = std::find(txns.begin(), txns.end(), txn_id);
// Found the key we locked. unlock it.
if (txn_it != txns.end()) {
if (txns.size() == 1) {
stripe->keys.erase(stripe_iter);
} else {
auto last_it = txns.end() - 1;
if (txn_it != last_it) {
*txn_it = *last_it;
}
txns.pop_back();
}
if (max_num_locks_ > 0) {
// Maintain lock count if there is a limit on the number of locks.
assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
lock_map->lock_cnt--;
}
}
} else {
// This key is either not locked or locked by someone else. This should
// only happen if the unlocking transaction has expired.
assert(txn->GetExpirationTime() > 0 &&
txn->GetExpirationTime() < env->NowMicros());
}
}
void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id,
const std::string& key, Env* env) {
std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
......@@ -456,26 +548,8 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id,
assert(lock_map->lock_map_stripes_.size() > stripe_num);
LockMapStripe* stripe = lock_map->lock_map_stripes_.at(stripe_num);
TransactionID txn_id = txn->GetID();
stripe->stripe_mutex->Lock();
const auto& iter = stripe->keys.find(key);
if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) {
// Found the key we locked. unlock it.
stripe->keys.erase(iter);
if (max_num_locks_ > 0) {
// Maintain lock count if there is a limit on the number of locks.
assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
lock_map->lock_cnt--;
}
} else {
// This key is either not locked or locked by someone else. This should
// only happen if the unlocking transaction has expired.
assert(txn->GetExpirationTime() > 0 &&
txn->GetExpirationTime() < env->NowMicros());
}
UnLockKey(txn, key, stripe, lock_map, env);
stripe->stripe_mutex->UnLock();
// Signal waiting threads to retry locking
......@@ -484,8 +558,6 @@ void TransactionLockMgr::UnLock(TransactionImpl* txn, uint32_t column_family_id,
void TransactionLockMgr::UnLock(const TransactionImpl* txn,
const TransactionKeyMap* key_map, Env* env) {
TransactionID txn_id = txn->GetID();
for (auto& key_map_iter : *key_map) {
uint32_t column_family_id = key_map_iter.first;
auto& keys = key_map_iter.second;
......@@ -520,22 +592,7 @@ void TransactionLockMgr::UnLock(const TransactionImpl* txn,
stripe->stripe_mutex->Lock();
for (const std::string* key : stripe_keys) {
const auto& iter = stripe->keys.find(*key);
if (iter != stripe->keys.end() && iter->second.txn_id == txn_id) {
// Found the key we locked. unlock it.
stripe->keys.erase(iter);
if (max_num_locks_ > 0) {
// Maintain lock count if there is a limit on the number of locks.
assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);
lock_map->lock_cnt--;
}
} else {
// This key is either not locked or locked by someone else. This
// should only
// happen if the unlocking transaction has expired.
assert(txn->GetExpirationTime() > 0 &&
txn->GetExpirationTime() < env->NowMicros());
}
UnLockKey(txn, *key, stripe, lock_map, env);
}
stripe->stripe_mutex->UnLock();
......@@ -565,7 +622,13 @@ TransactionLockMgr::LockStatusData TransactionLockMgr::GetLockStatusData() {
for (const auto& j : stripes) {
j->stripe_mutex->Lock();
for (const auto& it : j->keys) {
data.insert({i, {it.first, it.second.txn_id}});
struct KeyLockInfo info;
info.exclusive = it.second.exclusive;
info.key = it.first;
for (const auto& id : it.second.txn_ids) {
info.ids.push_back(id);
}
data.insert({i, info});
}
}
}
......
......@@ -13,6 +13,7 @@
#include <vector>
#include "rocksdb/utilities/transaction.h"
#include "util/autovector.h"
#include "util/hash_map.h"
#include "util/instrumented_mutex.h"
#include "util/thread_local.h"
......@@ -47,7 +48,7 @@ class TransactionLockMgr {
// Attempt to lock key. If OK status is returned, the caller is responsible
// for calling UnLock() on this key.
Status TryLock(TransactionImpl* txn, uint32_t column_family_id,
const std::string& key, Env* env);
const std::string& key, Env* env, bool exclusive);
// Unlock a key locked by TryLock(). txn must be the same Transaction that
// locked this key.
......@@ -91,12 +92,13 @@ class TransactionLockMgr {
// Maps from waitee -> number of waiters.
HashMap<TransactionID, int> rev_wait_txn_map_;
// Maps from waiter -> waitee.
HashMap<TransactionID, TransactionID> wait_txn_map_;
HashMap<TransactionID, autovector<TransactionID>> wait_txn_map_;
// Used to allocate mutexes/condvars to use when locking keys
std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
bool IsLockExpired(const LockInfo& lock_info, Env* env, uint64_t* wait_time);
bool IsLockExpired(TransactionID txn_id, const LockInfo& lock_info, Env* env,
uint64_t* wait_time);
std::shared_ptr<LockMap> GetLockMap(uint32_t column_family_id);
......@@ -108,11 +110,17 @@ class TransactionLockMgr {
Status AcquireLocked(LockMap* lock_map, LockMapStripe* stripe,
const std::string& key, Env* env,
const LockInfo& lock_info, uint64_t* wait_time,
TransactionID* txn_id);
autovector<TransactionID>* txn_ids);
bool IncrementWaiters(const TransactionImpl* txn, TransactionID wait_id);
void DecrementWaiters(const TransactionImpl* txn, TransactionID wait_id);
void DecrementWaitersImpl(const TransactionImpl* txn, TransactionID wait_id);
void UnLockKey(const TransactionImpl* txn, const std::string& key,
LockMapStripe* stripe, LockMap* lock_map, Env* env);
bool IncrementWaiters(const TransactionImpl* txn,
const autovector<TransactionID>& wait_ids);
void DecrementWaiters(const TransactionImpl* txn,
const autovector<TransactionID>& wait_ids);
void DecrementWaitersImpl(const TransactionImpl* txn,
const autovector<TransactionID>& wait_ids);
// No copying allowed
TransactionLockMgr(const TransactionLockMgr&);
......
......@@ -197,11 +197,12 @@ TEST_P(TransactionTest, WaitingTxn) {
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"TransactionLockMgr::AcquireWithTimeout:WaitingTxn", [&](void* arg) {
const std::string* key;
std::string key;
uint32_t cf_id;
TransactionID wait = txn2->GetWaitingTxn(&cf_id, &key);
ASSERT_EQ(*key, "foo");
ASSERT_EQ(wait, id1);
std::vector<TransactionID> wait = txn2->GetWaitingTxns(&cf_id, &key);
ASSERT_EQ(key, "foo");
ASSERT_EQ(wait.size(), 1);
ASSERT_EQ(wait[0], id1);
ASSERT_EQ(cf_id, 0);
});
......@@ -225,7 +226,8 @@ TEST_P(TransactionTest, WaitingTxn) {
ASSERT_EQ(cf_iterator->first, 1);
// The locked key is "foo" and is locked by txn1
ASSERT_EQ(cf_iterator->second.key, "foo");
ASSERT_EQ(cf_iterator->second.id, txn1->GetID());
ASSERT_EQ(cf_iterator->second.ids.size(), 1);
ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID());
cf_iterator++;
......@@ -233,7 +235,8 @@ TEST_P(TransactionTest, WaitingTxn) {
ASSERT_EQ(cf_iterator->first, 0);
// The locked key is "foo" and is locked by txn1
ASSERT_EQ(cf_iterator->second.key, "foo");
ASSERT_EQ(cf_iterator->second.id, txn1->GetID());
ASSERT_EQ(cf_iterator->second.ids.size(), 1);
ASSERT_EQ(cf_iterator->second.ids[0], txn1->GetID());
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
......@@ -249,6 +252,170 @@ TEST_P(TransactionTest, WaitingTxn) {
delete txn2;
}
TEST_P(TransactionTest, SharedLocks) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
Status s;
txn_options.lock_timeout = 1;
s = db->Put(write_options, Slice("foo"), Slice("bar"));
ASSERT_OK(s);
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn1);
ASSERT_TRUE(txn2);
ASSERT_TRUE(txn3);
// Test shared access between txns
s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
ASSERT_OK(s);
s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
ASSERT_OK(s);
s = txn3->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
ASSERT_OK(s);
auto lock_data = db->GetLockStatusData();
ASSERT_EQ(lock_data.size(), 1);
auto cf_iterator = lock_data.begin();
ASSERT_EQ(cf_iterator->second.key, "foo");
// We compare whether the set of txns locking this key is the same. To do
// this, we need to sort both vectors so that the comparison is done
// correctly.
std::vector<TransactionID> expected_txns = {txn1->GetID(), txn2->GetID(),
txn3->GetID()};
std::vector<TransactionID> lock_txns = cf_iterator->second.ids;
ASSERT_EQ(expected_txns, lock_txns);
ASSERT_FALSE(cf_iterator->second.exclusive);
txn1->Rollback();
txn2->Rollback();
txn3->Rollback();
// Test txn1 and txn2 sharing a lock and txn3 trying to obtain it.
s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
ASSERT_OK(s);
s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
ASSERT_OK(s);
s = txn3->GetForUpdate(read_options, "foo", nullptr);
ASSERT_TRUE(s.IsTimedOut());
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
txn1->UndoGetForUpdate("foo");
s = txn3->GetForUpdate(read_options, "foo", nullptr);
ASSERT_TRUE(s.IsTimedOut());
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
txn2->UndoGetForUpdate("foo");
s = txn3->GetForUpdate(read_options, "foo", nullptr);
ASSERT_OK(s);
txn1->Rollback();
txn2->Rollback();
txn3->Rollback();
// Test txn1 holding an exclusive lock and txn2 trying to obtain shared
// access.
s = txn1->GetForUpdate(read_options, "foo", nullptr);
ASSERT_OK(s);
s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
ASSERT_TRUE(s.IsTimedOut());
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
txn1->UndoGetForUpdate("foo");
s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
ASSERT_OK(s);
delete txn1;
delete txn2;
delete txn3;
}
TEST_P(TransactionTest, DeadlockCycleShared) {
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
txn_options.lock_timeout = 1000000;
txn_options.deadlock_detect = true;
// Set up a wait for chain like this:
//
// Tn -> T(n*2)
// Tn -> T(n*2 + 1)
//
// So we have:
// T1 -> T2 -> T4 ...
// | |> T5 ...
// |> T3 -> T6 ...
// |> T7 ...
// up to T31, then T[16 - 31] -> T1.
// Note that Tn holds lock on floor(n / 2).
std::vector<Transaction*> txns(31);
for (uint32_t i = 0; i < 31; i++) {
txns[i] = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txns[i]);
auto s = txns[i]->GetForUpdate(read_options, ToString((i + 1) / 2), nullptr,
false /* exclusive */);
ASSERT_OK(s);
}
std::atomic<uint32_t> checkpoints(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"TransactionLockMgr::AcquireWithTimeout:WaitingTxn",
[&](void* arg) { checkpoints.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// We want the leaf transactions to block and hold everyone back.
std::vector<std::thread> threads;
for (uint32_t i = 0; i < 15; i++) {
std::function<void()> blocking_thread = [&, i] {
auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr,
true /* exclusive */);
ASSERT_OK(s);
txns[i]->Rollback();
delete txns[i];
};
threads.emplace_back(blocking_thread);
}
// Wait until all threads are waiting on each other.
while (checkpoints.load() != 15) {
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
// Complete the cycle T[16 - 31] -> T1
for (uint32_t i = 15; i < 31; i++) {
auto s =
txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */);
ASSERT_TRUE(s.IsDeadlock());
}
// Rollback the leaf transaction.
for (uint32_t i = 15; i < 31; i++) {
txns[i]->Rollback();
delete txns[i];
}
for (auto& t : threads) {
t.join();
}
}
TEST_P(TransactionTest, DeadlockCycle) {
WriteOptions write_options;
ReadOptions read_options;
......@@ -345,7 +512,9 @@ TEST_P(TransactionTest, DeadlockStress) {
// Lock keys in random order.
for (const auto& k : random_keys) {
auto s = txn->GetForUpdate(read_options, k, nullptr);
// Lock mostly for shared access, but exclusive 1/4 of the time.
auto s =
txn->GetForUpdate(read_options, k, nullptr, txn->GetID() % 4 == 0);
if (!s.ok()) {
ASSERT_TRUE(s.IsDeadlock());
txn->Rollback();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册