diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 2e5380afbcec41c1b51eecbd3663b4a329e07f40..5d4d0ad4b775b8d2fde71d380b9f24019d08c524 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -23,11 +23,9 @@ endfunction() add_subdirectory(ir) add_subdirectory(details) -<<<<<<< HEAD add_subdirectory(fleet) -======= add_subdirectory(common) ->>>>>>> add fs_local_open example +add_subdirectory(io) #ddim lib proto_library(framework_proto SRCS framework.proto) proto_library(async_executor_proto SRCS data_feed.proto) diff --git a/paddle/fluid/framework/async_executor.cc b/paddle/fluid/framework/async_executor.cc index 9d8246d71370fa513b3f2b3761a27bace8a3b39f..67770f77c2fc3b4b7f6141f7813c5f55b9c94796 100644 --- a/paddle/fluid/framework/async_executor.cc +++ b/paddle/fluid/framework/async_executor.cc @@ -18,8 +18,6 @@ limitations under the License. */ #include "google/protobuf/text_format.h" #include "gflags/gflags.h" -#include "paddle/fluid/framework/common/fs.h" -#include "paddle/fluid/framework/common/shell.h" #include "paddle/fluid/framework/data_feed_factory.h" #include "paddle/fluid/framework/executor_thread_worker.h" #include "paddle/fluid/framework/feed_fetch_method.h" diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index e37e596565470cc8f2daad278b6668ee91d615cc..36ce3debc301c0010436c12892fc746ee14cb4b4 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -12,15 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include "paddle/fluid/framework/data_feed.h" #include +#include "gflags/gflags.h" #include "google/protobuf/io/zero_copy_stream_impl.h" #include "google/protobuf/message.h" #include "google/protobuf/text_format.h" - -#include "common/fs.h" -#include "common/shell.h" -#include "gflags/gflags.h" -#include "paddle/fluid/framework/data_feed.h" +#include "io/fs.h" +#include "io/shell.h" #include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/feed_fetch_type.h" @@ -94,24 +93,8 @@ void PrivateQueueDataFeed::SetQueueSize(int queue_size) { template bool PrivateQueueDataFeed::Start() { CheckSetFileList(); - std::string filename; - if (PickOneFile(&filename)) { - int err_no = 0; - std::string pipeline_cmd = "cat"; - - std::string path = - "/home/users/dongdaxiang/pslib_ctr/local/data_mod/part-00012"; - fp_ = fs_open_read(path, &err_no, pipeline_cmd); - __fsetlocking(&*fp_, FSETLOCKING_BYCALLER); - thread_local LineFileReader reader; - while (reader.getline(&*(fp_.get()))) { - LOG(ERROR) << "read a line"; - } - - read_thread_ = std::thread(&PrivateQueueDataFeed::ReadThread, this); - read_thread_.detach(); - } - queue_->Close(); + read_thread_ = std::thread(&PrivateQueueDataFeed::ReadThread, this); + read_thread_.detach(); finish_start_ = true; return true; @@ -119,10 +102,18 @@ bool PrivateQueueDataFeed::Start() { template void PrivateQueueDataFeed::ReadThread() { - T instance; - while (ParseOneInstanceFromPipe(&instance)) { - queue_->Send(instance); + std::string filename; + while (PickOneFile(&filename)) { + int err_no = 0; + fp_ = fs_open_read(filename, &err_no, pipe_command_); + __fsetlocking(&*fp_, FSETLOCKING_BYCALLER); + thread_local string::LineFileReader reader; + T instance; + while (ParseOneInstanceFromPipe(&instance)) { + queue_->Send(instance); + } } + queue_->Close(); } template @@ -177,15 +168,26 @@ void MultiSlotDataFeed::Init( } } feed_vec_.resize(use_slots_.size()); + pipe_command_ = data_feed_desc.pipe_command(); finish_init_ = true; } void MultiSlotDataFeed::ReadThread() { - LOG(ERROR) << "Haha"; - std::vector instance; - while (ParseOneInstanceFromPipe(&instance)) { - queue_->Send(instance); + std::string filename; + while (PickOneFile(&filename)) { + int err_no = 0; + fp_ = fs_open_read(filename, &err_no, pipe_command_); + __fsetlocking(&*fp_, FSETLOCKING_BYCALLER); + thread_local string::LineFileReader reader; + std::vector instance; + int ins_num = 0; + while (ParseOneInstanceFromPipe(&instance)) { + ins_num++; + queue_->Send(instance); + } + LOG(ERROR) << "filename: " << filename << " inst num: " << ins_num; } + queue_->Close(); } bool MultiSlotDataFeed::CheckFile(const char* filename) { @@ -301,26 +303,32 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) { bool MultiSlotDataFeed::ParseOneInstanceFromPipe( std::vector* instance) { - LOG(ERROR) << "hehe"; - thread_local LineFileReader reader; + thread_local string::LineFileReader reader; + /* while (reader.getline(&*(fp_.get()))) { - /* + */ + /* const char* str = reader.get(); std::string line = std::string(str); LOG(ERROR) << line; - */ + */ + /* LOG(ERROR) << "read a line"; } - return true; - /* - if (!reader.getline(fp_.get())) { + */ + + if (!reader.getline(&*(fp_.get()))) { return false; } else { // std::string& line = reader_.get(); // const char* str = line.c_str(); + + int use_slots_num = use_slots_.size(); + instance->resize(use_slots_num); + const char* str = reader.get(); std::string line = std::string(str); - LOG(ERROR) << line; + // LOG(ERROR) << line; char* endptr = const_cast(str); int pos = 0; for (size_t i = 0; i < use_slots_index_.size(); ++i) { @@ -355,7 +363,6 @@ bool MultiSlotDataFeed::ParseOneInstanceFromPipe( } return true; } - */ } bool MultiSlotDataFeed::ParseOneInstance(std::vector* instance) { diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index de0289e4d2489b15454cd7eb4958466dd79d9355..59ad90afe1fe88e0ef034594e11a3cf07fca8ad7 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -21,12 +21,12 @@ limitations under the License. */ #include // NOLINT #include -#include "paddle/fluid/framework/common/ps_string.h" #include "paddle/fluid/framework/data_feed.pb.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/reader.h" #include "paddle/fluid/framework/variable.h" #include "paddle/fluid/operators/reader/blocking_queue.h" +#include "paddle/fluid/string/string_helper.h" namespace paddle { namespace framework { @@ -115,6 +115,7 @@ class DataFeed { bool finish_init_; static bool finish_set_filelist_; bool finish_start_; + std::string pipe_command_; }; // PrivateQueueDataFeed is the base virtual class for ohther DataFeeds. @@ -154,7 +155,7 @@ class PrivateQueueDataFeed : public DataFeed { std::ifstream file_; std::shared_ptr fp_; size_t queue_size_; - LineFileReader reader_; + string::LineFileReader reader_; // The queue for store parsed data std::unique_ptr> queue_; }; diff --git a/paddle/fluid/framework/data_feed.proto b/paddle/fluid/framework/data_feed.proto index 489fec08d86ccf61ece29bbba6d0204f25530b0f..b13c908b37cd7407c8e5473f8fcd21c92084675d 100644 --- a/paddle/fluid/framework/data_feed.proto +++ b/paddle/fluid/framework/data_feed.proto @@ -27,4 +27,5 @@ message DataFeedDesc { optional string name = 1; optional int32 batch_size = 2 [ default = 32 ]; optional MultiSlotDesc multi_slot_desc = 3; + optional string pipe_command = 4; } diff --git a/paddle/fluid/framework/executor_thread_worker.cc b/paddle/fluid/framework/executor_thread_worker.cc index efa148a6b50e6d78958acdf8d6fcaa7f32bb3129..d03eeb9e9d8352e9921e654791604496177d1c20 100644 --- a/paddle/fluid/framework/executor_thread_worker.cc +++ b/paddle/fluid/framework/executor_thread_worker.cc @@ -13,15 +13,12 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/framework/executor_thread_worker.h" -#include #include #include "google/protobuf/io/zero_copy_stream_impl.h" #include "google/protobuf/message.h" #include "google/protobuf/text_format.h" #include "gflags/gflags.h" -#include "paddle/fluid/framework/common/fs.h" -#include "paddle/fluid/framework/common/shell.h" #include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/feed_fetch_type.h" #include "paddle/fluid/framework/lod_rank_table.h" @@ -247,8 +244,7 @@ void ExecutorThreadWorker::TrainFilesWithTimer() { platform::SetNumThreads(1); SetDevice(); thread_reader_->Start(); - exit(0); - /* + std::vector op_total_time; std::vector op_name; for (auto& op : ops_) { @@ -292,7 +288,6 @@ void ExecutorThreadWorker::TrainFilesWithTimer() { } timeline.Start(); } - */ } void ExecutorThreadWorker::TrainFiles() { diff --git a/paddle/fluid/framework/common/CMakeLists.txt b/paddle/fluid/framework/io/CMakeLists.txt similarity index 100% rename from paddle/fluid/framework/common/CMakeLists.txt rename to paddle/fluid/framework/io/CMakeLists.txt diff --git a/paddle/fluid/framework/common/fs.cc b/paddle/fluid/framework/io/fs.cc similarity index 81% rename from paddle/fluid/framework/common/fs.cc rename to paddle/fluid/framework/io/fs.cc index 62db2a2bd0b8e67b4096bcf57e4c5043373d7cad..a4f2d2a89a4fb262776b535f264555761ef52a0c 100644 --- a/paddle/fluid/framework/common/fs.cc +++ b/paddle/fluid/framework/io/fs.cc @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include "paddle/fluid/framework/common/fs.h" +#include "paddle/fluid/framework/io/fs.h" namespace paddle { namespace framework { @@ -25,10 +25,11 @@ static void fs_add_read_converter_internal(std::string& path, // NOLINT } if (!is_pipe) { - path = format_string("( %s ) < \"%s\"", converter.c_str(), path.c_str()); + path = string::format_string("( %s ) < \"%s\"", converter.c_str(), + path.c_str()); is_pipe = true; } else { - path = format_string("%s | %s", path.c_str(), converter.c_str()); + path = string::format_string("%s | %s", path.c_str(), converter.c_str()); } } @@ -40,10 +41,11 @@ static void fs_add_write_converter_internal(std::string& path, // NOLINT } if (!is_pipe) { - path = format_string("( %s ) > \"%s\"", converter.c_str(), path.c_str()); + path = string::format_string("( %s ) > \"%s\"", converter.c_str(), + path.c_str()); is_pipe = true; } else { - path = format_string("%s | %s", converter.c_str(), path.c_str()); + path = string::format_string("%s | %s", converter.c_str(), path.c_str()); } } @@ -108,7 +110,8 @@ std::shared_ptr localfs_open_read(std::string path, std::shared_ptr localfs_open_write(std::string path, const std::string& converter) { - shell_execute(format_string("mkdir -p $(dirname \"%s\")", path.c_str())); + shell_execute( + string::format_string("mkdir -p $(dirname \"%s\")", path.c_str())); bool is_pipe = false; @@ -134,7 +137,7 @@ void localfs_remove(const std::string& path) { return; } - shell_execute(format_string("rm -rf %s", path.c_str())); + shell_execute(string::format_string("rm -rf %s", path.c_str())); } std::vector localfs_list(const std::string& path) { @@ -144,9 +147,10 @@ std::vector localfs_list(const std::string& path) { std::shared_ptr pipe; int err_no = 0; - pipe = shell_popen(format_string("find %s -type f -maxdepth 1", path.c_str()), - "r", &err_no); - LineFileReader reader; + pipe = shell_popen( + string::format_string("find %s -type f -maxdepth 1", path.c_str()), "r", + &err_no); + string::LineFileReader reader; std::vector list; while (reader.getline(&*pipe)) { @@ -161,21 +165,22 @@ std::string localfs_tail(const std::string& path) { return ""; } - return shell_get_command_output(format_string("tail -1 %s ", path.c_str())); + return shell_get_command_output( + string::format_string("tail -1 %s ", path.c_str())); } bool localfs_exists(const std::string& path) { std::string test_f = shell_get_command_output( - format_string("[ -f %s ] ; echo $?", path.c_str())); + string::format_string("[ -f %s ] ; echo $?", path.c_str())); - if (trim_spaces(test_f) == "0") { + if (string::trim_spaces(test_f) == "0") { return true; } std::string test_d = shell_get_command_output( - format_string("[ -d %s ] ; echo $?", path.c_str())); + string::format_string("[ -d %s ] ; echo $?", path.c_str())); - if (trim_spaces(test_d) == "0") { + if (string::trim_spaces(test_d) == "0") { return true; } @@ -187,7 +192,7 @@ void localfs_mkdir(const std::string& path) { return; } - shell_execute(format_string("mkdir -p %s", path.c_str())); + shell_execute(string::format_string("mkdir -p %s", path.c_str())); } static size_t& hdfs_buffer_size_internal() { @@ -211,11 +216,11 @@ void hdfs_set_command(const std::string& x) { hdfs_command_internal() = x; } std::shared_ptr hdfs_open_read(std::string path, int* err_no, const std::string& converter) { if (fs_end_with_internal(path, ".gz")) { - path = - format_string("%s -text \"%s\"", hdfs_command().c_str(), path.c_str()); + path = string::format_string("%s -text \"%s\"", hdfs_command().c_str(), + path.c_str()); } else { - path = - format_string("%s -cat \"%s\"", hdfs_command().c_str(), path.c_str()); + path = string::format_string("%s -cat \"%s\"", hdfs_command().c_str(), + path.c_str()); } bool is_pipe = true; @@ -225,8 +230,8 @@ std::shared_ptr hdfs_open_read(std::string path, int* err_no, std::shared_ptr hdfs_open_write(std::string path, int* err_no, const std::string& converter) { - path = - format_string("%s -put - \"%s\"", hdfs_command().c_str(), path.c_str()); + path = string::format_string("%s -put - \"%s\"", hdfs_command().c_str(), + path.c_str()); bool is_pipe = true; if (fs_end_with_internal(path, ".gz\"")) { @@ -242,8 +247,8 @@ void hdfs_remove(const std::string& path) { return; } - shell_execute(format_string("%s -rmr %s &>/dev/null; true", - hdfs_command().c_str(), path.c_str())); + shell_execute(string::format_string("%s -rmr %s &>/dev/null; true", + hdfs_command().c_str(), path.c_str())); } std::vector hdfs_list(const std::string& path) { @@ -261,14 +266,15 @@ std::vector hdfs_list(const std::string& path) { do { err_no = 0; std::shared_ptr pipe; - pipe = shell_popen(format_string("%s -ls %s | ( grep ^- ; [ $? != 2 ] )", - hdfs_command().c_str(), path.c_str()), - "r", &err_no); - LineFileReader reader; + pipe = shell_popen( + string::format_string("%s -ls %s | ( grep ^- ; [ $? != 2 ] )", + hdfs_command().c_str(), path.c_str()), + "r", &err_no); + string::LineFileReader reader; list.clear(); while (reader.getline(&*pipe)) { - std::vector line = split_string(reader.get()); + std::vector line = string::split_string(reader.get()); if (line.size() != 8) { continue; } @@ -283,15 +289,15 @@ std::string hdfs_tail(const std::string& path) { return ""; } - return shell_get_command_output(format_string( + return shell_get_command_output(string::format_string( "%s -text %s | tail -1 ", hdfs_command().c_str(), path.c_str())); } bool hdfs_exists(const std::string& path) { - std::string test = shell_get_command_output(format_string( + std::string test = shell_get_command_output(string::format_string( "%s -test -e %s ; echo $?", hdfs_command().c_str(), path.c_str())); - if (trim_spaces(test) == "0") { + if (string::trim_spaces(test) == "0") { return true; } @@ -303,8 +309,8 @@ void hdfs_mkdir(const std::string& path) { return; } - shell_execute(format_string("%s -mkdir %s; true", hdfs_command().c_str(), - path.c_str())); + shell_execute(string::format_string("%s -mkdir %s; true", + hdfs_command().c_str(), path.c_str())); } int fs_select_internal(const std::string& path) { @@ -445,5 +451,5 @@ void fs_mkdir(const std::string& path) { LOG(FATAL) << "Not supported"; } } -} // namespace framework -} // namespace paddle +} // end namespace framework +} // end namespace paddle diff --git a/paddle/fluid/framework/common/fs.h b/paddle/fluid/framework/io/fs.h similarity index 96% rename from paddle/fluid/framework/common/fs.h rename to paddle/fluid/framework/io/fs.h index 66429482cc1e0d68a3bd1638804d06e23e8fe248..f08953552c5597a750a05de5ff7f3c8c77fd1f67 100644 --- a/paddle/fluid/framework/common/fs.h +++ b/paddle/fluid/framework/io/fs.h @@ -18,8 +18,8 @@ #include #include #include "glog/logging.h" -#include "paddle/fluid/framework/common/ps_string.h" -#include "paddle/fluid/framework/common/shell.h" +#include "paddle/fluid/framework/io/shell.h" +#include "paddle/fluid/string/string_helper.h" namespace paddle { namespace framework { diff --git a/paddle/fluid/framework/common/shell.cc b/paddle/fluid/framework/io/shell.cc similarity index 98% rename from paddle/fluid/framework/common/shell.cc rename to paddle/fluid/framework/io/shell.cc index ff6e828aa1578e33c0fbe53ca90eadf18f53e3b2..286f48f6f177787c1a0e543e1a0c17a9680d88a8 100644 --- a/paddle/fluid/framework/common/shell.cc +++ b/paddle/fluid/framework/io/shell.cc @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "paddle/fluid/framework/common/shell.h" +#include "paddle/fluid/framework/io/shell.h" namespace paddle { namespace framework { @@ -282,7 +282,7 @@ std::string shell_get_command_output(const std::string& cmd) { do { err_no = 0; std::shared_ptr pipe = shell_popen(cmd, "r", &err_no); - LineFileReader reader; + string::LineFileReader reader; if (reader.getdelim(&*pipe, 0)) { pipe = nullptr; @@ -294,5 +294,5 @@ std::string shell_get_command_output(const std::string& cmd) { return ""; } -} // namespace framework -} // namespace paddle +} // end namespace framework +} // end namespace paddle diff --git a/paddle/fluid/framework/common/shell.h b/paddle/fluid/framework/io/shell.h similarity index 97% rename from paddle/fluid/framework/common/shell.h rename to paddle/fluid/framework/io/shell.h index 41ef3a9957679edc7e940d91da1a9cf769a12af0..effaa1e99ed477bd1667b3237faed2824967911b 100644 --- a/paddle/fluid/framework/common/shell.h +++ b/paddle/fluid/framework/io/shell.h @@ -23,7 +23,7 @@ #include #include #include "glog/logging.h" -#include "paddle/fluid/framework/common/ps_string.h" +#include "paddle/fluid/string/string_helper.h" namespace paddle { namespace framework { diff --git a/paddle/fluid/framework/common/ps_string.h b/paddle/fluid/string/string_helper.h similarity index 99% rename from paddle/fluid/framework/common/ps_string.h rename to paddle/fluid/string/string_helper.h index 6de9b7be32468445373a63369a25d79b73f92714..48af332bb816b4fe51b86cc8c21ef62db2292f1e 100644 --- a/paddle/fluid/framework/common/ps_string.h +++ b/paddle/fluid/string/string_helper.h @@ -23,7 +23,7 @@ #include "glog/logging.h" namespace paddle { -namespace framework { +namespace string { inline size_t count_spaces(const char* s) { size_t count = 0; @@ -234,5 +234,5 @@ class LineFileReader { size_t _buf_size = 0; size_t _length = 0; }; -} // end namespace framework +} // end namespace string } // end namespace paddle diff --git a/python/paddle/fluid/data_feed_desc.py b/python/paddle/fluid/data_feed_desc.py index d2ec74d6cfdeb34c1f48c086a3aa30d5100c3efb..2770c0209e15dc07033984e1a8fb95db29039819 100644 --- a/python/paddle/fluid/data_feed_desc.py +++ b/python/paddle/fluid/data_feed_desc.py @@ -114,6 +114,24 @@ class DataFeedDesc(object): self.proto_desc.multi_slot_desc.slots[self.__name_to_index[ name]].is_dense = True + def set_pipe_command(self, pipe_command): + """ + Pipeline command will be set with this function. In IO runtime, + pipeline command will be executed given user provided input raw + files. + + Example: + >>> data_feed = fluid.DataFeedDesc('data.proto') + >>> data_feed.set_pipe_command('awk -F '\t' '{print $2}'') + + Args: + pipe_command: a command string of shell command + + Note: + Default is cat, i.e., cat user's input file list to data feed + """ + self.proto_desc.pipe_command = pipe_command + def set_use_slots(self, use_slots_name): """ Set if a specific slot will be used for training. A dataset shall