提交 bdfb088f 编写于 作者: baowanyu's avatar baowanyu 提交者: Zhongjun Ni

framework: add "-k --black-channel" options for split, and bugfix.

上级 aff787c8
......@@ -26,15 +26,22 @@ RecordBase::~RecordBase() {}
const Header RecordBase::GetHeader() const { return header_; }
bool RecordBase::IsNewChannel(const std::string& channel_name) {
auto search = channel_message_number_map_.find(channel_name);
if (search == channel_message_number_map_.end()) {
return true;
}
return false;
}
void RecordBase::OnNewChannel(const std::string& channel_name,
const std::string& message_type,
const std::string& proto_desc) {
auto search = channel_message_number_map_.find(channel_name);
if (search == channel_message_number_map_.end()) {
if (IsNewChannel(channel_name)) {
channel_message_number_map_[channel_name] = 0;
channel_message_type_map_[channel_name] = message_type;
channel_proto_desc_map_[channel_name] = proto_desc;
}
channel_message_type_map_[channel_name] = message_type;
channel_proto_desc_map_[channel_name] = proto_desc;
}
void RecordBase::OnNewMessage(const std::string& channel_name) {
......
......@@ -45,6 +45,8 @@ class RecordBase {
const Header GetHeader() const;
protected:
bool IsNewChannel(const std::string& channel_name);
void OnNewChannel(const std::string& channel_name,
const std::string& message_type,
const std::string& proto_desc);
......
......@@ -68,14 +68,19 @@ bool RecordWriter::WriteChannel(const std::string& channel_name,
const std::string& message_type,
const std::string& proto_desc) {
std::lock_guard<std::mutex> lg(mutex_);
OnNewChannel(channel_name, message_type, proto_desc);
Channel channel;
channel.set_name(channel_name);
channel.set_message_type(message_type);
channel.set_proto_desc(proto_desc);
if (!file_writer_->WriteChannel(channel)) {
AERROR << "write channel fail.";
return false;
if (IsNewChannel(channel_name)) {
OnNewChannel(channel_name, message_type, proto_desc);
Channel channel;
channel.set_name(channel_name);
channel.set_message_type(message_type);
channel.set_proto_desc(proto_desc);
if (!file_writer_->WriteChannel(channel)) {
AERROR << "write channel fail.";
return false;
}
} else {
AWARN << "intercept write channel request, duplicate channel: "
<< channel_name;
}
return true;
}
......@@ -83,28 +88,23 @@ bool RecordWriter::WriteChannel(const std::string& channel_name,
bool RecordWriter::WriteMessage(const SingleMessage& message) {
std::lock_guard<std::mutex> lg(mutex_);
segment_raw_size_ += message.content().size();
if (segment_begin_time_ == 0) {
segment_begin_time_ = message.time();
}
if (!file_writer_->WriteMessage(message)) {
AERROR << "write message fail.";
return false;
}
if (0 == header_.segment_interval() && 0 == header_.segment_raw_size()) {
return true;
segment_raw_size_ += message.content().size();
if (segment_begin_time_ == 0) {
segment_begin_time_ = message.time();
}
if (message.time() - segment_begin_time_ < header_.segment_interval() &&
segment_raw_size_ < header_.segment_raw_size()) {
return true;
if ((header_.segment_interval() > 0 &&
message.time() - segment_begin_time_ > header_.segment_interval()) ||
(header_.segment_raw_size() > 0 &&
segment_raw_size_ > header_.segment_raw_size())) {
file_writer_backup_.swap(file_writer_);
file_writer_backup_->Close();
SplitOutfile();
}
file_writer_backup_.swap(file_writer_);
file_writer_backup_->Close();
SplitOutfile();
return true;
}
......
......@@ -42,7 +42,7 @@ using apollo::cybertron::common::UnixSecondsToString;
const char INFO_OPTIONS[] = "f:h";
const char RECORD_OPTIONS[] = "o:ac:h";
const char PLAY_OPTIONS[] = "f:c:lr:b:e:s:d:h";
const char SPLIT_OPTIONS[] = "f:o:c:b:e:h";
const char SPLIT_OPTIONS[] = "f:o:c:k:b:e:h";
const char RECOVER_OPTIONS[] = "f:o:h";
void DisplayUsage(const std::string& binary);
......@@ -95,7 +95,11 @@ void DisplayUsage(const std::string& binary, const std::string& command,
std::cout << "\t-a, --all\t\t\t\t" << command << " all" << std::endl;
break;
case 'c':
std::cout << "\t-c, --channel <name>\t\t\tonly " << command
std::cout << "\t-c, --white-channel <name>\t\t\tonly " << command
<< " the specified channel" << std::endl;
break;
case 'k':
std::cout << "\t-k, --black-channel <name>\t\t\tnot " << command
<< " the specified channel" << std::endl;
break;
case 'l':
......@@ -142,10 +146,11 @@ int main(int argc, char** argv) {
const std::string command(argv[1]);
int long_index = 0;
const std::string short_opts = "f:c:o:alr:b:e:s:d:h";
const std::string short_opts = "f:c:k:o:alr:b:e:s:d:h";
static const struct option long_opts[] = {
{"files", required_argument, NULL, 'f'},
{"channel", required_argument, NULL, 'c'},
{"white-channel", required_argument, NULL, 'c'},
{"black-channel", required_argument, NULL, 'k'},
{"output", required_argument, NULL, 'o'},
{"all", no_argument, NULL, 'a'},
{"loop", no_argument, NULL, 'l'},
......@@ -158,7 +163,8 @@ int main(int argc, char** argv) {
std::vector<std::string> opt_file_vec;
std::vector<std::string> opt_output_vec;
std::vector<std::string> opt_channel_vec;
std::vector<std::string> opt_white_channels;
std::vector<std::string> opt_black_channels;
bool opt_all = false;
bool opt_loop = false;
float opt_rate = 1.0f;
......@@ -181,7 +187,19 @@ int main(int argc, char** argv) {
optind--;
while (optind < argc) {
if (*argv[optind] != '-') {
opt_channel_vec.emplace_back(argv[optind]);
opt_white_channels.emplace_back(argv[optind]);
optind++;
} else {
optind--;
break;
}
}
break;
case 'k':
optind--;
while (optind < argc) {
if (*argv[optind] != '-') {
opt_black_channels.emplace_back(argv[optind]);
optind++;
} else {
optind--;
......@@ -294,16 +312,16 @@ int main(int argc, char** argv) {
// TODO @baownayu order input record file
bool play_result = true;
for (auto& opt_file : opt_file_vec) {
Player player(opt_file_vec[0], opt_channel_vec.empty(), opt_channel_vec,
opt_loop, opt_rate, opt_begin, opt_end, opt_start,
opt_delay);
Player player(opt_file_vec[0], opt_white_channels.empty(),
opt_white_channels, opt_loop, opt_rate, opt_begin, opt_end,
opt_start, opt_delay);
play_result = play_result && player.Init() ? true : false;
play_result = play_result && player.Start() ? true : false;
}
::apollo::cybertron::Shutdown();
return play_result ? 0 : -1;
} else if (command == "record") {
if (opt_channel_vec.empty() && !opt_all) {
if (opt_white_channels.empty() && !opt_all) {
std::cout
<< "MUST specify channels option (-c) or all channels option (-a)."
<< std::endl;
......@@ -320,8 +338,8 @@ int main(int argc, char** argv) {
}
bool record_result = true;
::apollo::cybertron::Init(argv[0]);
auto recorder =
std::make_shared<Recorder>(opt_output_vec[0], opt_all, opt_channel_vec);
auto recorder = std::make_shared<Recorder>(opt_output_vec[0], opt_all,
opt_white_channels);
record_result = record_result && recorder->Start() ? true : false;
if (record_result) {
while (::apollo::cybertron::OK()) {
......@@ -345,8 +363,8 @@ int main(int argc, char** argv) {
opt_output_vec.push_back(default_output_file);
}
::apollo::cybertron::Init(argv[0]);
Spliter spliter(opt_file_vec[0], opt_output_vec[0], opt_channel_vec.empty(),
opt_channel_vec, opt_begin, opt_end);
Spliter spliter(opt_file_vec[0], opt_output_vec[0], opt_white_channels,
opt_black_channels, opt_begin, opt_end);
bool split_result = spliter.Proc();
::apollo::cybertron::Shutdown();
return split_result ? 0 : -1;
......
......@@ -97,8 +97,11 @@ void Recorder::FindNewChannel(const RoleAttributes& role_attr) {
}
if (channel_reader_map_.find(role_attr.channel_name()) ==
channel_reader_map_.end()) {
writer_->WriteChannel(role_attr.channel_name(), role_attr.message_type(),
role_attr.proto_desc());
if (!writer_->WriteChannel(role_attr.channel_name(),
role_attr.message_type(),
role_attr.proto_desc())) {
AERROR << "write channel fail, channel:" << role_attr.channel_name();
}
InitReaderImpl(role_attr.channel_name(), role_attr.message_type());
}
}
......
......@@ -21,24 +21,35 @@ namespace cybertron {
namespace record {
Spliter::Spliter(const std::string& input_file, const std::string& output_file,
bool all_channels, const std::vector<std::string>& channel_vec,
const std::vector<std::string>& white_channels,
const std::vector<std::string>& black_channels,
uint64_t begin_time, uint64_t end_time)
: input_file_(input_file),
output_file_(output_file),
channel_vec_(channel_vec),
all_channels_(all_channels),
white_channels_(white_channels),
black_channels_(black_channels),
begin_time_(begin_time),
end_time_(end_time) {}
Spliter::~Spliter() {}
bool Spliter::Proc() {
AINFO << "split record file started.";
// check params
if (begin_time_ >= end_time_) {
AERROR << "begin time larger or equal than end time, begin_time_: "
<< begin_time_ << "end_time_: " << end_time_;
return false;
}
for (auto channel_name : white_channels_) {
if (std::find(black_channels_.begin(), black_channels_.end(),
channel_name) != black_channels_.end()) {
AERROR << "find channel in both of white list and black list, channel: "
<< channel_name;
return false;
}
}
AINFO << "split record file started.";
// open input file
if (!reader_.Open(input_file_)) {
......@@ -88,10 +99,13 @@ bool Spliter::Proc() {
AERROR << "read channel section fail.";
return false;
}
if (all_channels_ ||
std::find(channel_vec_.begin(), channel_vec_.end(), chan.name()) !=
channel_vec_.end()) {
writer_.WriteChannel(chan);
if (white_channels_.empty() ||
std::find(white_channels_.begin(), white_channels_.end(),
chan.name()) != white_channels_.end()) {
if (std::find(black_channels_.begin(), black_channels_.end(),
chan.name()) == black_channels_.end()) {
writer_.WriteChannel(chan);
}
}
break;
}
......@@ -118,10 +132,15 @@ bool Spliter::Proc() {
return false;
}
for (int idx = 0; idx < cbd.messages_size(); ++idx) {
if (!all_channels_ &&
std::find(channel_vec_.begin(), channel_vec_.end(),
if (!white_channels_.empty() &&
std::find(white_channels_.begin(), white_channels_.end(),
cbd.messages(idx).channel_name()) ==
channel_vec_.end()) {
white_channels_.end()) {
continue;
}
if (std::find(black_channels_.begin(), black_channels_.end(),
cbd.messages(idx).channel_name()) !=
black_channels_.end()) {
continue;
}
if (cbd.messages(idx).time() < begin_time_ ||
......
......@@ -39,7 +39,8 @@ namespace record {
class Spliter {
public:
Spliter(const std::string& input_file, const std::string& output_file,
bool all_channels, const std::vector<std::string>& channel_vec,
const std::vector<std::string>& white_channels,
const std::vector<std::string>& black_channels,
uint64_t begin_time = 0, uint64_t end_time = UINT64_MAX);
virtual ~Spliter();
bool Proc();
......@@ -49,7 +50,8 @@ class Spliter {
RecordFileWriter writer_;
std::string input_file_;
std::string output_file_;
std::vector<std::string> channel_vec_;
std::vector<std::string> white_channels_;
std::vector<std::string> black_channels_;
bool all_channels_;
uint64_t begin_time_;
uint64_t end_time_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册