From c9e58846902d3fbdefee1e1326abe4e45dcf479f Mon Sep 17 00:00:00 2001 From: Fla3inH0tCheet0s <46694471+Fla3inH0tCheet0s@users.noreply.github.com> Date: Fri, 11 Sep 2020 10:06:09 -0700 Subject: [PATCH] cyber_recorder file writer threading bug fix (#12377) --- cyber/record/file/record_file_base.h | 2 +- cyber/record/file/record_file_test.cc | 41 +++++----- cyber/record/file/record_file_writer.cc | 99 +++++++++---------------- cyber/record/file/record_file_writer.h | 29 +++++--- cyber/record/record_viewer_test.cc | 50 +++++++------ cyber/record/record_writer.cc | 2 + cyber/record/record_writer.h | 5 ++ tools/bazel.rc | 2 +- 8 files changed, 109 insertions(+), 121 deletions(-) diff --git a/cyber/record/file/record_file_base.h b/cyber/record/file/record_file_base.h index a6aec68fc0..fe227c2376 100644 --- a/cyber/record/file/record_file_base.h +++ b/cyber/record/file/record_file_base.h @@ -45,7 +45,7 @@ class RecordFileBase { std::string path_; proto::Header header_; proto::Index index_; - int fd_; + int fd_ = -1; }; } // namespace record diff --git a/cyber/record/file/record_file_test.cc b/cyber/record/file/record_file_test.cc index eecf84a640..6cfd3dee5e 100644 --- a/cyber/record/file/record_file_test.cc +++ b/cyber/record/file/record_file_test.cc @@ -222,56 +222,56 @@ TEST(RecordFileTest, TestOneChunkFile) { TEST(RecordFileTest, TestIndex) { { - RecordFileWriter* rfw = new RecordFileWriter(); + RecordFileWriter rfw; - ASSERT_TRUE(rfw->Open(kTestFile2)); - ASSERT_EQ(kTestFile2, rfw->GetPath()); + ASSERT_TRUE(rfw.Open(kTestFile2)); + ASSERT_EQ(kTestFile2, rfw.GetPath()); Header header = HeaderBuilder::GetHeaderWithChunkParams(0, 0); header.set_segment_interval(0); header.set_segment_raw_size(0); - ASSERT_TRUE(rfw->WriteHeader(header)); - ASSERT_FALSE(rfw->GetHeader().is_complete()); + ASSERT_TRUE(rfw.WriteHeader(header)); + ASSERT_FALSE(rfw.GetHeader().is_complete()); Channel chan1; chan1.set_name(kChan1); chan1.set_message_type(kMsgType); chan1.set_proto_desc(kStr10B); - ASSERT_TRUE(rfw->WriteChannel(chan1)); + ASSERT_TRUE(rfw.WriteChannel(chan1)); Channel chan2; chan2.set_name(kChan2); chan2.set_message_type(kMsgType); chan2.set_proto_desc(kStr10B); - ASSERT_TRUE(rfw->WriteChannel(chan2)); + ASSERT_TRUE(rfw.WriteChannel(chan2)); SingleMessage msg1; msg1.set_channel_name(chan1.name()); msg1.set_content(kStr10B); msg1.set_time(1e9); - ASSERT_TRUE(rfw->WriteMessage(msg1)); - ASSERT_EQ(1, rfw->GetMessageNumber(chan1.name())); + ASSERT_TRUE(rfw.WriteMessage(msg1)); + ASSERT_EQ(1, rfw.GetMessageNumber(chan1.name())); SingleMessage msg2; msg2.set_channel_name(chan2.name()); msg2.set_content(kStr10B); msg2.set_time(2e9); - ASSERT_TRUE(rfw->WriteMessage(msg2)); - ASSERT_EQ(1, rfw->GetMessageNumber(chan2.name())); + ASSERT_TRUE(rfw.WriteMessage(msg2)); + ASSERT_EQ(1, rfw.GetMessageNumber(chan2.name())); SingleMessage msg3; msg3.set_channel_name(chan1.name()); msg3.set_content(kStr10B); msg3.set_time(3e9); - ASSERT_TRUE(rfw->WriteMessage(msg3)); - ASSERT_EQ(2, rfw->GetMessageNumber(chan1.name())); - - rfw->Close(); - ASSERT_TRUE(rfw->GetHeader().is_complete()); - ASSERT_EQ(1, rfw->GetHeader().chunk_number()); - ASSERT_EQ(1e9, rfw->GetHeader().begin_time()); - ASSERT_EQ(3e9, rfw->GetHeader().end_time()); - ASSERT_EQ(3, rfw->GetHeader().message_number()); + ASSERT_TRUE(rfw.WriteMessage(msg3)); + ASSERT_EQ(2, rfw.GetMessageNumber(chan1.name())); + + rfw.Close(); + ASSERT_TRUE(rfw.GetHeader().is_complete()); + ASSERT_EQ(1, rfw.GetHeader().chunk_number()); + ASSERT_EQ(1e9, rfw.GetHeader().begin_time()); + ASSERT_EQ(3e9, rfw.GetHeader().end_time()); + ASSERT_EQ(3, rfw.GetHeader().message_number()); } { RecordFileReader reader; @@ -304,6 +304,7 @@ TEST(RecordFileTest, TestIndex) { } } } + ASSERT_FALSE(remove(kTestFile2)); } } // namespace record diff --git a/cyber/record/file/record_file_writer.cc b/cyber/record/file/record_file_writer.cc index 5b7163ba4b..18f0b8013c 100644 --- a/cyber/record/file/record_file_writer.cc +++ b/cyber/record/file/record_file_writer.cc @@ -35,8 +35,6 @@ using apollo::cyber::proto::Header; using apollo::cyber::proto::SectionType; using apollo::cyber::proto::SingleIndex; -RecordFileWriter::RecordFileWriter() {} - RecordFileWriter::~RecordFileWriter() { Close(); } bool RecordFileWriter::Open(const std::string& path) { @@ -52,57 +50,31 @@ bool RecordFileWriter::Open(const std::string& path) { << ", errno: " << errno; return false; } - chunk_active_.reset(new Chunk()); - chunk_flush_.reset(new Chunk()); - is_writing_ = true; - flush_thread_ = std::make_shared([this]() { this->Flush(); }); - if (flush_thread_ == nullptr) { - AERROR << "Init flush thread error."; - return false; - } + chunk_active_ = std::make_unique(); return true; } void RecordFileWriter::Close() { - if (is_writing_) { - // wait for the flush operation that may exist now - while (!chunk_flush_->empty()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - - // last swap - { - std::unique_lock flush_lock(flush_mutex_); - chunk_flush_.swap(chunk_active_); - flush_cv_.notify_one(); - } - - // wait for the last flush operation - while (!chunk_flush_->empty()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - - is_writing_ = false; - flush_cv_.notify_all(); - if (flush_thread_ && flush_thread_->joinable()) { - flush_thread_->join(); - flush_thread_ = nullptr; - } + if (fd_ < 0) { + return; + } + flush_task_.wait(); + Flush(*chunk_active_); - if (!WriteIndex()) { - AERROR << "Write index section failed, file: " << path_; - } + if (!WriteIndex()) { + AERROR << "Write index section failed, file: " << path_; + } - header_.set_is_complete(true); - if (!WriteHeader(header_)) { - AERROR << "Overwrite header section failed, file: " << path_; - } + header_.set_is_complete(true); + if (!WriteHeader(header_)) { + AERROR << "Overwrite header section failed, file: " << path_; + } - if (close(fd_) < 0) { - AERROR << "Close file failed, file: " << path_ << ", fd: " << fd_ - << ", errno: " << errno; - } + if (close(fd_) < 0) { + AERROR << "Close file failed, file: " << path_ << ", fd: " << fd_ + << ", errno: " << errno; } + fd_ = -1; } bool RecordFileWriter::WriteHeader(const Header& header) { @@ -196,6 +168,7 @@ bool RecordFileWriter::WriteChunk(const ChunkHeader& chunk_header, } bool RecordFileWriter::WriteMessage(const proto::SingleMessage& message) { + CHECK_GE(fd_, 0) << "First, call Open"; chunk_active_->add(message); auto it = channel_message_number_map_.find(message.channel_name()); if (it != channel_message_number_map_.end()) { @@ -217,32 +190,28 @@ bool RecordFileWriter::WriteMessage(const proto::SingleMessage& message) { if (!need_flush) { return true; } - { - std::unique_lock flush_lock(flush_mutex_); - chunk_flush_.swap(chunk_active_); - flush_cv_.notify_one(); - } + + ACHECK(flush_task_.wait_for(std::chrono::milliseconds(0)) == + std::future_status::ready) + << "Flushing didn't finish. Either the hardware cannot keep up or the " + "flush rate is too fast."; + + flush_task_ = std::async( + std::launch::async, + [this, chunk = std::move(chunk_active_)]() { this->Flush(*chunk); }); + chunk_active_ = std::make_unique(); + return true; } -void RecordFileWriter::Flush() { - while (is_writing_) { - std::unique_lock flush_lock(flush_mutex_); - flush_cv_.wait(flush_lock, - [this] { return !chunk_flush_->empty() || !is_writing_; }); - if (!is_writing_) { - break; - } - if (chunk_flush_->empty()) { - continue; - } - if (!WriteChunk(chunk_flush_->header_, *(chunk_flush_->body_.get()))) { - AERROR << "Write chunk fail."; - } - chunk_flush_->clear(); +void RecordFileWriter::Flush(const Chunk& chunk) { + if (!WriteChunk(chunk.header_, *(chunk.body_.get()))) { + AERROR << "Write chunk fail."; } } +void RecordFileWriter::WaitForWrite() { flush_task_.wait(); } + uint64_t RecordFileWriter::GetMessageNumber( const std::string& channel_name) const { auto search = channel_message_number_map_.find(channel_name); diff --git a/cyber/record/file/record_file_writer.h b/cyber/record/file/record_file_writer.h index f41586578b..5b59ee688f 100644 --- a/cyber/record/file/record_file_writer.h +++ b/cyber/record/file/record_file_writer.h @@ -19,9 +19,9 @@ #include #include +#include #include #include -#include #include #include #include @@ -74,10 +74,13 @@ struct Chunk { std::unique_ptr body_ = nullptr; }; +/** +Writes cyber record files on an asynchronous task +*/ class RecordFileWriter : public RecordFileBase { public: - RecordFileWriter(); - virtual ~RecordFileWriter(); + RecordFileWriter() = default; + ~RecordFileWriter(); bool Open(const std::string& path) override; void Close() override; bool WriteHeader(const proto::Header& header); @@ -85,19 +88,22 @@ class RecordFileWriter : public RecordFileBase { bool WriteMessage(const proto::SingleMessage& message); uint64_t GetMessageNumber(const std::string& channel_name) const; + // For testing + void WaitForWrite(); + private: bool WriteChunk(const proto::ChunkHeader& chunk_header, const proto::ChunkBody& chunk_body); template bool WriteSection(const T& message); bool WriteIndex(); - void Flush(); - bool is_writing_ = false; - std::unique_ptr chunk_active_ = nullptr; - std::unique_ptr chunk_flush_ = nullptr; - std::shared_ptr flush_thread_ = nullptr; - std::mutex flush_mutex_; - std::condition_variable flush_cv_; + void Flush(const Chunk& chunk); + bool IsChunkFlushEmpty(); + void BlockUntilSpaceAvailable(); + // make moveable + std::unique_ptr chunk_active_; + // Initialize with a dummy value to simplify checking later + std::future flush_task_ = std::async(std::launch::async, []() {}); std::unordered_map channel_message_number_map_; }; @@ -125,7 +131,8 @@ bool RecordFileWriter::WriteSection(const T& message) { Section section; /// zero out whole struct even if padded memset(§ion, 0, sizeof(section)); - section = {type, static_cast(message.ByteSizeLong())}; + section.type = type; + section.size = static_cast(message.ByteSizeLong()); ssize_t count = write(fd_, §ion, sizeof(section)); if (count < 0) { AERROR << "Write fd failed, fd: " << fd_ << ", errno: " << errno; diff --git a/cyber/record/record_viewer_test.cc b/cyber/record/record_viewer_test.cc index 1a122b25b3..295efb3287 100644 --- a/cyber/record/record_viewer_test.cc +++ b/cyber/record/record_viewer_test.cc @@ -25,6 +25,7 @@ #include "gtest/gtest.h" +#include "cyber/common/file.h" #include "cyber/common/log.h" #include "cyber/record/record_reader.h" #include "cyber/record/record_writer.h" @@ -53,6 +54,9 @@ static void ConstructRecord(uint64_t msg_num, uint64_t begin_time, ai = msg_num - 1 - i; } auto msg = std::make_shared(std::to_string(ai)); + // Since writer is meant for real time operations at a set rate, + // we need to wait in the test. Otherwise, we should write synchronously + writer.WaitForWrite(); writer.WriteMessage(kChannelName1, msg, begin_time + time_step * ai); } ASSERT_EQ(msg_num, writer.GetMessageNumber(kChannelName1)); @@ -84,28 +88,28 @@ TEST(RecordTest, iterator_test) { uint64_t i = 0; for (auto& msg : viewer) { - EXPECT_EQ(kChannelName1, msg.channel_name); - EXPECT_EQ(begin_time + step_time * i, msg.time); - EXPECT_EQ(std::to_string(i), msg.content); - i++; + ASSERT_EQ(kChannelName1, msg.channel_name); + ASSERT_EQ(begin_time + step_time * i, msg.time); + ASSERT_EQ(std::to_string(i), msg.content); + ++i; } EXPECT_EQ(msg_num, i); i = 0; std::for_each(viewer.begin(), viewer.end(), [&i](RecordMessage& msg) { - EXPECT_EQ(kChannelName1, msg.channel_name); + ASSERT_EQ(kChannelName1, msg.channel_name); // EXPECT_EQ(begin_time + step_time * i, msg.time); - EXPECT_EQ(std::to_string(i), msg.content); - i++; + ASSERT_EQ(std::to_string(i), msg.content); + ++i; }); EXPECT_EQ(msg_num, i); i = 0; for (auto it = viewer.begin(); it != viewer.end(); ++it) { - EXPECT_EQ(kChannelName1, it->channel_name); - EXPECT_EQ(begin_time + step_time * i, it->time); - EXPECT_EQ(std::to_string(i), it->content); - i++; + ASSERT_EQ(kChannelName1, it->channel_name); + ASSERT_EQ(begin_time + step_time * i, it->time); + ASSERT_EQ(std::to_string(i), it->content); + ++i; } EXPECT_EQ(msg_num, i); ASSERT_FALSE(remove(kTestFile)); @@ -129,19 +133,19 @@ TEST(RecordTest, iterator_test_reverse) { uint64_t i = 0; for (auto& msg : viewer) { - EXPECT_EQ(kChannelName1, msg.channel_name); - EXPECT_EQ(begin_time + step_time * i, msg.time); - EXPECT_EQ(std::to_string(i), msg.content); - i++; + ASSERT_EQ(kChannelName1, msg.channel_name); + ASSERT_EQ(begin_time + step_time * i, msg.time); + ASSERT_EQ(std::to_string(i), msg.content); + ++i; } EXPECT_EQ(msg_num, i); i = 0; for (auto it = viewer.begin(); it != viewer.end(); ++it) { - EXPECT_EQ(kChannelName1, it->channel_name); - EXPECT_EQ(begin_time + step_time * i, it->time); - EXPECT_EQ(std::to_string(i), it->content); - i++; + ASSERT_EQ(kChannelName1, it->channel_name); + ASSERT_EQ(begin_time + step_time * i, it->time); + ASSERT_EQ(std::to_string(i), it->content); + ++i; } EXPECT_EQ(msg_num, i); ASSERT_FALSE(remove(kTestFile)); @@ -215,10 +219,10 @@ TEST(RecordTest, mult_iterator_test) { uint64_t i = 0; for (auto& msg : viewer) { // #2 iterator - EXPECT_EQ(kChannelName1, msg.channel_name); - EXPECT_EQ(begin_time + step_time * i, msg.time); - EXPECT_EQ(std::to_string(i), msg.content); - i++; + ASSERT_EQ(kChannelName1, msg.channel_name); + ASSERT_EQ(begin_time + step_time * i, msg.time); + ASSERT_EQ(std::to_string(i), msg.content); + ++i; } EXPECT_EQ(msg_num, i); ASSERT_FALSE(remove(kTestFile)); diff --git a/cyber/record/record_writer.cc b/cyber/record/record_writer.cc index 0a284a75ca..ac2046809b 100644 --- a/cyber/record/record_writer.cc +++ b/cyber/record/record_writer.cc @@ -180,6 +180,8 @@ bool RecordWriter::IsNewChannel(const std::string& channel_name) const { channel_message_number_map_.end(); } +void RecordWriter::WaitForWrite() { file_writer_->WaitForWrite(); } + void RecordWriter::OnNewChannel(const std::string& channel_name, const std::string& message_type, const std::string& proto_desc) { diff --git a/cyber/record/record_writer.h b/cyber/record/record_writer.h index 4c8dc269df..fab6fdf365 100644 --- a/cyber/record/record_writer.h +++ b/cyber/record/record_writer.h @@ -170,6 +170,11 @@ class RecordWriter : public RecordBase { */ bool IsNewChannel(const std::string& channel_name) const; + /** + * @brief Meant for testing + */ + void WaitForWrite(); + private: bool WriteMessage(const proto::SingleMessage& single_msg); bool SplitOutfile(); diff --git a/tools/bazel.rc b/tools/bazel.rc index 4b5cc860b2..0fe288f444 100644 --- a/tools/bazel.rc +++ b/tools/bazel.rc @@ -33,7 +33,7 @@ build --cxxopt="-fdiagnostics-color=always" build --per_file_copt=external/upb/.*@-Wno-sign-compare build --copt="-Werror=sign-compare" -build --copt="-Werror=return-type" +build --per_file_copt=^modules/.*\.cc,^cyber/.*\.cc,@"-Werror=return-type" build --copt="-Werror=unused-variable" build --copt="-Werror=unused-but-set-variable" build --copt="-Werror=switch" -- GitLab