diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp
index 87ab252c5830d488bf59c4e0c71c8aec982d9d50..f1c03c5fd0ed0abf8c5e60f576b0c45e62234352 100644
--- a/dbms/src/Common/ErrorCodes.cpp
+++ b/dbms/src/Common/ErrorCodes.cpp
@@ -447,6 +447,7 @@ namespace ErrorCodes
     extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW = 470;
     extern const int SETTINGS_ARE_NOT_SUPPORTED = 471;
     extern const int IMMUTABLE_SETTING = 472;
+    extern const int DEADLOCK_AVOIDED = 473;
 
     extern const int KEEPER_EXCEPTION = 999;
     extern const int POCO_EXCEPTION = 1000;
diff --git a/dbms/src/Common/RWLock.cpp b/dbms/src/Common/RWLock.cpp
index e343ce0b0cd994ac170158db20c34402fef4e5fe..91d6e759c468ce49d0a8a761e069ae5aa220e4b8 100644
--- a/dbms/src/Common/RWLock.cpp
+++ b/dbms/src/Common/RWLock.cpp
@@ -4,6 +4,8 @@
 #include <Common/CurrentMetrics.h>
 #include <Common/ProfileEvents.h>
 
+#include <cassert>
+
 
 namespace ProfileEvents
 {
@@ -29,6 +31,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
+    extern const int DEADLOCK_AVOIDED;
 }
 
 
@@ -53,6 +56,44 @@ public:
 };
 
 
+namespace
+{
+    /// Per-query count of read locks currently held, shared by all RWLock instances. Used by getLock to refuse waits that could deadlock.
+
+    class QueryLockInfo
+    {
+    private:
+        std::mutex mutex;
+        std::map<std::string, size_t> queries;
+
+    public:
+        void add(const String & query_id)
+        {
+            std::lock_guard lock(mutex);
+            ++queries[query_id];
+        }
+
+        void remove(const String & query_id)
+        {
+            std::lock_guard lock(mutex);
+            auto it = queries.find(query_id);
+            assert(it != queries.end());
+            if (--it->second == 0)
+                queries.erase(it);
+        }
+
+        void check(const String & query_id)
+        {
+            std::lock_guard lock(mutex);
+            if (queries.count(query_id))
+                throw Exception("Deadlock avoided. Client must retry.", ErrorCodes::DEADLOCK_AVOIDED);
+        }
+    };
+
+    QueryLockInfo all_read_locks;
+}
+
+
 RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id)
 {
     Stopwatch watch(CLOCK_MONOTONIC_COARSE);
@@ -95,8 +136,26 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
         return existing_holder_ptr;
     }
 
+    /** If the query already has any active read lock and tries to acquire another read lock
+      * but it is not in front of the queue and has to wait, deadlock is possible:
+      *
+      * Example (four queries, two RWLocks - 'a' and 'b'):
+      *
+      *     --> time -->
+      *
+      * q1: ra              rb
+      * q2:     wa
+      * q3:         rb          ra
+      * q4:             wb
+      *
+      * We will throw an exception instead (the check is skipped while the queue is empty: nothing to wait for).
+      */
+
     if (type == Type::Write || queue.empty() || queue.back().type == Type::Write)
     {
+        if (!queue.empty() && queue.back().type == Type::Write && query_id != RWLockImpl::NO_QUERY)
+            all_read_locks.check(query_id);
+
         /// Create new group of clients
         it_group = queue.emplace(queue.end(), type);
     }
@@ -104,6 +163,9 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
     {
         /// Will append myself to last group
         it_group = std::prev(queue.end());
+
+        if (it_group != queue.begin() && query_id != RWLockImpl::NO_QUERY)
+            all_read_locks.check(query_id);
     }
 
     /// Append myself to the end of chosen group
@@ -130,7 +192,12 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
     res->thread_id = this_thread_id;
 
     if (query_id != RWLockImpl::NO_QUERY)
