提交 687cb79d 编写于 作者: D dongdaxiang

add pipe command io interface

上级 1fe54416
...@@ -177,6 +177,7 @@ void MultiSlotDataFeed::ReadThread() { ...@@ -177,6 +177,7 @@ void MultiSlotDataFeed::ReadThread() {
while (PickOneFile(&filename)) { while (PickOneFile(&filename)) {
int err_no = 0; int err_no = 0;
fp_ = fs_open_read(filename, &err_no, pipe_command_); fp_ = fs_open_read(filename, &err_no, pipe_command_);
CHECK(fp_ != nullptr);
__fsetlocking(&*fp_, FSETLOCKING_BYCALLER); __fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
thread_local string::LineFileReader reader; thread_local string::LineFileReader reader;
std::vector<MultiSlotType> instance; std::vector<MultiSlotType> instance;
...@@ -185,7 +186,7 @@ void MultiSlotDataFeed::ReadThread() { ...@@ -185,7 +186,7 @@ void MultiSlotDataFeed::ReadThread() {
ins_num++; ins_num++;
queue_->Send(instance); queue_->Send(instance);
} }
LOG(ERROR) << "filename: " << filename << " inst num: " << ins_num; VLOG(3) << "filename: " << filename << " inst num: " << ins_num;
} }
queue_->Close(); queue_->Close();
} }
...@@ -304,31 +305,16 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) { ...@@ -304,31 +305,16 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) {
bool MultiSlotDataFeed::ParseOneInstanceFromPipe( bool MultiSlotDataFeed::ParseOneInstanceFromPipe(
std::vector<MultiSlotType>* instance) { std::vector<MultiSlotType>* instance) {
thread_local string::LineFileReader reader; thread_local string::LineFileReader reader;
/*
while (reader.getline(&*(fp_.get()))) {
*/
/*
const char* str = reader.get();
std::string line = std::string(str);
LOG(ERROR) << line;
*/
/*
LOG(ERROR) << "read a line";
}
*/
if (!reader.getline(&*(fp_.get()))) { if (!reader.getline(&*(fp_.get()))) {
return false; return false;
} else { } else {
// std::string& line = reader_.get();
// const char* str = line.c_str();
int use_slots_num = use_slots_.size(); int use_slots_num = use_slots_.size();
instance->resize(use_slots_num); instance->resize(use_slots_num);
const char* str = reader.get(); const char* str = reader.get();
std::string line = std::string(str); std::string line = std::string(str);
// LOG(ERROR) << line; VLOG(3) << line;
char* endptr = const_cast<char*>(str); char* endptr = const_cast<char*>(str);
int pos = 0; int pos = 0;
for (size_t i = 0; i < use_slots_index_.size(); ++i) { for (size_t i = 0; i < use_slots_index_.size(); ++i) {
...@@ -357,7 +343,10 @@ bool MultiSlotDataFeed::ParseOneInstanceFromPipe( ...@@ -357,7 +343,10 @@ bool MultiSlotDataFeed::ParseOneInstanceFromPipe(
pos = endptr - str; pos = endptr - str;
} else { } else {
for (int j = 0; j <= num; ++j) { for (int j = 0; j <= num; ++j) {
pos = line.find_first_of(' ', pos + 1); // pos = line.find_first_of(' ', pos + 1);
while (line[pos + 1] != ' ') {
pos++;
}
} }
} }
} }
......
...@@ -274,7 +274,7 @@ void ExecutorThreadWorker::TrainFilesWithTimer() { ...@@ -274,7 +274,7 @@ void ExecutorThreadWorker::TrainFilesWithTimer() {
++batch_cnt; ++batch_cnt;
thread_scope_->DropKids(); thread_scope_->DropKids();
if (thread_id_ == 0) { if (thread_id_ == 0) {
if (batch_cnt > 0 && batch_cnt % 1000 == 0) { if (batch_cnt > 0 && batch_cnt % 100 == 0) {
for (size_t i = 0; i < ops_.size(); ++i) { for (size_t i = 0; i < ops_.size(); ++i) {
fprintf(stderr, "op_name:[%zu][%s], op_mean_time:[%fs]\n", i, fprintf(stderr, "op_name:[%zu][%s], op_mean_time:[%fs]\n", i,
op_name[i].c_str(), op_total_time[i] / batch_cnt); op_name[i].c_str(), op_total_time[i] / batch_cnt);
......
...@@ -68,6 +68,7 @@ class DataFeedDesc(object): ...@@ -68,6 +68,7 @@ class DataFeedDesc(object):
def __init__(self, proto_file): def __init__(self, proto_file):
self.proto_desc = data_feed_pb2.DataFeedDesc() self.proto_desc = data_feed_pb2.DataFeedDesc()
self.proto_desc.pipe_command = "cat"
with open(proto_file, 'r') as f: with open(proto_file, 'r') as f:
text_format.Parse(f.read(), self.proto_desc) text_format.Parse(f.read(), self.proto_desc)
if self.proto_desc.name == "MultiSlotDataFeed": if self.proto_desc.name == "MultiSlotDataFeed":
...@@ -114,24 +115,6 @@ class DataFeedDesc(object): ...@@ -114,24 +115,6 @@ class DataFeedDesc(object):
self.proto_desc.multi_slot_desc.slots[self.__name_to_index[ self.proto_desc.multi_slot_desc.slots[self.__name_to_index[
name]].is_dense = True name]].is_dense = True
def set_pipe_command(self, pipe_command):
    """
    Set the shell pipeline command stored in this data feed description.

    In IO runtime, the pipeline command is executed on the raw input
    files provided by the user.

    Example:
        >>> data_feed = fluid.DataFeedDesc('data.proto')
        >>> data_feed.set_pipe_command("awk -F '\\t' '{print $2}'")

    Args:
        pipe_command: a command string of shell command

    Note:
        Default is cat, i.e., cat user's input file list to data feed
    """
    # Only records the command on the proto description; nothing is
    # executed here.
    self.proto_desc.pipe_command = pipe_command
def set_use_slots(self, use_slots_name): def set_use_slots(self, use_slots_name):
""" """
Set if a specific slot will be used for training. A dataset shall Set if a specific slot will be used for training. A dataset shall
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册