diff --git a/apollo.sh b/apollo.sh index 962218271dca17d8dbbf8080a8cf36c7a5bda47e..f9299481c0dd9a172a53e19ca3c562fa5ff3fb17 100755 --- a/apollo.sh +++ b/apollo.sh @@ -20,8 +20,6 @@ # Utils #================================================= -DISABLED_CYBER_MODULES="except //cyber/record:record_file_integration_test" - function source_apollo_base() { DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" cd "${DIR}" @@ -94,7 +92,7 @@ function check_esd_files() { } function generate_build_targets() { - COMMON_TARGETS="//cyber/... union //modules/common/kv_db/... union //modules/dreamview/... $DISABLED_CYBER_MODULES" + COMMON_TARGETS="//cyber/... union //modules/common/kv_db/... union //modules/dreamview/..." case $BUILD_FILTER in cyber) BUILD_TARGETS=`bazel query //cyber/...` @@ -118,7 +116,10 @@ function generate_build_targets() { BUILD_TARGETS=`bazel query //modules/... except //modules/perception/... union //cyber/...` ;; *) - BUILD_TARGETS=`bazel query //modules/... union //cyber/... $DISABLED_CYBER_MODULES` +# BUILD_TARGETS=`bazel query //modules/... union //cyber/...` + # FIXME(all): temporarily disable modules doesn't compile in 18.04 + BUILD_TARGETS=`bazel query //modules/... union //cyber/... except //modules/tools/visualizer/... except //modules/data/tools/rosbag_to_record/... except //modules/v2x/... except //modules/map/tools/map_datachecker/... ` + esac if [ $? -ne 0 ]; then @@ -538,7 +539,7 @@ function citest_extended() { source cyber/setup.bash BUILD_TARGETS=" - `bazel query //modules/planning/... union //modules/common/... union //cyber/... $DISABLED_CYBER_MODULES` + `bazel query //modules/planning/... union //modules/common/... union //cyber/...` `bazel query //modules/prediction/... union //modules/control/...` " diff --git a/cyber/record/BUILD b/cyber/record/BUILD index 7ab1f735c0b3b55d640d91dde7e12d19e4aacc6a..c991ad557b719db02b86f2ecbe1e63819a0c58e6 100644 --- a/cyber/record/BUILD +++ b/cyber/record/BUILD @@ -63,22 +63,6 @@ cc_test( ], ) -cc_test( - name = "record_file_integration_test", - size = "small", - srcs = ["file/record_file_integration_test.cc"], - tags = [ - "cpu:3", - "exclusive", - "manual", - ], - deps = [ - "//cyber", - "//cyber/proto:record_cc_proto", - "@gtest//:main", - ], -) - cc_library( name = "header_builder", srcs = ["header_builder.cc"], diff --git a/cyber/record/file/record_file_integration_test.cc b/cyber/record/file/record_file_integration_test.cc deleted file mode 100644 index 52b06480c9014276980447b4f71c2b0230b47250..0000000000000000000000000000000000000000 --- a/cyber/record/file/record_file_integration_test.cc +++ /dev/null @@ -1,126 +0,0 @@ -/****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *****************************************************************************/ - -#include - -#include -#include -#include - -#include "cyber/record/file/record_file_base.h" -#include "cyber/record/file/record_file_reader.h" -#include "cyber/record/file/record_file_writer.h" -#include "cyber/record/header_builder.h" - -namespace apollo { -namespace cyber { -namespace record { - -class CpuSchedulerLatency { - public: - CpuSchedulerLatency() : periodic_thread_([this] { this->Callback(); }) {} - - ~CpuSchedulerLatency() { - running_ = false; - periodic_thread_.join(); - } - - std::chrono::nanoseconds GetMaxJitter() { - return std::chrono::nanoseconds(max_jitter_); - } - - int64_t GetNumSamples() { return samples_; } - - private: - void Callback() { - static constexpr std::chrono::milliseconds kSleepDuration(10); - auto prev_time = std::chrono::steady_clock::now(); - std::this_thread::sleep_for(kSleepDuration); - while (running_) { - const auto current_time = std::chrono::steady_clock::now(); - const auto time_since_sleep = current_time - prev_time; - const auto current_jitter = - std::abs((time_since_sleep - kSleepDuration).count()); - prev_time = current_time; - max_jitter_ = std::max(current_jitter, max_jitter_); - ++samples_; - std::this_thread::sleep_for(kSleepDuration); - } - } - - std::atomic running_{true}; - int64_t max_jitter_ = 0; - int64_t samples_ = 0; - - std::thread periodic_thread_; -}; - -const char kTestFile[] = "integration_test.record"; - -TEST(RecordFileTest, SmallMessageHighThroughputOKThreadJitter) { - CpuSchedulerLatency cpu_jitter; - - RecordFileWriter rfw; - - ASSERT_TRUE(rfw.Open(kTestFile)); - - Header hdr1 = HeaderBuilder::GetHeaderWithSegmentParams(0, 0); - hdr1.set_chunk_interval(1000); - hdr1.set_chunk_raw_size(0); - ASSERT_TRUE(rfw.WriteHeader(hdr1)); - ASSERT_FALSE(rfw.GetHeader().is_complete()); - - // write chunk section - static const std::string kChannelName = "small_message"; - - static constexpr int kMaxIterations = 1000000000; - static constexpr int64_t kMaxSamples = 1000; - for (int i = 0; - i < kMaxIterations && cpu_jitter.GetNumSamples() < kMaxSamples; ++i) { - SingleMessage msg1; - msg1.set_channel_name(kChannelName); - msg1.set_content("0123456789"); - msg1.set_time(i); - ASSERT_TRUE(rfw.WriteMessage(msg1)); - ASSERT_EQ(i + 1, rfw.GetMessageNumber(kChannelName)); - } - - EXPECT_GE(cpu_jitter.GetNumSamples(), kMaxSamples) - << "This system may be to fast. Consider increasing kMaxIterations"; - static constexpr int64_t kMaxJitterMS = 20; - const int64_t max_cpu_jitter_ms = - std::chrono::duration_cast( - cpu_jitter.GetMaxJitter()) - .count(); - EXPECT_LT(max_cpu_jitter_ms, kMaxJitterMS); - ASSERT_FALSE(remove(kTestFile)); -} - -} // namespace record -} // namespace cyber -} // namespace apollo - -int main(int argc, char** argv) { - testing::GTEST_FLAG(catch_exceptions) = 1; - testing::InitGoogleTest(&argc, argv); - google::InitGoogleLogging(argv[0]); - FLAGS_logtostderr = true; - const int ret_val = RUN_ALL_TESTS(); - google::protobuf::ShutdownProtobufLibrary(); - google::ShutDownCommandLineFlags(); - return ret_val; -} - diff --git a/cyber/record/file/record_file_test.cc b/cyber/record/file/record_file_test.cc index cab1e2ca817869eea172681f7b37e9d1af0310bd..32ee153c58d04494499e4ef6472960f4c3c9ffaa 100644 --- a/cyber/record/file/record_file_test.cc +++ b/cyber/record/file/record_file_test.cc @@ -28,197 +28,197 @@ namespace apollo { namespace cyber { namespace record { -constexpr char kChan1[] = "/test1"; -constexpr char kChan2[] = "/test2"; -constexpr char kMsgType[] = "apollo.cyber.proto.Test"; -constexpr char kStr10B[] = "1234567890"; -constexpr char kTestFile1[] = "record_file_test_1.record"; -constexpr char kTestFile2[] = "record_file_test_2.record"; +const char CHAN_1[] = "/test1"; +const char CHAN_2[] = "/test2"; +const char MSG_TYPE[] = "apollo.cyber.proto.Test"; +const char STR_10B[] = "1234567890"; +const char TEST_FILE_1[] = "test_1.record"; +const char TEST_FILE_2[] = "test_2.record"; TEST(ChunkTest, TestAll) { - Chunk ck; - ASSERT_EQ(0, ck.header_.begin_time()); - ASSERT_EQ(0, ck.header_.end_time()); - ASSERT_EQ(0, ck.header_.raw_size()); - ASSERT_EQ(0, ck.header_.message_number()); - ASSERT_TRUE(ck.empty()); + Chunk* ck = new Chunk(); + ASSERT_EQ(0, ck->header_.begin_time()); + ASSERT_EQ(0, ck->header_.end_time()); + ASSERT_EQ(0, ck->header_.raw_size()); + ASSERT_EQ(0, ck->header_.message_number()); + ASSERT_TRUE(ck->empty()); SingleMessage msg1; - msg1.set_channel_name(kChan1); - msg1.set_content(kStr10B); + msg1.set_channel_name(CHAN_1); + msg1.set_content(STR_10B); msg1.set_time(1e9); - ck.add(msg1); - ASSERT_EQ(1e9, ck.header_.begin_time()); - ASSERT_EQ(1e9, ck.header_.end_time()); - ASSERT_EQ(10, ck.header_.raw_size()); - ASSERT_EQ(1, ck.header_.message_number()); - ASSERT_FALSE(ck.empty()); + ck->add(msg1); + ASSERT_EQ(1e9, ck->header_.begin_time()); + ASSERT_EQ(1e9, ck->header_.end_time()); + ASSERT_EQ(10, ck->header_.raw_size()); + ASSERT_EQ(1, ck->header_.message_number()); + ASSERT_FALSE(ck->empty()); SingleMessage msg2; - msg2.set_channel_name(kChan2); - msg2.set_content(kStr10B); + msg2.set_channel_name(CHAN_2); + msg2.set_content(STR_10B); msg2.set_time(2e9); - ck.add(msg2); - ASSERT_EQ(1e9, ck.header_.begin_time()); - ASSERT_EQ(2e9, ck.header_.end_time()); - ASSERT_EQ(20, ck.header_.raw_size()); - ASSERT_EQ(2, ck.header_.message_number()); - ASSERT_FALSE(ck.empty()); - - ck.clear(); - ASSERT_EQ(0, ck.header_.begin_time()); - ASSERT_EQ(0, ck.header_.end_time()); - ASSERT_EQ(0, ck.header_.raw_size()); - ASSERT_EQ(0, ck.header_.message_number()); - ASSERT_TRUE(ck.empty()); + ck->add(msg2); + ASSERT_EQ(1e9, ck->header_.begin_time()); + ASSERT_EQ(2e9, ck->header_.end_time()); + ASSERT_EQ(20, ck->header_.raw_size()); + ASSERT_EQ(2, ck->header_.message_number()); + ASSERT_FALSE(ck->empty()); + + ck->clear(); + ASSERT_EQ(0, ck->header_.begin_time()); + ASSERT_EQ(0, ck->header_.end_time()); + ASSERT_EQ(0, ck->header_.raw_size()); + ASSERT_EQ(0, ck->header_.message_number()); + ASSERT_TRUE(ck->empty()); } TEST(RecordFileTest, TestOneMessageFile) { // writer open one message file - RecordFileWriter rfw; - ASSERT_TRUE(rfw.Open(kTestFile1)); - ASSERT_EQ(kTestFile1, rfw.GetPath()); + RecordFileWriter* rfw = new RecordFileWriter(); + ASSERT_TRUE(rfw->Open(TEST_FILE_1)); + ASSERT_EQ(TEST_FILE_1, rfw->GetPath()); // write header section Header hdr1 = HeaderBuilder::GetHeaderWithSegmentParams(0, 0); hdr1.set_chunk_interval(0); hdr1.set_chunk_raw_size(0); - ASSERT_TRUE(rfw.WriteHeader(hdr1)); - ASSERT_FALSE(rfw.GetHeader().is_complete()); + ASSERT_TRUE(rfw->WriteHeader(hdr1)); + ASSERT_FALSE(rfw->GetHeader().is_complete()); // write channel section Channel chan1; - chan1.set_name(kChan1); - chan1.set_message_type(kMsgType); - chan1.set_proto_desc(kStr10B); - ASSERT_TRUE(rfw.WriteChannel(chan1)); + chan1.set_name(CHAN_1); + chan1.set_message_type(MSG_TYPE); + chan1.set_proto_desc(STR_10B); + ASSERT_TRUE(rfw->WriteChannel(chan1)); // write chunk section SingleMessage msg1; msg1.set_channel_name(chan1.name()); - msg1.set_content(kStr10B); + msg1.set_content(STR_10B); msg1.set_time(1e9); - ASSERT_TRUE(rfw.WriteMessage(msg1)); - ASSERT_EQ(1, rfw.GetMessageNumber(chan1.name())); + ASSERT_TRUE(rfw->WriteMessage(msg1)); + ASSERT_EQ(1, rfw->GetMessageNumber(chan1.name())); // writer close one message file - rfw.Close(); - ASSERT_TRUE(rfw.GetHeader().is_complete()); - ASSERT_EQ(1, rfw.GetHeader().chunk_number()); - ASSERT_EQ(1e9, rfw.GetHeader().begin_time()); - ASSERT_EQ(1e9, rfw.GetHeader().end_time()); - ASSERT_EQ(1, rfw.GetHeader().message_number()); - hdr1 = rfw.GetHeader(); + rfw->Close(); + ASSERT_TRUE(rfw->GetHeader().is_complete()); + ASSERT_EQ(1, rfw->GetHeader().chunk_number()); + ASSERT_EQ(1e9, rfw->GetHeader().begin_time()); + ASSERT_EQ(1e9, rfw->GetHeader().end_time()); + ASSERT_EQ(1, rfw->GetHeader().message_number()); + hdr1 = rfw->GetHeader(); + delete rfw; // header open one message file - RecordFileReader rfr; - ASSERT_TRUE(rfr.Open(kTestFile1)); - ASSERT_EQ(kTestFile1, rfr.GetPath()); + RecordFileReader* rfr = new RecordFileReader(); + ASSERT_TRUE(rfr->Open(TEST_FILE_1)); + ASSERT_EQ(TEST_FILE_1, rfr->GetPath()); Section sec; // read header section - Header hdr2 = rfr.GetHeader(); + Header hdr2 = rfr->GetHeader(); ASSERT_EQ(hdr2.chunk_number(), hdr1.chunk_number()); ASSERT_EQ(hdr2.begin_time(), hdr1.begin_time()); ASSERT_EQ(hdr2.end_time(), hdr1.end_time()); ASSERT_EQ(hdr2.message_number(), hdr1.message_number()); // read channel section - ASSERT_TRUE(rfr.ReadSection(&sec)); + ASSERT_TRUE(rfr->ReadSection(&sec)); ASSERT_EQ(SectionType::SECTION_CHANNEL, sec.type); Channel chan2; - ASSERT_TRUE(rfr.ReadSection(sec.size, &chan2)); + ASSERT_TRUE(rfr->ReadSection(sec.size, &chan2)); ASSERT_EQ(chan2.name(), chan1.name()); ASSERT_EQ(chan2.message_type(), chan1.message_type()); ASSERT_EQ(chan2.proto_desc(), chan1.proto_desc()); // read chunk header section - ASSERT_TRUE(rfr.ReadSection(&sec)); + ASSERT_TRUE(rfr->ReadSection(&sec)); ASSERT_EQ(SectionType::SECTION_CHUNK_HEADER, sec.type); ChunkHeader ckh2; - ASSERT_TRUE(rfr.ReadSection(sec.size, &ckh2)); + ASSERT_TRUE(rfr->ReadSection(sec.size, &ckh2)); ASSERT_EQ(ckh2.begin_time(), 1e9); ASSERT_EQ(ckh2.end_time(), 1e9); ASSERT_EQ(ckh2.raw_size(), 10); ASSERT_EQ(ckh2.message_number(), 1); // read chunk body section - ASSERT_TRUE(rfr.ReadSection(&sec)); + ASSERT_TRUE(rfr->ReadSection(&sec)); ASSERT_EQ(SectionType::SECTION_CHUNK_BODY, sec.type); ChunkBody ckb2; - ASSERT_TRUE(rfr.ReadSection(sec.size, &ckb2)); + ASSERT_TRUE(rfr->ReadSection(sec.size, &ckb2)); ASSERT_EQ(ckb2.messages_size(), 1); ASSERT_EQ(ckb2.messages(0).channel_name(), ckb2.messages(0).channel_name()); ASSERT_EQ(ckb2.messages(0).time(), ckb2.messages(0).time()); ASSERT_EQ(ckb2.messages(0).content(), ckb2.messages(0).content()); - ASSERT_FALSE(remove(kTestFile1)); + // close recorder file reader + delete rfr; } TEST(RecordFileTest, TestOneChunkFile) { - RecordFileWriter rfw; + RecordFileWriter* rfw = new RecordFileWriter(); - ASSERT_TRUE(rfw.Open(kTestFile1)); - ASSERT_EQ(kTestFile1, rfw.GetPath()); + ASSERT_TRUE(rfw->Open(TEST_FILE_1)); + ASSERT_EQ(TEST_FILE_1, rfw->GetPath()); Header header = HeaderBuilder::GetHeaderWithChunkParams(0, 0); header.set_segment_interval(0); header.set_segment_raw_size(0); - ASSERT_TRUE(rfw.WriteHeader(header)); - ASSERT_FALSE(rfw.GetHeader().is_complete()); + ASSERT_TRUE(rfw->WriteHeader(header)); + ASSERT_FALSE(rfw->GetHeader().is_complete()); Channel chan1; - chan1.set_name(kChan1); - chan1.set_message_type(kMsgType); - chan1.set_proto_desc(kStr10B); - ASSERT_TRUE(rfw.WriteChannel(chan1)); + chan1.set_name(CHAN_1); + chan1.set_message_type(MSG_TYPE); + chan1.set_proto_desc(STR_10B); + ASSERT_TRUE(rfw->WriteChannel(chan1)); Channel chan2; - chan2.set_name(kChan2); - chan2.set_message_type(kMsgType); - chan2.set_proto_desc(kStr10B); - ASSERT_TRUE(rfw.WriteChannel(chan2)); + chan2.set_name(CHAN_2); + chan2.set_message_type(MSG_TYPE); + chan2.set_proto_desc(STR_10B); + ASSERT_TRUE(rfw->WriteChannel(chan2)); SingleMessage msg1; msg1.set_channel_name(chan1.name()); - msg1.set_content(kStr10B); + msg1.set_content(STR_10B); msg1.set_time(1e9); - ASSERT_TRUE(rfw.WriteMessage(msg1)); - ASSERT_EQ(1, rfw.GetMessageNumber(chan1.name())); + ASSERT_TRUE(rfw->WriteMessage(msg1)); + ASSERT_EQ(1, rfw->GetMessageNumber(chan1.name())); SingleMessage msg2; msg2.set_channel_name(chan2.name()); - msg2.set_content(kStr10B); + msg2.set_content(STR_10B); msg2.set_time(2e9); - ASSERT_TRUE(rfw.WriteMessage(msg2)); - ASSERT_EQ(1, rfw.GetMessageNumber(chan2.name())); + ASSERT_TRUE(rfw->WriteMessage(msg2)); + ASSERT_EQ(1, rfw->GetMessageNumber(chan2.name())); SingleMessage msg3; msg3.set_channel_name(chan1.name()); - msg3.set_content(kStr10B); + msg3.set_content(STR_10B); msg3.set_time(3e9); - ASSERT_TRUE(rfw.WriteMessage(msg3)); - ASSERT_EQ(2, rfw.GetMessageNumber(chan1.name())); - - rfw.Close(); - ASSERT_TRUE(rfw.GetHeader().is_complete()); - ASSERT_EQ(1, rfw.GetHeader().chunk_number()); - ASSERT_EQ(1e9, rfw.GetHeader().begin_time()); - ASSERT_EQ(3e9, rfw.GetHeader().end_time()); - ASSERT_EQ(3, rfw.GetHeader().message_number()); - - ASSERT_FALSE(remove(kTestFile1)); + ASSERT_TRUE(rfw->WriteMessage(msg3)); + ASSERT_EQ(2, rfw->GetMessageNumber(chan1.name())); + + rfw->Close(); + ASSERT_TRUE(rfw->GetHeader().is_complete()); + ASSERT_EQ(1, rfw->GetHeader().chunk_number()); + ASSERT_EQ(1e9, rfw->GetHeader().begin_time()); + ASSERT_EQ(3e9, rfw->GetHeader().end_time()); + ASSERT_EQ(3, rfw->GetHeader().message_number()); } TEST(RecordFileTest, TestIndex) { { RecordFileWriter* rfw = new RecordFileWriter(); - ASSERT_TRUE(rfw->Open(kTestFile2)); - ASSERT_EQ(kTestFile2, rfw->GetPath()); + ASSERT_TRUE(rfw->Open(TEST_FILE_2)); + ASSERT_EQ(TEST_FILE_2, rfw->GetPath()); Header header = HeaderBuilder::GetHeaderWithChunkParams(0, 0); header.set_segment_interval(0); @@ -227,34 +227,34 @@ TEST(RecordFileTest, TestIndex) { ASSERT_FALSE(rfw->GetHeader().is_complete()); Channel chan1; - chan1.set_name(kChan1); - chan1.set_message_type(kMsgType); - chan1.set_proto_desc(kStr10B); + chan1.set_name(CHAN_1); + chan1.set_message_type(MSG_TYPE); + chan1.set_proto_desc(STR_10B); ASSERT_TRUE(rfw->WriteChannel(chan1)); Channel chan2; - chan2.set_name(kChan2); - chan2.set_message_type(kMsgType); - chan2.set_proto_desc(kStr10B); + chan2.set_name(CHAN_2); + chan2.set_message_type(MSG_TYPE); + chan2.set_proto_desc(STR_10B); ASSERT_TRUE(rfw->WriteChannel(chan2)); SingleMessage msg1; msg1.set_channel_name(chan1.name()); - msg1.set_content(kStr10B); + msg1.set_content(STR_10B); msg1.set_time(1e9); ASSERT_TRUE(rfw->WriteMessage(msg1)); ASSERT_EQ(1, rfw->GetMessageNumber(chan1.name())); SingleMessage msg2; msg2.set_channel_name(chan2.name()); - msg2.set_content(kStr10B); + msg2.set_content(STR_10B); msg2.set_time(2e9); ASSERT_TRUE(rfw->WriteMessage(msg2)); ASSERT_EQ(1, rfw->GetMessageNumber(chan2.name())); SingleMessage msg3; msg3.set_channel_name(chan1.name()); - msg3.set_content(kStr10B); + msg3.set_content(STR_10B); msg3.set_time(3e9); ASSERT_TRUE(rfw->WriteMessage(msg3)); ASSERT_EQ(2, rfw->GetMessageNumber(chan1.name())); @@ -268,7 +268,7 @@ TEST(RecordFileTest, TestIndex) { } { RecordFileReader reader; - reader.Open(kTestFile2); + reader.Open(TEST_FILE_2); reader.ReadIndex(); const auto& index = reader.GetIndex(); @@ -308,8 +308,5 @@ int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); google::InitGoogleLogging(argv[0]); FLAGS_logtostderr = true; - const int ret_val = RUN_ALL_TESTS(); - google::protobuf::ShutdownProtobufLibrary(); - google::ShutDownCommandLineFlags(); - return ret_val; + return RUN_ALL_TESTS(); } diff --git a/cyber/record/file/record_file_writer.cc b/cyber/record/file/record_file_writer.cc index d48a4621becc11b454e6a5a5f207cd5a7349f9a5..4bf4a6621081a7d1141d75d32b0a1aa01b4d44e3 100644 --- a/cyber/record/file/record_file_writer.cc +++ b/cyber/record/file/record_file_writer.cc @@ -91,6 +91,7 @@ void RecordFileWriter::Close() { if (close(fd_) < 0) { AERROR << "Close file failed, file: " << path_ << ", fd: " << fd_ << ", errno: " << errno; + return; } } } diff --git a/cyber/record/file/record_file_writer.h b/cyber/record/file/record_file_writer.h index d5c3756d5a483b720dd8b3a32aa225ff8b390016..2700beb66ce35c1d5654d38b3b1da0085c31a3c8 100644 --- a/cyber/record/file/record_file_writer.h +++ b/cyber/record/file/record_file_writer.h @@ -123,10 +123,7 @@ bool RecordFileWriter::WriteSection(const T& message) { AERROR << "Do not support this template typename."; return false; } - Section section; - /// zero out whole struct even if padded - memset(§ion, 0, sizeof(section)); - section = {type, message.ByteSize()}; + Section section = {type, message.ByteSize()}; ssize_t count = write(fd_, §ion, sizeof(section)); if (count < 0) { AERROR << "Write fd failed, fd: " << fd_ << ", errno: " << errno; @@ -138,10 +135,9 @@ bool RecordFileWriter::WriteSection(const T& message) { << ", actual count: " << count; return false; } - { - FileOutputStream raw_output(fd_); - message.SerializeToZeroCopyStream(&raw_output); - } + ZeroCopyOutputStream* raw_output = new FileOutputStream(fd_); + message.SerializeToZeroCopyStream(raw_output); + delete raw_output; if (type == SectionType::SECTION_HEADER) { static char blank[HEADER_LENGTH] = {'0'}; count = write(fd_, &blank, HEADER_LENGTH - message.ByteSize()); diff --git a/cyber/record/record_reader_test.cc b/cyber/record/record_reader_test.cc index 67ed24dedf5ec3c86423c27841fac2f44d3f73f8..01262ff9439793bee04168c846d3787ee919535d 100644 --- a/cyber/record/record_reader_test.cc +++ b/cyber/record/record_reader_test.cc @@ -26,39 +26,44 @@ namespace record { using apollo::cyber::message::RawMessage; -constexpr char kChannelName1[] = "/test/channel1"; -constexpr char kMessageType1[] = "apollo.cyber.proto.Test"; -constexpr char kProtoDesc[] = "1234567890"; -constexpr char kStr10B[] = "1234567890"; -constexpr char kTestFile[] = "record_reader_test.record"; -constexpr uint32_t kMessageNum = 16; +const char CHANNEL_NAME_1[] = "/test/channel1"; +const char CHANNEL_NAME_2[] = "/test/channel2"; +const char MESSAGE_TYPE_1[] = "apollo.cyber.proto.Test"; +const char MESSAGE_TYPE_2[] = "apollo.cyber.proto.Channel"; +const char PROTO_DESC[] = "1234567890"; +const char STR_10B[] = "1234567890"; +const char TEST_FILE[] = "test.record"; +const uint64_t TIME_1 = 1000 * 1e6; +const uint64_t TIME_2 = 1010 * 1e6; +const uint64_t TIME_3 = 1020 * 1e6; +const uint32_t MESSAGE_NUM = 16; TEST(RecordTest, TestSingleRecordFile) { RecordWriter writer; writer.SetSizeOfFileSegmentation(0); writer.SetIntervalOfFileSegmentation(0); - writer.Open(kTestFile); - writer.WriteChannel(kChannelName1, kMessageType1, kProtoDesc); - for (uint32_t i = 0; i < kMessageNum; ++i) { + writer.Open(TEST_FILE); + writer.WriteChannel(CHANNEL_NAME_1, MESSAGE_TYPE_1, PROTO_DESC); + for (uint32_t i = 0; i < MESSAGE_NUM; ++i) { auto msg = std::make_shared(std::to_string(i)); - writer.WriteMessage(kChannelName1, msg, i); + writer.WriteMessage(CHANNEL_NAME_1, msg, i); } - ASSERT_EQ(kMessageNum, writer.GetMessageNumber(kChannelName1)); - ASSERT_EQ(kMessageType1, writer.GetMessageType(kChannelName1)); - ASSERT_EQ(kProtoDesc, writer.GetProtoDesc(kChannelName1)); + ASSERT_EQ(MESSAGE_NUM, writer.GetMessageNumber(CHANNEL_NAME_1)); + ASSERT_EQ(MESSAGE_TYPE_1, writer.GetMessageType(CHANNEL_NAME_1)); + ASSERT_EQ(PROTO_DESC, writer.GetProtoDesc(CHANNEL_NAME_1)); writer.Close(); - RecordReader reader(kTestFile); + RecordReader reader(TEST_FILE); RecordMessage message; - ASSERT_EQ(kMessageNum, reader.GetMessageNumber(kChannelName1)); - ASSERT_EQ(kMessageType1, reader.GetMessageType(kChannelName1)); - ASSERT_EQ(kProtoDesc, reader.GetProtoDesc(kChannelName1)); + ASSERT_EQ(MESSAGE_NUM, reader.GetMessageNumber(CHANNEL_NAME_1)); + ASSERT_EQ(MESSAGE_TYPE_1, reader.GetMessageType(CHANNEL_NAME_1)); + ASSERT_EQ(PROTO_DESC, reader.GetProtoDesc(CHANNEL_NAME_1)); // read all message uint32_t i = 0; - for (i = 0; i < kMessageNum; ++i) { + for (i = 0; i < MESSAGE_NUM; ++i) { ASSERT_TRUE(reader.ReadMessage(&message)); - ASSERT_EQ(kChannelName1, message.channel_name); + ASSERT_EQ(CHANNEL_NAME_1, message.channel_name); ASSERT_EQ(std::to_string(i), message.content); ASSERT_EQ(i, message.time); } @@ -66,9 +71,9 @@ TEST(RecordTest, TestSingleRecordFile) { // skip first message reader.Reset(); - for (i = 0; i < kMessageNum - 1; ++i) { + for (i = 0; i < MESSAGE_NUM - 1; ++i) { ASSERT_TRUE(reader.ReadMessage(&message, 1)); - ASSERT_EQ(kChannelName1, message.channel_name); + ASSERT_EQ(CHANNEL_NAME_1, message.channel_name); ASSERT_EQ(std::to_string(i + 1), message.content); ASSERT_EQ(i + 1, message.time); } @@ -76,14 +81,13 @@ TEST(RecordTest, TestSingleRecordFile) { // skip last message reader.Reset(); - for (i = 0; i < kMessageNum - 1; ++i) { - ASSERT_TRUE(reader.ReadMessage(&message, 0, kMessageNum - 2)); - ASSERT_EQ(kChannelName1, message.channel_name); + for (i = 0; i < MESSAGE_NUM - 1; ++i) { + ASSERT_TRUE(reader.ReadMessage(&message, 0, MESSAGE_NUM - 2)); + ASSERT_EQ(CHANNEL_NAME_1, message.channel_name); ASSERT_EQ(std::to_string(i), message.content); ASSERT_EQ(i, message.time); } - ASSERT_FALSE(reader.ReadMessage(&message, 0, kMessageNum - 2)); - ASSERT_FALSE(remove(kTestFile)); + ASSERT_FALSE(reader.ReadMessage(&message, 0, MESSAGE_NUM - 2)); } } // namespace record diff --git a/cyber/record/record_viewer_test.cc b/cyber/record/record_viewer_test.cc index ac471d2abcfb6c4922b77f91177996c8792a20c6..72713c36d3de042343d02d822c90d22f6eb0e74d 100644 --- a/cyber/record/record_viewer_test.cc +++ b/cyber/record/record_viewer_test.cc @@ -33,23 +33,25 @@ namespace record { using apollo::cyber::message::RawMessage; -constexpr char kChannelName1[] = "/test/channel1"; -constexpr char kMessageType1[] = "apollo.cyber.proto.Test"; -constexpr char kProtoDesc1[] = "1234567890"; -constexpr char kTestFile[] = "viewer_test.record"; +const char CHANNEL_NAME_1[] = "/test/channel1"; +const char CHANNEL_NAME_2[] = "/test/channel2"; +const char MESSAGE_TYPE_1[] = "apollo.cyber.proto.Test"; +const char MESSAGE_TYPE_2[] = "apollo.cyber.proto.Channel"; +const char PROTO_DESC[] = "1234567890"; +const char TEST_FILE[] = "viewer_test.record"; void ConstructRecord(uint64_t msg_num, uint64_t begin_time, uint64_t time_step) { RecordWriter writer; writer.SetSizeOfFileSegmentation(0); writer.SetIntervalOfFileSegmentation(0); - writer.Open(kTestFile); - writer.WriteChannel(kChannelName1, kMessageType1, kProtoDesc1); + writer.Open(TEST_FILE); + writer.WriteChannel(CHANNEL_NAME_1, MESSAGE_TYPE_1, PROTO_DESC); for (uint64_t i = 0; i < msg_num; i++) { auto msg = std::make_shared(std::to_string(i)); - writer.WriteMessage(kChannelName1, msg, begin_time + time_step * i); + writer.WriteMessage(CHANNEL_NAME_1, msg, begin_time + time_step * i); } - ASSERT_EQ(msg_num, writer.GetMessageNumber(kChannelName1)); + ASSERT_EQ(msg_num, writer.GetMessageNumber(CHANNEL_NAME_1)); writer.Close(); } @@ -70,7 +72,7 @@ TEST(RecordTest, iterator_test) { uint64_t end_time = begin_time + step_time * (msg_num - 1); ConstructRecord(msg_num, begin_time, step_time); - auto reader = std::make_shared(kTestFile); + auto reader = std::make_shared(TEST_FILE); RecordViewer viewer(reader); EXPECT_TRUE(viewer.IsValid()); EXPECT_EQ(begin_time, viewer.begin_time()); @@ -78,7 +80,7 @@ TEST(RecordTest, iterator_test) { uint64_t i = 0; for (auto& msg : viewer) { - EXPECT_EQ(kChannelName1, msg.channel_name); + EXPECT_EQ(CHANNEL_NAME_1, msg.channel_name); EXPECT_EQ(begin_time + step_time * i, msg.time); EXPECT_EQ(std::to_string(i), msg.content); i++; @@ -87,7 +89,7 @@ TEST(RecordTest, iterator_test) { i = 0; std::for_each(viewer.begin(), viewer.end(), [&i](RecordMessage& msg) { - EXPECT_EQ(kChannelName1, msg.channel_name); + EXPECT_EQ(CHANNEL_NAME_1, msg.channel_name); // EXPECT_EQ(begin_time + step_time * i, msg.time); EXPECT_EQ(std::to_string(i), msg.content); i++; @@ -96,13 +98,12 @@ TEST(RecordTest, iterator_test) { i = 0; for (auto it = viewer.begin(); it != viewer.end(); ++it) { - EXPECT_EQ(kChannelName1, it->channel_name); + EXPECT_EQ(CHANNEL_NAME_1, it->channel_name); EXPECT_EQ(begin_time + step_time * i, it->time); EXPECT_EQ(std::to_string(i), it->content); i++; } EXPECT_EQ(msg_num, i); - ASSERT_FALSE(remove(kTestFile)); } TEST(RecordTest, filter_test) { @@ -112,7 +113,7 @@ TEST(RecordTest, filter_test) { uint64_t end_time = begin_time + step_time * (msg_num - 1); ConstructRecord(msg_num, begin_time, step_time); - auto reader = std::make_shared(kTestFile); + auto reader = std::make_shared(TEST_FILE); RecordViewer viewer_0(reader); EXPECT_EQ(CheckCount(viewer_0), msg_num); EXPECT_EQ(begin_time, viewer_0.begin_time()); @@ -151,9 +152,8 @@ TEST(RecordTest, filter_test) { EXPECT_EQ(CheckCount(viewer_6), 0); // filter with exist channel - RecordViewer viewer_7(reader, 0, end_time, {kChannelName1}); + RecordViewer viewer_7(reader, 0, end_time, {CHANNEL_NAME_1}); EXPECT_EQ(CheckCount(viewer_7), msg_num); - ASSERT_FALSE(remove(kTestFile)); } TEST(RecordTest, mult_iterator_test) { @@ -163,7 +163,7 @@ TEST(RecordTest, mult_iterator_test) { uint64_t end_time = begin_time + step_time * (msg_num - 1); ConstructRecord(msg_num, begin_time, step_time); - auto reader = std::make_shared(kTestFile); + auto reader = std::make_shared(TEST_FILE); RecordViewer viewer(reader); EXPECT_TRUE(viewer.IsValid()); EXPECT_EQ(begin_time, viewer.begin_time()); @@ -173,13 +173,12 @@ TEST(RecordTest, mult_iterator_test) { uint64_t i = 0; for (auto& msg : viewer) { // #2 iterator - EXPECT_EQ(kChannelName1, msg.channel_name); + EXPECT_EQ(CHANNEL_NAME_1, msg.channel_name); EXPECT_EQ(begin_time + step_time * i, msg.time); EXPECT_EQ(std::to_string(i), msg.content); i++; } EXPECT_EQ(msg_num, i); - ASSERT_FALSE(remove(kTestFile)); } } // namespace record