Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Pinoxchio
apollo
提交
a5efbf0c
A
apollo
项目概览
Pinoxchio
/
apollo
与 Fork 源项目一致
从无法访问的项目Fork
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
A
apollo
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
a5efbf0c
编写于
12月 10, 2019
作者:
M
michael4338
提交者:
Xiangquan Xiao
12月 11, 2019
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
disable dynamic modules topology sort
上级
579474ce
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
164 addition
and
207 deletion
+164
-207
modules/common/latency_recorder/proto/latency_record.proto
modules/common/latency_recorder/proto/latency_record.proto
+6
-6
modules/drivers/velodyne/compensator/BUILD
modules/drivers/velodyne/compensator/BUILD
+3
-0
modules/drivers/velodyne/compensator/compensator_component.cc
...les/drivers/velodyne/compensator/compensator_component.cc
+13
-7
modules/monitor/hardware/resource_monitor.cc
modules/monitor/hardware/resource_monitor.cc
+7
-8
modules/monitor/monitor.cc
modules/monitor/monitor.cc
+4
-3
modules/monitor/monitor.h
modules/monitor/monitor.h
+1
-1
modules/monitor/software/BUILD
modules/monitor/software/BUILD
+2
-0
modules/monitor/software/channel_monitor.cc
modules/monitor/software/channel_monitor.cc
+36
-12
modules/monitor/software/channel_monitor.h
modules/monitor/software/channel_monitor.h
+7
-2
modules/monitor/software/latency_monitor.cc
modules/monitor/software/latency_monitor.cc
+82
-166
modules/monitor/software/latency_monitor.h
modules/monitor/software/latency_monitor.h
+3
-2
未找到文件。
modules/common/latency_recorder/proto/latency_record.proto
浏览文件 @
a5efbf0c
...
...
@@ -25,15 +25,15 @@ message LatencyStat {
};
message
LatencyTrack
{
message
ModuleLatencyTrack
{
optional
string
module
_name
=
1
;
optional
LatencyStat
module
_stat
=
2
;
message
LatencyTrackMessage
{
optional
string
latency
_name
=
1
;
optional
LatencyStat
latency
_stat
=
2
;
}
repeated
ModuleLatencyTrack
module_latency
=
1
;
repeated
LatencyTrackMessage
latency_track
=
1
;
}
message
LatencyReport
{
optional
apollo.common.Header
header
=
1
;
optional
Latency
Stat
total_duration
=
2
;
optional
LatencyTrack
latency_tracks
=
3
;
optional
Latency
Track
e2es_latency
=
2
;
optional
LatencyTrack
modules_latency
=
3
;
};
modules/drivers/velodyne/compensator/BUILD
浏览文件 @
a5efbf0c
...
...
@@ -16,6 +16,9 @@ cc_library(
copts
=
[
'-DMODULE_NAME=
\\
"velodyne
\\
"'
],
deps
=
[
"//cyber"
,
"//modules/common/adapters:adapter_gflags"
,
"//modules/common/latency_recorder"
,
"//modules/common/time"
,
"//modules/drivers/velodyne/compensator:compensator_lib"
,
],
)
...
...
modules/drivers/velodyne/compensator/compensator_component.cc
浏览文件 @
a5efbf0c
...
...
@@ -16,6 +16,9 @@
#include <memory>
#include "modules/common/adapters/adapter_gflags.h"
#include "modules/common/latency_recorder/latency_recorder.h"
#include "modules/common/time/time.h"
#include "modules/drivers/velodyne/compensator/compensator_component.h"
#include "modules/drivers/velodyne/proto/velodyne.pb.h"
...
...
@@ -48,7 +51,7 @@ bool CompensatorComponent::Init() {
bool
CompensatorComponent
::
Proc
(
const
std
::
shared_ptr
<
PointCloud
>&
point_cloud
)
{
uint64_t
start
=
cyber
::
Time
().
Now
().
ToNanosecond
();
const
auto
start_time
=
common
::
time
::
Clock
::
Now
();
std
::
shared_ptr
<
PointCloud
>
point_cloud_compensated
=
compensator_pool_
->
GetObject
();
if
(
point_cloud_compensated
==
nullptr
)
{
...
...
@@ -62,14 +65,17 @@ bool CompensatorComponent::Proc(
}
point_cloud_compensated
->
Clear
();
if
(
compensator_
->
MotionCompensation
(
point_cloud
,
point_cloud_compensated
))
{
const
auto
now
=
cyber
::
Time
().
Now
().
ToNanosecond
();
auto
diff
=
now
-
start
;
auto
meta_diff
=
now
-
point_cloud_compensated
->
header
().
lidar_timestamp
();
AINFO
<<
"compenstator diff:"
<<
diff
/
1000000
<<
"ms"
<<
"meta diff: "
<<
meta_diff
/
1000000
<<
"ms"
const
auto
end_time
=
common
::
time
::
Clock
::
Now
();
AINFO
<<
"compenstator diff:"
<<
absl
::
ToInt64Nanoseconds
(
end_time
-
start_time
)
<<
";meta:"
<<
point_cloud_compensated
->
header
().
lidar_timestamp
();
point_cloud_compensated
->
mutable_header
()
->
set_sequence_num
(
seq_
);
static
common
::
LatencyRecorder
latency_recorder
(
FLAGS_pointcloud_topic
);
latency_recorder
.
AppendLatencyRecord
(
point_cloud_compensated
->
header
().
lidar_timestamp
(),
start_time
,
end_time
);
point_cloud_compensated
->
mutable_header
()
->
set_sequence_num
(
seq_
);
writer_
->
Write
(
point_cloud_compensated
);
seq_
++
;
}
...
...
modules/monitor/hardware/resource_monitor.cc
浏览文件 @
a5efbf0c
...
...
@@ -150,14 +150,13 @@ float GetSystemMemoryUsage() {
AERROR
<<
"failed to load contents from "
<<
system_mem_stat_file
;
return
0.
f
;
}
const
auto
total_memory
=
GetSystemMemoryValueFromLine
(
stat_lines
[
mem_total
]);
int64_t
used_memory
=
0
;
for
(
int
cur_line
=
0
;
cur_line
<=
slab
;
++
cur_line
)
{
if
(
cur_line
==
mem_total
||
cur_line
==
swap_total
)
{
used_memory
+=
GetSystemMemoryValueFromLine
(
stat_lines
[
cur_line
]);
}
else
if
(
cur_line
==
mem_free
||
cur_line
==
buffers
||
cur_line
==
cached
||
cur_line
==
swap_free
||
cur_line
==
slab
)
{
const
auto
total_memory
=
GetSystemMemoryValueFromLine
(
stat_lines
[
mem_total
])
+
GetSystemMemoryValueFromLine
(
stat_lines
[
swap_total
]);
int64_t
used_memory
=
total_memory
;
for
(
int
cur_line
=
mem_free
;
cur_line
<=
slab
;
++
cur_line
)
{
if
(
cur_line
==
mem_free
||
cur_line
==
buffers
||
cur_line
==
cached
||
cur_line
==
swap_free
||
cur_line
==
slab
)
{
used_memory
-=
GetSystemMemoryValueFromLine
(
stat_lines
[
cur_line
]);
}
}
...
...
modules/monitor/monitor.cc
浏览文件 @
a5efbf0c
...
...
@@ -49,8 +49,11 @@ bool Monitor::Init() {
runners_
.
emplace_back
(
new
LocalizationMonitor
());
// Monitor if processes are running.
runners_
.
emplace_back
(
new
ProcessMonitor
());
// Monitor message processing latencies across modules
const
std
::
shared_ptr
<
LatencyMonitor
>
latency_monitor
(
new
LatencyMonitor
());
runners_
.
emplace_back
(
latency_monitor
);
// Monitor if channel messages are updated in time.
runners_
.
emplace_back
(
new
ChannelMonitor
());
runners_
.
emplace_back
(
new
ChannelMonitor
(
latency_monitor
));
// Monitor if resources are sufficient.
runners_
.
emplace_back
(
new
ResourceMonitor
());
...
...
@@ -61,8 +64,6 @@ bool Monitor::Init() {
if
(
FLAGS_enable_functional_safety
)
{
runners_
.
emplace_back
(
new
FunctionalSafetyMonitor
());
}
// Monitor message processing latencies across modules
runners_
.
emplace_back
(
new
LatencyMonitor
());
return
true
;
}
...
...
modules/monitor/monitor.h
浏览文件 @
a5efbf0c
...
...
@@ -36,7 +36,7 @@ class Monitor : public apollo::cyber::TimerComponent {
bool
Proc
()
override
;
private:
std
::
vector
<
std
::
unique
_ptr
<
RecurrentRunner
>>
runners_
;
std
::
vector
<
std
::
shared
_ptr
<
RecurrentRunner
>>
runners_
;
};
CYBER_REGISTER_COMPONENT
(
Monitor
)
...
...
modules/monitor/software/BUILD
浏览文件 @
a5efbf0c
...
...
@@ -20,7 +20,9 @@ cc_library(
srcs
=
[
"channel_monitor.cc"
],
hdrs
=
[
"channel_monitor.h"
],
deps
=
[
":latency_monitor"
,
":summary_monitor"
,
"//modules/common/latency_recorder/proto:latency_record_proto"
,
"//modules/control/proto:control_proto"
,
"//modules/dreamview/proto:hmi_mode_proto"
,
"//modules/drivers/proto:sensor_proto"
,
...
...
modules/monitor/software/channel_monitor.cc
浏览文件 @
a5efbf0c
...
...
@@ -30,6 +30,7 @@
#include "cyber/common/log.h"
#include "cyber/cyber.h"
#include "modules/common/adapters/adapter_gflags.h"
#include "modules/common/latency_recorder/proto/latency_record.pb.h"
#include "modules/common/util/map_util.h"
#include "modules/control/proto/control_cmd.pb.h"
#include "modules/drivers/proto/conti_radar.pb.h"
...
...
@@ -61,7 +62,7 @@ GetReaderAndLatestMessage(const std::string& channel) {
if
(
channel
==
FLAGS_control_command_topic
)
{
const
auto
reader
=
manager
->
CreateReader
<
control
::
ControlCommand
>
(
channel
);
reader
->
Observe
();
const
auto
message
=
reader
?
reader
->
GetLatestObserved
()
:
nullptr
;
const
auto
message
=
reader
->
GetLatestObserved
()
;
return
std
::
pair
<
std
::
shared_ptr
<
cyber
::
ReaderBase
>
,
std
::
shared_ptr
<
google
::
protobuf
::
Message
>>
(
reader
,
message
);
...
...
@@ -69,7 +70,7 @@ GetReaderAndLatestMessage(const std::string& channel) {
const
auto
reader
=
manager
->
CreateReader
<
localization
::
LocalizationEstimate
>
(
channel
);
reader
->
Observe
();
const
auto
message
=
reader
?
reader
->
GetLatestObserved
()
:
nullptr
;
const
auto
message
=
reader
->
GetLatestObserved
()
;
return
std
::
pair
<
std
::
shared_ptr
<
cyber
::
ReaderBase
>
,
std
::
shared_ptr
<
google
::
protobuf
::
Message
>>
(
reader
,
message
);
...
...
@@ -77,7 +78,7 @@ GetReaderAndLatestMessage(const std::string& channel) {
const
auto
reader
=
manager
->
CreateReader
<
perception
::
PerceptionObstacles
>
(
channel
);
reader
->
Observe
();
const
auto
message
=
reader
?
reader
->
GetLatestObserved
()
:
nullptr
;
const
auto
message
=
reader
->
GetLatestObserved
()
;
return
std
::
pair
<
std
::
shared_ptr
<
cyber
::
ReaderBase
>
,
std
::
shared_ptr
<
google
::
protobuf
::
Message
>>
(
reader
,
message
);
...
...
@@ -85,28 +86,28 @@ GetReaderAndLatestMessage(const std::string& channel) {
const
auto
reader
=
manager
->
CreateReader
<
prediction
::
PredictionObstacles
>
(
channel
);
reader
->
Observe
();
const
auto
message
=
reader
?
reader
->
GetLatestObserved
()
:
nullptr
;
const
auto
message
=
reader
->
GetLatestObserved
()
;
return
std
::
pair
<
std
::
shared_ptr
<
cyber
::
ReaderBase
>
,
std
::
shared_ptr
<
google
::
protobuf
::
Message
>>
(
reader
,
message
);
}
else
if
(
channel
==
FLAGS_planning_trajectory_topic
)
{
const
auto
reader
=
manager
->
CreateReader
<
planning
::
ADCTrajectory
>
(
channel
);
reader
->
Observe
();
const
auto
message
=
reader
?
reader
->
GetLatestObserved
()
:
nullptr
;
const
auto
message
=
reader
->
GetLatestObserved
()
;
return
std
::
pair
<
std
::
shared_ptr
<
cyber
::
ReaderBase
>
,
std
::
shared_ptr
<
google
::
protobuf
::
Message
>>
(
reader
,
message
);
}
else
if
(
channel
==
FLAGS_conti_radar_topic
)
{
const
auto
reader
=
manager
->
CreateReader
<
drivers
::
ContiRadar
>
(
channel
);
reader
->
Observe
();
const
auto
message
=
reader
?
reader
->
GetLatestObserved
()
:
nullptr
;
const
auto
message
=
reader
->
GetLatestObserved
()
;
return
std
::
pair
<
std
::
shared_ptr
<
cyber
::
ReaderBase
>
,
std
::
shared_ptr
<
google
::
protobuf
::
Message
>>
(
reader
,
message
);
}
else
if
(
channel
==
FLAGS_relative_map_topic
)
{
const
auto
reader
=
manager
->
CreateReader
<
relative_map
::
MapMsg
>
(
channel
);
reader
->
Observe
();
const
auto
message
=
reader
?
reader
->
GetLatestObserved
()
:
nullptr
;
const
auto
message
=
reader
->
GetLatestObserved
()
;
return
std
::
pair
<
std
::
shared_ptr
<
cyber
::
ReaderBase
>
,
std
::
shared_ptr
<
google
::
protobuf
::
Message
>>
(
reader
,
message
);
...
...
@@ -115,7 +116,7 @@ GetReaderAndLatestMessage(const std::string& channel) {
channel
==
FLAGS_pointcloud_16_front_up_topic
)
{
const
auto
reader
=
manager
->
CreateReader
<
drivers
::
PointCloud
>
(
channel
);
reader
->
Observe
();
const
auto
message
=
reader
?
reader
->
GetLatestObserved
()
:
nullptr
;
const
auto
message
=
reader
->
GetLatestObserved
()
;
return
std
::
pair
<
std
::
shared_ptr
<
cyber
::
ReaderBase
>
,
std
::
shared_ptr
<
google
::
protobuf
::
Message
>>
(
reader
,
message
);
...
...
@@ -158,9 +159,11 @@ bool ValidateFields(const google::protobuf::Message& message,
}
// namespace
ChannelMonitor
::
ChannelMonitor
()
ChannelMonitor
::
ChannelMonitor
(
const
std
::
shared_ptr
<
LatencyMonitor
>&
latency_monitor
)
:
RecurrentRunner
(
FLAGS_channel_monitor_name
,
FLAGS_channel_monitor_interval
)
{}
FLAGS_channel_monitor_interval
),
latency_monitor
(
latency_monitor
)
{}
void
ChannelMonitor
::
RunOnce
(
const
double
current_time
)
{
auto
manager
=
MonitorManager
::
Instance
();
...
...
@@ -170,15 +173,19 @@ void ChannelMonitor::RunOnce(const double current_time) {
const
std
::
string
&
name
=
iter
.
first
;
const
auto
&
config
=
iter
.
second
;
if
(
config
.
has_channel
())
{
double
freq
;
const
auto
update_freq
=
latency_monitor
->
GetFrequency
(
config
.
channel
().
name
(),
&
freq
);
UpdateStatus
(
config
.
channel
(),
components
->
at
(
name
).
mutable_channel_status
());
components
->
at
(
name
).
mutable_channel_status
(),
update_freq
,
freq
);
}
}
}
void
ChannelMonitor
::
UpdateStatus
(
const
apollo
::
dreamview
::
ChannelMonitorConfig
&
config
,
ComponentStatus
*
status
)
{
ComponentStatus
*
status
,
const
bool
update_freq
,
const
double
freq
)
{
status
->
clear_status
();
const
auto
reader_message_pair
=
GetReaderAndLatestMessage
(
config
.
name
());
...
...
@@ -214,6 +221,23 @@ void ChannelMonitor::UpdateStatus(
}
}
// Check channel frequency
if
(
update_freq
)
{
if
(
freq
>
config
.
max_frequency_allowed
())
{
SummaryMonitor
::
EscalateStatus
(
ComponentStatus
::
WARN
,
absl
::
StrCat
(
config
.
name
(),
" has frequency "
,
freq
,
" > max allowed "
,
config
.
max_frequency_allowed
()),
status
);
}
else
if
(
freq
<
config
.
min_frequency_allowed
())
{
SummaryMonitor
::
EscalateStatus
(
ComponentStatus
::
WARN
,
absl
::
StrCat
(
config
.
name
(),
" has frequency "
,
freq
,
" < min allowed "
,
config
.
max_frequency_allowed
()),
status
);
}
}
SummaryMonitor
::
EscalateStatus
(
ComponentStatus
::
OK
,
""
,
status
);
}
...
...
modules/monitor/software/channel_monitor.h
浏览文件 @
a5efbf0c
...
...
@@ -15,22 +15,27 @@
*****************************************************************************/
#pragma once
#include <string>
#include <unordered_map>
#include "modules/dreamview/proto/hmi_mode.pb.h"
#include "modules/monitor/common/recurrent_runner.h"
#include "modules/monitor/proto/system_status.pb.h"
#include "modules/monitor/software/latency_monitor.h"
namespace
apollo
{
namespace
monitor
{
class
ChannelMonitor
:
public
RecurrentRunner
{
public:
ChannelMonitor
();
ChannelMonitor
(
const
std
::
shared_ptr
<
LatencyMonitor
>&
latency_monitor
);
void
RunOnce
(
const
double
current_time
)
override
;
private:
static
void
UpdateStatus
(
const
apollo
::
dreamview
::
ChannelMonitorConfig
&
config
,
ComponentStatus
*
status
);
ComponentStatus
*
status
,
const
bool
update_freq
,
const
double
freq
);
std
::
shared_ptr
<
LatencyMonitor
>
latency_monitor
;
};
}
// namespace monitor
...
...
modules/monitor/software/latency_monitor.cc
浏览文件 @
a5efbf0c
...
...
@@ -18,7 +18,6 @@
#include <algorithm>
#include <memory>
#include <queue>
#include <unordered_set>
#include <utility>
#include <vector>
...
...
@@ -26,6 +25,7 @@
#include "absl/strings/str_cat.h"
#include "cyber/common/log.h"
#include "modules/common/adapters/adapter_gflags.h"
#include "modules/common/util/string_util.h"
#include "modules/monitor/common/monitor_manager.h"
#include "modules/monitor/software/summary_monitor.h"
...
...
@@ -51,92 +51,6 @@ using apollo::common::LatencyReport;
using
apollo
::
common
::
LatencyStat
;
using
apollo
::
common
::
LatencyTrack
;
void
FillInStat
(
const
std
::
string
&
module_name
,
const
uint64_t
duration
,
std
::
vector
<
std
::
pair
<
std
::
string
,
uint64_t
>>*
stats
,
uint64_t
*
total_duration
)
{
*
total_duration
+=
duration
;
stats
->
emplace_back
(
module_name
,
duration
);
}
void
FillInTopo
(
const
std
::
string
&
prev_node_name
,
const
std
::
string
&
curr_node_name
,
std
::
unordered_map
<
std
::
string
,
std
::
pair
<
int
,
std
::
unordered_set
<
std
::
string
>>>*
topo
)
{
bool
not_existing
=
false
;
auto
*
edges
=
&
(
*
topo
)[
prev_node_name
].
second
;
std
::
tie
(
std
::
ignore
,
not_existing
)
=
edges
->
emplace
(
curr_node_name
);
if
(
not_existing
)
{
(
*
topo
)[
curr_node_name
].
first
+=
1
;
}
}
std
::
vector
<
std
::
string
>
topology_sort
(
std
::
unordered_map
<
std
::
string
,
std
::
pair
<
int
,
std
::
unordered_set
<
std
::
string
>>>*
topo
)
{
std
::
vector
<
std
::
string
>
sorted_modules
;
std
::
queue
<
std
::
string
>
zero_indegree_modules
;
for
(
const
auto
&
vortex
:
*
topo
)
{
if
(
vortex
.
second
.
first
==
0
)
{
zero_indegree_modules
.
push
(
vortex
.
first
);
}
}
while
(
!
zero_indegree_modules
.
empty
())
{
const
auto
cur_module
=
zero_indegree_modules
.
front
();
zero_indegree_modules
.
pop
();
sorted_modules
.
push_back
(
cur_module
);
for
(
const
auto
&
edge
:
(
*
topo
)[
cur_module
].
second
)
{
(
*
topo
)[
edge
].
first
-=
1
;
if
((
*
topo
)[
edge
].
first
==
0
)
{
zero_indegree_modules
.
push
(
edge
);
}
}
}
if
(
sorted_modules
.
size
()
!=
topo
->
size
())
{
AERROR
<<
"The topology sort failed, there must be some disorder for "
"messages traveling among modules"
;
}
return
sorted_modules
;
}
std
::
vector
<
std
::
pair
<
std
::
string
,
uint64_t
>>
GetFlowTrackStats
(
const
std
::
set
<
std
::
tuple
<
uint64_t
,
uint64_t
,
std
::
string
>>&
module_durations
,
uint64_t
*
total_duration
,
std
::
unordered_map
<
std
::
string
,
std
::
pair
<
int
,
std
::
unordered_set
<
std
::
string
>>>*
topo
)
{
const
std
::
string
module_connector
=
"->"
;
std
::
vector
<
std
::
pair
<
std
::
string
,
uint64_t
>>
stats
;
// Generate a list of <module, duration> pair, for example:
// <percetpion: 100>, <perception->prediction: 10>, <prediction: 50>, ...
auto
iter
=
module_durations
.
begin
();
std
::
string
module_name
,
prev_name
;
uint64_t
begin_time
=
0
,
end_time
,
prev_end_time
=
0
;
while
(
iter
!=
module_durations
.
end
())
{
std
::
tie
(
begin_time
,
end_time
,
module_name
)
=
*
iter
;
if
(
!
prev_name
.
empty
()
&&
prev_name
!=
module_name
)
{
const
std
::
string
mid_name
=
absl
::
StrCat
(
prev_name
,
module_connector
,
module_name
);
FillInStat
(
mid_name
,
begin_time
-
prev_end_time
,
&
stats
,
total_duration
);
FillInTopo
(
prev_name
,
mid_name
,
topo
);
FillInTopo
(
mid_name
,
module_name
,
topo
);
}
FillInStat
(
module_name
,
end_time
-
begin_time
,
&
stats
,
total_duration
);
if
(
topo
->
find
(
module_name
)
==
topo
->
end
())
{
topo
->
emplace
(
module_name
,
std
::
pair
<
int
,
std
::
unordered_set
<
std
::
string
>>
(
0
,
std
::
unordered_set
<
std
::
string
>
()));
}
prev_name
=
module_name
;
prev_end_time
=
end_time
;
++
iter
;
}
return
stats
;
}
LatencyStat
GenerateStat
(
const
std
::
vector
<
uint64_t
>&
numbers
)
{
LatencyStat
stat
;
uint64_t
min_number
=
(
1UL
<<
63
),
max_number
=
0
,
sum
=
0
;
...
...
@@ -148,7 +62,8 @@ LatencyStat GenerateStat(const std::vector<uint64_t>& numbers) {
const
uint32_t
sample_size
=
static_cast
<
uint32_t
>
(
numbers
.
size
());
stat
.
set_min_duration
(
min_number
);
stat
.
set_max_duration
(
max_number
);
stat
.
set_aver_duration
(
static_cast
<
uint64_t
>
(
sum
/
sample_size
));
stat
.
set_aver_duration
(
sample_size
==
0
?
0
:
static_cast
<
uint64_t
>
(
sum
/
sample_size
));
stat
.
set_sample_size
(
sample_size
);
return
stat
;
}
...
...
@@ -160,6 +75,14 @@ void SetStat(const LatencyStat& src, LatencyStat* dst) {
dst
->
set_sample_size
(
src
.
sample_size
());
}
void
SetLatency
(
const
std
::
string
&
latency_name
,
const
std
::
vector
<
uint64_t
>&
latency_values
,
LatencyTrack
*
track
)
{
auto
*
latency_track
=
track
->
add_latency_track
();
latency_track
->
set_latency_name
(
latency_name
);
SetStat
(
GenerateStat
(
latency_values
),
latency_track
->
mutable_latency_stat
());
}
}
// namespace
LatencyMonitor
::
LatencyMonitor
()
...
...
@@ -178,14 +101,13 @@ void LatencyMonitor::RunOnce(const double current_time) {
for
(
auto
it
=
reader
->
Begin
();
it
!=
reader
->
End
();
++
it
)
{
const
std
::
string
current_key
=
absl
::
StrCat
((
*
it
)
->
module_name
(),
(
*
it
)
->
header
().
sequence_num
());
if
(
it
==
reader
->
Begin
())
{
first_key_of_current_round
=
current_key
;
}
if
(
current_key
==
last_processed_key
)
{
break
;
}
Update
Latency
Stat
(
*
it
);
UpdateStat
(
*
it
);
}
last_processed_key
=
first_key_of_current_round
;
...
...
@@ -197,12 +119,22 @@ void LatencyMonitor::RunOnce(const double current_time) {
}
}
void
LatencyMonitor
::
Update
Latency
Stat
(
void
LatencyMonitor
::
UpdateStat
(
const
std
::
shared_ptr
<
LatencyRecordMap
>&
records
)
{
const
auto
module_name
=
records
->
module_name
();
for
(
const
auto
&
record
:
records
->
latency_records
())
{
track_map_
[
record
.
message_id
()].
emplace
(
record
.
begin_time
(),
record
.
end_time
(),
records
->
module_name
());
record
.
end_time
(),
module_name
);
}
if
(
!
records
->
latency_records
().
empty
())
{
const
auto
begin_time
=
records
->
latency_records
().
begin
()
->
begin_time
();
const
auto
end_time
=
records
->
latency_records
().
rbegin
()
->
end_time
();
if
(
end_time
>
begin_time
)
{
freq_map_
[
module_name
]
=
records
->
latency_records
().
size
()
/
absl
::
ToDoubleSeconds
(
absl
::
Nanoseconds
(
end_time
-
begin_time
));
}
}
}
...
...
@@ -211,96 +143,80 @@ void LatencyMonitor::PublishLatencyReport() {
FLAGS_latency_reporting_topic
);
apollo
::
common
::
util
::
FillHeader
(
"LatencyReport"
,
&
latency_report_
);
AggregateLatency
();
ValidateMaxLatency
();
writer
->
Write
(
latency_report_
);
latency_report_
.
clear_header
();
track_map_
.
clear
();
latency_report_
.
clear_latency_tracks
();
latency_report_
.
clear_modules_latency
();
latency_report_
.
clear_e2es_latency
();
}
void
LatencyMonitor
::
AggregateLatency
()
{
std
::
unordered_map
<
std
::
string
,
std
::
vector
<
uint64_t
>>
tracks
;
std
::
vector
<
uint64_t
>
totals
;
std
::
unordered_map
<
std
::
string
,
std
::
pair
<
int
,
std
::
unordered_set
<
std
::
string
>>>
topo
;
// Aggregate durations by module names
for
(
auto
&
message
:
track_map_
)
{
uint64_t
total_duration
=
0
;
const
auto
stats
=
GetFlowTrackStats
(
message
.
second
,
&
total_duration
,
&
topo
);
std
::
string
module_name
;
uint64_t
duration
=
0
;
totals
.
push_back
(
total_duration
);
for
(
const
auto
&
module_stat
:
stats
)
{
std
::
tie
(
module_name
,
duration
)
=
module_stat
;
tracks
[
module_name
].
push_back
(
duration
);
static
const
std
::
string
kE2EStartPoint
=
FLAGS_pointcloud_topic
;
std
::
unordered_map
<
std
::
string
,
std
::
vector
<
uint64_t
>>
modules_track
;
std
::
unordered_map
<
std
::
string
,
std
::
vector
<
uint64_t
>>
e2es_track
;
std
::
unordered_set
<
std
::
string
>
all_modules
;
// Aggregate modules latencies
std
::
string
module_name
;
uint64_t
begin_time
=
0
,
end_time
=
0
;
for
(
const
auto
&
message
:
track_map_
)
{
auto
iter
=
message
.
second
.
begin
();
while
(
iter
!=
message
.
second
.
end
())
{
std
::
tie
(
begin_time
,
end_time
,
module_name
)
=
*
iter
;
modules_track
[
module_name
].
push_back
(
end_time
-
begin_time
);
all_modules
.
emplace
(
module_name
);
++
iter
;
}
}
// Aggregate E2E latencies
for
(
const
auto
&
message
:
track_map_
)
{
uint64_t
e2e_begin_time
=
0
;
std
::
unordered_map
<
std
::
string
,
uint64_t
>
e2e_latencies
;
auto
iter
=
message
.
second
.
begin
();
while
(
iter
!=
message
.
second
.
end
())
{
std
::
tie
(
begin_time
,
std
::
ignore
,
module_name
)
=
*
iter
;
if
(
e2e_begin_time
==
0
&&
module_name
==
kE2EStartPoint
)
{
e2e_begin_time
=
begin_time
;
}
if
(
module_name
!=
kE2EStartPoint
&&
e2e_latencies
.
find
(
module_name
)
==
e2e_latencies
.
end
())
{
e2e_latencies
[
module_name
]
=
begin_time
-
e2e_begin_time
;
}
++
iter
;
}
}
// The results could be in the following fromat:
// total: min(500), max(600), average(550), sample_size(1500)
// e2e latency:
// pointcloud -> perception: min(500), max(600), average(550),
// sample_size(1500) pointcloud -> planning: min(800), max(1000),
// average(900), sample_size(1500) pointcloud -> control: min(1200),
// max(1300), average(1250), sample_size(1500)
// ...
// modules latency:
// perception: min(5), max(50), average(30), sample_size(1000)
// p
erception->prediction: min(0), max(5), average(3), sample_size(10
00)
//
prediction: min(50), max(500), average(8
0), sample_size(800)
// p
rediction: min(500), max(5000), average(2000), sample_size(8
00)
//
control: min(500), max(800), average(60
0), sample_size(800)
// ...
SetStat
(
GenerateStat
(
totals
),
latency_report_
.
mutable_total_duration
());
// Sort the modules following the messages flowing direction,
// for better display
const
auto
topo_sorted_modules
=
topology_sort
(
&
topo
);
auto
*
latency_tracks
=
latency_report_
.
mutable_latency_tracks
();
for
(
const
auto
&
module_name
:
topo_sorted_modules
)
{
auto
*
module_latency
=
latency_tracks
->
add_module_latency
();
module_latency
->
set_module_name
(
module_name
);
SetStat
(
GenerateStat
(
tracks
[
module_name
]),
module_latency
->
mutable_module_stat
());
auto
*
modules_latency
=
latency_report_
.
mutable_modules_latency
();
for
(
const
auto
&
module
:
modules_track
)
{
SetLatency
(
module
.
first
,
module
.
second
,
modules_latency
);
}
auto
*
e2es_latency
=
latency_report_
.
mutable_e2es_latency
();
for
(
const
auto
&
e2e
:
e2es_track
)
{
SetLatency
(
absl
::
StrCat
(
kE2EStartPoint
,
" -> "
,
e2e
.
first
),
e2e
.
second
,
e2es_latency
);
}
}
void
LatencyMonitor
::
ValidateMaxLatency
()
{
auto
manager
=
MonitorManager
::
Instance
();
const
auto
&
mode
=
manager
->
GetHMIMode
();
auto
*
components
=
manager
->
GetStatus
()
->
mutable_components
();
for
(
const
auto
&
iter
:
mode
.
monitored_components
())
{
const
std
::
string
&
name
=
iter
.
first
;
if
(
iter
.
second
.
has_channel
())
{
const
auto
config
=
iter
.
second
.
channel
();
if
(
!
config
.
has_max_latency_allowed
())
{
continue
;
}
auto
*
status
=
components
->
at
(
name
).
mutable_channel_status
();
status
->
clear_status
();
for
(
const
auto
&
latency
:
latency_report_
.
latency_tracks
().
module_latency
())
{
if
(
latency
.
module_name
()
==
config
.
name
())
{
if
(
latency
.
module_stat
().
aver_duration
()
>
config
.
max_latency_allowed
())
{
SummaryMonitor
::
EscalateStatus
(
ComponentStatus
::
WARN
,
absl
::
StrCat
(
config
.
name
(),
" has average latency "
,
latency
.
module_stat
().
aver_duration
(),
" > maximum "
,
config
.
max_latency_allowed
()),
status
);
}
else
if
(
latency
.
module_stat
().
aver_duration
()
<
config
.
min_latency_allowed
())
{
SummaryMonitor
::
EscalateStatus
(
ComponentStatus
::
WARN
,
absl
::
StrCat
(
config
.
name
(),
" has average latency "
,
latency
.
module_stat
().
aver_duration
(),
" < minimum "
,
config
.
min_latency_allowed
()),
status
);
}
}
}
SummaryMonitor
::
EscalateStatus
(
ComponentStatus
::
OK
,
""
,
status
);
}
bool
LatencyMonitor
::
GetFrequency
(
const
std
::
string
&
channel_name
,
double
*
freq
)
{
if
(
freq_map_
.
find
(
channel_name
)
==
freq_map_
.
end
())
{
return
false
;
}
*
freq
=
freq_map_
[
channel_name
];
return
true
;
}
}
// namespace monitor
...
...
modules/monitor/software/latency_monitor.h
浏览文件 @
a5efbf0c
...
...
@@ -32,18 +32,19 @@ class LatencyMonitor : public RecurrentRunner {
public:
LatencyMonitor
();
void
RunOnce
(
const
double
current_time
)
override
;
bool
GetFrequency
(
const
std
::
string
&
channel_name
,
double
*
freq
);
private:
void
Update
Latency
Stat
(
void
UpdateStat
(
const
std
::
shared_ptr
<
apollo
::
common
::
LatencyRecordMap
>&
records
);
void
PublishLatencyReport
();
void
AggregateLatency
();
void
ValidateMaxLatency
();
apollo
::
common
::
LatencyReport
latency_report_
;
std
::
unordered_map
<
uint64_t
,
std
::
set
<
std
::
tuple
<
uint64_t
,
uint64_t
,
std
::
string
>>>
track_map_
;
std
::
unordered_map
<
std
::
string
,
double
>
freq_map_
;
double
flush_time_
=
0.0
;
};
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录