RWLock.cpp 5.3 KB
Newer Older
I
Ivan Lezhankin 已提交
1
#include "RWLock.h"
2
#include <Common/Stopwatch.h>
3
#include <Common/Exception.h>
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>


namespace ProfileEvents
{
    extern const Event RWLockAcquiredReadLocks;
    extern const Event RWLockAcquiredWriteLocks;
    extern const Event RWLockReadersWaitMilliseconds;
    extern const Event RWLockWritersWaitMilliseconds;
}


namespace CurrentMetrics
{
    extern const Metric RWLockWaitingReaders;
    extern const Metric RWLockWaitingWriters;
    extern const Metric RWLockActiveReaders;
    extern const Metric RWLockActiveWriters;
}
24

25 26 27 28

namespace DB
{

29 30 31 32 33
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

34

35
class RWLockImpl::LockHolderImpl
36
{
I
Ivan Lezhankin 已提交
37
    RWLock parent;
38 39
    GroupsContainer::iterator it_group;
    ClientsContainer::iterator it_client;
40 41
    ThreadToHolder::key_type thread_id;
    QueryIdToHolder::key_type query_id;
42 43
    CurrentMetrics::Increment active_client_increment;

44
    LockHolderImpl(RWLock && parent, GroupsContainer::iterator it_group, ClientsContainer::iterator it_client);
45 46 47

public:

48
    LockHolderImpl(const LockHolderImpl & other) = delete;
49

50
    ~LockHolderImpl();
51

I
Ivan Lezhankin 已提交
52
    friend class RWLockImpl;
53 54 55
};


56
RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id)
57 58 59 60 61 62 63 64 65 66 67
{
    Stopwatch watch(CLOCK_MONOTONIC_COARSE);
    CurrentMetrics::Increment waiting_client_increment((type == Read) ? CurrentMetrics::RWLockWaitingReaders
                                                                      : CurrentMetrics::RWLockWaitingWriters);
    auto finalize_metrics = [type, &watch] ()
    {
        ProfileEvents::increment((type == Read) ? ProfileEvents::RWLockAcquiredReadLocks
                                                : ProfileEvents::RWLockAcquiredWriteLocks);
        ProfileEvents::increment((type == Read) ? ProfileEvents::RWLockReadersWaitMilliseconds
                                                : ProfileEvents::RWLockWritersWaitMilliseconds, watch.elapsedMilliseconds());
    };
68

69 70
    GroupsContainer::iterator it_group;
    ClientsContainer::iterator it_client;
71

A
Alexey Milovidov 已提交
72
    std::unique_lock lock(mutex);
73

74
    /// Check if the same query is acquiring previously acquired lock
75
    LockHolder existing_holder_ptr;
76 77

    auto this_thread_id = std::this_thread::get_id();
78
    auto it_thread = thread_to_holder.find(this_thread_id);
79

80
    auto it_query = query_id_to_holder.end();
81
    if (query_id != RWLockImpl::NO_QUERY)
82
        it_query = query_id_to_holder.find(query_id);
83

84 85 86 87
    if (it_thread != thread_to_holder.end())
        existing_holder_ptr = it_thread->second.lock();
    else if (it_query != query_id_to_holder.end())
        existing_holder_ptr = it_query->second.lock();
88

89
    if (existing_holder_ptr)
90 91
    {
        /// XXX: it means we can't upgrade lock from read to write - with proper waiting!
92
        if (type != Read || existing_holder_ptr->it_group->type != Read)
93 94
            throw Exception("Attempt to acquire exclusive lock recursively", ErrorCodes::LOGICAL_ERROR);

95
        return existing_holder_ptr;
96 97
    }

98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
    if (type == Type::Write || queue.empty() || queue.back().type == Type::Write)
    {
        /// Create new group of clients
        it_group = queue.emplace(queue.end(), type);
    }
    else
    {
        /// Will append myself to last group
        it_group = std::prev(queue.end());
    }

    /// Append myself to the end of chosen group
    auto & clients = it_group->clients;
    try
    {
113
        it_client = clients.emplace(clients.end(), type);
114 115 116 117 118 119 120 121 122
    }
    catch (...)
    {
        /// Remove group if it was the first client in the group and an error occurred
        if (clients.empty())
            queue.erase(it_group);
        throw;
    }

123
    LockHolder res(new LockHolderImpl(shared_from_this(), it_group, it_client));
124

125 126 127
    /// Wait a notification until we will be the only in the group.
    it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); });

128
    /// Insert myself (weak_ptr to the holder) to threads set to implement recursive lock
129 130
    thread_to_holder.emplace(this_thread_id, res);
    res->thread_id = this_thread_id;
131

132
    if (query_id != RWLockImpl::NO_QUERY)
133 134
        query_id_to_holder.emplace(query_id, res);
    res->query_id = query_id;
135

136
    finalize_metrics();
137 138 139 140
    return res;
}


141
RWLockImpl::LockHolderImpl::~LockHolderImpl()
142
{
A
Alexey Milovidov 已提交
143
    std::unique_lock lock(parent->mutex);
144

145
    /// Remove weak_ptrs to the holder, since there are no owners of the current lock
146 147
    parent->thread_to_holder.erase(thread_id);
    parent->query_id_to_holder.erase(query_id);
148 149 150

    /// Removes myself from client list of our group
    it_group->clients.erase(it_client);
151

152 153
    /// Remove the group if we were the last client and notify the next group
    if (it_group->clients.empty())
154
    {
155 156
        auto & parent_queue = parent->queue;
        parent_queue.erase(it_group);
157

158 159
        if (!parent_queue.empty())
            parent_queue.front().cv.notify_all();
160 161 162 163
    }
}


164
RWLockImpl::LockHolderImpl::LockHolderImpl(RWLock && parent, RWLockImpl::GroupsContainer::iterator it_group,
I
Ivan Lezhankin 已提交
165
                                             RWLockImpl::ClientsContainer::iterator it_client)
166
    : parent{std::move(parent)}, it_group{it_group}, it_client{it_client},
167 168
      active_client_increment{(*it_client == RWLockImpl::Read) ? CurrentMetrics::RWLockActiveReaders
                                                               : CurrentMetrics::RWLockActiveWriters}
169
{}
170 171

}