提交 dd53d485 编写于 作者: G gruminions 提交者: fengqikai1414

framework: rename, 1.change Dispatcher to BlockerManager 2.change Message to Blocker

上级 6046b544
......@@ -27,7 +27,7 @@ add_subdirectory(sensor)
add_subdirectory(third_party)
add_subdirectory(python)
add_subdirectory(cybertron/dispatcher)
add_subdirectory(cybertron/blocker)
file(GLOB CYBERTRON_SRCS
"cybertron/*.cpp"
......@@ -65,7 +65,7 @@ target_link_libraries(cybertron
protobuf
cybertron_proto
cybertron_common
cybertron_dispatcher
cybertron_blocker
fastrtps
fastcdr
-lrt
......
project(cybertron_blocker)
aux_source_directory(${PROJECT_SOURCE_DIR} BLOCKER_SRCS)
add_library(${PROJECT_NAME} ${BLOCKER_SRCS})
install(TARGETS cybertron_blocker LIBRARY DESTINATION lib)
......@@ -14,8 +14,8 @@
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_DISPATCHER_MESSAGE_H_
#define CYBERTRON_DISPATCHER_MESSAGE_H_
#ifndef CYBERTRON_BLOCKER_BLOCKER_H_
#define CYBERTRON_BLOCKER_BLOCKER_H_
#include <assert.h>
#include <stddef.h>
......@@ -29,11 +29,11 @@
namespace apollo {
namespace cybertron {
namespace dispatcher {
namespace blocker {
class MessageBase {
class BlockerBase {
public:
virtual ~MessageBase() = default;
virtual ~BlockerBase() = default;
virtual void ClearPublished() = 0;
virtual void Observe() = 0;
......@@ -46,13 +46,13 @@ class MessageBase {
virtual const std::string& channel_name() const = 0;
};
struct MessageAttr {
MessageAttr() : capacity(10), channel_name("") {}
explicit MessageAttr(const std::string& channel)
struct BlockerAttr {
BlockerAttr() : capacity(10), channel_name("") {}
explicit BlockerAttr(const std::string& channel)
: capacity(10), channel_name(channel) {}
MessageAttr(size_t cap, const std::string& channel)
BlockerAttr(size_t cap, const std::string& channel)
: capacity(cap), channel_name(channel) {}
MessageAttr(const MessageAttr& attr)
BlockerAttr(const BlockerAttr& attr)
: capacity(attr.capacity), channel_name(attr.channel_name) {}
size_t capacity;
......@@ -60,7 +60,7 @@ struct MessageAttr {
};
template <typename T>
class Message : public MessageBase {
class Blocker : public BlockerBase {
public:
using MessageType = T;
using MessagePtr = std::shared_ptr<T>;
......@@ -69,8 +69,8 @@ class Message : public MessageBase {
using CallbackMap = std::unordered_map<std::string, Callback>;
using Iterator = typename std::list<std::shared_ptr<T>>::const_iterator;
explicit Message(const MessageAttr& attr);
virtual ~Message();
explicit Blocker(const BlockerAttr& attr);
virtual ~Blocker();
void Publish(const MessageType& msg);
void Publish(const MessagePtr& msg);
......@@ -100,7 +100,7 @@ class Message : public MessageBase {
void Notify(const MessagePtr& msg);
bool is_full_;
MessageAttr attr_;
BlockerAttr attr_;
MessageQueue observed_msg_queue_;
MessageQueue published_msg_queue_;
mutable std::mutex msg_mutex_;
......@@ -110,52 +110,52 @@ class Message : public MessageBase {
};
template <typename T>
Message<T>::Message(const MessageAttr& attr) : is_full_(false), attr_(attr) {}
Blocker<T>::Blocker(const BlockerAttr& attr) : is_full_(false), attr_(attr) {}
template <typename T>
Message<T>::~Message() {
Blocker<T>::~Blocker() {
published_msg_queue_.clear();
observed_msg_queue_.clear();
published_callbacks_.clear();
}
template <typename T>
void Message<T>::Publish(const MessageType& msg) {
void Blocker<T>::Publish(const MessageType& msg) {
Publish(std::make_shared<MessageType>(msg));
}
template <typename T>
void Message<T>::Publish(const MessagePtr& msg) {
void Blocker<T>::Publish(const MessagePtr& msg) {
Enqueue(msg);
Notify(msg);
}
template <typename T>
void Message<T>::ClearPublished() {
void Blocker<T>::ClearPublished() {
std::lock_guard<std::mutex> lock(msg_mutex_);
published_msg_queue_.clear();
}
template <typename T>
void Message<T>::Observe() {
void Blocker<T>::Observe() {
std::lock_guard<std::mutex> lock(msg_mutex_);
observed_msg_queue_ = published_msg_queue_;
}
template <typename T>
bool Message<T>::IsObservedEmpty() const {
bool Blocker<T>::IsObservedEmpty() const {
std::lock_guard<std::mutex> lock(msg_mutex_);
return observed_msg_queue_.empty();
}
template <typename T>
bool Message<T>::IsPublishedEmpty() const {
bool Blocker<T>::IsPublishedEmpty() const {
std::lock_guard<std::mutex> lock(msg_mutex_);
return published_msg_queue_.empty();
}
template <typename T>
bool Message<T>::Subscribe(const std::string& callback_id,
bool Blocker<T>::Subscribe(const std::string& callback_id,
const Callback& callback) {
std::lock_guard<std::mutex> lock(cb_mutex_);
if (published_callbacks_.find(callback_id) != published_callbacks_.end()) {
......@@ -166,82 +166,82 @@ bool Message<T>::Subscribe(const std::string& callback_id,
}
template <typename T>
bool Message<T>::Unsubscribe(const std::string& callback_id) {
bool Blocker<T>::Unsubscribe(const std::string& callback_id) {
std::lock_guard<std::mutex> lock(cb_mutex_);
return published_callbacks_.erase(callback_id) != 0;
}
template <typename T>
auto Message<T>::GetLatestObserved() const -> const MessageType& {
auto Blocker<T>::GetLatestObserved() const -> const MessageType& {
std::lock_guard<std::mutex> lock(msg_mutex_);
assert(!observed_msg_queue_.empty());
return *observed_msg_queue_.back();
return *observed_msg_queue_.front();
}
template <typename T>
auto Message<T>::GetLatestObservedPtr() const -> const MessagePtr {
auto Blocker<T>::GetLatestObservedPtr() const -> const MessagePtr {
std::lock_guard<std::mutex> lock(msg_mutex_);
assert(!observed_msg_queue_.empty());
return observed_msg_queue_.back();
return observed_msg_queue_.front();
}
template <typename T>
auto Message<T>::GetOldestObservedPtr() const -> const MessagePtr {
auto Blocker<T>::GetOldestObservedPtr() const -> const MessagePtr {
std::lock_guard<std::mutex> lock(msg_mutex_);
assert(!observed_msg_queue_.empty());
return observed_msg_queue_.front();
return observed_msg_queue_.back();
}
template <typename T>
auto Message<T>::GetLatestPublishedPtr() const -> const MessagePtr {
auto Blocker<T>::GetLatestPublishedPtr() const -> const MessagePtr {
std::lock_guard<std::mutex> lock(msg_mutex_);
assert(!published_msg_queue_.empty());
return published_msg_queue_.back();
return published_msg_queue_.front();
}
template <typename T>
auto Message<T>::ObservedBegin() const -> Iterator {
auto Blocker<T>::ObservedBegin() const -> Iterator {
return observed_msg_queue_.begin();
}
template <typename T>
auto Message<T>::ObservedEnd() const -> Iterator {
auto Blocker<T>::ObservedEnd() const -> Iterator {
return observed_msg_queue_.end();
}
template <typename T>
size_t Message<T>::capacity() const {
size_t Blocker<T>::capacity() const {
return attr_.capacity;
}
template <typename T>
void Message<T>::set_capacity(size_t capacity) {
void Blocker<T>::set_capacity(size_t capacity) {
std::lock_guard<std::mutex> lock(msg_mutex_);
if (capacity > attr_.capacity) {
is_full_ = false;
}
attr_.capacity = capacity;
while (published_msg_queue_.size() > capacity) {
published_msg_queue_.pop_front();
published_msg_queue_.pop_back();
}
}
template <typename T>
const std::string& Message<T>::channel_name() const {
const std::string& Blocker<T>::channel_name() const {
return attr_.channel_name;
}
template <typename T>
void Message<T>::Enqueue(const MessagePtr& msg) {
void Blocker<T>::Enqueue(const MessagePtr& msg) {
if (attr_.capacity == 0) {
return;
}
std::lock_guard<std::mutex> lock(msg_mutex_);
if (is_full_) {
published_msg_queue_.pop_front();
published_msg_queue_.pop_back();
}
published_msg_queue_.push_back(msg);
published_msg_queue_.push_front(msg);
if (!is_full_) {
if (published_msg_queue_.size() >= attr_.capacity) {
......@@ -251,15 +251,15 @@ void Message<T>::Enqueue(const MessagePtr& msg) {
}
template <typename T>
void Message<T>::Notify(const MessagePtr& msg) {
void Blocker<T>::Notify(const MessagePtr& msg) {
std::lock_guard<std::mutex> lock(cb_mutex_);
for (const auto& item : published_callbacks_) {
item.second(msg);
}
}
} // namespace dispatcher
} // namespace blocker
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_DISPATCHER_MESSAGE_H_
#endif // CYBERTRON_BLOCKER_BLOCKER_H_
......@@ -14,23 +14,23 @@
* limitations under the License.
*****************************************************************************/
#include "cybertron/dispatcher/dispatcher.h"
#include "cybertron/blocker/blocker_manager.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
namespace blocker {
Dispatcher::Dispatcher() {}
BlockerManager::BlockerManager() {}
Dispatcher::~Dispatcher() { messages_.clear(); }
BlockerManager::~BlockerManager() { blockers_.clear(); }
void Dispatcher::Observe() {
std::lock_guard<std::mutex> lock(msg_mutex_);
for (auto& item : messages_) {
void BlockerManager::Observe() {
std::lock_guard<std::mutex> lock(blocker_mutex_);
for (auto& item : blockers_) {
item.second->Observe();
}
}
} // namespace dispatcher
} // namespace blocker
} // namespace cybertron
} // namespace apollo
......@@ -14,142 +14,143 @@
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_DISPATCHER_DISPATCHER_H_
#define CYBERTRON_DISPATCHER_DISPATCHER_H_
#ifndef CYBERTRON_BLOCKER_BLOCKER_MANAGER_H_
#define CYBERTRON_BLOCKER_BLOCKER_MANAGER_H_
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include "cybertron/dispatcher/message.h"
#include "cybertron/blocker/blocker.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
namespace blocker {
class Dispatcher {
class BlockerManager {
public:
using MessageMap =
std::unordered_map<std::string, std::shared_ptr<MessageBase>>;
using BlockerMap =
std::unordered_map<std::string, std::shared_ptr<BlockerBase>>;
virtual ~Dispatcher();
virtual ~BlockerManager();
static const std::shared_ptr<Dispatcher>& Instance() {
static auto instance = std::shared_ptr<Dispatcher>(new Dispatcher());
static const std::shared_ptr<BlockerManager>& Instance() {
static auto instance =
std::shared_ptr<BlockerManager>(new BlockerManager());
return instance;
}
template <typename T>
bool Publish(const std::string& channel_name,
const typename Message<T>::MessagePtr& msg);
const typename Blocker<T>::MessagePtr& msg);
template <typename T>
bool Publish(const std::string& channel_name,
const typename Message<T>::MessageType& msg);
const typename Blocker<T>::MessageType& msg);
template <typename T>
bool Subscribe(const std::string& channel_name, size_t capacity,
const std::string& callback_id,
const typename Message<T>::Callback& callback);
const typename Blocker<T>::Callback& callback);
template <typename T>
bool Unsubscribe(const std::string& channel_name,
const std::string& callback_id);
template <typename T>
std::shared_ptr<Message<T>> GetMessage(const std::string& channel_name);
std::shared_ptr<Blocker<T>> GetBlocker(const std::string& channel_name);
template <typename T>
std::shared_ptr<Message<T>> GetOrCreateMessage(const MessageAttr& attr);
std::shared_ptr<Blocker<T>> GetOrCreateBlocker(const BlockerAttr& attr);
void Observe();
private:
Dispatcher();
Dispatcher(const Dispatcher&) = delete;
Dispatcher& operator=(const Dispatcher&) = delete;
BlockerManager();
BlockerManager(const BlockerManager&) = delete;
BlockerManager& operator=(const BlockerManager&) = delete;
MessageMap messages_;
std::mutex msg_mutex_;
BlockerMap blockers_;
std::mutex blocker_mutex_;
};
template <typename T>
bool Dispatcher::Publish(const std::string& channel_name,
const typename Message<T>::MessagePtr& msg) {
auto message = GetOrCreateMessage<T>(MessageAttr(channel_name));
if (message == nullptr) {
bool BlockerManager::Publish(const std::string& channel_name,
const typename Blocker<T>::MessagePtr& msg) {
auto blocker = GetOrCreateBlocker<T>(BlockerAttr(channel_name));
if (blocker == nullptr) {
return false;
}
message->Publish(msg);
blocker->Publish(msg);
return true;
}
template <typename T>
bool Dispatcher::Publish(const std::string& channel_name,
const typename Message<T>::MessageType& msg) {
auto message = GetOrCreateMessage<T>(MessageAttr(channel_name));
if (message == nullptr) {
bool BlockerManager::Publish(const std::string& channel_name,
const typename Blocker<T>::MessageType& msg) {
auto blocker = GetOrCreateBlocker<T>(BlockerAttr(channel_name));
if (blocker == nullptr) {
return false;
}
message->Publish(msg);
blocker->Publish(msg);
return true;
}
template <typename T>
bool Dispatcher::Subscribe(const std::string& channel_name, size_t capacity,
const std::string& callback_id,
const typename Message<T>::Callback& callback) {
auto message = GetOrCreateMessage<T>(MessageAttr(capacity, channel_name));
if (message == nullptr) {
bool BlockerManager::Subscribe(const std::string& channel_name, size_t capacity,
const std::string& callback_id,
const typename Blocker<T>::Callback& callback) {
auto blocker = GetOrCreateBlocker<T>(BlockerAttr(capacity, channel_name));
if (blocker == nullptr) {
return false;
}
return message->Subscribe(callback_id, callback);
return blocker->Subscribe(callback_id, callback);
}
template <typename T>
bool Dispatcher::Unsubscribe(const std::string& channel_name,
const std::string& callback_id) {
auto message = GetMessage<T>(channel_name);
if (message == nullptr) {
bool BlockerManager::Unsubscribe(const std::string& channel_name,
const std::string& callback_id) {
auto blocker = GetBlocker<T>(channel_name);
if (blocker == nullptr) {
return false;
}
return message->Unsubscribe(callback_id);
return blocker->Unsubscribe(callback_id);
}
template <typename T>
std::shared_ptr<Message<T>> Dispatcher::GetMessage(
std::shared_ptr<Blocker<T>> BlockerManager::GetBlocker(
const std::string& channel_name) {
std::shared_ptr<Message<T>> message = nullptr;
std::shared_ptr<Blocker<T>> blocker = nullptr;
{
std::lock_guard<std::mutex> lock(msg_mutex_);
auto search = messages_.find(channel_name);
if (search != messages_.end()) {
message = std::dynamic_pointer_cast<Message<T>>(search->second);
std::lock_guard<std::mutex> lock(blocker_mutex_);
auto search = blockers_.find(channel_name);
if (search != blockers_.end()) {
blocker = std::dynamic_pointer_cast<Blocker<T>>(search->second);
}
}
return message;
return blocker;
}
template <typename T>
std::shared_ptr<Message<T>> Dispatcher::GetOrCreateMessage(
const MessageAttr& attr) {
std::shared_ptr<Message<T>> message = nullptr;
std::shared_ptr<Blocker<T>> BlockerManager::GetOrCreateBlocker(
const BlockerAttr& attr) {
std::shared_ptr<Blocker<T>> blocker = nullptr;
{
std::lock_guard<std::mutex> lock(msg_mutex_);
auto search = messages_.find(attr.channel_name);
if (search != messages_.end()) {
message = std::dynamic_pointer_cast<Message<T>>(search->second);
std::lock_guard<std::mutex> lock(blocker_mutex_);
auto search = blockers_.find(attr.channel_name);
if (search != blockers_.end()) {
blocker = std::dynamic_pointer_cast<Blocker<T>>(search->second);
} else {
message = std::make_shared<Message<T>>(attr);
messages_[attr.channel_name] = message;
blocker = std::make_shared<Blocker<T>>(attr);
blockers_[attr.channel_name] = blocker;
}
}
return message;
return blocker;
}
} // namespace dispatcher
} // namespace blocker
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_DISPATCHER_DISPATCHER_H_
#endif // CYBERTRON_BLOCKER_BLOCKER_MANAGER_H_
......@@ -14,19 +14,20 @@
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_DISPATCHER_INTRA_READER_H_
#define CYBERTRON_DISPATCHER_INTRA_READER_H_
#ifndef CYBERTRON_BLOCKER_INTRA_READER_H_
#define CYBERTRON_BLOCKER_INTRA_READER_H_
#include <functional>
#include <list>
#include <memory>
#include "cybertron/dispatcher/dispatcher.h"
#include "cybertron/blocker/blocker.h"
#include "cybertron/blocker/blocker_manager.h"
#include "cybertron/node/reader.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
namespace blocker {
template <typename MessageT>
class IntraReader : public apollo::cybertron::Reader<MessageT> {
......@@ -59,13 +60,13 @@ class IntraReader : public apollo::cybertron::Reader<MessageT> {
void OnMessage(const MessagePtr& msg_ptr);
Callback msg_callback_;
std::shared_ptr<Message<MessageT>> message_;
std::shared_ptr<Blocker<MessageT>> blocker_;
};
template <typename MessageT>
IntraReader<MessageT>::IntraReader(const proto::RoleAttributes& attr,
const Callback& callback)
: Reader<MessageT>(attr), msg_callback_(callback), message_(nullptr) {}
: Reader<MessageT>(attr), msg_callback_(callback), blocker_(nullptr) {}
template <typename MessageT>
IntraReader<MessageT>::~IntraReader() {
......@@ -77,13 +78,13 @@ bool IntraReader<MessageT>::Init() {
if (this->init_.exchange(true)) {
return true;
}
MessageAttr attr(this->role_attr_.qos_profile().depth(),
BlockerAttr attr(this->role_attr_.qos_profile().depth(),
this->role_attr_.channel_name());
message_ = Dispatcher::Instance()->GetOrCreateMessage<MessageT>(attr);
if (message_ == nullptr) {
blocker_ = BlockerManager::Instance()->GetOrCreateBlocker<MessageT>(attr);
if (blocker_ == nullptr) {
return false;
}
return message_->Subscribe(this->role_attr_.node_name(),
return blocker_->Subscribe(this->role_attr_.node_name(),
std::bind(&IntraReader<MessageT>::OnMessage, this,
std::placeholders::_1));
}
......@@ -93,94 +94,94 @@ void IntraReader<MessageT>::Shutdown() {
if (!this->init_.exchange(false)) {
return;
}
message_->Unsubscribe(this->role_attr_.node_name());
message_ = nullptr;
blocker_->Unsubscribe(this->role_attr_.node_name());
blocker_ = nullptr;
}
template <typename MessageT>
void IntraReader<MessageT>::ClearData() {
if (message_ == nullptr) {
if (blocker_ == nullptr) {
return;
}
message_->ClearPublished();
blocker_->ClearPublished();
}
template <typename MessageT>
void IntraReader<MessageT>::Observe() {
if (message_ == nullptr) {
if (blocker_ == nullptr) {
return;
}
message_->Observe();
blocker_->Observe();
}
template <typename MessageT>
bool IntraReader<MessageT>::Empty() const {
if (message_ == nullptr) {
if (blocker_ == nullptr) {
return true;
}
return message_->IsObservedEmpty();
return blocker_->IsObservedEmpty();
}
template <typename MessageT>
bool IntraReader<MessageT>::HasReceived() const {
if (message_ == nullptr) {
if (blocker_ == nullptr) {
return false;
}
return !message_->IsPublishedEmpty();
return !blocker_->IsPublishedEmpty();
}
template <typename MessageT>
void IntraReader<MessageT>::Enqueue(const std::shared_ptr<MessageT>& msg) {
if (message_ == nullptr) {
if (blocker_ == nullptr) {
return;
}
message_->Publish(msg);
blocker_->Publish(msg);
}
template <typename MessageT>
void IntraReader<MessageT>::SetHistoryDepth(const uint32_t& depth) {
if (message_ == nullptr) {
if (blocker_ == nullptr) {
return;
}
message_->set_capacity(depth);
blocker_->set_capacity(depth);
}
template <typename MessageT>
uint32_t IntraReader<MessageT>::GetHistoryDepth() const {
if (message_ == nullptr) {
if (blocker_ == nullptr) {
return 0;
}
return message_->capacity();
return blocker_->capacity();
}
template <typename MessageT>
const std::shared_ptr<MessageT>& IntraReader<MessageT>::GetLatestObserved()
const {
if (message_ == nullptr) {
if (blocker_ == nullptr) {
return nullptr;
}
return message_->GetLatestObservedPtr();
return blocker_->GetLatestObservedPtr();
}
template <typename MessageT>
const std::shared_ptr<MessageT>& IntraReader<MessageT>::GetOldestObserved()
const {
if (message_ == nullptr) {
if (blocker_ == nullptr) {
return nullptr;
}
return message_->GetOldestObservedPtr();
return blocker_->GetOldestObservedPtr();
}
template <typename MessageT>
auto IntraReader<MessageT>::Begin() const -> Iterator {
assert(message_ != nullptr);
return message_->ObservedBegin();
assert(blocker_ != nullptr);
return blocker_->ObservedBegin();
}
template <typename MessageT>
auto IntraReader<MessageT>::End() const -> Iterator {
assert(message_ != nullptr);
return message_->ObservedEnd();
assert(blocker_ != nullptr);
return blocker_->ObservedEnd();
}
template <typename MessageT>
......@@ -190,8 +191,8 @@ void IntraReader<MessageT>::OnMessage(const MessagePtr& msg_ptr) {
}
}
} // namespace dispatcher
} // namespace blocker
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_DISPATCHER_INTRA_READER_H_
#endif // CYBERTRON_BLOCKER_INTRA_READER_H_
......@@ -14,23 +14,23 @@
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_DISPATCHER_INTRA_WRITER_H_
#define CYBERTRON_DISPATCHER_INTRA_WRITER_H_
#ifndef CYBERTRON_BLOCKER_INTRA_WRITER_H_
#define CYBERTRON_BLOCKER_INTRA_WRITER_H_
#include <memory>
#include "cybertron/dispatcher/dispatcher.h"
#include "cybertron/blocker/blocker.h"
#include "cybertron/blocker/blocker_manager.h"
#include "cybertron/node/writer.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
namespace blocker {
template <typename MessageT>
class IntraWriter : public apollo::cybertron::Writer<MessageT> {
public:
using MessagePtr = std::shared_ptr<MessageT>;
using DispatcherPtr = std::shared_ptr<dispatcher::Dispatcher>;
explicit IntraWriter(const proto::RoleAttributes& attr);
virtual ~IntraWriter();
......@@ -42,14 +42,12 @@ class IntraWriter : public apollo::cybertron::Writer<MessageT> {
bool Write(const MessagePtr& msg_ptr) override;
private:
DispatcherPtr dispatcher_;
std::shared_ptr<Blocker<MessageT>> blocker_;
};
template <typename MessageT>
IntraWriter<MessageT>::IntraWriter(const proto::RoleAttributes& attr)
: Writer<MessageT>(attr) {
dispatcher_ = dispatcher::Dispatcher::Instance();
}
: Writer<MessageT>(attr) {}
template <typename MessageT>
IntraWriter<MessageT>::~IntraWriter() {
......@@ -58,13 +56,25 @@ IntraWriter<MessageT>::~IntraWriter() {
template <typename MessageT>
bool IntraWriter<MessageT>::Init() {
this->init_.exchange(true);
if (this->init_.exchange(true)) {
return true;
}
BlockerAttr attr(this->role_attr_.channel_name());
blocker_ = BlockerManager::Instance()->GetOrCreateBlocker<MessageT>(attr);
if (blocker_ == nullptr) {
this->init_.exchange(false);
return false;
}
return true;
}
template <typename MessageT>
void IntraWriter<MessageT>::Shutdown() {
this->init_.exchange(false);
if (!this->init_.exchange(false)) {
return;
}
blocker_ = nullptr;
}
template <typename MessageT>
......@@ -72,7 +82,8 @@ bool IntraWriter<MessageT>::Write(const MessageT& msg) {
if (!this->init_.load()) {
return false;
}
return Write(std::make_shared<MessageT>(msg));
blocker_->Publish(msg);
return true;
}
template <typename MessageT>
......@@ -80,12 +91,12 @@ bool IntraWriter<MessageT>::Write(const MessagePtr& msg_ptr) {
if (!this->init_.load()) {
return false;
}
return dispatcher_->Publish<MessageT>(this->role_attr_.channel_name(),
msg_ptr);
blocker_->Publish(msg_ptr);
return true;
}
} // namespace dispatcher
} // namespace blocker
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_DISPATCHER_INTRA_WRITER_H_
#endif // CYBERTRON_BLOCKER_INTRA_WRITER_H_
......@@ -21,13 +21,13 @@
#include <utility>
#include <vector>
#include "cybertron/blocker/blocker_manager.h"
#include "cybertron/common/global_data.h"
#include "cybertron/common/types.h"
#include "cybertron/common/util.h"
#include "cybertron/component/component_base.h"
#include "cybertron/croutine/routine_factory.h"
#include "cybertron/data/data_visitor.h"
#include "cybertron/proto/run_mode_conf.pb.h"
#include "cybertron/scheduler/scheduler.h"
namespace apollo {
......@@ -218,14 +218,14 @@ bool Component<M0, M1, NullType, NullType>::Initialize(
std::weak_ptr<Component<M0, M1>> self =
std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
auto message1 = dispatcher::Dispatcher::Instance()->GetMessage<M1>(
auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
config.readers(1).channel());
auto func = [self, message1](const std::shared_ptr<M0>& msg0) {
auto func = [self, blocker1](const std::shared_ptr<M0>& msg0) {
auto ptr = self.lock();
if (ptr) {
if (!message1->IsPublishedEmpty()) {
auto msg1 = message1->GetLatestPublishedPtr();
if (!blocker1->IsPublishedEmpty()) {
auto msg1 = blocker1->GetLatestPublishedPtr();
ptr->Process(msg0, msg1);
}
} else {
......@@ -307,17 +307,17 @@ bool Component<M0, M1, M2, NullType>::Initialize(
std::dynamic_pointer_cast<Component<M0, M1, M2, NullType>>(
shared_from_this());
auto message1 = dispatcher::Dispatcher::Instance()->GetMessage<M1>(
auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
config.readers(1).channel());
auto message2 = dispatcher::Dispatcher::Instance()->GetMessage<M2>(
auto blocker2 = blocker::BlockerManager::Instance()->GetBlocker<M2>(
config.readers(2).channel());
auto func = [self, message1, message2](const std::shared_ptr<M0>& msg0) {
auto func = [self, blocker1, blocker2](const std::shared_ptr<M0>& msg0) {
auto ptr = self.lock();
if (ptr) {
if (!message1->IsPublishedEmpty() && !message2->IsPublishedEmpty()) {
auto msg1 = message1->GetLatestPublishedPtr();
auto msg2 = message2->GetLatestPublishedPtr();
if (!blocker1->IsPublishedEmpty() && !blocker2->IsPublishedEmpty()) {
auto msg1 = blocker1->GetLatestPublishedPtr();
auto msg2 = blocker2->GetLatestPublishedPtr();
ptr->Process(msg0, msg1, msg2);
}
} else {
......@@ -404,22 +404,22 @@ bool Component<M0, M1, M2, M3>::Initialize(const ComponentConfig& config) {
std::dynamic_pointer_cast<Component<M0, M1, M2, M3>>(
shared_from_this());
auto message1 = dispatcher::Dispatcher::Instance()->GetMessage<M1>(
auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
config.readers(1).channel());
auto message2 = dispatcher::Dispatcher::Instance()->GetMessage<M2>(
auto blocker2 = blocker::BlockerManager::Instance()->GetBlocker<M2>(
config.readers(2).channel());
auto message3 = dispatcher::Dispatcher::Instance()->GetMessage<M3>(
auto blocker3 = blocker::BlockerManager::Instance()->GetBlocker<M3>(
config.readers(3).channel());
auto func = [self, message1, message2,
message3](const std::shared_ptr<M0>& msg0) {
auto func = [self, blocker1, blocker2,
blocker3](const std::shared_ptr<M0>& msg0) {
auto ptr = self.lock();
if (ptr) {
if (!message1->IsPublishedEmpty() && !message2->IsPublishedEmpty() &&
!message3->IsPublishedEmpty()) {
auto msg1 = message1->GetLatestPublishedPtr();
auto msg2 = message2->GetLatestPublishedPtr();
auto msg3 = message3->GetLatestPublishedPtr();
if (!blocker1->IsPublishedEmpty() && !blocker2->IsPublishedEmpty() &&
!blocker3->IsPublishedEmpty()) {
auto msg1 = blocker1->GetLatestPublishedPtr();
auto msg2 = blocker2->GetLatestPublishedPtr();
auto msg3 = blocker3->GetLatestPublishedPtr();
ptr->Process(msg0, msg1, msg2, msg3);
}
} else {
......@@ -447,16 +447,16 @@ bool Component<M0, M1, M2, M3>::Initialize(const ComponentConfig& config) {
auto sched = scheduler::Scheduler::Instance();
std::weak_ptr<Component<M0, M1, M2, M3>> self =
std::dynamic_pointer_cast<Component<M0, M1, M2, M3>>(shared_from_this());
auto func = [self](
const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1,
const std::shared_ptr<M2>& msg2, const std::shared_ptr<M3>& msg3) {
auto ptr = self.lock();
if (ptr) {
ptr->Process(msg0, msg1, msg2, msg3);
} else {
AERROR << "Component object has been destroyed." << std::endl;
}
};
auto func =
[self](const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1,
const std::shared_ptr<M2>& msg2, const std::shared_ptr<M3>& msg3) {
auto ptr = self.lock();
if (ptr) {
ptr->Process(msg0, msg1, msg2, msg3);
} else {
AERROR << "Component object has been destroyed." << std::endl;
}
};
auto dv = std::make_shared<data::DataVisitor<M0, M1, M2, M3>>(readers_);
croutine::RoutineFactory factory =
......
project(cybertron_dispatcher)
aux_source_directory(${PROJECT_SOURCE_DIR} DISPATCHER_SRCS)
add_library(${PROJECT_NAME} ${DISPATCHER_SRCS})
install(TARGETS cybertron_dispatcher LIBRARY DESTINATION lib)
......@@ -20,9 +20,9 @@
#include <memory>
#include <string>
#include "cybertron/blocker/intra_reader.h"
#include "cybertron/blocker/intra_writer.h"
#include "cybertron/common/global_data.h"
#include "cybertron/dispatcher/intra_reader.h"
#include "cybertron/dispatcher/intra_writer.h"
#include "cybertron/message/message_traits.h"
#include "cybertron/node/reader.h"
#include "cybertron/node/writer.h"
......@@ -104,7 +104,7 @@ auto NodeChannelImpl::CreateWriter(const proto::RoleAttributes& role_attr)
std::shared_ptr<Writer<MessageT>> writer_ptr = nullptr;
if (!is_reality_mode_) {
writer_ptr = std::make_shared<dispatcher::IntraWriter<MessageT>>(new_attr);
writer_ptr = std::make_shared<blocker::IntraWriter<MessageT>>(new_attr);
} else {
writer_ptr = std::make_shared<Writer<MessageT>>(new_attr);
}
......@@ -141,8 +141,8 @@ auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr,
std::shared_ptr<Reader<MessageT>> reader_ptr = nullptr;
if (!is_reality_mode_) {
reader_ptr = std::make_shared<dispatcher::IntraReader<MessageT>>(
new_attr, reader_func);
reader_ptr =
std::make_shared<blocker::IntraReader<MessageT>>(new_attr, reader_func);
} else {
reader_ptr = std::make_shared<Reader<MessageT>>(new_attr, reader_func);
}
......
......@@ -21,4 +21,4 @@ add_subdirectory(tools)
add_subdirectory(record)
add_subdirectory(component)
add_subdirectory(mainboard)
add_subdirectory(dispatcher)
add_subdirectory(blocker)
project(blocker_test)
add_executable(blocker_test
blocker_test.cpp
)
target_link_libraries(blocker_test
cybertron
cybertron_blocker
gtest
)
add_executable(blocker_manager_test
blocker_manager_test.cpp
)
target_link_libraries(blocker_manager_test
cybertron
cybertron_blocker
gtest
)
install(TARGETS
blocker_test
blocker_manager_test
DESTINATION ${UNIT_TEST_INSTALL_PREFIX}
)
\ No newline at end of file
......@@ -16,59 +16,59 @@
#include "gtest/gtest.h"
#include "cybertron/dispatcher/dispatcher.h"
#include "cybertron/blocker/blocker_manager.h"
#include "cybertron/proto/unit_test.pb.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
namespace blocker {
using apollo::cybertron::proto::UnitTest;
TEST(DispatcherTest, constructor) {
Dispatcher dispatcher;
auto msg = dispatcher.GetMessage<UnitTest>("channel");
EXPECT_EQ(msg, nullptr);
TEST(BlockerManagerTest, constructor) {
BlockerManager blocker_manager;
auto blocker = blocker_manager.GetBlocker<UnitTest>("channel");
EXPECT_EQ(blocker, nullptr);
}
TEST(DispatcherTest, publish) {
Dispatcher dispatcher;
TEST(BlockerManagerTest, publish) {
BlockerManager blocker_manager;
auto msg1 = std::make_shared<UnitTest>();
msg1->set_class_name("MessageTest");
msg1->set_case_name("publish_1");
dispatcher.Publish<UnitTest>("channel1", msg1);
auto message1 = dispatcher.GetMessage<UnitTest>("channel1");
EXPECT_NE(message1, nullptr);
EXPECT_FALSE(message1->IsPublishedEmpty());
blocker_manager.Publish<UnitTest>("channel1", msg1);
auto blocker1 = blocker_manager.GetBlocker<UnitTest>("channel1");
EXPECT_NE(blocker1, nullptr);
EXPECT_FALSE(blocker1->IsPublishedEmpty());
UnitTest msg2;
msg2.set_class_name("MessageTest");
msg2.set_case_name("publish_2");
dispatcher.Publish<UnitTest>("channel2", msg2);
auto message2 = dispatcher.GetMessage<UnitTest>("channel2");
EXPECT_NE(message2, nullptr);
EXPECT_FALSE(message2->IsPublishedEmpty());
blocker_manager.Publish<UnitTest>("channel2", msg2);
auto blocker2 = blocker_manager.GetBlocker<UnitTest>("channel2");
EXPECT_NE(blocker2, nullptr);
EXPECT_FALSE(blocker2->IsPublishedEmpty());
EXPECT_TRUE(message1->IsObservedEmpty());
EXPECT_TRUE(message2->IsObservedEmpty());
EXPECT_TRUE(blocker1->IsObservedEmpty());
EXPECT_TRUE(blocker2->IsObservedEmpty());
dispatcher.Observe();
blocker_manager.Observe();
EXPECT_FALSE(message1->IsObservedEmpty());
EXPECT_FALSE(message2->IsObservedEmpty());
EXPECT_FALSE(blocker1->IsObservedEmpty());
EXPECT_FALSE(blocker2->IsObservedEmpty());
}
TEST(DispatcherTest, subscribe) {
Dispatcher dispatcher;
TEST(BlockerManagerTest, subscribe) {
BlockerManager blocker_manager;
auto received_msg = std::make_shared<UnitTest>();
bool res = dispatcher.Subscribe<UnitTest>(
"channel", 10, "DispatcherTest",
[&received_msg](const std::shared_ptr<UnitTest>& msg) {
received_msg->CopyFrom(*msg);
bool res = blocker_manager.Subscribe<UnitTest>(
"channel", 10, "BlockerManagerTest",
[&received_msg](const std::shared_ptr<UnitTest>& blocker) {
received_msg->CopyFrom(*blocker);
});
EXPECT_TRUE(res);
......@@ -77,24 +77,26 @@ TEST(DispatcherTest, subscribe) {
msg1->set_class_name("MessageTest");
msg1->set_case_name("publish_1");
dispatcher.Publish<UnitTest>("channel", msg1);
blocker_manager.Publish<UnitTest>("channel", msg1);
EXPECT_EQ(received_msg->class_name(), msg1->class_name());
EXPECT_EQ(received_msg->case_name(), msg1->case_name());
res = dispatcher.Unsubscribe<UnitTest>("channel", "DispatcherTest");
res = blocker_manager.Unsubscribe<UnitTest>("channel", "BlockerManagerTest");
EXPECT_TRUE(res);
res = dispatcher.Unsubscribe<UnitTest>("channel", "DispatcherTest");
res = blocker_manager.Unsubscribe<UnitTest>("channel", "BlockerManagerTest");
EXPECT_FALSE(res);
res = dispatcher.Unsubscribe<UnitTest>("channel", "DispatcherTest_11");
res =
blocker_manager.Unsubscribe<UnitTest>("channel", "BlockerManagerTest_11");
EXPECT_FALSE(res);
res = dispatcher.Unsubscribe<UnitTest>("channel_11", "DispatcherTest");
res =
blocker_manager.Unsubscribe<UnitTest>("channel_11", "BlockerManagerTest");
EXPECT_FALSE(res);
}
} // namespace dispatcher
} // namespace blocker
} // namespace cybertron
} // namespace apollo
......
......@@ -16,105 +16,105 @@
#include "gtest/gtest.h"
#include "cybertron/dispatcher/message.h"
#include "cybertron/blocker/blocker.h"
#include "cybertron/proto/unit_test.pb.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
namespace blocker {
using apollo::cybertron::proto::UnitTest;
TEST(MessageTest, constructor) {
MessageAttr attr(10, "channel");
Message<UnitTest> message(attr);
EXPECT_EQ(message.capacity(), 10);
EXPECT_EQ(message.channel_name(), "channel");
TEST(BlockerTest, constructor) {
BlockerAttr attr(10, "channel");
Blocker<UnitTest> blocker(attr);
EXPECT_EQ(blocker.capacity(), 10);
EXPECT_EQ(blocker.channel_name(), "channel");
message.set_capacity(20);
EXPECT_EQ(message.capacity(), 20);
blocker.set_capacity(20);
EXPECT_EQ(blocker.capacity(), 20);
}
TEST(MessageTest, publish) {
MessageAttr attr(10, "channel");
Message<UnitTest> message(attr);
TEST(BlockerTest, publish) {
BlockerAttr attr(10, "channel");
Blocker<UnitTest> blocker(attr);
auto msg1 = std::make_shared<UnitTest>();
msg1->set_class_name("MessageTest");
msg1->set_class_name("BlockerTest");
msg1->set_case_name("publish_1");
UnitTest msg2;
msg2.set_class_name("MessageTest");
msg2.set_class_name("BlockerTest");
msg2.set_case_name("publish_2");
EXPECT_TRUE(message.IsPublishedEmpty());
message.Publish(msg1);
message.Publish(msg2);
EXPECT_FALSE(message.IsPublishedEmpty());
EXPECT_TRUE(blocker.IsPublishedEmpty());
blocker.Publish(msg1);
blocker.Publish(msg2);
EXPECT_FALSE(blocker.IsPublishedEmpty());
EXPECT_TRUE(message.IsObservedEmpty());
message.Observe();
EXPECT_FALSE(message.IsObservedEmpty());
EXPECT_TRUE(blocker.IsObservedEmpty());
blocker.Observe();
EXPECT_FALSE(blocker.IsObservedEmpty());
auto& latest_observed_msg = message.GetLatestObserved();
EXPECT_EQ(latest_observed_msg.class_name(), "MessageTest");
auto& latest_observed_msg = blocker.GetLatestObserved();
EXPECT_EQ(latest_observed_msg.class_name(), "BlockerTest");
EXPECT_EQ(latest_observed_msg.case_name(), "publish_2");
auto latest_observed_msg_ptr = message.GetLatestObservedPtr();
EXPECT_EQ(latest_observed_msg_ptr->class_name(), "MessageTest");
auto latest_observed_msg_ptr = blocker.GetLatestObservedPtr();
EXPECT_EQ(latest_observed_msg_ptr->class_name(), "BlockerTest");
EXPECT_EQ(latest_observed_msg_ptr->case_name(), "publish_2");
auto latest_published_ptr = message.GetLatestPublishedPtr();
EXPECT_EQ(latest_published_ptr->class_name(), "MessageTest");
auto latest_published_ptr = blocker.GetLatestPublishedPtr();
EXPECT_EQ(latest_published_ptr->class_name(), "BlockerTest");
EXPECT_EQ(latest_published_ptr->case_name(), "publish_2");
message.ClearPublished();
EXPECT_TRUE(message.IsPublishedEmpty());
EXPECT_FALSE(message.IsObservedEmpty());
blocker.ClearPublished();
EXPECT_TRUE(blocker.IsPublishedEmpty());
EXPECT_FALSE(blocker.IsObservedEmpty());
}
TEST(MessageTest, subscribe) {
MessageAttr attr(10, "channel");
Message<UnitTest> message(attr);
TEST(BlockerTest, subscribe) {
BlockerAttr attr(10, "channel");
Blocker<UnitTest> blocker(attr);
auto received_msg = std::make_shared<UnitTest>();
bool res = message.Subscribe(
"MessageTest1", [&received_msg](const std::shared_ptr<UnitTest>& msg) {
bool res = blocker.Subscribe(
"BlockerTest1", [&received_msg](const std::shared_ptr<UnitTest>& msg) {
received_msg->CopyFrom(*msg);
});
EXPECT_TRUE(res);
auto msg1 = std::make_shared<UnitTest>();
msg1->set_class_name("MessageTest");
msg1->set_class_name("BlockerTest");
msg1->set_case_name("publish_1");
message.Publish(msg1);
blocker.Publish(msg1);
EXPECT_EQ(received_msg->class_name(), msg1->class_name());
EXPECT_EQ(received_msg->case_name(), msg1->case_name());
res = message.Subscribe(
"MessageTest1", [&received_msg](const std::shared_ptr<UnitTest>& msg) {
res = blocker.Subscribe(
"BlockerTest1", [&received_msg](const std::shared_ptr<UnitTest>& msg) {
received_msg->CopyFrom(*msg);
});
EXPECT_FALSE(res);
res = message.Unsubscribe("MessageTest1");
res = blocker.Unsubscribe("BlockerTest1");
EXPECT_TRUE(res);
res = message.Unsubscribe("MessageTest1");
res = blocker.Unsubscribe("BlockerTest1");
EXPECT_FALSE(res);
UnitTest msg2;
msg2.set_class_name("MessageTest");
msg2.set_class_name("BlockerTest");
msg2.set_case_name("publish_2");
message.Publish(msg2);
blocker.Publish(msg2);
EXPECT_NE(received_msg->case_name(), msg2.case_name());
}
} // namespace dispatcher
} // namespace blocker
} // namespace cybertron
} // namespace apollo
......
project(dispatcher_test)
add_executable(dispatcher_dispatcher_test
dispatcher_test.cpp
)
target_link_libraries(dispatcher_dispatcher_test
cybertron
cybertron_dispatcher
gtest
)
add_executable(dispatcher_message_test
message_test.cpp
)
target_link_libraries(dispatcher_message_test
cybertron
cybertron_dispatcher
gtest
)
install(TARGETS
dispatcher_dispatcher_test
dispatcher_message_test
DESTINATION ${UNIT_TEST_INSTALL_PREFIX}
)
\ No newline at end of file
......@@ -82,11 +82,11 @@ add_executable(user_defined_task
)
target_link_libraries(user_defined_task -Wl,--no-as-needed -pthread cybertron)
add_executable(dispatcher_example
dispatcher_example.cpp
add_executable(blocker_example
blocker_example.cpp
)
target_link_libraries(dispatcher_example -pthread dl cybertron perception_component)
install(TARGETS dispatcher_example DESTINATION bin)
target_link_libraries(blocker_example -pthread dl cybertron perception_component)
install(TARGETS blocker_example DESTINATION bin)
install(DIRECTORY dag DESTINATION ${CMAKE_INSTALL_PREFIX})
......
......@@ -16,14 +16,24 @@
#include <iostream>
#include "cybertron/dispatcher/dispatcher.h"
#include "cybertron/blocker/blocker_manager.h"
#include "cybertron/common/global_data.h"
#include "cybertron/proto/component_config.pb.h"
#include "examples/component_perception/perception_component.h"
// To use this example, we need set run_mode to MODE_SIMULATION
// (run_mode is in file conf/cybertron.pb.conf)
int main(int argc, char* argv[]) {
bool is_reality_mode =
apollo::cybertron::common::GlobalData::Instance()->IsRealityMode();
if (is_reality_mode) {
std::cout << "To use this example:\n"
"we need to set run_mode to MODE_SIMULATION (run_mode is in\n"
"file conf/cybertron.pb.conf)\n"
"Or export CYBER_RUN_MODE=simulation firstly."
<< std::endl;
return 0;
}
apollo::cybertron::proto::ComponentConfig config;
config.set_name("perception");
auto reader = config.add_readers();
......@@ -43,10 +53,11 @@ int main(int argc, char* argv[]) {
perception->Proc(driver_raw_msg);
auto message = apollo::cybertron::dispatcher::Dispatcher::Instance()
->GetMessage<Perception>("/perception/channel");
auto blocker = apollo::cybertron::blocker::BlockerManager::Instance()
->GetBlocker<apollo::cybertron::proto::Perception>(
"/perception/channel");
bool is_empty = message->IsPublishedEmpty();
bool is_empty = blocker->IsPublishedEmpty();
if (is_empty) {
std::cout << "simulation failed." << std::endl;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册