提交 e107a328 编写于 作者: fengqikai1414's avatar fengqikai1414 提交者: fengqikai1414

framework: optimize rw lock

上级 e9dcb4c5
#include "cybertron/base/atomic_rw_lock.h"
#include <atomic>
#include <mutex>
#include <string>
#include "cybertron/base/atomic_rw_lock.h"
#include "cybertron/time/time.h"
using apollo::cybertron::base::AtomicRWLock;
using apollo::cybertron::base::ReadLockGuard;
using apollo::cybertron::base::WriteLockGuard;
volatile bool ready = false;
int main(int argc, char const *argv[]) {
if (argc != 5) {
......@@ -15,11 +20,11 @@ int main(int argc, char const *argv[]) {
return 0;
}
apollo::cybertron::base::AtomicRWLock rw_lock(false);
AtomicRWLock rw_lock(false);
std::mutex mutex;
auto start = apollo::cybertron::Time::MonoTime();
for (int i = 0; i < 1000000; i++) {
apollo::cybertron::base::WriteLockGuard lg(rw_lock);
WriteLockGuard<AtomicRWLock> lg(rw_lock);
}
auto end = apollo::cybertron::Time::MonoTime();
std::cout << "rw lock 1000000 times: " << end - start << std::endl;
......@@ -46,7 +51,7 @@ int main(int argc, char const *argv[]) {
asm volatile("rep; nop" ::: "memory");
}
for (int j = 0; j < read_lock_times; j++) {
apollo::cybertron::base::ReadLockGuard lg(rw_lock);
ReadLockGuard<AtomicRWLock> lg(rw_lock);
usleep(1);
}
count++;
......@@ -59,7 +64,7 @@ int main(int argc, char const *argv[]) {
asm volatile("rep; nop" ::: "memory");
}
for (int j = 0; j < write_lock_times; j++) {
apollo::cybertron::base::WriteLockGuard lg(rw_lock);
WriteLockGuard<AtomicRWLock> lg(rw_lock);
usleep(1);
}
count++;
......
......@@ -25,24 +25,20 @@
#include <iostream>
#include <mutex>
#include <thread>
#include "cybertron/base/rw_lock_guard.h"
namespace apollo {
namespace cybertron {
namespace base {
class ReadLockGuard;
class WriteLockGuard;
static const std::thread::id NULL_THREAD_ID = std::thread::id();
class AtomicRWLock {
friend class ReadLockGuard;
friend class WriteLockGuard;
friend class ReadLockGuard<AtomicRWLock>;
friend class WriteLockGuard<AtomicRWLock>;
public:
static const int32_t RW_LOCK_FREE = 0;
static const int32_t WRITE_EXCLUSIVE = -1;
static const uint32_t MAX_RETRY_TIMES = 5;
static const std::thread::id null_thread;
AtomicRWLock() {}
explicit AtomicRWLock(bool write_first) : write_first_(write_first) {}
......@@ -56,45 +52,12 @@ class AtomicRWLock {
AtomicRWLock(const AtomicRWLock&) = delete;
AtomicRWLock& operator=(const AtomicRWLock&) = delete;
std::thread::id write_thread_id_ = {NULL_THREAD_ID};
std::atomic<uint32_t> write_lock_wait_num_ = {0};
std::atomic<int32_t> lock_num_ = {0};
bool write_first_ = true;
};
class ReadLockGuard {
public:
explicit ReadLockGuard(AtomicRWLock& lock) : rw_lock_(lock) {
rw_lock_.ReadLock();
}
~ReadLockGuard() { rw_lock_.ReadUnlock(); }
private:
ReadLockGuard(const ReadLockGuard& other) = delete;
ReadLockGuard& operator=(const ReadLockGuard& other) = delete;
AtomicRWLock& rw_lock_;
};
class WriteLockGuard {
public:
explicit WriteLockGuard(AtomicRWLock& lock) : rw_lock_(lock) {
rw_lock_.WriteLock();
}
~WriteLockGuard() { rw_lock_.WriteUnlock(); }
private:
WriteLockGuard(const WriteLockGuard& other) = delete;
WriteLockGuard& operator=(const WriteLockGuard& other) = delete;
AtomicRWLock& rw_lock_;
};
inline void AtomicRWLock::ReadLock() {
if (write_thread_id_ == std::this_thread::get_id()) {
return;
}
uint32_t retry_times = 0;
int32_t lock_num = lock_num_.load();
if (write_first_) {
......@@ -127,11 +90,6 @@ inline void AtomicRWLock::ReadLock() {
}
inline void AtomicRWLock::WriteLock() {
auto this_thread_id = std::this_thread::get_id();
if (write_thread_id_ == this_thread_id) {
lock_num_.fetch_sub(1);
return;
}
int32_t rw_lock_free = RW_LOCK_FREE;
uint32_t retry_times = 0;
write_lock_wait_num_.fetch_add(1);
......@@ -146,22 +104,12 @@ inline void AtomicRWLock::WriteLock() {
retry_times = 0;
}
}
write_thread_id_ = this_thread_id;
write_lock_wait_num_.fetch_sub(1);
}
inline void AtomicRWLock::ReadUnlock() {
if (write_thread_id_ == std::this_thread::get_id()) {
return;
}
lock_num_.fetch_sub(1);
}
inline void AtomicRWLock::ReadUnlock() { lock_num_.fetch_sub(1); }
inline void AtomicRWLock::WriteUnlock() {
if (lock_num_.fetch_add(1) == WRITE_EXCLUSIVE) {
write_thread_id_ = NULL_THREAD_ID;
}
}
inline void AtomicRWLock::WriteUnlock() { lock_num_.fetch_add(1); }
} // namespace base
} // namespace cybertron
......
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_BASE_REENTRANT_RW_LOCK_H_
#define CYBERTRON_BASE_REENTRANT_RW_LOCK_H_
#include <stdint.h>
#include <unistd.h>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <thread>
#include "cybertron/base/rw_lock_guard.h"
namespace apollo {
namespace cybertron {
namespace base {
static const std::thread::id NULL_THREAD_ID = std::thread::id();
class ReentrantRWLock {
friend class ReadLockGuard<ReentrantRWLock>;
friend class WriteLockGuard<ReentrantRWLock>;
public:
static const int32_t RW_LOCK_FREE = 0;
static const int32_t WRITE_EXCLUSIVE = -1;
static const uint32_t MAX_RETRY_TIMES = 5;
static const std::thread::id null_thread;
ReentrantRWLock() {}
explicit ReentrantRWLock(bool write_first) : write_first_(write_first) {}
private:
// all these function only can used by ReadLockGuard/WriteLockGuard;
void ReadLock();
void WriteLock();
void ReadUnlock();
void WriteUnlock();
ReentrantRWLock(const ReentrantRWLock&) = delete;
ReentrantRWLock& operator=(const ReentrantRWLock&) = delete;
std::thread::id write_thread_id_ = {NULL_THREAD_ID};
std::atomic<uint32_t> write_lock_wait_num_ = {0};
std::atomic<int32_t> lock_num_ = {0};
bool write_first_ = true;
};
inline void ReentrantRWLock::ReadLock() {
if (write_thread_id_ == std::this_thread::get_id()) {
return;
}
uint32_t retry_times = 0;
int32_t lock_num = lock_num_.load();
if (write_first_) {
do {
while (lock_num < RW_LOCK_FREE || write_lock_wait_num_.load() > 0) {
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
lock_num = lock_num_.load();
}
} while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
std::memory_order_acquire,
std::memory_order_relaxed));
} else {
do {
while (lock_num < RW_LOCK_FREE) {
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
lock_num = lock_num_.load();
}
} while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
std::memory_order_acquire,
std::memory_order_relaxed));
}
}
inline void ReentrantRWLock::WriteLock() {
auto this_thread_id = std::this_thread::get_id();
if (write_thread_id_ == this_thread_id) {
lock_num_.fetch_sub(1);
return;
}
int32_t rw_lock_free = RW_LOCK_FREE;
uint32_t retry_times = 0;
write_lock_wait_num_.fetch_add(1);
while (!lock_num_.compare_exchange_weak(rw_lock_free, WRITE_EXCLUSIVE,
std::memory_order_acquire,
std::memory_order_relaxed)) {
// rw_lock_free will change after CAS fail, so init agin
rw_lock_free = RW_LOCK_FREE;
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
}
write_thread_id_ = this_thread_id;
write_lock_wait_num_.fetch_sub(1);
}
inline void ReentrantRWLock::ReadUnlock() {
if (write_thread_id_ == std::this_thread::get_id()) {
return;
}
lock_num_.fetch_sub(1);
}
inline void ReentrantRWLock::WriteUnlock() {
if (lock_num_.fetch_add(1) == WRITE_EXCLUSIVE) {
write_thread_id_ = NULL_THREAD_ID;
}
}
} // namespace base
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_BASE_REENTRANT_RW_LOCK_H_
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_BASE_RW_LOCK_GUARD_H_
#define CYBERTRON_BASE_RW_LOCK_GUARD_H_
#include <stdint.h>
#include <unistd.h>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <thread>
namespace apollo {
namespace cybertron {
namespace base {
template <typename RWLock>
class ReadLockGuard {
public:
explicit ReadLockGuard(RWLock& lock) : rw_lock_(lock) { rw_lock_.ReadLock(); }
~ReadLockGuard() { rw_lock_.ReadUnlock(); }
private:
ReadLockGuard(const ReadLockGuard& other) = delete;
ReadLockGuard& operator=(const ReadLockGuard& other) = delete;
RWLock& rw_lock_;
};
template <typename RWLock>
class WriteLockGuard {
public:
explicit WriteLockGuard(RWLock& lock) : rw_lock_(lock) {
rw_lock_.WriteLock();
}
~WriteLockGuard() { rw_lock_.WriteUnlock(); }
private:
WriteLockGuard(const WriteLockGuard& other) = delete;
WriteLockGuard& operator=(const WriteLockGuard& other) = delete;
RWLock& rw_lock_;
};
} // namespace base
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_BASE_RW_LOCK_GUARD_H_
......@@ -31,9 +31,6 @@ namespace cybertron {
namespace common {
using ::apollo::cybertron::base::AtomicHashMap;
using ::apollo::cybertron::base::AtomicRWLock;
using ::apollo::cybertron::base::ReadLockGuard;
using ::apollo::cybertron::base::WriteLockGuard;
using ::apollo::cybertron::proto::CyberConfig;
class GlobalData {
......
......@@ -40,9 +40,9 @@ class ChannelBuffer {
ChannelBuffer(uint64_t channel_id, BufferType* buffer)
: channel_id_(channel_id), buffer_(buffer) {}
bool Fetch(uint64_t* index, std::shared_ptr<T>& m);
bool Fetch(uint64_t* index, std::shared_ptr<T>& m); // NOLINT
bool Latest(std::shared_ptr<T>& m);
bool Latest(std::shared_ptr<T>& m); // NOLINT
bool FetchMulti(uint64_t fetch_size, std::vector<std::shared_ptr<T>>* vec);
......@@ -55,7 +55,8 @@ class ChannelBuffer {
};
template <typename T>
bool ChannelBuffer<T>::Fetch(uint64_t* index, std::shared_ptr<T>& m) {
bool ChannelBuffer<T>::Fetch(uint64_t* index,
std::shared_ptr<T>& m) { // NOLINT
std::lock_guard<std::mutex> lock(buffer_->Mutex());
if (buffer_->Empty()) {
return false;
......@@ -77,7 +78,7 @@ bool ChannelBuffer<T>::Fetch(uint64_t* index, std::shared_ptr<T>& m) {
}
template <typename T>
bool ChannelBuffer<T>::Latest(std::shared_ptr<T>& m) {
bool ChannelBuffer<T>::Latest(std::shared_ptr<T>& m) { // NOLINT
std::lock_guard<std::mutex> lock(buffer_->Mutex());
if (buffer_->Empty()) {
return false;
......
......@@ -59,8 +59,8 @@ class DataVisitor : public DataVisitorBase {
buffer_m0_, buffer_m1_, buffer_m2_, buffer_m3_);
}
bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1,
std::shared_ptr<M2>& m2, std::shared_ptr<M3>& m3) {
bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1, // NOLINT
std::shared_ptr<M2>& m2, std::shared_ptr<M3>& m3) { // NOLINT
if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2, m3)) {
next_msg_index_++;
return true;
......@@ -100,8 +100,8 @@ class DataVisitor<M0, M1, M2, NullType> : public DataVisitorBase {
}
}
bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1,
std::shared_ptr<M2>& m2) {
bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1, // NOLINT
std::shared_ptr<M2>& m2) { // NOLINT
if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2)) {
next_msg_index_++;
return true;
......@@ -136,7 +136,7 @@ class DataVisitor<M0, M1, NullType, NullType> : public DataVisitorBase {
}
}
bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) {
bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) { // NOLINT
if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) {
next_msg_index_++;
return true;
......@@ -166,7 +166,7 @@ class DataVisitor<M0, NullType, NullType, NullType> : public DataVisitorBase {
data_notifier_->AddNotifier(buffer_.ChannelId(), notifier_);
}
bool TryFetch(std::shared_ptr<M0>& m0) {
bool TryFetch(std::shared_ptr<M0>& m0) { // NOLINT
if (buffer_.Fetch(&next_msg_index_, m0)) {
next_msg_index_++;
return true;
......
......@@ -36,9 +36,10 @@ template <typename M0, typename M1 = NullType, typename M2 = NullType,
class DataFusion {
public:
virtual ~DataFusion() {}
virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0,
std::shared_ptr<M1>& m1, std::shared_ptr<M2>& m2,
std::shared_ptr<M3>& m3) = 0;
virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0, // NOLINT
std::shared_ptr<M1>& m1, // NOLINT
std::shared_ptr<M2>& m2, // NOLINT
std::shared_ptr<M3>& m3) = 0; // NOLINT
};
template <typename M0, typename M1, typename M2>
......@@ -46,8 +47,9 @@ class DataFusion<M0, M1, M2, NullType> {
public:
virtual ~DataFusion() {}
virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0,
std::shared_ptr<M1>& m1, std::shared_ptr<M2>& m2) = 0;
virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0, // NOLINT
std::shared_ptr<M1>& m1, // NOLINT
std::shared_ptr<M2>& m2) = 0; // NOLINT
};
template <typename M0, typename M1>
......@@ -55,8 +57,8 @@ class DataFusion<M0, M1, NullType, NullType> {
public:
virtual ~DataFusion() {}
virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0,
std::shared_ptr<M1>& m1) = 0;
virtual bool Fusion(uint64_t* index, std::shared_ptr<M0>& m0, // NOLINT
std::shared_ptr<M1>& m1) = 0; // NOLINT
};
} // namespace fusion
......
......@@ -17,10 +17,10 @@
#include "cybertron/logger/async_logger.h"
#include "cybertron/logger/log_file_object.h"
#include <stdlib.h>
#include <string>
#include <thread>
#include <unordered_map>
#include "stdlib.h"
namespace apollo {
namespace cybertron {
......
......@@ -34,7 +34,7 @@ bool ProcessorContext::Pop(uint64_t croutine_id,
std::future<std::shared_ptr<CRoutine>>& fut) {
std::promise<std::shared_ptr<CRoutine>> prom;
fut = prom.get_future();
WriteLockGuard lg(rw_lock_);
WriteLockGuard<AtomicRWLock> lg(rw_lock_);
if (cr_map_.erase(croutine_id) != 0) {
pop_list_.Set(croutine_id, std::move(prom));
return true;
......@@ -44,7 +44,7 @@ bool ProcessorContext::Pop(uint64_t croutine_id,
void ProcessorContext::Push(const std::shared_ptr<CRoutine>& cr) {
{
WriteLockGuard lg(rw_lock_);
WriteLockGuard<AtomicRWLock> lg(rw_lock_);
if (cr_map_.find(cr->Id()) != cr_map_.end() || pop_list_.Has(cr->Id())) {
return;
}
......@@ -54,7 +54,7 @@ void ProcessorContext::Push(const std::shared_ptr<CRoutine>& cr) {
}
void ProcessorContext::RemoveCRoutine(uint64_t croutine_id) {
WriteLockGuard lg(rw_lock_);
WriteLockGuard<AtomicRWLock> lg(rw_lock_);
auto it = cr_map_.find(croutine_id);
if (it != cr_map_.end()) {
it->second->Stop();
......@@ -63,7 +63,7 @@ void ProcessorContext::RemoveCRoutine(uint64_t croutine_id) {
}
void ProcessorContext::NotifyProcessor(uint64_t routine_id) {
ReadLockGuard lg(rw_lock_);
ReadLockGuard<AtomicRWLock> lg(rw_lock_);
if (cr_map_.find(routine_id) == cr_map_.end()) {
return;
}
......@@ -95,14 +95,14 @@ void ProcessorContext::ShutDown() {
void ProcessorContext::PrintStatistics() {}
void ProcessorContext::PrintCRoutineStats() {
ReadLockGuard lg(rw_lock_);
ReadLockGuard<AtomicRWLock> lg(rw_lock_);
for (auto it = cr_map_.begin(); it != cr_map_.end(); ++it) {
it->second->PrintStatistics();
}
}
void ProcessorContext::UpdateProcessStat(ProcessorStat* stat) {
ReadLockGuard lg(rw_lock_);
ReadLockGuard<AtomicRWLock> lg(rw_lock_);
for (auto it = cr_map_.begin(); it != cr_map_.end(); ++it) {
auto s = it->second->GetStatistics();
stat->exec_time += s.exec_time;
......
......@@ -90,7 +90,7 @@ class ProcessorContext {
bool ProcessorContext::GetState(const uint64_t& routine_id,
RoutineState* state) {
ReadLockGuard lg(rw_lock_);
ReadLockGuard<AtomicRWLock> lg(rw_lock_);
auto it = cr_map_.find(routine_id);
if (it != cr_map_.end()) {
*state = it->second->State();
......@@ -101,7 +101,7 @@ bool ProcessorContext::GetState(const uint64_t& routine_id,
bool ProcessorContext::SetState(const uint64_t& routine_id,
const RoutineState& state) {
ReadLockGuard lg(rw_lock_);
ReadLockGuard<AtomicRWLock> lg(rw_lock_);
auto it = cr_map_.find(routine_id);
if (it != cr_map_.end()) {
it->second->SetState(state);
......
......@@ -100,7 +100,7 @@ void Graph::Insert(const Edge& e) {
if (!e.IsValid()) {
return;
}
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
auto& e_v = e.value();
if (edges_.find(e_v) == edges_.end()) {
edges_[e_v] = RelatedVertices();
......@@ -118,7 +118,7 @@ void Graph::Delete(const Edge& e) {
if (!e.IsValid()) {
return;
}
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
auto& e_v = e.value();
if (edges_.find(e_v) == edges_.end()) {
return;
......@@ -133,7 +133,7 @@ void Graph::Delete(const Edge& e) {
}
uint32_t Graph::GetNumOfEdge() {
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
uint32_t num = 0;
for (auto& item : list_) {
num += item.second.size();
......@@ -145,7 +145,7 @@ FlowDirection Graph::GetDirectionOf(const Vertice& lhs, const Vertice& rhs) {
if (lhs.IsDummy() || rhs.IsDummy()) {
return UNREACHABLE;
}
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
if (list_.count(lhs.GetKey()) == 0 || list_.count(rhs.GetKey()) == 0) {
return UNREACHABLE;
}
......
......@@ -27,7 +27,7 @@ namespace service_discovery {
bool MultiValueWarehouse::Add(uint64_t key, const RolePtr& role,
bool ignore_if_exist) {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
if (!ignore_if_exist) {
if (roles_.find(key) != roles_.end()) {
return false;
......@@ -39,22 +39,22 @@ bool MultiValueWarehouse::Add(uint64_t key, const RolePtr& role,
}
void MultiValueWarehouse::Clear() {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
roles_.clear();
}
std::size_t MultiValueWarehouse::Size() {
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
return roles_.size();
}
void MultiValueWarehouse::Remove(uint64_t key) {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
roles_.erase(key);
}
void MultiValueWarehouse::Remove(uint64_t key, const RolePtr& role) {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
auto range = roles_.equal_range(key);
for (auto it = range.first; it != range.second;) {
if (it->second->Match(role->attributes())) {
......@@ -66,7 +66,7 @@ void MultiValueWarehouse::Remove(uint64_t key, const RolePtr& role) {
}
void MultiValueWarehouse::Remove(const RoleAttributes& target_attr) {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto it = roles_.begin(); it != roles_.end();) {
auto curr_role = it->second;
if (curr_role->Match(target_attr)) {
......@@ -84,7 +84,7 @@ bool MultiValueWarehouse::Search(uint64_t key) {
bool MultiValueWarehouse::Search(uint64_t key, RolePtr* first_matched_role) {
RETURN_VAL_IF_NULL(first_matched_role, false);
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
auto search = roles_.find(key);
if (search == roles_.end()) {
return false;
......@@ -138,7 +138,7 @@ bool MultiValueWarehouse::Search(const RoleAttributes& target_attr) {
bool MultiValueWarehouse::Search(const RoleAttributes& target_attr,
RolePtr* first_matched_role) {
RETURN_VAL_IF_NULL(first_matched_role, false);
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto& item : roles_) {
if (item.second->Match(target_attr)) {
*first_matched_role = item.second;
......@@ -163,7 +163,7 @@ bool MultiValueWarehouse::Search(const RoleAttributes& target_attr,
std::vector<RolePtr>* matched_roles) {
RETURN_VAL_IF_NULL(matched_roles, false);
bool find = false;
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto& item : roles_) {
if (item.second->Match(target_attr)) {
matched_roles->emplace_back(item.second);
......@@ -178,7 +178,7 @@ bool MultiValueWarehouse::Search(
std::vector<RoleAttributes>* matched_roles_attr) {
RETURN_VAL_IF_NULL(matched_roles_attr, false);
bool find = false;
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto& item : roles_) {
if (item.second->Match(target_attr)) {
matched_roles_attr->emplace_back(item.second->attributes());
......@@ -190,7 +190,7 @@ bool MultiValueWarehouse::Search(
void MultiValueWarehouse::GetAllRoles(std::vector<RolePtr>* roles) {
RETURN_IF_NULL(roles);
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto& item : roles_) {
roles->emplace_back(item.second);
}
......@@ -198,7 +198,7 @@ void MultiValueWarehouse::GetAllRoles(std::vector<RolePtr>* roles) {
void MultiValueWarehouse::GetAllRoles(std::vector<RoleAttributes>* roles_attr) {
RETURN_IF_NULL(roles_attr);
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto& item : roles_) {
roles_attr->emplace_back(item.second->attributes());
}
......
......@@ -24,7 +24,7 @@ namespace service_discovery {
bool SingleValueWarehouse::Add(uint64_t key, const RolePtr& role,
bool ignore_if_exist) {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
if (!ignore_if_exist) {
if (roles_.find(key) != roles_.end()) {
return false;
......@@ -35,22 +35,22 @@ bool SingleValueWarehouse::Add(uint64_t key, const RolePtr& role,
}
void SingleValueWarehouse::Clear() {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
roles_.clear();
}
std::size_t SingleValueWarehouse::Size() {
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
return roles_.size();
}
void SingleValueWarehouse::Remove(uint64_t key) {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
roles_.erase(key);
}
void SingleValueWarehouse::Remove(uint64_t key, const RolePtr& role) {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
auto search = roles_.find(key);
if (search == roles_.end()) {
return;
......@@ -62,7 +62,7 @@ void SingleValueWarehouse::Remove(uint64_t key, const RolePtr& role) {
}
void SingleValueWarehouse::Remove(const RoleAttributes& target_attr) {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto it = roles_.begin(); it != roles_.end();) {
auto curr_role = it->second;
if (curr_role->Match(target_attr)) {
......@@ -80,7 +80,7 @@ bool SingleValueWarehouse::Search(uint64_t key) {
bool SingleValueWarehouse::Search(uint64_t key, RolePtr* first_matched_role) {
RETURN_VAL_IF_NULL(first_matched_role, false);
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
auto search = roles_.find(key);
if (search == roles_.end()) {
return false;
......@@ -130,7 +130,7 @@ bool SingleValueWarehouse::Search(const RoleAttributes& target_attr) {
bool SingleValueWarehouse::Search(const RoleAttributes& target_attr,
RolePtr* first_matched_role) {
RETURN_VAL_IF_NULL(first_matched_role, false);
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto& item : roles_) {
if (item.second->Match(target_attr)) {
*first_matched_role = item.second;
......@@ -155,7 +155,7 @@ bool SingleValueWarehouse::Search(const RoleAttributes& target_attr,
std::vector<RolePtr>* matched_roles) {
RETURN_VAL_IF_NULL(matched_roles, false);
bool find = false;
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto& item : roles_) {
if (item.second->Match(target_attr)) {
matched_roles->emplace_back(item.second);
......@@ -170,7 +170,7 @@ bool SingleValueWarehouse::Search(
std::vector<RoleAttributes>* matched_roles_attr) {
RETURN_VAL_IF_NULL(matched_roles_attr, false);
bool find = false;
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto& item : roles_) {
if (item.second->Match(target_attr)) {
matched_roles_attr->emplace_back(item.second->attributes());
......@@ -182,7 +182,7 @@ bool SingleValueWarehouse::Search(
void SingleValueWarehouse::GetAllRoles(std::vector<RolePtr>* roles) {
RETURN_IF_NULL(roles);
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto& item : roles_) {
roles->emplace_back(item.second);
}
......@@ -191,7 +191,7 @@ void SingleValueWarehouse::GetAllRoles(std::vector<RolePtr>* roles) {
void SingleValueWarehouse::GetAllRoles(
std::vector<RoleAttributes>* roles_attr) {
RETURN_IF_NULL(roles_attr);
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
for (auto& item : roles_) {
roles_attr->emplace_back(item.second->attributes());
}
......
......@@ -13,7 +13,7 @@ TEST(AtomicRWLockTest, read_lock) {
AtomicRWLock lock;
EXPECT_EQ(0, lock.lock_num_.load());
auto f = [&]() {
ReadLockGuard lg(lock);
ReadLockGuard<AtomicRWLock> lg(lock);
count++;
thread_init++;
while (flag) {
......@@ -31,16 +31,16 @@ TEST(AtomicRWLockTest, read_lock) {
t1.join();
t2.join();
{
ReadLockGuard lg1(lock);
ReadLockGuard<AtomicRWLock> lg1(lock);
EXPECT_EQ(1, lock.lock_num_.load());
{
ReadLockGuard lg2(lock);
ReadLockGuard<AtomicRWLock> lg2(lock);
EXPECT_EQ(2, lock.lock_num_.load());
{
ReadLockGuard lg3(lock);
ReadLockGuard<AtomicRWLock> lg3(lock);
EXPECT_EQ(3, lock.lock_num_.load());
{
ReadLockGuard lg4(lock);
ReadLockGuard<AtomicRWLock> lg4(lock);
EXPECT_EQ(4, lock.lock_num_.load());
}
EXPECT_EQ(3, lock.lock_num_.load());
......@@ -59,7 +59,7 @@ TEST(AtomicRWLockTest, write_lock) {
AtomicRWLock lock(false);
auto f = [&]() {
thread_run++;
WriteLockGuard lg(lock);
WriteLockGuard<AtomicRWLock> lg(lock);
count++;
while (flag) {
std::this_thread::yield();
......@@ -77,13 +77,13 @@ TEST(AtomicRWLockTest, write_lock) {
t2.join();
{
WriteLockGuard lg1(lock);
WriteLockGuard<AtomicRWLock> lg1(lock);
EXPECT_EQ(-1, lock.lock_num_.load());
{
WriteLockGuard lg2(lock);
WriteLockGuard<AtomicRWLock> lg2(lock);
EXPECT_EQ(-2, lock.lock_num_.load());
{
ReadLockGuard lg3(lock);
ReadLockGuard<AtomicRWLock> lg3(lock);
EXPECT_EQ(-2, lock.lock_num_.load());
}
}
......
......@@ -359,7 +359,7 @@ bool Player::GenerateTask(const SingleMessage& msg, const uint64_t& plus_time) {
bool Player::Start() {
if (!is_initialized_) {
AERROR << "NOT initializated";
AERROR << "NOT initialized";
return false;
}
if (is_started_) {
......@@ -367,7 +367,7 @@ bool Player::Start() {
return false;
}
is_started_ = true;
std::cout << "\nplease wait for loading and playing back record...\n"
std::cout << "\nPlease wait for loading and playing back record...\n"
<< "Hit Ctrl+C to stop replay, or Space to pause.\n"
<< std::endl;
std::ios::fmtflags before(std::cout.flags());
......
......@@ -40,7 +40,7 @@ bool Spliter::Proc() {
<< begin_time_ << "end_time_: " << end_time_;
return false;
}
for (auto channel_name : white_channels_) {
for (const auto& channel_name : white_channels_) {
if (std::find(black_channels_.begin(), black_channels_.end(),
channel_name) != black_channels_.end()) {
AERROR << "find channel in both of white list and black list, channel: "
......
......@@ -43,7 +43,7 @@ void ShmDispatcher::Shutdown() {
void ShmDispatcher::AddSegment(const RoleAttributes& self_attr) {
uint64_t channel_id = self_attr.channel_id();
WriteLockGuard lock(segments_lock_);
WriteLockGuard<AtomicRWLock> lock(segments_lock_);
if (segments_.count(channel_id) > 0) {
return;
}
......@@ -91,7 +91,7 @@ void ShmDispatcher::ThreadFunc() {
uint32_t block_index = readable_info.block_index();
{
ReadLockGuard lock(segments_lock_);
ReadLockGuard<AtomicRWLock> lock(segments_lock_);
if (segments_.count(channel_id) > 0) {
if (!segments_[channel_id]->Read(block_index, msg_str_ptr.get(),
&msg_info_str)) {
......
......@@ -101,14 +101,14 @@ void ListenerHandler<MessageT>::Connect(uint64_t self_id,
if (!connection.IsConnected()) {
return;
}
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
signal_conns_[self_id] = connection;
}
template <typename MessageT>
void ListenerHandler<MessageT>::Connect(uint64_t self_id, uint64_t oppo_id,
const Listener& listener) {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
if (signals_.find(oppo_id) == signals_.end()) {
signals_[oppo_id] = std::make_shared<MessageSignal>();
}
......@@ -125,7 +125,7 @@ void ListenerHandler<MessageT>::Connect(uint64_t self_id, uint64_t oppo_id,
template <typename MessageT>
void ListenerHandler<MessageT>::Disconnect(uint64_t self_id) {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
if (signal_conns_.find(self_id) == signal_conns_.end()) {
return;
}
......@@ -135,7 +135,7 @@ void ListenerHandler<MessageT>::Disconnect(uint64_t self_id) {
template <typename MessageT>
void ListenerHandler<MessageT>::Disconnect(uint64_t self_id, uint64_t oppo_id) {
WriteLockGuard lock(rw_lock_);
WriteLockGuard<AtomicRWLock> lock(rw_lock_);
if (signals_conns_.find(oppo_id) == signals_conns_.end()) {
return;
}
......@@ -151,7 +151,7 @@ void ListenerHandler<MessageT>::Run(const Message& msg,
const MessageInfo& msg_info) {
signal_(msg, msg_info);
uint64_t oppo_id = msg_info.sender_id().HashValue();
ReadLockGuard lock(rw_lock_);
ReadLockGuard<AtomicRWLock> lock(rw_lock_);
if (signals_.find(oppo_id) == signals_.end()) {
return;
}
......
......@@ -31,8 +31,7 @@ Block::Block()
Block::~Block() {}
bool Block::TryLockForWrite() {
// std::lock_guard<std::mutex> lock(read_write_mutex_);
WriteLockGuard lock(read_write_mutex_);
WriteLockGuard<AtomicRWLock> lock(read_write_mutex_);
if (is_writing_.load()) {
ADEBUG << "block is writing.";
return false;
......@@ -45,8 +44,7 @@ bool Block::TryLockForWrite() {
}
bool Block::TryLockForRead() {
// std::lock_guard<std::mutex> lock(read_write_mutex_);
ReadLockGuard lock(read_write_mutex_);
ReadLockGuard<AtomicRWLock> lock(read_write_mutex_);
if (is_writing_.load()) {
ADEBUG << "block is writing.";
return false;
......
#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
#include <thread>
#include "cybertron/base/atomic_rw_lock.h"
#include "cybertron/base/reentrant_rw_lock.h"
using namespace apollo::cybertron::base;
// using AtomicRWLock = std::mutex;
// using ReadLockGuard = std::lock_guard<std::mutex>;
// using WriteLockGuard = std::lock_guard<std::mutex>;
using apollo::cybertron::base::AtomicRWLock;
using apollo::cybertron::base::ReadLockGuard;
using apollo::cybertron::base::ReentrantRWLock;
using apollo::cybertron::base::WriteLockGuard;
int x = 0;
AtomicRWLock rw_lock;
ReentrantRWLock reentrant_lock;
void read_lock() {
ReadLockGuard<AtomicRWLock> lock(rw_lock);
std::cout << "read lock x: " << x << std::endl;
}
void readlock() {
ReadLockGuard lock(rw_lock);
// std::cout << "read lock x: " << x << std::endl;
usleep(1);
void write_lock() {
WriteLockGuard<AtomicRWLock> lock(rw_lock);
std::cout << "write lock x: " << ++x << std::endl;
}
void writelock() {
WriteLockGuard lock(rw_lock);
x += 1;
// std::cout << "write lock x: " << x << std::endl;
// usleep(1);
readlock();
void reentrant_read_lock() {
ReadLockGuard<ReentrantRWLock> lock(reentrant_lock);
std::cout << "read lock x: " << x << std::endl;
}
void readloop() {
for (int i = 0; i < 10000; ++i) {
readlock();
}
void reentrant_write_lock() {
WriteLockGuard<ReentrantRWLock> lock(reentrant_lock);
std::cout << "write lock x: " << x++ << std::endl;
}
void writeloop() {
for (int i = 0; i < 10000; ++i) {
writelock();
}
void reentrant_read_write() {
WriteLockGuard<AtomicRWLock> lock(rw_lock);
std::cout << "write lock x: " << x++ << std::endl;
reentrant_read_lock();
reentrant_write_lock();
}
int main() {
auto start = std::chrono::steady_clock::now();
std::thread rt[30];
for (int i = 0; i < 30; ++i) {
rt[i] = std::thread(readloop);
}
std::thread t(writeloop);
t.join();
for (int i = 0; i < 30; ++i) {
rt[i].join();
}
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> diff = end - start;
std::cout << std::fixed << diff.count() << " s\n";
read_lock();
write_lock();
reentrant_read_write();
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册