提交 5407293e 编写于 作者: G gruminions 提交者: fengqikai1414

framework: optimize history

上级 45a5a674
...@@ -19,11 +19,11 @@ using namespace apollo::cybertron::proto; ...@@ -19,11 +19,11 @@ using namespace apollo::cybertron::proto;
struct RunConfig { struct RunConfig {
RunConfig() RunConfig()
: msg_size(1000), msg_num(1000), sleep_us(10000), lower_reach_num(1) {} : msg_size(1000), msg_num(1000), sleep_us(10000), receiver_num(1) {}
int msg_size; int msg_size;
int msg_num; int msg_num;
int sleep_us; int sleep_us;
int lower_reach_num; int receiver_num;
}; };
struct RunStatistic { struct RunStatistic {
...@@ -38,9 +38,9 @@ struct RunStatistic { ...@@ -38,9 +38,9 @@ struct RunStatistic {
uint64_t total_latency; uint64_t total_latency;
}; };
const int MAX_LOWER_REACH_NUM = 10; const int MAX_RECEIVER_NUM = 10;
std::shared_ptr<LowerReach<RawMessage>> lower_reach[MAX_LOWER_REACH_NUM]; std::shared_ptr<Receiver<RawMessage>> receiver[MAX_RECEIVER_NUM];
RunStatistic statistic[MAX_LOWER_REACH_NUM]; RunStatistic statistic[MAX_RECEIVER_NUM];
bool Fill(std::string* dst) { bool Fill(std::string* dst) {
if (dst == nullptr) { if (dst == nullptr) {
...@@ -93,10 +93,10 @@ bool Init(char* argv[], RunConfig* cfg) { ...@@ -93,10 +93,10 @@ bool Init(char* argv[], RunConfig* cfg) {
} }
tmp = atoi(argv[4]); tmp = atoi(argv[4]);
if (tmp < 1 || tmp > MAX_LOWER_REACH_NUM) { if (tmp < 1 || tmp > MAX_RECEIVER_NUM) {
return false; return false;
} else { } else {
cfg->lower_reach_num = tmp; cfg->receiver_num = tmp;
} }
return true; return true;
...@@ -110,8 +110,8 @@ int main(int argc, char* argv[]) { ...@@ -110,8 +110,8 @@ int main(int argc, char* argv[]) {
std::cout << " argv[2] message transmit number" << std::endl; std::cout << " argv[2] message transmit number" << std::endl;
std::cout << " argv[3] message transmit frequency(Hz, 0 means max)" std::cout << " argv[3] message transmit frequency(Hz, 0 means max)"
<< std::endl; << std::endl;
std::cout << " argv[4] lower_reach number(1~" << MAX_LOWER_REACH_NUM std::cout << " argv[4] receiver number(1~" << MAX_RECEIVER_NUM << ")"
<< ")" << std::endl; << std::endl;
return 0; return 0;
} }
...@@ -125,25 +125,25 @@ int main(int argc, char* argv[]) { ...@@ -125,25 +125,25 @@ int main(int argc, char* argv[]) {
std::cout << " message size: " << cfg.msg_size << std::endl; std::cout << " message size: " << cfg.msg_size << std::endl;
std::cout << " transmit num: " << cfg.msg_num << std::endl; std::cout << " transmit num: " << cfg.msg_num << std::endl;
std::cout << " interval (us): " << cfg.sleep_us << std::endl; std::cout << " interval (us): " << cfg.sleep_us << std::endl;
std::cout << "lower_reach num: " << cfg.lower_reach_num << std::endl; std::cout << "receiver num: " << cfg.receiver_num << std::endl;
RoleAttributes attr; RoleAttributes attr;
attr.set_channel_name("channel"); attr.set_channel_name("channel");
// create upper_reach // create transmitter
std::shared_ptr<apollo::cybertron::transport::UpperReach<RawMessage>> std::shared_ptr<apollo::cybertron::transport::Transmitter<RawMessage>>
upper_reach; transmitter;
try { try {
upper_reach = transmitter =
Transport::CreateUpperReach<RawMessage>(attr, OptionalMode::INTRA); Transport::CreateTransmitter<RawMessage>(attr, OptionalMode::INTRA);
} catch (...) { } catch (...) {
return -1; return -1;
} }
// create lower_reach // create receiver
for (int i = 0; i < cfg.lower_reach_num; ++i) { for (int i = 0; i < cfg.receiver_num; ++i) {
try { try {
lower_reach[i] = Transport::CreateLowerReach<RawMessage>( receiver[i] = Transport::CreateReceiver<RawMessage>(
attr, attr,
[i](const std::shared_ptr<RawMessage>& msg, [i](const std::shared_ptr<RawMessage>& msg,
const MessageInfo& msg_info, const RoleAttributes& attr) { const MessageInfo& msg_info, const RoleAttributes& attr) {
...@@ -170,7 +170,7 @@ int main(int argc, char* argv[]) { ...@@ -170,7 +170,7 @@ int main(int argc, char* argv[]) {
// transmit msg // transmit msg
for (int i = 0; i < cfg.msg_num; ++i) { for (int i = 0; i < cfg.msg_num; ++i) {
Fill(&msg->message); Fill(&msg->message);
upper_reach->Transmit(msg); transmitter->Transmit(msg);
usleep(cfg.sleep_us); usleep(cfg.sleep_us);
static int last_percent = 0; static int last_percent = 0;
...@@ -189,8 +189,8 @@ int main(int argc, char* argv[]) { ...@@ -189,8 +189,8 @@ int main(int argc, char* argv[]) {
RunStatistic total; RunStatistic total;
// show result // show result
for (int i = 0; i < cfg.lower_reach_num; ++i) { for (int i = 0; i < cfg.receiver_num; ++i) {
std::cout << "lower_reach[" << i << "]" << std::endl; std::cout << "receiver[" << i << "]" << std::endl;
std::cout << " recv num: " << statistic[i].recv_num << std::endl; std::cout << " recv num: " << statistic[i].recv_num << std::endl;
std::cout << " min latency(ns): " << statistic[i].min_latency << std::endl; std::cout << " min latency(ns): " << statistic[i].min_latency << std::endl;
std::cout << " max latency(ns): " << statistic[i].max_latency << std::endl; std::cout << " max latency(ns): " << statistic[i].max_latency << std::endl;
......
...@@ -18,9 +18,9 @@ ...@@ -18,9 +18,9 @@
#define CYBERTRON_TRANSPORT_MESSAGE_HISTORY_H_ #define CYBERTRON_TRANSPORT_MESSAGE_HISTORY_H_
#include <cstdint> #include <cstdint>
#include <list>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <queue>
#include <vector> #include <vector>
#include "cybertron/common/global_data.h" #include "cybertron/common/global_data.h"
...@@ -63,7 +63,7 @@ class History { ...@@ -63,7 +63,7 @@ class History {
uint32_t depth_; uint32_t depth_;
uint32_t max_depth_; uint32_t max_depth_;
bool is_full_; bool is_full_;
std::queue<CachedMessage> msgs_; std::list<CachedMessage> msgs_;
std::mutex msgs_mutex_; std::mutex msgs_mutex_;
}; };
...@@ -100,10 +100,10 @@ void History<MessageT>::Add(const MessagePtr& msg, ...@@ -100,10 +100,10 @@ void History<MessageT>::Add(const MessagePtr& msg,
} }
std::lock_guard<std::mutex> lock(msgs_mutex_); std::lock_guard<std::mutex> lock(msgs_mutex_);
if (is_full_) { if (is_full_) {
msgs_.pop(); msgs_.pop_front();
} }
msgs_.emplace(msg, msg_info); msgs_.emplace_back(msg, msg_info);
if (!is_full_) { if (!is_full_) {
if (msgs_.size() == depth_) { if (msgs_.size() == depth_) {
...@@ -115,9 +115,7 @@ void History<MessageT>::Add(const MessagePtr& msg, ...@@ -115,9 +115,7 @@ void History<MessageT>::Add(const MessagePtr& msg,
template <typename MessageT> template <typename MessageT>
void History<MessageT>::Clear() { void History<MessageT>::Clear() {
std::lock_guard<std::mutex> lock(msgs_mutex_); std::lock_guard<std::mutex> lock(msgs_mutex_);
while (!msgs_.empty()) { msgs_.clear();
msgs_.pop();
}
} }
template <typename MessageT> template <typename MessageT>
...@@ -125,15 +123,13 @@ void History<MessageT>::GetCachedMessage(std::vector<CachedMessage>* msgs) { ...@@ -125,15 +123,13 @@ void History<MessageT>::GetCachedMessage(std::vector<CachedMessage>* msgs) {
if (msgs == nullptr) { if (msgs == nullptr) {
return; return;
} }
std::queue<CachedMessage> local_msgs;
{ {
std::lock_guard<std::mutex> lock(msgs_mutex_); std::lock_guard<std::mutex> lock(msgs_mutex_);
local_msgs = msgs_; msgs->reserve(msgs_.size());
} for (auto& item : msgs_) {
msgs->emplace_back(item);
while (!local_msgs.empty()) { }
msgs->emplace_back(local_msgs.front());
local_msgs.pop();
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册