提交 2218ab92 编写于 作者: A azural 提交者: GoLancer

framework: Update RecordReader interface

上级 7db58ece
......@@ -26,7 +26,7 @@ add_subdirectory(cybertron/mainboard)
# add_subdirectory(sensor)
add_subdirectory(third_party)
add_subdirectory(python)
#add_subdirectory(python)
add_subdirectory(cybertron/blocker)
file(GLOB CYBERTRON_SRCS
......@@ -93,4 +93,4 @@ install(DIRECTORY "cybertron"
)
enable_testing()
#add_subdirectory(cybertron/test)
add_subdirectory(cybertron/test)
......@@ -51,10 +51,16 @@ bool RecordFileReader::Open(const std::string& path) {
return ReadHeader();
}
void RecordFileReader::Reset() {
ifstream_.clear();
ifstream_.seekg(0, std::ios::beg);
ReadHeader();
}
bool RecordFileReader::ReadHeader() {
Section section;
uint64_t backup_position = ifstream_.tellg();
ifstream_.seekg(0, std::ios::beg);
Section section;
if (!ReadSection(&section)) {
AERROR << "read header section fail, file is broken of it is not a record "
"file.";
......@@ -163,7 +169,7 @@ bool RecordFileWriter::Open(const std::string& path) {
AERROR << "init flush thread error.";
return false;
}
AINFO << "record file opened, file: " << path_;
ADEBUG << "record file opened, file: " << path_;
return true;
}
......@@ -207,7 +213,7 @@ void RecordFileWriter::Close() {
}
ofstream_.close();
AINFO << "record file closed, file: " << path_;
ADEBUG << "record file closed, file: " << path_;
}
void RecordFileWriter::RefreshIndex() {
......
......@@ -95,6 +95,7 @@ class RecordFile {
virtual ~RecordFile();
virtual bool Open(const std::string& path) = 0;
virtual void Close() = 0;
Header GetHeader();
Index GetIndex();
......@@ -121,6 +122,7 @@ class RecordFileReader : public RecordFile {
bool ReadHeader();
bool ReadIndex();
bool EndOfFile();
void Reset();
private:
std::ifstream ifstream_;
......
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBERTRON_RECORD_RECORD_MESSAGE_H_
#define CYBERTRON_RECORD_RECORD_MESSAGE_H_
#include <condition_variable>
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include "cybertron/message/raw_message.h"
#include "cybertron/record/record_base.h"
namespace apollo {
namespace cybertron {
namespace record {
using ::apollo::cybertron::message::RawMessage;
using ::apollo::cybertron::record::RecordFileReader;
struct RecordMessage {
RecordMessage() {}
RecordMessage(const std::string& name,
const std::string& message,
uint64_t msg_time)
: channel_name(name),
content(message),
time(msg_time) {}
std::string channel_name;
std::string content;
uint64_t time;
};
} // namespace record
} // namespace cybertron
} // namespace apollo
#endif // CYBERTRON_RECORD_RECORD_READER_H_
......@@ -20,205 +20,116 @@ namespace apollo {
namespace cybertron {
namespace record {
RecordReader::RecordReader() {}
RecordReader::~RecordReader() {
if (load_thread_ && load_thread_->joinable()) {
load_thread_->join();
}
Close();
}
bool RecordReader::Open(const std::string& file, uint64_t begin_time,
uint64_t end_time,
const std::vector<std::string>& channel_vec) {
file_ = file;
path_ = file_;
RecordReader::RecordReader(const std::shared_ptr<RecordFileReader>& file,
uint64_t begin_time,
uint64_t end_time,
const std::set<std::string>& channels) {
begin_time_ = begin_time;
end_time_ = end_time;
channel_vec_ = channel_vec;
all_channels_ = channel_vec_.empty() ? true : false;
file_reader_.reset(new RecordFileReader());
current_message_.reset(new RecordMessage());
channels_ = channels;
file_reader_ = file;
if (!file_reader_->Open(path_)) {
AERROR << "open record file failed. file: " << path_;
return false;
}
header_ = file_reader_->GetHeader();
file_reader_->Reset();
if (begin_time_ < header_.begin_time()) {
begin_time_ = header_.begin_time();
}
if (end_time_ > header_.end_time()) {
end_time_ = header_.end_time();
}
if (begin_time_ > end_time_) {
AERROR << "begin time is larger than end time"
AERROR << "Begin time must be earlier than end time"
<< ", begin_time_=" << begin_time_ << ", end_time_=" << end_time_;
return false;
}
if (!InitLoadThread()) {
AERROR << "init loader error.";
return false;
}
is_opened_ = true;
return is_opened_;
}
}
void RecordReader::Close() {
if (is_opened_) {
file_reader_->Close();
is_opened_ = false;
}
}
bool RecordReader::ReadMessage(RecordMessage* message) {
while (index_ < chunk_.messages_size()) {
const auto& next_message = chunk_.messages(index_++);
uint64_t time = next_message.time();
if (time > end_time_) {
return false;
}
if (time < begin_time_) {
continue;
}
bool RecordReader::InitLoadThread() {
auto f = std::bind(&RecordReader::LoadChunk, this, std::placeholders::_1,
std::placeholders::_2);
load_thread_ = std::make_shared<std::thread>(f, begin_time_, end_time_);
if (load_thread_ == nullptr) {
AERROR << "init loader thread error.";
return false;
}
return true;
}
bool RecordReader::ReadMessage() {
std::lock_guard<std::mutex> lck(mutex_);
if (!message_queue_.empty()) {
current_message_ = message_queue_.front();
message_queue_.pop();
const std::string& channel_name = next_message.channel_name();
if (!channels_.empty() && channels_.count(channel_name) == 0) {
continue;
}
OnNewMessage(channel_name);
message->channel_name = channel_name;
message->content = next_message.content();
message->time = time;
return true;
}
if (!looped_readback_ && loaded_all_message_) {
AINFO << "reach the last message";
} else {
AERROR << "read too fast, wait for load thread";
}
return false;
}
bool RecordReader::EndOfFile() {
std::lock_guard<std::mutex> lck(mutex_);
if (message_queue_.empty() && !looped_readback_ && loaded_all_message_) {
return true;
if (ReadNextChunk()) {
index_ = 0;
return ReadMessage(message);
}
return false;
}
void RecordReader::LoadChunk(uint64_t from_time, uint64_t to_time) {
uint64_t loop_time = end_time_ - begin_time_;
uint64_t total_message_number = header_.message_number();
if (total_message_number == 0) {
AERROR << "total message number is zero.";
return;
}
uint64_t average_period = loop_time / total_message_number;
if (average_period < 1e7) {
average_period = 1e7; // 10ms
}
double average_hz = total_message_number / (loop_time * 1e-9);
uint32_t preload_queue_size = (uint32_t)average_hz * preload_seconds_;
if (preload_queue_size < min_queue_size_) {
preload_queue_size = min_queue_size_;
}
uint32_t loop_num = 0;
do {
AINFO << "new loop started";
bool skip_next_chunk_body(false);
file_reader_->ReadHeader();
while (!file_reader_->EndOfFile()) {
while (message_queue_.size() >= preload_queue_size) {
std::this_thread::sleep_for(std::chrono::nanoseconds(average_period));
}
Section section;
if (!file_reader_->ReadSection(&section)) {
AERROR << "read section fail";
return;
bool RecordReader::ReadNextChunk() {
bool skip_next_chunk_body = false;
Section section;
while (file_reader_->ReadSection(&section)) {
switch (section.type) {
case SectionType::SECTION_INDEX: {
file_reader_->SkipSection(section.size);
break;
}
if (section.type == SectionType::SECTION_INDEX) {
case SectionType::SECTION_CHANNEL: {
ADEBUG << "Read channel section of size: " << section.size;
Channel channel;
if (!file_reader_->ReadSection<Channel>(section.size, &channel)) {
AERROR << "Failed to read channel section.";
return false;
}
OnNewChannel(channel.name(), channel.message_type(), channel.proto_desc());
break;
}
switch (section.type) {
case SectionType::SECTION_CHANNEL: {
Channel chan;
if (!file_reader_->ReadSection<Channel>(section.size, &chan)) {
AERROR << "read channel section fail.";
return;
}
OnNewChannel(chan.name(), chan.message_type(), chan.proto_desc());
break;
case SectionType::SECTION_CHUNK_HEADER: {
ADEBUG << "Read chunk header section of size: " << section.size;
ChunkHeader header;
if (!file_reader_->ReadSection<ChunkHeader>(section.size, &header)) {
AERROR << "Failed to read chunk header section.";
return false;
}
case SectionType::SECTION_CHUNK_HEADER: {
ChunkHeader chdr;
if (!file_reader_->ReadSection<ChunkHeader>(section.size, &chdr)) {
AERROR << "read chunk header section fail.";
return;
}
if (from_time > chdr.end_time() || to_time < chdr.begin_time()) {
skip_next_chunk_body = true;
}
break;
if (begin_time_ > header.end_time()) {
return false;
}
case SectionType::SECTION_CHUNK_BODY: {
if (skip_next_chunk_body) {
file_reader_->SkipSection(section.size);
skip_next_chunk_body = false;
break;
}
ChunkBody cbd;
if (!file_reader_->ReadSection<ChunkBody>(section.size, &cbd)) {
AERROR << "read chunk body section fail.";
return;
}
for (int idx = 0; idx < cbd.messages_size(); ++idx) {
uint64_t time(cbd.messages(idx).time());
if (time < from_time || time > to_time) {
continue;
}
std::string channel_name(cbd.messages(idx).channel_name());
if (!all_channels_ &&
std::find(channel_vec_.begin(), channel_vec_.end(),
channel_name) == channel_vec_.end()) {
continue;
}
std::shared_ptr<RawMessage> raw_message(
new RawMessage(cbd.messages(idx).content()));
std::shared_ptr<RecordMessage> message_queue_item(new RecordMessage(
channel_name, raw_message, cbd.messages(idx).time()));
OnNewMessage(channel_name);
{
std::lock_guard<std::mutex> lck(mutex_);
message_queue_.push(message_queue_item);
}
}
break;
if (end_time_ < header.begin_time()) {
skip_next_chunk_body = true;
}
default: {
AERROR << "section should not be here, section type: "
<< section.type;
break;
}
case SectionType::SECTION_CHUNK_BODY: {
if (skip_next_chunk_body) {
file_reader_->SkipSection(section.size);
skip_next_chunk_body = false;
break;
}
} // end switch for section type
} // end while, condition is not EOF
loop_num++;
} while (looped_readback_);
loaded_all_message_ = true;
}
const std::string& RecordReader::CurrentMessageChannelName() {
std::lock_guard<std::mutex> lck(mutex_);
return current_message_->channel_name;
}
uint64_t RecordReader::CurrentMessageTime() {
std::lock_guard<std::mutex> lck(mutex_);
return current_message_->time;
}
std::shared_ptr<RawMessage> RecordReader::CurrentRawMessage() {
std::lock_guard<std::mutex> lck(mutex_);
return current_message_->raw_message;
if (!file_reader_->ReadSection<ChunkBody>(section.size, &chunk_)) {
AERROR << "Failed to read chunk body section.";
return false;
}
return true;
}
default: {
AERROR << "Invalid section type: " << section.type;
return false;
}
}
}
return false;
}
} // namespace record
......
......@@ -28,6 +28,7 @@
#include <vector>
#include "cybertron/message/raw_message.h"
#include "cybertron/record/record_base.h"
#include "cybertron/record/record_message.h"
namespace apollo {
namespace cybertron {
......@@ -38,59 +39,24 @@ using ::apollo::cybertron::record::RecordFileReader;
class RecordReader : public RecordBase {
public:
RecordReader();
RecordReader(const std::shared_ptr<RecordFileReader>& file,
uint64_t begin_time,
uint64_t end_time,
const std::set<std::string>& channel_filter);
virtual ~RecordReader();
bool Open(
const std::string& filename, uint64_t begin_time = 0,
uint64_t end_time = UINT64_MAX,
const std::vector<std::string>& channel_vec = std::vector<std::string>());
void Close();
bool ReadMessage();
bool EndOfFile();
const std::string& CurrentMessageChannelName();
std::shared_ptr<RawMessage> CurrentRawMessage();
uint64_t CurrentMessageTime();
bool ReadMessage(RecordMessage* message);
private:
bool InitLoadThread();
void LoadChunk(uint64_t from_time, uint64_t to_time);
bool ReadNextChunk();
bool all_channels_ = false;
bool looped_readback_ = false;
bool loaded_all_message_ = false;
uint64_t begin_time_ = 0;
uint64_t end_time_ = UINT64_MAX;
uint32_t preload_seconds_ = 3;
uint32_t min_queue_size_ = 2000;
std::unique_ptr<RecordFileReader> file_reader_ = nullptr;
std::shared_ptr<std::thread> load_thread_ = nullptr;
std::vector<std::string> channel_vec_;
struct RecordMessage {
public:
RecordMessage()
: channel_name(""), raw_message(new RawMessage()), time(0) {}
explicit RecordMessage(const std::string& channel_name,
std::shared_ptr<RawMessage>& raw_message,
uint64_t time)
: channel_name(std::move(channel_name)),
raw_message(raw_message),
time(time) {}
~RecordMessage() {}
std::string channel_name;
std::shared_ptr<RawMessage> raw_message;
uint64_t time;
std::set<std::string> channels_;
std::shared_ptr<RecordFileReader> file_reader_ = nullptr;
private:
RecordMessage(const RecordMessage&) = delete;
RecordMessage& operator=(const RecordMessage&) = delete;
};
std::shared_ptr<RecordMessage> current_message_;
std::queue<std::shared_ptr<RecordMessage>> message_queue_;
ChunkBody chunk_;
uint64_t pos = 0;
uint32_t index_ = 0;
};
} // namespace record
......
......@@ -43,7 +43,7 @@ TEST(CRoutineTest, croutine_switch) {
TEST(CRoutineTest, croutine_sleep) {
auto cr = std::make_shared<CRoutine>(
[]() { croutine::CRoutine::GetCurrentRoutine()->Sleep(1000); });
[]() { croutine::CRoutine::GetCurrentRoutine()->Sleep(croutine::Duration(1000)); });
auto state = cr->Resume();
EXPECT_EQ(state, RoutineState::SLEEP);
Sleep(5);
......
......@@ -66,42 +66,41 @@ TEST(RecordTest, TestOneMessageFile) {
ASSERT_EQ(MESSAGE_TYPE_1, rw->GetMessageType(CHANNEL_NAME_1));
ASSERT_EQ(PROTO_DESC, rw->GetProtoDesc(CHANNEL_NAME_1));
std::shared_ptr<RawMessage> rm(new RawMessage(STR_10B));
ASSERT_TRUE(rw->WriteMessage(CHANNEL_NAME_1, rm, TIME_1));
ASSERT_EQ(1, rw->GetMessageNumber(CHANNEL_NAME_1));
int MESSAGE_NUM = 1024;
for (int i = 0; i < MESSAGE_NUM; ++i) {
std::shared_ptr<RawMessage> rm(new RawMessage(STR_10B));
ASSERT_TRUE(rw->WriteMessage(CHANNEL_NAME_1, rm, TIME_1));
ASSERT_EQ(i + 1, rw->GetMessageNumber(CHANNEL_NAME_1));
}
delete rw;
// reader
RecordReader* rr = new RecordReader();
ASSERT_FALSE(rr->is_opened_);
ASSERT_EQ("", rr->file_);
ASSERT_EQ("", rr->path_);
ASSERT_EQ(nullptr, rr->file_reader_);
ASSERT_TRUE(rr->Open(TEST_FILE));
ASSERT_TRUE(rr->is_opened_);
ASSERT_EQ(TEST_FILE, rr->file_);
ASSERT_EQ(TEST_FILE, rr->path_);
ASSERT_TRUE(rr->file_reader_->ifstream_.is_open());
sleep(1);
ASSERT_EQ("", rr->CurrentMessageChannelName());
ASSERT_EQ("", rr->CurrentRawMessage()->message);
ASSERT_EQ(0, rr->CurrentMessageTime());
ASSERT_FALSE(rr->EndOfFile());
ASSERT_TRUE(rr->ReadMessage());
ASSERT_EQ(CHANNEL_NAME_1, rr->CurrentMessageChannelName());
ASSERT_EQ(STR_10B, rr->CurrentRawMessage()->message);
ASSERT_EQ(TIME_1, rr->CurrentMessageTime());
ASSERT_TRUE(rr->EndOfFile());
ASSERT_FALSE(rr->ReadMessage());
delete rr;
auto file = std::make_shared<RecordFileReader>();
file->Open(TEST_FILE);
RecordReader reader(file, 0, UINT_MAX, std::set<std::string>());
ASSERT_NE(nullptr, reader.file_reader_);
ASSERT_TRUE(reader.file_reader_->ifstream_.is_open());
RecordMessage msg;
for (int i = 0; i < MESSAGE_NUM; ++i) {
ASSERT_TRUE(reader.ReadMessage(&msg));
EXPECT_EQ(STR_10B, msg.content);
EXPECT_EQ(CHANNEL_NAME_1, msg.channel_name);
EXPECT_EQ(TIME_1, msg.time);
}
RecordReader reader_2(file, 0, UINT_MAX, std::set<std::string>());
for (int i = 0; i < MESSAGE_NUM; ++i) {
ASSERT_TRUE(reader_2.ReadMessage(&msg));
EXPECT_EQ(STR_10B, msg.content);
EXPECT_EQ(CHANNEL_NAME_1, msg.channel_name);
EXPECT_EQ(TIME_1, msg.time);
}
EXPECT_FALSE(reader_2.ReadMessage(&msg));
EXPECT_FALSE(reader_2.ReadMessage(&msg));
}
TEST(RecordTest, TestMutiMessageFile) {
......@@ -150,6 +149,7 @@ TEST(RecordTest, TestMutiMessageFile) {
delete rw;
// reader
/*
RecordReader* rr = new RecordReader();
ASSERT_FALSE(rr->is_opened_);
ASSERT_EQ("", rr->file_);
......@@ -181,6 +181,7 @@ TEST(RecordTest, TestMutiMessageFile) {
ASSERT_FALSE(rr->ReadMessage());
delete rr;
*/
}
} // namespace record
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册