Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Pinoxchio
apollo
提交
491d256a
A
apollo
项目概览
Pinoxchio
/
apollo
与 Fork 源项目一致
从无法访问的项目Fork
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
A
apollo
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
491d256a
编写于
9月 12, 2019
作者:
X
Xiangquan Xiao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Revert "record file cleanup" which breaks.
This reverts commit
e2f94a1e
.
上级
c9904404
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
168 addition
and
312 deletion
+168
-312
apollo.sh
apollo.sh
+6
-5
cyber/record/BUILD
cyber/record/BUILD
+0
-16
cyber/record/file/record_file_integration_test.cc
cyber/record/file/record_file_integration_test.cc
+0
-126
cyber/record/file/record_file_test.cc
cyber/record/file/record_file_test.cc
+109
-112
cyber/record/file/record_file_writer.cc
cyber/record/file/record_file_writer.cc
+1
-0
cyber/record/file/record_file_writer.h
cyber/record/file/record_file_writer.h
+4
-8
cyber/record/record_reader_test.cc
cyber/record/record_reader_test.cc
+30
-26
cyber/record/record_viewer_test.cc
cyber/record/record_viewer_test.cc
+18
-19
未找到文件。
apollo.sh
浏览文件 @
491d256a
...
...
@@ -20,8 +20,6 @@
# Utils
#=================================================
DISABLED_CYBER_MODULES
=
"except //cyber/record:record_file_integration_test"
function
source_apollo_base
()
{
DIR
=
"
$(
cd
"
$(
dirname
"
${
BASH_SOURCE
[0]
}
"
)
"
&&
pwd
)
"
cd
"
${
DIR
}
"
...
...
@@ -94,7 +92,7 @@ function check_esd_files() {
}
function
generate_build_targets
()
{
COMMON_TARGETS
=
"//cyber/... union //modules/common/kv_db/... union //modules/dreamview/...
$DISABLED_CYBER_MODULES
"
COMMON_TARGETS
=
"//cyber/... union //modules/common/kv_db/... union //modules/dreamview/..."
case
$BUILD_FILTER
in
cyber
)
BUILD_TARGETS
=
`
bazel query //cyber/...
`
...
...
@@ -118,7 +116,10 @@ function generate_build_targets() {
BUILD_TARGETS
=
`
bazel query //modules/... except //modules/perception/... union //cyber/...
`
;;
*
)
BUILD_TARGETS
=
`
bazel query //modules/... union //cyber/...
$DISABLED_CYBER_MODULES
`
# BUILD_TARGETS=`bazel query //modules/... union //cyber/...`
# FIXME(all): temporarily disable modules doesn't compile in 18.04
BUILD_TARGETS
=
`
bazel query //modules/... union //cyber/... except //modules/tools/visualizer/... except //modules/data/tools/rosbag_to_record/... except //modules/v2x/... except //modules/map/tools/map_datachecker/...
`
esac
if
[
$?
-ne
0
]
;
then
...
...
@@ -538,7 +539,7 @@ function citest_extended() {
source
cyber/setup.bash
BUILD_TARGETS
=
"
`
bazel query //modules/planning/... union //modules/common/... union //cyber/...
$DISABLED_CYBER_MODULES
`
`
bazel query //modules/planning/... union //modules/common/... union //cyber/...
`
`
bazel query //modules/prediction/... union //modules/control/...
`
"
...
...
cyber/record/BUILD
浏览文件 @
491d256a
...
...
@@ -63,22 +63,6 @@ cc_test(
],
)
cc_test
(
name
=
"record_file_integration_test"
,
size
=
"small"
,
srcs
=
[
"file/record_file_integration_test.cc"
],
tags
=
[
"cpu:3"
,
"exclusive"
,
"manual"
,
],
deps
=
[
"//cyber"
,
"//cyber/proto:record_cc_proto"
,
"@gtest//:main"
,
],
)
cc_library
(
name
=
"header_builder"
,
srcs
=
[
"header_builder.cc"
],
...
...
cyber/record/file/record_file_integration_test.cc
已删除
100644 → 0
浏览文件 @
c9904404
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <thread>
#include "cyber/record/file/record_file_base.h"
#include "cyber/record/file/record_file_reader.h"
#include "cyber/record/file/record_file_writer.h"
#include "cyber/record/header_builder.h"
namespace
apollo
{
namespace
cyber
{
namespace
record
{
class
CpuSchedulerLatency
{
public:
CpuSchedulerLatency
()
:
periodic_thread_
([
this
]
{
this
->
Callback
();
})
{}
~
CpuSchedulerLatency
()
{
running_
=
false
;
periodic_thread_
.
join
();
}
std
::
chrono
::
nanoseconds
GetMaxJitter
()
{
return
std
::
chrono
::
nanoseconds
(
max_jitter_
);
}
int64_t
GetNumSamples
()
{
return
samples_
;
}
private:
void
Callback
()
{
static
constexpr
std
::
chrono
::
milliseconds
kSleepDuration
(
10
);
auto
prev_time
=
std
::
chrono
::
steady_clock
::
now
();
std
::
this_thread
::
sleep_for
(
kSleepDuration
);
while
(
running_
)
{
const
auto
current_time
=
std
::
chrono
::
steady_clock
::
now
();
const
auto
time_since_sleep
=
current_time
-
prev_time
;
const
auto
current_jitter
=
std
::
abs
((
time_since_sleep
-
kSleepDuration
).
count
());
prev_time
=
current_time
;
max_jitter_
=
std
::
max
(
current_jitter
,
max_jitter_
);
++
samples_
;
std
::
this_thread
::
sleep_for
(
kSleepDuration
);
}
}
std
::
atomic
<
bool
>
running_
{
true
};
int64_t
max_jitter_
=
0
;
int64_t
samples_
=
0
;
std
::
thread
periodic_thread_
;
};
const
char
kTestFile
[]
=
"integration_test.record"
;
TEST
(
RecordFileTest
,
SmallMessageHighThroughputOKThreadJitter
)
{
CpuSchedulerLatency
cpu_jitter
;
RecordFileWriter
rfw
;
ASSERT_TRUE
(
rfw
.
Open
(
kTestFile
));
Header
hdr1
=
HeaderBuilder
::
GetHeaderWithSegmentParams
(
0
,
0
);
hdr1
.
set_chunk_interval
(
1000
);
hdr1
.
set_chunk_raw_size
(
0
);
ASSERT_TRUE
(
rfw
.
WriteHeader
(
hdr1
));
ASSERT_FALSE
(
rfw
.
GetHeader
().
is_complete
());
// write chunk section
static
const
std
::
string
kChannelName
=
"small_message"
;
static
constexpr
int
kMaxIterations
=
1000000000
;
static
constexpr
int64_t
kMaxSamples
=
1000
;
for
(
int
i
=
0
;
i
<
kMaxIterations
&&
cpu_jitter
.
GetNumSamples
()
<
kMaxSamples
;
++
i
)
{
SingleMessage
msg1
;
msg1
.
set_channel_name
(
kChannelName
);
msg1
.
set_content
(
"0123456789"
);
msg1
.
set_time
(
i
);
ASSERT_TRUE
(
rfw
.
WriteMessage
(
msg1
));
ASSERT_EQ
(
i
+
1
,
rfw
.
GetMessageNumber
(
kChannelName
));
}
EXPECT_GE
(
cpu_jitter
.
GetNumSamples
(),
kMaxSamples
)
<<
"This system may be to fast. Consider increasing kMaxIterations"
;
static
constexpr
int64_t
kMaxJitterMS
=
20
;
const
int64_t
max_cpu_jitter_ms
=
std
::
chrono
::
duration_cast
<
std
::
chrono
::
milliseconds
>
(
cpu_jitter
.
GetMaxJitter
())
.
count
();
EXPECT_LT
(
max_cpu_jitter_ms
,
kMaxJitterMS
);
ASSERT_FALSE
(
remove
(
kTestFile
));
}
}
// namespace record
}
// namespace cyber
}
// namespace apollo
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
GTEST_FLAG
(
catch_exceptions
)
=
1
;
testing
::
InitGoogleTest
(
&
argc
,
argv
);
google
::
InitGoogleLogging
(
argv
[
0
]);
FLAGS_logtostderr
=
true
;
const
int
ret_val
=
RUN_ALL_TESTS
();
google
::
protobuf
::
ShutdownProtobufLibrary
();
google
::
ShutDownCommandLineFlags
();
return
ret_val
;
}
cyber/record/file/record_file_test.cc
浏览文件 @
491d256a
...
...
@@ -28,197 +28,197 @@ namespace apollo {
namespace
cyber
{
namespace
record
{
const
expr
char
kChan
1
[]
=
"/test1"
;
const
expr
char
kChan
2
[]
=
"/test2"
;
const
expr
char
kMsgType
[]
=
"apollo.cyber.proto.Test"
;
const
expr
char
kStr
10B
[]
=
"1234567890"
;
const
expr
char
kTestFile1
[]
=
"record_file_
test_1.record"
;
const
expr
char
kTestFile2
[]
=
"record_file_
test_2.record"
;
const
char
CHAN_
1
[]
=
"/test1"
;
const
char
CHAN_
2
[]
=
"/test2"
;
const
char
MSG_TYPE
[]
=
"apollo.cyber.proto.Test"
;
const
char
STR_
10B
[]
=
"1234567890"
;
const
char
TEST_FILE_1
[]
=
"
test_1.record"
;
const
char
TEST_FILE_2
[]
=
"
test_2.record"
;
TEST
(
ChunkTest
,
TestAll
)
{
Chunk
ck
;
ASSERT_EQ
(
0
,
ck
.
header_
.
begin_time
());
ASSERT_EQ
(
0
,
ck
.
header_
.
end_time
());
ASSERT_EQ
(
0
,
ck
.
header_
.
raw_size
());
ASSERT_EQ
(
0
,
ck
.
header_
.
message_number
());
ASSERT_TRUE
(
ck
.
empty
());
Chunk
*
ck
=
new
Chunk
()
;
ASSERT_EQ
(
0
,
ck
->
header_
.
begin_time
());
ASSERT_EQ
(
0
,
ck
->
header_
.
end_time
());
ASSERT_EQ
(
0
,
ck
->
header_
.
raw_size
());
ASSERT_EQ
(
0
,
ck
->
header_
.
message_number
());
ASSERT_TRUE
(
ck
->
empty
());
SingleMessage
msg1
;
msg1
.
set_channel_name
(
kChan
1
);
msg1
.
set_content
(
kStr
10B
);
msg1
.
set_channel_name
(
CHAN_
1
);
msg1
.
set_content
(
STR_
10B
);
msg1
.
set_time
(
1e9
);
ck
.
add
(
msg1
);
ASSERT_EQ
(
1e9
,
ck
.
header_
.
begin_time
());
ASSERT_EQ
(
1e9
,
ck
.
header_
.
end_time
());
ASSERT_EQ
(
10
,
ck
.
header_
.
raw_size
());
ASSERT_EQ
(
1
,
ck
.
header_
.
message_number
());
ASSERT_FALSE
(
ck
.
empty
());
ck
->
add
(
msg1
);
ASSERT_EQ
(
1e9
,
ck
->
header_
.
begin_time
());
ASSERT_EQ
(
1e9
,
ck
->
header_
.
end_time
());
ASSERT_EQ
(
10
,
ck
->
header_
.
raw_size
());
ASSERT_EQ
(
1
,
ck
->
header_
.
message_number
());
ASSERT_FALSE
(
ck
->
empty
());
SingleMessage
msg2
;
msg2
.
set_channel_name
(
kChan
2
);
msg2
.
set_content
(
kStr
10B
);
msg2
.
set_channel_name
(
CHAN_
2
);
msg2
.
set_content
(
STR_
10B
);
msg2
.
set_time
(
2e9
);
ck
.
add
(
msg2
);
ASSERT_EQ
(
1e9
,
ck
.
header_
.
begin_time
());
ASSERT_EQ
(
2e9
,
ck
.
header_
.
end_time
());
ASSERT_EQ
(
20
,
ck
.
header_
.
raw_size
());
ASSERT_EQ
(
2
,
ck
.
header_
.
message_number
());
ASSERT_FALSE
(
ck
.
empty
());
ck
.
clear
();
ASSERT_EQ
(
0
,
ck
.
header_
.
begin_time
());
ASSERT_EQ
(
0
,
ck
.
header_
.
end_time
());
ASSERT_EQ
(
0
,
ck
.
header_
.
raw_size
());
ASSERT_EQ
(
0
,
ck
.
header_
.
message_number
());
ASSERT_TRUE
(
ck
.
empty
());
ck
->
add
(
msg2
);
ASSERT_EQ
(
1e9
,
ck
->
header_
.
begin_time
());
ASSERT_EQ
(
2e9
,
ck
->
header_
.
end_time
());
ASSERT_EQ
(
20
,
ck
->
header_
.
raw_size
());
ASSERT_EQ
(
2
,
ck
->
header_
.
message_number
());
ASSERT_FALSE
(
ck
->
empty
());
ck
->
clear
();
ASSERT_EQ
(
0
,
ck
->
header_
.
begin_time
());
ASSERT_EQ
(
0
,
ck
->
header_
.
end_time
());
ASSERT_EQ
(
0
,
ck
->
header_
.
raw_size
());
ASSERT_EQ
(
0
,
ck
->
header_
.
message_number
());
ASSERT_TRUE
(
ck
->
empty
());
}
TEST
(
RecordFileTest
,
TestOneMessageFile
)
{
// writer open one message file
RecordFileWriter
rfw
;
ASSERT_TRUE
(
rfw
.
Open
(
kTestFile
1
));
ASSERT_EQ
(
kTestFile1
,
rfw
.
GetPath
());
RecordFileWriter
*
rfw
=
new
RecordFileWriter
()
;
ASSERT_TRUE
(
rfw
->
Open
(
TEST_FILE_
1
));
ASSERT_EQ
(
TEST_FILE_1
,
rfw
->
GetPath
());
// write header section
Header
hdr1
=
HeaderBuilder
::
GetHeaderWithSegmentParams
(
0
,
0
);
hdr1
.
set_chunk_interval
(
0
);
hdr1
.
set_chunk_raw_size
(
0
);
ASSERT_TRUE
(
rfw
.
WriteHeader
(
hdr1
));
ASSERT_FALSE
(
rfw
.
GetHeader
().
is_complete
());
ASSERT_TRUE
(
rfw
->
WriteHeader
(
hdr1
));
ASSERT_FALSE
(
rfw
->
GetHeader
().
is_complete
());
// write channel section
Channel
chan1
;
chan1
.
set_name
(
kChan
1
);
chan1
.
set_message_type
(
kMsgType
);
chan1
.
set_proto_desc
(
kStr
10B
);
ASSERT_TRUE
(
rfw
.
WriteChannel
(
chan1
));
chan1
.
set_name
(
CHAN_
1
);
chan1
.
set_message_type
(
MSG_TYPE
);
chan1
.
set_proto_desc
(
STR_
10B
);
ASSERT_TRUE
(
rfw
->
WriteChannel
(
chan1
));
// write chunk section
SingleMessage
msg1
;
msg1
.
set_channel_name
(
chan1
.
name
());
msg1
.
set_content
(
kStr
10B
);
msg1
.
set_content
(
STR_
10B
);
msg1
.
set_time
(
1e9
);
ASSERT_TRUE
(
rfw
.
WriteMessage
(
msg1
));
ASSERT_EQ
(
1
,
rfw
.
GetMessageNumber
(
chan1
.
name
()));
ASSERT_TRUE
(
rfw
->
WriteMessage
(
msg1
));
ASSERT_EQ
(
1
,
rfw
->
GetMessageNumber
(
chan1
.
name
()));
// writer close one message file
rfw
.
Close
();
ASSERT_TRUE
(
rfw
.
GetHeader
().
is_complete
());
ASSERT_EQ
(
1
,
rfw
.
GetHeader
().
chunk_number
());
ASSERT_EQ
(
1e9
,
rfw
.
GetHeader
().
begin_time
());
ASSERT_EQ
(
1e9
,
rfw
.
GetHeader
().
end_time
());
ASSERT_EQ
(
1
,
rfw
.
GetHeader
().
message_number
());
hdr1
=
rfw
.
GetHeader
();
rfw
->
Close
();
ASSERT_TRUE
(
rfw
->
GetHeader
().
is_complete
());
ASSERT_EQ
(
1
,
rfw
->
GetHeader
().
chunk_number
());
ASSERT_EQ
(
1e9
,
rfw
->
GetHeader
().
begin_time
());
ASSERT_EQ
(
1e9
,
rfw
->
GetHeader
().
end_time
());
ASSERT_EQ
(
1
,
rfw
->
GetHeader
().
message_number
());
hdr1
=
rfw
->
GetHeader
();
delete
rfw
;
// header open one message file
RecordFileReader
rfr
;
ASSERT_TRUE
(
rfr
.
Open
(
kTestFile
1
));
ASSERT_EQ
(
kTestFile1
,
rfr
.
GetPath
());
RecordFileReader
*
rfr
=
new
RecordFileReader
()
;
ASSERT_TRUE
(
rfr
->
Open
(
TEST_FILE_
1
));
ASSERT_EQ
(
TEST_FILE_1
,
rfr
->
GetPath
());
Section
sec
;
// read header section
Header
hdr2
=
rfr
.
GetHeader
();
Header
hdr2
=
rfr
->
GetHeader
();
ASSERT_EQ
(
hdr2
.
chunk_number
(),
hdr1
.
chunk_number
());
ASSERT_EQ
(
hdr2
.
begin_time
(),
hdr1
.
begin_time
());
ASSERT_EQ
(
hdr2
.
end_time
(),
hdr1
.
end_time
());
ASSERT_EQ
(
hdr2
.
message_number
(),
hdr1
.
message_number
());
// read channel section
ASSERT_TRUE
(
rfr
.
ReadSection
(
&
sec
));
ASSERT_TRUE
(
rfr
->
ReadSection
(
&
sec
));
ASSERT_EQ
(
SectionType
::
SECTION_CHANNEL
,
sec
.
type
);
Channel
chan2
;
ASSERT_TRUE
(
rfr
.
ReadSection
<
Channel
>
(
sec
.
size
,
&
chan2
));
ASSERT_TRUE
(
rfr
->
ReadSection
<
Channel
>
(
sec
.
size
,
&
chan2
));
ASSERT_EQ
(
chan2
.
name
(),
chan1
.
name
());
ASSERT_EQ
(
chan2
.
message_type
(),
chan1
.
message_type
());
ASSERT_EQ
(
chan2
.
proto_desc
(),
chan1
.
proto_desc
());
// read chunk header section
ASSERT_TRUE
(
rfr
.
ReadSection
(
&
sec
));
ASSERT_TRUE
(
rfr
->
ReadSection
(
&
sec
));
ASSERT_EQ
(
SectionType
::
SECTION_CHUNK_HEADER
,
sec
.
type
);
ChunkHeader
ckh2
;
ASSERT_TRUE
(
rfr
.
ReadSection
<
ChunkHeader
>
(
sec
.
size
,
&
ckh2
));
ASSERT_TRUE
(
rfr
->
ReadSection
<
ChunkHeader
>
(
sec
.
size
,
&
ckh2
));
ASSERT_EQ
(
ckh2
.
begin_time
(),
1e9
);
ASSERT_EQ
(
ckh2
.
end_time
(),
1e9
);
ASSERT_EQ
(
ckh2
.
raw_size
(),
10
);
ASSERT_EQ
(
ckh2
.
message_number
(),
1
);
// read chunk body section
ASSERT_TRUE
(
rfr
.
ReadSection
(
&
sec
));
ASSERT_TRUE
(
rfr
->
ReadSection
(
&
sec
));
ASSERT_EQ
(
SectionType
::
SECTION_CHUNK_BODY
,
sec
.
type
);
ChunkBody
ckb2
;
ASSERT_TRUE
(
rfr
.
ReadSection
<
ChunkBody
>
(
sec
.
size
,
&
ckb2
));
ASSERT_TRUE
(
rfr
->
ReadSection
<
ChunkBody
>
(
sec
.
size
,
&
ckb2
));
ASSERT_EQ
(
ckb2
.
messages_size
(),
1
);
ASSERT_EQ
(
ckb2
.
messages
(
0
).
channel_name
(),
ckb2
.
messages
(
0
).
channel_name
());
ASSERT_EQ
(
ckb2
.
messages
(
0
).
time
(),
ckb2
.
messages
(
0
).
time
());
ASSERT_EQ
(
ckb2
.
messages
(
0
).
content
(),
ckb2
.
messages
(
0
).
content
());
ASSERT_FALSE
(
remove
(
kTestFile1
));
// close recorder file reader
delete
rfr
;
}
TEST
(
RecordFileTest
,
TestOneChunkFile
)
{
RecordFileWriter
rfw
;
RecordFileWriter
*
rfw
=
new
RecordFileWriter
()
;
ASSERT_TRUE
(
rfw
.
Open
(
kTestFile
1
));
ASSERT_EQ
(
kTestFile1
,
rfw
.
GetPath
());
ASSERT_TRUE
(
rfw
->
Open
(
TEST_FILE_
1
));
ASSERT_EQ
(
TEST_FILE_1
,
rfw
->
GetPath
());
Header
header
=
HeaderBuilder
::
GetHeaderWithChunkParams
(
0
,
0
);
header
.
set_segment_interval
(
0
);
header
.
set_segment_raw_size
(
0
);
ASSERT_TRUE
(
rfw
.
WriteHeader
(
header
));
ASSERT_FALSE
(
rfw
.
GetHeader
().
is_complete
());
ASSERT_TRUE
(
rfw
->
WriteHeader
(
header
));
ASSERT_FALSE
(
rfw
->
GetHeader
().
is_complete
());
Channel
chan1
;
chan1
.
set_name
(
kChan
1
);
chan1
.
set_message_type
(
kMsgType
);
chan1
.
set_proto_desc
(
kStr
10B
);
ASSERT_TRUE
(
rfw
.
WriteChannel
(
chan1
));
chan1
.
set_name
(
CHAN_
1
);
chan1
.
set_message_type
(
MSG_TYPE
);
chan1
.
set_proto_desc
(
STR_
10B
);
ASSERT_TRUE
(
rfw
->
WriteChannel
(
chan1
));
Channel
chan2
;
chan2
.
set_name
(
kChan
2
);
chan2
.
set_message_type
(
kMsgType
);
chan2
.
set_proto_desc
(
kStr
10B
);
ASSERT_TRUE
(
rfw
.
WriteChannel
(
chan2
));
chan2
.
set_name
(
CHAN_
2
);
chan2
.
set_message_type
(
MSG_TYPE
);
chan2
.
set_proto_desc
(
STR_
10B
);
ASSERT_TRUE
(
rfw
->
WriteChannel
(
chan2
));
SingleMessage
msg1
;
msg1
.
set_channel_name
(
chan1
.
name
());
msg1
.
set_content
(
kStr
10B
);
msg1
.
set_content
(
STR_
10B
);
msg1
.
set_time
(
1e9
);
ASSERT_TRUE
(
rfw
.
WriteMessage
(
msg1
));
ASSERT_EQ
(
1
,
rfw
.
GetMessageNumber
(
chan1
.
name
()));
ASSERT_TRUE
(
rfw
->
WriteMessage
(
msg1
));
ASSERT_EQ
(
1
,
rfw
->
GetMessageNumber
(
chan1
.
name
()));
SingleMessage
msg2
;
msg2
.
set_channel_name
(
chan2
.
name
());
msg2
.
set_content
(
kStr
10B
);
msg2
.
set_content
(
STR_
10B
);
msg2
.
set_time
(
2e9
);
ASSERT_TRUE
(
rfw
.
WriteMessage
(
msg2
));
ASSERT_EQ
(
1
,
rfw
.
GetMessageNumber
(
chan2
.
name
()));
ASSERT_TRUE
(
rfw
->
WriteMessage
(
msg2
));
ASSERT_EQ
(
1
,
rfw
->
GetMessageNumber
(
chan2
.
name
()));
SingleMessage
msg3
;
msg3
.
set_channel_name
(
chan1
.
name
());
msg3
.
set_content
(
kStr
10B
);
msg3
.
set_content
(
STR_
10B
);
msg3
.
set_time
(
3e9
);
ASSERT_TRUE
(
rfw
.
WriteMessage
(
msg3
));
ASSERT_EQ
(
2
,
rfw
.
GetMessageNumber
(
chan1
.
name
()));
rfw
.
Close
();
ASSERT_TRUE
(
rfw
.
GetHeader
().
is_complete
());
ASSERT_EQ
(
1
,
rfw
.
GetHeader
().
chunk_number
());
ASSERT_EQ
(
1e9
,
rfw
.
GetHeader
().
begin_time
());
ASSERT_EQ
(
3e9
,
rfw
.
GetHeader
().
end_time
());
ASSERT_EQ
(
3
,
rfw
.
GetHeader
().
message_number
());
ASSERT_FALSE
(
remove
(
kTestFile1
));
ASSERT_TRUE
(
rfw
->
WriteMessage
(
msg3
));
ASSERT_EQ
(
2
,
rfw
->
GetMessageNumber
(
chan1
.
name
()));
rfw
->
Close
();
ASSERT_TRUE
(
rfw
->
GetHeader
().
is_complete
());
ASSERT_EQ
(
1
,
rfw
->
GetHeader
().
chunk_number
());
ASSERT_EQ
(
1e9
,
rfw
->
GetHeader
().
begin_time
());
ASSERT_EQ
(
3e9
,
rfw
->
GetHeader
().
end_time
());
ASSERT_EQ
(
3
,
rfw
->
GetHeader
().
message_number
());
}
TEST
(
RecordFileTest
,
TestIndex
)
{
{
RecordFileWriter
*
rfw
=
new
RecordFileWriter
();
ASSERT_TRUE
(
rfw
->
Open
(
kTestFile
2
));
ASSERT_EQ
(
kTestFile
2
,
rfw
->
GetPath
());
ASSERT_TRUE
(
rfw
->
Open
(
TEST_FILE_
2
));
ASSERT_EQ
(
TEST_FILE_
2
,
rfw
->
GetPath
());
Header
header
=
HeaderBuilder
::
GetHeaderWithChunkParams
(
0
,
0
);
header
.
set_segment_interval
(
0
);
...
...
@@ -227,34 +227,34 @@ TEST(RecordFileTest, TestIndex) {
ASSERT_FALSE
(
rfw
->
GetHeader
().
is_complete
());
Channel
chan1
;
chan1
.
set_name
(
kChan
1
);
chan1
.
set_message_type
(
kMsgType
);
chan1
.
set_proto_desc
(
kStr
10B
);
chan1
.
set_name
(
CHAN_
1
);
chan1
.
set_message_type
(
MSG_TYPE
);
chan1
.
set_proto_desc
(
STR_
10B
);
ASSERT_TRUE
(
rfw
->
WriteChannel
(
chan1
));
Channel
chan2
;
chan2
.
set_name
(
kChan
2
);
chan2
.
set_message_type
(
kMsgType
);
chan2
.
set_proto_desc
(
kStr
10B
);
chan2
.
set_name
(
CHAN_
2
);
chan2
.
set_message_type
(
MSG_TYPE
);
chan2
.
set_proto_desc
(
STR_
10B
);
ASSERT_TRUE
(
rfw
->
WriteChannel
(
chan2
));
SingleMessage
msg1
;
msg1
.
set_channel_name
(
chan1
.
name
());
msg1
.
set_content
(
kStr
10B
);
msg1
.
set_content
(
STR_
10B
);
msg1
.
set_time
(
1e9
);
ASSERT_TRUE
(
rfw
->
WriteMessage
(
msg1
));
ASSERT_EQ
(
1
,
rfw
->
GetMessageNumber
(
chan1
.
name
()));
SingleMessage
msg2
;
msg2
.
set_channel_name
(
chan2
.
name
());
msg2
.
set_content
(
kStr
10B
);
msg2
.
set_content
(
STR_
10B
);
msg2
.
set_time
(
2e9
);
ASSERT_TRUE
(
rfw
->
WriteMessage
(
msg2
));
ASSERT_EQ
(
1
,
rfw
->
GetMessageNumber
(
chan2
.
name
()));
SingleMessage
msg3
;
msg3
.
set_channel_name
(
chan1
.
name
());
msg3
.
set_content
(
kStr
10B
);
msg3
.
set_content
(
STR_
10B
);
msg3
.
set_time
(
3e9
);
ASSERT_TRUE
(
rfw
->
WriteMessage
(
msg3
));
ASSERT_EQ
(
2
,
rfw
->
GetMessageNumber
(
chan1
.
name
()));
...
...
@@ -268,7 +268,7 @@ TEST(RecordFileTest, TestIndex) {
}
{
RecordFileReader
reader
;
reader
.
Open
(
kTestFile
2
);
reader
.
Open
(
TEST_FILE_
2
);
reader
.
ReadIndex
();
const
auto
&
index
=
reader
.
GetIndex
();
...
...
@@ -308,8 +308,5 @@ int main(int argc, char** argv) {
testing
::
InitGoogleTest
(
&
argc
,
argv
);
google
::
InitGoogleLogging
(
argv
[
0
]);
FLAGS_logtostderr
=
true
;
const
int
ret_val
=
RUN_ALL_TESTS
();
google
::
protobuf
::
ShutdownProtobufLibrary
();
google
::
ShutDownCommandLineFlags
();
return
ret_val
;
return
RUN_ALL_TESTS
();
}
cyber/record/file/record_file_writer.cc
浏览文件 @
491d256a
...
...
@@ -91,6 +91,7 @@ void RecordFileWriter::Close() {
if
(
close
(
fd_
)
<
0
)
{
AERROR
<<
"Close file failed, file: "
<<
path_
<<
", fd: "
<<
fd_
<<
", errno: "
<<
errno
;
return
;
}
}
}
...
...
cyber/record/file/record_file_writer.h
浏览文件 @
491d256a
...
...
@@ -123,10 +123,7 @@ bool RecordFileWriter::WriteSection(const T& message) {
AERROR
<<
"Do not support this template typename."
;
return
false
;
}
Section
section
;
/// zero out whole struct even if padded
memset
(
&
section
,
0
,
sizeof
(
section
));
section
=
{
type
,
message
.
ByteSize
()};
Section
section
=
{
type
,
message
.
ByteSize
()};
ssize_t
count
=
write
(
fd_
,
&
section
,
sizeof
(
section
));
if
(
count
<
0
)
{
AERROR
<<
"Write fd failed, fd: "
<<
fd_
<<
", errno: "
<<
errno
;
...
...
@@ -138,10 +135,9 @@ bool RecordFileWriter::WriteSection(const T& message) {
<<
", actual count: "
<<
count
;
return
false
;
}
{
FileOutputStream
raw_output
(
fd_
);
message
.
SerializeToZeroCopyStream
(
&
raw_output
);
}
ZeroCopyOutputStream
*
raw_output
=
new
FileOutputStream
(
fd_
);
message
.
SerializeToZeroCopyStream
(
raw_output
);
delete
raw_output
;
if
(
type
==
SectionType
::
SECTION_HEADER
)
{
static
char
blank
[
HEADER_LENGTH
]
=
{
'0'
};
count
=
write
(
fd_
,
&
blank
,
HEADER_LENGTH
-
message
.
ByteSize
());
...
...
cyber/record/record_reader_test.cc
浏览文件 @
491d256a
...
...
@@ -26,39 +26,44 @@ namespace record {
using
apollo
::
cyber
::
message
::
RawMessage
;
constexpr
char
kChannelName1
[]
=
"/test/channel1"
;
constexpr
char
kMessageType1
[]
=
"apollo.cyber.proto.Test"
;
constexpr
char
kProtoDesc
[]
=
"1234567890"
;
constexpr
char
kStr10B
[]
=
"1234567890"
;
constexpr
char
kTestFile
[]
=
"record_reader_test.record"
;
constexpr
uint32_t
kMessageNum
=
16
;
const
char
CHANNEL_NAME_1
[]
=
"/test/channel1"
;
const
char
CHANNEL_NAME_2
[]
=
"/test/channel2"
;
const
char
MESSAGE_TYPE_1
[]
=
"apollo.cyber.proto.Test"
;
const
char
MESSAGE_TYPE_2
[]
=
"apollo.cyber.proto.Channel"
;
const
char
PROTO_DESC
[]
=
"1234567890"
;
const
char
STR_10B
[]
=
"1234567890"
;
const
char
TEST_FILE
[]
=
"test.record"
;
const
uint64_t
TIME_1
=
1000
*
1e6
;
const
uint64_t
TIME_2
=
1010
*
1e6
;
const
uint64_t
TIME_3
=
1020
*
1e6
;
const
uint32_t
MESSAGE_NUM
=
16
;
TEST
(
RecordTest
,
TestSingleRecordFile
)
{
RecordWriter
writer
;
writer
.
SetSizeOfFileSegmentation
(
0
);
writer
.
SetIntervalOfFileSegmentation
(
0
);
writer
.
Open
(
kTestFile
);
writer
.
WriteChannel
(
kChannelName1
,
kMessageType1
,
kProtoDesc
);
for
(
uint32_t
i
=
0
;
i
<
kMessageNum
;
++
i
)
{
writer
.
Open
(
TEST_FILE
);
writer
.
WriteChannel
(
CHANNEL_NAME_1
,
MESSAGE_TYPE_1
,
PROTO_DESC
);
for
(
uint32_t
i
=
0
;
i
<
MESSAGE_NUM
;
++
i
)
{
auto
msg
=
std
::
make_shared
<
RawMessage
>
(
std
::
to_string
(
i
));
writer
.
WriteMessage
(
kChannelName
1
,
msg
,
i
);
writer
.
WriteMessage
(
CHANNEL_NAME_
1
,
msg
,
i
);
}
ASSERT_EQ
(
kMessageNum
,
writer
.
GetMessageNumber
(
kChannelName
1
));
ASSERT_EQ
(
kMessageType1
,
writer
.
GetMessageType
(
kChannelName
1
));
ASSERT_EQ
(
kProtoDesc
,
writer
.
GetProtoDesc
(
kChannelName
1
));
ASSERT_EQ
(
MESSAGE_NUM
,
writer
.
GetMessageNumber
(
CHANNEL_NAME_
1
));
ASSERT_EQ
(
MESSAGE_TYPE_1
,
writer
.
GetMessageType
(
CHANNEL_NAME_
1
));
ASSERT_EQ
(
PROTO_DESC
,
writer
.
GetProtoDesc
(
CHANNEL_NAME_
1
));
writer
.
Close
();
RecordReader
reader
(
kTestFile
);
RecordReader
reader
(
TEST_FILE
);
RecordMessage
message
;
ASSERT_EQ
(
kMessageNum
,
reader
.
GetMessageNumber
(
kChannelName
1
));
ASSERT_EQ
(
kMessageType1
,
reader
.
GetMessageType
(
kChannelName
1
));
ASSERT_EQ
(
kProtoDesc
,
reader
.
GetProtoDesc
(
kChannelName
1
));
ASSERT_EQ
(
MESSAGE_NUM
,
reader
.
GetMessageNumber
(
CHANNEL_NAME_
1
));
ASSERT_EQ
(
MESSAGE_TYPE_1
,
reader
.
GetMessageType
(
CHANNEL_NAME_
1
));
ASSERT_EQ
(
PROTO_DESC
,
reader
.
GetProtoDesc
(
CHANNEL_NAME_
1
));
// read all message
uint32_t
i
=
0
;
for
(
i
=
0
;
i
<
kMessageNum
;
++
i
)
{
for
(
i
=
0
;
i
<
MESSAGE_NUM
;
++
i
)
{
ASSERT_TRUE
(
reader
.
ReadMessage
(
&
message
));
ASSERT_EQ
(
kChannelName
1
,
message
.
channel_name
);
ASSERT_EQ
(
CHANNEL_NAME_
1
,
message
.
channel_name
);
ASSERT_EQ
(
std
::
to_string
(
i
),
message
.
content
);
ASSERT_EQ
(
i
,
message
.
time
);
}
...
...
@@ -66,9 +71,9 @@ TEST(RecordTest, TestSingleRecordFile) {
// skip first message
reader
.
Reset
();
for
(
i
=
0
;
i
<
kMessageNum
-
1
;
++
i
)
{
for
(
i
=
0
;
i
<
MESSAGE_NUM
-
1
;
++
i
)
{
ASSERT_TRUE
(
reader
.
ReadMessage
(
&
message
,
1
));
ASSERT_EQ
(
kChannelName
1
,
message
.
channel_name
);
ASSERT_EQ
(
CHANNEL_NAME_
1
,
message
.
channel_name
);
ASSERT_EQ
(
std
::
to_string
(
i
+
1
),
message
.
content
);
ASSERT_EQ
(
i
+
1
,
message
.
time
);
}
...
...
@@ -76,14 +81,13 @@ TEST(RecordTest, TestSingleRecordFile) {
// skip last message
reader
.
Reset
();
for
(
i
=
0
;
i
<
kMessageNum
-
1
;
++
i
)
{
ASSERT_TRUE
(
reader
.
ReadMessage
(
&
message
,
0
,
kMessageNum
-
2
));
ASSERT_EQ
(
kChannelName
1
,
message
.
channel_name
);
for
(
i
=
0
;
i
<
MESSAGE_NUM
-
1
;
++
i
)
{
ASSERT_TRUE
(
reader
.
ReadMessage
(
&
message
,
0
,
MESSAGE_NUM
-
2
));
ASSERT_EQ
(
CHANNEL_NAME_
1
,
message
.
channel_name
);
ASSERT_EQ
(
std
::
to_string
(
i
),
message
.
content
);
ASSERT_EQ
(
i
,
message
.
time
);
}
ASSERT_FALSE
(
reader
.
ReadMessage
(
&
message
,
0
,
kMessageNum
-
2
));
ASSERT_FALSE
(
remove
(
kTestFile
));
ASSERT_FALSE
(
reader
.
ReadMessage
(
&
message
,
0
,
MESSAGE_NUM
-
2
));
}
}
// namespace record
...
...
cyber/record/record_viewer_test.cc
浏览文件 @
491d256a
...
...
@@ -33,23 +33,25 @@ namespace record {
using
apollo
::
cyber
::
message
::
RawMessage
;
constexpr
char
kChannelName1
[]
=
"/test/channel1"
;
constexpr
char
kMessageType1
[]
=
"apollo.cyber.proto.Test"
;
constexpr
char
kProtoDesc1
[]
=
"1234567890"
;
constexpr
char
kTestFile
[]
=
"viewer_test.record"
;
const
char
CHANNEL_NAME_1
[]
=
"/test/channel1"
;
const
char
CHANNEL_NAME_2
[]
=
"/test/channel2"
;
const
char
MESSAGE_TYPE_1
[]
=
"apollo.cyber.proto.Test"
;
const
char
MESSAGE_TYPE_2
[]
=
"apollo.cyber.proto.Channel"
;
const
char
PROTO_DESC
[]
=
"1234567890"
;
const
char
TEST_FILE
[]
=
"viewer_test.record"
;
void
ConstructRecord
(
uint64_t
msg_num
,
uint64_t
begin_time
,
uint64_t
time_step
)
{
RecordWriter
writer
;
writer
.
SetSizeOfFileSegmentation
(
0
);
writer
.
SetIntervalOfFileSegmentation
(
0
);
writer
.
Open
(
kTestFile
);
writer
.
WriteChannel
(
kChannelName1
,
kMessageType1
,
kProtoDesc1
);
writer
.
Open
(
TEST_FILE
);
writer
.
WriteChannel
(
CHANNEL_NAME_1
,
MESSAGE_TYPE_1
,
PROTO_DESC
);
for
(
uint64_t
i
=
0
;
i
<
msg_num
;
i
++
)
{
auto
msg
=
std
::
make_shared
<
RawMessage
>
(
std
::
to_string
(
i
));
writer
.
WriteMessage
(
kChannelName
1
,
msg
,
begin_time
+
time_step
*
i
);
writer
.
WriteMessage
(
CHANNEL_NAME_
1
,
msg
,
begin_time
+
time_step
*
i
);
}
ASSERT_EQ
(
msg_num
,
writer
.
GetMessageNumber
(
kChannelName
1
));
ASSERT_EQ
(
msg_num
,
writer
.
GetMessageNumber
(
CHANNEL_NAME_
1
));
writer
.
Close
();
}
...
...
@@ -70,7 +72,7 @@ TEST(RecordTest, iterator_test) {
uint64_t
end_time
=
begin_time
+
step_time
*
(
msg_num
-
1
);
ConstructRecord
(
msg_num
,
begin_time
,
step_time
);
auto
reader
=
std
::
make_shared
<
RecordReader
>
(
kTestFile
);
auto
reader
=
std
::
make_shared
<
RecordReader
>
(
TEST_FILE
);
RecordViewer
viewer
(
reader
);
EXPECT_TRUE
(
viewer
.
IsValid
());
EXPECT_EQ
(
begin_time
,
viewer
.
begin_time
());
...
...
@@ -78,7 +80,7 @@ TEST(RecordTest, iterator_test) {
uint64_t
i
=
0
;
for
(
auto
&
msg
:
viewer
)
{
EXPECT_EQ
(
kChannelName
1
,
msg
.
channel_name
);
EXPECT_EQ
(
CHANNEL_NAME_
1
,
msg
.
channel_name
);
EXPECT_EQ
(
begin_time
+
step_time
*
i
,
msg
.
time
);
EXPECT_EQ
(
std
::
to_string
(
i
),
msg
.
content
);
i
++
;
...
...
@@ -87,7 +89,7 @@ TEST(RecordTest, iterator_test) {
i
=
0
;
std
::
for_each
(
viewer
.
begin
(),
viewer
.
end
(),
[
&
i
](
RecordMessage
&
msg
)
{
EXPECT_EQ
(
kChannelName
1
,
msg
.
channel_name
);
EXPECT_EQ
(
CHANNEL_NAME_
1
,
msg
.
channel_name
);
// EXPECT_EQ(begin_time + step_time * i, msg.time);
EXPECT_EQ
(
std
::
to_string
(
i
),
msg
.
content
);
i
++
;
...
...
@@ -96,13 +98,12 @@ TEST(RecordTest, iterator_test) {
i
=
0
;
for
(
auto
it
=
viewer
.
begin
();
it
!=
viewer
.
end
();
++
it
)
{
EXPECT_EQ
(
kChannelName
1
,
it
->
channel_name
);
EXPECT_EQ
(
CHANNEL_NAME_
1
,
it
->
channel_name
);
EXPECT_EQ
(
begin_time
+
step_time
*
i
,
it
->
time
);
EXPECT_EQ
(
std
::
to_string
(
i
),
it
->
content
);
i
++
;
}
EXPECT_EQ
(
msg_num
,
i
);
ASSERT_FALSE
(
remove
(
kTestFile
));
}
TEST
(
RecordTest
,
filter_test
)
{
...
...
@@ -112,7 +113,7 @@ TEST(RecordTest, filter_test) {
uint64_t
end_time
=
begin_time
+
step_time
*
(
msg_num
-
1
);
ConstructRecord
(
msg_num
,
begin_time
,
step_time
);
auto
reader
=
std
::
make_shared
<
RecordReader
>
(
kTestFile
);
auto
reader
=
std
::
make_shared
<
RecordReader
>
(
TEST_FILE
);
RecordViewer
viewer_0
(
reader
);
EXPECT_EQ
(
CheckCount
(
viewer_0
),
msg_num
);
EXPECT_EQ
(
begin_time
,
viewer_0
.
begin_time
());
...
...
@@ -151,9 +152,8 @@ TEST(RecordTest, filter_test) {
EXPECT_EQ
(
CheckCount
(
viewer_6
),
0
);
// filter with exist channel
RecordViewer
viewer_7
(
reader
,
0
,
end_time
,
{
kChannelName
1
});
RecordViewer
viewer_7
(
reader
,
0
,
end_time
,
{
CHANNEL_NAME_
1
});
EXPECT_EQ
(
CheckCount
(
viewer_7
),
msg_num
);
ASSERT_FALSE
(
remove
(
kTestFile
));
}
TEST
(
RecordTest
,
mult_iterator_test
)
{
...
...
@@ -163,7 +163,7 @@ TEST(RecordTest, mult_iterator_test) {
uint64_t
end_time
=
begin_time
+
step_time
*
(
msg_num
-
1
);
ConstructRecord
(
msg_num
,
begin_time
,
step_time
);
auto
reader
=
std
::
make_shared
<
RecordReader
>
(
kTestFile
);
auto
reader
=
std
::
make_shared
<
RecordReader
>
(
TEST_FILE
);
RecordViewer
viewer
(
reader
);
EXPECT_TRUE
(
viewer
.
IsValid
());
EXPECT_EQ
(
begin_time
,
viewer
.
begin_time
());
...
...
@@ -173,13 +173,12 @@ TEST(RecordTest, mult_iterator_test) {
uint64_t
i
=
0
;
for
(
auto
&
msg
:
viewer
)
{
// #2 iterator
EXPECT_EQ
(
kChannelName
1
,
msg
.
channel_name
);
EXPECT_EQ
(
CHANNEL_NAME_
1
,
msg
.
channel_name
);
EXPECT_EQ
(
begin_time
+
step_time
*
i
,
msg
.
time
);
EXPECT_EQ
(
std
::
to_string
(
i
),
msg
.
content
);
i
++
;
}
EXPECT_EQ
(
msg_num
,
i
);
ASSERT_FALSE
(
remove
(
kTestFile
));
}
}
// namespace record
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录