From afaf937010adc9acf520a10bfacfe6eb2124869f Mon Sep 17 00:00:00 2001 From: dongdaxiang Date: Fri, 22 Feb 2019 10:05:12 +0800 Subject: [PATCH] add fs_local_open example --- paddle/fluid/framework/CMakeLists.txt | 4 + paddle/fluid/framework/async_executor.cc | 2 + paddle/fluid/framework/common/CMakeLists.txt | 2 + paddle/fluid/framework/common/fs.cc | 450 ++++++++++++++++++ paddle/fluid/framework/common/fs.h | 100 ++++ paddle/fluid/framework/common/ps_string.h | 238 +++++++++ paddle/fluid/framework/common/shell.cc | 298 ++++++++++++ paddle/fluid/framework/common/shell.h | 60 +++ paddle/fluid/framework/data_feed.cc | 105 +++- paddle/fluid/framework/data_feed.h | 7 + .../fluid/framework/executor_thread_worker.cc | 8 +- 11 files changed, 1260 insertions(+), 14 deletions(-) create mode 100644 paddle/fluid/framework/common/CMakeLists.txt create mode 100644 paddle/fluid/framework/common/fs.cc create mode 100644 paddle/fluid/framework/common/fs.h create mode 100644 paddle/fluid/framework/common/ps_string.h create mode 100644 paddle/fluid/framework/common/shell.cc create mode 100644 paddle/fluid/framework/common/shell.h diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 9cdf8f691f..2e5380afbc 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -23,7 +23,11 @@ endfunction() add_subdirectory(ir) add_subdirectory(details) +<<<<<<< HEAD add_subdirectory(fleet) +======= +add_subdirectory(common) +>>>>>>> add fs_local_open example #ddim lib proto_library(framework_proto SRCS framework.proto) proto_library(async_executor_proto SRCS data_feed.proto) diff --git a/paddle/fluid/framework/async_executor.cc b/paddle/fluid/framework/async_executor.cc index 67770f77c2..9d8246d713 100644 --- a/paddle/fluid/framework/async_executor.cc +++ b/paddle/fluid/framework/async_executor.cc @@ -18,6 +18,8 @@ limitations under the License. */ #include "google/protobuf/text_format.h" #include "gflags/gflags.h" +#include "paddle/fluid/framework/common/fs.h" +#include "paddle/fluid/framework/common/shell.h" #include "paddle/fluid/framework/data_feed_factory.h" #include "paddle/fluid/framework/executor_thread_worker.h" #include "paddle/fluid/framework/feed_fetch_method.h" diff --git a/paddle/fluid/framework/common/CMakeLists.txt b/paddle/fluid/framework/common/CMakeLists.txt new file mode 100644 index 0000000000..bc43f569b7 --- /dev/null +++ b/paddle/fluid/framework/common/CMakeLists.txt @@ -0,0 +1,2 @@ +cc_library(fs SRCS fs.cc DEPS glog boost) +cc_library(shell SRCS shell.cc DEPS glog) diff --git a/paddle/fluid/framework/common/fs.cc b/paddle/fluid/framework/common/fs.cc new file mode 100644 index 0000000000..295b2d3c54 --- /dev/null +++ b/paddle/fluid/framework/common/fs.cc @@ -0,0 +1,450 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/framework/common/fs.h" + +namespace paddle { +namespace framework { + +static void fs_add_read_converter_internal(std::string& path, // NOLINT + bool& is_pipe, // NOLINT + const std::string& converter) { + if (converter == "") { + return; + } + + if (!is_pipe) { + path = format_string("( %s ) < \"%s\"", converter.c_str(), path.c_str()); + is_pipe = true; + } else { + path = format_string("%s | %s", path.c_str(), converter.c_str()); + } +} + +static void fs_add_write_converter_internal(std::string& path, // NOLINT + bool& is_pipe, // NOLINT + const std::string& converter) { + if (converter == "") { + return; + } + + if (!is_pipe) { + path = format_string("( %s ) > \"%s\"", converter.c_str(), path.c_str()); + is_pipe = true; + } else { + path = format_string("%s | %s", converter.c_str(), path.c_str()); + } +} + +static std::shared_ptr fs_open_internal(const std::string& path, + bool is_pipe, + const std::string& mode, + size_t buffer_size, + int* err_no = 0) { + std::shared_ptr fp = nullptr; + + if (!is_pipe) { + fp = shell_fopen(path, mode); + } else { + fp = shell_popen(path, mode, err_no); + } + + if (buffer_size > 0) { + char* buffer = new char[buffer_size]; + CHECK_EQ(0, setvbuf(&*fp, buffer, _IOFBF, buffer_size)); + fp = {&*fp, + [ fp, buffer ] reinterpret_cast mutable {CHECK(fp.unique()); + fp = nullptr; + delete[] buffer; + } +}; +} + +return fp; +} + +static bool fs_begin_with_internal(const std::string& path, + const std::string& str) { + return strncmp(path.c_str(), str.c_str(), str.length()) == 0; +} + +static bool fs_end_with_internal(const std::string& path, + const std::string& str) { + return path.length() >= str.length() && + strncmp(&path[path.length() - str.length()], str.c_str(), + str.length()) == 0; +} + +static size_t& localfs_buffer_size_internal() { + static size_t x = 0; + return x; +} + +size_t localfs_buffer_size() { return localfs_buffer_size_internal(); } + +void localfs_set_buffer_size(size_t x) { localfs_buffer_size_internal() = x; } + +std::shared_ptr localfs_open_read(std::string path, + const std::string& converter) { + bool is_pipe = false; + + if (fs_end_with_internal(path, ".gz")) { + fs_add_read_converter_internal(path, is_pipe, "zcat"); + } + + fs_add_read_converter_internal(path, is_pipe, converter); + return fs_open_internal(path, is_pipe, "r", localfs_buffer_size()); +} + +std::shared_ptr localfs_open_write(std::string path, + const std::string& converter) { + shell_execute(format_string("mkdir -p $(dirname \"%s\")", path.c_str())); + + bool is_pipe = false; + + if (fs_end_with_internal(path, ".gz")) { + fs_add_write_converter_internal(path, is_pipe, "gzip"); + } + + fs_add_write_converter_internal(path, is_pipe, converter); + return fs_open_internal(path, is_pipe, "w", localfs_buffer_size()); +} + +int64_t localfs_file_size(const std::string& path) { + struct stat buf; + if (0 != stat(path.c_str(), &buf)) { + LOG(FATAL) << "file stat not zero"; + return -1; + } + return (int64_t)buf.st_size; +} + +void localfs_remove(const std::string& path) { + if (path == "") { + return; + } + + shell_execute(format_string("rm -rf %s", path.c_str())); +} + +std::vector localfs_list(const std::string& path) { + if (path == "") { + return {}; + } + + std::shared_ptr pipe; + int err_no = 0; + pipe = shell_popen(format_string("find %s -type f -maxdepth 1", path.c_str()), + "r", &err_no); + LineFileReader reader; + std::vector list; + + while (reader.getline(&*pipe)) { + list.push_back(reader.get()); + } + + return list; +} + +std::string localfs_tail(const std::string& path) { + if (path == "") { + return ""; + } + + return shell_get_command_output(format_string("tail -1 %s ", path.c_str())); +} + +bool localfs_exists(const std::string& path) { + std::string test_f = shell_get_command_output( + format_string("[ -f %s ] ; echo $?", path.c_str())); + + if (trim_spaces(test_f) == "0") { + return true; + } + + std::string test_d = shell_get_command_output( + format_string("[ -d %s ] ; echo $?", path.c_str())); + + if (trim_spaces(test_d) == "0") { + return true; + } + + return false; +} + +void localfs_mkdir(const std::string& path) { + if (path == "") { + return; + } + + shell_execute(format_string("mkdir -p %s", path.c_str())); +} + +static size_t& hdfs_buffer_size_internal() { + static size_t x = 0; + return x; +} + +size_t hdfs_buffer_size() { return hdfs_buffer_size_internal(); } + +void hdfs_set_buffer_size(size_t x) { hdfs_buffer_size_internal() = x; } + +static std::string& hdfs_command_internal() { + static std::string x = "hadoop fs"; + return x; +} + +const std::string& hdfs_command() { return hdfs_command_internal(); } + +void hdfs_set_command(const std::string& x) { hdfs_command_internal() = x; } + +std::shared_ptr hdfs_open_read(std::string path, int* err_no, + const std::string& converter) { + if (fs_end_with_internal(path, ".gz")) { + path = + format_string("%s -text \"%s\"", hdfs_command().c_str(), path.c_str()); + } else { + path = + format_string("%s -cat \"%s\"", hdfs_command().c_str(), path.c_str()); + } + + bool is_pipe = true; + fs_add_read_converter_internal(path, is_pipe, converter); + return fs_open_internal(path, is_pipe, "r", hdfs_buffer_size(), err_no); +} + +std::shared_ptr hdfs_open_write(std::string path, int* err_no, + const std::string& converter) { + path = + format_string("%s -put - \"%s\"", hdfs_command().c_str(), path.c_str()); + bool is_pipe = true; + + if (fs_end_with_internal(path, ".gz\"")) { + fs_add_write_converter_internal(path, is_pipe, "gzip"); + } + + fs_add_write_converter_internal(path, is_pipe, converter); + return fs_open_internal(path, is_pipe, "w", hdfs_buffer_size(), err_no); +} + +void hdfs_remove(const std::string& path) { + if (path == "") { + return; + } + + shell_execute(format_string("%s -rmr %s &>/dev/null; true", + hdfs_command().c_str(), path.c_str())); +} + +std::vector hdfs_list(const std::string& path) { + if (path == "") { + return {}; + } + + std::string prefix = "hdfs:"; + + if (fs_begin_with_internal(path, "afs:")) { + prefix = "afs:"; + } + int err_no = 0; + std::vector list; + do { + err_no = 0; + std::shared_ptr pipe; + pipe = shell_popen(format_string("%s -ls %s | ( grep ^- ; [ $? != 2 ] )", + hdfs_command().c_str(), path.c_str()), + "r", &err_no); + LineFileReader reader; + list.clear(); + + while (reader.getline(&*pipe)) { + std::vector line = split_string(reader.get()); + if (line.size() != 8) { + continue; + } + list.push_back(prefix + line[7]); + } + } while (err_no == -1); + return list; +} + +std::string hdfs_tail(const std::string& path) { + if (path == "") { + return ""; + } + + return shell_get_command_output(format_string( + "%s -text %s | tail -1 ", hdfs_command().c_str(), path.c_str())); +} + +bool hdfs_exists(const std::string& path) { + std::string test = shell_get_command_output(format_string( + "%s -test -e %s ; echo $?", hdfs_command().c_str(), path.c_str())); + + if (trim_spaces(test) == "0") { + return true; + } + + return false; +} + +void hdfs_mkdir(const std::string& path) { + if (path == "") { + return; + } + + shell_execute(format_string("%s -mkdir %s; true", hdfs_command().c_str(), + path.c_str())); +} + +int fs_select_internal(const std::string& path) { + if (fs_begin_with_internal(path, "hdfs:")) { + return 1; + } else if (fs_begin_with_internal(path, "afs:")) { + return 1; + } + + return 0; +} + +std::shared_ptr fs_open_read(const std::string& path, int* err_no, + const std::string& converter) { + switch (fs_select_internal(path)) { + case 0: + return localfs_open_read(path, converter); + + case 1: + return hdfs_open_read(path, err_no, converter); + + default: + LOG(FATAL) << "Not supported"; + } + + return {}; +} + +std::shared_ptr fs_open_write(const std::string& path, int* err_no, + const std::string& converter) { + switch (fs_select_internal(path)) { + case 0: + return localfs_open_write(path, converter); + + case 1: + return hdfs_open_write(path, err_no, converter); + + default: + LOG(FATAL) << "Not supported"; + } + + return {}; +} + +std::shared_ptr fs_open(const std::string& path, const std::string& mode, + int* err_no, const std::string& converter) { + if (mode == "r" || mode == "rb") { + return fs_open_read(path, err_no, converter); + } + + if (mode == "w" || mode == "wb") { + return fs_open_write(path, err_no, converter); + } + + LOG(FATAL) << "Unknown mode: " << mode; + return {}; +} + +int64_t fs_file_size(const std::string& path) { + switch (fs_select_internal(path)) { + case 0: + return localfs_file_size(path); + + default: + LOG(FATAL) << "Not supported"; + } + + return 0; +} + +void fs_remove(const std::string& path) { + switch (fs_select_internal(path)) { + case 0: + return localfs_remove(path); + + case 1: + return hdfs_remove(path); + + default: + LOG(FATAL) << "Not supported"; + } +} + +std::vector fs_list(const std::string& path) { + switch (fs_select_internal(path)) { + case 0: + return localfs_list(path); + + case 1: + return hdfs_list(path); + + default: + LOG(FATAL) << "Not supported"; + } + + return {}; +} + +std::string fs_tail(const std::string& path) { + switch (fs_select_internal(path)) { + case 0: + return localfs_tail(path); + + case 1: + return hdfs_tail(path); + + default: + LOG(FATAL) << "Not supported"; + } + + return ""; +} + +bool fs_exists(const std::string& path) { + switch (fs_select_internal(path)) { + case 0: + return localfs_exists(path); + + case 1: + return hdfs_exists(path); + + default: + LOG(FATAL) << "Not supported"; + } + + return false; +} + +void fs_mkdir(const std::string& path) { + switch (fs_select_internal(path)) { + case 0: + return localfs_mkdir(path); + + case 1: + return hdfs_mkdir(path); + + default: + LOG(FATAL) << "Not supported"; + } +} +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/common/fs.h b/paddle/fluid/framework/common/fs.h new file mode 100644 index 0000000000..66429482cc --- /dev/null +++ b/paddle/fluid/framework/common/fs.h @@ -0,0 +1,100 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include "glog/logging.h" +#include "paddle/fluid/framework/common/ps_string.h" +#include "paddle/fluid/framework/common/shell.h" + +namespace paddle { +namespace framework { + +int fs_select_internal(const std::string& path); + +// localfs +extern size_t localfs_buffer_size(); + +extern void localfs_set_buffer_size(size_t x); + +extern std::shared_ptr localfs_open_read(std::string path, + const std::string& converter); + +extern std::shared_ptr localfs_open_write(std::string path, + const std::string& converter); + +extern int64_t localfs_file_size(const std::string& path); + +extern void localfs_remove(const std::string& path); + +extern std::vector localfs_list(const std::string& path); + +extern std::string localfs_tail(const std::string& path); + +extern bool localfs_exists(const std::string& path); + +extern void localfs_mkdir(const std::string& path); + +// hdfs +extern size_t hdfs_buffer_size(); + +extern void hdfs_set_buffer_size(size_t x); + +extern const std::string& hdfs_command(); + +extern void hdfs_set_command(const std::string& x); + +extern std::shared_ptr hdfs_open_read(std::string path, int* err_no, + const std::string& converter); + +extern std::shared_ptr hdfs_open_write(std::string path, int* err_no, + const std::string& converter); + +extern void hdfs_remove(const std::string& path); + +extern std::vector hdfs_list(const std::string& path); + +extern std::string hdfs_tail(const std::string& path); + +extern bool hdfs_exists(const std::string& path); + +extern void hdfs_mkdir(const std::string& path); + +// aut-detect fs +extern std::shared_ptr fs_open_read(const std::string& path, int* err_no, + const std::string& converter); + +extern std::shared_ptr fs_open_write(const std::string& path, int* err_no, + const std::string& converter); + +extern std::shared_ptr fs_open(const std::string& path, + const std::string& mode, int* err_no, + const std::string& converter = ""); + +extern int64_t fs_file_size(const std::string& path); + +extern void fs_remove(const std::string& path); + +extern std::vector fs_list(const std::string& path); + +extern std::string fs_tail(const std::string& path); + +extern bool fs_exists(const std::string& path); + +extern void fs_mkdir(const std::string& path); +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/common/ps_string.h b/paddle/fluid/framework/common/ps_string.h new file mode 100644 index 0000000000..6de9b7be32 --- /dev/null +++ b/paddle/fluid/framework/common/ps_string.h @@ -0,0 +1,238 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include "boost/lexical_cast.hpp" +#include "glog/logging.h" + +namespace paddle { +namespace framework { + +inline size_t count_spaces(const char* s) { + size_t count = 0; + + while (*s != 0 && isspace(*s++)) { + count++; + } + + return count; +} + +inline size_t count_nonspaces(const char* s) { + size_t count = 0; + + while (*s != 0 && !isspace(*s++)) { + count++; + } + + return count; +} + +template +void format_string_append(std::string& str, const char* fmt, // NOLINT + ARGS&&... args) { // use VA_ARGS may be better ? + int len = snprintf(NULL, 0, fmt, args...); + CHECK_GE(len, 0); + size_t oldlen = str.length(); + str.resize(oldlen + len + 1); + CHECK(snprintf(&str[oldlen], (size_t)len + 1, fmt, args...) == len); + str.resize(oldlen + len); +} + +template +void format_string_append(std::string& str, const std::string& fmt, // NOLINT + ARGS&&... args) { + format_string_append(str, fmt.c_str(), args...); +} + +template +std::string format_string(const char* fmt, ARGS&&... args) { + std::string str; + format_string_append(str, fmt, args...); + return std::move(str); +} + +template +std::string format_string(const std::string& fmt, ARGS&&... args) { + return format_string(fmt.c_str(), args...); +} + +// remove leading and tailing spaces +inline std::string trim_spaces(const std::string& str) { + const char* p = str.c_str(); + + while (*p != 0 && isspace(*p)) { + p++; + } + + size_t len = strlen(p); + + while (len > 0 && isspace(p[len - 1])) { + len--; + } + + return std::string(p, len); +} + +inline int str_to_float(const char* str, float* v) { + const char* head = str; + char* cursor = NULL; + int index = 0; + while (*(head += count_spaces(head)) != 0) { + v[index++] = std::strtof(head, &cursor); + if (head == cursor) { + break; + } + head = cursor; + } + return index; +} + +// split string by delim +template +std::vector split_string(const std::string& str, const std::string& delim) { + size_t pre_pos = 0; + size_t pos = 0; + std::string tmp_str; + std::vector res_list; + res_list.clear(); + if (str.empty()) { + return res_list; + } + + while ((pos = str.find(delim, pre_pos)) != std::string::npos) { + tmp_str.assign(str, pre_pos, pos - pre_pos); + res_list.push_back(tmp_str); + pre_pos = pos + 1; + } + tmp_str.assign(str, pre_pos, str.length() - pre_pos); + if (!tmp_str.empty()) { + res_list.push_back(tmp_str); + } + return res_list; + /* + size_t num = 1; + const char* p; + + for (p = str.c_str(); *p != 0; p++) { + if (*p == delim) { + num++; + } + } + + std::vector list(num); + const char* last = str.c_str(); + num = 0; + + for (p = str.c_str(); *p != 0; p++) { + if (*p == delim) { + list[num++] = boost::lexical_cast(last, p - last); + last = p + 1; + } + } + + list[num] = boost::lexical_cast(last, p - last); + return list; + */ +} + +// split string by spaces. Leading and tailing spaces are ignored. Consecutive +// spaces are treated as one delim. +template +std::vector split_string(const std::string& str) { + std::vector list; + const char* p; + int pre_pos = 0; + int pos = 0; + std::string tmp_str; + if (str.empty()) { + return list; + } + for (p = str.c_str(); *p != 0;) { + if (!isspace(*p)) { + pos = pre_pos; + p++; + + while (*p != 0 && !isspace(*p)) { + pos++; + p++; + } + tmp_str.assign(str, pre_pos, pos - pre_pos + 1); + list.push_back(tmp_str); + pre_pos = pos + 1; + } else { + pre_pos++; + p++; + } + } + + return list; +} + +template +std::string join_strings(const std::vector& strs, char delim) { + std::string str; + + for (size_t i = 0; i < strs.size(); i++) { + if (i > 0) { + str += delim; + } + + str += boost::lexical_cast(strs[i]); + } + + return str; +} + +// A helper class for reading lines from file. A line buffer is maintained. It +// doesn't need to know the maximum possible length of a line. +class LineFileReader { + public: + LineFileReader() {} + LineFileReader(LineFileReader&&) = delete; + LineFileReader(const LineFileReader&) = delete; + ~LineFileReader() { ::free(_buffer); } + char* getline(FILE* f) { return this->getdelim(f, '\n'); } + char* getdelim(FILE* f, char delim) { + ssize_t ret = ::getdelim(&_buffer, &_buf_size, delim, f); + + if (ret >= 0) { + if (ret >= 1 && _buffer[ret - 1] == delim) { + _buffer[--ret] = 0; + } + + _length = (size_t)ret; + return _buffer; + } else { + _length = 0; + CHECK(feof(f)); + return NULL; + } + } + char* get() { return _buffer; } + size_t length() { return _length; } + + private: + char* _buffer = NULL; + size_t _buf_size = 0; + size_t _length = 0; +}; +} // end namespace framework +} // end namespace paddle diff --git a/paddle/fluid/framework/common/shell.cc b/paddle/fluid/framework/common/shell.cc new file mode 100644 index 0000000000..6e423d9071 --- /dev/null +++ b/paddle/fluid/framework/common/shell.cc @@ -0,0 +1,298 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/common/shell.h" + +namespace paddle { +namespace framework { + +std::shared_ptr shell_fopen(const std::string& path, + const std::string& mode) { + if (shell_verbose()) { + LOG(INFO) << "Opening file[" << path << "] with mode[" << mode << "]"; + } + FILE* fp; + if (!(fp = fopen(path.c_str(), mode.c_str()))) { + LOG(FATAL) << "fopen fail, path[" << path << "], mode[" << mode << "]"; + } + return {fp, [path](FILE* fp) { + if (shell_verbose()) { + LOG(INFO) << "Closing file[" << path << "]"; + } + if (0 != fclose(fp)) { + LOG(FATAL) << "fclose fail, path[" << path << "]"; + } + }}; +} + +// Close all open file descriptors +// The implementation is async signal safe +// Mostly copy from CPython code +static int close_open_fds_internal() { + struct linux_dirent { + int64 d_ino = 0; + off_t d_off; + uint16 d_reclen = 0; + char d_name[256]; + }; + + int dir_fd = -1; + if ((dir_fd = open("/proc/self/fd", O_RDONLY)) < 0) { + LOG(FATAL) << "proc/self/fd open fail"; + return -1; + } + char buffer[sizeof(linux_dirent)]; + + for (;;) { + int bytes = 0; + if ((bytes = syscall(SYS_getdents, dir_fd, + reinterpret_cast(buffer), + sizeof(buffer))) < 0) { + LOG(FATAL) << "syscall fail"; + return -1; + } + + if (bytes == 0) { + break; + } + + linux_dirent* entry = NULL; + + for (int offset = 0; offset < bytes; offset += entry->d_reclen) { + entry = reinterpret_cast(buffer + offset); + int fd = 0; + const char* s = entry->d_name; + + while (*s >= '0' && *s <= '9') { + fd = fd * 10 + (*s - '0'); + s++; + } + + if (s != entry->d_name && fd != dir_fd && fd >= 3) { + close(fd); + } + } + } + + close(dir_fd); + return 0; +} + +static int shell_popen_fork_internal(const char* real_cmd, bool do_read, + int parent_end, int child_end) { + int child_pid = -1; + // Too frequent calls to fork() makes openmpi very slow. Use vfork() instead. + // But vfork() is very dangerous. Be careful. + if ((child_pid = vfork()) < 0) { + return -1; + } + + // The following code is async signal safe (No memory allocation, no access to + // global data, etc.) + if (child_pid != 0) { + return child_pid; + } + + int child_std_end = do_read ? 1 : 0; + close(parent_end); + + if (child_end != child_std_end) { + if (dup2(child_end, child_std_end) != child_std_end) { + return -1; + } + close(child_end); + } + + close_open_fds_internal(); + if (execl("/bin/sh", "sh", "-c", real_cmd, NULL) < 0) { + return -1; + } + exit(127); +} + +std::shared_ptr shell_popen(const std::string& cmd, + const std::string& mode, int* err_no) { + bool do_read = mode == "r"; + bool do_write = mode == "w"; + if (!(do_read || do_write)) { + *err_no = -1; + return NULL; + } + + if (shell_verbose()) { + LOG(INFO) << "Opening pipe[" << cmd << "] with mode[" << mode << "]"; + } + + std::string real_cmd = "set -o pipefail; " + cmd; + + int pipe_fds[2]; + if (pipe(pipe_fds) != 0) { + *err_no = -1; + return NULL; + } + int parent_end = 0; + int child_end = 0; + + if (do_read) { + parent_end = pipe_fds[0]; + child_end = pipe_fds[1]; + } else if (do_write) { + parent_end = pipe_fds[1]; + child_end = pipe_fds[0]; + } + + int child_pid = shell_popen_fork_internal(real_cmd.c_str(), do_read, + parent_end, child_end); + close(child_end); + fcntl(parent_end, F_SETFD, FD_CLOEXEC); + FILE* fp; + if ((fp = fdopen(parent_end, mode.c_str())) == NULL) { + *err_no = -1; + return NULL; + } + return {fp, [child_pid, cmd, err_no](FILE* fp) { + if (shell_verbose()) { + LOG(INFO) << "Closing pipe[" << cmd << "]"; + } + + if (fclose(fp) != 0) { + *err_no = -1; + } + int wstatus = -1; + // int ret = waitpid(child_pid, &wstatus, 0); + waitpid(child_pid, &wstatus, 0); + if (wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 || + (wstatus == -1 && errno == ECHILD)) { + // LOG(INFO) << "status[" << wstatus << "], cmd[" << cmd << "]" << + // ", err_no[" << *err_no << "]"; + } else { + *err_no = -1; + LOG(WARNING) << "status[" << wstatus << "], cmd[" << cmd << "]" + << ", err_no[" << *err_no << "]"; + } + if (wstatus == -1 && errno == ECHILD) { + LOG(WARNING) << "errno is ECHILD"; + } + }}; +} + +static int shell_p2open_fork_internal(const char* real_cmd, int pipein_fds[2], + int pipeout_fds[2]) { + int child_pid = -1; + if ((child_pid = fork()) < 0) { + return -1; + } + + if (child_pid != 0) { + return child_pid; + } + + close(pipein_fds[0]); + close(pipeout_fds[1]); + + if (pipein_fds[1] != 1) { + if (dup2(pipein_fds[1], 1) != 1) { + return -1; + } + close(pipein_fds[1]); + } + + if (pipeout_fds[0] != 0) { + if (dup2(pipeout_fds[0], 0) != 0) { + return -1; + } + close(pipeout_fds[0]); + } + + close_open_fds_internal(); + if (execl("/bin/sh", "sh", "-c", real_cmd, NULL) < 0) { + return -1; + } + exit(127); +} + +std::pair, std::shared_ptr> shell_p2open( + const std::string& cmd) { + if (shell_verbose()) { + LOG(INFO) << "Opening bidirectional pipe[" << cmd << "]"; + } + + std::string real_cmd = "set -o pipefail; " + cmd; + + int pipein_fds[2]; + int pipeout_fds[2]; + if (pipe(pipein_fds) != 0) { + return {NULL, NULL}; + } + if (pipe(pipeout_fds) != 0) { + return {NULL, NULL}; + } + + int child_pid = + shell_p2open_fork_internal(real_cmd.c_str(), pipein_fds, pipeout_fds); + + close(pipein_fds[1]); + close(pipeout_fds[0]); + fcntl(pipein_fds[0], F_SETFD, FD_CLOEXEC); + fcntl(pipeout_fds[1], F_SETFD, FD_CLOEXEC); + + std::shared_ptr child_life = { + NULL, [child_pid, cmd](void*) { + if (shell_verbose()) { + LOG(INFO) << "Closing bidirectional pipe[" << cmd << "]"; + } + + int wstatus, ret; + + do { + PCHECK((ret = waitpid(child_pid, &wstatus, 0)) >= 0 || + (ret == -1 && errno == EINTR)); + } while (ret == -1 && errno == EINTR); + + PCHECK(wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 || + (wstatus == -1 && errno == ECHILD)) + << "status[" << wstatus << "], cmd[" << cmd << "]"; + + if (wstatus == -1 && errno == ECHILD) { + LOG(WARNING) << "errno is ECHILD"; + } + }}; + + FILE* in_fp; + PCHECK((in_fp = fdopen(pipein_fds[0], "r")) != NULL); + FILE* out_fp; + PCHECK((out_fp = fdopen(pipeout_fds[1], "w")) != NULL); + return {{in_fp, [child_life](FILE* fp) { PCHECK(fclose(fp) == 0); }}, + {out_fp, [child_life](FILE* fp) { PCHECK(fclose(fp) == 0); }}}; +} + +std::string shell_get_command_output(const std::string& cmd) { + int err_no = 0; + do { + err_no = 0; + std::shared_ptr pipe = shell_popen(cmd, "r", &err_no); + LineFileReader reader; + + if (reader.getdelim(&*pipe, 0)) { + pipe = nullptr; + if (err_no == 0) { + return reader.get(); + } + } + } while (err_no == -1); + + return ""; +} +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/common/shell.h b/paddle/fluid/framework/common/shell.h new file mode 100644 index 0000000000..41ef3a9957 --- /dev/null +++ b/paddle/fluid/framework/common/shell.h @@ -0,0 +1,60 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include "glog/logging.h" +#include "paddle/fluid/framework/common/ps_string.h" + +namespace paddle { +namespace framework { + +inline bool& shell_verbose_internal() { + static bool x = false; + return x; +} + +inline bool shell_verbose() { return shell_verbose_internal(); } + +inline void shell_set_verbose(bool x) { shell_verbose_internal() = x; } + +extern std::shared_ptr shell_fopen(const std::string& path, + const std::string& mode); + +extern std::shared_ptr shell_popen(const std::string& cmd, + const std::string& mode, int* err_no); + +extern std::pair, std::shared_ptr> shell_p2open( + const std::string& cmd); + +inline void shell_execute(const std::string& cmd) { + int err_no = 0; + do { + err_no = 0; + shell_popen(cmd, "w", &err_no); + } while (err_no == -1); +} + +extern std::string shell_get_command_output(const std::string& cmd); + +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 41155cfb77..0703851d20 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -12,10 +12,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include #include "google/protobuf/io/zero_copy_stream_impl.h" #include "google/protobuf/message.h" #include "google/protobuf/text_format.h" +#include "common/fs.h" +#include "common/shell.h" #include "gflags/gflags.h" #include "paddle/fluid/framework/data_feed.h" #include "paddle/fluid/framework/feed_fetch_method.h" @@ -64,7 +67,7 @@ bool DataFeed::PickOneFile(std::string* filename) { return false; } *filename = filelist_[file_idx_++]; - LOG(ERROR) << "pick file:" << *filename; + // LOG(ERROR) << "pick file:" << *filename; return true; } @@ -91,8 +94,24 @@ void PrivateQueueDataFeed::SetQueueSize(int queue_size) { template bool PrivateQueueDataFeed::Start() { CheckSetFileList(); - read_thread_ = std::thread(&PrivateQueueDataFeed::ReadThread, this); - read_thread_.detach(); + std::string filename; + while (PickOneFile(&filename)) { + int err_no = 0; + std::string pipeline_cmd = "cat"; + + std::string path = + "/home/users/dongdaxiang/pslib_ctr/local/data_mod/part-00012"; + fp_ = fs_open_read(path, &err_no, pipeline_cmd); + __fsetlocking(&*fp_, FSETLOCKING_BYCALLER); + thread_local LineFileReader reader; + while (reader.getline(&*(fp_.get()))) { + LOG(ERROR) << "read a line"; + } + + read_thread_ = std::thread(&PrivateQueueDataFeed::ReadThread, this); + read_thread_.detach(); + } + queue_->Close(); finish_start_ = true; return true; @@ -100,17 +119,10 @@ bool PrivateQueueDataFeed::Start() { template void PrivateQueueDataFeed::ReadThread() { - std::string filename; - while (PickOneFile(&filename)) { - file_.open(filename.c_str()); // is_text_feed - PADDLE_ENFORCE(file_.good(), "Open file<%s> fail.", filename.c_str()); - T instance; - while (ParseOneInstance(&instance)) { - queue_->Send(instance); - } - file_.close(); + T instance; + while (ParseOneInstanceFromPipe(&instance)) { + queue_->Send(instance); } - queue_->Close(); } template @@ -168,6 +180,14 @@ void MultiSlotDataFeed::Init( finish_init_ = true; } +void MultiSlotDataFeed::ReadThread() { + LOG(ERROR) << "Haha"; + std::vector instance; + while (ParseOneInstanceFromPipe(&instance)) { + queue_->Send(instance); + } +} + bool MultiSlotDataFeed::CheckFile(const char* filename) { CheckInit(); // get info of slots std::ifstream fin(filename); @@ -279,6 +299,65 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) { return true; } +bool MultiSlotDataFeed::ParseOneInstanceFromPipe( + std::vector* instance) { + LOG(ERROR) << "hehe"; + thread_local LineFileReader reader; + while (reader.getline(&*(fp_.get()))) { + /* + const char* str = reader.get(); + std::string line = std::string(str); + LOG(ERROR) << line; + */ + LOG(ERROR) << "read a line"; + } + return true; + /* + if (!reader.getline(fp_.get())) { + return false; + } else { + // std::string& line = reader_.get(); + // const char* str = line.c_str(); + const char* str = reader.get(); + std::string line = std::string(str); + LOG(ERROR) << line; + char* endptr = const_cast(str); + int pos = 0; + for (size_t i = 0; i < use_slots_index_.size(); ++i) { + int idx = use_slots_index_[i]; + int num = strtol(&str[pos], &endptr, 10); + PADDLE_ENFORCE( + num, + "The number of ids can not be zero, you need padding " + "it in data generator; or if there is something wrong with " + "the data, please check if the data contains unresolvable " + "characters.\nplease check this error line: %s", + str); + if (idx != -1) { + (*instance)[idx].Init(all_slots_type_[i]); + if ((*instance)[idx].GetType()[0] == 'f') { // float + for (int j = 0; j < num; ++j) { + float feasign = strtof(endptr, &endptr); + (*instance)[idx].AddValue(feasign); + } + } else if ((*instance)[idx].GetType()[0] == 'u') { // uint64 + for (int j = 0; j < num; ++j) { + uint64_t feasign = (uint64_t)strtoull(endptr, &endptr, 10); + (*instance)[idx].AddValue(feasign); + } + } + pos = endptr - str; + } else { + for (int j = 0; j <= num; ++j) { + pos = line.find_first_of(' ', pos + 1); + } + } + } + return true; + } + */ +} + bool MultiSlotDataFeed::ParseOneInstance(std::vector* instance) { std::string line; if (getline(file_, line)) { diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index b027c71e97..de0289e4d2 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -21,6 +21,7 @@ limitations under the License. */ #include // NOLINT #include +#include "paddle/fluid/framework/common/ps_string.h" #include "paddle/fluid/framework/data_feed.pb.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/reader.h" @@ -136,6 +137,7 @@ class PrivateQueueDataFeed : public DataFeed { virtual void SetQueueSize(int queue_size); // The reading and parsing method called in the ReadThread. virtual bool ParseOneInstance(T* instance) = 0; + virtual bool ParseOneInstanceFromPipe(T* instance) = 0; // This function is used to put instance to vec_ins virtual void AddInstanceToInsVec(T* vec_ins, const T& instance, int index) = 0; @@ -150,7 +152,9 @@ class PrivateQueueDataFeed : public DataFeed { // ifstream one line and one line parse: 6034 ms // fread one buffer and one buffer parse: 7097 ms std::ifstream file_; + std::shared_ptr fp_; size_t queue_size_; + LineFileReader reader_; // The queue for store parsed data std::unique_ptr> queue_; }; @@ -228,12 +232,15 @@ class MultiSlotDataFeed virtual ~MultiSlotDataFeed() {} virtual void Init(const paddle::framework::DataFeedDesc& data_feed_desc); virtual bool CheckFile(const char* filename); + // virtual void ReadThread(); protected: + virtual void ReadThread(); virtual void AddInstanceToInsVec(std::vector* vec_ins, const std::vector& instance, int index); virtual bool ParseOneInstance(std::vector* instance); + virtual bool ParseOneInstanceFromPipe(std::vector* instance); virtual void PutToFeedVec(const std::vector& ins_vec); private: diff --git a/paddle/fluid/framework/executor_thread_worker.cc b/paddle/fluid/framework/executor_thread_worker.cc index bac49459d4..efa148a6b5 100644 --- a/paddle/fluid/framework/executor_thread_worker.cc +++ b/paddle/fluid/framework/executor_thread_worker.cc @@ -13,12 +13,15 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/framework/executor_thread_worker.h" +#include #include #include "google/protobuf/io/zero_copy_stream_impl.h" #include "google/protobuf/message.h" #include "google/protobuf/text_format.h" #include "gflags/gflags.h" +#include "paddle/fluid/framework/common/fs.h" +#include "paddle/fluid/framework/common/shell.h" #include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/feed_fetch_type.h" #include "paddle/fluid/framework/lod_rank_table.h" @@ -244,6 +247,8 @@ void ExecutorThreadWorker::TrainFilesWithTimer() { platform::SetNumThreads(1); SetDevice(); thread_reader_->Start(); + exit(0); + /* std::vector op_total_time; std::vector op_name; for (auto& op : ops_) { @@ -287,13 +292,14 @@ void ExecutorThreadWorker::TrainFilesWithTimer() { } timeline.Start(); } + */ } void ExecutorThreadWorker::TrainFiles() { platform::SetNumThreads(1); // todo: configurable - SetDevice(); + // SetDevice(); int fetch_var_num = fetch_var_names_.size(); fetch_values_.clear(); -- GitLab