From 687cb79dbbe7360b190d2fa0b8a244b7e158f9f6 Mon Sep 17 00:00:00 2001 From: dongdaxiang Date: Tue, 26 Feb 2019 18:48:55 +0800 Subject: [PATCH] add pipe command io interface --- paddle/fluid/framework/data_feed.cc | 25 ++++++------------- .../fluid/framework/executor_thread_worker.cc | 2 +- python/paddle/fluid/data_feed_desc.py | 19 +------------- 3 files changed, 9 insertions(+), 37 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 36ce3debc..4cfd2b434 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -177,6 +177,7 @@ void MultiSlotDataFeed::ReadThread() { while (PickOneFile(&filename)) { int err_no = 0; fp_ = fs_open_read(filename, &err_no, pipe_command_); + CHECK(fp_ != nullptr); __fsetlocking(&*fp_, FSETLOCKING_BYCALLER); thread_local string::LineFileReader reader; std::vector instance; @@ -185,7 +186,7 @@ void MultiSlotDataFeed::ReadThread() { ins_num++; queue_->Send(instance); } - LOG(ERROR) << "filename: " << filename << " inst num: " << ins_num; + VLOG(3) << "filename: " << filename << " inst num: " << ins_num; } queue_->Close(); } @@ -304,31 +305,16 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) { bool MultiSlotDataFeed::ParseOneInstanceFromPipe( std::vector* instance) { thread_local string::LineFileReader reader; - /* - while (reader.getline(&*(fp_.get()))) { - */ - /* - const char* str = reader.get(); - std::string line = std::string(str); - LOG(ERROR) << line; - */ - /* - LOG(ERROR) << "read a line"; - } - */ if (!reader.getline(&*(fp_.get()))) { return false; } else { - // std::string& line = reader_.get(); - // const char* str = line.c_str(); - int use_slots_num = use_slots_.size(); instance->resize(use_slots_num); const char* str = reader.get(); std::string line = std::string(str); - // LOG(ERROR) << line; + VLOG(3) << line; char* endptr = const_cast(str); int pos = 0; for (size_t i = 0; i < use_slots_index_.size(); ++i) { @@ -357,7 +343,10 @@ bool MultiSlotDataFeed::ParseOneInstanceFromPipe( pos = endptr - str; } else { for (int j = 0; j <= num; ++j) { - pos = line.find_first_of(' ', pos + 1); + // pos = line.find_first_of(' ', pos + 1); + while (line[pos + 1] != ' ') { + pos++; + } } } } diff --git a/paddle/fluid/framework/executor_thread_worker.cc b/paddle/fluid/framework/executor_thread_worker.cc index d03eeb9e9..cf0738e07 100644 --- a/paddle/fluid/framework/executor_thread_worker.cc +++ b/paddle/fluid/framework/executor_thread_worker.cc @@ -274,7 +274,7 @@ void ExecutorThreadWorker::TrainFilesWithTimer() { ++batch_cnt; thread_scope_->DropKids(); if (thread_id_ == 0) { - if (batch_cnt > 0 && batch_cnt % 1000 == 0) { + if (batch_cnt > 0 && batch_cnt % 100 == 0) { for (size_t i = 0; i < ops_.size(); ++i) { fprintf(stderr, "op_name:[%zu][%s], op_mean_time:[%fs]\n", i, op_name[i].c_str(), op_total_time[i] / batch_cnt); diff --git a/python/paddle/fluid/data_feed_desc.py b/python/paddle/fluid/data_feed_desc.py index 2770c0209..80745aac8 100644 --- a/python/paddle/fluid/data_feed_desc.py +++ b/python/paddle/fluid/data_feed_desc.py @@ -68,6 +68,7 @@ class DataFeedDesc(object): def __init__(self, proto_file): self.proto_desc = data_feed_pb2.DataFeedDesc() + self.proto_desc.pipe_command = "cat" with open(proto_file, 'r') as f: text_format.Parse(f.read(), self.proto_desc) if self.proto_desc.name == "MultiSlotDataFeed": @@ -114,24 +115,6 @@ class DataFeedDesc(object): self.proto_desc.multi_slot_desc.slots[self.__name_to_index[ name]].is_dense = True - def set_pipe_command(self, pipe_command): - """ - Pipeline command will be set with this function. In IO runtime, - pipeline command will be executed given user provided input raw - files. - - Example: - >>> data_feed = fluid.DataFeedDesc('data.proto') - >>> data_feed.set_pipe_command('awk -F '\t' '{print $2}'') - - Args: - pipe_command: a command string of shell command - - Note: - Default is cat, i.e., cat user's input file list to data feed - """ - self.proto_desc.pipe_command = pipe_command - def set_use_slots(self, use_slots_name): """ Set if a specific slot will be used for training. A dataset shall -- GitLab