提交 2cfe009e 编写于 作者: G gruminions 提交者: Dengchengliang

framework: 1.add Reset API 2.adjust code implementation

上级 0132d919
......@@ -34,6 +34,8 @@ class BlockerBase {
public:
virtual ~BlockerBase() = default;
virtual void Reset() = 0;
virtual void ClearObserved() = 0;
virtual void ClearPublished() = 0;
virtual void Observe() = 0;
virtual bool IsObservedEmpty() const = 0;
......@@ -60,6 +62,8 @@ struct BlockerAttr {
template <typename T>
class Blocker : public BlockerBase {
friend class BlockerManager;
public:
using MessageType = T;
using MessagePtr = std::shared_ptr<T>;
......@@ -74,6 +78,7 @@ class Blocker : public BlockerBase {
void Publish(const MessageType& msg);
void Publish(const MessagePtr& msg);
void ClearObserved() override;
void ClearPublished() override;
void Observe() override;
bool IsObservedEmpty() const override;
......@@ -95,10 +100,10 @@ class Blocker : public BlockerBase {
const std::string& channel_name() const override;
private:
void Reset() override;
void Enqueue(const MessagePtr& msg);
void Notify(const MessagePtr& msg);
bool is_full_;
BlockerAttr attr_;
MessageQueue observed_msg_queue_;
MessageQueue published_msg_queue_;
......@@ -111,8 +116,7 @@ class Blocker : public BlockerBase {
};
template <typename T>
Blocker<T>::Blocker(const BlockerAttr& attr)
: is_full_(false), attr_(attr), dummy_msg_() {}
Blocker<T>::Blocker(const BlockerAttr& attr) : attr_(attr), dummy_msg_() {}
template <typename T>
Blocker<T>::~Blocker() {
......@@ -132,6 +136,25 @@ void Blocker<T>::Publish(const MessagePtr& msg) {
Notify(msg);
}
template <typename T>
void Blocker<T>::Reset() {
{
std::lock_guard<std::mutex> lock(msg_mutex_);
observed_msg_queue_.clear();
published_msg_queue_.clear();
}
{
std::lock_guard<std::mutex> lock(cb_mutex_);
published_callbacks_.clear();
}
}
template <typename T>
void Blocker<T>::ClearObserved() {
std::lock_guard<std::mutex> lock(msg_mutex_);
observed_msg_queue_.clear();
}
template <typename T>
void Blocker<T>::ClearPublished() {
std::lock_guard<std::mutex> lock(msg_mutex_);
......@@ -227,9 +250,6 @@ size_t Blocker<T>::capacity() const {
template <typename T>
void Blocker<T>::set_capacity(size_t capacity) {
std::lock_guard<std::mutex> lock(msg_mutex_);
if (capacity > attr_.capacity) {
is_full_ = false;
}
attr_.capacity = capacity;
while (published_msg_queue_.size() > capacity) {
published_msg_queue_.pop_back();
......@@ -247,16 +267,9 @@ void Blocker<T>::Enqueue(const MessagePtr& msg) {
return;
}
std::lock_guard<std::mutex> lock(msg_mutex_);
if (is_full_) {
published_msg_queue_.pop_back();
}
published_msg_queue_.push_front(msg);
if (!is_full_) {
if (published_msg_queue_.size() >= attr_.capacity) {
is_full_ = true;
}
while (published_msg_queue_.size() > attr_.capacity) {
published_msg_queue_.pop_back();
}
}
......
......@@ -31,6 +31,14 @@ void BlockerManager::Observe() {
}
}
void BlockerManager::Reset() {
std::lock_guard<std::mutex> lock(blocker_mutex_);
for (auto& item : blockers_) {
item.second->Reset();
}
blockers_.clear();
}
} // namespace blocker
} // namespace cybertron
} // namespace apollo
......@@ -65,6 +65,7 @@ class BlockerManager {
std::shared_ptr<Blocker<T>> GetOrCreateBlocker(const BlockerAttr& attr);
void Observe();
void Reset();
private:
BlockerManager();
......
......@@ -21,8 +21,8 @@
#include <list>
#include <memory>
#include "cybertron/blocker/blocker.h"
#include "cybertron/blocker/blocker_manager.h"
#include "cybertron/common/log.h"
#include "cybertron/node/reader.h"
#include "cybertron/time/time.h"
......@@ -61,13 +61,12 @@ class IntraReader : public apollo::cybertron::Reader<MessageT> {
void OnMessage(const MessagePtr& msg_ptr);
Callback msg_callback_;
std::shared_ptr<Blocker<MessageT>> blocker_;
};
template <typename MessageT>
IntraReader<MessageT>::IntraReader(const proto::RoleAttributes& attr,
const Callback& callback)
: Reader<MessageT>(attr), msg_callback_(callback), blocker_(nullptr) {}
: Reader<MessageT>(attr), msg_callback_(callback) {}
template <typename MessageT>
IntraReader<MessageT>::~IntraReader() {
......@@ -79,15 +78,11 @@ bool IntraReader<MessageT>::Init() {
if (this->init_.exchange(true)) {
return true;
}
BlockerAttr attr(this->role_attr_.qos_profile().depth(),
this->role_attr_.channel_name());
blocker_ = BlockerManager::Instance()->GetOrCreateBlocker<MessageT>(attr);
if (blocker_ == nullptr) {
return false;
}
return blocker_->Subscribe(this->role_attr_.node_name(),
std::bind(&IntraReader<MessageT>::OnMessage, this,
std::placeholders::_1));
return BlockerManager::Instance()->Subscribe<MessageT>(
this->role_attr_.channel_name(), this->role_attr_.qos_profile().depth(),
this->role_attr_.node_name(),
std::bind(&IntraReader<MessageT>::OnMessage, this,
std::placeholders::_1));
}
template <typename MessageT>
......@@ -95,92 +90,108 @@ void IntraReader<MessageT>::Shutdown() {
if (!this->init_.exchange(false)) {
return;
}
blocker_->Unsubscribe(this->role_attr_.node_name());
blocker_ = nullptr;
BlockerManager::Instance()->Unsubscribe<MessageT>(
this->role_attr_.channel_name(), this->role_attr_.node_name());
}
template <typename MessageT>
void IntraReader<MessageT>::ClearData() {
if (blocker_ == nullptr) {
return;
auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
this->role_attr_.channel_name());
if (blocker != nullptr) {
blocker->ClearObserved();
blocker->ClearPublished();
}
blocker_->ClearPublished();
}
template <typename MessageT>
void IntraReader<MessageT>::Observe() {
if (blocker_ == nullptr) {
return;
auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
this->role_attr_.channel_name());
if (blocker != nullptr) {
blocker->Observe();
}
blocker_->Observe();
}
template <typename MessageT>
bool IntraReader<MessageT>::Empty() const {
if (blocker_ == nullptr) {
return true;
auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
this->role_attr_.channel_name());
if (blocker != nullptr) {
return blocker->IsObservedEmpty();
}
return blocker_->IsObservedEmpty();
return true;
}
template <typename MessageT>
bool IntraReader<MessageT>::HasReceived() const {
if (blocker_ == nullptr) {
return false;
auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
this->role_attr_.channel_name());
if (blocker != nullptr) {
return !blocker->IsPublishedEmpty();
}
return !blocker_->IsPublishedEmpty();
return false;
}
template <typename MessageT>
void IntraReader<MessageT>::Enqueue(const std::shared_ptr<MessageT>& msg) {
if (blocker_ == nullptr) {
return;
}
blocker_->Publish(msg);
BlockerManager::Instance()->Publish<MessageT>(this->role_attr_.channel_name(),
msg);
}
template <typename MessageT>
void IntraReader<MessageT>::SetHistoryDepth(const uint32_t& depth) {
if (blocker_ == nullptr) {
return;
auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
this->role_attr_.channel_name());
if (blocker != nullptr) {
blocker->set_capacity(depth);
}
blocker_->set_capacity(depth);
}
template <typename MessageT>
uint32_t IntraReader<MessageT>::GetHistoryDepth() const {
if (blocker_ == nullptr) {
return 0;
auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
this->role_attr_.channel_name());
if (blocker != nullptr) {
return blocker->capacity();
}
return blocker_->capacity();
return 0;
}
template <typename MessageT>
std::shared_ptr<MessageT> IntraReader<MessageT>::GetLatestObserved() const {
if (blocker_ == nullptr) {
return nullptr;
auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
this->role_attr_.channel_name());
if (blocker != nullptr) {
return blocker->GetLatestObservedPtr();
}
return blocker_->GetLatestObservedPtr();
return nullptr;
}
template <typename MessageT>
std::shared_ptr<MessageT> IntraReader<MessageT>::GetOldestObserved() const {
if (blocker_ == nullptr) {
return nullptr;
auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
this->role_attr_.channel_name());
if (blocker != nullptr) {
return blocker->GetOldestObservedPtr();
}
return blocker_->GetOldestObservedPtr();
return nullptr;
}
template <typename MessageT>
auto IntraReader<MessageT>::Begin() const -> Iterator {
assert(blocker_ != nullptr);
return blocker_->ObservedBegin();
auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
this->role_attr_.channel_name());
ACHECK(blocker != nullptr);
return blocker->ObservedBegin();
}
template <typename MessageT>
auto IntraReader<MessageT>::End() const -> Iterator {
assert(blocker_ != nullptr);
return blocker_->ObservedEnd();
auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
this->role_attr_.channel_name());
ACHECK(blocker != nullptr);
return blocker->ObservedBegin();
}
template <typename MessageT>
......
......@@ -19,7 +19,6 @@
#include <memory>
#include "cybertron/blocker/blocker.h"
#include "cybertron/blocker/blocker_manager.h"
#include "cybertron/node/writer.h"
......@@ -31,6 +30,7 @@ template <typename MessageT>
class IntraWriter : public apollo::cybertron::Writer<MessageT> {
public:
using MessagePtr = std::shared_ptr<MessageT>;
using BlockerManagerPtr = std::shared_ptr<BlockerManager>;
explicit IntraWriter(const proto::RoleAttributes& attr);
virtual ~IntraWriter();
......@@ -42,7 +42,7 @@ class IntraWriter : public apollo::cybertron::Writer<MessageT> {
bool Write(const MessagePtr& msg_ptr) override;
private:
std::shared_ptr<Blocker<MessageT>> blocker_;
BlockerManagerPtr blocker_manager_;
};
template <typename MessageT>
......@@ -60,12 +60,7 @@ bool IntraWriter<MessageT>::Init() {
return true;
}
BlockerAttr attr(this->role_attr_.channel_name());
blocker_ = BlockerManager::Instance()->GetOrCreateBlocker<MessageT>(attr);
if (blocker_ == nullptr) {
this->init_.exchange(false);
return false;
}
blocker_manager_ = BlockerManager::Instance();
return true;
}
......@@ -74,7 +69,7 @@ void IntraWriter<MessageT>::Shutdown() {
if (!this->init_.exchange(false)) {
return;
}
blocker_ = nullptr;
blocker_manager_ = nullptr;
}
template <typename MessageT>
......@@ -82,8 +77,8 @@ bool IntraWriter<MessageT>::Write(const MessageT& msg) {
if (!this->init_.load()) {
return false;
}
blocker_->Publish(msg);
return true;
return blocker_manager_->Publish<MessageT>(this->role_attr_.channel_name(),
msg);
}
template <typename MessageT>
......@@ -91,8 +86,8 @@ bool IntraWriter<MessageT>::Write(const MessagePtr& msg_ptr) {
if (!this->init_.load()) {
return false;
}
blocker_->Publish(msg_ptr);
return true;
return blocker_manager_->Publish<MessageT>(this->role_attr_.channel_name(),
msg_ptr);
}
} // namespace blocker
......
......@@ -59,6 +59,10 @@ TEST(BlockerManagerTest, publish) {
EXPECT_FALSE(blocker1->IsObservedEmpty());
EXPECT_FALSE(blocker2->IsObservedEmpty());
blocker_manager.Reset();
auto blocker3 = blocker_manager.GetBlocker<UnitTest>("channel2");
EXPECT_EQ(blocker3, nullptr);
}
TEST(BlockerManagerTest, subscribe) {
......
......@@ -69,8 +69,9 @@ TEST(BlockerTest, publish) {
EXPECT_EQ(latest_published_ptr->case_name(), "publish_2");
blocker.ClearPublished();
blocker.ClearObserved();
EXPECT_TRUE(blocker.IsPublishedEmpty());
EXPECT_FALSE(blocker.IsObservedEmpty());
EXPECT_TRUE(blocker.IsObservedEmpty());
}
TEST(BlockerTest, subscribe) {
......@@ -101,6 +102,17 @@ TEST(BlockerTest, subscribe) {
EXPECT_FALSE(res);
blocker.Reset();
res = blocker.Subscribe(
"BlockerTest1", [&received_msg](const std::shared_ptr<UnitTest>& msg) {
received_msg->CopyFrom(*msg);
});
EXPECT_TRUE(res);
blocker.Publish(msg1);
EXPECT_EQ(received_msg->class_name(), msg1->class_name());
EXPECT_EQ(received_msg->case_name(), msg1->case_name());
res = blocker.Unsubscribe("BlockerTest1");
EXPECT_TRUE(res);
res = blocker.Unsubscribe("BlockerTest1");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册