提交 8edbac77 编写于 作者: M Martin Jansa 提交者: Liu Jiaming

cyber_recorder: add support for black listing topic names in record like in play and split

* this is useful to record all topic except explicit list of a few topic we
  really don't want to record, e.g. because the size
上级 6ebc9195
......@@ -169,7 +169,8 @@ $ cyber_recorder record -h
usage: cyber_recorder record [options]
-o, --output <file> output record file
-a, --all all channels
-c, --channel <name> channel name
-c, --white-channel <name> only record the specified channel
-k, --black-channel <name> not record the specified channel
-i, --segment-interval <seconds> record segmented every n second(s)
-m, --segment-size <MB> record segmented every n megabyte(s)
-h, --help show help message
......
......@@ -42,7 +42,7 @@ using apollo::cyber::record::Recoverer;
using apollo::cyber::record::Spliter;
const char INFO_OPTIONS[] = "h";
const char RECORD_OPTIONS[] = "o:ac:i:m:h";
const char RECORD_OPTIONS[] = "o:ac:k:i:m:h";
const char PLAY_OPTIONS[] = "f:ac:k:lr:b:e:s:d:p:h";
const char SPLIT_OPTIONS[] = "f:o:c:k:b:e:h";
const char RECOVER_OPTIONS[] = "f:o:h";
......@@ -424,7 +424,8 @@ int main(int argc, char** argv) {
}
::apollo::cyber::Init(argv[0]);
auto recorder = std::make_shared<Recorder>(opt_output_vec[0], opt_all,
opt_white_channels, opt_header);
opt_white_channels,
opt_black_channels, opt_header);
bool record_result = recorder->Start();
if (record_result) {
while (!::apollo::cyber::IsShutdown()) {
......
......@@ -23,22 +23,37 @@ namespace cyber {
namespace record {
Recorder::Recorder(const std::string& output, bool all_channels,
const std::vector<std::string>& channel_vec)
: output_(output), all_channels_(all_channels), channel_vec_(channel_vec) {
const std::vector<std::string>& white_channels,
const std::vector<std::string>& black_channels)
: output_(output),
all_channels_(all_channels),
white_channels_(white_channels),
black_channels_(black_channels) {
header_ = HeaderBuilder::GetHeader();
}
Recorder::Recorder(const std::string& output, bool all_channels,
const std::vector<std::string>& channel_vec,
const std::vector<std::string>& white_channels,
const std::vector<std::string>& black_channels,
const proto::Header& header)
: output_(output),
all_channels_(all_channels),
channel_vec_(channel_vec),
white_channels_(white_channels),
black_channels_(black_channels),
header_(header) {}
Recorder::~Recorder() { Stop(); }
bool Recorder::Start() {
for (const auto& channel_name : white_channels_) {
if (std::find(black_channels_.begin(), black_channels_.end(),
channel_name) != black_channels_.end()) {
AERROR << "find channel in both of white list and black list, channel: "
<< channel_name;
return false;
}
}
writer_.reset(new RecordWriter(header_));
if (!writer_->Open(output_)) {
AERROR << "Datafile open file error.";
......@@ -110,11 +125,20 @@ void Recorder::FindNewChannel(const RoleAttributes& role_attr) {
return;
}
if (!all_channels_ &&
std::find(channel_vec_.begin(), channel_vec_.end(),
role_attr.channel_name()) == channel_vec_.end()) {
ADEBUG << "New channel was found, but not in record list.";
std::find(white_channels_.begin(), white_channels_.end(),
role_attr.channel_name()) == white_channels_.end()) {
ADEBUG << "New channel '" << role_attr.channel_name()
<< "' was found, but not in record list.";
return;
}
if (std::find(black_channels_.begin(), black_channels_.end(),
role_attr.channel_name()) != black_channels_.end()) {
ADEBUG << "New channel '" << role_attr.channel_name()
<< "' was found, but it appears in the blacklist.";
return;
}
if (channel_reader_map_.find(role_attr.channel_name()) ==
channel_reader_map_.end()) {
if (!writer_->WriteChannel(role_attr.channel_name(),
......
......@@ -47,9 +47,11 @@ namespace record {
class Recorder : public std::enable_shared_from_this<Recorder> {
public:
Recorder(const std::string& output, bool all_channels,
const std::vector<std::string>& channel_vec);
const std::vector<std::string>& white_channels,
const std::vector<std::string>& black_channels);
Recorder(const std::string& output, bool all_channels,
const std::vector<std::string>& channel_vec,
const std::vector<std::string>& white_channels,
const std::vector<std::string>& black_channels,
const proto::Header& header);
~Recorder();
bool Start();
......@@ -64,7 +66,8 @@ class Recorder : public std::enable_shared_from_this<Recorder> {
Connection<const ChangeMsg&> change_conn_;
std::string output_;
bool all_channels_ = true;
std::vector<std::string> channel_vec_;
std::vector<std::string> white_channels_;
std::vector<std::string> black_channels_;
proto::Header header_;
std::unordered_map<std::string, std::shared_ptr<ReaderBase>>
channel_reader_map_;
......
......@@ -168,7 +168,8 @@ $ cyber_recorder record -h
usage: cyber_recorder record [options]
-o, --output <file> output record file
-a, --all all channels
-c, --channel <name> channel name
-c, --white-channel <name> only record the specified channel
-k, --black-channel <name> not record the specified channel
-i, --segment-interval <seconds> record segmented every n second(s)
-m, --segment-size <MB> record segmented every n megabyte(s)
-h, --help show help message
......
......@@ -128,13 +128,14 @@ bool RealtimeRecordProcessor::Init(const SmartRecordTrigger& trigger_conf) {
max_backward_time_ = trigger_conf.max_backward_time();
min_restore_chunk_ = trigger_conf.min_restore_chunk();
std::vector<std::string> all_channels;
std::vector<std::string> black_channels;
const std::set<std::string>& all_channels_set =
ChannelPool::Instance()->GetAllChannels();
std::copy(all_channels_set.begin(), all_channels_set.end(),
std::back_inserter(all_channels));
recorder_ = std::make_shared<Recorder>(
absl::StrCat(source_record_dir_, "/", default_output_filename_), false,
all_channels, HeaderBuilder::GetHeader());
all_channels, black_channels, HeaderBuilder::GetHeader());
// Init base
if (!RecordProcessor::Init(trigger_conf)) {
AERROR << "base init failed";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册