diff --git a/framework/benchmark/transport/intra.cpp b/framework/benchmark/transport/intra.cpp index 99ff684d68f2853057b6e189443a1fa1599abcf2..c8f81c33eee73c67e56b631c548d2aabb619f472 100644 --- a/framework/benchmark/transport/intra.cpp +++ b/framework/benchmark/transport/intra.cpp @@ -19,11 +19,11 @@ using namespace apollo::cybertron::proto; struct RunConfig { RunConfig() - : msg_size(1000), msg_num(1000), sleep_us(10000), lower_reach_num(1) {} + : msg_size(1000), msg_num(1000), sleep_us(10000), receiver_num(1) {} int msg_size; int msg_num; int sleep_us; - int lower_reach_num; + int receiver_num; }; struct RunStatistic { @@ -38,9 +38,9 @@ struct RunStatistic { uint64_t total_latency; }; -const int MAX_LOWER_REACH_NUM = 10; -std::shared_ptr> lower_reach[MAX_LOWER_REACH_NUM]; -RunStatistic statistic[MAX_LOWER_REACH_NUM]; +const int MAX_RECEIVER_NUM = 10; +std::shared_ptr> receiver[MAX_RECEIVER_NUM]; +RunStatistic statistic[MAX_RECEIVER_NUM]; bool Fill(std::string* dst) { if (dst == nullptr) { @@ -93,10 +93,10 @@ bool Init(char* argv[], RunConfig* cfg) { } tmp = atoi(argv[4]); - if (tmp < 1 || tmp > MAX_LOWER_REACH_NUM) { + if (tmp < 1 || tmp > MAX_RECEIVER_NUM) { return false; } else { - cfg->lower_reach_num = tmp; + cfg->receiver_num = tmp; } return true; @@ -110,8 +110,8 @@ int main(int argc, char* argv[]) { std::cout << " argv[2] message transmit number" << std::endl; std::cout << " argv[3] message transmit frequency(Hz, 0 means max)" << std::endl; - std::cout << " argv[4] lower_reach number(1~" << MAX_LOWER_REACH_NUM - << ")" << std::endl; + std::cout << " argv[4] receiver number(1~" << MAX_RECEIVER_NUM << ")" + << std::endl; return 0; } @@ -125,25 +125,25 @@ int main(int argc, char* argv[]) { std::cout << " message size: " << cfg.msg_size << std::endl; std::cout << " transmit num: " << cfg.msg_num << std::endl; std::cout << " interval (us): " << cfg.sleep_us << std::endl; - std::cout << "lower_reach num: " << cfg.lower_reach_num << std::endl; + std::cout << "receiver num: " << cfg.receiver_num << std::endl; RoleAttributes attr; attr.set_channel_name("channel"); - // create upper_reach - std::shared_ptr> - upper_reach; + // create transmitter + std::shared_ptr> + transmitter; try { - upper_reach = - Transport::CreateUpperReach(attr, OptionalMode::INTRA); + transmitter = + Transport::CreateTransmitter(attr, OptionalMode::INTRA); } catch (...) { return -1; } - // create lower_reach - for (int i = 0; i < cfg.lower_reach_num; ++i) { + // create receiver + for (int i = 0; i < cfg.receiver_num; ++i) { try { - lower_reach[i] = Transport::CreateLowerReach( + receiver[i] = Transport::CreateReceiver( attr, [i](const std::shared_ptr& msg, const MessageInfo& msg_info, const RoleAttributes& attr) { @@ -170,7 +170,7 @@ int main(int argc, char* argv[]) { // transmit msg for (int i = 0; i < cfg.msg_num; ++i) { Fill(&msg->message); - upper_reach->Transmit(msg); + transmitter->Transmit(msg); usleep(cfg.sleep_us); static int last_percent = 0; @@ -189,8 +189,8 @@ int main(int argc, char* argv[]) { RunStatistic total; // show result - for (int i = 0; i < cfg.lower_reach_num; ++i) { - std::cout << "lower_reach[" << i << "]" << std::endl; + for (int i = 0; i < cfg.receiver_num; ++i) { + std::cout << "receiver[" << i << "]" << std::endl; std::cout << " recv num: " << statistic[i].recv_num << std::endl; std::cout << " min latency(ns): " << statistic[i].min_latency << std::endl; std::cout << " max latency(ns): " << statistic[i].max_latency << std::endl; diff --git a/framework/cybertron/transport/message/history.h b/framework/cybertron/transport/message/history.h index 9fea9d1eec36c44eb5970f8ea9c8e48ddeba94bb..a1d99638afe916aa9d20e31f85901424fdbf6628 100644 --- a/framework/cybertron/transport/message/history.h +++ b/framework/cybertron/transport/message/history.h @@ -18,9 +18,9 @@ #define CYBERTRON_TRANSPORT_MESSAGE_HISTORY_H_ #include +#include #include #include -#include #include #include "cybertron/common/global_data.h" @@ -63,7 +63,7 @@ class History { uint32_t depth_; uint32_t max_depth_; bool is_full_; - std::queue msgs_; + std::list msgs_; std::mutex msgs_mutex_; }; @@ -100,10 +100,10 @@ void History::Add(const MessagePtr& msg, } std::lock_guard lock(msgs_mutex_); if (is_full_) { - msgs_.pop(); + msgs_.pop_front(); } - msgs_.emplace(msg, msg_info); + msgs_.emplace_back(msg, msg_info); if (!is_full_) { if (msgs_.size() == depth_) { @@ -115,9 +115,7 @@ void History::Add(const MessagePtr& msg, template void History::Clear() { std::lock_guard lock(msgs_mutex_); - while (!msgs_.empty()) { - msgs_.pop(); - } + msgs_.clear(); } template @@ -125,15 +123,13 @@ void History::GetCachedMessage(std::vector* msgs) { if (msgs == nullptr) { return; } - std::queue local_msgs; + { std::lock_guard lock(msgs_mutex_); - local_msgs = msgs_; - } - - while (!local_msgs.empty()) { - msgs->emplace_back(local_msgs.front()); - local_msgs.pop(); + msgs->reserve(msgs_.size()); + for (auto& item : msgs_) { + msgs->emplace_back(item); + } } }