From 2f5db74871789322c796d3c9c890582f1463680e Mon Sep 17 00:00:00 2001 From: JackyWang08 Date: Thu, 13 Sep 2018 20:12:35 -0700 Subject: [PATCH] framework: add support for simulation --- framework/CMakeLists.txt | 2 + framework/cybertron/common/global_data.cpp | 7 + framework/cybertron/common/global_data.h | 5 + framework/cybertron/component/component.h | 150 +++++++++- framework/cybertron/conf/cybertron.pb.conf | 4 + framework/cybertron/cybertron.cpp | 9 +- framework/cybertron/dispatcher/CMakeLists.txt | 6 + framework/cybertron/dispatcher/dispatcher.cpp | 36 +++ framework/cybertron/dispatcher/dispatcher.h | 155 ++++++++++ framework/cybertron/dispatcher/intra_reader.h | 196 +++++++++++++ framework/cybertron/dispatcher/intra_writer.h | 91 ++++++ framework/cybertron/dispatcher/message.h | 265 ++++++++++++++++++ framework/cybertron/message/raw_message.h | 15 +- framework/cybertron/node/node.cpp | 12 +- framework/cybertron/node/node.h | 8 +- framework/cybertron/node/node_channel_impl.h | 119 +++++--- framework/cybertron/node/reader.h | 18 +- framework/cybertron/node/reader_base.h | 10 +- framework/cybertron/node/writer.h | 6 +- framework/cybertron/node/writer_base.h | 9 +- framework/cybertron/proto/cyber_config.proto | 2 + framework/cybertron/proto/run_mode_conf.proto | 12 + framework/cybertron/test/CMakeLists.txt | 1 + .../cybertron/test/dispatcher/CMakeLists.txt | 27 ++ .../test/dispatcher/dispatcher_test.cpp | 104 +++++++ .../test/dispatcher/message_test.cpp | 124 ++++++++ framework/cybertron/timer/timer.cpp | 5 + framework/examples/CMakeLists.txt | 6 + framework/examples/dispatcher_example.cpp | 58 ++++ 29 files changed, 1356 insertions(+), 106 deletions(-) create mode 100644 framework/cybertron/dispatcher/CMakeLists.txt create mode 100644 framework/cybertron/dispatcher/dispatcher.cpp create mode 100644 framework/cybertron/dispatcher/dispatcher.h create mode 100644 framework/cybertron/dispatcher/intra_reader.h create mode 100644 framework/cybertron/dispatcher/intra_writer.h create mode 100644 framework/cybertron/dispatcher/message.h create mode 100644 framework/cybertron/proto/run_mode_conf.proto create mode 100644 framework/cybertron/test/dispatcher/CMakeLists.txt create mode 100644 framework/cybertron/test/dispatcher/dispatcher_test.cpp create mode 100644 framework/cybertron/test/dispatcher/message_test.cpp create mode 100644 framework/examples/dispatcher_example.cpp diff --git a/framework/CMakeLists.txt b/framework/CMakeLists.txt index 4b20fd8823..6e817404bd 100644 --- a/framework/CMakeLists.txt +++ b/framework/CMakeLists.txt @@ -27,6 +27,7 @@ add_subdirectory(sensor) add_subdirectory(third_party) add_subdirectory(python) +add_subdirectory(cybertron/dispatcher) file(GLOB CYBERTRON_SRCS "cybertron/*.cpp" @@ -64,6 +65,7 @@ target_link_libraries(cybertron protobuf cybertron_proto cybertron_common + cybertron_dispatcher fastrtps fastcdr -lrt diff --git a/framework/cybertron/common/global_data.cpp b/framework/cybertron/common/global_data.cpp index 09db854446..8d2852c776 100644 --- a/framework/cybertron/common/global_data.cpp +++ b/framework/cybertron/common/global_data.cpp @@ -40,6 +40,11 @@ GlobalData::GlobalData() { InitConfig(); process_id_ = getpid(); process_name_ = "default_" + std::to_string(process_id_); + is_reality_mode_ = (config_.has_run_mode_conf() && + config_.run_mode_conf().run_mode() == + apollo::cybertron::proto::RunMode::MODE_SIMULATION) + ? false + : true; } GlobalData::~GlobalData() {} @@ -55,6 +60,8 @@ const std::string& GlobalData::HostIp() const { return host_ip_; } const std::string& GlobalData::HostName() const { return host_name_; } +bool GlobalData::IsRealityMode() const { return is_reality_mode_; } + void GlobalData::InitHostInfo() { char host_name[1024]; gethostname(host_name, 1024); diff --git a/framework/cybertron/common/global_data.h b/framework/cybertron/common/global_data.h index 2454c91ba8..38d96b508b 100644 --- a/framework/cybertron/common/global_data.h +++ b/framework/cybertron/common/global_data.h @@ -51,6 +51,8 @@ class GlobalData { const CyberConfig& Config() const; + bool IsRealityMode() const; + static uint64_t RegisterNode(const std::string& node_name); static const std::string& GetNodeById(uint64_t id); @@ -78,6 +80,9 @@ class GlobalData { int process_id_; std::string process_name_; + // run mode + bool is_reality_mode_; + static AtomicHashMap node_id_map_; static AtomicHashMap channel_id_map_; static AtomicHashMap service_id_map_; diff --git a/framework/cybertron/component/component.h b/framework/cybertron/component/component.h index 292f01a0d5..d5cbe88837 100644 --- a/framework/cybertron/component/component.h +++ b/framework/cybertron/component/component.h @@ -27,6 +27,7 @@ #include "cybertron/component/component_base.h" #include "cybertron/croutine/routine_factory.h" #include "cybertron/data/data_visitor.h" +#include "cybertron/proto/run_mode_conf.pb.h" #include "cybertron/scheduler/scheduler.h" namespace apollo { @@ -135,20 +136,15 @@ bool Component::Initialize( AERROR << "Component Init() failed."; return false; } + + bool is_reality_mode = GlobalData::Instance()->IsRealityMode(); + RoleAttributes attr; attr.set_node_name(config.name()); attr.set_channel_name(config.readers(0).channel()); auto qos_profile = attr.mutable_qos_profile(); *qos_profile = config.readers(0).qos_profile(); - auto reader = node_->CreateReader(attr); - if (reader == nullptr) { - AERROR << "Component create reader failed."; - return false; - } - auto dv = std::make_shared>(reader); - readers_.emplace_back(std::move(reader)); - auto sched = scheduler::Scheduler::Instance(); std::weak_ptr> self = std::dynamic_pointer_cast>(shared_from_this()); auto func = [self](const std::shared_ptr& msg) { @@ -160,8 +156,28 @@ bool Component::Initialize( } }; + std::shared_ptr> reader = nullptr; + + if (is_reality_mode) { + reader = node_->CreateReader(attr); + } else { + reader = node_->CreateReader(attr, func); + } + + if (reader == nullptr) { + AERROR << "Component create reader failed."; + return false; + } + readers_.emplace_back(std::move(reader)); + + if (!is_reality_mode) { + return true; + } + + auto dv = std::make_shared>(readers_[0]); croutine::RoutineFactory factory = croutine::CreateRoutineFactory(func, dv); + auto sched = scheduler::Scheduler::Instance(); return sched->CreateTask(factory, node_->Name()); } @@ -187,12 +203,39 @@ bool Component::Initialize( return false; } + bool is_reality_mode = GlobalData::Instance()->IsRealityMode(); + RoleAttributes attr; attr.set_node_name(config.name()); - attr.set_channel_name(config.readers(0).channel()); - auto reader0 = node_->template CreateReader(attr); attr.set_channel_name(config.readers(1).channel()); auto reader1 = node_->template CreateReader(attr); + + attr.set_channel_name(config.readers(0).channel()); + std::shared_ptr> reader0 = nullptr; + if (is_reality_mode) { + reader0 = node_->template CreateReader(attr); + } else { + std::weak_ptr> self = + std::dynamic_pointer_cast>(shared_from_this()); + + auto message1 = dispatcher::Dispatcher::Instance()->GetMessage( + config.readers(1).channel()); + + auto func = [self, message1](const std::shared_ptr& msg0) { + auto ptr = self.lock(); + if (ptr) { + if (!message1->IsPublishedEmpty()) { + auto msg1 = message1->GetLatestPublishedPtr(); + ptr->Process(msg0, msg1); + } + } else { + AERROR << "Component object has been destroyed."; + } + }; + + reader0 = node_->template CreateReader(attr, func); + } + if (reader0 == nullptr || reader1 == nullptr) { AERROR << "Component create reader failed."; return false; @@ -200,6 +243,10 @@ bool Component::Initialize( readers_.push_back(std::move(reader0)); readers_.push_back(std::move(reader1)); + if (!is_reality_mode) { + return true; + } + auto sched = scheduler::Scheduler::Instance(); std::weak_ptr> self = std::dynamic_pointer_cast>(shared_from_this()); @@ -242,14 +289,45 @@ bool Component::Initialize( return false; } + bool is_reality_mode = GlobalData::Instance()->IsRealityMode(); + RoleAttributes attr; attr.set_node_name(config.name()); - attr.set_channel_name(config.readers(0).channel()); - auto reader0 = node_->template CreateReader(attr); attr.set_channel_name(config.readers(1).channel()); auto reader1 = node_->template CreateReader(attr); attr.set_channel_name(config.readers(2).channel()); auto reader2 = node_->template CreateReader(attr); + + attr.set_channel_name(config.readers(0).channel()); + std::shared_ptr> reader0 = nullptr; + if (is_reality_mode) { + reader0 = node_->template CreateReader(attr); + } else { + std::weak_ptr> self = + std::dynamic_pointer_cast>( + shared_from_this()); + + auto message1 = dispatcher::Dispatcher::Instance()->GetMessage( + config.readers(1).channel()); + auto message2 = dispatcher::Dispatcher::Instance()->GetMessage( + config.readers(2).channel()); + + auto func = [self, message1, message2](const std::shared_ptr& msg0) { + auto ptr = self.lock(); + if (ptr) { + if (!message1->IsPublishedEmpty() && !message2->IsPublishedEmpty()) { + auto msg1 = message1->GetLatestPublishedPtr(); + auto msg2 = message2->GetLatestPublishedPtr(); + ptr->Process(msg0, msg1, msg2); + } + } else { + AERROR << "Component object has been destroyed."; + } + }; + + reader0 = node_->template CreateReader(attr, func); + } + if (reader0 == nullptr || reader1 == nullptr || reader2 == nullptr) { AERROR << "Component create reader failed."; return false; @@ -258,6 +336,10 @@ bool Component::Initialize( readers_.push_back(std::move(reader1)); readers_.push_back(std::move(reader2)); + if (!is_reality_mode) { + return true; + } + auto sched = scheduler::Scheduler::Instance(); std::weak_ptr> self = std::dynamic_pointer_cast>( @@ -302,16 +384,52 @@ bool Component::Initialize(const ComponentConfig& config) { return false; } + bool is_reality_mode = GlobalData::Instance()->IsRealityMode(); + RoleAttributes attr; attr.set_node_name(config.name()); - attr.set_channel_name(config.readers(0).channel()); - auto reader0 = node_->template CreateReader(attr); attr.set_channel_name(config.readers(1).channel()); auto reader1 = node_->template CreateReader(attr); attr.set_channel_name(config.readers(2).channel()); auto reader2 = node_->template CreateReader(attr); attr.set_channel_name(config.readers(3).channel()); auto reader3 = node_->template CreateReader(attr); + + attr.set_channel_name(config.readers(0).channel()); + std::shared_ptr> reader0 = nullptr; + if (is_reality_mode) { + reader0 = node_->template CreateReader(attr); + } else { + std::weak_ptr> self = + std::dynamic_pointer_cast>( + shared_from_this()); + + auto message1 = dispatcher::Dispatcher::Instance()->GetMessage( + config.readers(1).channel()); + auto message2 = dispatcher::Dispatcher::Instance()->GetMessage( + config.readers(2).channel()); + auto message3 = dispatcher::Dispatcher::Instance()->GetMessage( + config.readers(3).channel()); + + auto func = [self, message1, message2, + message3](const std::shared_ptr& msg0) { + auto ptr = self.lock(); + if (ptr) { + if (!message1->IsPublishedEmpty() && !message2->IsPublishedEmpty() && + !message3->IsPublishedEmpty()) { + auto msg1 = message1->GetLatestPublishedPtr(); + auto msg2 = message2->GetLatestPublishedPtr(); + auto msg3 = message3->GetLatestPublishedPtr(); + ptr->Process(msg0, msg1, msg2, msg3); + } + } else { + AERROR << "Component object has been destroyed."; + } + }; + + reader0 = node_->template CreateReader(attr, func); + } + if (reader0 == nullptr || reader1 == nullptr || reader2 == nullptr || reader3 == nullptr) { AERROR << "Component create reader failed." << std::endl; @@ -322,6 +440,10 @@ bool Component::Initialize(const ComponentConfig& config) { readers_.push_back(std::move(reader2)); readers_.push_back(std::move(reader3)); + if (!is_reality_mode) { + return true; + } + auto sched = scheduler::Scheduler::Instance(); std::weak_ptr> self = std::dynamic_pointer_cast>(shared_from_this()); diff --git a/framework/cybertron/conf/cybertron.pb.conf b/framework/cybertron/conf/cybertron.pb.conf index 5b2e28a94b..f722e2cc38 100644 --- a/framework/cybertron/conf/cybertron.pb.conf +++ b/framework/cybertron/conf/cybertron.pb.conf @@ -105,3 +105,7 @@ transport_conf { max_history_depth: 1000 } } + +run_mode_conf { + run_mode: MODE_REALITY +} diff --git a/framework/cybertron/cybertron.cpp b/framework/cybertron/cybertron.cpp index 9b441934ca..44d416f72b 100644 --- a/framework/cybertron/cybertron.cpp +++ b/framework/cybertron/cybertron.cpp @@ -15,13 +15,20 @@ *****************************************************************************/ #include "cybertron/cybertron.h" +#include "cybertron/common/global_data.h" +#include "cybertron/proto/run_mode_conf.pb.h" namespace apollo { namespace cybertron { +using apollo::cybertron::common::GlobalData; +using apollo::cybertron::proto::RunMode; + std::unique_ptr CreateNode(const std::string& node_name, const std::string& name_space) { - if (!OK()) { + static bool is_reality_mode = GlobalData::Instance()->IsRealityMode(); + + if (is_reality_mode && !OK()) { // TODO @nizhongjun // add some hint log AERROR << "cybertron has not inited."; diff --git a/framework/cybertron/dispatcher/CMakeLists.txt b/framework/cybertron/dispatcher/CMakeLists.txt new file mode 100644 index 0000000000..dea73ad7a9 --- /dev/null +++ b/framework/cybertron/dispatcher/CMakeLists.txt @@ -0,0 +1,6 @@ +project(cybertron_dispatcher) + +aux_source_directory(${PROJECT_SOURCE_DIR} DISPATCHER_SRCS) +add_library(${PROJECT_NAME} ${DISPATCHER_SRCS}) + +install(TARGETS cybertron_dispatcher LIBRARY DESTINATION lib) diff --git a/framework/cybertron/dispatcher/dispatcher.cpp b/framework/cybertron/dispatcher/dispatcher.cpp new file mode 100644 index 0000000000..1fb09dc9cd --- /dev/null +++ b/framework/cybertron/dispatcher/dispatcher.cpp @@ -0,0 +1,36 @@ +/****************************************************************************** + * Copyright 2018 The Apollo Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *****************************************************************************/ + +#include "cybertron/dispatcher/dispatcher.h" + +namespace apollo { +namespace cybertron { +namespace dispatcher { + +Dispatcher::Dispatcher() {} + +Dispatcher::~Dispatcher() { messages_.clear(); } + +void Dispatcher::Observe() { + std::lock_guard lock(msg_mutex_); + for (auto& item : messages_) { + item.second->Observe(); + } +} + +} // namespace dispatcher +} // namespace cybertron +} // namespace apollo diff --git a/framework/cybertron/dispatcher/dispatcher.h b/framework/cybertron/dispatcher/dispatcher.h new file mode 100644 index 0000000000..94e2a05b60 --- /dev/null +++ b/framework/cybertron/dispatcher/dispatcher.h @@ -0,0 +1,155 @@ +/****************************************************************************** + * Copyright 2018 The Apollo Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *****************************************************************************/ + +#ifndef CYBERTRON_DISPATCHER_DISPATCHER_H_ +#define CYBERTRON_DISPATCHER_DISPATCHER_H_ + +#include +#include +#include +#include + +#include "cybertron/dispatcher/message.h" + +namespace apollo { +namespace cybertron { +namespace dispatcher { + +class Dispatcher { + public: + using MessageMap = + std::unordered_map>; + + virtual ~Dispatcher(); + + static const std::shared_ptr& Instance() { + static auto instance = std::shared_ptr(new Dispatcher()); + return instance; + } + + template + bool Publish(const std::string& channel_name, + const typename Message::MessagePtr& msg); + + template + bool Publish(const std::string& channel_name, + const typename Message::MessageType& msg); + + template + bool Subscribe(const std::string& channel_name, size_t capacity, + const std::string& callback_id, + const typename Message::Callback& callback); + + template + bool Unsubscribe(const std::string& channel_name, + const std::string& callback_id); + + template + std::shared_ptr> GetMessage(const std::string& channel_name); + + template + std::shared_ptr> GetOrCreateMessage(const MessageAttr& attr); + + void Observe(); + + private: + Dispatcher(); + Dispatcher(const Dispatcher&) = delete; + Dispatcher& operator=(const Dispatcher&) = delete; + + MessageMap messages_; + std::mutex msg_mutex_; +}; + +template +bool Dispatcher::Publish(const std::string& channel_name, + const typename Message::MessagePtr& msg) { + auto message = GetOrCreateMessage(MessageAttr(channel_name)); + if (message == nullptr) { + return false; + } + message->Publish(msg); + return true; +} + +template +bool Dispatcher::Publish(const std::string& channel_name, + const typename Message::MessageType& msg) { + auto message = GetOrCreateMessage(MessageAttr(channel_name)); + if (message == nullptr) { + return false; + } + message->Publish(msg); + return true; +} + +template +bool Dispatcher::Subscribe(const std::string& channel_name, size_t capacity, + const std::string& callback_id, + const typename Message::Callback& callback) { + auto message = GetOrCreateMessage(MessageAttr(capacity, channel_name)); + if (message == nullptr) { + return false; + } + return message->Subscribe(callback_id, callback); +} + +template +bool Dispatcher::Unsubscribe(const std::string& channel_name, + const std::string& callback_id) { + auto message = GetMessage(channel_name); + if (message == nullptr) { + return false; + } + return message->Unsubscribe(callback_id); +} + +template +std::shared_ptr> Dispatcher::GetMessage( + const std::string& channel_name) { + std::shared_ptr> message = nullptr; + { + std::lock_guard lock(msg_mutex_); + auto search = messages_.find(channel_name); + if (search != messages_.end()) { + message = std::dynamic_pointer_cast>(search->second); + } + } + return message; +} + +template +std::shared_ptr> Dispatcher::GetOrCreateMessage( + const MessageAttr& attr) { + std::shared_ptr> message = nullptr; + { + std::lock_guard lock(msg_mutex_); + auto search = messages_.find(attr.channel_name); + if (search != messages_.end()) { + message = std::dynamic_pointer_cast>(search->second); + } else { + message = std::make_shared>(attr); + messages_[attr.channel_name] = message; + } + } + return message; +} + +} // namespace dispatcher +} // namespace cybertron +} // namespace apollo + +#endif // CYBERTRON_DISPATCHER_DISPATCHER_H_ diff --git a/framework/cybertron/dispatcher/intra_reader.h b/framework/cybertron/dispatcher/intra_reader.h new file mode 100644 index 0000000000..5890615719 --- /dev/null +++ b/framework/cybertron/dispatcher/intra_reader.h @@ -0,0 +1,196 @@ +/****************************************************************************** + * Copyright 2018 The Apollo Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *****************************************************************************/ + +#ifndef CYBERTRON_DISPATCHER_INTRA_READER_H_ +#define CYBERTRON_DISPATCHER_INTRA_READER_H_ + +#include +#include + +#include "cybertron/dispatcher/dispatcher.h" +#include "cybertron/node/reader.h" + +namespace apollo { +namespace cybertron { +namespace dispatcher { + +template +class IntraReader : public apollo::cybertron::Reader { + public: + using MessagePtr = std::shared_ptr; + using Callback = std::function&)>; + using Iterator = + typename std::list>::const_iterator; + + IntraReader(const proto::RoleAttributes& attr, const Callback& callback); + virtual ~IntraReader(); + + bool Init() override; + void Shutdown() override; + + void ClearData() override; + void Observe() override; + bool Empty() const override; + bool HasReceived() const override; + + void Enqueue(const std::shared_ptr& msg) override; + void SetHistoryDepth(const uint32_t& depth) override; + uint32_t GetHistoryDepth() const override; + const std::shared_ptr& GetLatestObserved() const override; + const std::shared_ptr& GetOldestObserved() const override; + Iterator Begin() const override; + Iterator End() const override; + + private: + void OnMessage(const MessagePtr& msg_ptr); + + Callback msg_callback_; + std::shared_ptr> message_; +}; + +template +IntraReader::IntraReader(const proto::RoleAttributes& attr, + const Callback& callback) + : Reader(attr), msg_callback_(callback), message_(nullptr) {} + +template +IntraReader::~IntraReader() { + Shutdown(); +} + +template +bool IntraReader::Init() { + if (this->init_.exchange(true)) { + return true; + } + MessageAttr attr(this->role_attr_.qos_profile().depth(), + this->role_attr_.channel_name()); + message_ = Dispatcher::Instance()->GetOrCreateMessage(attr); + if (message_ == nullptr) { + return false; + } + return message_->Subscribe(this->role_attr_.node_name(), + std::bind(&IntraReader::OnMessage, this, + std::placeholders::_1)); +} + +template +void IntraReader::Shutdown() { + if (!this->init_.exchange(false)) { + return; + } + message_->Unsubscribe(this->role_attr_.node_name()); + message_ = nullptr; +} + +template +void IntraReader::ClearData() { + if (message_ == nullptr) { + return; + } + message_->ClearPublished(); +} + +template +void IntraReader::Observe() { + if (message_ == nullptr) { + return; + } + message_->Observe(); +} + +template +bool IntraReader::Empty() const { + if (message_ == nullptr) { + return true; + } + return message_->IsObservedEmpty(); +} + +template +bool IntraReader::HasReceived() const { + if (message_ == nullptr) { + return false; + } + return !message_->IsPublishedEmpty(); +} + +template +void IntraReader::Enqueue(const std::shared_ptr& msg) { + if (message_ == nullptr) { + return; + } + message_->Publish(msg); +} + +template +void IntraReader::SetHistoryDepth(const uint32_t& depth) { + if (message_ == nullptr) { + return; + } + message_->set_capacity(depth); +} + +template +uint32_t IntraReader::GetHistoryDepth() const { + if (message_ == nullptr) { + return 0; + } + return message_->capacity(); +} + +template +const std::shared_ptr& IntraReader::GetLatestObserved() + const { + if (message_ == nullptr) { + return nullptr; + } + return message_->GetLatestObservedPtr(); +} + +template +const std::shared_ptr& IntraReader::GetOldestObserved() + const { + if (message_ == nullptr) { + return nullptr; + } + return message_->GetOldestObservedPtr(); +} + +template +auto IntraReader::Begin() const -> Iterator { + assert(message_ != nullptr); + return message_->ObservedBegin(); +} + +template +auto IntraReader::End() const -> Iterator { + assert(message_ != nullptr); + return message_->ObservedEnd(); +} + +template +void IntraReader::OnMessage(const MessagePtr& msg_ptr) { + if (msg_callback_ != nullptr) { + msg_callback_(msg_ptr); + } +} + +} // namespace dispatcher +} // namespace cybertron +} // namespace apollo + +#endif // CYBERTRON_DISPATCHER_INTRA_READER_H_ diff --git a/framework/cybertron/dispatcher/intra_writer.h b/framework/cybertron/dispatcher/intra_writer.h new file mode 100644 index 0000000000..1ae5f3a473 --- /dev/null +++ b/framework/cybertron/dispatcher/intra_writer.h @@ -0,0 +1,91 @@ +/****************************************************************************** + * Copyright 2018 The Apollo Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *****************************************************************************/ + +#ifndef CYBERTRON_DISPATCHER_INTRA_WRITER_H_ +#define CYBERTRON_DISPATCHER_INTRA_WRITER_H_ + +#include + +#include "cybertron/dispatcher/dispatcher.h" +#include "cybertron/node/writer.h" + +namespace apollo { +namespace cybertron { +namespace dispatcher { + +template +class IntraWriter : public apollo::cybertron::Writer { + public: + using MessagePtr = std::shared_ptr; + using DispatcherPtr = std::shared_ptr; + + IntraWriter(const proto::RoleAttributes& attr); + virtual ~IntraWriter(); + + bool Init() override; + void Shutdown() override; + + bool Write(const MessageT& msg) override; + bool Write(const MessagePtr& msg_ptr) override; + + private: + DispatcherPtr dispatcher_; +}; + +template +IntraWriter::IntraWriter(const proto::RoleAttributes& attr) + : Writer(attr) { + dispatcher_ = dispatcher::Dispatcher::Instance(); +} + +template +IntraWriter::~IntraWriter() { + Shutdown(); +} + +template +bool IntraWriter::Init() { + this->init_.exchange(true); + return true; +} + +template +void IntraWriter::Shutdown() { + this->init_.exchange(false); +} + +template +bool IntraWriter::Write(const MessageT& msg) { + if (!this->init_.load()) { + return false; + } + return Write(std::make_shared(msg)); +} + +template +bool IntraWriter::Write(const MessagePtr& msg_ptr) { + if (!this->init_.load()) { + return false; + } + return dispatcher_->Publish(this->role_attr_.channel_name(), + msg_ptr); +} + +} // namespace dispatcher +} // namespace cybertron +} // namespace apollo + +#endif // CYBERTRON_DISPATCHER_INTRA_WRITER_H_ diff --git a/framework/cybertron/dispatcher/message.h b/framework/cybertron/dispatcher/message.h new file mode 100644 index 0000000000..347bb416ae --- /dev/null +++ b/framework/cybertron/dispatcher/message.h @@ -0,0 +1,265 @@ +/****************************************************************************** + * Copyright 2018 The Apollo Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *****************************************************************************/ + +#ifndef CYBERTRON_DISPATCHER_MESSAGE_H_ +#define CYBERTRON_DISPATCHER_MESSAGE_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace apollo { +namespace cybertron { +namespace dispatcher { + +class MessageBase { + public: + virtual ~MessageBase() = default; + + virtual void ClearPublished() = 0; + virtual void Observe() = 0; + virtual bool IsObservedEmpty() const = 0; + virtual bool IsPublishedEmpty() const = 0; + virtual bool Unsubscribe(const std::string& callback_id) = 0; + + virtual size_t capacity() const = 0; + virtual void set_capacity(size_t capacity) = 0; + virtual const std::string& channel_name() const = 0; +}; + +struct MessageAttr { + MessageAttr() : capacity(10), channel_name("") {} + MessageAttr(const std::string& channel) + : capacity(10), channel_name(channel) {} + MessageAttr(size_t cap, const std::string& channel) + : capacity(cap), channel_name(channel) {} + MessageAttr(const MessageAttr& attr) + : capacity(attr.capacity), channel_name(attr.channel_name) {} + + size_t capacity; + std::string channel_name; +}; + +template +class Message : public MessageBase { + public: + using MessageType = T; + using MessagePtr = std::shared_ptr; + using MessageQueue = std::list; + using Callback = std::function; + using CallbackMap = std::unordered_map; + using Iterator = typename std::list>::const_iterator; + + Message(const MessageAttr& attr); + virtual ~Message(); + + void Publish(const MessageType& msg); + void Publish(const MessagePtr& msg); + + void ClearPublished() override; + void Observe() override; + bool IsObservedEmpty() const override; + bool IsPublishedEmpty() const override; + + bool Subscribe(const std::string& callback_id, const Callback& callback); + bool Unsubscribe(const std::string& callback_id) override; + + const MessageType& GetLatestObserved() const; + const MessagePtr GetLatestObservedPtr() const; + const MessagePtr GetOldestObservedPtr() const; + const MessagePtr GetLatestPublishedPtr() const; + + Iterator ObservedBegin() const; + Iterator ObservedEnd() const; + + size_t capacity() const override; + void set_capacity(size_t capacity) override; + const std::string& channel_name() const override; + + private: + void Enqueue(const MessagePtr& msg); + void Notify(const MessagePtr& msg); + + bool is_full_; + MessageAttr attr_; + MessageQueue observed_msg_queue_; + MessageQueue published_msg_queue_; + mutable std::mutex msg_mutex_; + + CallbackMap published_callbacks_; + mutable std::mutex cb_mutex_; +}; + +template +Message::Message(const MessageAttr& attr) : is_full_(false), attr_(attr) {} + +template +Message::~Message() { + published_msg_queue_.clear(); + observed_msg_queue_.clear(); + published_callbacks_.clear(); +} + +template +void Message::Publish(const MessageType& msg) { + Publish(std::make_shared(msg)); +} + +template +void Message::Publish(const MessagePtr& msg) { + Enqueue(msg); + Notify(msg); +} + +template +void Message::ClearPublished() { + std::lock_guard lock(msg_mutex_); + published_msg_queue_.clear(); +} + +template +void Message::Observe() { + std::lock_guard lock(msg_mutex_); + observed_msg_queue_ = published_msg_queue_; +} + +template +bool Message::IsObservedEmpty() const { + std::lock_guard lock(msg_mutex_); + return observed_msg_queue_.empty(); +} + +template +bool Message::IsPublishedEmpty() const { + std::lock_guard lock(msg_mutex_); + return published_msg_queue_.empty(); +} + +template +bool Message::Subscribe(const std::string& callback_id, + const Callback& callback) { + std::lock_guard lock(cb_mutex_); + if (published_callbacks_.find(callback_id) != published_callbacks_.end()) { + return false; + } + published_callbacks_[callback_id] = callback; + return true; +} + +template +bool Message::Unsubscribe(const std::string& callback_id) { + std::lock_guard lock(cb_mutex_); + return published_callbacks_.erase(callback_id) != 0; +} + +template +auto Message::GetLatestObserved() const -> const MessageType& { + std::lock_guard lock(msg_mutex_); + assert(!observed_msg_queue_.empty()); + return *observed_msg_queue_.back(); +} + +template +auto Message::GetLatestObservedPtr() const -> const MessagePtr { + std::lock_guard lock(msg_mutex_); + assert(!observed_msg_queue_.empty()); + return observed_msg_queue_.back(); +} + +template +auto Message::GetOldestObservedPtr() const -> const MessagePtr { + std::lock_guard lock(msg_mutex_); + assert(!observed_msg_queue_.empty()); + return observed_msg_queue_.front(); +} + +template +auto Message::GetLatestPublishedPtr() const -> const MessagePtr { + std::lock_guard lock(msg_mutex_); + assert(!published_msg_queue_.empty()); + return published_msg_queue_.back(); +} + +template +auto Message::ObservedBegin() const -> Iterator { + return observed_msg_queue_.begin(); +} + +template +auto Message::ObservedEnd() const -> Iterator { + return observed_msg_queue_.end(); +} + +template +size_t Message::capacity() const { + return attr_.capacity; +} + +template +void Message::set_capacity(size_t capacity) { + std::lock_guard lock(msg_mutex_); + if (capacity > attr_.capacity) { + is_full_ = false; + } + attr_.capacity = capacity; + while (published_msg_queue_.size() > capacity) { + published_msg_queue_.pop_front(); + } +} + +template +const std::string& Message::channel_name() const { + return attr_.channel_name; +} + +template +void Message::Enqueue(const MessagePtr& msg) { + if (attr_.capacity == 0) { + return; + } + std::lock_guard lock(msg_mutex_); + if (is_full_) { + published_msg_queue_.pop_front(); + } + + published_msg_queue_.push_back(msg); + + if (!is_full_) { + if (published_msg_queue_.size() >= attr_.capacity) { + is_full_ = true; + } + } +} + +template +void Message::Notify(const MessagePtr& msg) { + std::lock_guard lock(cb_mutex_); + for (const auto& item : published_callbacks_) { + item.second(msg); + } +} + +} // namespace dispatcher +} // namespace cybertron +} // namespace apollo + +#endif // CYBERTRON_DISPATCHER_MESSAGE_H_ diff --git a/framework/cybertron/message/raw_message.h b/framework/cybertron/message/raw_message.h index 75d62cc872..51e370e511 100644 --- a/framework/cybertron/message/raw_message.h +++ b/framework/cybertron/message/raw_message.h @@ -31,6 +31,17 @@ struct RawMessage { explicit RawMessage(const std::string &data) { message = data; } + RawMessage(const RawMessage &raw_msg) + : message(raw_msg.message), timestamp(raw_msg.timestamp) {} + + RawMessage &operator=(const RawMessage &raw_msg) { + if (this != &raw_msg) { + this->message = raw_msg.message; + this->timestamp = raw_msg.timestamp; + } + return *this; + } + ~RawMessage() {} bool SerializeToString(std::string *str) const { @@ -52,10 +63,6 @@ struct RawMessage { std::string message; uint64_t timestamp; - - private: - RawMessage(const RawMessage &) = delete; - RawMessage &operator=(const RawMessage &) = delete; }; } // namespace message diff --git a/framework/cybertron/node/node.cpp b/framework/cybertron/node/node.cpp index 4b91855dc3..d29af288c2 100644 --- a/framework/cybertron/node/node.cpp +++ b/framework/cybertron/node/node.cpp @@ -27,19 +27,9 @@ Node::Node(const std::string& node_name, const std::string& name_space) : node_name_(node_name), name_space_(name_space) { node_channel_impl_.reset(new NodeChannelImpl(node_name)); node_service_impl_.reset(new NodeServiceImpl(node_name)); - - attr_.set_host_name(common::GlobalData::Instance()->HostName()); - attr_.set_process_id(common::GlobalData::Instance()->ProcessId()); - attr_.set_node_name(node_name); - uint64_t node_id = common::GlobalData::RegisterNode(node_name); - attr_.set_node_id(node_id); - - node_manager_ = - service_discovery::TopologyManager::Instance()->node_manager(); - node_manager_->Join(attr_, RoleType::ROLE_NODE); } -Node::~Node() { node_manager_->Leave(attr_, RoleType::ROLE_NODE); } +Node::~Node() {} const std::string& Node::Name() const { return node_name_; } diff --git a/framework/cybertron/node/node.h b/framework/cybertron/node/node.h index 24747f9cd2..894be2fb36 100644 --- a/framework/cybertron/node/node.h +++ b/framework/cybertron/node/node.h @@ -83,14 +83,14 @@ class Node { auto CreateReader(const proto::RoleAttributes& role_attr) -> std::shared_ptr>; - std::unique_ptr node_channel_impl_ = nullptr; - std::unique_ptr node_service_impl_ = nullptr; - std::shared_ptr node_manager_ = nullptr; std::string node_name_; std::string name_space_; - proto::RoleAttributes attr_; + std::mutex readers_mutex_; std::map> readers_; + + std::unique_ptr node_channel_impl_ = nullptr; + std::unique_ptr node_service_impl_ = nullptr; }; template diff --git a/framework/cybertron/node/node_channel_impl.h b/framework/cybertron/node/node_channel_impl.h index 0da8a50cb0..43494ac195 100644 --- a/framework/cybertron/node/node_channel_impl.h +++ b/framework/cybertron/node/node_channel_impl.h @@ -20,27 +20,50 @@ #include #include +#include "cybertron/common/global_data.h" +#include "cybertron/dispatcher/intra_reader.h" +#include "cybertron/dispatcher/intra_writer.h" #include "cybertron/message/message_traits.h" #include "cybertron/node/reader.h" #include "cybertron/node/writer.h" +#include "cybertron/proto/run_mode_conf.pb.h" namespace apollo { namespace cybertron { class Node; class NodeChannelImpl { - public: friend class Node; + public: + using NodeManagerPtr = std::shared_ptr; + explicit NodeChannelImpl(const std::string& node_name) - : node_name_(node_name) { - // TODO: topology register node + : is_reality_mode_(true), node_name_(node_name) { + node_attr_.set_host_name(common::GlobalData::Instance()->HostName()); + node_attr_.set_process_id(common::GlobalData::Instance()->ProcessId()); + node_attr_.set_node_name(node_name); + uint64_t node_id = common::GlobalData::RegisterNode(node_name); + node_attr_.set_node_id(node_id); + + is_reality_mode_ = common::GlobalData::Instance()->IsRealityMode(); + + if (is_reality_mode_) { + node_manager_ = + service_discovery::TopologyManager::Instance()->node_manager(); + node_manager_->Join(node_attr_, RoleType::ROLE_NODE); + } + } + virtual ~NodeChannelImpl() { + if (is_reality_mode_) { + node_manager_->Leave(node_attr_, RoleType::ROLE_NODE); + node_manager_ = nullptr; + } } - virtual ~NodeChannelImpl() {} - private: - std::string NodeName() const { return node_name_; } + const std::string& NodeName() const { return node_name_; } + private: template auto CreateWriter(const proto::RoleAttributes& role_attr) -> std::shared_ptr>; @@ -63,29 +86,29 @@ class NodeChannelImpl { auto CreateReader(const proto::RoleAttributes& role_attr) -> std::shared_ptr>; + template + void FillInAttr(proto::RoleAttributes* attr); + + bool is_reality_mode_; std::string node_name_; + proto::RoleAttributes node_attr_; + NodeManagerPtr node_manager_ = nullptr; }; template auto NodeChannelImpl::CreateWriter(const proto::RoleAttributes& role_attr) -> std::shared_ptr> { - auto channel_id = GlobalData::RegisterChannel(role_attr.channel_name()); - auto node_id = GlobalData::RegisterNode(node_name_); + RETURN_VAL_IF(!role_attr.has_channel_name(), nullptr); proto::RoleAttributes new_attr(role_attr); - new_attr.set_node_name(node_name_); - new_attr.set_node_id(node_id); - new_attr.set_channel_id(channel_id); - if (!new_attr.has_message_type()) { - new_attr.set_message_type(message::MessageType()); - } - if (!new_attr.has_proto_desc()) { - std::string proto_desc(""); - message::GetDescriptorString(new_attr.message_type(), - &proto_desc); - new_attr.set_proto_desc(proto_desc); + FillInAttr(&new_attr); + + std::shared_ptr> writer_ptr = nullptr; + if (!is_reality_mode_) { + writer_ptr = std::make_shared>(new_attr); + } else { + writer_ptr = std::make_shared>(new_attr); } - std::shared_ptr> writer_ptr = - std::make_shared>(new_attr); + RETURN_VAL_IF_NULL(writer_ptr, nullptr); RETURN_VAL_IF(!writer_ptr->Init(), nullptr); return writer_ptr; @@ -112,17 +135,18 @@ template auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr, const CallbackFunc& reader_func) -> std::shared_ptr> { - auto channel_id = GlobalData::RegisterChannel(role_attr.channel_name()); - auto node_id = GlobalData::RegisterNode(node_name_); + RETURN_VAL_IF(!role_attr.has_channel_name(), nullptr); proto::RoleAttributes new_attr(role_attr); - new_attr.set_node_name(node_name_); - new_attr.set_node_id(node_id); - new_attr.set_channel_id(channel_id); - if (!new_attr.has_message_type()) { - new_attr.set_message_type(message::MessageType()); + FillInAttr(&new_attr); + + std::shared_ptr> reader_ptr = nullptr; + if (!is_reality_mode_) { + reader_ptr = std::make_shared>( + new_attr, reader_func); + } else { + reader_ptr = std::make_shared>(new_attr, reader_func); } - std::shared_ptr> reader_ptr = - std::make_shared>(new_attr, reader_func); + RETURN_VAL_IF_NULL(reader_ptr, nullptr); RETURN_VAL_IF(!reader_ptr->Init(), nullptr); return reader_ptr; @@ -131,20 +155,29 @@ auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr, template auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr) -> std::shared_ptr> { - auto channel_id = GlobalData::RegisterChannel(role_attr.channel_name()); - auto node_id = GlobalData::RegisterNode(node_name_); - proto::RoleAttributes new_attr(role_attr); - new_attr.set_node_name(node_name_); - new_attr.set_node_id(node_id); - new_attr.set_channel_id(channel_id); - if (!new_attr.has_message_type()) { - new_attr.set_message_type(message::MessageType()); + return this->template CreateReader(role_attr, nullptr); +} + +template +void NodeChannelImpl::FillInAttr(proto::RoleAttributes* attr) { + attr->set_host_name(node_attr_.host_name()); + attr->set_process_id(node_attr_.process_id()); + attr->set_node_name(node_attr_.node_name()); + attr->set_node_id(node_attr_.node_id()); + auto channel_id = GlobalData::RegisterChannel(attr->channel_name()); + attr->set_channel_id(channel_id); + if (!attr->has_message_type()) { + attr->set_message_type(message::MessageType()); + } + if (!attr->has_proto_desc()) { + std::string proto_desc(""); + message::GetDescriptorString(attr->message_type(), &proto_desc); + attr->set_proto_desc(proto_desc); + } + if (!attr->has_qos_profile()) { + attr->mutable_qos_profile()->CopyFrom( + transport::QosProfileConf::QOS_PROFILE_DEFAULT); } - std::shared_ptr> reader_ptr = - std::make_shared>(new_attr, nullptr); - RETURN_VAL_IF_NULL(reader_ptr, nullptr); - RETURN_VAL_IF(!reader_ptr->Init(), nullptr); - return reader_ptr; } } // namespace cybertron diff --git a/framework/cybertron/node/reader.h b/framework/cybertron/node/reader.h index 79f7664ec4..b276eb15eb 100644 --- a/framework/cybertron/node/reader.h +++ b/framework/cybertron/node/reader.h @@ -62,15 +62,15 @@ class Reader : public ReaderBase { bool HasReceived() const override; bool Empty() const override; - void Enqueue(const std::shared_ptr& msg); - void SetHistoryDepth(const uint32_t& depth); - uint32_t GetHistoryDepth() const; - const std::shared_ptr& GetLatestObserved() const; - const std::shared_ptr& GetOldestObserved() const; - Iterator Begin() const { return observed_queue_.begin(); } - Iterator End() const { return observed_queue_.end(); } - - protected: + virtual void Enqueue(const std::shared_ptr& msg); + virtual void SetHistoryDepth(const uint32_t& depth); + virtual uint32_t GetHistoryDepth() const; + virtual const std::shared_ptr& GetLatestObserved() const; + virtual const std::shared_ptr& GetOldestObserved() const; + virtual Iterator Begin() const { return observed_queue_.begin(); } + virtual Iterator End() const { return observed_queue_.end(); } + + private: void JoinTheTopology(); void LeaveTheTopology(); void OnChannelChange(const proto::ChangeMsg& change_msg); diff --git a/framework/cybertron/node/reader_base.h b/framework/cybertron/node/reader_base.h index 01744e2d76..68f2100da7 100644 --- a/framework/cybertron/node/reader_base.h +++ b/framework/cybertron/node/reader_base.h @@ -22,7 +22,6 @@ #include #include -#include "cybertron/common/global_data.h" #include "cybertron/common/macros.h" #include "cybertron/common/util.h" #include "cybertron/event/perf_event_cache.h" @@ -38,14 +37,7 @@ using apollo::cybertron::event::TransPerf; class ReaderBase { public: explicit ReaderBase(const proto::RoleAttributes& role_attr) - : role_attr_(role_attr), init_(false) { - if (!role_attr_.has_host_name()) { - role_attr_.set_host_name(common::GlobalData::Instance()->HostName()); - } - if (!role_attr_.has_process_id()) { - role_attr_.set_process_id(common::GlobalData::Instance()->ProcessId()); - } - } + : role_attr_(role_attr), init_(false) {} virtual ~ReaderBase() {} virtual bool Init() = 0; diff --git a/framework/cybertron/node/writer.h b/framework/cybertron/node/writer.h index ea036c2b4b..28acc03bdb 100644 --- a/framework/cybertron/node/writer.h +++ b/framework/cybertron/node/writer.h @@ -42,10 +42,10 @@ class Writer : public WriterBase { bool Init() override; void Shutdown() override; - bool Write(const MessageT& msg); - bool Write(const std::shared_ptr& msg_ptr); + virtual bool Write(const MessageT& msg); + virtual bool Write(const std::shared_ptr& msg_ptr); - protected: + private: void JoinTheTopology(); void LeaveTheTopology(); void OnChannelChange(const proto::ChangeMsg& change_msg); diff --git a/framework/cybertron/node/writer_base.h b/framework/cybertron/node/writer_base.h index ea85369dd9..43596b2e38 100644 --- a/framework/cybertron/node/writer_base.h +++ b/framework/cybertron/node/writer_base.h @@ -20,9 +20,7 @@ #include #include -#include "cybertron/common/global_data.h" #include "cybertron/proto/role_attributes.pb.h" -#include "cybertron/transport/common/identity.h" namespace apollo { namespace cybertron { @@ -31,12 +29,7 @@ class WriterBase { public: explicit WriterBase(const proto::RoleAttributes& role_attr) : role_attr_(role_attr), init_(false) { - if (!role_attr_.has_host_name()) { - role_attr_.set_host_name(common::GlobalData::Instance()->HostName()); - } - if (!role_attr_.has_process_id()) { - role_attr_.set_process_id(common::GlobalData::Instance()->ProcessId()); - } + } virtual ~WriterBase() {} diff --git a/framework/cybertron/proto/cyber_config.proto b/framework/cybertron/proto/cyber_config.proto index 559988072a..74645d8fee 100644 --- a/framework/cybertron/proto/cyber_config.proto +++ b/framework/cybertron/proto/cyber_config.proto @@ -7,6 +7,7 @@ import "scheduler_conf.proto"; import "transport_conf.proto"; import "log_conf.proto"; import "perf_conf.proto"; +import "run_mode_conf.proto"; message CyberConfig { optional LogConf log_conf = 1; @@ -14,4 +15,5 @@ message CyberConfig { optional SchedulerConf scheduler_conf = 3; optional TransportConf transport_conf = 4; optional PerfConf perf_conf = 5; + optional RunModeConf run_mode_conf = 6; } diff --git a/framework/cybertron/proto/run_mode_conf.proto b/framework/cybertron/proto/run_mode_conf.proto new file mode 100644 index 0000000000..7b4a177ab8 --- /dev/null +++ b/framework/cybertron/proto/run_mode_conf.proto @@ -0,0 +1,12 @@ +syntax = "proto2"; + +package apollo.cybertron.proto; + +enum RunMode { + MODE_REALITY = 0; + MODE_SIMULATION = 1; +} + +message RunModeConf { + required RunMode run_mode = 1 [default = MODE_REALITY]; +} diff --git a/framework/cybertron/test/CMakeLists.txt b/framework/cybertron/test/CMakeLists.txt index a8e7762885..905c2bb6e8 100644 --- a/framework/cybertron/test/CMakeLists.txt +++ b/framework/cybertron/test/CMakeLists.txt @@ -21,3 +21,4 @@ add_subdirectory(tools) add_subdirectory(record) add_subdirectory(component) add_subdirectory(mainboard) +add_subdirectory(dispatcher) diff --git a/framework/cybertron/test/dispatcher/CMakeLists.txt b/framework/cybertron/test/dispatcher/CMakeLists.txt new file mode 100644 index 0000000000..5f3849dfaa --- /dev/null +++ b/framework/cybertron/test/dispatcher/CMakeLists.txt @@ -0,0 +1,27 @@ +project(dispatcher_test) + +add_executable(dispatcher_dispatcher_test + dispatcher_test.cpp +) + +target_link_libraries(dispatcher_dispatcher_test + cybertron + cybertron_dispatcher + gtest +) + +add_executable(dispatcher_message_test + message_test.cpp +) + +target_link_libraries(dispatcher_message_test + cybertron + cybertron_dispatcher + gtest +) + +install(TARGETS + dispatcher_dispatcher_test + dispatcher_message_test + DESTINATION ${UNIT_TEST_INSTALL_PREFIX} +) \ No newline at end of file diff --git a/framework/cybertron/test/dispatcher/dispatcher_test.cpp b/framework/cybertron/test/dispatcher/dispatcher_test.cpp new file mode 100644 index 0000000000..70e7837aff --- /dev/null +++ b/framework/cybertron/test/dispatcher/dispatcher_test.cpp @@ -0,0 +1,104 @@ +/****************************************************************************** + * Copyright 2018 The Apollo Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *****************************************************************************/ + +#include "gtest/gtest.h" + +#include "cybertron/dispatcher/dispatcher.h" +#include "cybertron/proto/unit_test.pb.h" + +namespace apollo { +namespace cybertron { +namespace dispatcher { + +using apollo::cybertron::proto::UnitTest; + +TEST(DispatcherTest, constructor) { + Dispatcher dispatcher; + auto msg = dispatcher.GetMessage("channel"); + EXPECT_EQ(msg, nullptr); +} + +TEST(DispatcherTest, publish) { + Dispatcher dispatcher; + + auto msg1 = std::make_shared(); + msg1->set_class_name("MessageTest"); + msg1->set_case_name("publish_1"); + + dispatcher.Publish("channel1", msg1); + auto message1 = dispatcher.GetMessage("channel1"); + EXPECT_NE(message1, nullptr); + EXPECT_FALSE(message1->IsPublishedEmpty()); + + UnitTest msg2; + msg2.set_class_name("MessageTest"); + msg2.set_case_name("publish_2"); + + dispatcher.Publish("channel2", msg2); + auto message2 = dispatcher.GetMessage("channel2"); + EXPECT_NE(message2, nullptr); + EXPECT_FALSE(message2->IsPublishedEmpty()); + + EXPECT_TRUE(message1->IsObservedEmpty()); + EXPECT_TRUE(message2->IsObservedEmpty()); + + dispatcher.Observe(); + + EXPECT_FALSE(message1->IsObservedEmpty()); + EXPECT_FALSE(message2->IsObservedEmpty()); +} + +TEST(DispatcherTest, subscribe) { + Dispatcher dispatcher; + + auto received_msg = std::make_shared(); + bool res = dispatcher.Subscribe( + "channel", 10, "DispatcherTest", + [&received_msg](const std::shared_ptr& msg) { + received_msg->CopyFrom(*msg); + }); + + EXPECT_TRUE(res); + + auto msg1 = std::make_shared(); + msg1->set_class_name("MessageTest"); + msg1->set_case_name("publish_1"); + + dispatcher.Publish("channel", msg1); + EXPECT_EQ(received_msg->class_name(), msg1->class_name()); + EXPECT_EQ(received_msg->case_name(), msg1->case_name()); + + res = dispatcher.Unsubscribe("channel", "DispatcherTest"); + EXPECT_TRUE(res); + + res = dispatcher.Unsubscribe("channel", "DispatcherTest"); + EXPECT_FALSE(res); + + res = dispatcher.Unsubscribe("channel", "DispatcherTest_11"); + EXPECT_FALSE(res); + + res = dispatcher.Unsubscribe("channel_11", "DispatcherTest"); + EXPECT_FALSE(res); +} + +} // namespace dispatcher +} // namespace cybertron +} // namespace apollo + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/framework/cybertron/test/dispatcher/message_test.cpp b/framework/cybertron/test/dispatcher/message_test.cpp new file mode 100644 index 0000000000..d0cad71372 --- /dev/null +++ b/framework/cybertron/test/dispatcher/message_test.cpp @@ -0,0 +1,124 @@ +/****************************************************************************** + * Copyright 2018 The Apollo Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *****************************************************************************/ + +#include "gtest/gtest.h" + +#include "cybertron/dispatcher/message.h" +#include "cybertron/proto/unit_test.pb.h" + +namespace apollo { +namespace cybertron { +namespace dispatcher { + +using apollo::cybertron::proto::UnitTest; + +TEST(MessageTest, constructor) { + MessageAttr attr(10, "channel"); + Message message(attr); + EXPECT_EQ(message.capacity(), 10); + EXPECT_EQ(message.channel_name(), "channel"); + + message.set_capacity(20); + EXPECT_EQ(message.capacity(), 20); +} + +TEST(MessageTest, publish) { + MessageAttr attr(10, "channel"); + Message message(attr); + + auto msg1 = std::make_shared(); + msg1->set_class_name("MessageTest"); + msg1->set_case_name("publish_1"); + + UnitTest msg2; + msg2.set_class_name("MessageTest"); + msg2.set_case_name("publish_2"); + + EXPECT_TRUE(message.IsPublishedEmpty()); + message.Publish(msg1); + message.Publish(msg2); + EXPECT_FALSE(message.IsPublishedEmpty()); + + EXPECT_TRUE(message.IsObservedEmpty()); + message.Observe(); + EXPECT_FALSE(message.IsObservedEmpty()); + + auto& latest_observed_msg = message.GetLatestObserved(); + EXPECT_EQ(latest_observed_msg.class_name(), "MessageTest"); + EXPECT_EQ(latest_observed_msg.case_name(), "publish_2"); + + auto latest_observed_msg_ptr = message.GetLatestObservedPtr(); + EXPECT_EQ(latest_observed_msg_ptr->class_name(), "MessageTest"); + EXPECT_EQ(latest_observed_msg_ptr->case_name(), "publish_2"); + + auto latest_published_ptr = message.GetLatestPublishedPtr(); + EXPECT_EQ(latest_published_ptr->class_name(), "MessageTest"); + EXPECT_EQ(latest_published_ptr->case_name(), "publish_2"); + + message.ClearPublished(); + EXPECT_TRUE(message.IsPublishedEmpty()); + EXPECT_FALSE(message.IsObservedEmpty()); +} + +TEST(MessageTest, subscribe) { + MessageAttr attr(10, "channel"); + Message message(attr); + + auto received_msg = std::make_shared(); + bool res = message.Subscribe( + "MessageTest1", [&received_msg](const std::shared_ptr& msg) { + received_msg->CopyFrom(*msg); + }); + + EXPECT_TRUE(res); + + auto msg1 = std::make_shared(); + msg1->set_class_name("MessageTest"); + msg1->set_case_name("publish_1"); + + message.Publish(msg1); + + EXPECT_EQ(received_msg->class_name(), msg1->class_name()); + EXPECT_EQ(received_msg->case_name(), msg1->case_name()); + + res = message.Subscribe( + "MessageTest1", [&received_msg](const std::shared_ptr& msg) { + received_msg->CopyFrom(*msg); + }); + + EXPECT_FALSE(res); + + res = message.Unsubscribe("MessageTest1"); + EXPECT_TRUE(res); + res = message.Unsubscribe("MessageTest1"); + EXPECT_FALSE(res); + + UnitTest msg2; + msg2.set_class_name("MessageTest"); + msg2.set_case_name("publish_2"); + + message.Publish(msg2); + EXPECT_NE(received_msg->case_name(), msg2.case_name()); +} + +} // namespace dispatcher +} // namespace cybertron +} // namespace apollo + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/framework/cybertron/timer/timer.cpp b/framework/cybertron/timer/timer.cpp index c9134a7b25..6a6e9bc75f 100644 --- a/framework/cybertron/timer/timer.cpp +++ b/framework/cybertron/timer/timer.cpp @@ -15,6 +15,7 @@ *****************************************************************************/ #include "cybertron/timer/timer.h" +#include "cybertron/common/global_data.h" namespace apollo { namespace cybertron { @@ -35,6 +36,10 @@ Timer::Timer(uint32_t period, std::function callback, bool oneshot) { void Timer::SetTimerOption(TimerOption opt) { timer_opt_ = opt; } void Timer::Start() { + if (!common::GlobalData::Instance()->IsRealityMode()) { + return; + } + if (timer_id_ == 0) { timer_id_ = tm_->Add(timer_opt_.period, timer_opt_.callback, timer_opt_.oneshot); diff --git a/framework/examples/CMakeLists.txt b/framework/examples/CMakeLists.txt index 0871f7cebf..b85a1011a8 100644 --- a/framework/examples/CMakeLists.txt +++ b/framework/examples/CMakeLists.txt @@ -82,6 +82,12 @@ add_executable(user_defined_task ) target_link_libraries(user_defined_task -Wl,--no-as-needed -pthread cybertron) +add_executable(dispatcher_example + dispatcher_example.cpp +) +target_link_libraries(dispatcher_example -pthread dl cybertron perception_component) +install(TARGETS dispatcher_example DESTINATION bin) + install(DIRECTORY dag DESTINATION ${CMAKE_INSTALL_PREFIX}) diff --git a/framework/examples/dispatcher_example.cpp b/framework/examples/dispatcher_example.cpp new file mode 100644 index 0000000000..158e37102d --- /dev/null +++ b/framework/examples/dispatcher_example.cpp @@ -0,0 +1,58 @@ +/****************************************************************************** + * Copyright 2018 The Apollo Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *****************************************************************************/ + +#include + +#include "cybertron/dispatcher/dispatcher.h" +#include "cybertron/proto/component_config.pb.h" +#include "examples/component_perception/perception_component.h" + +// To use this example, we need set run_mode to MODE_SIMULATION +// (run_mode is in file conf/cybertron.pb.conf) + +int main(int argc, char* argv[]) { + apollo::cybertron::proto::ComponentConfig config; + config.set_name("perception"); + auto reader = config.add_readers(); + reader->set_channel("/driver/channel"); + + auto perception = std::make_shared<::test::PerceptionComponent>(); + perception->Initialize(config); + + auto driver_msg = std::make_shared(); + driver_msg->set_msg_id(0); + driver_msg->set_timestamp(0); + driver_msg->mutable_header()->set_timestamp(0); + + auto driver_raw_msg = + std::make_shared(); + driver_msg->SerializeToString(&driver_raw_msg->message); + + perception->Proc(driver_raw_msg); + + auto message = apollo::cybertron::dispatcher::Dispatcher::Instance() + ->GetMessage("/perception/channel"); + + bool is_empty = message->IsPublishedEmpty(); + + if (is_empty) { + std::cout << "simulation failed." << std::endl; + } else { + std::cout << "simulation succ." << std::endl; + } + + return 0; +} -- GitLab