提交 2f5db748 编写于 作者: J JackyWang08 提交者: Zhongjun Ni

framework: add support for simulation

上级 c07db594
......@@ -27,6 +27,7 @@ add_subdirectory(sensor)
add_subdirectory(third_party)
add_subdirectory(python)
add_subdirectory(cybertron/dispatcher)
file(GLOB CYBERTRON_SRCS
"cybertron/*.cpp"
......@@ -64,6 +65,7 @@ target_link_libraries(cybertron
protobuf
cybertron_proto
cybertron_common
cybertron_dispatcher
fastrtps
fastcdr
-lrt
......
......@@ -40,6 +40,11 @@ GlobalData::GlobalData() {
InitConfig();
process_id_ = getpid();
process_name_ = "default_" + std::to_string(process_id_);
is_reality_mode_ = (config_.has_run_mode_conf() &&
config_.run_mode_conf().run_mode() ==
apollo::cybertron::proto::RunMode::MODE_SIMULATION)
? false
: true;
}
GlobalData::~GlobalData() {}
......@@ -55,6 +60,8 @@ const std::string& GlobalData::HostIp() const { return host_ip_; }
const std::string& GlobalData::HostName() const { return host_name_; }
bool GlobalData::IsRealityMode() const { return is_reality_mode_; }
void GlobalData::InitHostInfo() {
char host_name[1024];
gethostname(host_name, 1024);
......
......@@ -51,6 +51,8 @@ class GlobalData {
const CyberConfig& Config() const;
bool IsRealityMode() const;
static uint64_t RegisterNode(const std::string& node_name);
static const std::string& GetNodeById(uint64_t id);
......@@ -78,6 +80,9 @@ class GlobalData {
int process_id_;
std::string process_name_;
// run mode
bool is_reality_mode_;
static AtomicHashMap<uint64_t, std::string, 512> node_id_map_;
static AtomicHashMap<uint64_t, std::string, 256> channel_id_map_;
static AtomicHashMap<uint64_t, std::string, 256> service_id_map_;
......
......@@ -27,6 +27,7 @@
#include "cybertron/component/component_base.h"
#include "cybertron/croutine/routine_factory.h"
#include "cybertron/data/data_visitor.h"
#include "cybertron/proto/run_mode_conf.pb.h"
#include "cybertron/scheduler/scheduler.h"
namespace apollo {
......@@ -135,20 +136,15 @@ bool Component<M0, NullType, NullType, NullType>::Initialize(
AERROR << "Component Init() failed.";
return false;
}
bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
RoleAttributes attr;
attr.set_node_name(config.name());
attr.set_channel_name(config.readers(0).channel());
auto qos_profile = attr.mutable_qos_profile();
*qos_profile = config.readers(0).qos_profile();
auto reader = node_->CreateReader<M0>(attr);
if (reader == nullptr) {
AERROR << "Component create reader failed.";
return false;
}
auto dv = std::make_shared<data::DataVisitor<M0>>(reader);
readers_.emplace_back(std::move(reader));
auto sched = scheduler::Scheduler::Instance();
std::weak_ptr<Component<M0>> self =
std::dynamic_pointer_cast<Component<M0>>(shared_from_this());
auto func = [self](const std::shared_ptr<M0>& msg) {
......@@ -160,8 +156,28 @@ bool Component<M0, NullType, NullType, NullType>::Initialize(
}
};
std::shared_ptr<Reader<M0>> reader = nullptr;
if (is_reality_mode) {
reader = node_->CreateReader<M0>(attr);
} else {
reader = node_->CreateReader<M0>(attr, func);
}
if (reader == nullptr) {
AERROR << "Component create reader failed.";
return false;
}
readers_.emplace_back(std::move(reader));
if (!is_reality_mode) {
return true;
}
auto dv = std::make_shared<data::DataVisitor<M0>>(readers_[0]);
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<M0>(func, dv);
auto sched = scheduler::Scheduler::Instance();
return sched->CreateTask(factory, node_->Name());
}
......@@ -187,12 +203,39 @@ bool Component<M0, M1, NullType, NullType>::Initialize(
return false;
}
bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
RoleAttributes attr;
attr.set_node_name(config.name());
attr.set_channel_name(config.readers(0).channel());
auto reader0 = node_->template CreateReader<M0>(attr);
attr.set_channel_name(config.readers(1).channel());
auto reader1 = node_->template CreateReader<M1>(attr);
attr.set_channel_name(config.readers(0).channel());
std::shared_ptr<Reader<M0>> reader0 = nullptr;
if (is_reality_mode) {
reader0 = node_->template CreateReader<M0>(attr);
} else {
std::weak_ptr<Component<M0, M1>> self =
std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
auto message1 = dispatcher::Dispatcher::Instance()->GetMessage<M1>(
config.readers(1).channel());
auto func = [self, message1](const std::shared_ptr<M0>& msg0) {
auto ptr = self.lock();
if (ptr) {
if (!message1->IsPublishedEmpty()) {
auto msg1 = message1->GetLatestPublishedPtr();
ptr->Process(msg0, msg1);
}
} else {
AERROR << "Component object has been destroyed.";
}
};
reader0 = node_->template CreateReader<M0>(attr, func);
}
if (reader0 == nullptr || reader1 == nullptr) {
AERROR << "Component create reader failed.";
return false;
......@@ -200,6 +243,10 @@ bool Component<M0, M1, NullType, NullType>::Initialize(
readers_.push_back(std::move(reader0));
readers_.push_back(std::move(reader1));
if (!is_reality_mode) {
return true;
}
auto sched = scheduler::Scheduler::Instance();
std::weak_ptr<Component<M0, M1>> self =
std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
......@@ -242,14 +289,45 @@ bool Component<M0, M1, M2, NullType>::Initialize(
return false;
}
bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
RoleAttributes attr;
attr.set_node_name(config.name());
attr.set_channel_name(config.readers(0).channel());
auto reader0 = node_->template CreateReader<M0>(attr);
attr.set_channel_name(config.readers(1).channel());
auto reader1 = node_->template CreateReader<M1>(attr);
attr.set_channel_name(config.readers(2).channel());
auto reader2 = node_->template CreateReader<M2>(attr);
attr.set_channel_name(config.readers(0).channel());
std::shared_ptr<Reader<M0>> reader0 = nullptr;
if (is_reality_mode) {
reader0 = node_->template CreateReader<M0>(attr);
} else {
std::weak_ptr<Component<M0, M1, M2, NullType>> self =
std::dynamic_pointer_cast<Component<M0, M1, M2, NullType>>(
shared_from_this());
auto message1 = dispatcher::Dispatcher::Instance()->GetMessage<M1>(
config.readers(1).channel());
auto message2 = dispatcher::Dispatcher::Instance()->GetMessage<M2>(
config.readers(2).channel());
auto func = [self, message1, message2](const std::shared_ptr<M0>& msg0) {
auto ptr = self.lock();
if (ptr) {
if (!message1->IsPublishedEmpty() && !message2->IsPublishedEmpty()) {
auto msg1 = message1->GetLatestPublishedPtr();
auto msg2 = message2->GetLatestPublishedPtr();
ptr->Process(msg0, msg1, msg2);
}
} else {
AERROR << "Component object has been destroyed.";
}
};
reader0 = node_->template CreateReader<M0>(attr, func);
}
if (reader0 == nullptr || reader1 == nullptr || reader2 == nullptr) {
AERROR << "Component create reader failed.";
return false;
......@@ -258,6 +336,10 @@ bool Component<M0, M1, M2, NullType>::Initialize(
readers_.push_back(std::move(reader1));
readers_.push_back(std::move(reader2));
if (!is_reality_mode) {
return true;
}
auto sched = scheduler::Scheduler::Instance();
std::weak_ptr<Component<M0, M1, M2, NullType>> self =
std::dynamic_pointer_cast<Component<M0, M1, M2, NullType>>(
......@@ -302,16 +384,52 @@ bool Component<M0, M1, M2, M3>::Initialize(const ComponentConfig& config) {
return false;
}
bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
RoleAttributes attr;
attr.set_node_name(config.name());
attr.set_channel_name(config.readers(0).channel());
auto reader0 = node_->template CreateReader<M0>(attr);
attr.set_channel_name(config.readers(1).channel());
auto reader1 = node_->template CreateReader<M1>(attr);
attr.set_channel_name(config.readers(2).channel());
auto reader2 = node_->template CreateReader<M2>(attr);
attr.set_channel_name(config.readers(3).channel());
auto reader3 = node_->template CreateReader<M3>(attr);
attr.set_channel_name(config.readers(0).channel());
std::shared_ptr<Reader<M0>> reader0 = nullptr;
if (is_reality_mode) {
reader0 = node_->template CreateReader<M0>(attr);
} else {
std::weak_ptr<Component<M0, M1, M2, M3>> self =
std::dynamic_pointer_cast<Component<M0, M1, M2, M3>>(
shared_from_this());
auto message1 = dispatcher::Dispatcher::Instance()->GetMessage<M1>(
config.readers(1).channel());
auto message2 = dispatcher::Dispatcher::Instance()->GetMessage<M2>(
config.readers(2).channel());
auto message3 = dispatcher::Dispatcher::Instance()->GetMessage<M3>(
config.readers(3).channel());
auto func = [self, message1, message2,
message3](const std::shared_ptr<M0>& msg0) {
auto ptr = self.lock();
if (ptr) {
if (!message1->IsPublishedEmpty() && !message2->IsPublishedEmpty() &&
!message3->IsPublishedEmpty()) {
auto msg1 = message1->GetLatestPublishedPtr();
auto msg2 = message2->GetLatestPublishedPtr();
auto msg3 = message3->GetLatestPublishedPtr();
ptr->Process(msg0, msg1, msg2, msg3);
}
} else {
AERROR << "Component object has been destroyed.";
}
};
reader0 = node_->template CreateReader<M0>(attr, func);
}
if (reader0 == nullptr || reader1 == nullptr || reader2 == nullptr ||
reader3 == nullptr) {
AERROR << "Component create reader failed." << std::endl;
......@@ -322,6 +440,10 @@ bool Component<M0, M1, M2, M3>::Initialize(const ComponentConfig& config) {
readers_.push_back(std::move(reader2));
readers_.push_back(std::move(reader3));
if (!is_reality_mode) {
return true;
}
auto sched = scheduler::Scheduler::Instance();
std::weak_ptr<Component<M0, M1, M2, M3>> self =
std::dynamic_pointer_cast<Component<M0, M1, M2, M3>>(shared_from_this());
......
......@@ -105,3 +105,7 @@ transport_conf {
max_history_depth: 1000
}
}
run_mode_conf {
run_mode: MODE_REALITY
}
......@@ -15,13 +15,20 @@
*****************************************************************************/
#include "cybertron/cybertron.h"
#include "cybertron/common/global_data.h"
#include "cybertron/proto/run_mode_conf.pb.h"
namespace apollo {
namespace cybertron {
using apollo::cybertron::common::GlobalData;
using apollo::cybertron::proto::RunMode;
std::unique_ptr<Node> CreateNode(const std::string& node_name,
const std::string& name_space) {
if (!OK()) {
static bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
if (is_reality_mode && !OK()) {
// TODO @nizhongjun
// add some hint log
AERROR << "cybertron has not inited.";
......
project(cybertron_dispatcher)
aux_source_directory(${PROJECT_SOURCE_DIR} DISPATCHER_SRCS)
add_library(${PROJECT_NAME} ${DISPATCHER_SRCS})
install(TARGETS cybertron_dispatcher LIBRARY DESTINATION lib)
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include "cybertron/dispatcher/dispatcher.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
Dispatcher::Dispatcher() {}
Dispatcher::~Dispatcher() { messages_.clear(); }
void Dispatcher::Observe() {
std::lock_guard<std::mutex> lock(msg_mutex_);
for (auto& item : messages_) {
item.second->Observe();
}
}
} // namespace dispatcher
} // namespace cybertron
} // namespace apollo
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_DISPATCHER_DISPATCHER_H_
#define CYBERTRON_DISPATCHER_DISPATCHER_H_
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include "cybertron/dispatcher/message.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
class Dispatcher {
public:
using MessageMap =
std::unordered_map<std::string, std::shared_ptr<MessageBase>>;
virtual ~Dispatcher();
static const std::shared_ptr<Dispatcher>& Instance() {
static auto instance = std::shared_ptr<Dispatcher>(new Dispatcher());
return instance;
}
template <typename T>
bool Publish(const std::string& channel_name,
const typename Message<T>::MessagePtr& msg);
template <typename T>
bool Publish(const std::string& channel_name,
const typename Message<T>::MessageType& msg);
template <typename T>
bool Subscribe(const std::string& channel_name, size_t capacity,
const std::string& callback_id,
const typename Message<T>::Callback& callback);
template <typename T>
bool Unsubscribe(const std::string& channel_name,
const std::string& callback_id);
template <typename T>
std::shared_ptr<Message<T>> GetMessage(const std::string& channel_name);
template <typename T>
std::shared_ptr<Message<T>> GetOrCreateMessage(const MessageAttr& attr);
void Observe();
private:
Dispatcher();
Dispatcher(const Dispatcher&) = delete;
Dispatcher& operator=(const Dispatcher&) = delete;
MessageMap messages_;
std::mutex msg_mutex_;
};
template <typename T>
bool Dispatcher::Publish(const std::string& channel_name,
const typename Message<T>::MessagePtr& msg) {
auto message = GetOrCreateMessage<T>(MessageAttr(channel_name));
if (message == nullptr) {
return false;
}
message->Publish(msg);
return true;
}
template <typename T>
bool Dispatcher::Publish(const std::string& channel_name,
const typename Message<T>::MessageType& msg) {
auto message = GetOrCreateMessage<T>(MessageAttr(channel_name));
if (message == nullptr) {
return false;
}
message->Publish(msg);
return true;
}
template <typename T>
bool Dispatcher::Subscribe(const std::string& channel_name, size_t capacity,
const std::string& callback_id,
const typename Message<T>::Callback& callback) {
auto message = GetOrCreateMessage<T>(MessageAttr(capacity, channel_name));
if (message == nullptr) {
return false;
}
return message->Subscribe(callback_id, callback);
}
template <typename T>
bool Dispatcher::Unsubscribe(const std::string& channel_name,
const std::string& callback_id) {
auto message = GetMessage<T>(channel_name);
if (message == nullptr) {
return false;
}
return message->Unsubscribe(callback_id);
}
template <typename T>
std::shared_ptr<Message<T>> Dispatcher::GetMessage(
const std::string& channel_name) {
std::shared_ptr<Message<T>> message = nullptr;
{
std::lock_guard<std::mutex> lock(msg_mutex_);
auto search = messages_.find(channel_name);
if (search != messages_.end()) {
message = std::dynamic_pointer_cast<Message<T>>(search->second);
}
}
return message;
}
template <typename T>
std::shared_ptr<Message<T>> Dispatcher::GetOrCreateMessage(
const MessageAttr& attr) {
std::shared_ptr<Message<T>> message = nullptr;
{
std::lock_guard<std::mutex> lock(msg_mutex_);
auto search = messages_.find(attr.channel_name);
if (search != messages_.end()) {
message = std::dynamic_pointer_cast<Message<T>>(search->second);
} else {
message = std::make_shared<Message<T>>(attr);
messages_[attr.channel_name] = message;
}
}
return message;
}
} // namespace dispatcher
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_DISPATCHER_DISPATCHER_H_
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_DISPATCHER_INTRA_READER_H_
#define CYBERTRON_DISPATCHER_INTRA_READER_H_
#include <functional>
#include <memory>
#include "cybertron/dispatcher/dispatcher.h"
#include "cybertron/node/reader.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
template <typename MessageT>
class IntraReader : public apollo::cybertron::Reader<MessageT> {
public:
using MessagePtr = std::shared_ptr<MessageT>;
using Callback = std::function<void(const std::shared_ptr<MessageT>&)>;
using Iterator =
typename std::list<std::shared_ptr<MessageT>>::const_iterator;
IntraReader(const proto::RoleAttributes& attr, const Callback& callback);
virtual ~IntraReader();
bool Init() override;
void Shutdown() override;
void ClearData() override;
void Observe() override;
bool Empty() const override;
bool HasReceived() const override;
void Enqueue(const std::shared_ptr<MessageT>& msg) override;
void SetHistoryDepth(const uint32_t& depth) override;
uint32_t GetHistoryDepth() const override;
const std::shared_ptr<MessageT>& GetLatestObserved() const override;
const std::shared_ptr<MessageT>& GetOldestObserved() const override;
Iterator Begin() const override;
Iterator End() const override;
private:
void OnMessage(const MessagePtr& msg_ptr);
Callback msg_callback_;
std::shared_ptr<Message<MessageT>> message_;
};
template <typename MessageT>
IntraReader<MessageT>::IntraReader(const proto::RoleAttributes& attr,
const Callback& callback)
: Reader<MessageT>(attr), msg_callback_(callback), message_(nullptr) {}
template <typename MessageT>
IntraReader<MessageT>::~IntraReader() {
Shutdown();
}
template <typename MessageT>
bool IntraReader<MessageT>::Init() {
if (this->init_.exchange(true)) {
return true;
}
MessageAttr attr(this->role_attr_.qos_profile().depth(),
this->role_attr_.channel_name());
message_ = Dispatcher::Instance()->GetOrCreateMessage<MessageT>(attr);
if (message_ == nullptr) {
return false;
}
return message_->Subscribe(this->role_attr_.node_name(),
std::bind(&IntraReader<MessageT>::OnMessage, this,
std::placeholders::_1));
}
template <typename MessageT>
void IntraReader<MessageT>::Shutdown() {
if (!this->init_.exchange(false)) {
return;
}
message_->Unsubscribe(this->role_attr_.node_name());
message_ = nullptr;
}
template <typename MessageT>
void IntraReader<MessageT>::ClearData() {
if (message_ == nullptr) {
return;
}
message_->ClearPublished();
}
template <typename MessageT>
void IntraReader<MessageT>::Observe() {
if (message_ == nullptr) {
return;
}
message_->Observe();
}
template <typename MessageT>
bool IntraReader<MessageT>::Empty() const {
if (message_ == nullptr) {
return true;
}
return message_->IsObservedEmpty();
}
template <typename MessageT>
bool IntraReader<MessageT>::HasReceived() const {
if (message_ == nullptr) {
return false;
}
return !message_->IsPublishedEmpty();
}
template <typename MessageT>
void IntraReader<MessageT>::Enqueue(const std::shared_ptr<MessageT>& msg) {
if (message_ == nullptr) {
return;
}
message_->Publish(msg);
}
template <typename MessageT>
void IntraReader<MessageT>::SetHistoryDepth(const uint32_t& depth) {
if (message_ == nullptr) {
return;
}
message_->set_capacity(depth);
}
template <typename MessageT>
uint32_t IntraReader<MessageT>::GetHistoryDepth() const {
if (message_ == nullptr) {
return 0;
}
return message_->capacity();
}
template <typename MessageT>
const std::shared_ptr<MessageT>& IntraReader<MessageT>::GetLatestObserved()
const {
if (message_ == nullptr) {
return nullptr;
}
return message_->GetLatestObservedPtr();
}
template <typename MessageT>
const std::shared_ptr<MessageT>& IntraReader<MessageT>::GetOldestObserved()
const {
if (message_ == nullptr) {
return nullptr;
}
return message_->GetOldestObservedPtr();
}
template <typename MessageT>
auto IntraReader<MessageT>::Begin() const -> Iterator {
assert(message_ != nullptr);
return message_->ObservedBegin();
}
template <typename MessageT>
auto IntraReader<MessageT>::End() const -> Iterator {
assert(message_ != nullptr);
return message_->ObservedEnd();
}
template <typename MessageT>
void IntraReader<MessageT>::OnMessage(const MessagePtr& msg_ptr) {
if (msg_callback_ != nullptr) {
msg_callback_(msg_ptr);
}
}
} // namespace dispatcher
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_DISPATCHER_INTRA_READER_H_
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_DISPATCHER_INTRA_WRITER_H_
#define CYBERTRON_DISPATCHER_INTRA_WRITER_H_
#include <memory>
#include "cybertron/dispatcher/dispatcher.h"
#include "cybertron/node/writer.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
template <typename MessageT>
class IntraWriter : public apollo::cybertron::Writer<MessageT> {
public:
using MessagePtr = std::shared_ptr<MessageT>;
using DispatcherPtr = std::shared_ptr<dispatcher::Dispatcher>;
IntraWriter(const proto::RoleAttributes& attr);
virtual ~IntraWriter();
bool Init() override;
void Shutdown() override;
bool Write(const MessageT& msg) override;
bool Write(const MessagePtr& msg_ptr) override;
private:
DispatcherPtr dispatcher_;
};
template <typename MessageT>
IntraWriter<MessageT>::IntraWriter(const proto::RoleAttributes& attr)
: Writer<MessageT>(attr) {
dispatcher_ = dispatcher::Dispatcher::Instance();
}
template <typename MessageT>
IntraWriter<MessageT>::~IntraWriter() {
Shutdown();
}
template <typename MessageT>
bool IntraWriter<MessageT>::Init() {
this->init_.exchange(true);
return true;
}
template <typename MessageT>
void IntraWriter<MessageT>::Shutdown() {
this->init_.exchange(false);
}
template <typename MessageT>
bool IntraWriter<MessageT>::Write(const MessageT& msg) {
if (!this->init_.load()) {
return false;
}
return Write(std::make_shared<MessageT>(msg));
}
template <typename MessageT>
bool IntraWriter<MessageT>::Write(const MessagePtr& msg_ptr) {
if (!this->init_.load()) {
return false;
}
return dispatcher_->Publish<MessageT>(this->role_attr_.channel_name(),
msg_ptr);
}
} // namespace dispatcher
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_DISPATCHER_INTRA_WRITER_H_
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_DISPATCHER_MESSAGE_H_
#define CYBERTRON_DISPATCHER_MESSAGE_H_
#include <assert.h>
#include <stddef.h>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
namespace apollo {
namespace cybertron {
namespace dispatcher {
class MessageBase {
public:
virtual ~MessageBase() = default;
virtual void ClearPublished() = 0;
virtual void Observe() = 0;
virtual bool IsObservedEmpty() const = 0;
virtual bool IsPublishedEmpty() const = 0;
virtual bool Unsubscribe(const std::string& callback_id) = 0;
virtual size_t capacity() const = 0;
virtual void set_capacity(size_t capacity) = 0;
virtual const std::string& channel_name() const = 0;
};
struct MessageAttr {
MessageAttr() : capacity(10), channel_name("") {}
MessageAttr(const std::string& channel)
: capacity(10), channel_name(channel) {}
MessageAttr(size_t cap, const std::string& channel)
: capacity(cap), channel_name(channel) {}
MessageAttr(const MessageAttr& attr)
: capacity(attr.capacity), channel_name(attr.channel_name) {}
size_t capacity;
std::string channel_name;
};
template <typename T>
class Message : public MessageBase {
public:
using MessageType = T;
using MessagePtr = std::shared_ptr<T>;
using MessageQueue = std::list<MessagePtr>;
using Callback = std::function<void(const MessagePtr&)>;
using CallbackMap = std::unordered_map<std::string, Callback>;
using Iterator = typename std::list<std::shared_ptr<T>>::const_iterator;
Message(const MessageAttr& attr);
virtual ~Message();
void Publish(const MessageType& msg);
void Publish(const MessagePtr& msg);
void ClearPublished() override;
void Observe() override;
bool IsObservedEmpty() const override;
bool IsPublishedEmpty() const override;
bool Subscribe(const std::string& callback_id, const Callback& callback);
bool Unsubscribe(const std::string& callback_id) override;
const MessageType& GetLatestObserved() const;
const MessagePtr GetLatestObservedPtr() const;
const MessagePtr GetOldestObservedPtr() const;
const MessagePtr GetLatestPublishedPtr() const;
Iterator ObservedBegin() const;
Iterator ObservedEnd() const;
size_t capacity() const override;
void set_capacity(size_t capacity) override;
const std::string& channel_name() const override;
private:
void Enqueue(const MessagePtr& msg);
void Notify(const MessagePtr& msg);
bool is_full_;
MessageAttr attr_;
MessageQueue observed_msg_queue_;
MessageQueue published_msg_queue_;
mutable std::mutex msg_mutex_;
CallbackMap published_callbacks_;
mutable std::mutex cb_mutex_;
};
template <typename T>
Message<T>::Message(const MessageAttr& attr) : is_full_(false), attr_(attr) {}
template <typename T>
Message<T>::~Message() {
published_msg_queue_.clear();
observed_msg_queue_.clear();
published_callbacks_.clear();
}
template <typename T>
void Message<T>::Publish(const MessageType& msg) {
Publish(std::make_shared<MessageType>(msg));
}
template <typename T>
void Message<T>::Publish(const MessagePtr& msg) {
Enqueue(msg);
Notify(msg);
}
template <typename T>
void Message<T>::ClearPublished() {
std::lock_guard<std::mutex> lock(msg_mutex_);
published_msg_queue_.clear();
}
template <typename T>
void Message<T>::Observe() {
std::lock_guard<std::mutex> lock(msg_mutex_);
observed_msg_queue_ = published_msg_queue_;
}
template <typename T>
bool Message<T>::IsObservedEmpty() const {
std::lock_guard<std::mutex> lock(msg_mutex_);
return observed_msg_queue_.empty();
}
template <typename T>
bool Message<T>::IsPublishedEmpty() const {
std::lock_guard<std::mutex> lock(msg_mutex_);
return published_msg_queue_.empty();
}
template <typename T>
bool Message<T>::Subscribe(const std::string& callback_id,
const Callback& callback) {
std::lock_guard<std::mutex> lock(cb_mutex_);
if (published_callbacks_.find(callback_id) != published_callbacks_.end()) {
return false;
}
published_callbacks_[callback_id] = callback;
return true;
}
template <typename T>
bool Message<T>::Unsubscribe(const std::string& callback_id) {
std::lock_guard<std::mutex> lock(cb_mutex_);
return published_callbacks_.erase(callback_id) != 0;
}
template <typename T>
auto Message<T>::GetLatestObserved() const -> const MessageType& {
std::lock_guard<std::mutex> lock(msg_mutex_);
assert(!observed_msg_queue_.empty());
return *observed_msg_queue_.back();
}
template <typename T>
auto Message<T>::GetLatestObservedPtr() const -> const MessagePtr {
std::lock_guard<std::mutex> lock(msg_mutex_);
assert(!observed_msg_queue_.empty());
return observed_msg_queue_.back();
}
template <typename T>
auto Message<T>::GetOldestObservedPtr() const -> const MessagePtr {
std::lock_guard<std::mutex> lock(msg_mutex_);
assert(!observed_msg_queue_.empty());
return observed_msg_queue_.front();
}
template <typename T>
auto Message<T>::GetLatestPublishedPtr() const -> const MessagePtr {
std::lock_guard<std::mutex> lock(msg_mutex_);
assert(!published_msg_queue_.empty());
return published_msg_queue_.back();
}
template <typename T>
auto Message<T>::ObservedBegin() const -> Iterator {
return observed_msg_queue_.begin();
}
template <typename T>
auto Message<T>::ObservedEnd() const -> Iterator {
return observed_msg_queue_.end();
}
template <typename T>
size_t Message<T>::capacity() const {
return attr_.capacity;
}
template <typename T>
void Message<T>::set_capacity(size_t capacity) {
std::lock_guard<std::mutex> lock(msg_mutex_);
if (capacity > attr_.capacity) {
is_full_ = false;
}
attr_.capacity = capacity;
while (published_msg_queue_.size() > capacity) {
published_msg_queue_.pop_front();
}
}
template <typename T>
const std::string& Message<T>::channel_name() const {
return attr_.channel_name;
}
template <typename T>
void Message<T>::Enqueue(const MessagePtr& msg) {
if (attr_.capacity == 0) {
return;
}
std::lock_guard<std::mutex> lock(msg_mutex_);
if (is_full_) {
published_msg_queue_.pop_front();
}
published_msg_queue_.push_back(msg);
if (!is_full_) {
if (published_msg_queue_.size() >= attr_.capacity) {
is_full_ = true;
}
}
}
template <typename T>
void Message<T>::Notify(const MessagePtr& msg) {
std::lock_guard<std::mutex> lock(cb_mutex_);
for (const auto& item : published_callbacks_) {
item.second(msg);
}
}
} // namespace dispatcher
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_DISPATCHER_MESSAGE_H_
......@@ -31,6 +31,17 @@ struct RawMessage {
explicit RawMessage(const std::string &data) { message = data; }
RawMessage(const RawMessage &raw_msg)
: message(raw_msg.message), timestamp(raw_msg.timestamp) {}
RawMessage &operator=(const RawMessage &raw_msg) {
if (this != &raw_msg) {
this->message = raw_msg.message;
this->timestamp = raw_msg.timestamp;
}
return *this;
}
~RawMessage() {}
bool SerializeToString(std::string *str) const {
......@@ -52,10 +63,6 @@ struct RawMessage {
std::string message;
uint64_t timestamp;
private:
RawMessage(const RawMessage &) = delete;
RawMessage &operator=(const RawMessage &) = delete;
};
} // namespace message
......
......@@ -27,19 +27,9 @@ Node::Node(const std::string& node_name, const std::string& name_space)
: node_name_(node_name), name_space_(name_space) {
node_channel_impl_.reset(new NodeChannelImpl(node_name));
node_service_impl_.reset(new NodeServiceImpl(node_name));
attr_.set_host_name(common::GlobalData::Instance()->HostName());
attr_.set_process_id(common::GlobalData::Instance()->ProcessId());
attr_.set_node_name(node_name);
uint64_t node_id = common::GlobalData::RegisterNode(node_name);
attr_.set_node_id(node_id);
node_manager_ =
service_discovery::TopologyManager::Instance()->node_manager();
node_manager_->Join(attr_, RoleType::ROLE_NODE);
}
Node::~Node() { node_manager_->Leave(attr_, RoleType::ROLE_NODE); }
Node::~Node() {}
const std::string& Node::Name() const { return node_name_; }
......
......@@ -83,14 +83,14 @@ class Node {
auto CreateReader(const proto::RoleAttributes& role_attr)
-> std::shared_ptr<Reader<MessageT>>;
std::unique_ptr<NodeChannelImpl> node_channel_impl_ = nullptr;
std::unique_ptr<NodeServiceImpl> node_service_impl_ = nullptr;
std::shared_ptr<service_discovery::NodeManager> node_manager_ = nullptr;
std::string node_name_;
std::string name_space_;
proto::RoleAttributes attr_;
std::mutex readers_mutex_;
std::map<std::string, std::shared_ptr<ReaderBase>> readers_;
std::unique_ptr<NodeChannelImpl> node_channel_impl_ = nullptr;
std::unique_ptr<NodeServiceImpl> node_service_impl_ = nullptr;
};
template <typename MessageT>
......
......@@ -20,27 +20,50 @@
#include <memory>
#include <string>
#include "cybertron/common/global_data.h"
#include "cybertron/dispatcher/intra_reader.h"
#include "cybertron/dispatcher/intra_writer.h"
#include "cybertron/message/message_traits.h"
#include "cybertron/node/reader.h"
#include "cybertron/node/writer.h"
#include "cybertron/proto/run_mode_conf.pb.h"
namespace apollo {
namespace cybertron {
class Node;
class NodeChannelImpl {
public:
friend class Node;
public:
using NodeManagerPtr = std::shared_ptr<service_discovery::NodeManager>;
explicit NodeChannelImpl(const std::string& node_name)
: node_name_(node_name) {
// TODO: topology register node
: is_reality_mode_(true), node_name_(node_name) {
node_attr_.set_host_name(common::GlobalData::Instance()->HostName());
node_attr_.set_process_id(common::GlobalData::Instance()->ProcessId());
node_attr_.set_node_name(node_name);
uint64_t node_id = common::GlobalData::RegisterNode(node_name);
node_attr_.set_node_id(node_id);
is_reality_mode_ = common::GlobalData::Instance()->IsRealityMode();
if (is_reality_mode_) {
node_manager_ =
service_discovery::TopologyManager::Instance()->node_manager();
node_manager_->Join(node_attr_, RoleType::ROLE_NODE);
}
}
virtual ~NodeChannelImpl() {
if (is_reality_mode_) {
node_manager_->Leave(node_attr_, RoleType::ROLE_NODE);
node_manager_ = nullptr;
}
}
virtual ~NodeChannelImpl() {}
private:
std::string NodeName() const { return node_name_; }
const std::string& NodeName() const { return node_name_; }
private:
template <typename MessageT>
auto CreateWriter(const proto::RoleAttributes& role_attr)
-> std::shared_ptr<Writer<MessageT>>;
......@@ -63,29 +86,29 @@ class NodeChannelImpl {
auto CreateReader(const proto::RoleAttributes& role_attr)
-> std::shared_ptr<Reader<MessageT>>;
template <typename MessageT>
void FillInAttr(proto::RoleAttributes* attr);
bool is_reality_mode_;
std::string node_name_;
proto::RoleAttributes node_attr_;
NodeManagerPtr node_manager_ = nullptr;
};
template <typename MessageT>
auto NodeChannelImpl::CreateWriter(const proto::RoleAttributes& role_attr)
-> std::shared_ptr<Writer<MessageT>> {
auto channel_id = GlobalData::RegisterChannel(role_attr.channel_name());
auto node_id = GlobalData::RegisterNode(node_name_);
RETURN_VAL_IF(!role_attr.has_channel_name(), nullptr);
proto::RoleAttributes new_attr(role_attr);
new_attr.set_node_name(node_name_);
new_attr.set_node_id(node_id);
new_attr.set_channel_id(channel_id);
if (!new_attr.has_message_type()) {
new_attr.set_message_type(message::MessageType<MessageT>());
}
if (!new_attr.has_proto_desc()) {
std::string proto_desc("");
message::GetDescriptorString<MessageT>(new_attr.message_type(),
&proto_desc);
new_attr.set_proto_desc(proto_desc);
FillInAttr<MessageT>(&new_attr);
std::shared_ptr<Writer<MessageT>> writer_ptr = nullptr;
if (!is_reality_mode_) {
writer_ptr = std::make_shared<dispatcher::IntraWriter<MessageT>>(new_attr);
} else {
writer_ptr = std::make_shared<Writer<MessageT>>(new_attr);
}
std::shared_ptr<Writer<MessageT>> writer_ptr =
std::make_shared<Writer<MessageT>>(new_attr);
RETURN_VAL_IF_NULL(writer_ptr, nullptr);
RETURN_VAL_IF(!writer_ptr->Init(), nullptr);
return writer_ptr;
......@@ -112,17 +135,18 @@ template <typename MessageT>
auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr,
const CallbackFunc<MessageT>& reader_func)
-> std::shared_ptr<Reader<MessageT>> {
auto channel_id = GlobalData::RegisterChannel(role_attr.channel_name());
auto node_id = GlobalData::RegisterNode(node_name_);
RETURN_VAL_IF(!role_attr.has_channel_name(), nullptr);
proto::RoleAttributes new_attr(role_attr);
new_attr.set_node_name(node_name_);
new_attr.set_node_id(node_id);
new_attr.set_channel_id(channel_id);
if (!new_attr.has_message_type()) {
new_attr.set_message_type(message::MessageType<MessageT>());
FillInAttr<MessageT>(&new_attr);
std::shared_ptr<Reader<MessageT>> reader_ptr = nullptr;
if (!is_reality_mode_) {
reader_ptr = std::make_shared<dispatcher::IntraReader<MessageT>>(
new_attr, reader_func);
} else {
reader_ptr = std::make_shared<Reader<MessageT>>(new_attr, reader_func);
}
std::shared_ptr<Reader<MessageT>> reader_ptr =
std::make_shared<Reader<MessageT>>(new_attr, reader_func);
RETURN_VAL_IF_NULL(reader_ptr, nullptr);
RETURN_VAL_IF(!reader_ptr->Init(), nullptr);
return reader_ptr;
......@@ -131,20 +155,29 @@ auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr,
template <typename MessageT>
auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr)
-> std::shared_ptr<Reader<MessageT>> {
auto channel_id = GlobalData::RegisterChannel(role_attr.channel_name());
auto node_id = GlobalData::RegisterNode(node_name_);
proto::RoleAttributes new_attr(role_attr);
new_attr.set_node_name(node_name_);
new_attr.set_node_id(node_id);
new_attr.set_channel_id(channel_id);
if (!new_attr.has_message_type()) {
new_attr.set_message_type(message::MessageType<MessageT>());
return this->template CreateReader<MessageT>(role_attr, nullptr);
}
template <typename MessageT>
void NodeChannelImpl::FillInAttr(proto::RoleAttributes* attr) {
attr->set_host_name(node_attr_.host_name());
attr->set_process_id(node_attr_.process_id());
attr->set_node_name(node_attr_.node_name());
attr->set_node_id(node_attr_.node_id());
auto channel_id = GlobalData::RegisterChannel(attr->channel_name());
attr->set_channel_id(channel_id);
if (!attr->has_message_type()) {
attr->set_message_type(message::MessageType<MessageT>());
}
if (!attr->has_proto_desc()) {
std::string proto_desc("");
message::GetDescriptorString<MessageT>(attr->message_type(), &proto_desc);
attr->set_proto_desc(proto_desc);
}
if (!attr->has_qos_profile()) {
attr->mutable_qos_profile()->CopyFrom(
transport::QosProfileConf::QOS_PROFILE_DEFAULT);
}
std::shared_ptr<Reader<MessageT>> reader_ptr =
std::make_shared<Reader<MessageT>>(new_attr, nullptr);
RETURN_VAL_IF_NULL(reader_ptr, nullptr);
RETURN_VAL_IF(!reader_ptr->Init(), nullptr);
return reader_ptr;
}
} // namespace cybertron
......
......@@ -62,15 +62,15 @@ class Reader : public ReaderBase {
bool HasReceived() const override;
bool Empty() const override;
void Enqueue(const std::shared_ptr<MessageT>& msg);
void SetHistoryDepth(const uint32_t& depth);
uint32_t GetHistoryDepth() const;
const std::shared_ptr<MessageT>& GetLatestObserved() const;
const std::shared_ptr<MessageT>& GetOldestObserved() const;
Iterator Begin() const { return observed_queue_.begin(); }
Iterator End() const { return observed_queue_.end(); }
protected:
virtual void Enqueue(const std::shared_ptr<MessageT>& msg);
virtual void SetHistoryDepth(const uint32_t& depth);
virtual uint32_t GetHistoryDepth() const;
virtual const std::shared_ptr<MessageT>& GetLatestObserved() const;
virtual const std::shared_ptr<MessageT>& GetOldestObserved() const;
virtual Iterator Begin() const { return observed_queue_.begin(); }
virtual Iterator End() const { return observed_queue_.end(); }
private:
void JoinTheTopology();
void LeaveTheTopology();
void OnChannelChange(const proto::ChangeMsg& change_msg);
......
......@@ -22,7 +22,6 @@
#include <string>
#include <unordered_map>
#include "cybertron/common/global_data.h"
#include "cybertron/common/macros.h"
#include "cybertron/common/util.h"
#include "cybertron/event/perf_event_cache.h"
......@@ -38,14 +37,7 @@ using apollo::cybertron::event::TransPerf;
class ReaderBase {
public:
explicit ReaderBase(const proto::RoleAttributes& role_attr)
: role_attr_(role_attr), init_(false) {
if (!role_attr_.has_host_name()) {
role_attr_.set_host_name(common::GlobalData::Instance()->HostName());
}
if (!role_attr_.has_process_id()) {
role_attr_.set_process_id(common::GlobalData::Instance()->ProcessId());
}
}
: role_attr_(role_attr), init_(false) {}
virtual ~ReaderBase() {}
virtual bool Init() = 0;
......
......@@ -42,10 +42,10 @@ class Writer : public WriterBase {
bool Init() override;
void Shutdown() override;
bool Write(const MessageT& msg);
bool Write(const std::shared_ptr<MessageT>& msg_ptr);
virtual bool Write(const MessageT& msg);
virtual bool Write(const std::shared_ptr<MessageT>& msg_ptr);
protected:
private:
void JoinTheTopology();
void LeaveTheTopology();
void OnChannelChange(const proto::ChangeMsg& change_msg);
......
......@@ -20,9 +20,7 @@
#include <atomic>
#include <string>
#include "cybertron/common/global_data.h"
#include "cybertron/proto/role_attributes.pb.h"
#include "cybertron/transport/common/identity.h"
namespace apollo {
namespace cybertron {
......@@ -31,12 +29,7 @@ class WriterBase {
public:
explicit WriterBase(const proto::RoleAttributes& role_attr)
: role_attr_(role_attr), init_(false) {
if (!role_attr_.has_host_name()) {
role_attr_.set_host_name(common::GlobalData::Instance()->HostName());
}
if (!role_attr_.has_process_id()) {
role_attr_.set_process_id(common::GlobalData::Instance()->ProcessId());
}
}
virtual ~WriterBase() {}
......
......@@ -7,6 +7,7 @@ import "scheduler_conf.proto";
import "transport_conf.proto";
import "log_conf.proto";
import "perf_conf.proto";
import "run_mode_conf.proto";
message CyberConfig {
optional LogConf log_conf = 1;
......@@ -14,4 +15,5 @@ message CyberConfig {
optional SchedulerConf scheduler_conf = 3;
optional TransportConf transport_conf = 4;
optional PerfConf perf_conf = 5;
optional RunModeConf run_mode_conf = 6;
}
syntax = "proto2";
package apollo.cybertron.proto;
enum RunMode {
MODE_REALITY = 0;
MODE_SIMULATION = 1;
}
message RunModeConf {
required RunMode run_mode = 1 [default = MODE_REALITY];
}
......@@ -21,3 +21,4 @@ add_subdirectory(tools)
add_subdirectory(record)
add_subdirectory(component)
add_subdirectory(mainboard)
add_subdirectory(dispatcher)
project(dispatcher_test)
add_executable(dispatcher_dispatcher_test
dispatcher_test.cpp
)
target_link_libraries(dispatcher_dispatcher_test
cybertron
cybertron_dispatcher
gtest
)
add_executable(dispatcher_message_test
message_test.cpp
)
target_link_libraries(dispatcher_message_test
cybertron
cybertron_dispatcher
gtest
)
install(TARGETS
dispatcher_dispatcher_test
dispatcher_message_test
DESTINATION ${UNIT_TEST_INSTALL_PREFIX}
)
\ No newline at end of file
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include "gtest/gtest.h"
#include "cybertron/dispatcher/dispatcher.h"
#include "cybertron/proto/unit_test.pb.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
using apollo::cybertron::proto::UnitTest;
TEST(DispatcherTest, constructor) {
Dispatcher dispatcher;
auto msg = dispatcher.GetMessage<UnitTest>("channel");
EXPECT_EQ(msg, nullptr);
}
TEST(DispatcherTest, publish) {
Dispatcher dispatcher;
auto msg1 = std::make_shared<UnitTest>();
msg1->set_class_name("MessageTest");
msg1->set_case_name("publish_1");
dispatcher.Publish<UnitTest>("channel1", msg1);
auto message1 = dispatcher.GetMessage<UnitTest>("channel1");
EXPECT_NE(message1, nullptr);
EXPECT_FALSE(message1->IsPublishedEmpty());
UnitTest msg2;
msg2.set_class_name("MessageTest");
msg2.set_case_name("publish_2");
dispatcher.Publish<UnitTest>("channel2", msg2);
auto message2 = dispatcher.GetMessage<UnitTest>("channel2");
EXPECT_NE(message2, nullptr);
EXPECT_FALSE(message2->IsPublishedEmpty());
EXPECT_TRUE(message1->IsObservedEmpty());
EXPECT_TRUE(message2->IsObservedEmpty());
dispatcher.Observe();
EXPECT_FALSE(message1->IsObservedEmpty());
EXPECT_FALSE(message2->IsObservedEmpty());
}
TEST(DispatcherTest, subscribe) {
Dispatcher dispatcher;
auto received_msg = std::make_shared<UnitTest>();
bool res = dispatcher.Subscribe<UnitTest>(
"channel", 10, "DispatcherTest",
[&received_msg](const std::shared_ptr<UnitTest>& msg) {
received_msg->CopyFrom(*msg);
});
EXPECT_TRUE(res);
auto msg1 = std::make_shared<UnitTest>();
msg1->set_class_name("MessageTest");
msg1->set_case_name("publish_1");
dispatcher.Publish<UnitTest>("channel", msg1);
EXPECT_EQ(received_msg->class_name(), msg1->class_name());
EXPECT_EQ(received_msg->case_name(), msg1->case_name());
res = dispatcher.Unsubscribe<UnitTest>("channel", "DispatcherTest");
EXPECT_TRUE(res);
res = dispatcher.Unsubscribe<UnitTest>("channel", "DispatcherTest");
EXPECT_FALSE(res);
res = dispatcher.Unsubscribe<UnitTest>("channel", "DispatcherTest_11");
EXPECT_FALSE(res);
res = dispatcher.Unsubscribe<UnitTest>("channel_11", "DispatcherTest");
EXPECT_FALSE(res);
}
} // namespace dispatcher
} // namespace cybertron
} // namespace apollo
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include "gtest/gtest.h"
#include "cybertron/dispatcher/message.h"
#include "cybertron/proto/unit_test.pb.h"
namespace apollo {
namespace cybertron {
namespace dispatcher {
using apollo::cybertron::proto::UnitTest;
TEST(MessageTest, constructor) {
MessageAttr attr(10, "channel");
Message<UnitTest> message(attr);
EXPECT_EQ(message.capacity(), 10);
EXPECT_EQ(message.channel_name(), "channel");
message.set_capacity(20);
EXPECT_EQ(message.capacity(), 20);
}
TEST(MessageTest, publish) {
MessageAttr attr(10, "channel");
Message<UnitTest> message(attr);
auto msg1 = std::make_shared<UnitTest>();
msg1->set_class_name("MessageTest");
msg1->set_case_name("publish_1");
UnitTest msg2;
msg2.set_class_name("MessageTest");
msg2.set_case_name("publish_2");
EXPECT_TRUE(message.IsPublishedEmpty());
message.Publish(msg1);
message.Publish(msg2);
EXPECT_FALSE(message.IsPublishedEmpty());
EXPECT_TRUE(message.IsObservedEmpty());
message.Observe();
EXPECT_FALSE(message.IsObservedEmpty());
auto& latest_observed_msg = message.GetLatestObserved();
EXPECT_EQ(latest_observed_msg.class_name(), "MessageTest");
EXPECT_EQ(latest_observed_msg.case_name(), "publish_2");
auto latest_observed_msg_ptr = message.GetLatestObservedPtr();
EXPECT_EQ(latest_observed_msg_ptr->class_name(), "MessageTest");
EXPECT_EQ(latest_observed_msg_ptr->case_name(), "publish_2");
auto latest_published_ptr = message.GetLatestPublishedPtr();
EXPECT_EQ(latest_published_ptr->class_name(), "MessageTest");
EXPECT_EQ(latest_published_ptr->case_name(), "publish_2");
message.ClearPublished();
EXPECT_TRUE(message.IsPublishedEmpty());
EXPECT_FALSE(message.IsObservedEmpty());
}
TEST(MessageTest, subscribe) {
MessageAttr attr(10, "channel");
Message<UnitTest> message(attr);
auto received_msg = std::make_shared<UnitTest>();
bool res = message.Subscribe(
"MessageTest1", [&received_msg](const std::shared_ptr<UnitTest>& msg) {
received_msg->CopyFrom(*msg);
});
EXPECT_TRUE(res);
auto msg1 = std::make_shared<UnitTest>();
msg1->set_class_name("MessageTest");
msg1->set_case_name("publish_1");
message.Publish(msg1);
EXPECT_EQ(received_msg->class_name(), msg1->class_name());
EXPECT_EQ(received_msg->case_name(), msg1->case_name());
res = message.Subscribe(
"MessageTest1", [&received_msg](const std::shared_ptr<UnitTest>& msg) {
received_msg->CopyFrom(*msg);
});
EXPECT_FALSE(res);
res = message.Unsubscribe("MessageTest1");
EXPECT_TRUE(res);
res = message.Unsubscribe("MessageTest1");
EXPECT_FALSE(res);
UnitTest msg2;
msg2.set_class_name("MessageTest");
msg2.set_case_name("publish_2");
message.Publish(msg2);
EXPECT_NE(received_msg->case_name(), msg2.case_name());
}
} // namespace dispatcher
} // namespace cybertron
} // namespace apollo
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......@@ -15,6 +15,7 @@
*****************************************************************************/
#include "cybertron/timer/timer.h"
#include "cybertron/common/global_data.h"
namespace apollo {
namespace cybertron {
......@@ -35,6 +36,10 @@ Timer::Timer(uint32_t period, std::function<void()> callback, bool oneshot) {
void Timer::SetTimerOption(TimerOption opt) { timer_opt_ = opt; }
void Timer::Start() {
if (!common::GlobalData::Instance()->IsRealityMode()) {
return;
}
if (timer_id_ == 0) {
timer_id_ =
tm_->Add(timer_opt_.period, timer_opt_.callback, timer_opt_.oneshot);
......
......@@ -82,6 +82,12 @@ add_executable(user_defined_task
)
target_link_libraries(user_defined_task -Wl,--no-as-needed -pthread cybertron)
add_executable(dispatcher_example
dispatcher_example.cpp
)
target_link_libraries(dispatcher_example -pthread dl cybertron perception_component)
install(TARGETS dispatcher_example DESTINATION bin)
install(DIRECTORY dag DESTINATION ${CMAKE_INSTALL_PREFIX})
......
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include <iostream>
#include "cybertron/dispatcher/dispatcher.h"
#include "cybertron/proto/component_config.pb.h"
#include "examples/component_perception/perception_component.h"
// To use this example, we need set run_mode to MODE_SIMULATION
// (run_mode is in file conf/cybertron.pb.conf)
int main(int argc, char* argv[]) {
apollo::cybertron::proto::ComponentConfig config;
config.set_name("perception");
auto reader = config.add_readers();
reader->set_channel("/driver/channel");
auto perception = std::make_shared<::test::PerceptionComponent>();
perception->Initialize(config);
auto driver_msg = std::make_shared<apollo::cybertron::proto::Driver>();
driver_msg->set_msg_id(0);
driver_msg->set_timestamp(0);
driver_msg->mutable_header()->set_timestamp(0);
auto driver_raw_msg =
std::make_shared<apollo::cybertron::message::RawMessage>();
driver_msg->SerializeToString(&driver_raw_msg->message);
perception->Proc(driver_raw_msg);
auto message = apollo::cybertron::dispatcher::Dispatcher::Instance()
->GetMessage<Perception>("/perception/channel");
bool is_empty = message->IsPublishedEmpty();
if (is_empty) {
std::cout << "simulation failed." << std::endl;
} else {
std::cout << "simulation succ." << std::endl;
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册