提交 16e962c7 编写于 作者: F Fla3inH0tCheet0s 提交者: Xiangquan Xiao

record file cleanup

上级 67bf026a
......@@ -20,6 +20,8 @@
# Utils
#=================================================
DISABLED_CYBER_MODULES="except //cyber/record:record_file_integration_test"
function source_apollo_base() {
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "${DIR}"
......@@ -92,7 +94,7 @@ function check_esd_files() {
}
function generate_build_targets() {
COMMON_TARGETS="//cyber/... union //modules/common/kv_db/... union //modules/dreamview/..."
COMMON_TARGETS="//cyber/... union //modules/common/kv_db/... union //modules/dreamview/... $DISABLED_CYBER_MODULES"
case $BUILD_FILTER in
cyber)
BUILD_TARGETS=`bazel query //cyber/...`
......@@ -118,8 +120,7 @@ function generate_build_targets() {
*)
# BUILD_TARGETS=`bazel query //modules/... union //cyber/...`
# FIXME(all): temporarily disable modules doesn't compile in 18.04
BUILD_TARGETS=`bazel query //modules/... union //cyber/... except //modules/tools/visualizer/... except //modules/data/tools/rosbag_to_record/... except //modules/v2x/... except //modules/map/tools/map_datachecker/... `
BUILD_TARGETS=`bazel query //modules/... union //cyber/... except //modules/tools/visualizer/... except //modules/data/tools/rosbag_to_record/... except //modules/v2x/... except //modules/map/tools/map_datachecker/... $DISABLE_CYBER_MODULES`
esac
if [ $? -ne 0 ]; then
......@@ -539,7 +540,7 @@ function citest_extended() {
source cyber/setup.bash
BUILD_TARGETS="
`bazel query //modules/planning/... union //modules/common/... union //cyber/...`
`bazel query //modules/planning/... union //modules/common/... union //cyber/... $DISABLED_CYBER_MODULES`
`bazel query //modules/prediction/... union //modules/control/...`
"
......
......@@ -63,6 +63,22 @@ cc_test(
],
)
cc_test(
name = "record_file_integration_test",
size = "small",
srcs = ["file/record_file_integration_test.cc"],
tags = [
"cpu:3",
"exclusive",
"manual",
],
deps = [
"//cyber",
"//cyber/proto:record_cc_proto",
"@gtest//:main",
],
)
cc_library(
name = "header_builder",
srcs = ["header_builder.cc"],
......
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <thread>
#include "cyber/record/file/record_file_base.h"
#include "cyber/record/file/record_file_reader.h"
#include "cyber/record/file/record_file_writer.h"
#include "cyber/record/header_builder.h"
namespace apollo {
namespace cyber {
namespace record {
class CpuSchedulerLatency {
public:
CpuSchedulerLatency() : periodic_thread_([this] { this->Callback(); }) {}
~CpuSchedulerLatency() {
running_ = false;
periodic_thread_.join();
}
std::chrono::nanoseconds GetMaxJitter() {
return std::chrono::nanoseconds(max_jitter_);
}
int64_t GetNumSamples() { return samples_; }
private:
void Callback() {
static constexpr std::chrono::milliseconds kSleepDuration(10);
auto prev_time = std::chrono::steady_clock::now();
std::this_thread::sleep_for(kSleepDuration);
while (running_) {
const auto current_time = std::chrono::steady_clock::now();
const auto time_since_sleep = current_time - prev_time;
const auto current_jitter =
std::abs((time_since_sleep - kSleepDuration).count());
prev_time = current_time;
max_jitter_ = std::max(current_jitter, max_jitter_);
++samples_;
std::this_thread::sleep_for(kSleepDuration);
}
}
std::atomic<bool> running_{true};
int64_t max_jitter_ = 0;
int64_t samples_ = 0;
std::thread periodic_thread_;
};
const char kTestFile[] = "integration_test.record";
TEST(RecordFileTest, SmallMessageHighThroughputOKThreadJitter) {
CpuSchedulerLatency cpu_jitter;
RecordFileWriter rfw;
ASSERT_TRUE(rfw.Open(kTestFile));
Header hdr1 = HeaderBuilder::GetHeaderWithSegmentParams(0, 0);
hdr1.set_chunk_interval(1000);
hdr1.set_chunk_raw_size(0);
ASSERT_TRUE(rfw.WriteHeader(hdr1));
ASSERT_FALSE(rfw.GetHeader().is_complete());
// write chunk section
static const std::string kChannelName = "small_message";
static constexpr int kMaxIterations = 1000000000;
static constexpr int64_t kMaxSamples = 1000;
for (int i = 0;
i < kMaxIterations && cpu_jitter.GetNumSamples() < kMaxSamples; ++i) {
SingleMessage msg1;
msg1.set_channel_name(kChannelName);
msg1.set_content("0123456789");
msg1.set_time(i);
ASSERT_TRUE(rfw.WriteMessage(msg1));
ASSERT_EQ(i + 1, rfw.GetMessageNumber(kChannelName));
}
EXPECT_GE(cpu_jitter.GetNumSamples(), kMaxSamples)
<< "This system may be to fast. Consider increasing kMaxIterations";
static constexpr int64_t kMaxJitterMS = 20;
const int64_t max_cpu_jitter_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
cpu_jitter.GetMaxJitter())
.count();
EXPECT_LT(max_cpu_jitter_ms, kMaxJitterMS);
ASSERT_FALSE(remove(kTestFile));
}
} // namespace record
} // namespace cyber
} // namespace apollo
int main(int argc, char** argv) {
testing::GTEST_FLAG(catch_exceptions) = 1;
testing::InitGoogleTest(&argc, argv);
google::InitGoogleLogging(argv[0]);
FLAGS_logtostderr = true;
const int ret_val = RUN_ALL_TESTS();
google::protobuf::ShutdownProtobufLibrary();
google::ShutDownCommandLineFlags();
return ret_val;
}
......@@ -28,197 +28,197 @@ namespace apollo {
namespace cyber {
namespace record {
const char CHAN_1[] = "/test1";
const char CHAN_2[] = "/test2";
const char MSG_TYPE[] = "apollo.cyber.proto.Test";
const char STR_10B[] = "1234567890";
const char TEST_FILE_1[] = "test_1.record";
const char TEST_FILE_2[] = "test_2.record";
constexpr char kChan1[] = "/test1";
constexpr char kChan2[] = "/test2";
constexpr char kMsgType[] = "apollo.cyber.proto.Test";
constexpr char kStr10B[] = "1234567890";
constexpr char kTestFile1[] = "record_file_test_1.record";
constexpr char kTestFile2[] = "record_file_test_2.record";
TEST(ChunkTest, TestAll) {
Chunk* ck = new Chunk();
ASSERT_EQ(0, ck->header_.begin_time());
ASSERT_EQ(0, ck->header_.end_time());
ASSERT_EQ(0, ck->header_.raw_size());
ASSERT_EQ(0, ck->header_.message_number());
ASSERT_TRUE(ck->empty());
Chunk ck;
ASSERT_EQ(0, ck.header_.begin_time());
ASSERT_EQ(0, ck.header_.end_time());
ASSERT_EQ(0, ck.header_.raw_size());
ASSERT_EQ(0, ck.header_.message_number());
ASSERT_TRUE(ck.empty());
SingleMessage msg1;
msg1.set_channel_name(CHAN_1);
msg1.set_content(STR_10B);
msg1.set_channel_name(kChan1);
msg1.set_content(kStr10B);
msg1.set_time(1e9);
ck->add(msg1);
ASSERT_EQ(1e9, ck->header_.begin_time());
ASSERT_EQ(1e9, ck->header_.end_time());
ASSERT_EQ(10, ck->header_.raw_size());
ASSERT_EQ(1, ck->header_.message_number());
ASSERT_FALSE(ck->empty());
ck.add(msg1);
ASSERT_EQ(1e9, ck.header_.begin_time());
ASSERT_EQ(1e9, ck.header_.end_time());
ASSERT_EQ(10, ck.header_.raw_size());
ASSERT_EQ(1, ck.header_.message_number());
ASSERT_FALSE(ck.empty());
SingleMessage msg2;
msg2.set_channel_name(CHAN_2);
msg2.set_content(STR_10B);
msg2.set_channel_name(kChan2);
msg2.set_content(kStr10B);
msg2.set_time(2e9);
ck->add(msg2);
ASSERT_EQ(1e9, ck->header_.begin_time());
ASSERT_EQ(2e9, ck->header_.end_time());
ASSERT_EQ(20, ck->header_.raw_size());
ASSERT_EQ(2, ck->header_.message_number());
ASSERT_FALSE(ck->empty());
ck->clear();
ASSERT_EQ(0, ck->header_.begin_time());
ASSERT_EQ(0, ck->header_.end_time());
ASSERT_EQ(0, ck->header_.raw_size());
ASSERT_EQ(0, ck->header_.message_number());
ASSERT_TRUE(ck->empty());
ck.add(msg2);
ASSERT_EQ(1e9, ck.header_.begin_time());
ASSERT_EQ(2e9, ck.header_.end_time());
ASSERT_EQ(20, ck.header_.raw_size());
ASSERT_EQ(2, ck.header_.message_number());
ASSERT_FALSE(ck.empty());
ck.clear();
ASSERT_EQ(0, ck.header_.begin_time());
ASSERT_EQ(0, ck.header_.end_time());
ASSERT_EQ(0, ck.header_.raw_size());
ASSERT_EQ(0, ck.header_.message_number());
ASSERT_TRUE(ck.empty());
}
TEST(RecordFileTest, TestOneMessageFile) {
// writer open one message file
RecordFileWriter* rfw = new RecordFileWriter();
ASSERT_TRUE(rfw->Open(TEST_FILE_1));
ASSERT_EQ(TEST_FILE_1, rfw->GetPath());
RecordFileWriter rfw;
ASSERT_TRUE(rfw.Open(kTestFile1));
ASSERT_EQ(kTestFile1, rfw.GetPath());
// write header section
Header hdr1 = HeaderBuilder::GetHeaderWithSegmentParams(0, 0);
hdr1.set_chunk_interval(0);
hdr1.set_chunk_raw_size(0);
ASSERT_TRUE(rfw->WriteHeader(hdr1));
ASSERT_FALSE(rfw->GetHeader().is_complete());
ASSERT_TRUE(rfw.WriteHeader(hdr1));
ASSERT_FALSE(rfw.GetHeader().is_complete());
// write channel section
Channel chan1;
chan1.set_name(CHAN_1);
chan1.set_message_type(MSG_TYPE);
chan1.set_proto_desc(STR_10B);
ASSERT_TRUE(rfw->WriteChannel(chan1));
chan1.set_name(kChan1);
chan1.set_message_type(kMsgType);
chan1.set_proto_desc(kStr10B);
ASSERT_TRUE(rfw.WriteChannel(chan1));
// write chunk section
SingleMessage msg1;
msg1.set_channel_name(chan1.name());
msg1.set_content(STR_10B);
msg1.set_content(kStr10B);
msg1.set_time(1e9);
ASSERT_TRUE(rfw->WriteMessage(msg1));
ASSERT_EQ(1, rfw->GetMessageNumber(chan1.name()));
ASSERT_TRUE(rfw.WriteMessage(msg1));
ASSERT_EQ(1, rfw.GetMessageNumber(chan1.name()));
// writer close one message file
rfw->Close();
ASSERT_TRUE(rfw->GetHeader().is_complete());
ASSERT_EQ(1, rfw->GetHeader().chunk_number());
ASSERT_EQ(1e9, rfw->GetHeader().begin_time());
ASSERT_EQ(1e9, rfw->GetHeader().end_time());
ASSERT_EQ(1, rfw->GetHeader().message_number());
hdr1 = rfw->GetHeader();
delete rfw;
rfw.Close();
ASSERT_TRUE(rfw.GetHeader().is_complete());
ASSERT_EQ(1, rfw.GetHeader().chunk_number());
ASSERT_EQ(1e9, rfw.GetHeader().begin_time());
ASSERT_EQ(1e9, rfw.GetHeader().end_time());
ASSERT_EQ(1, rfw.GetHeader().message_number());
hdr1 = rfw.GetHeader();
// header open one message file
RecordFileReader* rfr = new RecordFileReader();
ASSERT_TRUE(rfr->Open(TEST_FILE_1));
ASSERT_EQ(TEST_FILE_1, rfr->GetPath());
RecordFileReader rfr;
ASSERT_TRUE(rfr.Open(kTestFile1));
ASSERT_EQ(kTestFile1, rfr.GetPath());
Section sec;
// read header section
Header hdr2 = rfr->GetHeader();
Header hdr2 = rfr.GetHeader();
ASSERT_EQ(hdr2.chunk_number(), hdr1.chunk_number());
ASSERT_EQ(hdr2.begin_time(), hdr1.begin_time());
ASSERT_EQ(hdr2.end_time(), hdr1.end_time());
ASSERT_EQ(hdr2.message_number(), hdr1.message_number());
// read channel section
ASSERT_TRUE(rfr->ReadSection(&sec));
ASSERT_TRUE(rfr.ReadSection(&sec));
ASSERT_EQ(SectionType::SECTION_CHANNEL, sec.type);
Channel chan2;
ASSERT_TRUE(rfr->ReadSection<Channel>(sec.size, &chan2));
ASSERT_TRUE(rfr.ReadSection<Channel>(sec.size, &chan2));
ASSERT_EQ(chan2.name(), chan1.name());
ASSERT_EQ(chan2.message_type(), chan1.message_type());
ASSERT_EQ(chan2.proto_desc(), chan1.proto_desc());
// read chunk header section
ASSERT_TRUE(rfr->ReadSection(&sec));
ASSERT_TRUE(rfr.ReadSection(&sec));
ASSERT_EQ(SectionType::SECTION_CHUNK_HEADER, sec.type);
ChunkHeader ckh2;
ASSERT_TRUE(rfr->ReadSection<ChunkHeader>(sec.size, &ckh2));
ASSERT_TRUE(rfr.ReadSection<ChunkHeader>(sec.size, &ckh2));
ASSERT_EQ(ckh2.begin_time(), 1e9);
ASSERT_EQ(ckh2.end_time(), 1e9);
ASSERT_EQ(ckh2.raw_size(), 10);
ASSERT_EQ(ckh2.message_number(), 1);
// read chunk body section
ASSERT_TRUE(rfr->ReadSection(&sec));
ASSERT_TRUE(rfr.ReadSection(&sec));
ASSERT_EQ(SectionType::SECTION_CHUNK_BODY, sec.type);
ChunkBody ckb2;
ASSERT_TRUE(rfr->ReadSection<ChunkBody>(sec.size, &ckb2));
ASSERT_TRUE(rfr.ReadSection<ChunkBody>(sec.size, &ckb2));
ASSERT_EQ(ckb2.messages_size(), 1);
ASSERT_EQ(ckb2.messages(0).channel_name(), ckb2.messages(0).channel_name());
ASSERT_EQ(ckb2.messages(0).time(), ckb2.messages(0).time());
ASSERT_EQ(ckb2.messages(0).content(), ckb2.messages(0).content());
// close recorder file reader
delete rfr;
ASSERT_FALSE(remove(kTestFile1));
}
TEST(RecordFileTest, TestOneChunkFile) {
RecordFileWriter* rfw = new RecordFileWriter();
RecordFileWriter rfw;
ASSERT_TRUE(rfw->Open(TEST_FILE_1));
ASSERT_EQ(TEST_FILE_1, rfw->GetPath());
ASSERT_TRUE(rfw.Open(kTestFile1));
ASSERT_EQ(kTestFile1, rfw.GetPath());
Header header = HeaderBuilder::GetHeaderWithChunkParams(0, 0);
header.set_segment_interval(0);
header.set_segment_raw_size(0);
ASSERT_TRUE(rfw->WriteHeader(header));
ASSERT_FALSE(rfw->GetHeader().is_complete());
ASSERT_TRUE(rfw.WriteHeader(header));
ASSERT_FALSE(rfw.GetHeader().is_complete());
Channel chan1;
chan1.set_name(CHAN_1);
chan1.set_message_type(MSG_TYPE);
chan1.set_proto_desc(STR_10B);
ASSERT_TRUE(rfw->WriteChannel(chan1));
chan1.set_name(kChan1);
chan1.set_message_type(kMsgType);
chan1.set_proto_desc(kStr10B);
ASSERT_TRUE(rfw.WriteChannel(chan1));
Channel chan2;
chan2.set_name(CHAN_2);
chan2.set_message_type(MSG_TYPE);
chan2.set_proto_desc(STR_10B);
ASSERT_TRUE(rfw->WriteChannel(chan2));
chan2.set_name(kChan2);
chan2.set_message_type(kMsgType);
chan2.set_proto_desc(kStr10B);
ASSERT_TRUE(rfw.WriteChannel(chan2));
SingleMessage msg1;
msg1.set_channel_name(chan1.name());
msg1.set_content(STR_10B);
msg1.set_content(kStr10B);
msg1.set_time(1e9);
ASSERT_TRUE(rfw->WriteMessage(msg1));
ASSERT_EQ(1, rfw->GetMessageNumber(chan1.name()));
ASSERT_TRUE(rfw.WriteMessage(msg1));
ASSERT_EQ(1, rfw.GetMessageNumber(chan1.name()));
SingleMessage msg2;
msg2.set_channel_name(chan2.name());
msg2.set_content(STR_10B);
msg2.set_content(kStr10B);
msg2.set_time(2e9);
ASSERT_TRUE(rfw->WriteMessage(msg2));
ASSERT_EQ(1, rfw->GetMessageNumber(chan2.name()));
ASSERT_TRUE(rfw.WriteMessage(msg2));
ASSERT_EQ(1, rfw.GetMessageNumber(chan2.name()));
SingleMessage msg3;
msg3.set_channel_name(chan1.name());
msg3.set_content(STR_10B);
msg3.set_content(kStr10B);
msg3.set_time(3e9);
ASSERT_TRUE(rfw->WriteMessage(msg3));
ASSERT_EQ(2, rfw->GetMessageNumber(chan1.name()));
rfw->Close();
ASSERT_TRUE(rfw->GetHeader().is_complete());
ASSERT_EQ(1, rfw->GetHeader().chunk_number());
ASSERT_EQ(1e9, rfw->GetHeader().begin_time());
ASSERT_EQ(3e9, rfw->GetHeader().end_time());
ASSERT_EQ(3, rfw->GetHeader().message_number());
ASSERT_TRUE(rfw.WriteMessage(msg3));
ASSERT_EQ(2, rfw.GetMessageNumber(chan1.name()));
rfw.Close();
ASSERT_TRUE(rfw.GetHeader().is_complete());
ASSERT_EQ(1, rfw.GetHeader().chunk_number());
ASSERT_EQ(1e9, rfw.GetHeader().begin_time());
ASSERT_EQ(3e9, rfw.GetHeader().end_time());
ASSERT_EQ(3, rfw.GetHeader().message_number());
ASSERT_FALSE(remove(kTestFile1));
}
TEST(RecordFileTest, TestIndex) {
{
RecordFileWriter* rfw = new RecordFileWriter();
ASSERT_TRUE(rfw->Open(TEST_FILE_2));
ASSERT_EQ(TEST_FILE_2, rfw->GetPath());
ASSERT_TRUE(rfw->Open(kTestFile2));
ASSERT_EQ(kTestFile2, rfw->GetPath());
Header header = HeaderBuilder::GetHeaderWithChunkParams(0, 0);
header.set_segment_interval(0);
......@@ -227,34 +227,34 @@ TEST(RecordFileTest, TestIndex) {
ASSERT_FALSE(rfw->GetHeader().is_complete());
Channel chan1;
chan1.set_name(CHAN_1);
chan1.set_message_type(MSG_TYPE);
chan1.set_proto_desc(STR_10B);
chan1.set_name(kChan1);
chan1.set_message_type(kMsgType);
chan1.set_proto_desc(kStr10B);
ASSERT_TRUE(rfw->WriteChannel(chan1));
Channel chan2;
chan2.set_name(CHAN_2);
chan2.set_message_type(MSG_TYPE);
chan2.set_proto_desc(STR_10B);
chan2.set_name(kChan2);
chan2.set_message_type(kMsgType);
chan2.set_proto_desc(kStr10B);
ASSERT_TRUE(rfw->WriteChannel(chan2));
SingleMessage msg1;
msg1.set_channel_name(chan1.name());
msg1.set_content(STR_10B);
msg1.set_content(kStr10B);
msg1.set_time(1e9);
ASSERT_TRUE(rfw->WriteMessage(msg1));
ASSERT_EQ(1, rfw->GetMessageNumber(chan1.name()));
SingleMessage msg2;
msg2.set_channel_name(chan2.name());
msg2.set_content(STR_10B);
msg2.set_content(kStr10B);
msg2.set_time(2e9);
ASSERT_TRUE(rfw->WriteMessage(msg2));
ASSERT_EQ(1, rfw->GetMessageNumber(chan2.name()));
SingleMessage msg3;
msg3.set_channel_name(chan1.name());
msg3.set_content(STR_10B);
msg3.set_content(kStr10B);
msg3.set_time(3e9);
ASSERT_TRUE(rfw->WriteMessage(msg3));
ASSERT_EQ(2, rfw->GetMessageNumber(chan1.name()));
......@@ -268,7 +268,7 @@ TEST(RecordFileTest, TestIndex) {
}
{
RecordFileReader reader;
reader.Open(TEST_FILE_2);
reader.Open(kTestFile2);
reader.ReadIndex();
const auto& index = reader.GetIndex();
......@@ -308,5 +308,8 @@ int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
google::InitGoogleLogging(argv[0]);
FLAGS_logtostderr = true;
return RUN_ALL_TESTS();
const int ret_val = RUN_ALL_TESTS();
google::protobuf::ShutdownProtobufLibrary();
google::ShutDownCommandLineFlags();
return ret_val;
}
......@@ -91,7 +91,6 @@ void RecordFileWriter::Close() {
if (close(fd_) < 0) {
AERROR << "Close file failed, file: " << path_ << ", fd: " << fd_
<< ", errno: " << errno;
return;
}
}
}
......
......@@ -123,7 +123,10 @@ bool RecordFileWriter::WriteSection(const T& message) {
AERROR << "Do not support this template typename.";
return false;
}
Section section = {type, message.ByteSize()};
Section section;
/// zero out whole struct even if padded
memset(&section, 0, sizeof(section));
section = {type, message.ByteSize()};
ssize_t count = write(fd_, &section, sizeof(section));
if (count < 0) {
AERROR << "Write fd failed, fd: " << fd_ << ", errno: " << errno;
......@@ -135,9 +138,10 @@ bool RecordFileWriter::WriteSection(const T& message) {
<< ", actual count: " << count;
return false;
}
ZeroCopyOutputStream* raw_output = new FileOutputStream(fd_);
message.SerializeToZeroCopyStream(raw_output);
delete raw_output;
{
FileOutputStream raw_output(fd_);
message.SerializeToZeroCopyStream(&raw_output);
}
if (type == SectionType::SECTION_HEADER) {
static char blank[HEADER_LENGTH] = {'0'};
count = write(fd_, &blank, HEADER_LENGTH - message.ByteSize());
......
......@@ -26,44 +26,39 @@ namespace record {
using apollo::cyber::message::RawMessage;
const char CHANNEL_NAME_1[] = "/test/channel1";
const char CHANNEL_NAME_2[] = "/test/channel2";
const char MESSAGE_TYPE_1[] = "apollo.cyber.proto.Test";
const char MESSAGE_TYPE_2[] = "apollo.cyber.proto.Channel";
const char PROTO_DESC[] = "1234567890";
const char STR_10B[] = "1234567890";
const char TEST_FILE[] = "test.record";
const uint64_t TIME_1 = 1000 * 1e6;
const uint64_t TIME_2 = 1010 * 1e6;
const uint64_t TIME_3 = 1020 * 1e6;
const uint32_t MESSAGE_NUM = 16;
constexpr char kChannelName1[] = "/test/channel1";
constexpr char kMessageType1[] = "apollo.cyber.proto.Test";
constexpr char kProtoDesc[] = "1234567890";
constexpr char kStr10B[] = "1234567890";
constexpr char kTestFile[] = "record_reader_test.record";
constexpr uint32_t kMessageNum = 16;
TEST(RecordTest, TestSingleRecordFile) {
RecordWriter writer;
writer.SetSizeOfFileSegmentation(0);
writer.SetIntervalOfFileSegmentation(0);
writer.Open(TEST_FILE);
writer.WriteChannel(CHANNEL_NAME_1, MESSAGE_TYPE_1, PROTO_DESC);
for (uint32_t i = 0; i < MESSAGE_NUM; ++i) {
writer.Open(kTestFile);
writer.WriteChannel(kChannelName1, kMessageType1, kProtoDesc);
for (uint32_t i = 0; i < kMessageNum; ++i) {
auto msg = std::make_shared<RawMessage>(std::to_string(i));
writer.WriteMessage(CHANNEL_NAME_1, msg, i);
writer.WriteMessage(kChannelName1, msg, i);
}
ASSERT_EQ(MESSAGE_NUM, writer.GetMessageNumber(CHANNEL_NAME_1));
ASSERT_EQ(MESSAGE_TYPE_1, writer.GetMessageType(CHANNEL_NAME_1));
ASSERT_EQ(PROTO_DESC, writer.GetProtoDesc(CHANNEL_NAME_1));
ASSERT_EQ(kMessageNum, writer.GetMessageNumber(kChannelName1));
ASSERT_EQ(kMessageType1, writer.GetMessageType(kChannelName1));
ASSERT_EQ(kProtoDesc, writer.GetProtoDesc(kChannelName1));
writer.Close();
RecordReader reader(TEST_FILE);
RecordReader reader(kTestFile);
RecordMessage message;
ASSERT_EQ(MESSAGE_NUM, reader.GetMessageNumber(CHANNEL_NAME_1));
ASSERT_EQ(MESSAGE_TYPE_1, reader.GetMessageType(CHANNEL_NAME_1));
ASSERT_EQ(PROTO_DESC, reader.GetProtoDesc(CHANNEL_NAME_1));
ASSERT_EQ(kMessageNum, reader.GetMessageNumber(kChannelName1));
ASSERT_EQ(kMessageType1, reader.GetMessageType(kChannelName1));
ASSERT_EQ(kProtoDesc, reader.GetProtoDesc(kChannelName1));
// read all message
uint32_t i = 0;
for (i = 0; i < MESSAGE_NUM; ++i) {
for (i = 0; i < kMessageNum; ++i) {
ASSERT_TRUE(reader.ReadMessage(&message));
ASSERT_EQ(CHANNEL_NAME_1, message.channel_name);
ASSERT_EQ(kChannelName1, message.channel_name);
ASSERT_EQ(std::to_string(i), message.content);
ASSERT_EQ(i, message.time);
}
......@@ -71,9 +66,9 @@ TEST(RecordTest, TestSingleRecordFile) {
// skip first message
reader.Reset();
for (i = 0; i < MESSAGE_NUM - 1; ++i) {
for (i = 0; i < kMessageNum - 1; ++i) {
ASSERT_TRUE(reader.ReadMessage(&message, 1));
ASSERT_EQ(CHANNEL_NAME_1, message.channel_name);
ASSERT_EQ(kChannelName1, message.channel_name);
ASSERT_EQ(std::to_string(i + 1), message.content);
ASSERT_EQ(i + 1, message.time);
}
......@@ -81,13 +76,14 @@ TEST(RecordTest, TestSingleRecordFile) {
// skip last message
reader.Reset();
for (i = 0; i < MESSAGE_NUM - 1; ++i) {
ASSERT_TRUE(reader.ReadMessage(&message, 0, MESSAGE_NUM - 2));
ASSERT_EQ(CHANNEL_NAME_1, message.channel_name);
for (i = 0; i < kMessageNum - 1; ++i) {
ASSERT_TRUE(reader.ReadMessage(&message, 0, kMessageNum - 2));
ASSERT_EQ(kChannelName1, message.channel_name);
ASSERT_EQ(std::to_string(i), message.content);
ASSERT_EQ(i, message.time);
}
ASSERT_FALSE(reader.ReadMessage(&message, 0, MESSAGE_NUM - 2));
ASSERT_FALSE(reader.ReadMessage(&message, 0, kMessageNum - 2));
ASSERT_FALSE(remove(kTestFile));
}
} // namespace record
......
......@@ -33,25 +33,23 @@ namespace record {
using apollo::cyber::message::RawMessage;
const char CHANNEL_NAME_1[] = "/test/channel1";
const char CHANNEL_NAME_2[] = "/test/channel2";
const char MESSAGE_TYPE_1[] = "apollo.cyber.proto.Test";
const char MESSAGE_TYPE_2[] = "apollo.cyber.proto.Channel";
const char PROTO_DESC[] = "1234567890";
const char TEST_FILE[] = "viewer_test.record";
constexpr char kChannelName1[] = "/test/channel1";
constexpr char kMessageType1[] = "apollo.cyber.proto.Test";
constexpr char kProtoDesc1[] = "1234567890";
constexpr char kTestFile[] = "viewer_test.record";
void ConstructRecord(uint64_t msg_num, uint64_t begin_time,
uint64_t time_step) {
RecordWriter writer;
writer.SetSizeOfFileSegmentation(0);
writer.SetIntervalOfFileSegmentation(0);
writer.Open(TEST_FILE);
writer.WriteChannel(CHANNEL_NAME_1, MESSAGE_TYPE_1, PROTO_DESC);
writer.Open(kTestFile);
writer.WriteChannel(kChannelName1, kMessageType1, kProtoDesc1);
for (uint64_t i = 0; i < msg_num; i++) {
auto msg = std::make_shared<RawMessage>(std::to_string(i));
writer.WriteMessage(CHANNEL_NAME_1, msg, begin_time + time_step * i);
writer.WriteMessage(kChannelName1, msg, begin_time + time_step * i);
}
ASSERT_EQ(msg_num, writer.GetMessageNumber(CHANNEL_NAME_1));
ASSERT_EQ(msg_num, writer.GetMessageNumber(kChannelName1));
writer.Close();
}
......@@ -72,7 +70,7 @@ TEST(RecordTest, iterator_test) {
uint64_t end_time = begin_time + step_time * (msg_num - 1);
ConstructRecord(msg_num, begin_time, step_time);
auto reader = std::make_shared<RecordReader>(TEST_FILE);
auto reader = std::make_shared<RecordReader>(kTestFile);
RecordViewer viewer(reader);
EXPECT_TRUE(viewer.IsValid());
EXPECT_EQ(begin_time, viewer.begin_time());
......@@ -80,7 +78,7 @@ TEST(RecordTest, iterator_test) {
uint64_t i = 0;
for (auto& msg : viewer) {
EXPECT_EQ(CHANNEL_NAME_1, msg.channel_name);
EXPECT_EQ(kChannelName1, msg.channel_name);
EXPECT_EQ(begin_time + step_time * i, msg.time);
EXPECT_EQ(std::to_string(i), msg.content);
i++;
......@@ -89,7 +87,7 @@ TEST(RecordTest, iterator_test) {
i = 0;
std::for_each(viewer.begin(), viewer.end(), [&i](RecordMessage& msg) {
EXPECT_EQ(CHANNEL_NAME_1, msg.channel_name);
EXPECT_EQ(kChannelName1, msg.channel_name);
// EXPECT_EQ(begin_time + step_time * i, msg.time);
EXPECT_EQ(std::to_string(i), msg.content);
i++;
......@@ -98,12 +96,13 @@ TEST(RecordTest, iterator_test) {
i = 0;
for (auto it = viewer.begin(); it != viewer.end(); ++it) {
EXPECT_EQ(CHANNEL_NAME_1, it->channel_name);
EXPECT_EQ(kChannelName1, it->channel_name);
EXPECT_EQ(begin_time + step_time * i, it->time);
EXPECT_EQ(std::to_string(i), it->content);
i++;
}
EXPECT_EQ(msg_num, i);
ASSERT_FALSE(remove(kTestFile));
}
TEST(RecordTest, filter_test) {
......@@ -113,7 +112,7 @@ TEST(RecordTest, filter_test) {
uint64_t end_time = begin_time + step_time * (msg_num - 1);
ConstructRecord(msg_num, begin_time, step_time);
auto reader = std::make_shared<RecordReader>(TEST_FILE);
auto reader = std::make_shared<RecordReader>(kTestFile);
RecordViewer viewer_0(reader);
EXPECT_EQ(CheckCount(viewer_0), msg_num);
EXPECT_EQ(begin_time, viewer_0.begin_time());
......@@ -152,8 +151,9 @@ TEST(RecordTest, filter_test) {
EXPECT_EQ(CheckCount(viewer_6), 0);
// filter with exist channel
RecordViewer viewer_7(reader, 0, end_time, {CHANNEL_NAME_1});
RecordViewer viewer_7(reader, 0, end_time, {kChannelName1});
EXPECT_EQ(CheckCount(viewer_7), msg_num);
ASSERT_FALSE(remove(kTestFile));
}
TEST(RecordTest, mult_iterator_test) {
......@@ -163,7 +163,7 @@ TEST(RecordTest, mult_iterator_test) {
uint64_t end_time = begin_time + step_time * (msg_num - 1);
ConstructRecord(msg_num, begin_time, step_time);
auto reader = std::make_shared<RecordReader>(TEST_FILE);
auto reader = std::make_shared<RecordReader>(kTestFile);
RecordViewer viewer(reader);
EXPECT_TRUE(viewer.IsValid());
EXPECT_EQ(begin_time, viewer.begin_time());
......@@ -173,12 +173,13 @@ TEST(RecordTest, mult_iterator_test) {
uint64_t i = 0;
for (auto& msg : viewer) { // #2 iterator
EXPECT_EQ(CHANNEL_NAME_1, msg.channel_name);
EXPECT_EQ(kChannelName1, msg.channel_name);
EXPECT_EQ(begin_time + step_time * i, msg.time);
EXPECT_EQ(std::to_string(i), msg.content);
i++;
}
EXPECT_EQ(msg_num, i);
ASSERT_FALSE(remove(kTestFile));
}
} // namespace record
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册