+    {
         query_id_to_holder.emplace(query_id, res);
+
+        if (type == Type::Read)
+            all_read_locks.add(query_id);
+    }
     res->query_id = query_id;
 
     finalize_metrics();
@@ -140,12 +207,15 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
 
 RWLockImpl::LockHolderImpl::~LockHolderImpl()
 {
-    std::unique_lock lock(parent->mutex);
+    std::lock_guard lock(parent->mutex);
 
     /// Remove weak_ptrs to the holder, since there are no owners of the current lock
     parent->thread_to_holder.erase(thread_id);
     parent->query_id_to_holder.erase(query_id);
 
+    if (*it_client == RWLockImpl::Read && query_id != RWLockImpl::NO_QUERY)
+        all_read_locks.remove(query_id);
+
     /// Removes myself from client list of our group
     it_group->clients.erase(it_client);
 
@@ -166,6 +236,7 @@ RWLockImpl::LockHolderImpl::LockHolderImpl(RWLock && parent_, RWLockImpl::Groups
     : parent{std::move(parent_)}, it_group{it_group_}, it_client{it_client_},
       active_client_increment{(*it_client == RWLockImpl::Read) ? CurrentMetrics::RWLockActiveReaders
                                                                : CurrentMetrics::RWLockActiveWriters}
-{}
+{
+}
 
 }
diff --git a/dbms/src/Common/tests/gtest_rw_lock.cpp b/dbms/src/Common/tests/gtest_rw_lock.cpp
index 68927c8bc4a044f149769cf2bcdd78f2c09a9d33..6826c885ae0402b2e46c5bb9b6382209c65d3a1a 100644
--- a/dbms/src/Common/tests/gtest_rw_lock.cpp
+++ b/dbms/src/Common/tests/gtest_rw_lock.cpp
@@ -13,6 +13,14 @@
 
 using namespace DB;
 
+namespace DB
+{
+    namespace ErrorCodes
+    {
+        extern const int DEADLOCK_AVOIDED;
+    }
+}
+
 
 TEST(Common, RWLock_1)
 {
@@ -123,6 +131,74 @@ TEST(Common, RWLock_Recursive)
 }
 
 
+TEST(Common, RWLock_Deadlock)
+{
+    static auto lock1 = RWLockImpl::create();
+    static auto lock2 = RWLockImpl::create();
+
+    /**
+      * q1: r1              r2
+      * q2:     w1
+      * q3:         r2          r1
+      * q4:             w2
+      */
+
+    std::thread t1([&] ()
+    {
+        auto holder1 = lock1->getLock(RWLockImpl::Read, "q1");
+        usleep(100000);
+        usleep(100000);
+        usleep(100000);
+        try
+        {
+            auto holder2 = lock2->getLock(RWLockImpl::Read, "q1");
+        }
+        catch (const Exception & e)
+        {
+            if (e.code() != ErrorCodes::DEADLOCK_AVOIDED)
+                throw;
+        }
+    });
+
+    std::thread t2([&] ()
+    {
+        usleep(100000);
+        auto holder1 = lock1->getLock(RWLockImpl::Write, "q2");
+    });
+
+    std::thread t3([&] ()
+    {
+        usleep(100000);
+        usleep(100000);
+        auto holder2 = lock2->getLock(RWLockImpl::Read, "q3");
+        usleep(100000);
+        usleep(100000);
+        try
+        {
+            auto holder1 = lock1->getLock(RWLockImpl::Read, "q3");
+        }
+        catch (const Exception & e)
+        {
+            if (e.code() != ErrorCodes::DEADLOCK_AVOIDED)
+                throw;
+        }
+    });
+
+    std::thread t4([&] ()
+    {
+        usleep(100000);
+        usleep(100000);
+        usleep(100000);
+        auto holder2 = lock2->getLock(RWLockImpl::Write, "q4");
+    });
+
+    t1.join();
+    t2.join();
+    t3.join();
+    t4.join();
+}
+
+
 TEST(Common, RWLock_PerfTest_Readers)
 {
     constexpr int cycles = 100000; // 100k