提交 a03a9469 编写于 作者: G gruminions 提交者: Jiangtao Hu

framework: decouple transport and rtps_dispatcher

上级 eda23af0
......@@ -78,6 +78,7 @@ bool Init() {
// Initialize internal static objects
common::GlobalData::Instance();
transport::Transport::Instance();
service_discovery::TopologyManager::Instance();
scheduler::Scheduler::Instance();
TaskManager::Instance();
......@@ -152,7 +153,7 @@ void Shutdown() {
TaskManager::Instance()->Shutdown();
scheduler::Scheduler::Instance()->ShutDown();
service_discovery::TopologyManager::Instance()->Shutdown();
transport::Transport::Shutdown();
transport::Transport::Instance()->Shutdown();
SetState(STATE_SHUTDOWN);
}
......
......@@ -96,7 +96,7 @@ auto ReceiverManager<MessageT>::GetReceiver(
const std::string& channel_name = role_attr.channel_name();
if (receiver_map_.count(channel_name) == 0) {
receiver_map_[channel_name] =
transport::Transport::CreateReceiver<MessageT>(
transport::Transport::Instance()->CreateReceiver<MessageT>(
role_attr, [](const std::shared_ptr<MessageT>& msg,
const transport::MessageInfo& msg_info,
const proto::RoleAttributes& reader_attr) {
......
......@@ -71,7 +71,8 @@ bool Writer<MessageT>::Init() {
if (init_.exchange(true)) {
return true;
}
transmitter_ = transport::Transport::CreateTransmitter<MessageT>(role_attr_);
transmitter_ =
transport::Transport::Instance()->CreateTransmitter<MessageT>(role_attr_);
RETURN_VAL_IF_NULL(transmitter_, false);
this->role_attr_.set_id(transmitter_->id().HashValue());
channel_manager_ =
......
......@@ -115,8 +115,9 @@ bool Client<Request, Response>::Init() {
role.set_channel_id(channel_id);
role.mutable_qos_profile()->CopyFrom(
transport::QosProfileConf::QOS_PROFILE_SERVICES_DEFAULT);
request_transmitter_ = transport::Transport::CreateTransmitter<Request>(
role, proto::OptionalMode::RTPS);
auto transport = transport::Transport::Instance();
request_transmitter_ =
transport->CreateTransmitter<Request>(role, proto::OptionalMode::RTPS);
if (request_transmitter_ == nullptr) {
AERROR << "Create request pub failed.";
return false;
......@@ -130,7 +131,7 @@ bool Client<Request, Response>::Init() {
role.set_channel_name(response_channel_);
channel_id = common::GlobalData::RegisterChannel(response_channel_);
role.set_channel_id(channel_id);
response_receiver_ = transport::Transport::CreateReceiver<Response>(
response_receiver_ = transport->CreateReceiver<Response>(
role,
[=](const std::shared_ptr<Response>& request,
const transport::MessageInfo& message_info,
......
......@@ -90,8 +90,9 @@ bool Service<Request, Response>::Init() {
role.set_channel_id(channel_id);
role.mutable_qos_profile()->CopyFrom(
transport::QosProfileConf::QOS_PROFILE_SERVICES_DEFAULT);
response_transmitter_ = transport::Transport::CreateTransmitter<Response>(
role, proto::OptionalMode::RTPS);
auto transport = transport::Transport::Instance();
response_transmitter_ =
transport->CreateTransmitter<Response>(role, proto::OptionalMode::RTPS);
if (response_transmitter_ == nullptr) {
AERROR << " Create response pub failed.";
return false;
......@@ -104,7 +105,7 @@ bool Service<Request, Response>::Init() {
role.set_channel_name(request_channel_);
channel_id = common::GlobalData::RegisterChannel(request_channel_);
role.set_channel_id(channel_id);
request_receiver_ = transport::Transport::CreateReceiver<Request>(
request_receiver_ = transport->CreateReceiver<Request>(
role,
[=](const std::shared_ptr<Request>& request,
const transport::MessageInfo& message_info,
......
......@@ -48,13 +48,13 @@ class HybridTransceiverTest : public ::testing::Test {
attr.set_channel_id(common::Hash(channel_name_));
attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_DEFAULT);
transmitter_a_ = std::make_shared<HybridTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
attr.set_process_id(54321);
attr.mutable_qos_profile()->CopyFrom(
QosProfileConf::QOS_PROFILE_TOPO_CHANGE);
transmitter_b_ = std::make_shared<HybridTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
}
virtual void TearDown() {
......@@ -71,9 +71,9 @@ TEST_F(HybridTransceiverTest, constructor) {
RoleAttributes attr;
TransmitterPtr transmitter =
std::make_shared<HybridTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
ReceiverPtr receiver = std::make_shared<HybridReceiver<proto::UnitTest>>(
attr, nullptr, Transport::participant());
attr, nullptr, Transport::Instance()->participant());
EXPECT_EQ(transmitter->seq_num(), 0);
......@@ -103,7 +103,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_no_param) {
(void)attr;
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
receiver->Enable();
......@@ -155,7 +155,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_no_relation) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_TOPO_CHANGE);
ReceiverPtr receiver_b = std::make_shared<HybridReceiver<proto::UnitTest>>(
......@@ -167,7 +167,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_no_relation) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
auto msg = std::make_shared<proto::UnitTest>();
msg->set_class_name("HybridTransceiverTest");
......@@ -208,7 +208,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_same_process) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_TOPO_CHANGE);
ReceiverPtr receiver_b = std::make_shared<HybridReceiver<proto::UnitTest>>(
......@@ -220,7 +220,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_same_process) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
std::string class_name("HybridTransceiverTest");
std::string case_name("enable_and_disable_with_param_same_process");
......@@ -280,7 +280,7 @@ TEST_F(HybridTransceiverTest,
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_TOPO_CHANGE);
ReceiverPtr receiver_b = std::make_shared<HybridReceiver<proto::UnitTest>>(
......@@ -292,7 +292,7 @@ TEST_F(HybridTransceiverTest,
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
std::string class_name("HybridTransceiverTest");
std::string case_name("enable_and_disable_with_param_same_host_diff_proc");
......@@ -348,7 +348,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_diff_host) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_TOPO_CHANGE);
ReceiverPtr receiver_b = std::make_shared<HybridReceiver<proto::UnitTest>>(
......@@ -360,7 +360,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_diff_host) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
std::string class_name("HybridTransceiverTest");
std::string case_name("enable_and_disable_with_param_same_host_diff_proc");
......
......@@ -74,7 +74,7 @@ TEST(RtpsDispatcherTest, on_message) {
recv_msg->CopyFrom(*msg);
});
auto transmitter = Transport::CreateTransmitter<proto::Chatter>(
auto transmitter = Transport::Instance()->CreateTransmitter<proto::Chatter>(
self_attr, proto::OptionalMode::RTPS);
EXPECT_TRUE(transmitter != nullptr);
......
......@@ -42,9 +42,9 @@ class RtpsTransceiverTest : public ::testing::Test {
attr.set_channel_name(channel_name_);
attr.set_channel_id(common::Hash(channel_name_));
transmitter_a_ = std::make_shared<RtpsTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
transmitter_b_ = std::make_shared<RtpsTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
transmitter_a_->Enable();
transmitter_b_->Enable();
......@@ -64,7 +64,7 @@ TEST_F(RtpsTransceiverTest, constructor) {
RoleAttributes attr;
TransmitterPtr transmitter =
std::make_shared<RtpsTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
ReceiverPtr receiver =
std::make_shared<RtpsReceiver<proto::UnitTest>>(attr, nullptr);
......
......@@ -66,8 +66,9 @@ TEST(ShmDispatcherTest, on_message) {
Identity oppo_id;
oppo_attr.set_id(oppo_id.HashValue());
auto transmitter = Transport::CreateTransmitter<message::RawMessage>(
oppo_attr, proto::OptionalMode::SHM);
auto transmitter =
Transport::Instance()->CreateTransmitter<message::RawMessage>(
oppo_attr, proto::OptionalMode::SHM);
EXPECT_TRUE(transmitter != nullptr);
auto send_msg = std::make_shared<message::RawMessage>("raw_message");
......
......@@ -30,9 +30,9 @@ using TransmitterPtr = std::shared_ptr<Transmitter<proto::UnitTest>>;
using ReceiverPtr = std::shared_ptr<Receiver<proto::UnitTest>>;
TEST(TransportTest, constructor) {
Transport transport_a;
Transport transport_b;
EXPECT_EQ(transport_a.participant(), transport_b.participant());
auto transport_a = Transport::Instance();
auto transport_b = Transport::Instance();
EXPECT_EQ(transport_a->participant(), transport_b->participant());
}
TEST(TransportTest, create_transmitter) {
......@@ -45,19 +45,23 @@ TEST(TransportTest, create_transmitter) {
attr.set_id(id.HashValue());
TransmitterPtr intra =
Transport::CreateTransmitter<proto::UnitTest>(attr, OptionalMode::INTRA);
Transport::Instance()->CreateTransmitter<proto::UnitTest>(
attr, OptionalMode::INTRA);
EXPECT_EQ(typeid(*intra), typeid(IntraTransmitter<proto::UnitTest>));
TransmitterPtr shm =
Transport::CreateTransmitter<proto::UnitTest>(attr, OptionalMode::SHM);
Transport::Instance()->CreateTransmitter<proto::UnitTest>(
attr, OptionalMode::SHM);
EXPECT_EQ(typeid(*shm), typeid(ShmTransmitter<proto::UnitTest>));
TransmitterPtr rtps =
Transport::CreateTransmitter<proto::UnitTest>(attr, OptionalMode::RTPS);
Transport::Instance()->CreateTransmitter<proto::UnitTest>(
attr, OptionalMode::RTPS);
EXPECT_EQ(typeid(*rtps), typeid(RtpsTransmitter<proto::UnitTest>));
attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_DEFAULT);
TransmitterPtr hybrid = Transport::CreateTransmitter<proto::UnitTest>(attr);
TransmitterPtr hybrid =
Transport::Instance()->CreateTransmitter<proto::UnitTest>(attr);
EXPECT_EQ(typeid(*hybrid), typeid(HybridTransmitter<proto::UnitTest>));
}
......@@ -70,21 +74,21 @@ TEST(TransportTest, create_receiver) {
auto listener = [](const std::shared_ptr<proto::UnitTest>&,
const MessageInfo&, const RoleAttributes&) {};
ReceiverPtr intra = Transport::CreateReceiver<proto::UnitTest>(
ReceiverPtr intra = Transport::Instance()->CreateReceiver<proto::UnitTest>(
attr, listener, OptionalMode::INTRA);
EXPECT_EQ(typeid(*intra), typeid(IntraReceiver<proto::UnitTest>));
ReceiverPtr shm = Transport::CreateReceiver<proto::UnitTest>(
ReceiverPtr shm = Transport::Instance()->CreateReceiver<proto::UnitTest>(
attr, listener, OptionalMode::SHM);
EXPECT_EQ(typeid(*shm), typeid(ShmReceiver<proto::UnitTest>));
ReceiverPtr rtps = Transport::CreateReceiver<proto::UnitTest>(
ReceiverPtr rtps = Transport::Instance()->CreateReceiver<proto::UnitTest>(
attr, listener, OptionalMode::RTPS);
EXPECT_EQ(typeid(*rtps), typeid(RtpsReceiver<proto::UnitTest>));
attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_DEFAULT);
ReceiverPtr hybrid =
Transport::CreateReceiver<proto::UnitTest>(attr, listener);
Transport::Instance()->CreateReceiver<proto::UnitTest>(attr, listener);
EXPECT_EQ(typeid(*hybrid), typeid(HybridReceiver<proto::UnitTest>));
}
......
......@@ -6,15 +6,14 @@ cc_library(
name = "transport",
srcs = [
"transport.cc",
"dispatcher/rtps_dispatcher.cc",
],
hdrs = [
"transport.h",
"dispatcher/rtps_dispatcher.h",
],
deps = [
"history",
"intra_dispatcher",
"rtps_dispatcher",
"shm_dispatcher",
"hybrid_receiver",
"intra_receiver",
......@@ -137,6 +136,20 @@ cc_test(
],
)
cc_library(
name = "rtps_dispatcher",
srcs = [ "dispatcher/rtps_dispatcher.cc", ],
hdrs = [ "dispatcher/rtps_dispatcher.h", ],
deps = [
"//cyber/proto:role_attributes_cc_proto",
"//cyber/message:message_traits",
"dispatcher",
"attributes_filler",
"sub_listener",
"participant",
],
)
cc_test(
name = "rtps_dispatcher_test",
size = "small",
......@@ -146,7 +159,7 @@ cc_test(
deps = [
"//cyber:cyber_core",
"//cyber/proto:chatter_cc_proto",
"@gtest//:main",
"@gtest",
],
)
......
......@@ -20,11 +20,14 @@ namespace apollo {
namespace cyber {
namespace transport {
Dispatcher::Dispatcher() : shutdown_(false) {}
Dispatcher::Dispatcher() : is_shutdown_(false) {}
Dispatcher::~Dispatcher() { Shutdown(); }
void Dispatcher::Shutdown() { ADEBUG << "Shutdown"; }
void Dispatcher::Shutdown() {
is_shutdown_.exchange(true);
ADEBUG << "Shutdown";
}
bool Dispatcher::HasChannel(uint64_t channel_id) {
return msg_listeners_.Has(channel_id);
......
......@@ -17,6 +17,7 @@
#ifndef CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_
#define CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_
#include <atomic>
#include <functional>
#include <iostream>
#include <memory>
......@@ -76,7 +77,7 @@ class Dispatcher {
bool HasChannel(uint64_t channel_id);
protected:
bool shutdown_;
std::atomic<bool> is_shutdown_;
// key: channel_id of message
AtomicHashMap<uint64_t, ListenerHandlerBasePtr> msg_listeners_;
base::AtomicRWLock rw_lock_;
......@@ -85,6 +86,9 @@ class Dispatcher {
template <typename MessageT>
void Dispatcher::AddListener(const RoleAttributes& self_attr,
const MessageListener<MessageT>& listener) {
if (is_shutdown_.load()) {
return;
}
uint64_t channel_id = self_attr.channel_id();
std::shared_ptr<ListenerHandler<MessageT>> handler;
......@@ -105,6 +109,9 @@ template <typename MessageT>
void Dispatcher::AddListener(const RoleAttributes& self_attr,
const RoleAttributes& opposite_attr,
const MessageListener<MessageT>& listener) {
if (is_shutdown_.load()) {
return;
}
uint64_t channel_id = self_attr.channel_id();
std::shared_ptr<ListenerHandler<MessageT>> handler;
......@@ -123,6 +130,9 @@ void Dispatcher::AddListener(const RoleAttributes& self_attr,
template <typename MessageT>
void Dispatcher::RemoveListener(const RoleAttributes& self_attr) {
if (is_shutdown_.load()) {
return;
}
uint64_t channel_id = self_attr.channel_id();
ListenerHandlerBasePtr* handler_base = nullptr;
......@@ -134,6 +144,9 @@ void Dispatcher::RemoveListener(const RoleAttributes& self_attr) {
template <typename MessageT>
void Dispatcher::RemoveListener(const RoleAttributes& self_attr,
const RoleAttributes& opposite_attr) {
if (is_shutdown_.load()) {
return;
}
uint64_t channel_id = self_attr.channel_id();
ListenerHandlerBasePtr* handler_base = nullptr;
......
......@@ -49,6 +49,9 @@ template <typename MessageT>
void IntraDispatcher::OnMessage(uint64_t channel_id,
const std::shared_ptr<MessageT>& message,
const MessageInfo& message_info) {
if (is_shutdown_.load()) {
return;
}
ADEBUG << "intra on message, channel:"
<< common::GlobalData::GetChannelById(channel_id);
ListenerHandlerBasePtr* handler_base = nullptr;
......
......@@ -16,29 +16,35 @@
#include "cyber/transport/dispatcher/rtps_dispatcher.h"
#include "cyber/transport/transport.h"
namespace apollo {
namespace cyber {
namespace transport {
RtpsDispatcher::RtpsDispatcher() {}
RtpsDispatcher::RtpsDispatcher() : participant_(nullptr) {}
RtpsDispatcher::~RtpsDispatcher() { Shutdown(); }
void RtpsDispatcher::Shutdown() {
if (shutdown_) {
if (is_shutdown_.exchange(true)) {
return;
}
shutdown_ = true;
std::lock_guard<std::mutex> lock(subs_mutex_);
for (auto& item : subs_) {
item.second.sub = nullptr;
{
std::lock_guard<std::mutex> lock(subs_mutex_);
for (auto& item : subs_) {
item.second.sub = nullptr;
}
}
participant_ = nullptr;
}
void RtpsDispatcher::AddSubscriber(const RoleAttributes& self_attr) {
if (participant_ == nullptr) {
AWARN << "please set participant firstly.";
return;
}
uint64_t channel_id = self_attr.channel_id();
std::lock_guard<std::mutex> lock(subs_mutex_);
if (subs_.count(channel_id) > 0) {
......@@ -55,7 +61,7 @@ void RtpsDispatcher::AddSubscriber(const RoleAttributes& self_attr) {
std::placeholders::_2, std::placeholders::_3));
new_sub.sub = eprosima::fastrtps::Domain::createSubscriber(
Transport::participant()->fastrtps_participant(), sub_attr,
participant_->fastrtps_participant(), sub_attr,
new_sub.sub_listener.get());
RETURN_IF_NULL(new_sub.sub);
subs_[channel_id] = new_sub;
......@@ -64,6 +70,10 @@ void RtpsDispatcher::AddSubscriber(const RoleAttributes& self_attr) {
void RtpsDispatcher::OnMessage(uint64_t channel_id,
const std::shared_ptr<std::string>& msg_str,
const MessageInfo& msg_info) {
if (is_shutdown_.load()) {
return;
}
ListenerHandlerBasePtr* handler_base = nullptr;
if (msg_listeners_.Get(channel_id, &handler_base)) {
auto handler =
......
......@@ -28,6 +28,7 @@
#include "cyber/message/message_traits.h"
#include "cyber/transport/dispatcher/dispatcher.h"
#include "cyber/transport/rtps/attributes_filler.h"
#include "cyber/transport/rtps/participant.h"
#include "cyber/transport/rtps/sub_listener.h"
namespace apollo {
......@@ -59,6 +60,10 @@ class RtpsDispatcher : public Dispatcher {
const RoleAttributes& opposite_attr,
const MessageListener<MessageT>& listener);
void set_participant(const ParticipantPtr& participant) {
participant_ = participant;
}
private:
void OnMessage(uint64_t channel_id,
const std::shared_ptr<std::string>& msg_str,
......@@ -68,6 +73,8 @@ class RtpsDispatcher : public Dispatcher {
std::unordered_map<uint64_t, Subscriber> subs_;
std::mutex subs_mutex_;
ParticipantPtr participant_;
DECLARE_SINGLETON(RtpsDispatcher)
};
......@@ -75,8 +82,8 @@ template <typename MessageT>
void RtpsDispatcher::AddListener(const RoleAttributes& self_attr,
const MessageListener<MessageT>& listener) {
auto listener_adapter = [listener](
const std::shared_ptr<std::string>& msg_str,
const MessageInfo& msg_info) {
const std::shared_ptr<std::string>& msg_str,
const MessageInfo& msg_info) {
auto msg = std::make_shared<MessageT>();
RETURN_IF(!message::ParseFromString(*msg_str, msg.get()));
listener(msg, msg_info);
......@@ -91,8 +98,8 @@ void RtpsDispatcher::AddListener(const RoleAttributes& self_attr,
const RoleAttributes& opposite_attr,
const MessageListener<MessageT>& listener) {
auto listener_adapter = [listener](
const std::shared_ptr<std::string>& msg_str,
const MessageInfo& msg_info) {
const std::shared_ptr<std::string>& msg_str,
const MessageInfo& msg_info) {
auto msg = std::make_shared<MessageT>();
RETURN_IF(!message::ParseFromString(*msg_str, msg.get()));
listener(msg, msg_info);
......
......@@ -75,7 +75,7 @@ TEST(RtpsDispatcherTest, on_message) {
recv_msg->CopyFrom(*msg);
});
auto transmitter = Transport::CreateTransmitter<proto::Chatter>(
auto transmitter = Transport::Instance()->CreateTransmitter<proto::Chatter>(
self_attr, proto::OptionalMode::RTPS);
EXPECT_TRUE(transmitter != nullptr);
......@@ -105,3 +105,11 @@ TEST(RtpsDispatcherTest, shutdown) {
} // namespace transport
} // namespace cyber
} // namespace apollo
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
apollo::cyber::transport::Transport::Instance();
auto res = RUN_ALL_TESTS();
apollo::cyber::transport::Transport::Instance()->Shutdown();
return res;
}
......@@ -30,10 +30,10 @@ ShmDispatcher::ShmDispatcher() : host_id_(0), sfd_(-1) { Init(); }
ShmDispatcher::~ShmDispatcher() { Shutdown(); }
void ShmDispatcher::Shutdown() {
if (shutdown_) {
if (is_shutdown_.exchange(true)) {
return;
}
shutdown_ = true;
segments_.clear();
if (thread_.joinable()) {
thread_.join();
......@@ -54,6 +54,9 @@ void ShmDispatcher::AddSegment(const RoleAttributes& self_attr) {
void ShmDispatcher::OnMessage(uint64_t channel_id,
const std::shared_ptr<std::string>& msg_str,
const MessageInfo& msg_info) {
if (is_shutdown_.load()) {
return;
}
ListenerHandlerBasePtr* handler_base = nullptr;
if (msg_listeners_.Get(channel_id, &handler_base)) {
auto handler =
......@@ -69,7 +72,7 @@ void ShmDispatcher::ThreadFunc() {
std::string msg_info_str("");
std::shared_ptr<std::string> msg_str_ptr = std::make_shared<std::string>();
while (!shutdown_) {
while (!is_shutdown_.load()) {
int read_bytes =
recvfrom(sfd_, buf, 1024, 0, (struct sockaddr*)&local_addr_,
reinterpret_cast<socklen_t*>(&addr_len));
......
......@@ -67,8 +67,9 @@ TEST(ShmDispatcherTest, on_message) {
Identity oppo_id;
oppo_attr.set_id(oppo_id.HashValue());
auto transmitter = Transport::CreateTransmitter<message::RawMessage>(
oppo_attr, proto::OptionalMode::SHM);
auto transmitter =
Transport::Instance()->CreateTransmitter<message::RawMessage>(
oppo_attr, proto::OptionalMode::SHM);
EXPECT_TRUE(transmitter != nullptr);
auto send_msg = std::make_shared<message::RawMessage>("raw_message");
......
......@@ -48,6 +48,10 @@ void Participant::Shutdown() {
}
eprosima::fastrtps::Participant* Participant::fastrtps_participant() {
if (shutdown_.load()) {
return nullptr;
}
std::lock_guard<std::mutex> lck(mutex_);
if (fastrtps_participant_ != nullptr) {
return fastrtps_participant_;
......
......@@ -50,12 +50,12 @@ class HybridTransceiverTest : public ::testing::Test {
attr.set_channel_id(common::Hash(channel_name_));
attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_DEFAULT);
transmitter_a_ = std::make_shared<HybridTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
attr.set_process_id(common::GlobalData::Instance()->ProcessId() + 1);
attr.mutable_qos_profile()->CopyFrom(QosProfileConf::QOS_PROFILE_DEFAULT);
transmitter_b_ = std::make_shared<HybridTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
}
virtual void TearDown() {
......@@ -72,9 +72,9 @@ TEST_F(HybridTransceiverTest, constructor) {
RoleAttributes attr;
TransmitterPtr transmitter =
std::make_shared<HybridTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
ReceiverPtr receiver = std::make_shared<HybridReceiver<proto::UnitTest>>(
attr, nullptr, Transport::participant());
attr, nullptr, Transport::Instance()->participant());
EXPECT_EQ(transmitter->seq_num(), 0);
......@@ -104,7 +104,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_no_param) {
(void)attr;
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
receiver->Enable();
......@@ -156,7 +156,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_no_relation) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
ReceiverPtr receiver_b = std::make_shared<HybridReceiver<proto::UnitTest>>(
attr,
......@@ -167,7 +167,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_no_relation) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
auto msg = std::make_shared<proto::UnitTest>();
msg->set_class_name("HybridTransceiverTest");
......@@ -208,7 +208,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_same_process) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
ReceiverPtr receiver_b = std::make_shared<HybridReceiver<proto::UnitTest>>(
attr,
......@@ -219,7 +219,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_same_process) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
std::string class_name("HybridTransceiverTest");
std::string case_name("enable_and_disable_with_param_same_process");
......@@ -279,7 +279,7 @@ TEST_F(HybridTransceiverTest,
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
ReceiverPtr receiver_b = std::make_shared<HybridReceiver<proto::UnitTest>>(
attr,
......@@ -290,7 +290,7 @@ TEST_F(HybridTransceiverTest,
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
std::string class_name("HybridTransceiverTest");
std::string case_name("enable_and_disable_with_param_same_host_diff_proc");
......@@ -346,7 +346,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_diff_host) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
ReceiverPtr receiver_b = std::make_shared<HybridReceiver<proto::UnitTest>>(
attr,
......@@ -357,7 +357,7 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_diff_host) {
std::lock_guard<std::mutex> lock(mtx);
msgs.emplace_back(*msg);
},
Transport::participant());
Transport::Instance()->participant());
std::string class_name("HybridTransceiverTest");
std::string case_name("enable_and_disable_with_param_same_host_diff_proc");
......@@ -395,7 +395,8 @@ TEST_F(HybridTransceiverTest, enable_and_disable_with_param_diff_host) {
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
apollo::cyber::transport::Transport::Instance();
auto res = RUN_ALL_TESTS();
apollo::cyber::transport::Transport::Shutdown();
apollo::cyber::transport::Transport::Instance()->Shutdown();
return res;
}
......@@ -44,9 +44,9 @@ class RtpsTransceiverTest : public ::testing::Test {
attr.set_channel_name(channel_name_);
attr.set_channel_id(common::Hash(channel_name_));
transmitter_a_ = std::make_shared<RtpsTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
transmitter_b_ = std::make_shared<RtpsTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
transmitter_a_->Enable();
transmitter_b_->Enable();
......@@ -66,7 +66,7 @@ TEST_F(RtpsTransceiverTest, constructor) {
RoleAttributes attr;
TransmitterPtr transmitter =
std::make_shared<RtpsTransmitter<proto::UnitTest>>(
attr, Transport::participant());
attr, Transport::Instance()->participant());
ReceiverPtr receiver =
std::make_shared<RtpsReceiver<proto::UnitTest>>(attr, nullptr);
......@@ -154,7 +154,8 @@ TEST_F(RtpsTransceiverTest, enable_and_disable) {
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
apollo::cyber::transport::Transport::Instance();
auto res = RUN_ALL_TESTS();
apollo::cyber::transport::Transport::Shutdown();
apollo::cyber::transport::Transport::Instance()->Shutdown();
return res;
}
......@@ -16,50 +16,45 @@
#include "cyber/transport/transport.h"
#include <atomic>
#include <mutex>
#include "cyber/common/global_data.h"
namespace apollo {
namespace cyber {
namespace transport {
static std::atomic<bool> shutdown_ = {false};
static std::mutex participant_mutex_;
ParticipantPtr Transport::participant_ = nullptr;
Transport::Transport() {}
Transport::Transport()
: is_shutdown_(false),
participant_(nullptr),
intra_dispatcher_(nullptr),
shm_dispatcher_(nullptr),
rtps_dispatcher_(nullptr) {
CreateParticipant();
intra_dispatcher_ = IntraDispatcher::Instance();
shm_dispatcher_ = ShmDispatcher::Instance();
rtps_dispatcher_ = RtpsDispatcher::Instance();
rtps_dispatcher_->set_participant(participant_);
}
Transport::~Transport() {}
Transport::~Transport() { Shutdown(); }
void Transport::Shutdown() {
if (shutdown_.exchange(true)) {
if (is_shutdown_.exchange(true)) {
return;
}
intra_dispatcher_->Shutdown();
shm_dispatcher_->Shutdown();
rtps_dispatcher_->Shutdown();
if (participant_ != nullptr) {
participant_->Shutdown();
participant_ = nullptr;
}
}
ParticipantPtr Transport::CreateParticipant() {
void Transport::CreateParticipant() {
std::string participant_name =
common::GlobalData::Instance()->HostName() + "+" +
std::to_string(common::GlobalData::Instance()->ProcessId());
return std::make_shared<Participant>(participant_name, 11512);
}
ParticipantPtr Transport::participant() {
if (participant_ != nullptr) {
return participant_;
}
std::lock_guard<std::mutex> lck(participant_mutex_);
if (participant_ == nullptr) {
participant_ = CreateParticipant();
}
return participant_;
participant_ = std::make_shared<Participant>(participant_name, 11512);
}
} // namespace transport
......
......@@ -17,10 +17,15 @@
#ifndef CYBER_TRANSPORT_TRANSPORT_H_
#define CYBER_TRANSPORT_TRANSPORT_H_
#include <atomic>
#include <memory>
#include <string>
#include "cyber/common/macros.h"
#include "cyber/proto/transport_conf.pb.h"
#include "cyber/transport/dispatcher/intra_dispatcher.h"
#include "cyber/transport/dispatcher/rtps_dispatcher.h"
#include "cyber/transport/dispatcher/shm_dispatcher.h"
#include "cyber/transport/qos/qos_profile_conf.h"
#include "cyber/transport/receiver/hybrid_receiver.h"
#include "cyber/transport/receiver/intra_receiver.h"
......@@ -42,35 +47,43 @@ using apollo::cyber::proto::OptionalMode;
class Transport {
public:
Transport();
virtual ~Transport();
static void Shutdown();
void Shutdown();
template <typename M>
static auto CreateTransmitter(const RoleAttributes& attr,
const OptionalMode& mode = OptionalMode::HYBRID)
-> typename std::shared_ptr<Transmitter<M>>;
auto CreateTransmitter(const RoleAttributes& attr,
const OptionalMode& mode = OptionalMode::HYBRID) ->
typename std::shared_ptr<Transmitter<M>>;
template <typename M>
static auto CreateReceiver(
const RoleAttributes& attr,
const typename Receiver<M>::MessageListener& msg_listener,
const OptionalMode& mode = OptionalMode::HYBRID) ->
auto CreateReceiver(const RoleAttributes& attr,
const typename Receiver<M>::MessageListener& msg_listener,
const OptionalMode& mode = OptionalMode::HYBRID) ->
typename std::shared_ptr<Receiver<M>>;
static ParticipantPtr participant();
ParticipantPtr participant() const { return participant_; }
private:
static ParticipantPtr CreateParticipant();
void CreateParticipant();
static ParticipantPtr participant_;
std::atomic<bool> is_shutdown_;
ParticipantPtr participant_;
IntraDispatcherPtr intra_dispatcher_;
ShmDispatcherPtr shm_dispatcher_;
RtpsDispatcherPtr rtps_dispatcher_;
DECLARE_SINGLETON(Transport)
};
template <typename M>
auto Transport::CreateTransmitter(const RoleAttributes& attr,
const OptionalMode& mode) ->
typename std::shared_ptr<Transmitter<M>> {
if (is_shutdown_.load()) {
AINFO << "transport has been shut down.";
return nullptr;
}
std::shared_ptr<Transmitter<M>> transmitter = nullptr;
RoleAttributes modified_attr = attr;
if (!modified_attr.has_qos_profile()) {
......@@ -109,6 +122,10 @@ auto Transport::CreateReceiver(
const RoleAttributes& attr,
const typename Receiver<M>::MessageListener& msg_listener,
const OptionalMode& mode) -> typename std::shared_ptr<Receiver<M>> {
if (is_shutdown_.load()) {
AINFO << "transport has been shut down.";
return nullptr;
}
std::shared_ptr<Receiver<M>> receiver = nullptr;
RoleAttributes modified_attr = attr;
if (!modified_attr.has_qos_profile()) {
......
......@@ -30,9 +30,9 @@ using TransmitterPtr = std::shared_ptr<Transmitter<proto::UnitTest>>;
using ReceiverPtr = std::shared_ptr<Receiver<proto::UnitTest>>;
TEST(TransportTest, constructor) {
Transport transport_a;
Transport transport_b;
EXPECT_EQ(transport_a.participant(), transport_b.participant());
auto transport_a = Transport::Instance();
auto transport_b = Transport::Instance();
EXPECT_EQ(transport_a->participant(), transport_b->participant());
}
TEST(TransportTest, create_transmitter) {
......@@ -45,11 +45,13 @@ TEST(TransportTest, create_transmitter) {
attr.set_id(id.HashValue());
TransmitterPtr intra =
Transport::CreateTransmitter<proto::UnitTest>(attr, OptionalMode::INTRA);
Transport::Instance()->CreateTransmitter<proto::UnitTest>(
attr, OptionalMode::INTRA);
EXPECT_EQ(typeid(*intra), typeid(IntraTransmitter<proto::UnitTest>));
TransmitterPtr shm =
Transport::CreateTransmitter<proto::UnitTest>(attr, OptionalMode::SHM);
Transport::Instance()->CreateTransmitter<proto::UnitTest>(
attr, OptionalMode::SHM);
EXPECT_EQ(typeid(*shm), typeid(ShmTransmitter<proto::UnitTest>));
}
......@@ -62,11 +64,11 @@ TEST(TransportTest, create_receiver) {
auto listener = [](const std::shared_ptr<proto::UnitTest>&,
const MessageInfo&, const RoleAttributes&) {};
ReceiverPtr intra = Transport::CreateReceiver<proto::UnitTest>(
ReceiverPtr intra = Transport::Instance()->CreateReceiver<proto::UnitTest>(
attr, listener, OptionalMode::INTRA);
EXPECT_EQ(typeid(*intra), typeid(IntraReceiver<proto::UnitTest>));
ReceiverPtr shm = Transport::CreateReceiver<proto::UnitTest>(
ReceiverPtr shm = Transport::Instance()->CreateReceiver<proto::UnitTest>(
attr, listener, OptionalMode::SHM);
EXPECT_EQ(typeid(*shm), typeid(ShmReceiver<proto::UnitTest>));
}
......@@ -77,7 +79,8 @@ TEST(TransportTest, create_receiver) {
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
apollo::cyber::transport::Transport::Instance();
auto res = RUN_ALL_TESTS();
apollo::cyber::transport::Transport::Shutdown();
apollo::cyber::transport::Transport::Instance()->Shutdown();
return res;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册