diff --git a/cyber/init.cc b/cyber/init.cc index 547b93d4f96e96752e2b46910fc16aaa58dad151..7e7fe654f3125ff2d65e0629dcf873f56918abf6 100644 --- a/cyber/init.cc +++ b/cyber/init.cc @@ -78,6 +78,7 @@ bool Init() { // Initialize internal static objects common::GlobalData::Instance(); + transport::Transport::Instance(); service_discovery::TopologyManager::Instance(); scheduler::Scheduler::Instance(); TaskManager::Instance(); @@ -152,7 +153,7 @@ void Shutdown() { TaskManager::Instance()->Shutdown(); scheduler::Scheduler::Instance()->ShutDown(); service_discovery::TopologyManager::Instance()->Shutdown(); - transport::Transport::Shutdown(); + transport::Transport::Instance()->Shutdown(); SetState(STATE_SHUTDOWN); } diff --git a/cyber/node/reader_base.h b/cyber/node/reader_base.h index 0b63577c84f7228a6225494b4f129b2338f64bac..76c8c4c9710901b201d049c9e2605770383115c3 100644 --- a/cyber/node/reader_base.h +++ b/cyber/node/reader_base.h @@ -96,7 +96,7 @@ auto ReceiverManager::GetReceiver( const std::string& channel_name = role_attr.channel_name(); if (receiver_map_.count(channel_name) == 0) { receiver_map_[channel_name] = - transport::Transport::CreateReceiver( + transport::Transport::Instance()->CreateReceiver( role_attr, [](const std::shared_ptr& msg, const transport::MessageInfo& msg_info, const proto::RoleAttributes& reader_attr) { diff --git a/cyber/node/writer.h b/cyber/node/writer.h index 7f45ee631f684335fcddc6300c0c6e1c4a454a0c..35f6d82f8b6fa6dd1564c021ca2140a59aeca32f 100644 --- a/cyber/node/writer.h +++ b/cyber/node/writer.h @@ -71,7 +71,8 @@ bool Writer::Init() { if (init_.exchange(true)) { return true; } - transmitter_ = transport::Transport::CreateTransmitter(role_attr_); + transmitter_ = + transport::Transport::Instance()->CreateTransmitter(role_attr_); RETURN_VAL_IF_NULL(transmitter_, false); this->role_attr_.set_id(transmitter_->id().HashValue()); channel_manager_ = diff --git a/cyber/service/client.h b/cyber/service/client.h index c3f7cb00bcfd6370c24b24f09483244a0aa62e1c..8b37ddbadc06f46c248117ae971d6b9b57353077 100644 --- a/cyber/service/client.h +++ b/cyber/service/client.h @@ -115,8 +115,9 @@ bool Client::Init() { role.set_channel_id(channel_id); role.mutable_qos_profile()->CopyFrom( transport::QosProfileConf::QOS_PROFILE_SERVICES_DEFAULT); - request_transmitter_ = transport::Transport::CreateTransmitter( - role, proto::OptionalMode::RTPS); + auto transport = transport::Transport::Instance(); + request_transmitter_ = + transport->CreateTransmitter(role, proto::OptionalMode::RTPS); if (request_transmitter_ == nullptr) { AERROR << "Create request pub failed."; return false; @@ -130,7 +131,7 @@ bool Client::Init() { role.set_channel_name(response_channel_); channel_id = common::GlobalData::RegisterChannel(response_channel_); role.set_channel_id(channel_id); - response_receiver_ = transport::Transport::CreateReceiver( + response_receiver_ = transport->CreateReceiver( role, [=](const std::shared_ptr& request, const transport::MessageInfo& message_info, diff --git a/cyber/service/service.h b/cyber/service/service.h index 3ba18285efdcff369b1d25bfed25fcdddf36e781..4d917d446e21cdcb4f84b0ced30a5110ac4461a9 100644 --- a/cyber/service/service.h +++ b/cyber/service/service.h @@ -90,8 +90,9 @@ bool Service::Init() { role.set_channel_id(channel_id); role.mutable_qos_profile()->CopyFrom( transport::QosProfileConf::QOS_PROFILE_SERVICES_DEFAULT); - response_transmitter_ = transport::Transport::CreateTransmitter( - role, proto::OptionalMode::RTPS); + auto transport = transport::Transport::Instance(); + response_transmitter_ = + transport->CreateTransmitter(role, proto::OptionalMode::RTPS); if (response_transmitter_ == nullptr) { AERROR << " Create response pub failed."; return false; @@ -104,7 +105,7 @@ bool Service::Init() { role.set_channel_name(request_channel_); channel_id = common::GlobalData::RegisterChannel(request_channel_); role.set_channel_id(channel_id); - request_receiver_ = transport::Transport::CreateReceiver( + request_receiver_ = transport->CreateReceiver( role, [=](const std::shared_ptr& request, const transport::MessageInfo& message_info, diff --git a/cyber/test/transport/hybrid_transceiver_test.cc b/cyber/test/transport/hybrid_transceiver_test.cc index a2a21c98b908eac9c6f123acf3909be9edc0d9bb..f5155c272ceb795f291930f66f9b8bc954a8e9ac 100644 --- a/cyber/test/transport/hybrid_transceiver_test.cc +++ b/cyber/test/transport/hybrid_transceiver_test.cc @@ -48,13 +48,13 @@ class HybridTransceiverTest : public ::testing::Test { attr.set_channel_id(common::Hash(channel_name_)); attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_DEFAULT); transmitter_a_ = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); attr.set_process_id(54321); attr.mutable_qos_profile()->CopyFrom( QosProfileConf::QOS_PROFILE_TOPO_CHANGE); transmitter_b_ = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); } virtual void TearDown() { @@ -71,9 +71,9 @@ TEST_F(HybridTransceiverTest, constructor) { RoleAttributes attr; TransmitterPtr transmitter = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); ReceiverPtr receiver = std::make_shared>( - attr, nullptr, Transport::participant()); + attr, nullptr, Transport::Instance()->participant()); EXPECT_EQ(transmitter->seq_num(), 0); @@ -103,7 +103,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_no_param) { (void)attr; msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); receiver->Enable(); @@ -155,7 +155,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_no_relation) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_TOPO_CHANGE); ReceiverPtr receiver_b = std::make_shared>( @@ -167,7 +167,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_no_relation) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); auto msg = std::make_shared(); msg->set_class_name("HybridTransceiverTest"); @@ -208,7 +208,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_same_process) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_TOPO_CHANGE); ReceiverPtr receiver_b = std::make_shared>( @@ -220,7 +220,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_same_process) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); std::string class_name("HybridTransceiverTest"); std::string case_name("enable_and_disable_with_param_same_process"); @@ -280,7 +280,7 @@ TEST_F(HybridTransceiverTest, std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_TOPO_CHANGE); ReceiverPtr receiver_b = std::make_shared>( @@ -292,7 +292,7 @@ TEST_F(HybridTransceiverTest, std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); std::string class_name("HybridTransceiverTest"); std::string case_name("enable_and_disable_with_param_same_host_diff_proc"); @@ -348,7 +348,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_diff_host) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_TOPO_CHANGE); ReceiverPtr receiver_b = std::make_shared>( @@ -360,7 +360,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_diff_host) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); std::string class_name("HybridTransceiverTest"); std::string case_name("enable_and_disable_with_param_same_host_diff_proc"); diff --git a/cyber/test/transport/rtps_dispatcher_test.cc b/cyber/test/transport/rtps_dispatcher_test.cc index 6f4da5eb7aa66251d3113690c63a33d76961a911..0432677d1b6850f9b1ad9294189e7d1c01497632 100644 --- a/cyber/test/transport/rtps_dispatcher_test.cc +++ b/cyber/test/transport/rtps_dispatcher_test.cc @@ -74,7 +74,7 @@ TEST(RtpsDispatcherTest, on_message) { recv_msg->CopyFrom(*msg); }); - auto transmitter = Transport::CreateTransmitter( + auto transmitter = Transport::Instance()->CreateTransmitter( self_attr, proto::OptionalMode::RTPS); EXPECT_TRUE(transmitter != nullptr); diff --git a/cyber/test/transport/rtps_transceiver_test.cc b/cyber/test/transport/rtps_transceiver_test.cc index ad1a8a367c2fa4800925b01e521e6164cebd433c..6234f1f3d761dce632fb9b93875753ea9c82022d 100644 --- a/cyber/test/transport/rtps_transceiver_test.cc +++ b/cyber/test/transport/rtps_transceiver_test.cc @@ -42,9 +42,9 @@ class RtpsTransceiverTest : public ::testing::Test { attr.set_channel_name(channel_name_); attr.set_channel_id(common::Hash(channel_name_)); transmitter_a_ = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); transmitter_b_ = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); transmitter_a_->Enable(); transmitter_b_->Enable(); @@ -64,7 +64,7 @@ TEST_F(RtpsTransceiverTest, constructor) { RoleAttributes attr; TransmitterPtr transmitter = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); ReceiverPtr receiver = std::make_shared>(attr, nullptr); diff --git a/cyber/test/transport/shm_dispatcher_test.cc b/cyber/test/transport/shm_dispatcher_test.cc index 48cefac806fc974995252456955f4ce4cc38ba24..5fc9e119c9f8b30fcf2f6b34f2c4924b40497e64 100644 --- a/cyber/test/transport/shm_dispatcher_test.cc +++ b/cyber/test/transport/shm_dispatcher_test.cc @@ -66,8 +66,9 @@ TEST(ShmDispatcherTest, on_message) { Identity oppo_id; oppo_attr.set_id(oppo_id.HashValue()); - auto transmitter = Transport::CreateTransmitter( - oppo_attr, proto::OptionalMode::SHM); + auto transmitter = + Transport::Instance()->CreateTransmitter( + oppo_attr, proto::OptionalMode::SHM); EXPECT_TRUE(transmitter != nullptr); auto send_msg = std::make_shared("raw_message"); diff --git a/cyber/test/transport/transport_test.cc b/cyber/test/transport/transport_test.cc index c4182f18882b846b3eea4e520a17356c6a124187..e02f8e52283711f3720f016dd81d7f7fba8cf0eb 100644 --- a/cyber/test/transport/transport_test.cc +++ b/cyber/test/transport/transport_test.cc @@ -30,9 +30,9 @@ using TransmitterPtr = std::shared_ptr>; using ReceiverPtr = std::shared_ptr>; TEST(TransportTest, constructor) { - Transport transport_a; - Transport transport_b; - EXPECT_EQ(transport_a.participant(), transport_b.participant()); + auto transport_a = Transport::Instance(); + auto transport_b = Transport::Instance(); + EXPECT_EQ(transport_a->participant(), transport_b->participant()); } TEST(TransportTest, create_transmitter) { @@ -45,19 +45,23 @@ TEST(TransportTest, create_transmitter) { attr.set_id(id.HashValue()); TransmitterPtr intra = - Transport::CreateTransmitter(attr, OptionalMode::INTRA); + Transport::Instance()->CreateTransmitter( + attr, OptionalMode::INTRA); EXPECT_EQ(typeid(*intra), typeid(IntraTransmitter)); TransmitterPtr shm = - Transport::CreateTransmitter(attr, OptionalMode::SHM); + Transport::Instance()->CreateTransmitter( + attr, OptionalMode::SHM); EXPECT_EQ(typeid(*shm), typeid(ShmTransmitter)); TransmitterPtr rtps = - Transport::CreateTransmitter(attr, OptionalMode::RTPS); + Transport::Instance()->CreateTransmitter( + attr, OptionalMode::RTPS); EXPECT_EQ(typeid(*rtps), typeid(RtpsTransmitter)); attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_DEFAULT); - TransmitterPtr hybrid = Transport::CreateTransmitter(attr); + TransmitterPtr hybrid = + Transport::Instance()->CreateTransmitter(attr); EXPECT_EQ(typeid(*hybrid), typeid(HybridTransmitter)); } @@ -70,21 +74,21 @@ TEST(TransportTest, create_receiver) { auto listener = [](const std::shared_ptr&, const MessageInfo&, const RoleAttributes&) {}; - ReceiverPtr intra = Transport::CreateReceiver( + ReceiverPtr intra = Transport::Instance()->CreateReceiver( attr, listener, OptionalMode::INTRA); EXPECT_EQ(typeid(*intra), typeid(IntraReceiver)); - ReceiverPtr shm = Transport::CreateReceiver( + ReceiverPtr shm = Transport::Instance()->CreateReceiver( attr, listener, OptionalMode::SHM); EXPECT_EQ(typeid(*shm), typeid(ShmReceiver)); - ReceiverPtr rtps = Transport::CreateReceiver( + ReceiverPtr rtps = Transport::Instance()->CreateReceiver( attr, listener, OptionalMode::RTPS); EXPECT_EQ(typeid(*rtps), typeid(RtpsReceiver)); attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_DEFAULT); ReceiverPtr hybrid = - Transport::CreateReceiver(attr, listener); + Transport::Instance()->CreateReceiver(attr, listener); EXPECT_EQ(typeid(*hybrid), typeid(HybridReceiver)); } diff --git a/cyber/transport/BUILD b/cyber/transport/BUILD index 63f7c74c1bfa3902c58e67ac67a11f690070bbe6..cafc54116063f97dc1a4e1928194cc1e4f146492 100644 --- a/cyber/transport/BUILD +++ b/cyber/transport/BUILD @@ -6,15 +6,14 @@ cc_library( name = "transport", srcs = [ "transport.cc", - "dispatcher/rtps_dispatcher.cc", ], hdrs = [ "transport.h", - "dispatcher/rtps_dispatcher.h", ], deps = [ "history", "intra_dispatcher", + "rtps_dispatcher", "shm_dispatcher", "hybrid_receiver", "intra_receiver", @@ -137,6 +136,20 @@ cc_test( ], ) +cc_library( + name = "rtps_dispatcher", + srcs = [ "dispatcher/rtps_dispatcher.cc", ], + hdrs = [ "dispatcher/rtps_dispatcher.h", ], + deps = [ + "//cyber/proto:role_attributes_cc_proto", + "//cyber/message:message_traits", + "dispatcher", + "attributes_filler", + "sub_listener", + "participant", + ], +) + cc_test( name = "rtps_dispatcher_test", size = "small", @@ -146,7 +159,7 @@ cc_test( deps = [ "//cyber:cyber_core", "//cyber/proto:chatter_cc_proto", - "@gtest//:main", + "@gtest", ], ) diff --git a/cyber/transport/dispatcher/dispatcher.cc b/cyber/transport/dispatcher/dispatcher.cc index c16e8df02a6380a2403d2a722ed82ebf0dddfcee..7e629ae850593d43981e13452f5b8924a9463588 100644 --- a/cyber/transport/dispatcher/dispatcher.cc +++ b/cyber/transport/dispatcher/dispatcher.cc @@ -20,11 +20,14 @@ namespace apollo { namespace cyber { namespace transport { -Dispatcher::Dispatcher() : shutdown_(false) {} +Dispatcher::Dispatcher() : is_shutdown_(false) {} Dispatcher::~Dispatcher() { Shutdown(); } -void Dispatcher::Shutdown() { ADEBUG << "Shutdown"; } +void Dispatcher::Shutdown() { + is_shutdown_.exchange(true); + ADEBUG << "Shutdown"; +} bool Dispatcher::HasChannel(uint64_t channel_id) { return msg_listeners_.Has(channel_id); diff --git a/cyber/transport/dispatcher/dispatcher.h b/cyber/transport/dispatcher/dispatcher.h index 393d87cbab20631748a1274e2b1bde740e21e769..e5275115757d16c68c983783f87acd42067e92e3 100644 --- a/cyber/transport/dispatcher/dispatcher.h +++ b/cyber/transport/dispatcher/dispatcher.h @@ -17,6 +17,7 @@ #ifndef CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_ #define CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_ +#include #include #include #include @@ -76,7 +77,7 @@ class Dispatcher { bool HasChannel(uint64_t channel_id); protected: - bool shutdown_; + std::atomic is_shutdown_; // key: channel_id of message AtomicHashMap msg_listeners_; base::AtomicRWLock rw_lock_; @@ -85,6 +86,9 @@ class Dispatcher { template void Dispatcher::AddListener(const RoleAttributes& self_attr, const MessageListener& listener) { + if (is_shutdown_.load()) { + return; + } uint64_t channel_id = self_attr.channel_id(); std::shared_ptr> handler; @@ -105,6 +109,9 @@ template void Dispatcher::AddListener(const RoleAttributes& self_attr, const RoleAttributes& opposite_attr, const MessageListener& listener) { + if (is_shutdown_.load()) { + return; + } uint64_t channel_id = self_attr.channel_id(); std::shared_ptr> handler; @@ -123,6 +130,9 @@ void Dispatcher::AddListener(const RoleAttributes& self_attr, template void Dispatcher::RemoveListener(const RoleAttributes& self_attr) { + if (is_shutdown_.load()) { + return; + } uint64_t channel_id = self_attr.channel_id(); ListenerHandlerBasePtr* handler_base = nullptr; @@ -134,6 +144,9 @@ void Dispatcher::RemoveListener(const RoleAttributes& self_attr) { template void Dispatcher::RemoveListener(const RoleAttributes& self_attr, const RoleAttributes& opposite_attr) { + if (is_shutdown_.load()) { + return; + } uint64_t channel_id = self_attr.channel_id(); ListenerHandlerBasePtr* handler_base = nullptr; diff --git a/cyber/transport/dispatcher/intra_dispatcher.h b/cyber/transport/dispatcher/intra_dispatcher.h index c4908b67f9b964c55e759a2c111145719da58b90..0017722dc344be081f6cddac6cfd0000be912093 100644 --- a/cyber/transport/dispatcher/intra_dispatcher.h +++ b/cyber/transport/dispatcher/intra_dispatcher.h @@ -49,6 +49,9 @@ template void IntraDispatcher::OnMessage(uint64_t channel_id, const std::shared_ptr& message, const MessageInfo& message_info) { + if (is_shutdown_.load()) { + return; + } ADEBUG << "intra on message, channel:" << common::GlobalData::GetChannelById(channel_id); ListenerHandlerBasePtr* handler_base = nullptr; diff --git a/cyber/transport/dispatcher/rtps_dispatcher.cc b/cyber/transport/dispatcher/rtps_dispatcher.cc index 5ad9b1f63432f29a2c5e91f41d70a1c79bbaebc0..744442f2f779ac65ac7bdd5eca7cf46c5a8a0225 100644 --- a/cyber/transport/dispatcher/rtps_dispatcher.cc +++ b/cyber/transport/dispatcher/rtps_dispatcher.cc @@ -16,29 +16,35 @@ #include "cyber/transport/dispatcher/rtps_dispatcher.h" -#include "cyber/transport/transport.h" - namespace apollo { namespace cyber { namespace transport { -RtpsDispatcher::RtpsDispatcher() {} +RtpsDispatcher::RtpsDispatcher() : participant_(nullptr) {} RtpsDispatcher::~RtpsDispatcher() { Shutdown(); } void RtpsDispatcher::Shutdown() { - if (shutdown_) { + if (is_shutdown_.exchange(true)) { return; } - shutdown_ = true; - std::lock_guard lock(subs_mutex_); - for (auto& item : subs_) { - item.second.sub = nullptr; + { + std::lock_guard lock(subs_mutex_); + for (auto& item : subs_) { + item.second.sub = nullptr; + } } + + participant_ = nullptr; } void RtpsDispatcher::AddSubscriber(const RoleAttributes& self_attr) { + if (participant_ == nullptr) { + AWARN << "please set participant firstly."; + return; + } + uint64_t channel_id = self_attr.channel_id(); std::lock_guard lock(subs_mutex_); if (subs_.count(channel_id) > 0) { @@ -55,7 +61,7 @@ void RtpsDispatcher::AddSubscriber(const RoleAttributes& self_attr) { std::placeholders::_2, std::placeholders::_3)); new_sub.sub = eprosima::fastrtps::Domain::createSubscriber( - Transport::participant()->fastrtps_participant(), sub_attr, + participant_->fastrtps_participant(), sub_attr, new_sub.sub_listener.get()); RETURN_IF_NULL(new_sub.sub); subs_[channel_id] = new_sub; @@ -64,6 +70,10 @@ void RtpsDispatcher::AddSubscriber(const RoleAttributes& self_attr) { void RtpsDispatcher::OnMessage(uint64_t channel_id, const std::shared_ptr& msg_str, const MessageInfo& msg_info) { + if (is_shutdown_.load()) { + return; + } + ListenerHandlerBasePtr* handler_base = nullptr; if (msg_listeners_.Get(channel_id, &handler_base)) { auto handler = diff --git a/cyber/transport/dispatcher/rtps_dispatcher.h b/cyber/transport/dispatcher/rtps_dispatcher.h index 0f7b145b37829d6ed438cdddae16748341e5fa15..8e253621744b7a795ac8539499afd8570f42b21a 100644 --- a/cyber/transport/dispatcher/rtps_dispatcher.h +++ b/cyber/transport/dispatcher/rtps_dispatcher.h @@ -28,6 +28,7 @@ #include "cyber/message/message_traits.h" #include "cyber/transport/dispatcher/dispatcher.h" #include "cyber/transport/rtps/attributes_filler.h" +#include "cyber/transport/rtps/participant.h" #include "cyber/transport/rtps/sub_listener.h" namespace apollo { @@ -59,6 +60,10 @@ class RtpsDispatcher : public Dispatcher { const RoleAttributes& opposite_attr, const MessageListener& listener); + void set_participant(const ParticipantPtr& participant) { + participant_ = participant; + } + private: void OnMessage(uint64_t channel_id, const std::shared_ptr& msg_str, @@ -68,6 +73,8 @@ class RtpsDispatcher : public Dispatcher { std::unordered_map subs_; std::mutex subs_mutex_; + ParticipantPtr participant_; + DECLARE_SINGLETON(RtpsDispatcher) }; @@ -75,8 +82,8 @@ template void RtpsDispatcher::AddListener(const RoleAttributes& self_attr, const MessageListener& listener) { auto listener_adapter = [listener]( - const std::shared_ptr& msg_str, - const MessageInfo& msg_info) { + const std::shared_ptr& msg_str, + const MessageInfo& msg_info) { auto msg = std::make_shared(); RETURN_IF(!message::ParseFromString(*msg_str, msg.get())); listener(msg, msg_info); @@ -91,8 +98,8 @@ void RtpsDispatcher::AddListener(const RoleAttributes& self_attr, const RoleAttributes& opposite_attr, const MessageListener& listener) { auto listener_adapter = [listener]( - const std::shared_ptr& msg_str, - const MessageInfo& msg_info) { + const std::shared_ptr& msg_str, + const MessageInfo& msg_info) { auto msg = std::make_shared(); RETURN_IF(!message::ParseFromString(*msg_str, msg.get())); listener(msg, msg_info); diff --git a/cyber/transport/dispatcher/rtps_dispatcher_test.cc b/cyber/transport/dispatcher/rtps_dispatcher_test.cc index cf94c58728344008ae93c80df017417901f78d34..d239a2f4897d3acf27da7e5236000d67404b3e5d 100644 --- a/cyber/transport/dispatcher/rtps_dispatcher_test.cc +++ b/cyber/transport/dispatcher/rtps_dispatcher_test.cc @@ -75,7 +75,7 @@ TEST(RtpsDispatcherTest, on_message) { recv_msg->CopyFrom(*msg); }); - auto transmitter = Transport::CreateTransmitter( + auto transmitter = Transport::Instance()->CreateTransmitter( self_attr, proto::OptionalMode::RTPS); EXPECT_TRUE(transmitter != nullptr); @@ -105,3 +105,11 @@ TEST(RtpsDispatcherTest, shutdown) { } // namespace transport } // namespace cyber } // namespace apollo + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + apollo::cyber::transport::Transport::Instance(); + auto res = RUN_ALL_TESTS(); + apollo::cyber::transport::Transport::Instance()->Shutdown(); + return res; +} diff --git a/cyber/transport/dispatcher/shm_dispatcher.cc b/cyber/transport/dispatcher/shm_dispatcher.cc index 5653081f5e4102149ad1dc2a7da5159c65dc88ea..6a75e0ce1675f1571884d50c189a76da310dae50 100644 --- a/cyber/transport/dispatcher/shm_dispatcher.cc +++ b/cyber/transport/dispatcher/shm_dispatcher.cc @@ -30,10 +30,10 @@ ShmDispatcher::ShmDispatcher() : host_id_(0), sfd_(-1) { Init(); } ShmDispatcher::~ShmDispatcher() { Shutdown(); } void ShmDispatcher::Shutdown() { - if (shutdown_) { + if (is_shutdown_.exchange(true)) { return; } - shutdown_ = true; + segments_.clear(); if (thread_.joinable()) { thread_.join(); @@ -54,6 +54,9 @@ void ShmDispatcher::AddSegment(const RoleAttributes& self_attr) { void ShmDispatcher::OnMessage(uint64_t channel_id, const std::shared_ptr& msg_str, const MessageInfo& msg_info) { + if (is_shutdown_.load()) { + return; + } ListenerHandlerBasePtr* handler_base = nullptr; if (msg_listeners_.Get(channel_id, &handler_base)) { auto handler = @@ -69,7 +72,7 @@ void ShmDispatcher::ThreadFunc() { std::string msg_info_str(""); std::shared_ptr msg_str_ptr = std::make_shared(); - while (!shutdown_) { + while (!is_shutdown_.load()) { int read_bytes = recvfrom(sfd_, buf, 1024, 0, (struct sockaddr*)&local_addr_, reinterpret_cast(&addr_len)); diff --git a/cyber/transport/dispatcher/shm_dispatcher_test.cc b/cyber/transport/dispatcher/shm_dispatcher_test.cc index 0f5575e276ad3365f2d89247d2975aa024717f87..ef7dc35fbb10f3c33ef21799c95d31769227d7eb 100644 --- a/cyber/transport/dispatcher/shm_dispatcher_test.cc +++ b/cyber/transport/dispatcher/shm_dispatcher_test.cc @@ -67,8 +67,9 @@ TEST(ShmDispatcherTest, on_message) { Identity oppo_id; oppo_attr.set_id(oppo_id.HashValue()); - auto transmitter = Transport::CreateTransmitter( - oppo_attr, proto::OptionalMode::SHM); + auto transmitter = + Transport::Instance()->CreateTransmitter( + oppo_attr, proto::OptionalMode::SHM); EXPECT_TRUE(transmitter != nullptr); auto send_msg = std::make_shared("raw_message"); diff --git a/cyber/transport/rtps/participant.cc b/cyber/transport/rtps/participant.cc index 35f1ded3b1333d9bda58e7bc08147e09f1583ac7..227046718c8bd1d77491f410106f3260101cc1e4 100644 --- a/cyber/transport/rtps/participant.cc +++ b/cyber/transport/rtps/participant.cc @@ -48,6 +48,10 @@ void Participant::Shutdown() { } eprosima::fastrtps::Participant* Participant::fastrtps_participant() { + if (shutdown_.load()) { + return nullptr; + } + std::lock_guard lck(mutex_); if (fastrtps_participant_ != nullptr) { return fastrtps_participant_; diff --git a/cyber/transport/transceiver/hybrid_transceiver_test.cc b/cyber/transport/transceiver/hybrid_transceiver_test.cc index 0360e13ffe5850f44301112d7997474865285581..dbe97ae5fe0f5c6a5ba5ed2739fd563a78617c98 100644 --- a/cyber/transport/transceiver/hybrid_transceiver_test.cc +++ b/cyber/transport/transceiver/hybrid_transceiver_test.cc @@ -50,12 +50,12 @@ class HybridTransceiverTest : public ::testing::Test { attr.set_channel_id(common::Hash(channel_name_)); attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_DEFAULT); transmitter_a_ = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); attr.set_process_id(common::GlobalData::Instance()->ProcessId() + 1); attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_DEFAULT); transmitter_b_ = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); } virtual void TearDown() { @@ -72,9 +72,9 @@ TEST_F(HybridTransceiverTest, constructor) { RoleAttributes attr; TransmitterPtr transmitter = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); ReceiverPtr receiver = std::make_shared>( - attr, nullptr, Transport::participant()); + attr, nullptr, Transport::Instance()->participant()); EXPECT_EQ(transmitter->seq_num(), 0); @@ -104,7 +104,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_no_param) { (void)attr; msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); receiver->Enable(); @@ -156,7 +156,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_no_relation) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); ReceiverPtr receiver_b = std::make_shared>( attr, @@ -167,7 +167,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_no_relation) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); auto msg = std::make_shared(); msg->set_class_name("HybridTransceiverTest"); @@ -208,7 +208,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_same_process) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); ReceiverPtr receiver_b = std::make_shared>( attr, @@ -219,7 +219,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_same_process) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); std::string class_name("HybridTransceiverTest"); std::string case_name("enable_and_disable_with_param_same_process"); @@ -279,7 +279,7 @@ TEST_F(HybridTransceiverTest, std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); ReceiverPtr receiver_b = std::make_shared>( attr, @@ -290,7 +290,7 @@ TEST_F(HybridTransceiverTest, std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); std::string class_name("HybridTransceiverTest"); std::string case_name("enable_and_disable_with_param_same_host_diff_proc"); @@ -346,7 +346,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_diff_host) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); ReceiverPtr receiver_b = std::make_shared>( attr, @@ -357,7 +357,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_diff_host) { std::lock_guard lock(mtx); msgs.emplace_back(*msg); }, - Transport::participant()); + Transport::Instance()->participant()); std::string class_name("HybridTransceiverTest"); std::string case_name("enable_and_disable_with_param_same_host_diff_proc"); @@ -395,7 +395,8 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_diff_host) { int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); + apollo::cyber::transport::Transport::Instance(); auto res = RUN_ALL_TESTS(); - apollo::cyber::transport::Transport::Shutdown(); + apollo::cyber::transport::Transport::Instance()->Shutdown(); return res; } diff --git a/cyber/transport/transceiver/rtps_transceiver_test.cc b/cyber/transport/transceiver/rtps_transceiver_test.cc index d65039e6816632add3d36421d1fb6d722d9a50ed..0e9efbb1a402f20d499a1c2a2d36187d55b9f35d 100644 --- a/cyber/transport/transceiver/rtps_transceiver_test.cc +++ b/cyber/transport/transceiver/rtps_transceiver_test.cc @@ -44,9 +44,9 @@ class RtpsTransceiverTest : public ::testing::Test { attr.set_channel_name(channel_name_); attr.set_channel_id(common::Hash(channel_name_)); transmitter_a_ = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); transmitter_b_ = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); transmitter_a_->Enable(); transmitter_b_->Enable(); @@ -66,7 +66,7 @@ TEST_F(RtpsTransceiverTest, constructor) { RoleAttributes attr; TransmitterPtr transmitter = std::make_shared>( - attr, Transport::participant()); + attr, Transport::Instance()->participant()); ReceiverPtr receiver = std::make_shared>(attr, nullptr); @@ -154,7 +154,8 @@ TEST_F(RtpsTransceiverTest, enable_and_disable) { int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); + apollo::cyber::transport::Transport::Instance(); auto res = RUN_ALL_TESTS(); - apollo::cyber::transport::Transport::Shutdown(); + apollo::cyber::transport::Transport::Instance()->Shutdown(); return res; } diff --git a/cyber/transport/transport.cc b/cyber/transport/transport.cc index 862f5e72b80c76f6e6481c0deeabdcc81e75ca8b..f1e727baa478af0bd6ad415c4f3c0706bc9e79c5 100644 --- a/cyber/transport/transport.cc +++ b/cyber/transport/transport.cc @@ -16,50 +16,45 @@ #include "cyber/transport/transport.h" -#include -#include - #include "cyber/common/global_data.h" namespace apollo { namespace cyber { namespace transport { -static std::atomic shutdown_ = {false}; -static std::mutex participant_mutex_; - -ParticipantPtr Transport::participant_ = nullptr; - -Transport::Transport() {} +Transport::Transport() + : is_shutdown_(false), + participant_(nullptr), + intra_dispatcher_(nullptr), + shm_dispatcher_(nullptr), + rtps_dispatcher_(nullptr) { + CreateParticipant(); + intra_dispatcher_ = IntraDispatcher::Instance(); + shm_dispatcher_ = ShmDispatcher::Instance(); + rtps_dispatcher_ = RtpsDispatcher::Instance(); + rtps_dispatcher_->set_participant(participant_); +} -Transport::~Transport() {} +Transport::~Transport() { Shutdown(); } void Transport::Shutdown() { - if (shutdown_.exchange(true)) { + if (is_shutdown_.exchange(true)) { return; } + intra_dispatcher_->Shutdown(); + shm_dispatcher_->Shutdown(); + rtps_dispatcher_->Shutdown(); if (participant_ != nullptr) { participant_->Shutdown(); participant_ = nullptr; } } -ParticipantPtr Transport::CreateParticipant() { +void Transport::CreateParticipant() { std::string participant_name = common::GlobalData::Instance()->HostName() + "+" + std::to_string(common::GlobalData::Instance()->ProcessId()); - return std::make_shared(participant_name, 11512); -} - -ParticipantPtr Transport::participant() { - if (participant_ != nullptr) { - return participant_; - } - std::lock_guard lck(participant_mutex_); - if (participant_ == nullptr) { - participant_ = CreateParticipant(); - } - return participant_; + participant_ = std::make_shared(participant_name, 11512); } } // namespace transport diff --git a/cyber/transport/transport.h b/cyber/transport/transport.h index 479437e2d8542934777e8a8eba52de20c06bbe2d..8a374ce9a5415108e16b6c49fe5bb8e1f550190e 100644 --- a/cyber/transport/transport.h +++ b/cyber/transport/transport.h @@ -17,10 +17,15 @@ #ifndef CYBER_TRANSPORT_TRANSPORT_H_ #define CYBER_TRANSPORT_TRANSPORT_H_ +#include #include #include +#include "cyber/common/macros.h" #include "cyber/proto/transport_conf.pb.h" +#include "cyber/transport/dispatcher/intra_dispatcher.h" +#include "cyber/transport/dispatcher/rtps_dispatcher.h" +#include "cyber/transport/dispatcher/shm_dispatcher.h" #include "cyber/transport/qos/qos_profile_conf.h" #include "cyber/transport/receiver/hybrid_receiver.h" #include "cyber/transport/receiver/intra_receiver.h" @@ -42,35 +47,43 @@ using apollo::cyber::proto::OptionalMode; class Transport { public: - Transport(); virtual ~Transport(); - static void Shutdown(); + void Shutdown(); template - static auto CreateTransmitter(const RoleAttributes& attr, - const OptionalMode& mode = OptionalMode::HYBRID) - -> typename std::shared_ptr>; + auto CreateTransmitter(const RoleAttributes& attr, + const OptionalMode& mode = OptionalMode::HYBRID) -> + typename std::shared_ptr>; template - static auto CreateReceiver( - const RoleAttributes& attr, - const typename Receiver::MessageListener& msg_listener, - const OptionalMode& mode = OptionalMode::HYBRID) -> + auto CreateReceiver(const RoleAttributes& attr, + const typename Receiver::MessageListener& msg_listener, + const OptionalMode& mode = OptionalMode::HYBRID) -> typename std::shared_ptr>; - static ParticipantPtr participant(); + ParticipantPtr participant() const { return participant_; } private: - static ParticipantPtr CreateParticipant(); + void CreateParticipant(); - static ParticipantPtr participant_; + std::atomic is_shutdown_; + ParticipantPtr participant_; + IntraDispatcherPtr intra_dispatcher_; + ShmDispatcherPtr shm_dispatcher_; + RtpsDispatcherPtr rtps_dispatcher_; + + DECLARE_SINGLETON(Transport) }; template auto Transport::CreateTransmitter(const RoleAttributes& attr, const OptionalMode& mode) -> typename std::shared_ptr> { + if (is_shutdown_.load()) { + AINFO << "transport has been shut down."; + return nullptr; + } std::shared_ptr> transmitter = nullptr; RoleAttributes modified_attr = attr; if (!modified_attr.has_qos_profile()) { @@ -109,6 +122,10 @@ auto Transport::CreateReceiver( const RoleAttributes& attr, const typename Receiver::MessageListener& msg_listener, const OptionalMode& mode) -> typename std::shared_ptr> { + if (is_shutdown_.load()) { + AINFO << "transport has been shut down."; + return nullptr; + } std::shared_ptr> receiver = nullptr; RoleAttributes modified_attr = attr; if (!modified_attr.has_qos_profile()) { diff --git a/cyber/transport/transport_test.cc b/cyber/transport/transport_test.cc index 15e8b190abe89142225887099a5e7a276d93f480..d84865e2c740ec0c3cc183902da98e796d5b53c5 100644 --- a/cyber/transport/transport_test.cc +++ b/cyber/transport/transport_test.cc @@ -30,9 +30,9 @@ using TransmitterPtr = std::shared_ptr>; using ReceiverPtr = std::shared_ptr>; TEST(TransportTest, constructor) { - Transport transport_a; - Transport transport_b; - EXPECT_EQ(transport_a.participant(), transport_b.participant()); + auto transport_a = Transport::Instance(); + auto transport_b = Transport::Instance(); + EXPECT_EQ(transport_a->participant(), transport_b->participant()); } TEST(TransportTest, create_transmitter) { @@ -45,11 +45,13 @@ TEST(TransportTest, create_transmitter) { attr.set_id(id.HashValue()); TransmitterPtr intra = - Transport::CreateTransmitter(attr, OptionalMode::INTRA); + Transport::Instance()->CreateTransmitter( + attr, OptionalMode::INTRA); EXPECT_EQ(typeid(*intra), typeid(IntraTransmitter)); TransmitterPtr shm = - Transport::CreateTransmitter(attr, OptionalMode::SHM); + Transport::Instance()->CreateTransmitter( + attr, OptionalMode::SHM); EXPECT_EQ(typeid(*shm), typeid(ShmTransmitter)); } @@ -62,11 +64,11 @@ TEST(TransportTest, create_receiver) { auto listener = [](const std::shared_ptr&, const MessageInfo&, const RoleAttributes&) {}; - ReceiverPtr intra = Transport::CreateReceiver( + ReceiverPtr intra = Transport::Instance()->CreateReceiver( attr, listener, OptionalMode::INTRA); EXPECT_EQ(typeid(*intra), typeid(IntraReceiver)); - ReceiverPtr shm = Transport::CreateReceiver( + ReceiverPtr shm = Transport::Instance()->CreateReceiver( attr, listener, OptionalMode::SHM); EXPECT_EQ(typeid(*shm), typeid(ShmReceiver)); } @@ -77,7 +79,8 @@ TEST(TransportTest, create_receiver) { int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); + apollo::cyber::transport::Transport::Instance(); auto res = RUN_ALL_TESTS(); - apollo::cyber::transport::Transport::Shutdown(); + apollo::cyber::transport::Transport::Instance()->Shutdown(); return res; }