提交 6f02cd14 编写于 作者: A Alexey Milovidov

Added a kludge (experimental)

上级 23b28e6c
......@@ -447,6 +447,7 @@ namespace ErrorCodes
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW = 470;
extern const int SETTINGS_ARE_NOT_SUPPORTED = 471;
extern const int IMMUTABLE_SETTING = 472;
extern const int DEADLOCK_AVOIDED = 473;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -4,6 +4,8 @@
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <cassert>
namespace ProfileEvents
{
......@@ -29,6 +31,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int DEADLOCK_AVOIDED;
}
......@@ -53,6 +56,44 @@ public:
};
namespace
{
/// Global information about all read locks that query has. It is needed to avoid some type of deadlocks.
class QueryLockInfo
{
private:
std::mutex mutex;
std::map<std::string, size_t> queries;
public:
void add(const String & query_id)
{
std::lock_guard lock(mutex);
++queries[query_id];
}
void remove(const String & query_id)
{
std::lock_guard lock(mutex);
auto it = queries.find(query_id);
assert(it != queries.end());
if (--it->second == 0)
queries.erase(it);
}
void check(const String & query_id)
{
std::lock_guard lock(mutex);
if (queries.count(query_id))
throw Exception("Deadlock avoided. Client must retry.", ErrorCodes::DEADLOCK_AVOIDED);
}
};
QueryLockInfo all_read_locks;
}
RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id)
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
......@@ -95,8 +136,26 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
return existing_holder_ptr;
}
/** If the query already has any active read lock and tries to acquire another read lock
* but it is not in front of the queue and has to wait, deadlock is possible:
*
* Example (four queries, two RWLocks - 'a' and 'b'):
*
* --> time -->
*
* q1: ra rb
* q2: wa
* q3: rb ra
* q4: wb
*
* We will throw an exception instead.
*/
if (type == Type::Write || queue.empty() || queue.back().type == Type::Write)
{
if (queue.back().type == Type::Write && query_id != RWLockImpl::NO_QUERY)
all_read_locks.check(query_id);
/// Create new group of clients
it_group = queue.emplace(queue.end(), type);
}
......@@ -104,6 +163,9 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
{
/// Will append myself to last group
it_group = std::prev(queue.end());
if (it_group != queue.begin() && query_id != RWLockImpl::NO_QUERY)
all_read_locks.check(query_id);
}
/// Append myself to the end of chosen group
......@@ -130,7 +192,12 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
res->thread_id = this_thread_id;
if (query_id != RWLockImpl::NO_QUERY)
{
query_id_to_holder.emplace(query_id, res);
if (type == Type::Read)
all_read_locks.add(query_id);
}
res->query_id = query_id;
finalize_metrics();
......@@ -140,12 +207,15 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
RWLockImpl::LockHolderImpl::~LockHolderImpl()
{
std::unique_lock lock(parent->mutex);
std::lock_guard lock(parent->mutex);
/// Remove weak_ptrs to the holder, since there are no owners of the current lock
parent->thread_to_holder.erase(thread_id);
parent->query_id_to_holder.erase(query_id);
if (*it_client == RWLockImpl::Read && query_id != RWLockImpl::NO_QUERY)
all_read_locks.remove(query_id);
/// Removes myself from client list of our group
it_group->clients.erase(it_client);
......@@ -166,6 +236,7 @@ RWLockImpl::LockHolderImpl::LockHolderImpl(RWLock && parent_, RWLockImpl::Groups
: parent{std::move(parent_)}, it_group{it_group_}, it_client{it_client_},
active_client_increment{(*it_client == RWLockImpl::Read) ? CurrentMetrics::RWLockActiveReaders
: CurrentMetrics::RWLockActiveWriters}
{}
{
}
}
......@@ -13,6 +13,14 @@
using namespace DB;
namespace DB
{
namespace ErrorCodes
{
extern const int DEADLOCK_AVOIDED;
}
}
TEST(Common, RWLock_1)
{
......@@ -123,6 +131,74 @@ TEST(Common, RWLock_Recursive)
}
TEST(Common, RWLock_Deadlock)
{
static auto lock1 = RWLockImpl::create();
static auto lock2 = RWLockImpl::create();
/**
* q1: r1 r2
* q2: w1
* q3: r2 r1
* q4: w2
*/
std::thread t1([&] ()
{
auto holder1 = lock1->getLock(RWLockImpl::Read, "q1");
usleep(100000);
usleep(100000);
usleep(100000);
try
{
auto holder2 = lock2->getLock(RWLockImpl::Read, "q1");
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::DEADLOCK_AVOIDED)
throw;
}
});
std::thread t2([&] ()
{
usleep(100000);
auto holder1 = lock1->getLock(RWLockImpl::Write, "q2");
});
std::thread t3([&] ()
{
usleep(100000);
usleep(100000);
auto holder2 = lock2->getLock(RWLockImpl::Read, "q3");
usleep(100000);
usleep(100000);
try
{
auto holder1 = lock1->getLock(RWLockImpl::Read, "q3");
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::DEADLOCK_AVOIDED)
throw;
}
});
std::thread t4([&] ()
{
usleep(100000);
usleep(100000);
usleep(100000);
auto holder2 = lock2->getLock(RWLockImpl::Write, "q4");
});
t1.join();
t2.join();
t3.join();
t4.join();
}
TEST(Common, RWLock_PerfTest_Readers)
{
constexpr int cycles = 100000; // 100k
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册