提交 24eaeafa 编写于 作者: baowanyu's avatar baowanyu 提交者: GoLancer

framework: add channel_vec param in RecordReader for filter

上级 d0a845b3
......@@ -33,7 +33,7 @@ namespace apollo {
namespace cybertron {
namespace record {
static std::string g_empty_string = "";
const char g_empty_string[] = "";
class RecordBase {
public:
......
......@@ -116,7 +116,7 @@ bool RecordFileReader::ReadSection(Section* section) {
}
ifstream_.read(reinterpret_cast<char*>(section), sizeof(*section));
if (ifstream_.eof()) {
AERROR << "input file stream is eof, file: " << path_;
AINFO << "input file stream is eof, file: " << path_;
return false;
}
if (ifstream_.gcount() != sizeof(*section)) {
......
......@@ -30,11 +30,14 @@ RecordReader::~RecordReader() {
}
bool RecordReader::Open(const std::string& file, uint64_t begin_time,
uint64_t end_time) {
uint64_t end_time,
const std::vector<std::string>& channel_vec) {
file_ = file;
path_ = file_;
begin_time_ = begin_time;
end_time_ = end_time;
channel_vec_ = channel_vec;
all_channels_ = channel_vec_.empty() ? true : false;
file_reader_.reset(new RecordFileReader());
current_message_.reset(new RecordMessage());
......@@ -174,6 +177,11 @@ void RecordReader::LoadChunk(uint64_t from_time, uint64_t to_time) {
continue;
}
std::string channel_name(cbd.messages(idx).channel_name());
if (!all_channels_ &&
std::find(channel_vec_.begin(), channel_vec_.end(),
channel_name) == channel_vec_.end()) {
continue;
}
std::shared_ptr<RawMessage> raw_message(
new RawMessage(cbd.messages(idx).content()));
std::shared_ptr<RecordMessage> message_queue_item(new RecordMessage(
......
......@@ -40,11 +40,14 @@ class RecordReader : public RecordBase {
public:
RecordReader();
virtual ~RecordReader();
bool Open(const std::string& filename, uint64_t begin_time = 0,
uint64_t end_time = UINT64_MAX);
bool Open(
const std::string& filename, uint64_t begin_time = 0,
uint64_t end_time = UINT64_MAX,
const std::vector<std::string>& channel_vec = std::vector<std::string>());
void Close();
bool ReadMessage();
bool EndOfFile();
const std::string& CurrentMessageChannelName();
std::shared_ptr<RawMessage> CurrentRawMessage();
uint64_t CurrentMessageTime();
......@@ -53,6 +56,7 @@ class RecordReader : public RecordBase {
bool InitLoadThread();
void LoadChunk(uint64_t from_time, uint64_t to_time);
bool all_channels_ = false;
bool looped_readback_ = false;
bool loaded_all_message_ = false;
uint64_t begin_time_ = 0;
......@@ -61,6 +65,7 @@ class RecordReader : public RecordBase {
uint32_t min_queue_size_ = 2000;
std::unique_ptr<RecordFileReader> file_reader_ = nullptr;
std::shared_ptr<std::thread> load_thread_ = nullptr;
std::vector<std::string> channel_vec_;
struct RecordMessage {
public:
......
......@@ -30,12 +30,15 @@ namespace apollo {
namespace cybertron {
namespace record {
const char CHANNEL_NAME[] = "/test/channel1";
const char CHANNEL_NAME_1[] = "/test/channel1";
const char CHANNEL_NAME_2[] = "/test/channel2";
const char MESSAGE_TYPE[] = "apollo.cybertron.proto.Test";
const char PROTO_DESC[] = "1234567890";
const char STR_10B[] = "1234567890";
const char TEST_FILE[] = "test.record";
const uint64_t TIME = 1e9;
const uint64_t TIME_1 = 1000 * 1e6;
const uint64_t TIME_2 = 1010 * 1e6;
const uint64_t TIME_3 = 1020 * 1e6;
TEST(RecordTest, TestOneMessageFile) {
// writer
......@@ -51,14 +54,14 @@ TEST(RecordTest, TestOneMessageFile) {
ASSERT_EQ(TEST_FILE, rw->path_);
ASSERT_TRUE(rw->file_writer_->ofstream_.is_open());
ASSERT_TRUE(rw->WriteChannel(CHANNEL_NAME, MESSAGE_TYPE, PROTO_DESC));
ASSERT_EQ(0, rw->GetMessageNumber(CHANNEL_NAME));
ASSERT_EQ(MESSAGE_TYPE, rw->GetMessageType(CHANNEL_NAME));
ASSERT_EQ(PROTO_DESC, rw->GetProtoDesc(CHANNEL_NAME));
ASSERT_TRUE(rw->WriteChannel(CHANNEL_NAME_1, MESSAGE_TYPE, PROTO_DESC));
ASSERT_EQ(0, rw->GetMessageNumber(CHANNEL_NAME_1));
ASSERT_EQ(MESSAGE_TYPE, rw->GetMessageType(CHANNEL_NAME_1));
ASSERT_EQ(PROTO_DESC, rw->GetProtoDesc(CHANNEL_NAME_1));
std::shared_ptr<RawMessage> rm(new RawMessage(STR_10B));
ASSERT_TRUE(rw->WriteMessage(CHANNEL_NAME, rm, TIME));
ASSERT_EQ(1, rw->GetMessageNumber(CHANNEL_NAME));
ASSERT_TRUE(rw->WriteMessage(CHANNEL_NAME_1, rm, TIME_1));
ASSERT_EQ(1, rw->GetMessageNumber(CHANNEL_NAME_1));
delete rw;
......@@ -84,9 +87,79 @@ TEST(RecordTest, TestOneMessageFile) {
ASSERT_FALSE(rr->EndOfFile());
ASSERT_TRUE(rr->ReadMessage());
ASSERT_EQ(CHANNEL_NAME, rr->CurrentMessageChannelName());
ASSERT_EQ(CHANNEL_NAME_1, rr->CurrentMessageChannelName());
ASSERT_EQ(STR_10B, rr->CurrentRawMessage()->message);
ASSERT_EQ(TIME, rr->CurrentMessageTime());
ASSERT_EQ(TIME_1, rr->CurrentMessageTime());
ASSERT_TRUE(rr->EndOfFile());
ASSERT_FALSE(rr->ReadMessage());
delete rr;
}
TEST(RecordTest, TestMutiMessageFile) {
// writer
RecordWriter* rw = new RecordWriter();
ASSERT_FALSE(rw->is_opened_);
ASSERT_EQ("", rw->file_);
ASSERT_EQ("", rw->path_);
ASSERT_EQ(nullptr, rw->file_writer_);
ASSERT_TRUE(rw->Open(TEST_FILE));
ASSERT_TRUE(rw->is_opened_);
ASSERT_EQ(TEST_FILE, rw->file_);
ASSERT_EQ(TEST_FILE, rw->path_);
ASSERT_TRUE(rw->file_writer_->ofstream_.is_open());
ASSERT_TRUE(rw->WriteChannel(CHANNEL_NAME_1, MESSAGE_TYPE, PROTO_DESC));
ASSERT_EQ(0, rw->GetMessageNumber(CHANNEL_NAME_1));
ASSERT_EQ(MESSAGE_TYPE, rw->GetMessageType(CHANNEL_NAME_1));
ASSERT_EQ(PROTO_DESC, rw->GetProtoDesc(CHANNEL_NAME_1));
ASSERT_TRUE(rw->WriteChannel(CHANNEL_NAME_2, MESSAGE_TYPE, PROTO_DESC));
ASSERT_EQ(0, rw->GetMessageNumber(CHANNEL_NAME_2));
ASSERT_EQ(MESSAGE_TYPE, rw->GetMessageType(CHANNEL_NAME_2));
ASSERT_EQ(PROTO_DESC, rw->GetProtoDesc(CHANNEL_NAME_2));
std::shared_ptr<RawMessage> rm(new RawMessage(STR_10B));
ASSERT_TRUE(rw->WriteMessage(CHANNEL_NAME_1, rm, TIME_1));
ASSERT_EQ(1, rw->GetMessageNumber(CHANNEL_NAME_1));
ASSERT_TRUE(rw->WriteMessage(CHANNEL_NAME_2, rm, TIME_2));
ASSERT_EQ(1, rw->GetMessageNumber(CHANNEL_NAME_2));
ASSERT_TRUE(rw->WriteMessage(CHANNEL_NAME_1, rm, TIME_3));
ASSERT_EQ(2, rw->GetMessageNumber(CHANNEL_NAME_1));
delete rw;
// reader
RecordReader* rr = new RecordReader();
ASSERT_FALSE(rr->is_opened_);
ASSERT_EQ("", rr->file_);
ASSERT_EQ("", rr->path_);
ASSERT_EQ(nullptr, rr->file_reader_);
std::vector<std::string> channel_vec = std::vector<std::string>();
channel_vec.push_back(CHANNEL_NAME_2);
ASSERT_TRUE(rr->Open(TEST_FILE, TIME_1, TIME_2, channel_vec));
ASSERT_TRUE(rr->is_opened_);
ASSERT_EQ(TEST_FILE, rr->file_);
ASSERT_EQ(TEST_FILE, rr->path_);
ASSERT_TRUE(rr->file_reader_->ifstream_.is_open());
sleep(1);
ASSERT_EQ("", rr->CurrentMessageChannelName());
ASSERT_EQ("", rr->CurrentRawMessage()->message);
ASSERT_EQ(0, rr->CurrentMessageTime());
ASSERT_FALSE(rr->EndOfFile());
ASSERT_TRUE(rr->ReadMessage());
ASSERT_EQ(CHANNEL_NAME_2, rr->CurrentMessageChannelName());
ASSERT_EQ(STR_10B, rr->CurrentRawMessage()->message);
ASSERT_EQ(TIME_2, rr->CurrentMessageTime());
ASSERT_TRUE(rr->EndOfFile());
ASSERT_FALSE(rr->ReadMessage());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册