提交 1f947c63 编写于 作者: fengqikai1414's avatar fengqikai1414 提交者: Calvin Miao

cyber: fix for record file

上级 ef8aa02f
......@@ -47,9 +47,9 @@ class RecordFileBase {
virtual ~RecordFileBase() {}
virtual bool Open(const std::string& path) = 0;
virtual void Close() = 0;
std::string GetPath() { return path_; }
Header GetHeader() { return header_; }
Index GetIndex() { return index_; }
const std::string& GetPath() const { return path_; }
const Header& GetHeader() const { return header_; }
const Index& GetIndex() const { return index_; }
int64_t CurrentPosition();
bool SetPosition(int64_t position);
......
......@@ -32,7 +32,8 @@ const char CHAN_1[] = "/test1";
const char CHAN_2[] = "/test2";
const char MSG_TYPE[] = "apollo.cyber.proto.Test";
const char STR_10B[] = "1234567890";
const char TEST_FILE[] = "test.record";
const char TEST_FILE_1[] = "test_1.record";
const char TEST_FILE_2[] = "test_2.record";
TEST(ChunkTest, TestAll) {
Chunk* ck = new Chunk();
......@@ -77,8 +78,8 @@ TEST(ChunkTest, TestAll) {
TEST(RecordFileTest, TestOneMessageFile) {
// writer open one message file
RecordFileWriter* rfw = new RecordFileWriter();
ASSERT_TRUE(rfw->Open(TEST_FILE));
ASSERT_EQ(TEST_FILE, rfw->GetPath());
ASSERT_TRUE(rfw->Open(TEST_FILE_1));
ASSERT_EQ(TEST_FILE_1, rfw->GetPath());
// write header section
Header hdr1 = HeaderBuilder::GetHeaderWithSegmentParams(0, 0);
......@@ -114,8 +115,8 @@ TEST(RecordFileTest, TestOneMessageFile) {
// header open one message file
RecordFileReader* rfr = new RecordFileReader();
ASSERT_TRUE(rfr->Open(TEST_FILE));
ASSERT_EQ(TEST_FILE, rfr->GetPath());
ASSERT_TRUE(rfr->Open(TEST_FILE_1));
ASSERT_EQ(TEST_FILE_1, rfr->GetPath());
Section sec;
......@@ -162,8 +163,8 @@ TEST(RecordFileTest, TestOneMessageFile) {
TEST(RecordFileTest, TestOneChunkFile) {
RecordFileWriter* rfw = new RecordFileWriter();
ASSERT_TRUE(rfw->Open(TEST_FILE));
ASSERT_EQ(TEST_FILE, rfw->GetPath());
ASSERT_TRUE(rfw->Open(TEST_FILE_1));
ASSERT_EQ(TEST_FILE_1, rfw->GetPath());
Header header = HeaderBuilder::GetHeaderWithChunkParams(0, 0);
header.set_segment_interval(0);
......@@ -212,6 +213,92 @@ TEST(RecordFileTest, TestOneChunkFile) {
ASSERT_EQ(3, rfw->GetHeader().message_number());
}
TEST(RecordFileTest, TestIndex) {
{
RecordFileWriter* rfw = new RecordFileWriter();
ASSERT_TRUE(rfw->Open(TEST_FILE_2));
ASSERT_EQ(TEST_FILE_2, rfw->GetPath());
Header header = HeaderBuilder::GetHeaderWithChunkParams(0, 0);
header.set_segment_interval(0);
header.set_segment_raw_size(0);
ASSERT_TRUE(rfw->WriteHeader(header));
ASSERT_FALSE(rfw->GetHeader().is_complete());
Channel chan1;
chan1.set_name(CHAN_1);
chan1.set_message_type(MSG_TYPE);
chan1.set_proto_desc(STR_10B);
ASSERT_TRUE(rfw->WriteChannel(chan1));
Channel chan2;
chan2.set_name(CHAN_2);
chan2.set_message_type(MSG_TYPE);
chan2.set_proto_desc(STR_10B);
ASSERT_TRUE(rfw->WriteChannel(chan2));
SingleMessage msg1;
msg1.set_channel_name(chan1.name());
msg1.set_content(STR_10B);
msg1.set_time(1e9);
ASSERT_TRUE(rfw->WriteMessage(msg1));
ASSERT_EQ(1, rfw->GetMessageNumber(chan1.name()));
SingleMessage msg2;
msg2.set_channel_name(chan2.name());
msg2.set_content(STR_10B);
msg2.set_time(2e9);
ASSERT_TRUE(rfw->WriteMessage(msg2));
ASSERT_EQ(1, rfw->GetMessageNumber(chan2.name()));
SingleMessage msg3;
msg3.set_channel_name(chan1.name());
msg3.set_content(STR_10B);
msg3.set_time(3e9);
ASSERT_TRUE(rfw->WriteMessage(msg3));
ASSERT_EQ(2, rfw->GetMessageNumber(chan1.name()));
rfw->Close();
ASSERT_TRUE(rfw->GetHeader().is_complete());
ASSERT_EQ(1, rfw->GetHeader().chunk_number());
ASSERT_EQ(1e9, rfw->GetHeader().begin_time());
ASSERT_EQ(3e9, rfw->GetHeader().end_time());
ASSERT_EQ(3, rfw->GetHeader().message_number());
}
{
RecordFileReader reader;
reader.Open(TEST_FILE_2);
reader.ReadIndex();
const auto& index = reader.GetIndex();
// Walk through file the long way and check that the indexes match the
// sections
reader.Reset();
Section section;
for (uint64_t pos = reader.CurrentPosition();
reader.ReadSection(&section) && reader.SkipSection(section.size);
pos = reader.CurrentPosition()) {
// Find index at position
if (section.type != SectionType::SECTION_INDEX) {
bool found = false;
SingleIndex match;
for (const auto& row : index.indexes()) {
if (row.position() == pos) {
match = row;
found = true;
break;
}
}
ASSERT_TRUE(found);
EXPECT_EQ(match.position(), pos);
EXPECT_EQ(match.type(), section.type);
}
}
}
}
} // namespace record
} // namespace cyber
} // namespace apollo
......
......@@ -129,6 +129,7 @@ bool RecordFileWriter::WriteIndex() {
bool RecordFileWriter::WriteChannel(const Channel& channel) {
std::lock_guard<std::mutex> lock(mutex_);
uint64_t pos = CurrentPosition();
if (!WriteSection<Channel>(channel)) {
AERROR << "Write section fail";
return false;
......@@ -136,7 +137,7 @@ bool RecordFileWriter::WriteChannel(const Channel& channel) {
header_.set_channel_number(header_.channel_number() + 1);
SingleIndex* single_index = index_.add_indexes();
single_index->set_type(SectionType::SECTION_CHANNEL);
single_index->set_position(CurrentPosition());
single_index->set_position(pos);
ChannelCache* channel_cache = new ChannelCache();
channel_cache->set_name(channel.name());
channel_cache->set_message_number(0);
......@@ -149,19 +150,22 @@ bool RecordFileWriter::WriteChannel(const Channel& channel) {
bool RecordFileWriter::WriteChunk(const ChunkHeader& chunk_header,
const ChunkBody& chunk_body) {
std::lock_guard<std::mutex> lock(mutex_);
uint64_t pos = CurrentPosition();
if (!WriteSection<ChunkHeader>(chunk_header)) {
AERROR << "Write chunk header fail";
return false;
}
SingleIndex* single_index = index_.add_indexes();
single_index->set_type(SectionType::SECTION_CHUNK_HEADER);
single_index->set_position(CurrentPosition());
single_index->set_position(pos);
ChunkHeaderCache* chunk_header_cache = new ChunkHeaderCache();
chunk_header_cache->set_begin_time(chunk_header.begin_time());
chunk_header_cache->set_end_time(chunk_header.end_time());
chunk_header_cache->set_message_number(chunk_header.message_number());
chunk_header_cache->set_raw_size(chunk_header.raw_size());
single_index->set_allocated_chunk_header_cache(chunk_header_cache);
pos = CurrentPosition();
if (!WriteSection<ChunkBody>(chunk_body)) {
AERROR << "Write chunk body fail";
return false;
......@@ -175,7 +179,7 @@ bool RecordFileWriter::WriteChunk(const ChunkHeader& chunk_header,
chunk_header.message_number());
single_index = index_.add_indexes();
single_index->set_type(SectionType::SECTION_CHUNK_BODY);
single_index->set_position(CurrentPosition());
single_index->set_position(pos);
ChunkBodyCache* chunk_body_cache = new ChunkBodyCache();
chunk_body_cache->set_message_number(chunk_body.messages_size());
single_index->set_allocated_chunk_body_cache(chunk_body_cache);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册