提交 501c9f25 编写于 作者: R rensilin

file_system

Change-Id: I4dd39c85a5ba7225373a753783a9af809775f52e
上级 1f9f8ebb
......@@ -72,6 +72,7 @@ HEADERS('paddle/fluid/train/custom_trainer/feed/dataset/*.h', '$INC/paddle/fluid
HEADERS('paddle/fluid/train/custom_trainer/feed/process/*.h', '$INC/paddle/fluid/train/custom_trainer/feed/process/')
HEADERS('paddle/fluid/train/custom_trainer/feed/shuffler/*.h', '$INC/paddle/fluid/train/custom_trainer/feed/shuffler/')
HEADERS('paddle/fluid/train/custom_trainer/feed/accessor/*.h', '$INC/paddle/fluid/train/custom_trainer/feed/accessor/')
HEADERS('paddle/fluid/train/custom_trainer/feed/io/*.h', '$INC/paddle/fluid/train/custom_trainer/feed/io/')
NEED_OUTPUT("baidu/third-party/mklml")
OUTPUT('paddle/fluid/train/custom_trainer/feed/conf', '$OUT')
OUTPUT('paddle/fluid/train/custom_trainer/feed/scripts', '$OUT')
......@@ -79,6 +80,7 @@ OUTPUT('paddle/fluid/train/custom_trainer/feed/so', '$OUT')
# Returns the repository-relative path of `filename` inside the feed unit_test
# directory by joining it onto UT_DIR with os.path.join.
# `filename` is joined verbatim, so it may also be a glob pattern (it is called
# with '*' when building the Exclude() list for the GLOB of trainer sources).
def UT_FILE(filename):
UT_DIR = 'paddle/fluid/train/custom_trainer/feed/unit_test'
import os
return os.path.join(UT_DIR, filename)
custom_trainer_src = GLOB('paddle/fluid/train/custom_trainer/feed/*/*.cc', Exclude(UT_FILE('*')))
......
......@@ -136,6 +136,18 @@ std::string join_strings(const Container& strs, char delim) {
return str;
}
// Returns true when `main_str` ends with `str` (an empty `str` always matches).
//
// The comparison is done with std::string::compare over the full suffix
// length, so strings containing embedded '\0' bytes are compared byte for byte
// instead of being cut short at the first NUL as strncmp would do.
static inline bool end_with(const std::string& main_str, const std::string& str) {
    const std::string::size_type suffix_len = str.length();
    if (main_str.length() < suffix_len) {
        return false;
    }
    return main_str.compare(main_str.length() - suffix_len, suffix_len, str) == 0;
}
// Returns true when `main_str` starts with `str` (an empty `str` always matches).
//
// The comparison is done with std::string::compare over the full prefix
// length, so strings containing embedded '\0' bytes are compared byte for byte
// instead of being cut short at the first NUL as strncmp would do.
static inline bool begin_with(const std::string& main_str, const std::string& str) {
    const std::string::size_type prefix_len = str.length();
    if (main_str.length() < prefix_len) {
        return false;
    }
    return main_str.compare(0, prefix_len, str) == 0;
}
// A helper class for reading lines from file. A line buffer is maintained. It
// doesn't need to know the maximum possible length of a line.
......
BasedOnStyle: Google
AccessModifierOffset: -4
AlignAfterOpenBracket: AlwaysBreak
AlignOperands: false
AllowAllParametersOfDeclarationOnNextLine: false
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: Empty
AllowShortIfStatementsOnASingleLine: false
AllowShortLoopsOnASingleLine: false
AlwaysBreakAfterReturnType: None
AlwaysBreakTemplateDeclarations: true
BinPackArguments: false
BinPackParameters: false
BreakConstructorInitializers: AfterColon
ColumnLimit: 100
ConstructorInitializerIndentWidth: 8
ContinuationIndentWidth: 8
DerivePointerAlignment: true
FixNamespaceComments: true
IndentCaseLabels: false
IndentWidth: 4
MaxEmptyLinesToKeep: 1
NamespaceIndentation: None
PenaltyBreakAssignment: 2
PenaltyBreakBeforeFirstCallParameter: 1
PenaltyBreakComment: 500
PenaltyBreakFirstLessLess: 120
PenaltyBreakString: 1000
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 400
PointerAlignment: Left
SortIncludes: false
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include <string>
#include <unordered_map>
#include "paddle/fluid/train/custom_trainer/feed/io/shell.h"
#include "paddle/fluid/string/string_helper.h"
#include "glog/logging.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
// FileSystem that owns several concrete FileSystem back-ends and forwards every
// call to the one selected by the path's prefix.
//
// The configuration is a map `prefix -> back-end config`; each back-end config
// carries a "class" entry naming the registered FileSystem implementation.
// A path is routed by the text up to and including its first ':' (for example
// "afs:"); paths without a matching prefix use the entry named "default",
// which falls back to a LocalFileSystem when the configuration has none.
class AutoFileSystem : public FileSystem {
public:
    // Builds all configured back-ends.
    // Returns 0 on success and -1 when any back-end cannot be created or
    // initialized.
    int initialize(const YAML::Node& config, std::shared_ptr<TrainerContext> context) override {
        _file_system.clear();
        if (config) {
            for (const auto& prefix_fs : config) {
                const std::string class_name = prefix_fs.second["class"].as<std::string>("");
                std::unique_ptr<FileSystem> fs(CREATE_CLASS(FileSystem, class_name));
                if (fs == nullptr) {
                    VLOG(2) << "fail to create class: " << class_name;
                    return -1;
                }
                if (fs->initialize(prefix_fs.second, context) != 0) {
                    VLOG(2) << "fail to initialize class: " << class_name;
                    // A back-end that failed to initialize is an error; reporting
                    // success here would leave the prefix silently unhandled.
                    return -1;
                }
                _file_system.emplace(prefix_fs.first.as<std::string>(""), std::move(fs));
            }
        }
        if (_file_system.find("default") == _file_system.end()) {
            std::unique_ptr<FileSystem> fs(CREATE_CLASS(FileSystem, "LocalFileSystem"));
            if (fs == nullptr || fs->initialize(YAML::Load(""), context) != 0) {
                return -1;
            }
            _file_system.emplace("default", std::move(fs));
        }
        return 0;
    }

    std::shared_ptr<FILE> open_read(const std::string& path, const std::string& converter)
            override {
        return get_file_system(path)->open_read(path, converter);
    }

    std::shared_ptr<FILE> open_write(const std::string& path, const std::string& converter)
            override {
        return get_file_system(path)->open_write(path, converter);
    }

    int64_t file_size(const std::string& path) override {
        return get_file_system(path)->file_size(path);
    }

    void remove(const std::string& path) override {
        get_file_system(path)->remove(path);
    }

    std::vector<std::string> list(const std::string& path) override {
        return get_file_system(path)->list(path);
    }

    std::string tail(const std::string& path) override {
        return get_file_system(path)->tail(path);
    }

    bool exists(const std::string& path) override {
        return get_file_system(path)->exists(path);
    }

    void mkdir(const std::string& path) override {
        get_file_system(path)->mkdir(path);
    }

    // Selects the back-end responsible for `path`.
    // The lookup key is the path's prefix up to and including the first ':'.
    // Returns the "default" back-end when there is no ':' or no matching key,
    // and nullptr when no "default" entry exists (initialize() not run or
    // failed).
    FileSystem* get_file_system(const std::string& path) {
        auto pos = path.find_first_of(":");
        if (pos != std::string::npos) {
            auto fs_it = _file_system.find(path.substr(0, pos + 1));
            if (fs_it != _file_system.end()) {
                return fs_it->second.get();
            }
        }
        VLOG(5) << "path: " << path << ", select default file system";
        // find() rather than operator[] so that a missing entry is not inserted
        // as a null pointer that err_no()/reset_err_no() would later dereference.
        auto default_it = _file_system.find("default");
        return default_it == _file_system.end() ? nullptr : default_it->second.get();
    }

    // Returns a non-zero value when this object or any owned back-end has
    // recorded an error. The aggregated result is cached in _err_no.
    int err_no() const override {
        if (_err_no == 0) {
            for (const auto& file_system : _file_system) {
                if (file_system.second->err_no() != 0) {
                    // _err_no lives in the base class and cannot be declared
                    // mutable here, hence the cast to cache the result.
                    const_cast<int&>(_err_no) = -1;
                    break;
                }
            }
        }
        return FileSystem::err_no();
    }

    // Clears the error state of this object and of every owned back-end.
    void reset_err_no() override {
        _err_no = 0;
        for (auto& file_system : _file_system) {
            file_system.second->reset_err_no();
        }
    }

private:
    // Back-ends keyed by path prefix (including the trailing ':') or "default".
    std::unordered_map<std::string, std::unique_ptr<FileSystem>> _file_system;
};
REGISTER_CLASS(FileSystem, AutoFileSystem);
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include <string>
namespace paddle {
namespace custom_trainer {
namespace feed {
// Concatenates `dir` and `path` with exactly one '/' between them.
// An empty `dir` yields `path` unchanged; a `dir` that already ends in '/'
// does not get a second separator.
std::string FileSystem::path_join(const std::string& dir, const std::string& path) {
    if (dir.empty()) {
        return path;
    }
    std::string joined = dir;
    if (joined.back() != '/') {
        joined += '/';
    }
    joined += path;
    return joined;
}
// Splits `path` at its last '/' into {directory, file name}.
//
// - No '/' at all:        {".", path}
// - Only a leading '/':   {"/", rest}, so that the root directory is preserved
//                         and path_join(dir, name) rebuilds the original path
//                         (an empty directory would turn "/foo" into "foo").
// - Otherwise:            {text before the last '/', text after it}
std::pair<std::string, std::string> FileSystem::path_split(const std::string& path) {
    const std::string::size_type slash = path.find_last_of('/');
    if (slash == std::string::npos) {
        return {".", path};
    }
    if (slash == 0) {
        return {"/", path.substr(1)};
    }
    return {path.substr(0, slash), path.substr(slash + 1)};
}
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
#include <cstdio>
#include <vector>
#include "paddle/fluid/train/custom_trainer/feed/common/registerer.h"
#include "paddle/fluid/train/custom_trainer/feed/trainer_context.h"
#include <yaml-cpp/yaml.h>
namespace paddle {
namespace custom_trainer {
namespace feed {
// Abstract interface for the storage back-ends used by the trainer
// (implementations are registered by name through REGISTER_CLASS).
//
// Errors are reported through an integer error state: err_no() returns 0 while
// no error has been recorded and non-zero afterwards, until reset_err_no().
class FileSystem {
public:
    FileSystem() {}
    virtual ~FileSystem() {}

    // Configures the instance from `config`; returns 0 on success.
    virtual int initialize(const YAML::Node& config, std::shared_ptr<TrainerContext> context) = 0;

    // Opens `path` for reading / writing and returns the stream, which is
    // closed when the last reference to the returned pointer goes away.
    // `converter` may be empty; NOTE(review): the implementations in this
    // directory treat a non-empty converter as a shell command the data is
    // piped through -- confirm for new back-ends.
    virtual std::shared_ptr<FILE> open_read(const std::string& path, const std::string& converter) = 0;
    virtual std::shared_ptr<FILE> open_write(const std::string& path, const std::string& converter) = 0;

    // Size of the file at `path`, in bytes.
    virtual int64_t file_size(const std::string& path) = 0;
    // Removes `path`.
    virtual void remove(const std::string& path) = 0;
    // Lists the entries found under `path`.
    virtual std::vector<std::string> list(const std::string& path) = 0;
    // Returns the last line of the file at `path`.
    virtual std::string tail(const std::string& path) = 0;
    // Tells whether `path` exists.
    virtual bool exists(const std::string& path) = 0;
    // Creates the directory `path`.
    virtual void mkdir(const std::string& path) = 0;

    // Joins a directory and a relative path with a single '/' separator.
    virtual std::string path_join(const std::string& dir, const std::string& path);
    // Splits a path at its last '/' into {directory, file name}.
    virtual std::pair<std::string, std::string> path_split(const std::string& path);

    // Current error state; 0 means no error has been recorded.
    virtual int err_no() const {
        return _err_no;
    }

    // True while no error has been recorded. Const-qualified (err_no() is
    // const as well) so the check also works through a const reference.
    inline operator bool() const {
        return err_no() == 0;
    }

    // Clears the recorded error state.
    virtual void reset_err_no() {
        _err_no = 0;
    }

protected:
    // Error state shared with derived classes; 0 means "no error".
    int _err_no = 0;
};
REGISTER_REGISTERER(FileSystem);
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include <string>
#include <unordered_map>
#include "paddle/fluid/train/custom_trainer/feed/io/shell.h"
#include "paddle/fluid/string/string_helper.h"
#include "glog/logging.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
// FileSystem implementation that drives a Hadoop command line client
// ("hadoop fs" by default) through shell pipes.
//
// Configuration keys read by initialize():
//   buffer_size   stream buffer size in bytes for open_read/open_write (0 = default)
//   hdfs_command  client command, defaults to "hadoop fs"
//   ugi           map `name -> ugi string`; must contain the key "default"
class HadoopFileSystem : public FileSystem {
public:
    // Loads the configuration. Returns -1 when no "default" ugi is configured.
    int initialize(const YAML::Node& config, std::shared_ptr<TrainerContext> context) override {
        _buffer_size = config["buffer_size"].as<size_t>(0);
        _hdfs_command = config["hdfs_command"].as<std::string>("hadoop fs");
        _ugi.clear();
        for (const auto& prefix_ugi : config["ugi"]) {
            _ugi.emplace(prefix_ugi.first.as<std::string>(), prefix_ugi.second.as<std::string>());
        }
        if (_ugi.find("default") == _ugi.end()) {
            VLOG(2) << "fail to load default ugi";
            return -1;
        }
        return 0;
    }

    // Opens a pipe reading `path` ("-text" for *.gz files, "-cat" otherwise),
    // optionally filtered through `converter`.
    std::shared_ptr<FILE> open_read(const std::string& path, const std::string& converter)
            override {
        std::string cmd;
        if (string::end_with(path, ".gz")) {
            cmd = string::format_string(
                    "%s -text \"%s\"", hdfs_command(path).c_str(), path.c_str());
        } else {
            cmd = string::format_string(
                    "%s -cat \"%s\"", hdfs_command(path).c_str(), path.c_str());
        }
        bool is_pipe = true;
        shell_add_read_converter(cmd, is_pipe, converter);
        return shell_open(cmd, is_pipe, "r", _buffer_size, &_err_no);
    }

    // Opens a pipe writing to `path` via "-put -"; *.gz targets are gzip
    // compressed, and `converter` (if any) is applied before that.
    std::shared_ptr<FILE> open_write(const std::string& path, const std::string& converter)
            override {
        std::string cmd = string::format_string(
                "%s -put - \"%s\"", hdfs_command(path).c_str(), path.c_str());
        bool is_pipe = true;
        // The suffix test runs on the raw path (which carries no closing
        // quote), so it has to look for ".gz" and not for ".gz\"".
        if (string::end_with(path, ".gz")) {
            shell_add_write_converter(cmd, is_pipe, "gzip");
        }
        shell_add_write_converter(cmd, is_pipe, converter);
        return shell_open(cmd, is_pipe, "w", _buffer_size, &_err_no);
    }

    // Not supported: records an error and returns 0.
    int64_t file_size(const std::string& path) override {
        _err_no = -1;
        VLOG(2) << "not support";
        return 0;
    }

    // Recursively removes `path`; output and failures of the command are
    // discarded ("&>/dev/null; true"). An empty path is ignored.
    void remove(const std::string& path) override {
        if (path == "") {
            return;
        }
        // Uses the ugi-aware command like every other operation of this class.
        shell_execute(string::format_string(
                "%s -rmr %s &>/dev/null; true", hdfs_command(path).c_str(), path.c_str()));
    }

    // Lists the regular files directly under `path`. Each returned entry is
    // prefixed with "afs:" when `path` starts with "afs:", otherwise "hdfs:".
    // The listing command is retried for as long as it reports a failure.
    std::vector<std::string> list(const std::string& path) override {
        if (path == "") {
            return {};
        }
        std::string prefix = "hdfs:";
        if (string::begin_with(path, "afs:")) {
            prefix = "afs:";
        }
        int err_no = 0;
        std::vector<std::string> list;
        do {
            err_no = 0;
            list.clear();
            // `grep ^-` keeps regular-file lines only; `[ $? != 2 ]` turns
            // grep's "no match" status (1) into success.
            std::shared_ptr<FILE> pipe = shell_popen(
                    string::format_string(
                            "%s -ls %s | ( grep ^- ; [ $? != 2 ] )",
                            hdfs_command(path).c_str(),
                            path.c_str()),
                    "r",
                    &err_no);
            if (pipe == nullptr) {
                // The pipe could not be opened (err_no is -1): retry instead of
                // dereferencing a null stream.
                continue;
            }
            string::LineFileReader reader;
            while (reader.getline(pipe.get())) {
                std::vector<std::string> line = string::split_string(reader.get());
                // A listing line has 8 columns; the last one is the path.
                if (line.size() != 8) {
                    continue;
                }
                list.push_back(prefix + line[7]);
            }
            // `pipe` is closed at the end of this scope, which is what updates
            // err_no before the loop condition is evaluated.
        } while (err_no == -1);
        return list;
    }

    // Returns the last line of the file at `path` ("" for an empty path).
    std::string tail(const std::string& path) override {
        if (path == "") {
            return "";
        }
        return shell_get_command_output(string::format_string(
                "%s -text %s | tail -1 ", hdfs_command(path).c_str(), path.c_str()));
    }

    // True when "-test -e" reports exit status 0 for `path`.
    bool exists(const std::string& path) override {
        std::string test = shell_get_command_output(string::format_string(
                "%s -test -e %s ; echo $?", hdfs_command(path).c_str(), path.c_str()));
        if (string::trim_spaces(test) == "0") {
            return true;
        }
        return false;
    }

    // Creates the directory `path`; failures are ignored ("; true").
    void mkdir(const std::string& path) override {
        if (path == "") {
            return;
        }
        shell_execute(
                string::format_string("%s -mkdir %s; true", hdfs_command(path).c_str(), path.c_str()));
    }

    // Builds the client command for `path`, including its ugi.
    // The ugi is looked up by the text between the first ':' and the first '/'
    // of the path (only when the ':' comes first); when that text is not a
    // configured key, the "default" ugi is used.
    std::string hdfs_command(const std::string& path) {
        auto start_pos = path.find_first_of(':');
        auto end_pos = path.find_first_of('/');
        if (start_pos != std::string::npos && end_pos != std::string::npos && start_pos < end_pos) {
            auto fs_path = path.substr(start_pos + 1, end_pos - start_pos - 1);
            auto ugi_it = _ugi.find(fs_path);
            if (ugi_it != _ugi.end()) {
                return hdfs_command_with_ugi(ugi_it->second);
            }
        }
        VLOG(5) << "path: " << path << ", select default ugi";
        return hdfs_command_with_ugi(_ugi["default"]);
    }

    // Appends -Dhadoop.job.ugi="<ugi>" to the configured client command.
    std::string hdfs_command_with_ugi(std::string ugi) {
        return string::format_string("%s -Dhadoop.job.ugi=\"%s\"", _hdfs_command.c_str(), ugi.c_str());
    }

private:
    size_t _buffer_size = 0;                             // stream buffer size, 0 = default
    std::string _hdfs_command;                           // client command, e.g. "hadoop fs"
    std::unordered_map<std::string, std::string> _ugi;   // ugi strings by name
};
REGISTER_CLASS(FileSystem, HadoopFileSystem);
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/train/custom_trainer/feed/io/file_system.h"
#include <string>
#include "paddle/fluid/train/custom_trainer/feed/io/shell.h"
#include "paddle/fluid/string/string_helper.h"
#include "glog/logging.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
// FileSystem implementation for the local disk, built on plain fopen and on
// shell commands (zcat, gzip, find, tail, rm, mkdir, test).
//
// Configuration keys read by initialize():
//   buffer_size   stream buffer size in bytes for open_read/open_write (0 = default)
class LocalFileSystem : public FileSystem {
public:
    int initialize(const YAML::Node& config, std::shared_ptr<TrainerContext> context) override {
        _buffer_size = config["buffer_size"].as<size_t>(0);
        return 0;
    }

    // Opens `path` for reading. *.gz files are decompressed with zcat and the
    // data is then filtered through `converter` when one is given; with neither
    // the file is opened directly, otherwise through a shell pipe.
    std::shared_ptr<FILE> open_read(const std::string& path, const std::string& converter) override {
        std::string cmd = path;
        bool is_pipe = false;
        if (string::end_with(path, ".gz")) {
            shell_add_read_converter(cmd, is_pipe, "zcat");
        }
        shell_add_read_converter(cmd, is_pipe, converter);
        // &_err_no is required: the pipe path reports failures through it and
        // would otherwise write through the null default of shell_open().
        return shell_open(cmd, is_pipe, "r", _buffer_size, &_err_no);
    }

    // Opens `path` for writing after creating its parent directory. *.gz
    // targets are compressed with gzip; `converter` (if any) is applied first.
    std::shared_ptr<FILE> open_write(const std::string& path, const std::string& converter) override {
        std::string cmd = path;
        shell_execute(string::format_string("mkdir -p $(dirname \"%s\")", path.c_str()));
        bool is_pipe = false;
        if (string::end_with(path, ".gz")) {
            shell_add_write_converter(cmd, is_pipe, "gzip");
        }
        shell_add_write_converter(cmd, is_pipe, converter);
        // See open_read() for why &_err_no is passed.
        return shell_open(cmd, is_pipe, "w", _buffer_size, &_err_no);
    }

    // Size of `path` in bytes as reported by stat(); a stat() failure is logged
    // with LOG(FATAL).
    int64_t file_size(const std::string& path) override {
        struct stat buf;
        if (0 != stat(path.c_str(), &buf)) {
            LOG(FATAL) << "file stat not zero";
            return -1;
        }
        return static_cast<int64_t>(buf.st_size);
    }

    // Recursively removes `path` with "rm -rf"; an empty path is ignored.
    void remove(const std::string& path) override {
        if (path == "") {
            return;
        }
        shell_execute(string::format_string("rm -rf %s", path.c_str()));
    }

    // Lists the regular files found by "find <path> -maxdepth 1 -type f".
    // Failures are recorded in the error state; the result is then whatever
    // was read before the failure (possibly empty).
    std::vector<std::string> list(const std::string& path) override {
        if (path == "") {
            return {};
        }
        std::vector<std::string> list;
        std::shared_ptr<FILE> pipe = shell_popen(
                string::format_string("find %s -maxdepth 1 -type f", path.c_str()), "r", &_err_no);
        if (pipe == nullptr) {
            // The pipe could not be opened; shell_popen already recorded the
            // error, so return instead of dereferencing a null stream.
            return list;
        }
        string::LineFileReader reader;
        while (reader.getline(pipe.get())) {
            list.push_back(reader.get());
        }
        return list;
    }

    // Returns the last line of the file at `path` ("" for an empty path).
    std::string tail(const std::string& path) override {
        if (path == "") {
            return "";
        }
        return shell_get_command_output(string::format_string("tail -1 %s ", path.c_str()));
    }

    // True when `path` is a regular file or a directory.
    bool exists(const std::string& path) override {
        std::string test_f = shell_get_command_output(
                string::format_string("[ -f %s ] ; echo $?", path.c_str()));
        if (string::trim_spaces(test_f) == "0") {
            return true;
        }
        std::string test_d = shell_get_command_output(
                string::format_string("[ -d %s ] ; echo $?", path.c_str()));
        if (string::trim_spaces(test_d) == "0") {
            return true;
        }
        return false;
    }

    // Creates `path` and any missing parents; an empty path is ignored.
    void mkdir(const std::string& path) override {
        if (path == "") {
            return;
        }
        shell_execute(string::format_string("mkdir -p %s", path.c_str()));
    }

private:
    size_t _buffer_size = 0;  // stream buffer size, 0 = default
};
REGISTER_CLASS(FileSystem, LocalFileSystem);
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/train/custom_trainer/feed/io/shell.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
// Rewrites `path` so that data written to it first passes through `converter`.
//
// - Empty converter: nothing changes.
// - `path` is already a pipe command: becomes "<converter> | <path>".
// - `path` is a plain file: becomes "( <converter> ) > "<path>"" and `is_pipe`
//   is set to true because the result is now a shell command.
void shell_add_write_converter(std::string& path, bool& is_pipe,  // NOLINT
        const std::string& converter) {
    if (converter.empty()) {
        return;
    }
    if (is_pipe) {
        path = string::format_string("%s | %s", converter.c_str(), path.c_str());
        return;
    }
    path = string::format_string("( %s ) > \"%s\"", converter.c_str(), path.c_str());
    is_pipe = true;
}
// Rewrites `path` so that data read from it is filtered through `converter`.
//
// - Empty converter: nothing changes.
// - `path` is already a pipe command: becomes "<path> | <converter>".
// - `path` is a plain file: becomes "( <converter> ) < "<path>"" and `is_pipe`
//   is set to true because the result is now a shell command.
void shell_add_read_converter(std::string& path, bool& is_pipe, const std::string& converter) {
    if (converter.empty()) {
        return;
    }
    if (is_pipe) {
        path = string::format_string("%s | %s", path.c_str(), converter.c_str());
        return;
    }
    path = string::format_string("( %s ) < \"%s\"", converter.c_str(), path.c_str());
    is_pipe = true;
}
// Opens `path` either as a regular file (shell_fopen) or as a shell command
// (shell_popen) and optionally installs a user-sized stream buffer.
//
// path         file name, or shell command when `is_pipe` is true
// is_pipe      selects shell_popen over shell_fopen
// mode         passed on to the opener ("r" or "w")
// buffer_size  when > 0, a buffer of this many bytes is installed with setvbuf
// err_no       passed on to shell_popen for error reporting
//
// Returns the stream, or a null pointer when it could not be opened.
std::shared_ptr<FILE> shell_open(
        const std::string& path,
        bool is_pipe,
        const std::string& mode,
        size_t buffer_size,
        int* err_no) {
    std::shared_ptr<FILE> fp = nullptr;
    if (!is_pipe) {
        fp = shell_fopen(path, mode);
    } else {
        fp = shell_popen(path, mode, err_no);
    }
    // Nothing to buffer when the open failed; setvbuf on a null stream would
    // crash, so hand the null pointer back to the caller as is.
    if (fp == nullptr) {
        return fp;
    }
    if (buffer_size > 0) {
        char* buffer = new char[buffer_size];
        CHECK_EQ(0, setvbuf(&*fp, buffer, _IOFBF, buffer_size));
        // The buffer must outlive the stream. The wrapper below keeps the real
        // owner alive inside its deleter, closes the stream first (by dropping
        // that owner) and only then frees the buffer.
        fp = {&*fp, [fp, buffer](FILE*) mutable {  // NOLINT
                  CHECK(fp.unique());              // NOLINT
                  fp = nullptr;
                  delete[] buffer;
              }};
    }
    return fp;
}
// Opens the regular file `path` with fopen() in the given `mode`.
// The returned pointer closes the file when its last owner goes away.
// Open and close failures are reported with LOG(FATAL); both steps are logged
// when verbose mode is on. Always returns nullptr on Windows and macOS.
std::shared_ptr<FILE> shell_fopen(const std::string& path, const std::string& mode) {
#if defined _WIN32 || defined __APPLE__
    return nullptr;
#else
    if (shell_verbose()) {
        LOG(INFO) << "Opening file[" << path << "] with mode[" << mode << "]";
    }
    FILE* handle = fopen(path.c_str(), mode.c_str());
    if (handle == nullptr) {
        LOG(FATAL) << "fopen fail, path[" << path << "], mode[" << mode << "]";
    }
    // The path is copied into the deleter so it can still be logged on close.
    auto closer = [path](FILE* file) {
        if (shell_verbose()) {
            LOG(INFO) << "Closing file[" << path << "]";
        }
        const int rc = fclose(file);
        if (rc != 0) {
            LOG(FATAL) << "fclose fail, path[" << path << "]";
        }
    };
    return std::shared_ptr<FILE>(handle, closer);
#endif
}
// Closes every open file descriptor of the calling process except 0, 1, 2 and
// the descriptor used to scan the directory.
// The descriptors are discovered by reading /proc/self/fd with the raw
// getdents system call. Returns 0 on success and -1 on failure; always 0 on
// Windows and macOS, where nothing is closed.
// The implementation is meant to be async signal safe (it runs in the child
// between fork and exec).
// NOTE(review): the LOG(FATAL) calls on the error paths are presumably not
// async-signal-safe -- confirm.
// Mostly copied from CPython code.
static int close_open_fds_internal() {
#if defined _WIN32 || defined __APPLE__
return 0;
#else
// Record layout assumed for the entries written by SYS_getdents.
struct linux_dirent {
long d_ino = 0; // NOLINT
off_t d_off;
unsigned short d_reclen = 0; // NOLINT
char d_name[256];
};
int dir_fd = -1;
if ((dir_fd = open("/proc/self/fd", O_RDONLY)) < 0) {
LOG(FATAL) << "proc/self/fd open fail";
return -1;
}
// Stack buffer sized for one full record; getdents fills it with as many
// variable-length records as fit.
char buffer[sizeof(linux_dirent)];
for (;;) {
int bytes = 0;
if ((bytes =
syscall(SYS_getdents,
dir_fd,
reinterpret_cast<linux_dirent*>(buffer),
sizeof(buffer))) < 0) {
LOG(FATAL) << "syscall fail";
return -1;
}
// A return value of 0 marks the end of the directory.
if (bytes == 0) {
break;
}
linux_dirent* entry = NULL;
// Walk the records in the buffer; d_reclen is the size of each record.
for (int offset = 0; offset < bytes; offset += entry->d_reclen) {
entry = reinterpret_cast<linux_dirent*>(buffer + offset);
// Parse the leading decimal digits of the entry name as the fd number.
int fd = 0;
const char* s = entry->d_name;
while (*s >= '0' && *s <= '9') {
fd = fd * 10 + (*s - '0');
s++;
}
// Skip names without digits ("." and ".."), the scanning descriptor itself
// and the three standard streams.
if (s != entry->d_name && fd != dir_fd && fd >= 3) {
close(fd);
}
}
}
close(dir_fd);
return 0;
#endif
}
// Spawns `bash -c real_cmd` with one end of a pipe attached to its stdin or
// stdout.
//
// real_cmd    command line handed to bash
// do_read     true: the parent reads, so the child's stdout (fd 1) becomes
//             `child_end`; false: the child's stdin (fd 0) becomes `child_end`
// parent_end  pipe end kept by the parent (closed in the child)
// child_end   pipe end used by the child
//
// Returns the child's pid in the parent, or -1 when vfork() fails. Never
// returns in the child: it either becomes bash or terminates with status 127.
// Always returns 0 on Windows and macOS.
static int shell_popen_fork_internal(
        const char* real_cmd,
        bool do_read,
        int parent_end,
        int child_end) {
#if defined _WIN32 || defined __APPLE__
    return 0;
#else
    int child_pid = -1;
    // Too frequent calls to fork() makes openmpi very slow. Use vfork() instead.
    // But vfork() is very dangerous. Be careful.
    if ((child_pid = vfork()) < 0) {
        return -1;
    }
    // The following code is async signal safe (No memory allocation, no access to
    // global data, etc.)
    if (child_pid != 0) {
        return child_pid;
    }
    // From here on we are the vfork() child. It shares the parent's memory and
    // stack, so it must never return from this function nor run exit()
    // handlers: every failure path ends in _exit().
    int child_std_end = do_read ? 1 : 0;
    close(parent_end);
    if (child_end != child_std_end) {
        if (dup2(child_end, child_std_end) != child_std_end) {
            _exit(127);
        }
        close(child_end);
    }
    close_open_fds_internal();
    execl("/bin/bash", "bash", "-c", real_cmd, static_cast<char*>(nullptr));
    // Only reached when execl() failed.
    _exit(127);
#endif
}
// Runs `cmd` through bash (with `set -o pipefail`) and returns a stream
// connected to it.
//
// cmd     shell command line
// mode    "r" to read the command's stdout, "w" to write to its stdin
// err_no  optional out-parameter; set to -1 when the pipe cannot be opened and,
//         later, when closing the stream or the command itself fails. It may
//         be null, in which case errors are not reported. When non-null it
//         must stay valid until the returned stream has been closed.
//
// Returns a null pointer on failure. Closing the returned stream waits for the
// command to finish. Always returns nullptr on Windows and macOS.
std::shared_ptr<FILE> shell_popen(const std::string& cmd, const std::string& mode, int* err_no) {
#if defined _WIN32 || defined __APPLE__
    return nullptr;
#else
    // Callers such as shell_open() may pass a null err_no, so every write
    // goes through this guard.
    auto report_failure = [err_no]() {
        if (err_no != nullptr) {
            *err_no = -1;
        }
    };
    bool do_read = mode == "r";
    bool do_write = mode == "w";
    if (!(do_read || do_write)) {
        report_failure();
        return nullptr;
    }
    if (shell_verbose()) {
        LOG(INFO) << "Opening pipe[" << cmd << "] with mode[" << mode << "]";
    }
    std::string real_cmd = "set -o pipefail; " + cmd;
    int pipe_fds[2];
    if (pipe(pipe_fds) != 0) {
        report_failure();
        return nullptr;
    }
    // pipe_fds[0] is the read end, pipe_fds[1] the write end.
    int parent_end = do_read ? pipe_fds[0] : pipe_fds[1];
    int child_end = do_read ? pipe_fds[1] : pipe_fds[0];
    int child_pid = shell_popen_fork_internal(real_cmd.c_str(), do_read, parent_end, child_end);
    close(child_end);
    if (child_pid < 0) {
        // No child was created: release our end and fail. Continuing would
        // later call waitpid(-1, ...), which reaps an unrelated child.
        close(parent_end);
        report_failure();
        return nullptr;
    }
    fcntl(parent_end, F_SETFD, FD_CLOEXEC);
    FILE* fp = fdopen(parent_end, mode.c_str());
    if (fp == nullptr) {
        // Do not leak the descriptor; closing it ends the child's input/output,
        // after which the child is reaped like on a regular close.
        close(parent_end);
        waitpid(child_pid, nullptr, 0);
        report_failure();
        return nullptr;
    }
    return {fp, [child_pid, cmd, err_no](FILE* stream) {
                if (shell_verbose()) {
                    LOG(INFO) << "Closing pipe[" << cmd << "]";
                }
                if (fclose(stream) != 0 && err_no != nullptr) {
                    *err_no = -1;
                }
                int wstatus = -1;
                waitpid(child_pid, &wstatus, 0);
                // Accepted outcomes: clean exit, exit status 128 + SIGPIPE
                // (the reader stopped early), or the child was already reaped.
                if (wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 ||
                    (wstatus == -1 && errno == ECHILD)) {
                } else {
                    if (err_no != nullptr) {
                        *err_no = -1;
                    }
                    LOG(WARNING) << "status[" << wstatus << "], cmd[" << cmd << "]"
                                 << ", err_no[" << -1 << "]";
                }
                if (wstatus == -1 && errno == ECHILD) {
                    LOG(WARNING) << "errno is ECHILD";
                }
            }};
#endif
}
// Forks a child running `sh -c real_cmd` whose stdout is connected to
// `pipein_fds` (the parent reads from pipein_fds[0]) and whose stdin is
// connected to `pipeout_fds` (the parent writes to pipeout_fds[1]).
//
// Returns the child's pid in the parent, or -1 when fork() fails. Never
// returns in the child: it either becomes the shell or terminates with
// status 127. Always returns 0 on Windows and macOS.
static int shell_p2open_fork_internal(const char* real_cmd, int pipein_fds[2], int pipeout_fds[2]) {
#if defined _WIN32 || defined __APPLE__
    return 0;
#else
    int child_pid = -1;
    if ((child_pid = fork()) < 0) {
        return -1;
    }
    if (child_pid != 0) {
        return child_pid;
    }
    // Child process from here on. Returning would let a second copy of the
    // program continue in the caller, so every failure path ends in _exit().
    close(pipein_fds[0]);
    close(pipeout_fds[1]);
    if (pipein_fds[1] != 1) {
        if (dup2(pipein_fds[1], 1) != 1) {
            _exit(127);
        }
        close(pipein_fds[1]);
    }
    if (pipeout_fds[0] != 0) {
        if (dup2(pipeout_fds[0], 0) != 0) {
            _exit(127);
        }
        close(pipeout_fds[0]);
    }
    close_open_fds_internal();
    execl("/bin/sh", "sh", "-c", real_cmd, static_cast<char*>(nullptr));
    // Only reached when execl() failed.
    _exit(127);
#endif
}
// Starts `cmd` (prefixed with `set -o pipefail`) with both its stdin and
// stdout connected to the caller.
//
// Returns {stream reading the command's stdout, stream writing to its stdin},
// or a pair of null pointers when the pipes cannot be created. The command is
// waited for once BOTH returned streams have been closed; failures while
// closing, waiting, or in the command's exit status abort through PCHECK.
// Always returns an empty pair on Windows and macOS.
std::pair<std::shared_ptr<FILE>, std::shared_ptr<FILE>> shell_p2open(const std::string& cmd) {
#if defined _WIN32 || defined __APPLE__
    return {};
#else
    if (shell_verbose()) {
        LOG(INFO) << "Opening bidirectional pipe[" << cmd << "]";
    }
    std::string real_cmd = "set -o pipefail; " + cmd;
    int pipein_fds[2];
    int pipeout_fds[2];
    if (pipe(pipein_fds) != 0) {
        return {nullptr, nullptr};
    }
    if (pipe(pipeout_fds) != 0) {
        // The first pipe was already created; release it before giving up.
        close(pipein_fds[0]);
        close(pipein_fds[1]);
        return {nullptr, nullptr};
    }
    int child_pid = shell_p2open_fork_internal(real_cmd.c_str(), pipein_fds, pipeout_fds);
    // The parent keeps pipein_fds[0] (read) and pipeout_fds[1] (write) only.
    close(pipein_fds[1]);
    close(pipeout_fds[0]);
    fcntl(pipein_fds[0], F_SETFD, FD_CLOEXEC);
    fcntl(pipeout_fds[1], F_SETFD, FD_CLOEXEC);
    // Token shared by the deleters of both streams: it owns no object, but its
    // deleter runs when the last stream is gone and then waits for the child.
    std::shared_ptr<int> child_life = {
            nullptr, [child_pid, cmd](void*) {
                if (shell_verbose()) {
                    LOG(INFO) << "Closing bidirectional pipe[" << cmd << "]";
                }
                int wstatus, ret;
                // Retry waitpid() when it is interrupted by a signal.
                do {
                    PCHECK((ret = waitpid(child_pid, &wstatus, 0)) >= 0 ||
                           (ret == -1 && errno == EINTR));
                } while (ret == -1 && errno == EINTR);
                PCHECK(wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 ||
                       (wstatus == -1 && errno == ECHILD))
                        << "status[" << wstatus << "], cmd[" << cmd << "]";
                if (wstatus == -1 && errno == ECHILD) {
                    LOG(WARNING) << "errno is ECHILD";
                }
            }};
    FILE* in_fp;
    PCHECK((in_fp = fdopen(pipein_fds[0], "r")) != NULL);
    FILE* out_fp;
    PCHECK((out_fp = fdopen(pipeout_fds[1], "w")) != NULL);
    return {{in_fp, [child_life](FILE* fp) { PCHECK(fclose(fp) == 0); }},
            {out_fp, [child_life](FILE* fp) { PCHECK(fclose(fp) == 0); }}};
#endif
}
// Runs `cmd` and returns everything it wrote to stdout, up to the first NUL
// byte.
//
// The command is retried for as long as opening, reading from or closing the
// pipe reports an error (-1). Returns "" when the command succeeds without
// producing output. Always returns "" on Windows and macOS.
std::string shell_get_command_output(const std::string& cmd) {
#if defined _WIN32 || defined __APPLE__
    return "";
#else
    int err_no = 0;
    do {
        err_no = 0;
        std::shared_ptr<FILE> pipe = shell_popen(cmd, "r", &err_no);
        if (pipe == nullptr) {
            // The pipe could not be opened (err_no is -1): go to the retry
            // check instead of reading from a null stream.
            continue;
        }
        string::LineFileReader reader;
        if (reader.getdelim(&*pipe, 0)) {
            // Close the pipe now so that err_no reflects the command's exit
            // status before the output is trusted.
            pipe = nullptr;
            if (err_no == 0) {
                return reader.get();
            }
        }
        // Otherwise the pipe is closed at the end of this scope, which updates
        // err_no before the loop condition is evaluated.
    } while (err_no == -1);
    return "";
#endif
}
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <fcntl.h>
#include <sys/stat.h>
#ifdef _WIN32
#include <windows.h>
#else
#include <sys/syscall.h>
#endif
#include <sys/types.h>
#ifndef _WIN32
#include <sys/wait.h>
#endif
#include <memory>
#include <string>
#include <utility>
#include "paddle/fluid/platform/port.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
// Storage for the shell helpers' verbose flag.
// Returns a reference to a function-local static that starts out false;
// shell_verbose() reads it and shell_set_verbose() writes it.
inline bool& shell_verbose_internal() {
    static bool verbose_flag = false;
    return verbose_flag;
}
// Tells whether the shell helpers should log the files and pipes they open
// and close (current value of the flag held by shell_verbose_internal()).
inline bool shell_verbose() {
    const bool enabled = shell_verbose_internal();
    return enabled;
}
// Turns verbose logging of the shell helpers on (`x` == true) or off by
// storing `x` in the flag held by shell_verbose_internal().
inline void shell_set_verbose(bool x) {
    bool& flag = shell_verbose_internal();
    flag = x;
}
// Opens the regular file `path` with fopen() semantics for `mode`; the file is
// closed when the last owner of the returned pointer goes away.
extern std::shared_ptr<FILE> shell_fopen(const std::string& path, const std::string& mode);
// Runs the shell command `cmd` and returns a stream connected to it: `mode`
// "r" reads the command's output, "w" writes to its input. `err_no` is set to
// -1 when opening, closing or the command itself fails; closing the returned
// stream waits for the command to finish.
extern std::shared_ptr<FILE> shell_popen(
const std::string& cmd,
const std::string& mode,
int* err_no);
// Runs the shell command `cmd` with both directions connected: `first` reads
// the command's output, `second` writes to its input.
extern std::pair<std::shared_ptr<FILE>, std::shared_ptr<FILE>> shell_p2open(const std::string& cmd);
// Runs the shell command `cmd` and waits for it, repeating the run for as long
// as it reports a failure (error value -1). Nothing is written to the
// command's input.
inline void shell_execute(const std::string& cmd) {
    int status = 0;
    for (;;) {
        status = 0;
        {
            // The pipe is closed at the end of this scope; closing waits for
            // the command and is what stores a failure in `status`.
            std::shared_ptr<FILE> pipe = shell_popen(cmd, "w", &status);
        }
        if (status != -1) {
            break;
        }
    }
}
// Runs the shell command `cmd` and returns what it wrote to its output; the
// command is retried while it reports a failure.
extern std::string shell_get_command_output(const std::string& cmd);
// Rewrites `path` in place so that data read from it passes through the shell
// command `converter`; sets `is_pipe` to true when `path` was a plain file and
// has become a shell command. An empty `converter` changes nothing.
extern void shell_add_read_converter(std::string& path, bool& is_pipe, const std::string& converter);
// Opens `path` as a file, or as a shell command when `is_pipe` is true, using
// `mode` ("r" or "w"). A `buffer_size` > 0 installs a stream buffer of that
// many bytes. `err_no` is passed on to shell_popen and defaults to null.
extern std::shared_ptr<FILE> shell_open(const std::string& path, bool is_pipe, const std::string& mode, size_t buffer_size, int* err_no = 0);
// Rewrites `path` in place so that data written to it passes through the shell
// command `converter`; sets `is_pipe` to true when `path` was a plain file and
// has become a shell command. An empty `converter` changes nothing.
extern void shell_add_write_converter(std::string& path, bool& is_pipe, const std::string& converter);
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册