提交 2bf79c1f 编写于 作者: Z zhangchao19 提交者: fengqikai1414

cyber: update perf event

上级 097d5fec
......@@ -22,6 +22,7 @@
#include <string>
#include "cyber/common/global_data.h"
#include "cyber/common/macros.h"
namespace apollo {
namespace cyber {
......@@ -29,7 +30,19 @@ namespace event {
enum class EventType { SCHED_EVENT = 0, TRANS_EVENT = 1, TRY_FETCH_EVENT = 3 };
enum class TransPerf { TRANS_FROM = 1, TRANS_TO = 2, WRITE_NOTIFY = 3 };
enum class TransPerf {
TRANSMIT_BEGIN = 0,
SERIALIZE = 1,
SEND = 2,
MESSAGE_ARRIVE = 3,
OBTAIN = 4, // only for shm
DESERIALIZE = 5,
DISPATCH = 6,
NOTIFY = 7,
FETCH = 8,
CALLBACK = 9,
TRANS_END
};
enum class SchedPerf {
SWAP_IN = 1,
......@@ -47,13 +60,28 @@ class EventBase {
void set_etype(int etype) { etype_ = etype; }
void set_stamp(uint64_t stamp) { stamp_ = stamp; }
virtual void set_cr_id(uint64_t cr_id) {}
virtual void set_cr_state(int cr_state) {}
virtual void set_proc_id(int proc_id) {}
virtual void set_fetch_res(int fetch_res) {}
virtual void set_cr_id(uint64_t cr_id) {
UNUSED(cr_id);
}
virtual void set_cr_state(int cr_state) {
UNUSED(cr_state);
}
virtual void set_proc_id(int proc_id) {
UNUSED(proc_id);
}
virtual void set_fetch_res(int fetch_res) {
UNUSED(fetch_res);
}
virtual void set_msg_seq(uint64_t msg_seq) {}
virtual void set_channel_id(uint64_t channel_id) {}
virtual void set_msg_seq(uint64_t msg_seq) {
UNUSED(msg_seq);
}
virtual void set_channel_id(uint64_t channel_id) {
UNUSED(channel_id);
}
virtual void set_adder(const std::string& adder) {
UNUSED(adder);
}
protected:
int etype_;
......@@ -86,12 +114,12 @@ class SchedEvent : public EventBase {
void set_proc_id(int proc_id) override { proc_id_ = proc_id; }
private:
int cr_state_;
int proc_id_;
uint64_t cr_id_;
int cr_state_ = 1;
int proc_id_ = 0;
uint64_t cr_id_ = 0;
};
// event_id
// event_id = 1 transport
// 1 transport time
// 2 write_data_cache & notify listener
class TransportEvent : public EventBase {
......@@ -104,7 +132,8 @@ class TransportEvent : public EventBase {
ss << eid_ << "\t";
ss << common::GlobalData::GetChannelById(channel_id_) << "\t";
ss << msg_seq_ << "\t";
ss << stamp_;
ss << stamp_ << "\t";
ss << adder_;
return ss.str();
}
......@@ -112,8 +141,37 @@ class TransportEvent : public EventBase {
void set_channel_id(uint64_t channel_id) override {
channel_id_ = channel_id;
}
void set_adder(const std::string& adder) override {
adder_ = adder;
}
static std::string ShowTransPerf(TransPerf type) {
if (type == TransPerf::TRANSMIT_BEGIN) {
return "TRANSMIT_BEGIN";
} else if (type == TransPerf::SERIALIZE) {
return "SERIALIZE";
} else if (type == TransPerf::SEND) {
return "SEND";
} else if (type == TransPerf::MESSAGE_ARRIVE) {
return "MESSAGE_ARRIVE";
} else if (type == TransPerf::OBTAIN) {
return "OBTAIN";
} else if (type == TransPerf::DESERIALIZE) {
return "DESERIALIZE";
} else if (type == TransPerf::DISPATCH) {
return "DISPATCH";
} else if (type == TransPerf::NOTIFY) {
return "NOTIFY";
} else if (type == TransPerf::FETCH) {
return "FETCH";
} else if (type == TransPerf::CALLBACK) {
return "CALLBACK";
}
return "";
}
private:
std::string adder_ = "";
uint64_t msg_seq_ = 0;
uint64_t channel_id_ = UINT64_MAX;
};
......
......@@ -18,10 +18,8 @@
#include <string>
#include "cyber/base/macros.h"
#include "cyber/common/environment.h"
#include "cyber/common/global_data.h"
#include "cyber/common/log.h"
#include "cyber/common/macros.h"
#include "cyber/state.h"
#include "cyber/time/time.h"
......@@ -29,19 +27,18 @@ namespace apollo {
namespace cyber {
namespace event {
using common::GetEnv;
using common::GlobalData;
using proto::PerfConf;
using proto::PerfType;
PerfEventCache::PerfEventCache() {
auto trans_perf = GetEnv("cyber_trans_perf");
if (trans_perf != "" && std::stoi(trans_perf)) {
enable_trans_perf_ = true;
}
auto sched_perf = GetEnv("cyber_sched_perf");
if (sched_perf != "" && std::stoi(sched_perf)) {
enable_sched_perf_ = true;
auto& global_conf = GlobalData::Instance()->Config();
if (global_conf.has_perf_conf()) {
perf_conf_.CopyFrom(global_conf.perf_conf());
enable_ = perf_conf_.enable();
}
if (enable_sched_perf_ || enable_trans_perf_) {
if (enable_) {
if (!event_queue_.Init(kEventQueueSize)) {
AERROR << "Event queue init failed.";
throw std::runtime_error("Event queue init failed.");
......@@ -51,14 +48,15 @@ PerfEventCache::PerfEventCache() {
}
PerfEventCache::~PerfEventCache() {
if (!enable_sched_perf_ && !enable_trans_perf_) {
return;
}
Shutdown();
}
if (shutdown_.exchange(true)) {
void PerfEventCache::Shutdown() {
if (!enable_) {
return;
}
shutdown_ = true;
event_queue_.BreakAllWait();
if (io_thread_.joinable()) {
io_thread_.join();
......@@ -72,7 +70,12 @@ PerfEventCache::~PerfEventCache() {
void PerfEventCache::AddSchedEvent(const SchedPerf event_id,
const uint64_t cr_id, const int proc_id,
const int cr_state) {
if (cyber_likely(!enable_sched_perf_)) {
if (!enable_) {
return;
}
if (perf_conf_.type() != PerfType::SCHED &&
perf_conf_.type() != PerfType::ALL) {
return;
}
......@@ -88,8 +91,15 @@ void PerfEventCache::AddSchedEvent(const SchedPerf event_id,
void PerfEventCache::AddTransportEvent(const TransPerf event_id,
const uint64_t channel_id,
const uint64_t msg_seq) {
if (cyber_likely(!enable_trans_perf_)) {
const uint64_t msg_seq,
const uint64_t stamp,
const std::string& adder) {
if (!enable_) {
return;
}
if (perf_conf_.type() != PerfType::TRANSPORT &&
perf_conf_.type() != PerfType::ALL) {
return;
}
......@@ -97,7 +107,12 @@ void PerfEventCache::AddTransportEvent(const TransPerf event_id,
e->set_eid(static_cast<int>(event_id));
e->set_channel_id(channel_id);
e->set_msg_seq(msg_seq);
e->set_stamp(Time::Now().ToNanosecond());
e->set_adder(adder);
auto t = stamp;
if (stamp == 0) {
t = Time::Now().ToNanosecond();
}
e->set_stamp(t);
event_queue_.Enqueue(e);
}
......@@ -120,7 +135,10 @@ void PerfEventCache::Run() {
void PerfEventCache::Start() {
auto now = Time::Now();
std::string perf_file = "cyber_perf_" + now.ToString() + ".data";
std::replace(perf_file.begin(), perf_file.end(), ' ', '_');
std::replace(perf_file.begin(), perf_file.end(), ':', '-');
of_.open(perf_file, std::ios::trunc);
perf_file_ = perf_file;
of_ << Time::Now().ToNanosecond() << std::endl;
io_thread_ = std::thread(&PerfEventCache::Run, this);
}
......
......@@ -14,17 +14,19 @@
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_EVENT_CACHE_H_
#define CYBER_EVENT_CACHE_H_
#ifndef CYBER_EVENT_PERF_EVENT_CACHE_H_
#define CYBER_EVENT_PERF_EVENT_CACHE_H_
#include <chrono>
#include <fstream>
#include <memory>
#include <string>
#include <thread>
#include "cyber/base/bounded_queue.h"
#include "cyber/common/macros.h"
#include "cyber/event/perf_event.h"
#include "cyber/proto/perf_conf.pb.h"
namespace apollo {
namespace cyber {
......@@ -38,7 +40,12 @@ class PerfEventCache {
void AddSchedEvent(const SchedPerf event_id, const uint64_t cr_id,
const int proc_id, const int cr_state = -1);
void AddTransportEvent(const TransPerf event_id, const uint64_t channel_id,
const uint64_t msg_seq);
const uint64_t msg_seq, const uint64_t stamp = 0,
const std::string& adder = "-");
std::string PerfFile() { return perf_file_; }
void Shutdown();
private:
void Start();
......@@ -47,10 +54,11 @@ class PerfEventCache {
std::thread io_thread_;
std::ofstream of_;
bool enable_trans_perf_ = false;
bool enable_sched_perf_ = false;
std::atomic<bool> shutdown_ = {false};
bool enable_ = false;
bool shutdown_ = false;
proto::PerfConf perf_conf_;
std::string perf_file_ = "";
base::BoundedQueue<EventBasePtr> event_queue_;
const int kFlushSize = 512;
......@@ -58,8 +66,9 @@ class PerfEventCache {
DECLARE_SINGLETON(PerfEventCache)
};
} // namespace event
} // namespace cyber
} // namespace apollo
#endif // CYBER_INIT_H_
#endif // CYBER_EVENT_PERF_EVENT_CACHE_H_
......@@ -212,12 +212,12 @@ auto ReceiverManager<MessageT>::GetReceiver(
(void)msg_info;
(void)reader_attr;
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::TRANS_TO, reader_attr.channel_id(),
TransPerf::DISPATCH, reader_attr.channel_id(),
msg_info.seq_num());
data::DataDispatcher<MessageT>::Instance()->Dispatch(
reader_attr.channel_id(), msg);
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::WRITE_NOTIFY, reader_attr.channel_id(),
TransPerf::NOTIFY, reader_attr.channel_id(),
msg_info.seq_num());
});
}
......
......@@ -19,6 +19,7 @@ proto_library(
":run_mode_conf_proto",
":scheduler_conf_proto",
":transport_conf_proto",
":perf_conf_proto",
],
)
......@@ -50,6 +51,20 @@ proto_library(
],
)
cc_proto_library(
name = "perf_conf_cc_proto",
deps = [
":perf_conf_proto",
],
)
proto_library(
name = "perf_conf_proto",
srcs = [
"perf_conf.proto",
],
)
cc_proto_library(
name = "scheduler_conf_cc_proto",
deps = [
......
......@@ -5,9 +5,11 @@ package apollo.cyber.proto;
import "cyber/proto/scheduler_conf.proto";
import "cyber/proto/transport_conf.proto";
import "cyber/proto/run_mode_conf.proto";
import "cyber/proto/perf_conf.proto";
message CyberConfig {
optional SchedulerConf scheduler_conf = 1;
optional TransportConf transport_conf = 2;
optional RunModeConf run_mode_conf = 3;
optional PerfConf perf_conf = 4;
}
syntax = "proto2";
package apollo.cyber.proto;
enum PerfType {
SCHED = 1;
TRANSPORT = 2;
DATA_CACHE = 3;
ALL = 4;
}
message PerfConf {
optional bool enable = 1 [default = false];
optional PerfType type = 2 [default = ALL];
}
......@@ -26,9 +26,6 @@ export GLOG_alsologtostderr=0
export GLOG_colorlogtostderr=1
export GLOG_minloglevel=0
export cyber_trans_perf=0
export cyber_sched_perf=0
# for DEBUG log
#export GLOG_minloglevel=-1
#export GLOG_v=4
......
......@@ -72,7 +72,7 @@ template <typename M>
bool Transmitter<M>::Transmit(const MessagePtr& msg) {
msg_info_.set_seq_num(NextSeqNum());
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::TRANS_FROM, attr_.channel_id(), msg_info_.seq_num());
TransPerf::TRANSMIT_BEGIN, attr_.channel_id(), msg_info_.seq_num());
return Transmit(msg, msg_info_);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册