From 501c9f25d26ca0f79b7786a36302f167fcaed1e4 Mon Sep 17 00:00:00 2001 From: rensilin Date: Tue, 6 Aug 2019 19:01:31 +0800 Subject: [PATCH] file_system Change-Id: I4dd39c85a5ba7225373a753783a9af809775f52e --- BCLOUD | 2 + paddle/fluid/string/string_helper.h | 12 + .../train/custom_trainer/feed/.clang-format | 33 ++ .../feed/io/auto_file_system.cc | 129 ++++++ .../custom_trainer/feed/io/file_system.cc | 42 ++ .../custom_trainer/feed/io/file_system.h | 59 +++ .../feed/io/hadoop_file_system.cc | 181 +++++++++ .../feed/io/local_file_system.cc | 136 +++++++ .../train/custom_trainer/feed/io/shell.cc | 378 ++++++++++++++++++ .../train/custom_trainer/feed/io/shell.h | 78 ++++ 10 files changed, 1050 insertions(+) create mode 100644 paddle/fluid/train/custom_trainer/feed/.clang-format create mode 100644 paddle/fluid/train/custom_trainer/feed/io/auto_file_system.cc create mode 100644 paddle/fluid/train/custom_trainer/feed/io/file_system.cc create mode 100644 paddle/fluid/train/custom_trainer/feed/io/file_system.h create mode 100644 paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc create mode 100644 paddle/fluid/train/custom_trainer/feed/io/local_file_system.cc create mode 100644 paddle/fluid/train/custom_trainer/feed/io/shell.cc create mode 100644 paddle/fluid/train/custom_trainer/feed/io/shell.h diff --git a/BCLOUD b/BCLOUD index 61cdb29b..eeca6170 100644 --- a/BCLOUD +++ b/BCLOUD @@ -72,6 +72,7 @@ HEADERS('paddle/fluid/train/custom_trainer/feed/dataset/*.h', '$INC/paddle/fluid HEADERS('paddle/fluid/train/custom_trainer/feed/process/*.h', '$INC/paddle/fluid/train/custom_trainer/feed/process/') HEADERS('paddle/fluid/train/custom_trainer/feed/shuffler/*.h', '$INC/paddle/fluid/train/custom_trainer/feed/shuffler/') HEADERS('paddle/fluid/train/custom_trainer/feed/accessor/*.h', '$INC/paddle/fluid/train/custom_trainer/feed/accessor/') +HEADERS('paddle/fluid/train/custom_trainer/feed/io/*.h', '$INC/paddle/fluid/train/custom_trainer/feed/io/') NEED_OUTPUT("baidu/third-party/mklml") OUTPUT('paddle/fluid/train/custom_trainer/feed/conf', '$OUT') OUTPUT('paddle/fluid/train/custom_trainer/feed/scripts', '$OUT') @@ -79,6 +80,7 @@ OUTPUT('paddle/fluid/train/custom_trainer/feed/so', '$OUT') def UT_FILE(filename): UT_DIR = 'paddle/fluid/train/custom_trainer/feed/unit_test' + import os return os.path.join(UT_DIR, filename) custom_trainer_src = GLOB('paddle/fluid/train/custom_trainer/feed/*/*.cc', Exclude(UT_FILE('*'))) diff --git a/paddle/fluid/string/string_helper.h b/paddle/fluid/string/string_helper.h index cc09088c..cbf7fd2a 100644 --- a/paddle/fluid/string/string_helper.h +++ b/paddle/fluid/string/string_helper.h @@ -136,6 +136,18 @@ std::string join_strings(const Container& strs, char delim) { return str; } + + static inline bool end_with(const std::string& main_str, const std::string& str) { + return main_str.length() >= str.length() && + strncmp(main_str.c_str() + main_str.length() - str.length(), str.c_str(), str.length()) == + 0; + } + + static inline bool begin_with(const std::string& main_str, const std::string& str) { + return main_str.length() >= str.length() && + strncmp(main_str.c_str(), str.c_str(), str.length()) == 0; + } + // A helper class for reading lines from file. A line buffer is maintained. It // doesn't need to know the maximum possible length of a line. diff --git a/paddle/fluid/train/custom_trainer/feed/.clang-format b/paddle/fluid/train/custom_trainer/feed/.clang-format new file mode 100644 index 00000000..50fee4ce --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/.clang-format @@ -0,0 +1,33 @@ +BasedOnStyle: Google +AccessModifierOffset: -4 +AlignAfterOpenBracket: AlwaysBreak +AlignOperands: false +AllowAllParametersOfDeclarationOnNextLine: false +AllowShortBlocksOnASingleLine: false +AllowShortCaseLabelsOnASingleLine: false +AllowShortFunctionsOnASingleLine: Empty +AllowShortIfStatementsOnASingleLine: false +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterReturnType: None +AlwaysBreakTemplateDeclarations: true +BinPackArguments: false +BinPackParameters: false +BreakConstructorInitializers: AfterColon +ColumnLimit: 100 +ConstructorInitializerIndentWidth: 8 +ContinuationIndentWidth: 8 +DerivePointerAlignment: true +FixNamespaceComments: true +IndentCaseLabels: false +IndentWidth: 4 +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +PenaltyBreakAssignment: 2 +PenaltyBreakBeforeFirstCallParameter: 1 +PenaltyBreakComment: 500 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakString: 1000 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 400 +PointerAlignment: Left +SortIncludes: false diff --git a/paddle/fluid/train/custom_trainer/feed/io/auto_file_system.cc b/paddle/fluid/train/custom_trainer/feed/io/auto_file_system.cc new file mode 100644 index 00000000..24302eb2 --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/io/auto_file_system.cc @@ -0,0 +1,129 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" + +#include +#include + +#include "paddle/fluid/train/custom_trainer/feed/io/shell.h" +#include "paddle/fluid/string/string_helper.h" +#include "glog/logging.h" + +namespace paddle { +namespace custom_trainer { +namespace feed { + +class AutoFileSystem : public FileSystem { +public: + int initialize(const YAML::Node& config, std::shared_ptr context) override { + _file_system.clear(); + if (config) { + for (auto& prefix_fs: config) { + std::unique_ptr fs(CREATE_CLASS(FileSystem, prefix_fs.second["class"].as(""))); + if (fs == nullptr) { + VLOG(2) << "fail to create class: " << prefix_fs.second["class"].as(""); + return -1; + } + if (fs->initialize(prefix_fs.second, context) != 0) { + VLOG(2) << "fail to initialize class: " << prefix_fs.second["class"].as(""); + return 0; + } + _file_system.emplace(prefix_fs.first.as(""), std::move(fs)); + } + } + if (_file_system.find("default") == _file_system.end()) { + std::unique_ptr fs(CREATE_CLASS(FileSystem, "LocalFileSystem")); + if (fs == nullptr || fs->initialize(YAML::Load(""), context) != 0) { + return -1; + } + _file_system.emplace("default", std::move(fs)); + } + return 0; + } + + std::shared_ptr open_read(const std::string& path, const std::string& converter) + override { + return get_file_system(path)->open_read(path, converter); + } + + std::shared_ptr open_write(const std::string& path, const std::string& converter) + override { + return get_file_system(path)->open_write(path, converter); + } + + int64_t file_size(const std::string& path) override { + return get_file_system(path)->file_size(path); + } + + void remove(const std::string& path) override { + get_file_system(path)->remove(path); + } + + std::vector list(const std::string& path) override { + return get_file_system(path)->list(path); + } + + std::string tail(const std::string& path) override { + return get_file_system(path)->tail(path); + } + + bool exists(const std::string& path) override { + return get_file_system(path)->exists(path); + } + + void mkdir(const std::string& path) override { + get_file_system(path)->mkdir(path); + } + + FileSystem* get_file_system(const std::string& path) { + auto pos = path.find_first_of(":"); + if (pos != std::string::npos) { + auto substr = path.substr(0, pos + 1); + auto fs_it = _file_system.find(substr); + if (fs_it != _file_system.end()) { + return fs_it->second.get(); + } + } + VLOG(5) << "path: " << path << ", select default file system"; + return _file_system["default"].get(); + } + + int err_no() const override { + if (_err_no == 0) { + for (const auto& file_system : _file_system) { + if (file_system.second->err_no() != 0) { + const_cast(_err_no) = -1; + break; + } + } + } + return FileSystem::err_no(); + } + + void reset_err_no() override { + _err_no = 0; + for (auto& file_system : _file_system) { + file_system.second->reset_err_no(); + } + } + +private: + std::unordered_map> _file_system; +}; +REGISTER_CLASS(FileSystem, AutoFileSystem); + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/io/file_system.cc b/paddle/fluid/train/custom_trainer/feed/io/file_system.cc new file mode 100644 index 00000000..2014cd23 --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/io/file_system.cc @@ -0,0 +1,42 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" +#include + +namespace paddle { +namespace custom_trainer { +namespace feed { + +std::string FileSystem::path_join(const std::string& dir, const std::string& path) { + if (dir.empty()) { + return path; + } + if (dir.back() == '/') { + return dir + path; + } + return dir + '/' + path; +} + +std::pair FileSystem::path_split(const std::string& path) { + size_t pos = path.find_last_of('/'); + if (pos == std::string::npos) { + return {".", path}; + } + return {path.substr(0, pos), path.substr(pos + 1)}; +} + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/io/file_system.h b/paddle/fluid/train/custom_trainer/feed/io/file_system.h new file mode 100644 index 00000000..d7aa1cc2 --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/io/file_system.h @@ -0,0 +1,59 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include "paddle/fluid/train/custom_trainer/feed/common/registerer.h" +#include "paddle/fluid/train/custom_trainer/feed/trainer_context.h" +#include + +namespace paddle { +namespace custom_trainer { +namespace feed { + +class FileSystem { +public: + FileSystem() {} + virtual ~FileSystem() {} + virtual int initialize(const YAML::Node& config, std::shared_ptr context) = 0; + virtual std::shared_ptr open_read(const std::string& path, const std::string& converter) = 0; + virtual std::shared_ptr open_write(const std::string& path, const std::string& converter) = 0; + virtual int64_t file_size(const std::string& path) = 0; + virtual void remove(const std::string& path) = 0; + virtual std::vector list(const std::string& path) = 0; + virtual std::string tail(const std::string& path) = 0; + virtual bool exists(const std::string& path) = 0; + virtual void mkdir(const std::string& path) = 0; + virtual std::string path_join(const std::string& dir, const std::string& path); + virtual std::pair path_split(const std::string& path); + virtual int err_no() const { + return _err_no; + } + inline operator bool() { + return err_no() == 0; + } + virtual void reset_err_no() { + _err_no = 0; + } +protected: + int _err_no = 0; +}; +REGISTER_REGISTERER(FileSystem); + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc b/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc new file mode 100644 index 00000000..abaa43a7 --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/io/hadoop_file_system.cc @@ -0,0 +1,181 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" + +#include +#include + +#include "paddle/fluid/train/custom_trainer/feed/io/shell.h" +#include "paddle/fluid/string/string_helper.h" +#include "glog/logging.h" + +namespace paddle { +namespace custom_trainer { +namespace feed { + +class HadoopFileSystem : public FileSystem { +public: + int initialize(const YAML::Node& config, std::shared_ptr context) override { + _buffer_size = config["buffer_size"].as(0); + _hdfs_command = config["hdfs_command"].as("hadoop fs"); + _ugi.clear(); + for (const auto& prefix_ugi : config["ugi"]) { + _ugi.emplace(prefix_ugi.first.as(), prefix_ugi.second.as()); + } + if (_ugi.find("default") == _ugi.end()) { + VLOG(2) << "fail to load default ugi"; + return -1; + } + return 0; + } + + std::shared_ptr open_read(const std::string& path, const std::string& converter) + override { + std::string cmd; + if (string::end_with(path, ".gz")) { + cmd = string::format_string( + "%s -text \"%s\"", hdfs_command(path).c_str(), path.c_str()); + } else { + cmd = string::format_string( + "%s -cat \"%s\"", hdfs_command(path).c_str(), path.c_str()); + } + + bool is_pipe = true; + shell_add_read_converter(cmd, is_pipe, converter); + return shell_open(cmd, is_pipe, "r", _buffer_size, &_err_no); + } + + std::shared_ptr open_write(const std::string& path, const std::string& converter) + override { + std::string cmd = string::format_string("%s -put - \"%s\"", hdfs_command(path).c_str(), path.c_str()); + bool is_pipe = true; + + if (string::end_with(path, ".gz\"")) { + shell_add_write_converter(cmd, is_pipe, "gzip"); + } + + shell_add_write_converter(cmd, is_pipe, converter); + return shell_open(cmd, is_pipe, "w", _buffer_size, &_err_no); + } + + int64_t file_size(const std::string& path) override { + _err_no = -1; + VLOG(2) << "not support"; + return 0; + } + + void remove(const std::string& path) override { + if (path == "") { + return; + } + + shell_execute(string::format_string( + "%s -rmr %s &>/dev/null; true", _hdfs_command.c_str(), path.c_str())); + } + + std::vector list(const std::string& path) override { + if (path == "") { + return {}; + } + + std::string prefix = "hdfs:"; + + if (string::begin_with(path, "afs:")) { + prefix = "afs:"; + } + int err_no = 0; + std::vector list; + do { + err_no = 0; + std::shared_ptr pipe; + pipe = shell_popen( + string::format_string( + "%s -ls %s | ( grep ^- ; [ $? != 2 ] )", + hdfs_command(path).c_str(), + path.c_str()), + "r", + &err_no); + string::LineFileReader reader; + list.clear(); + + while (reader.getline(&*pipe)) { + std::vector line = string::split_string(reader.get()); + if (line.size() != 8) { + continue; + } + list.push_back(prefix + line[7]); + } + } while (err_no == -1); + return list; + } + + std::string tail(const std::string& path) override { + if (path == "") { + return ""; + } + + return shell_get_command_output(string::format_string( + "%s -text %s | tail -1 ", hdfs_command(path).c_str(), path.c_str())); + } + + bool exists(const std::string& path) override { + std::string test = shell_get_command_output(string::format_string( + "%s -test -e %s ; echo $?", hdfs_command(path).c_str(), path.c_str())); + + if (string::trim_spaces(test) == "0") { + return true; + } + + return false; + } + + void mkdir(const std::string& path) override { + if (path == "") { + return; + } + + shell_execute( + string::format_string("%s -mkdir %s; true", hdfs_command(path).c_str(), path.c_str())); + } + + + std::string hdfs_command(const std::string& path) { + auto start_pos = path.find_first_of(':'); + auto end_pos = path.find_first_of('/'); + if (start_pos != std::string::npos && end_pos != std::string::npos && start_pos < end_pos) { + auto fs_path = path.substr(start_pos + 1, end_pos - start_pos - 1); + auto ugi_it = _ugi.find(fs_path); + if (ugi_it != _ugi.end()) { + return hdfs_command_with_ugi(ugi_it->second); + } + } + VLOG(5) << "path: " << path << ", select default ugi"; + return hdfs_command_with_ugi(_ugi["default"]); + } + + std::string hdfs_command_with_ugi(std::string ugi) { + return string::format_string("%s -Dhadoop.job.ugi=\"%s\"", _hdfs_command.c_str(), ugi.c_str()); + } + +private: + size_t _buffer_size = 0; + std::string _hdfs_command; + std::unordered_map _ugi; +}; +REGISTER_CLASS(FileSystem, HadoopFileSystem); + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/io/local_file_system.cc b/paddle/fluid/train/custom_trainer/feed/io/local_file_system.cc new file mode 100644 index 00000000..287d3e0a --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/io/local_file_system.cc @@ -0,0 +1,136 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h" + +#include + +#include "paddle/fluid/train/custom_trainer/feed/io/shell.h" +#include "paddle/fluid/string/string_helper.h" +#include "glog/logging.h" + +namespace paddle { +namespace custom_trainer { +namespace feed { + +class LocalFileSystem : public FileSystem { +public: + int initialize(const YAML::Node& config, std::shared_ptr context) override { + _buffer_size = config["buffer_size"].as(0); + return 0; + } + + std::shared_ptr open_read(const std::string& path, const std::string& converter) override { + std::string cmd = path; + bool is_pipe = false; + if (string::end_with(path, ".gz")) { + shell_add_read_converter(cmd, is_pipe, "zcat"); + } + + shell_add_read_converter(cmd, is_pipe, converter); + return shell_open(cmd, is_pipe, "r", _buffer_size); + } + + std::shared_ptr open_write(const std::string& path, const std::string& converter) override { + std::string cmd = path; + + shell_execute(string::format_string("mkdir -p $(dirname \"%s\")", path.c_str())); + + bool is_pipe = false; + + if (string::end_with(path, ".gz")) { + shell_add_write_converter(cmd, is_pipe, "gzip"); + } + + shell_add_write_converter(cmd, is_pipe, converter); + return shell_open(cmd, is_pipe, "w", _buffer_size); + } + + int64_t file_size(const std::string& path) override { + struct stat buf; + if (0 != stat(path.c_str(), &buf)) { + LOG(FATAL) << "file stat not zero"; + return -1; + } + return (int64_t)buf.st_size; + } + + void remove(const std::string& path) override { + if (path == "") { + return; + } + + shell_execute(string::format_string("rm -rf %s", path.c_str())); + } + + std::vector list(const std::string& path) override { + if (path == "") { + return {}; + } + + std::shared_ptr pipe; + pipe = shell_popen( + string::format_string("find %s -maxdepth 1 -type f", path.c_str()), "r", &_err_no); + string::LineFileReader reader; + std::vector list; + + while (reader.getline(&*pipe)) { + list.push_back(reader.get()); + } + + return list; + } + + std::string tail(const std::string& path) override { + if (path == "") { + return ""; + } + + return shell_get_command_output(string::format_string("tail -1 %s ", path.c_str())); + } + + bool exists(const std::string& path) override { + std::string test_f = shell_get_command_output( + string::format_string("[ -f %s ] ; echo $?", path.c_str())); + + if (string::trim_spaces(test_f) == "0") { + return true; + } + + std::string test_d = shell_get_command_output( + string::format_string("[ -d %s ] ; echo $?", path.c_str())); + + if (string::trim_spaces(test_d) == "0") { + return true; + } + + return false; + } + + void mkdir(const std::string& path) override { + if (path == "") { + return; + } + + shell_execute(string::format_string("mkdir -p %s", path.c_str())); + } + +private: + size_t _buffer_size = 0; +}; +REGISTER_CLASS(FileSystem, LocalFileSystem); + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/io/shell.cc b/paddle/fluid/train/custom_trainer/feed/io/shell.cc new file mode 100644 index 00000000..db1bd2cb --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/io/shell.cc @@ -0,0 +1,378 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/train/custom_trainer/feed/io/shell.h" + +namespace paddle { +namespace custom_trainer { +namespace feed { + +void shell_add_write_converter(std::string& path, bool& is_pipe, // NOLINT + const std::string& converter) { + if (converter == "") { + return; + } + + if (!is_pipe) { + path = string::format_string("( %s ) > \"%s\"", converter.c_str(), path.c_str()); + is_pipe = true; + } else { + path = string::format_string("%s | %s", converter.c_str(), path.c_str()); + } +} + +void shell_add_read_converter(std::string& path, bool& is_pipe, const std::string& converter) { + if (converter == "") { + return; + } + + if (!is_pipe) { + path = string::format_string("( %s ) < \"%s\"", converter.c_str(), path.c_str()); + is_pipe = true; + } else { + path = string::format_string("%s | %s", path.c_str(), converter.c_str()); + } +} + +std::shared_ptr shell_open( + const std::string& path, + bool is_pipe, + const std::string& mode, + size_t buffer_size, + int* err_no) { + std::shared_ptr fp = nullptr; + + if (!is_pipe) { + fp = shell_fopen(path, mode); + } else { + fp = shell_popen(path, mode, err_no); + } + + if (buffer_size > 0) { + char* buffer = new char[buffer_size]; + CHECK_EQ(0, setvbuf(&*fp, buffer, _IOFBF, buffer_size)); + fp = {&*fp, [fp, buffer](FILE*) mutable { // NOLINT + CHECK(fp.unique()); // NOLINT + fp = nullptr; + delete[] buffer; + }}; + } + + return fp; +} + +std::shared_ptr shell_fopen(const std::string& path, const std::string& mode) { +#if defined _WIN32 || defined __APPLE__ + return nullptr; +#else + if (shell_verbose()) { + LOG(INFO) << "Opening file[" << path << "] with mode[" << mode << "]"; + } + FILE* fp; + if (!(fp = fopen(path.c_str(), mode.c_str()))) { + LOG(FATAL) << "fopen fail, path[" << path << "], mode[" << mode << "]"; + } + return {fp, [path](FILE* fp) { + if (shell_verbose()) { + LOG(INFO) << "Closing file[" << path << "]"; + } + if (0 != fclose(fp)) { + LOG(FATAL) << "fclose fail, path[" << path << "]"; + } + }}; +#endif +} + +// Close all open file descriptors +// The implementation is async signal safe +// Mostly copy from CPython code +static int close_open_fds_internal() { +#if defined _WIN32 || defined __APPLE__ + return 0; +#else + struct linux_dirent { + long d_ino = 0; // NOLINT + off_t d_off; + unsigned short d_reclen = 0; // NOLINT + char d_name[256]; + }; + + int dir_fd = -1; + if ((dir_fd = open("/proc/self/fd", O_RDONLY)) < 0) { + LOG(FATAL) << "proc/self/fd open fail"; + return -1; + } + char buffer[sizeof(linux_dirent)]; + + for (;;) { + int bytes = 0; + if ((bytes = + syscall(SYS_getdents, + dir_fd, + reinterpret_cast(buffer), + sizeof(buffer))) < 0) { + LOG(FATAL) << "syscall fail"; + return -1; + } + + if (bytes == 0) { + break; + } + + linux_dirent* entry = NULL; + + for (int offset = 0; offset < bytes; offset += entry->d_reclen) { + entry = reinterpret_cast(buffer + offset); + int fd = 0; + const char* s = entry->d_name; + + while (*s >= '0' && *s <= '9') { + fd = fd * 10 + (*s - '0'); + s++; + } + + if (s != entry->d_name && fd != dir_fd && fd >= 3) { + close(fd); + } + } + } + + close(dir_fd); + return 0; +#endif +} + +static int shell_popen_fork_internal( + const char* real_cmd, + bool do_read, + int parent_end, + int child_end) { +#if defined _WIN32 || defined __APPLE__ + return 0; +#else + int child_pid = -1; + // Too frequent calls to fork() makes openmpi very slow. Use vfork() instead. + // But vfork() is very dangerous. Be careful. + if ((child_pid = vfork()) < 0) { + return -1; + } + + // The following code is async signal safe (No memory allocation, no access to + // global data, etc.) + if (child_pid != 0) { + return child_pid; + } + + int child_std_end = do_read ? 1 : 0; + close(parent_end); + + if (child_end != child_std_end) { + if (dup2(child_end, child_std_end) != child_std_end) { + return -1; + } + close(child_end); + } + + close_open_fds_internal(); + if (execl("/bin/bash", "bash", "-c", real_cmd, NULL) < 0) { + return -1; + } + exit(127); +#endif +} + +std::shared_ptr shell_popen(const std::string& cmd, const std::string& mode, int* err_no) { +#if defined _WIN32 || defined __APPLE__ + return nullptr; +#else + bool do_read = mode == "r"; + bool do_write = mode == "w"; + if (!(do_read || do_write)) { + *err_no = -1; + return NULL; + } + + if (shell_verbose()) { + LOG(INFO) << "Opening pipe[" << cmd << "] with mode[" << mode << "]"; + } + + std::string real_cmd = "set -o pipefail; " + cmd; + + int pipe_fds[2]; + if (pipe(pipe_fds) != 0) { + *err_no = -1; + return NULL; + } + int parent_end = 0; + int child_end = 0; + + if (do_read) { + parent_end = pipe_fds[0]; + child_end = pipe_fds[1]; + } else if (do_write) { + parent_end = pipe_fds[1]; + child_end = pipe_fds[0]; + } + + int child_pid = shell_popen_fork_internal(real_cmd.c_str(), do_read, parent_end, child_end); + close(child_end); + fcntl(parent_end, F_SETFD, FD_CLOEXEC); + FILE* fp; + if ((fp = fdopen(parent_end, mode.c_str())) == NULL) { + *err_no = -1; + return NULL; + } + return {fp, [child_pid, cmd, err_no](FILE* fp) { + if (shell_verbose()) { + LOG(INFO) << "Closing pipe[" << cmd << "]"; + } + + if (fclose(fp) != 0) { + *err_no = -1; + } + int wstatus = -1; + waitpid(child_pid, &wstatus, 0); + if (wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 || + (wstatus == -1 && errno == ECHILD)) { + } else { + *err_no = -1; + LOG(WARNING) << "status[" << wstatus << "], cmd[" << cmd << "]" + << ", err_no[" << *err_no << "]"; + } + if (wstatus == -1 && errno == ECHILD) { + LOG(WARNING) << "errno is ECHILD"; + } + }}; +#endif +} + +static int shell_p2open_fork_internal(const char* real_cmd, int pipein_fds[2], int pipeout_fds[2]) { +#if defined _WIN32 || defined __APPLE__ + return 0; +#else + int child_pid = -1; + if ((child_pid = fork()) < 0) { + return -1; + } + + if (child_pid != 0) { + return child_pid; + } + + close(pipein_fds[0]); + close(pipeout_fds[1]); + + if (pipein_fds[1] != 1) { + if (dup2(pipein_fds[1], 1) != 1) { + return -1; + } + close(pipein_fds[1]); + } + + if (pipeout_fds[0] != 0) { + if (dup2(pipeout_fds[0], 0) != 0) { + return -1; + } + close(pipeout_fds[0]); + } + + close_open_fds_internal(); + if (execl("/bin/sh", "sh", "-c", real_cmd, NULL) < 0) { + return -1; + } + exit(127); +#endif +} + +std::pair, std::shared_ptr> shell_p2open(const std::string& cmd) { +#if defined _WIN32 || defined __APPLE__ + return {}; +#else + if (shell_verbose()) { + LOG(INFO) << "Opening bidirectional pipe[" << cmd << "]"; + } + + std::string real_cmd = "set -o pipefail; " + cmd; + + int pipein_fds[2]; + int pipeout_fds[2]; + if (pipe(pipein_fds) != 0) { + return {NULL, NULL}; + } + if (pipe(pipeout_fds) != 0) { + return {NULL, NULL}; + } + + int child_pid = shell_p2open_fork_internal(real_cmd.c_str(), pipein_fds, pipeout_fds); + + close(pipein_fds[1]); + close(pipeout_fds[0]); + fcntl(pipein_fds[0], F_SETFD, FD_CLOEXEC); + fcntl(pipeout_fds[1], F_SETFD, FD_CLOEXEC); + + std::shared_ptr child_life = { + NULL, [child_pid, cmd](void*) { + if (shell_verbose()) { + LOG(INFO) << "Closing bidirectional pipe[" << cmd << "]"; + } + + int wstatus, ret; + + do { + PCHECK((ret = waitpid(child_pid, &wstatus, 0)) >= 0 || + (ret == -1 && errno == EINTR)); + } while (ret == -1 && errno == EINTR); + + PCHECK(wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 || + (wstatus == -1 && errno == ECHILD)) + << "status[" << wstatus << "], cmd[" << cmd << "]"; + + if (wstatus == -1 && errno == ECHILD) { + LOG(WARNING) << "errno is ECHILD"; + } + }}; + + FILE* in_fp; + PCHECK((in_fp = fdopen(pipein_fds[0], "r")) != NULL); + FILE* out_fp; + PCHECK((out_fp = fdopen(pipeout_fds[1], "w")) != NULL); + return {{in_fp, [child_life](FILE* fp) { PCHECK(fclose(fp) == 0); }}, + {out_fp, [child_life](FILE* fp) { PCHECK(fclose(fp) == 0); }}}; +#endif +} + +std::string shell_get_command_output(const std::string& cmd) { +#if defined _WIN32 || defined __APPLE__ + return ""; +#else + int err_no = 0; + do { + err_no = 0; + std::shared_ptr pipe = shell_popen(cmd, "r", &err_no); + string::LineFileReader reader; + + if (reader.getdelim(&*pipe, 0)) { + pipe = nullptr; + if (err_no == 0) { + return reader.get(); + } + } + } while (err_no == -1); + return ""; +#endif +} + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle diff --git a/paddle/fluid/train/custom_trainer/feed/io/shell.h b/paddle/fluid/train/custom_trainer/feed/io/shell.h new file mode 100644 index 00000000..930d64d5 --- /dev/null +++ b/paddle/fluid/train/custom_trainer/feed/io/shell.h @@ -0,0 +1,78 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#ifdef _WIN32 +#include +#else +#include +#endif +#include +#ifndef _WIN32 +#include +#endif +#include +#include +#include +#include "paddle/fluid/platform/port.h" +#include "paddle/fluid/string/string_helper.h" + +namespace paddle { +namespace custom_trainer { +namespace feed { + +inline bool& shell_verbose_internal() { + static bool x = false; + return x; +} + +inline bool shell_verbose() { + return shell_verbose_internal(); +} + +inline void shell_set_verbose(bool x) { + shell_verbose_internal() = x; +} + +extern std::shared_ptr shell_fopen(const std::string& path, const std::string& mode); + +extern std::shared_ptr shell_popen( + const std::string& cmd, + const std::string& mode, + int* err_no); + +extern std::pair, std::shared_ptr> shell_p2open(const std::string& cmd); + +inline void shell_execute(const std::string& cmd) { + int err_no = 0; + do { + err_no = 0; + shell_popen(cmd, "w", &err_no); + } while (err_no == -1); +} + +extern std::string shell_get_command_output(const std::string& cmd); + +extern void shell_add_read_converter(std::string& path, bool& is_pipe, const std::string& converter); + +extern std::shared_ptr shell_open(const std::string& path, bool is_pipe, const std::string& mode, size_t buffer_size, int* err_no = 0); + +extern void shell_add_write_converter(std::string& path, bool& is_pipe, const std::string& converter); + +} // namespace feed +} // namespace custom_trainer +} // namespace paddle -- GitLab