未验证 提交 c9e58846 编写于 作者: F Fla3inH0tCheet0s 提交者: GitHub

cyber_recorder file writer threading bug fix (#12377)

上级 50f49272
...@@ -45,7 +45,7 @@ class RecordFileBase { ...@@ -45,7 +45,7 @@ class RecordFileBase {
std::string path_; std::string path_;
proto::Header header_; proto::Header header_;
proto::Index index_; proto::Index index_;
int fd_; int fd_ = -1;
}; };
} // namespace record } // namespace record
......
...@@ -222,56 +222,56 @@ TEST(RecordFileTest, TestOneChunkFile) { ...@@ -222,56 +222,56 @@ TEST(RecordFileTest, TestOneChunkFile) {
TEST(RecordFileTest, TestIndex) { TEST(RecordFileTest, TestIndex) {
{ {
RecordFileWriter* rfw = new RecordFileWriter(); RecordFileWriter rfw;
ASSERT_TRUE(rfw->Open(kTestFile2)); ASSERT_TRUE(rfw.Open(kTestFile2));
ASSERT_EQ(kTestFile2, rfw->GetPath()); ASSERT_EQ(kTestFile2, rfw.GetPath());
Header header = HeaderBuilder::GetHeaderWithChunkParams(0, 0); Header header = HeaderBuilder::GetHeaderWithChunkParams(0, 0);
header.set_segment_interval(0); header.set_segment_interval(0);
header.set_segment_raw_size(0); header.set_segment_raw_size(0);
ASSERT_TRUE(rfw->WriteHeader(header)); ASSERT_TRUE(rfw.WriteHeader(header));
ASSERT_FALSE(rfw->GetHeader().is_complete()); ASSERT_FALSE(rfw.GetHeader().is_complete());
Channel chan1; Channel chan1;
chan1.set_name(kChan1); chan1.set_name(kChan1);
chan1.set_message_type(kMsgType); chan1.set_message_type(kMsgType);
chan1.set_proto_desc(kStr10B); chan1.set_proto_desc(kStr10B);
ASSERT_TRUE(rfw->WriteChannel(chan1)); ASSERT_TRUE(rfw.WriteChannel(chan1));
Channel chan2; Channel chan2;
chan2.set_name(kChan2); chan2.set_name(kChan2);
chan2.set_message_type(kMsgType); chan2.set_message_type(kMsgType);
chan2.set_proto_desc(kStr10B); chan2.set_proto_desc(kStr10B);
ASSERT_TRUE(rfw->WriteChannel(chan2)); ASSERT_TRUE(rfw.WriteChannel(chan2));
SingleMessage msg1; SingleMessage msg1;
msg1.set_channel_name(chan1.name()); msg1.set_channel_name(chan1.name());
msg1.set_content(kStr10B); msg1.set_content(kStr10B);
msg1.set_time(1e9); msg1.set_time(1e9);
ASSERT_TRUE(rfw->WriteMessage(msg1)); ASSERT_TRUE(rfw.WriteMessage(msg1));
ASSERT_EQ(1, rfw->GetMessageNumber(chan1.name())); ASSERT_EQ(1, rfw.GetMessageNumber(chan1.name()));
SingleMessage msg2; SingleMessage msg2;
msg2.set_channel_name(chan2.name()); msg2.set_channel_name(chan2.name());
msg2.set_content(kStr10B); msg2.set_content(kStr10B);
msg2.set_time(2e9); msg2.set_time(2e9);
ASSERT_TRUE(rfw->WriteMessage(msg2)); ASSERT_TRUE(rfw.WriteMessage(msg2));
ASSERT_EQ(1, rfw->GetMessageNumber(chan2.name())); ASSERT_EQ(1, rfw.GetMessageNumber(chan2.name()));
SingleMessage msg3; SingleMessage msg3;
msg3.set_channel_name(chan1.name()); msg3.set_channel_name(chan1.name());
msg3.set_content(kStr10B); msg3.set_content(kStr10B);
msg3.set_time(3e9); msg3.set_time(3e9);
ASSERT_TRUE(rfw->WriteMessage(msg3)); ASSERT_TRUE(rfw.WriteMessage(msg3));
ASSERT_EQ(2, rfw->GetMessageNumber(chan1.name())); ASSERT_EQ(2, rfw.GetMessageNumber(chan1.name()));
rfw->Close(); rfw.Close();
ASSERT_TRUE(rfw->GetHeader().is_complete()); ASSERT_TRUE(rfw.GetHeader().is_complete());
ASSERT_EQ(1, rfw->GetHeader().chunk_number()); ASSERT_EQ(1, rfw.GetHeader().chunk_number());
ASSERT_EQ(1e9, rfw->GetHeader().begin_time()); ASSERT_EQ(1e9, rfw.GetHeader().begin_time());
ASSERT_EQ(3e9, rfw->GetHeader().end_time()); ASSERT_EQ(3e9, rfw.GetHeader().end_time());
ASSERT_EQ(3, rfw->GetHeader().message_number()); ASSERT_EQ(3, rfw.GetHeader().message_number());
} }
{ {
RecordFileReader reader; RecordFileReader reader;
...@@ -304,6 +304,7 @@ TEST(RecordFileTest, TestIndex) { ...@@ -304,6 +304,7 @@ TEST(RecordFileTest, TestIndex) {
} }
} }
} }
ASSERT_FALSE(remove(kTestFile2));
} }
} // namespace record } // namespace record
......
...@@ -35,8 +35,6 @@ using apollo::cyber::proto::Header; ...@@ -35,8 +35,6 @@ using apollo::cyber::proto::Header;
using apollo::cyber::proto::SectionType; using apollo::cyber::proto::SectionType;
using apollo::cyber::proto::SingleIndex; using apollo::cyber::proto::SingleIndex;
RecordFileWriter::RecordFileWriter() {}
RecordFileWriter::~RecordFileWriter() { Close(); } RecordFileWriter::~RecordFileWriter() { Close(); }
bool RecordFileWriter::Open(const std::string& path) { bool RecordFileWriter::Open(const std::string& path) {
...@@ -52,57 +50,31 @@ bool RecordFileWriter::Open(const std::string& path) { ...@@ -52,57 +50,31 @@ bool RecordFileWriter::Open(const std::string& path) {
<< ", errno: " << errno; << ", errno: " << errno;
return false; return false;
} }
chunk_active_.reset(new Chunk()); chunk_active_ = std::make_unique<Chunk>();
chunk_flush_.reset(new Chunk());
is_writing_ = true;
flush_thread_ = std::make_shared<std::thread>([this]() { this->Flush(); });
if (flush_thread_ == nullptr) {
AERROR << "Init flush thread error.";
return false;
}
return true; return true;
} }
void RecordFileWriter::Close() { void RecordFileWriter::Close() {
if (is_writing_) { if (fd_ < 0) {
// wait for the flush operation that may exist now return;
while (!chunk_flush_->empty()) { }
std::this_thread::sleep_for(std::chrono::milliseconds(100)); flush_task_.wait();
} Flush(*chunk_active_);
// last swap
{
std::unique_lock<std::mutex> flush_lock(flush_mutex_);
chunk_flush_.swap(chunk_active_);
flush_cv_.notify_one();
}
// wait for the last flush operation
while (!chunk_flush_->empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
is_writing_ = false;
flush_cv_.notify_all();
if (flush_thread_ && flush_thread_->joinable()) {
flush_thread_->join();
flush_thread_ = nullptr;
}
if (!WriteIndex()) { if (!WriteIndex()) {
AERROR << "Write index section failed, file: " << path_; AERROR << "Write index section failed, file: " << path_;
} }
header_.set_is_complete(true); header_.set_is_complete(true);
if (!WriteHeader(header_)) { if (!WriteHeader(header_)) {
AERROR << "Overwrite header section failed, file: " << path_; AERROR << "Overwrite header section failed, file: " << path_;
} }
if (close(fd_) < 0) { if (close(fd_) < 0) {
AERROR << "Close file failed, file: " << path_ << ", fd: " << fd_ AERROR << "Close file failed, file: " << path_ << ", fd: " << fd_
<< ", errno: " << errno; << ", errno: " << errno;
}
} }
fd_ = -1;
} }
bool RecordFileWriter::WriteHeader(const Header& header) { bool RecordFileWriter::WriteHeader(const Header& header) {
...@@ -196,6 +168,7 @@ bool RecordFileWriter::WriteChunk(const ChunkHeader& chunk_header, ...@@ -196,6 +168,7 @@ bool RecordFileWriter::WriteChunk(const ChunkHeader& chunk_header,
} }
bool RecordFileWriter::WriteMessage(const proto::SingleMessage& message) { bool RecordFileWriter::WriteMessage(const proto::SingleMessage& message) {
CHECK_GE(fd_, 0) << "First, call Open";
chunk_active_->add(message); chunk_active_->add(message);
auto it = channel_message_number_map_.find(message.channel_name()); auto it = channel_message_number_map_.find(message.channel_name());
if (it != channel_message_number_map_.end()) { if (it != channel_message_number_map_.end()) {
...@@ -217,32 +190,28 @@ bool RecordFileWriter::WriteMessage(const proto::SingleMessage& message) { ...@@ -217,32 +190,28 @@ bool RecordFileWriter::WriteMessage(const proto::SingleMessage& message) {
if (!need_flush) { if (!need_flush) {
return true; return true;
} }
{
std::unique_lock<std::mutex> flush_lock(flush_mutex_); ACHECK(flush_task_.wait_for(std::chrono::milliseconds(0)) ==
chunk_flush_.swap(chunk_active_); std::future_status::ready)
flush_cv_.notify_one(); << "Flushing didn't finish. Either the hardware cannot keep up or the "
} "flush rate is too fast.";
flush_task_ = std::async(
std::launch::async,
[this, chunk = std::move(chunk_active_)]() { this->Flush(*chunk); });
chunk_active_ = std::make_unique<Chunk>();
return true; return true;
} }
void RecordFileWriter::Flush() { void RecordFileWriter::Flush(const Chunk& chunk) {
while (is_writing_) { if (!WriteChunk(chunk.header_, *(chunk.body_.get()))) {
std::unique_lock<std::mutex> flush_lock(flush_mutex_); AERROR << "Write chunk fail.";
flush_cv_.wait(flush_lock,
[this] { return !chunk_flush_->empty() || !is_writing_; });
if (!is_writing_) {
break;
}
if (chunk_flush_->empty()) {
continue;
}
if (!WriteChunk(chunk_flush_->header_, *(chunk_flush_->body_.get()))) {
AERROR << "Write chunk fail.";
}
chunk_flush_->clear();
} }
} }
void RecordFileWriter::WaitForWrite() { flush_task_.wait(); }
uint64_t RecordFileWriter::GetMessageNumber( uint64_t RecordFileWriter::GetMessageNumber(
const std::string& channel_name) const { const std::string& channel_name) const {
auto search = channel_message_number_map_.find(channel_name); auto search = channel_message_number_map_.find(channel_name);
......
...@@ -19,9 +19,9 @@ ...@@ -19,9 +19,9 @@
#include <condition_variable> #include <condition_variable>
#include <fstream> #include <fstream>
#include <future>
#include <memory> #include <memory>
#include <string> #include <string>
#include <thread>
#include <type_traits> #include <type_traits>
#include <unordered_map> #include <unordered_map>
#include <utility> #include <utility>
...@@ -74,10 +74,13 @@ struct Chunk { ...@@ -74,10 +74,13 @@ struct Chunk {
std::unique_ptr<proto::ChunkBody> body_ = nullptr; std::unique_ptr<proto::ChunkBody> body_ = nullptr;
}; };
/**
Writes cyber record files on an asynchronous task
*/
class RecordFileWriter : public RecordFileBase { class RecordFileWriter : public RecordFileBase {
public: public:
RecordFileWriter(); RecordFileWriter() = default;
virtual ~RecordFileWriter(); ~RecordFileWriter();
bool Open(const std::string& path) override; bool Open(const std::string& path) override;
void Close() override; void Close() override;
bool WriteHeader(const proto::Header& header); bool WriteHeader(const proto::Header& header);
...@@ -85,19 +88,22 @@ class RecordFileWriter : public RecordFileBase { ...@@ -85,19 +88,22 @@ class RecordFileWriter : public RecordFileBase {
bool WriteMessage(const proto::SingleMessage& message); bool WriteMessage(const proto::SingleMessage& message);
uint64_t GetMessageNumber(const std::string& channel_name) const; uint64_t GetMessageNumber(const std::string& channel_name) const;
// For testing
void WaitForWrite();
private: private:
bool WriteChunk(const proto::ChunkHeader& chunk_header, bool WriteChunk(const proto::ChunkHeader& chunk_header,
const proto::ChunkBody& chunk_body); const proto::ChunkBody& chunk_body);
template <typename T> template <typename T>
bool WriteSection(const T& message); bool WriteSection(const T& message);
bool WriteIndex(); bool WriteIndex();
void Flush(); void Flush(const Chunk& chunk);
bool is_writing_ = false; bool IsChunkFlushEmpty();
std::unique_ptr<Chunk> chunk_active_ = nullptr; void BlockUntilSpaceAvailable();
std::unique_ptr<Chunk> chunk_flush_ = nullptr; // make moveable
std::shared_ptr<std::thread> flush_thread_ = nullptr; std::unique_ptr<Chunk> chunk_active_;
std::mutex flush_mutex_; // Initialize with a dummy value to simplify checking later
std::condition_variable flush_cv_; std::future<void> flush_task_ = std::async(std::launch::async, []() {});
std::unordered_map<std::string, uint64_t> channel_message_number_map_; std::unordered_map<std::string, uint64_t> channel_message_number_map_;
}; };
...@@ -125,7 +131,8 @@ bool RecordFileWriter::WriteSection(const T& message) { ...@@ -125,7 +131,8 @@ bool RecordFileWriter::WriteSection(const T& message) {
Section section; Section section;
/// zero out whole struct even if padded /// zero out whole struct even if padded
memset(&section, 0, sizeof(section)); memset(&section, 0, sizeof(section));
section = {type, static_cast<int64_t>(message.ByteSizeLong())}; section.type = type;
section.size = static_cast<int64_t>(message.ByteSizeLong());
ssize_t count = write(fd_, &section, sizeof(section)); ssize_t count = write(fd_, &section, sizeof(section));
if (count < 0) { if (count < 0) {
AERROR << "Write fd failed, fd: " << fd_ << ", errno: " << errno; AERROR << "Write fd failed, fd: " << fd_ << ", errno: " << errno;
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "cyber/common/file.h"
#include "cyber/common/log.h" #include "cyber/common/log.h"
#include "cyber/record/record_reader.h" #include "cyber/record/record_reader.h"
#include "cyber/record/record_writer.h" #include "cyber/record/record_writer.h"
...@@ -53,6 +54,9 @@ static void ConstructRecord(uint64_t msg_num, uint64_t begin_time, ...@@ -53,6 +54,9 @@ static void ConstructRecord(uint64_t msg_num, uint64_t begin_time,
ai = msg_num - 1 - i; ai = msg_num - 1 - i;
} }
auto msg = std::make_shared<RawMessage>(std::to_string(ai)); auto msg = std::make_shared<RawMessage>(std::to_string(ai));
// Since writer is meant for real time operations at a set rate,
// we need to wait in the test. Otherwise, we should write synchronously
writer.WaitForWrite();
writer.WriteMessage(kChannelName1, msg, begin_time + time_step * ai); writer.WriteMessage(kChannelName1, msg, begin_time + time_step * ai);
} }
ASSERT_EQ(msg_num, writer.GetMessageNumber(kChannelName1)); ASSERT_EQ(msg_num, writer.GetMessageNumber(kChannelName1));
...@@ -84,28 +88,28 @@ TEST(RecordTest, iterator_test) { ...@@ -84,28 +88,28 @@ TEST(RecordTest, iterator_test) {
uint64_t i = 0; uint64_t i = 0;
for (auto& msg : viewer) { for (auto& msg : viewer) {
EXPECT_EQ(kChannelName1, msg.channel_name); ASSERT_EQ(kChannelName1, msg.channel_name);
EXPECT_EQ(begin_time + step_time * i, msg.time); ASSERT_EQ(begin_time + step_time * i, msg.time);
EXPECT_EQ(std::to_string(i), msg.content); ASSERT_EQ(std::to_string(i), msg.content);
i++; ++i;
} }
EXPECT_EQ(msg_num, i); EXPECT_EQ(msg_num, i);
i = 0; i = 0;
std::for_each(viewer.begin(), viewer.end(), [&i](RecordMessage& msg) { std::for_each(viewer.begin(), viewer.end(), [&i](RecordMessage& msg) {
EXPECT_EQ(kChannelName1, msg.channel_name); ASSERT_EQ(kChannelName1, msg.channel_name);
// EXPECT_EQ(begin_time + step_time * i, msg.time); // EXPECT_EQ(begin_time + step_time * i, msg.time);
EXPECT_EQ(std::to_string(i), msg.content); ASSERT_EQ(std::to_string(i), msg.content);
i++; ++i;
}); });
EXPECT_EQ(msg_num, i); EXPECT_EQ(msg_num, i);
i = 0; i = 0;
for (auto it = viewer.begin(); it != viewer.end(); ++it) { for (auto it = viewer.begin(); it != viewer.end(); ++it) {
EXPECT_EQ(kChannelName1, it->channel_name); ASSERT_EQ(kChannelName1, it->channel_name);
EXPECT_EQ(begin_time + step_time * i, it->time); ASSERT_EQ(begin_time + step_time * i, it->time);
EXPECT_EQ(std::to_string(i), it->content); ASSERT_EQ(std::to_string(i), it->content);
i++; ++i;
} }
EXPECT_EQ(msg_num, i); EXPECT_EQ(msg_num, i);
ASSERT_FALSE(remove(kTestFile)); ASSERT_FALSE(remove(kTestFile));
...@@ -129,19 +133,19 @@ TEST(RecordTest, iterator_test_reverse) { ...@@ -129,19 +133,19 @@ TEST(RecordTest, iterator_test_reverse) {
uint64_t i = 0; uint64_t i = 0;
for (auto& msg : viewer) { for (auto& msg : viewer) {
EXPECT_EQ(kChannelName1, msg.channel_name); ASSERT_EQ(kChannelName1, msg.channel_name);
EXPECT_EQ(begin_time + step_time * i, msg.time); ASSERT_EQ(begin_time + step_time * i, msg.time);
EXPECT_EQ(std::to_string(i), msg.content); ASSERT_EQ(std::to_string(i), msg.content);
i++; ++i;
} }
EXPECT_EQ(msg_num, i); EXPECT_EQ(msg_num, i);
i = 0; i = 0;
for (auto it = viewer.begin(); it != viewer.end(); ++it) { for (auto it = viewer.begin(); it != viewer.end(); ++it) {
EXPECT_EQ(kChannelName1, it->channel_name); ASSERT_EQ(kChannelName1, it->channel_name);
EXPECT_EQ(begin_time + step_time * i, it->time); ASSERT_EQ(begin_time + step_time * i, it->time);
EXPECT_EQ(std::to_string(i), it->content); ASSERT_EQ(std::to_string(i), it->content);
i++; ++i;
} }
EXPECT_EQ(msg_num, i); EXPECT_EQ(msg_num, i);
ASSERT_FALSE(remove(kTestFile)); ASSERT_FALSE(remove(kTestFile));
...@@ -215,10 +219,10 @@ TEST(RecordTest, mult_iterator_test) { ...@@ -215,10 +219,10 @@ TEST(RecordTest, mult_iterator_test) {
uint64_t i = 0; uint64_t i = 0;
for (auto& msg : viewer) { // #2 iterator for (auto& msg : viewer) { // #2 iterator
EXPECT_EQ(kChannelName1, msg.channel_name); ASSERT_EQ(kChannelName1, msg.channel_name);
EXPECT_EQ(begin_time + step_time * i, msg.time); ASSERT_EQ(begin_time + step_time * i, msg.time);
EXPECT_EQ(std::to_string(i), msg.content); ASSERT_EQ(std::to_string(i), msg.content);
i++; ++i;
} }
EXPECT_EQ(msg_num, i); EXPECT_EQ(msg_num, i);
ASSERT_FALSE(remove(kTestFile)); ASSERT_FALSE(remove(kTestFile));
......
...@@ -180,6 +180,8 @@ bool RecordWriter::IsNewChannel(const std::string& channel_name) const { ...@@ -180,6 +180,8 @@ bool RecordWriter::IsNewChannel(const std::string& channel_name) const {
channel_message_number_map_.end(); channel_message_number_map_.end();
} }
void RecordWriter::WaitForWrite() { file_writer_->WaitForWrite(); }
void RecordWriter::OnNewChannel(const std::string& channel_name, void RecordWriter::OnNewChannel(const std::string& channel_name,
const std::string& message_type, const std::string& message_type,
const std::string& proto_desc) { const std::string& proto_desc) {
......
...@@ -170,6 +170,11 @@ class RecordWriter : public RecordBase { ...@@ -170,6 +170,11 @@ class RecordWriter : public RecordBase {
*/ */
bool IsNewChannel(const std::string& channel_name) const; bool IsNewChannel(const std::string& channel_name) const;
/**
* @brief Meant for testing
*/
void WaitForWrite();
private: private:
bool WriteMessage(const proto::SingleMessage& single_msg); bool WriteMessage(const proto::SingleMessage& single_msg);
bool SplitOutfile(); bool SplitOutfile();
......
...@@ -33,7 +33,7 @@ build --cxxopt="-fdiagnostics-color=always" ...@@ -33,7 +33,7 @@ build --cxxopt="-fdiagnostics-color=always"
build --per_file_copt=external/upb/.*@-Wno-sign-compare build --per_file_copt=external/upb/.*@-Wno-sign-compare
build --copt="-Werror=sign-compare" build --copt="-Werror=sign-compare"
build --copt="-Werror=return-type" build --per_file_copt=^modules/.*\.cc,^cyber/.*\.cc,@"-Werror=return-type"
build --copt="-Werror=unused-variable" build --copt="-Werror=unused-variable"
build --copt="-Werror=unused-but-set-variable" build --copt="-Werror=unused-but-set-variable"
build --copt="-Werror=switch" build --copt="-Werror=switch"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册