record_file_writer.cc 7.8 KB
Newer Older
1
/******************************************************************************
G
gruminions 已提交
2
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 4 5 6 7 8 9 10 11 12 13 14 15 16
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

17 18
#include "cyber/record/file/record_file_writer.h"

19
#include <fcntl.h>
A
Aaron Xiao 已提交
20

21
#include "cyber/common/file.h"
22
#include "cyber/time/time.h"
23 24

namespace apollo {
G
GoLancer 已提交
25
namespace cyber {
26 27 28 29 30 31 32 33 34
namespace record {

RecordFileWriter::RecordFileWriter() {}

RecordFileWriter::~RecordFileWriter() { Close(); }

bool RecordFileWriter::Open(const std::string& path) {
  std::lock_guard<std::mutex> lock(mutex_);
  path_ = path;
35 36 37 38 39 40 41 42
  if (::apollo::cyber::common::PathExists(path_)) {
    AWARN << "File exist and overwrite, file: " << path_;
  }
  fd_ = open(path_.data(), O_CREAT | O_WRONLY,
             S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
  if (fd_ < 0) {
    AERROR << "Open file failed, file: " << path_ << ", fd: " << fd_
           << ", errno: " << errno;
43 44 45 46 47 48 49
    return false;
  }
  chunk_active_.reset(new Chunk());
  chunk_flush_.reset(new Chunk());
  is_writing_ = true;
  flush_thread_ = std::make_shared<std::thread>([this]() { this->Flush(); });
  if (flush_thread_ == nullptr) {
50
    AERROR << "Init flush thread error.";
51 52 53 54 55 56 57
    return false;
  }
  return true;
}

void RecordFileWriter::Close() {
  if (is_writing_) {
58 59 60 61 62
    // wait for the flush operation that may exist now
    while (!chunk_flush_->empty()) {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

63
    // last swap
64 65 66 67 68
    {
      std::unique_lock<std::mutex> flush_lock(flush_mutex_);
      chunk_flush_.swap(chunk_active_);
      flush_cv_.notify_one();
    }
69

A
Aaron Xiao 已提交
70
    // wait for the last flush operation
71 72 73
    while (!chunk_flush_->empty()) {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
74

75 76
    is_writing_ = false;
    flush_cv_.notify_all();
77 78 79 80
    if (flush_thread_ && flush_thread_->joinable()) {
      flush_thread_->join();
      flush_thread_ = nullptr;
    }
81

82 83 84
    if (!WriteIndex()) {
      AERROR << "Write index section failed, file: " << path_;
    }
85 86

    header_.set_is_complete(true);
87 88 89 90 91 92 93
    if (!WriteHeader(header_)) {
      AERROR << "Overwrite header section failed, file: " << path_;
    }

    if (close(fd_) < 0) {
      AERROR << "Close file failed, file: " << path_ << ", fd: " << fd_
             << ", errno: " << errno;
94
      return;
95
    }
96
  }
97
}
98

99 100 101 102 103 104 105 106
bool RecordFileWriter::WriteHeader(const Header& header) {
  std::lock_guard<std::mutex> lock(mutex_);
  header_ = header;
  if (!WriteSection<Header>(header_)) {
    AERROR << "Write header section fail";
    return false;
  }
  return true;
107 108
}

109 110
bool RecordFileWriter::WriteIndex() {
  std::lock_guard<std::mutex> lock(mutex_);
111 112 113 114 115 116 117 118 119 120 121
  for (int i = 0; i < index_.indexes_size(); i++) {
    SingleIndex* single_index = index_.mutable_indexes(i);
    if (single_index->type() == SectionType::SECTION_CHANNEL) {
      ChannelCache* channel_cache = single_index->mutable_channel_cache();
      if (channel_message_number_map_.find(channel_cache->name()) !=
          channel_message_number_map_.end()) {
        channel_cache->set_message_number(
            channel_message_number_map_[channel_cache->name()]);
      }
    }
  }
122 123 124
  header_.set_index_position(CurrentPosition());
  if (!WriteSection<Index>(index_)) {
    AERROR << "Write section fail";
125 126 127 128 129 130 131
    return false;
  }
  return true;
}

bool RecordFileWriter::WriteChannel(const Channel& channel) {
  std::lock_guard<std::mutex> lock(mutex_);
fengqikai1414's avatar
fengqikai1414 已提交
132
  uint64_t pos = CurrentPosition();
133 134
  if (!WriteSection<Channel>(channel)) {
    AERROR << "Write section fail";
135 136 137 138 139
    return false;
  }
  header_.set_channel_number(header_.channel_number() + 1);
  SingleIndex* single_index = index_.add_indexes();
  single_index->set_type(SectionType::SECTION_CHANNEL);
fengqikai1414's avatar
fengqikai1414 已提交
140
  single_index->set_position(pos);
141 142 143 144 145 146 147 148 149 150 151 152
  ChannelCache* channel_cache = new ChannelCache();
  channel_cache->set_name(channel.name());
  channel_cache->set_message_number(0);
  channel_cache->set_message_type(channel.message_type());
  channel_cache->set_proto_desc(channel.proto_desc());
  single_index->set_allocated_channel_cache(channel_cache);
  return true;
}

bool RecordFileWriter::WriteChunk(const ChunkHeader& chunk_header,
                                  const ChunkBody& chunk_body) {
  std::lock_guard<std::mutex> lock(mutex_);
fengqikai1414's avatar
fengqikai1414 已提交
153
  uint64_t pos = CurrentPosition();
154 155
  if (!WriteSection<ChunkHeader>(chunk_header)) {
    AERROR << "Write chunk header fail";
156 157
    return false;
  }
158 159
  SingleIndex* single_index = index_.add_indexes();
  single_index->set_type(SectionType::SECTION_CHUNK_HEADER);
fengqikai1414's avatar
fengqikai1414 已提交
160
  single_index->set_position(pos);
161 162 163 164 165 166
  ChunkHeaderCache* chunk_header_cache = new ChunkHeaderCache();
  chunk_header_cache->set_begin_time(chunk_header.begin_time());
  chunk_header_cache->set_end_time(chunk_header.end_time());
  chunk_header_cache->set_message_number(chunk_header.message_number());
  chunk_header_cache->set_raw_size(chunk_header.raw_size());
  single_index->set_allocated_chunk_header_cache(chunk_header_cache);
fengqikai1414's avatar
fengqikai1414 已提交
167 168

  pos = CurrentPosition();
169 170
  if (!WriteSection<ChunkBody>(chunk_body)) {
    AERROR << "Write chunk body fail";
171 172 173 174 175 176 177 178 179 180 181
    return false;
  }
  header_.set_chunk_number(header_.chunk_number() + 1);
  if (header_.begin_time() == 0) {
    header_.set_begin_time(chunk_header.begin_time());
  }
  header_.set_end_time(chunk_header.end_time());
  header_.set_message_number(header_.message_number() +
                             chunk_header.message_number());
  single_index = index_.add_indexes();
  single_index->set_type(SectionType::SECTION_CHUNK_BODY);
fengqikai1414's avatar
fengqikai1414 已提交
182
  single_index->set_position(pos);
183 184 185 186 187 188
  ChunkBodyCache* chunk_body_cache = new ChunkBodyCache();
  chunk_body_cache->set_message_number(chunk_body.messages_size());
  single_index->set_allocated_chunk_body_cache(chunk_body_cache);
  return true;
}

189
bool RecordFileWriter::WriteMessage(const SingleMessage& message) {
190
  chunk_active_->add(message);
191
  auto it = channel_message_number_map_.find(message.channel_name());
192 193 194
  if (it != channel_message_number_map_.end()) {
    it->second++;
  } else {
195 196
    channel_message_number_map_.insert(
        std::make_pair(message.channel_name(), 1));
197
  }
198 199 200 201 202 203 204 205 206
  bool need_flush = false;
  if (header_.chunk_interval() > 0 &&
      message.time() - chunk_active_->header_.begin_time() >
          header_.chunk_interval()) {
    need_flush = true;
  }
  if (header_.chunk_raw_size() > 0 &&
      chunk_active_->header_.raw_size() > header_.chunk_raw_size()) {
    need_flush = true;
207
  }
208
  if (!need_flush) {
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
    return true;
  }
  {
    std::unique_lock<std::mutex> flush_lock(flush_mutex_);
    chunk_flush_.swap(chunk_active_);
    flush_cv_.notify_one();
  }
  return true;
}

void RecordFileWriter::Flush() {
  while (is_writing_) {
    std::unique_lock<std::mutex> flush_lock(flush_mutex_);
    flush_cv_.wait(flush_lock,
                   [this] { return !chunk_flush_->empty() || !is_writing_; });
    if (!is_writing_) {
      break;
    }
    if (chunk_flush_->empty()) {
      continue;
    }
230
    if (!WriteChunk(chunk_flush_->header_, *(chunk_flush_->body_.get()))) {
231
      AERROR << "Write chunk fail.";
232 233 234
    }
    chunk_flush_->clear();
  }
235
  return;
236 237
}

238 239 240 241 242 243 244 245 246
uint64_t RecordFileWriter::GetMessageNumber(
    const std::string& channel_name) const {
  auto search = channel_message_number_map_.find(channel_name);
  if (search != channel_message_number_map_.end()) {
    return search->second;
  }
  return 0;
}

247
}  // namespace record
G
GoLancer 已提交
248
}  // namespace cyber
249
}  // namespace apollo