提交 afaf9370 编写于 作者: D dongdaxiang

add fs_local_open example

上级 cf136064
...@@ -23,7 +23,11 @@ endfunction() ...@@ -23,7 +23,11 @@ endfunction()
add_subdirectory(ir) add_subdirectory(ir)
add_subdirectory(details) add_subdirectory(details)
<<<<<<< HEAD
add_subdirectory(fleet) add_subdirectory(fleet)
=======
add_subdirectory(common)
>>>>>>> add fs_local_open example
#ddim lib #ddim lib
proto_library(framework_proto SRCS framework.proto) proto_library(framework_proto SRCS framework.proto)
proto_library(async_executor_proto SRCS data_feed.proto) proto_library(async_executor_proto SRCS data_feed.proto)
......
...@@ -18,6 +18,8 @@ limitations under the License. */ ...@@ -18,6 +18,8 @@ limitations under the License. */
#include "google/protobuf/text_format.h" #include "google/protobuf/text_format.h"
#include "gflags/gflags.h" #include "gflags/gflags.h"
#include "paddle/fluid/framework/common/fs.h"
#include "paddle/fluid/framework/common/shell.h"
#include "paddle/fluid/framework/data_feed_factory.h" #include "paddle/fluid/framework/data_feed_factory.h"
#include "paddle/fluid/framework/executor_thread_worker.h" #include "paddle/fluid/framework/executor_thread_worker.h"
#include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/feed_fetch_method.h"
......
cc_library(fs SRCS fs.cc DEPS glog boost)
cc_library(shell SRCS shell.cc DEPS glog)
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/common/fs.h"
namespace paddle {
namespace framework {
static void fs_add_read_converter_internal(std::string& path, // NOLINT
bool& is_pipe, // NOLINT
const std::string& converter) {
if (converter == "") {
return;
}
if (!is_pipe) {
path = format_string("( %s ) < \"%s\"", converter.c_str(), path.c_str());
is_pipe = true;
} else {
path = format_string("%s | %s", path.c_str(), converter.c_str());
}
}
static void fs_add_write_converter_internal(std::string& path, // NOLINT
bool& is_pipe, // NOLINT
const std::string& converter) {
if (converter == "") {
return;
}
if (!is_pipe) {
path = format_string("( %s ) > \"%s\"", converter.c_str(), path.c_str());
is_pipe = true;
} else {
path = format_string("%s | %s", converter.c_str(), path.c_str());
}
}
static std::shared_ptr<FILE> fs_open_internal(const std::string& path,
bool is_pipe,
const std::string& mode,
size_t buffer_size,
int* err_no = 0) {
std::shared_ptr<FILE> fp = nullptr;
if (!is_pipe) {
fp = shell_fopen(path, mode);
} else {
fp = shell_popen(path, mode, err_no);
}
if (buffer_size > 0) {
char* buffer = new char[buffer_size];
CHECK_EQ(0, setvbuf(&*fp, buffer, _IOFBF, buffer_size));
fp = {&*fp,
[ fp, buffer ] reinterpret_cast<FILE*> mutable {CHECK(fp.unique());
fp = nullptr;
delete[] buffer;
}
};
}
return fp;
}
static bool fs_begin_with_internal(const std::string& path,
const std::string& str) {
return strncmp(path.c_str(), str.c_str(), str.length()) == 0;
}
static bool fs_end_with_internal(const std::string& path,
const std::string& str) {
return path.length() >= str.length() &&
strncmp(&path[path.length() - str.length()], str.c_str(),
str.length()) == 0;
}
static size_t& localfs_buffer_size_internal() {
static size_t x = 0;
return x;
}
size_t localfs_buffer_size() { return localfs_buffer_size_internal(); }
void localfs_set_buffer_size(size_t x) { localfs_buffer_size_internal() = x; }
std::shared_ptr<FILE> localfs_open_read(std::string path,
const std::string& converter) {
bool is_pipe = false;
if (fs_end_with_internal(path, ".gz")) {
fs_add_read_converter_internal(path, is_pipe, "zcat");
}
fs_add_read_converter_internal(path, is_pipe, converter);
return fs_open_internal(path, is_pipe, "r", localfs_buffer_size());
}
std::shared_ptr<FILE> localfs_open_write(std::string path,
const std::string& converter) {
shell_execute(format_string("mkdir -p $(dirname \"%s\")", path.c_str()));
bool is_pipe = false;
if (fs_end_with_internal(path, ".gz")) {
fs_add_write_converter_internal(path, is_pipe, "gzip");
}
fs_add_write_converter_internal(path, is_pipe, converter);
return fs_open_internal(path, is_pipe, "w", localfs_buffer_size());
}
int64_t localfs_file_size(const std::string& path) {
struct stat buf;
if (0 != stat(path.c_str(), &buf)) {
LOG(FATAL) << "file stat not zero";
return -1;
}
return (int64_t)buf.st_size;
}
void localfs_remove(const std::string& path) {
if (path == "") {
return;
}
shell_execute(format_string("rm -rf %s", path.c_str()));
}
std::vector<std::string> localfs_list(const std::string& path) {
if (path == "") {
return {};
}
std::shared_ptr<FILE> pipe;
int err_no = 0;
pipe = shell_popen(format_string("find %s -type f -maxdepth 1", path.c_str()),
"r", &err_no);
LineFileReader reader;
std::vector<std::string> list;
while (reader.getline(&*pipe)) {
list.push_back(reader.get());
}
return list;
}
std::string localfs_tail(const std::string& path) {
if (path == "") {
return "";
}
return shell_get_command_output(format_string("tail -1 %s ", path.c_str()));
}
bool localfs_exists(const std::string& path) {
std::string test_f = shell_get_command_output(
format_string("[ -f %s ] ; echo $?", path.c_str()));
if (trim_spaces(test_f) == "0") {
return true;
}
std::string test_d = shell_get_command_output(
format_string("[ -d %s ] ; echo $?", path.c_str()));
if (trim_spaces(test_d) == "0") {
return true;
}
return false;
}
void localfs_mkdir(const std::string& path) {
if (path == "") {
return;
}
shell_execute(format_string("mkdir -p %s", path.c_str()));
}
static size_t& hdfs_buffer_size_internal() {
static size_t x = 0;
return x;
}
size_t hdfs_buffer_size() { return hdfs_buffer_size_internal(); }
void hdfs_set_buffer_size(size_t x) { hdfs_buffer_size_internal() = x; }
static std::string& hdfs_command_internal() {
static std::string x = "hadoop fs";
return x;
}
const std::string& hdfs_command() { return hdfs_command_internal(); }
void hdfs_set_command(const std::string& x) { hdfs_command_internal() = x; }
std::shared_ptr<FILE> hdfs_open_read(std::string path, int* err_no,
const std::string& converter) {
if (fs_end_with_internal(path, ".gz")) {
path =
format_string("%s -text \"%s\"", hdfs_command().c_str(), path.c_str());
} else {
path =
format_string("%s -cat \"%s\"", hdfs_command().c_str(), path.c_str());
}
bool is_pipe = true;
fs_add_read_converter_internal(path, is_pipe, converter);
return fs_open_internal(path, is_pipe, "r", hdfs_buffer_size(), err_no);
}
std::shared_ptr<FILE> hdfs_open_write(std::string path, int* err_no,
const std::string& converter) {
path =
format_string("%s -put - \"%s\"", hdfs_command().c_str(), path.c_str());
bool is_pipe = true;
if (fs_end_with_internal(path, ".gz\"")) {
fs_add_write_converter_internal(path, is_pipe, "gzip");
}
fs_add_write_converter_internal(path, is_pipe, converter);
return fs_open_internal(path, is_pipe, "w", hdfs_buffer_size(), err_no);
}
void hdfs_remove(const std::string& path) {
if (path == "") {
return;
}
shell_execute(format_string("%s -rmr %s &>/dev/null; true",
hdfs_command().c_str(), path.c_str()));
}
std::vector<std::string> hdfs_list(const std::string& path) {
if (path == "") {
return {};
}
std::string prefix = "hdfs:";
if (fs_begin_with_internal(path, "afs:")) {
prefix = "afs:";
}
int err_no = 0;
std::vector<std::string> list;
do {
err_no = 0;
std::shared_ptr<FILE> pipe;
pipe = shell_popen(format_string("%s -ls %s | ( grep ^- ; [ $? != 2 ] )",
hdfs_command().c_str(), path.c_str()),
"r", &err_no);
LineFileReader reader;
list.clear();
while (reader.getline(&*pipe)) {
std::vector<std::string> line = split_string(reader.get());
if (line.size() != 8) {
continue;
}
list.push_back(prefix + line[7]);
}
} while (err_no == -1);
return list;
}
std::string hdfs_tail(const std::string& path) {
if (path == "") {
return "";
}
return shell_get_command_output(format_string(
"%s -text %s | tail -1 ", hdfs_command().c_str(), path.c_str()));
}
bool hdfs_exists(const std::string& path) {
std::string test = shell_get_command_output(format_string(
"%s -test -e %s ; echo $?", hdfs_command().c_str(), path.c_str()));
if (trim_spaces(test) == "0") {
return true;
}
return false;
}
void hdfs_mkdir(const std::string& path) {
if (path == "") {
return;
}
shell_execute(format_string("%s -mkdir %s; true", hdfs_command().c_str(),
path.c_str()));
}
int fs_select_internal(const std::string& path) {
if (fs_begin_with_internal(path, "hdfs:")) {
return 1;
} else if (fs_begin_with_internal(path, "afs:")) {
return 1;
}
return 0;
}
std::shared_ptr<FILE> fs_open_read(const std::string& path, int* err_no,
const std::string& converter) {
switch (fs_select_internal(path)) {
case 0:
return localfs_open_read(path, converter);
case 1:
return hdfs_open_read(path, err_no, converter);
default:
LOG(FATAL) << "Not supported";
}
return {};
}
std::shared_ptr<FILE> fs_open_write(const std::string& path, int* err_no,
const std::string& converter) {
switch (fs_select_internal(path)) {
case 0:
return localfs_open_write(path, converter);
case 1:
return hdfs_open_write(path, err_no, converter);
default:
LOG(FATAL) << "Not supported";
}
return {};
}
std::shared_ptr<FILE> fs_open(const std::string& path, const std::string& mode,
int* err_no, const std::string& converter) {
if (mode == "r" || mode == "rb") {
return fs_open_read(path, err_no, converter);
}
if (mode == "w" || mode == "wb") {
return fs_open_write(path, err_no, converter);
}
LOG(FATAL) << "Unknown mode: " << mode;
return {};
}
int64_t fs_file_size(const std::string& path) {
switch (fs_select_internal(path)) {
case 0:
return localfs_file_size(path);
default:
LOG(FATAL) << "Not supported";
}
return 0;
}
void fs_remove(const std::string& path) {
switch (fs_select_internal(path)) {
case 0:
return localfs_remove(path);
case 1:
return hdfs_remove(path);
default:
LOG(FATAL) << "Not supported";
}
}
std::vector<std::string> fs_list(const std::string& path) {
switch (fs_select_internal(path)) {
case 0:
return localfs_list(path);
case 1:
return hdfs_list(path);
default:
LOG(FATAL) << "Not supported";
}
return {};
}
std::string fs_tail(const std::string& path) {
switch (fs_select_internal(path)) {
case 0:
return localfs_tail(path);
case 1:
return hdfs_tail(path);
default:
LOG(FATAL) << "Not supported";
}
return "";
}
bool fs_exists(const std::string& path) {
switch (fs_select_internal(path)) {
case 0:
return localfs_exists(path);
case 1:
return hdfs_exists(path);
default:
LOG(FATAL) << "Not supported";
}
return false;
}
void fs_mkdir(const std::string& path) {
switch (fs_select_internal(path)) {
case 0:
return localfs_mkdir(path);
case 1:
return hdfs_mkdir(path);
default:
LOG(FATAL) << "Not supported";
}
}
} // namespace framework
} // namespace paddle
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdio.h>
#include <string>
#include <vector>
#include "glog/logging.h"
#include "paddle/fluid/framework/common/ps_string.h"
#include "paddle/fluid/framework/common/shell.h"
namespace paddle {
namespace framework {
int fs_select_internal(const std::string& path);
// localfs
extern size_t localfs_buffer_size();
extern void localfs_set_buffer_size(size_t x);
extern std::shared_ptr<FILE> localfs_open_read(std::string path,
const std::string& converter);
extern std::shared_ptr<FILE> localfs_open_write(std::string path,
const std::string& converter);
extern int64_t localfs_file_size(const std::string& path);
extern void localfs_remove(const std::string& path);
extern std::vector<std::string> localfs_list(const std::string& path);
extern std::string localfs_tail(const std::string& path);
extern bool localfs_exists(const std::string& path);
extern void localfs_mkdir(const std::string& path);
// hdfs
extern size_t hdfs_buffer_size();
extern void hdfs_set_buffer_size(size_t x);
extern const std::string& hdfs_command();
extern void hdfs_set_command(const std::string& x);
extern std::shared_ptr<FILE> hdfs_open_read(std::string path, int* err_no,
const std::string& converter);
extern std::shared_ptr<FILE> hdfs_open_write(std::string path, int* err_no,
const std::string& converter);
extern void hdfs_remove(const std::string& path);
extern std::vector<std::string> hdfs_list(const std::string& path);
extern std::string hdfs_tail(const std::string& path);
extern bool hdfs_exists(const std::string& path);
extern void hdfs_mkdir(const std::string& path);
// aut-detect fs
extern std::shared_ptr<FILE> fs_open_read(const std::string& path, int* err_no,
const std::string& converter);
extern std::shared_ptr<FILE> fs_open_write(const std::string& path, int* err_no,
const std::string& converter);
extern std::shared_ptr<FILE> fs_open(const std::string& path,
const std::string& mode, int* err_no,
const std::string& converter = "");
extern int64_t fs_file_size(const std::string& path);
extern void fs_remove(const std::string& path);
extern std::vector<std::string> fs_list(const std::string& path);
extern std::string fs_tail(const std::string& path);
extern bool fs_exists(const std::string& path);
extern void fs_mkdir(const std::string& path);
} // namespace framework
} // namespace paddle
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <ctype.h>
#include <stdio.h>
#include <cstring>
#include <string>
#include <vector>
#include "boost/lexical_cast.hpp"
#include "glog/logging.h"
namespace paddle {
namespace framework {
inline size_t count_spaces(const char* s) {
size_t count = 0;
while (*s != 0 && isspace(*s++)) {
count++;
}
return count;
}
inline size_t count_nonspaces(const char* s) {
size_t count = 0;
while (*s != 0 && !isspace(*s++)) {
count++;
}
return count;
}
template <class... ARGS>
void format_string_append(std::string& str, const char* fmt, // NOLINT
ARGS&&... args) { // use VA_ARGS may be better ?
int len = snprintf(NULL, 0, fmt, args...);
CHECK_GE(len, 0);
size_t oldlen = str.length();
str.resize(oldlen + len + 1);
CHECK(snprintf(&str[oldlen], (size_t)len + 1, fmt, args...) == len);
str.resize(oldlen + len);
}
template <class... ARGS>
void format_string_append(std::string& str, const std::string& fmt, // NOLINT
ARGS&&... args) {
format_string_append(str, fmt.c_str(), args...);
}
template <class... ARGS>
std::string format_string(const char* fmt, ARGS&&... args) {
std::string str;
format_string_append(str, fmt, args...);
return std::move(str);
}
template <class... ARGS>
std::string format_string(const std::string& fmt, ARGS&&... args) {
return format_string(fmt.c_str(), args...);
}
// remove leading and tailing spaces
inline std::string trim_spaces(const std::string& str) {
const char* p = str.c_str();
while (*p != 0 && isspace(*p)) {
p++;
}
size_t len = strlen(p);
while (len > 0 && isspace(p[len - 1])) {
len--;
}
return std::string(p, len);
}
inline int str_to_float(const char* str, float* v) {
const char* head = str;
char* cursor = NULL;
int index = 0;
while (*(head += count_spaces(head)) != 0) {
v[index++] = std::strtof(head, &cursor);
if (head == cursor) {
break;
}
head = cursor;
}
return index;
}
// split string by delim
template <class T = std::string>
std::vector<T> split_string(const std::string& str, const std::string& delim) {
size_t pre_pos = 0;
size_t pos = 0;
std::string tmp_str;
std::vector<T> res_list;
res_list.clear();
if (str.empty()) {
return res_list;
}
while ((pos = str.find(delim, pre_pos)) != std::string::npos) {
tmp_str.assign(str, pre_pos, pos - pre_pos);
res_list.push_back(tmp_str);
pre_pos = pos + 1;
}
tmp_str.assign(str, pre_pos, str.length() - pre_pos);
if (!tmp_str.empty()) {
res_list.push_back(tmp_str);
}
return res_list;
/*
size_t num = 1;
const char* p;
for (p = str.c_str(); *p != 0; p++) {
if (*p == delim) {
num++;
}
}
std::vector<T> list(num);
const char* last = str.c_str();
num = 0;
for (p = str.c_str(); *p != 0; p++) {
if (*p == delim) {
list[num++] = boost::lexical_cast<T>(last, p - last);
last = p + 1;
}
}
list[num] = boost::lexical_cast<T>(last, p - last);
return list;
*/
}
// split string by spaces. Leading and tailing spaces are ignored. Consecutive
// spaces are treated as one delim.
template <class T = std::string>
std::vector<T> split_string(const std::string& str) {
std::vector<T> list;
const char* p;
int pre_pos = 0;
int pos = 0;
std::string tmp_str;
if (str.empty()) {
return list;
}
for (p = str.c_str(); *p != 0;) {
if (!isspace(*p)) {
pos = pre_pos;
p++;
while (*p != 0 && !isspace(*p)) {
pos++;
p++;
}
tmp_str.assign(str, pre_pos, pos - pre_pos + 1);
list.push_back(tmp_str);
pre_pos = pos + 1;
} else {
pre_pos++;
p++;
}
}
return list;
}
template <class T>
std::string join_strings(const std::vector<T>& strs, char delim) {
std::string str;
for (size_t i = 0; i < strs.size(); i++) {
if (i > 0) {
str += delim;
}
str += boost::lexical_cast<std::string>(strs[i]);
}
return str;
}
// A helper class for reading lines from file. A line buffer is maintained. It
// doesn't need to know the maximum possible length of a line.
class LineFileReader {
public:
LineFileReader() {}
LineFileReader(LineFileReader&&) = delete;
LineFileReader(const LineFileReader&) = delete;
~LineFileReader() { ::free(_buffer); }
char* getline(FILE* f) { return this->getdelim(f, '\n'); }
char* getdelim(FILE* f, char delim) {
ssize_t ret = ::getdelim(&_buffer, &_buf_size, delim, f);
if (ret >= 0) {
if (ret >= 1 && _buffer[ret - 1] == delim) {
_buffer[--ret] = 0;
}
_length = (size_t)ret;
return _buffer;
} else {
_length = 0;
CHECK(feof(f));
return NULL;
}
}
char* get() { return _buffer; }
size_t length() { return _length; }
private:
char* _buffer = NULL;
size_t _buf_size = 0;
size_t _length = 0;
};
} // end namespace framework
} // end namespace paddle
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/common/shell.h"
namespace paddle {
namespace framework {
std::shared_ptr<FILE> shell_fopen(const std::string& path,
const std::string& mode) {
if (shell_verbose()) {
LOG(INFO) << "Opening file[" << path << "] with mode[" << mode << "]";
}
FILE* fp;
if (!(fp = fopen(path.c_str(), mode.c_str()))) {
LOG(FATAL) << "fopen fail, path[" << path << "], mode[" << mode << "]";
}
return {fp, [path](FILE* fp) {
if (shell_verbose()) {
LOG(INFO) << "Closing file[" << path << "]";
}
if (0 != fclose(fp)) {
LOG(FATAL) << "fclose fail, path[" << path << "]";
}
}};
}
// Close all open file descriptors
// The implementation is async signal safe
// Mostly copy from CPython code
static int close_open_fds_internal() {
struct linux_dirent {
int64 d_ino = 0;
off_t d_off;
uint16 d_reclen = 0;
char d_name[256];
};
int dir_fd = -1;
if ((dir_fd = open("/proc/self/fd", O_RDONLY)) < 0) {
LOG(FATAL) << "proc/self/fd open fail";
return -1;
}
char buffer[sizeof(linux_dirent)];
for (;;) {
int bytes = 0;
if ((bytes = syscall(SYS_getdents, dir_fd,
reinterpret_cast<linux_dirent*>(buffer),
sizeof(buffer))) < 0) {
LOG(FATAL) << "syscall fail";
return -1;
}
if (bytes == 0) {
break;
}
linux_dirent* entry = NULL;
for (int offset = 0; offset < bytes; offset += entry->d_reclen) {
entry = reinterpret_cast<linux_dirent*>(buffer + offset);
int fd = 0;
const char* s = entry->d_name;
while (*s >= '0' && *s <= '9') {
fd = fd * 10 + (*s - '0');
s++;
}
if (s != entry->d_name && fd != dir_fd && fd >= 3) {
close(fd);
}
}
}
close(dir_fd);
return 0;
}
static int shell_popen_fork_internal(const char* real_cmd, bool do_read,
int parent_end, int child_end) {
int child_pid = -1;
// Too frequent calls to fork() makes openmpi very slow. Use vfork() instead.
// But vfork() is very dangerous. Be careful.
if ((child_pid = vfork()) < 0) {
return -1;
}
// The following code is async signal safe (No memory allocation, no access to
// global data, etc.)
if (child_pid != 0) {
return child_pid;
}
int child_std_end = do_read ? 1 : 0;
close(parent_end);
if (child_end != child_std_end) {
if (dup2(child_end, child_std_end) != child_std_end) {
return -1;
}
close(child_end);
}
close_open_fds_internal();
if (execl("/bin/sh", "sh", "-c", real_cmd, NULL) < 0) {
return -1;
}
exit(127);
}
std::shared_ptr<FILE> shell_popen(const std::string& cmd,
const std::string& mode, int* err_no) {
bool do_read = mode == "r";
bool do_write = mode == "w";
if (!(do_read || do_write)) {
*err_no = -1;
return NULL;
}
if (shell_verbose()) {
LOG(INFO) << "Opening pipe[" << cmd << "] with mode[" << mode << "]";
}
std::string real_cmd = "set -o pipefail; " + cmd;
int pipe_fds[2];
if (pipe(pipe_fds) != 0) {
*err_no = -1;
return NULL;
}
int parent_end = 0;
int child_end = 0;
if (do_read) {
parent_end = pipe_fds[0];
child_end = pipe_fds[1];
} else if (do_write) {
parent_end = pipe_fds[1];
child_end = pipe_fds[0];
}
int child_pid = shell_popen_fork_internal(real_cmd.c_str(), do_read,
parent_end, child_end);
close(child_end);
fcntl(parent_end, F_SETFD, FD_CLOEXEC);
FILE* fp;
if ((fp = fdopen(parent_end, mode.c_str())) == NULL) {
*err_no = -1;
return NULL;
}
return {fp, [child_pid, cmd, err_no](FILE* fp) {
if (shell_verbose()) {
LOG(INFO) << "Closing pipe[" << cmd << "]";
}
if (fclose(fp) != 0) {
*err_no = -1;
}
int wstatus = -1;
// int ret = waitpid(child_pid, &wstatus, 0);
waitpid(child_pid, &wstatus, 0);
if (wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 ||
(wstatus == -1 && errno == ECHILD)) {
// LOG(INFO) << "status[" << wstatus << "], cmd[" << cmd << "]" <<
// ", err_no[" << *err_no << "]";
} else {
*err_no = -1;
LOG(WARNING) << "status[" << wstatus << "], cmd[" << cmd << "]"
<< ", err_no[" << *err_no << "]";
}
if (wstatus == -1 && errno == ECHILD) {
LOG(WARNING) << "errno is ECHILD";
}
}};
}
static int shell_p2open_fork_internal(const char* real_cmd, int pipein_fds[2],
int pipeout_fds[2]) {
int child_pid = -1;
if ((child_pid = fork()) < 0) {
return -1;
}
if (child_pid != 0) {
return child_pid;
}
close(pipein_fds[0]);
close(pipeout_fds[1]);
if (pipein_fds[1] != 1) {
if (dup2(pipein_fds[1], 1) != 1) {
return -1;
}
close(pipein_fds[1]);
}
if (pipeout_fds[0] != 0) {
if (dup2(pipeout_fds[0], 0) != 0) {
return -1;
}
close(pipeout_fds[0]);
}
close_open_fds_internal();
if (execl("/bin/sh", "sh", "-c", real_cmd, NULL) < 0) {
return -1;
}
exit(127);
}
std::pair<std::shared_ptr<FILE>, std::shared_ptr<FILE>> shell_p2open(
const std::string& cmd) {
if (shell_verbose()) {
LOG(INFO) << "Opening bidirectional pipe[" << cmd << "]";
}
std::string real_cmd = "set -o pipefail; " + cmd;
int pipein_fds[2];
int pipeout_fds[2];
if (pipe(pipein_fds) != 0) {
return {NULL, NULL};
}
if (pipe(pipeout_fds) != 0) {
return {NULL, NULL};
}
int child_pid =
shell_p2open_fork_internal(real_cmd.c_str(), pipein_fds, pipeout_fds);
close(pipein_fds[1]);
close(pipeout_fds[0]);
fcntl(pipein_fds[0], F_SETFD, FD_CLOEXEC);
fcntl(pipeout_fds[1], F_SETFD, FD_CLOEXEC);
std::shared_ptr<int> child_life = {
NULL, [child_pid, cmd](void*) {
if (shell_verbose()) {
LOG(INFO) << "Closing bidirectional pipe[" << cmd << "]";
}
int wstatus, ret;
do {
PCHECK((ret = waitpid(child_pid, &wstatus, 0)) >= 0 ||
(ret == -1 && errno == EINTR));
} while (ret == -1 && errno == EINTR);
PCHECK(wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 ||
(wstatus == -1 && errno == ECHILD))
<< "status[" << wstatus << "], cmd[" << cmd << "]";
if (wstatus == -1 && errno == ECHILD) {
LOG(WARNING) << "errno is ECHILD";
}
}};
FILE* in_fp;
PCHECK((in_fp = fdopen(pipein_fds[0], "r")) != NULL);
FILE* out_fp;
PCHECK((out_fp = fdopen(pipeout_fds[1], "w")) != NULL);
return {{in_fp, [child_life](FILE* fp) { PCHECK(fclose(fp) == 0); }},
{out_fp, [child_life](FILE* fp) { PCHECK(fclose(fp) == 0); }}};
}
std::string shell_get_command_output(const std::string& cmd) {
int err_no = 0;
do {
err_no = 0;
std::shared_ptr<FILE> pipe = shell_popen(cmd, "r", &err_no);
LineFileReader reader;
if (reader.getdelim(&*pipe, 0)) {
pipe = nullptr;
if (err_no == 0) {
return reader.get();
}
}
} while (err_no == -1);
return "";
}
} // namespace framework
} // namespace paddle
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <memory>
#include <string>
#include <utility>
#include "glog/logging.h"
#include "paddle/fluid/framework/common/ps_string.h"
namespace paddle {
namespace framework {
inline bool& shell_verbose_internal() {
static bool x = false;
return x;
}
inline bool shell_verbose() { return shell_verbose_internal(); }
inline void shell_set_verbose(bool x) { shell_verbose_internal() = x; }
extern std::shared_ptr<FILE> shell_fopen(const std::string& path,
const std::string& mode);
extern std::shared_ptr<FILE> shell_popen(const std::string& cmd,
const std::string& mode, int* err_no);
extern std::pair<std::shared_ptr<FILE>, std::shared_ptr<FILE>> shell_p2open(
const std::string& cmd);
inline void shell_execute(const std::string& cmd) {
int err_no = 0;
do {
err_no = 0;
shell_popen(cmd, "w", &err_no);
} while (err_no == -1);
}
extern std::string shell_get_command_output(const std::string& cmd);
} // namespace framework
} // namespace paddle
...@@ -12,10 +12,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ...@@ -12,10 +12,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include <stdio_ext.h>
#include "google/protobuf/io/zero_copy_stream_impl.h" #include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/message.h" #include "google/protobuf/message.h"
#include "google/protobuf/text_format.h" #include "google/protobuf/text_format.h"
#include "common/fs.h"
#include "common/shell.h"
#include "gflags/gflags.h" #include "gflags/gflags.h"
#include "paddle/fluid/framework/data_feed.h" #include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/feed_fetch_method.h"
...@@ -64,7 +67,7 @@ bool DataFeed::PickOneFile(std::string* filename) { ...@@ -64,7 +67,7 @@ bool DataFeed::PickOneFile(std::string* filename) {
return false; return false;
} }
*filename = filelist_[file_idx_++]; *filename = filelist_[file_idx_++];
LOG(ERROR) << "pick file:" << *filename; // LOG(ERROR) << "pick file:" << *filename;
return true; return true;
} }
...@@ -91,8 +94,24 @@ void PrivateQueueDataFeed<T>::SetQueueSize(int queue_size) { ...@@ -91,8 +94,24 @@ void PrivateQueueDataFeed<T>::SetQueueSize(int queue_size) {
template <typename T> template <typename T>
bool PrivateQueueDataFeed<T>::Start() { bool PrivateQueueDataFeed<T>::Start() {
CheckSetFileList(); CheckSetFileList();
std::string filename;
while (PickOneFile(&filename)) {
int err_no = 0;
std::string pipeline_cmd = "cat";
std::string path =
"/home/users/dongdaxiang/pslib_ctr/local/data_mod/part-00012";
fp_ = fs_open_read(path, &err_no, pipeline_cmd);
__fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
thread_local LineFileReader reader;
while (reader.getline(&*(fp_.get()))) {
LOG(ERROR) << "read a line";
}
read_thread_ = std::thread(&PrivateQueueDataFeed::ReadThread, this); read_thread_ = std::thread(&PrivateQueueDataFeed::ReadThread, this);
read_thread_.detach(); read_thread_.detach();
}
queue_->Close();
finish_start_ = true; finish_start_ = true;
return true; return true;
...@@ -100,17 +119,10 @@ bool PrivateQueueDataFeed<T>::Start() { ...@@ -100,17 +119,10 @@ bool PrivateQueueDataFeed<T>::Start() {
template <typename T> template <typename T>
void PrivateQueueDataFeed<T>::ReadThread() { void PrivateQueueDataFeed<T>::ReadThread() {
std::string filename;
while (PickOneFile(&filename)) {
file_.open(filename.c_str()); // is_text_feed
PADDLE_ENFORCE(file_.good(), "Open file<%s> fail.", filename.c_str());
T instance; T instance;
while (ParseOneInstance(&instance)) { while (ParseOneInstanceFromPipe(&instance)) {
queue_->Send(instance); queue_->Send(instance);
} }
file_.close();
}
queue_->Close();
} }
template <typename T> template <typename T>
...@@ -168,6 +180,14 @@ void MultiSlotDataFeed::Init( ...@@ -168,6 +180,14 @@ void MultiSlotDataFeed::Init(
finish_init_ = true; finish_init_ = true;
} }
void MultiSlotDataFeed::ReadThread() {
LOG(ERROR) << "Haha";
std::vector<MultiSlotType> instance;
while (ParseOneInstanceFromPipe(&instance)) {
queue_->Send(instance);
}
}
bool MultiSlotDataFeed::CheckFile(const char* filename) { bool MultiSlotDataFeed::CheckFile(const char* filename) {
CheckInit(); // get info of slots CheckInit(); // get info of slots
std::ifstream fin(filename); std::ifstream fin(filename);
...@@ -279,6 +299,65 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) { ...@@ -279,6 +299,65 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) {
return true; return true;
} }
bool MultiSlotDataFeed::ParseOneInstanceFromPipe(
std::vector<MultiSlotType>* instance) {
LOG(ERROR) << "hehe";
thread_local LineFileReader reader;
while (reader.getline(&*(fp_.get()))) {
/*
const char* str = reader.get();
std::string line = std::string(str);
LOG(ERROR) << line;
*/
LOG(ERROR) << "read a line";
}
return true;
/*
if (!reader.getline(fp_.get())) {
return false;
} else {
// std::string& line = reader_.get();
// const char* str = line.c_str();
const char* str = reader.get();
std::string line = std::string(str);
LOG(ERROR) << line;
char* endptr = const_cast<char*>(str);
int pos = 0;
for (size_t i = 0; i < use_slots_index_.size(); ++i) {
int idx = use_slots_index_[i];
int num = strtol(&str[pos], &endptr, 10);
PADDLE_ENFORCE(
num,
"The number of ids can not be zero, you need padding "
"it in data generator; or if there is something wrong with "
"the data, please check if the data contains unresolvable "
"characters.\nplease check this error line: %s",
str);
if (idx != -1) {
(*instance)[idx].Init(all_slots_type_[i]);
if ((*instance)[idx].GetType()[0] == 'f') { // float
for (int j = 0; j < num; ++j) {
float feasign = strtof(endptr, &endptr);
(*instance)[idx].AddValue(feasign);
}
} else if ((*instance)[idx].GetType()[0] == 'u') { // uint64
for (int j = 0; j < num; ++j) {
uint64_t feasign = (uint64_t)strtoull(endptr, &endptr, 10);
(*instance)[idx].AddValue(feasign);
}
}
pos = endptr - str;
} else {
for (int j = 0; j <= num; ++j) {
pos = line.find_first_of(' ', pos + 1);
}
}
}
return true;
}
*/
}
bool MultiSlotDataFeed::ParseOneInstance(std::vector<MultiSlotType>* instance) { bool MultiSlotDataFeed::ParseOneInstance(std::vector<MultiSlotType>* instance) {
std::string line; std::string line;
if (getline(file_, line)) { if (getline(file_, line)) {
......
...@@ -21,6 +21,7 @@ limitations under the License. */ ...@@ -21,6 +21,7 @@ limitations under the License. */
#include <thread> // NOLINT #include <thread> // NOLINT
#include <vector> #include <vector>
#include "paddle/fluid/framework/common/ps_string.h"
#include "paddle/fluid/framework/data_feed.pb.h" #include "paddle/fluid/framework/data_feed.pb.h"
#include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/reader.h" #include "paddle/fluid/framework/reader.h"
...@@ -136,6 +137,7 @@ class PrivateQueueDataFeed : public DataFeed { ...@@ -136,6 +137,7 @@ class PrivateQueueDataFeed : public DataFeed {
virtual void SetQueueSize(int queue_size); virtual void SetQueueSize(int queue_size);
// The reading and parsing method called in the ReadThread. // The reading and parsing method called in the ReadThread.
virtual bool ParseOneInstance(T* instance) = 0; virtual bool ParseOneInstance(T* instance) = 0;
virtual bool ParseOneInstanceFromPipe(T* instance) = 0;
// This function is used to put instance to vec_ins // This function is used to put instance to vec_ins
virtual void AddInstanceToInsVec(T* vec_ins, const T& instance, virtual void AddInstanceToInsVec(T* vec_ins, const T& instance,
int index) = 0; int index) = 0;
...@@ -150,7 +152,9 @@ class PrivateQueueDataFeed : public DataFeed { ...@@ -150,7 +152,9 @@ class PrivateQueueDataFeed : public DataFeed {
// ifstream one line and one line parse: 6034 ms // ifstream one line and one line parse: 6034 ms
// fread one buffer and one buffer parse: 7097 ms // fread one buffer and one buffer parse: 7097 ms
std::ifstream file_; std::ifstream file_;
std::shared_ptr<FILE> fp_;
size_t queue_size_; size_t queue_size_;
LineFileReader reader_;
// The queue for store parsed data // The queue for store parsed data
std::unique_ptr<paddle::operators::reader::BlockingQueue<T>> queue_; std::unique_ptr<paddle::operators::reader::BlockingQueue<T>> queue_;
}; };
...@@ -228,12 +232,15 @@ class MultiSlotDataFeed ...@@ -228,12 +232,15 @@ class MultiSlotDataFeed
virtual ~MultiSlotDataFeed() {} virtual ~MultiSlotDataFeed() {}
virtual void Init(const paddle::framework::DataFeedDesc& data_feed_desc); virtual void Init(const paddle::framework::DataFeedDesc& data_feed_desc);
virtual bool CheckFile(const char* filename); virtual bool CheckFile(const char* filename);
// virtual void ReadThread();
protected: protected:
virtual void ReadThread();
virtual void AddInstanceToInsVec(std::vector<MultiSlotType>* vec_ins, virtual void AddInstanceToInsVec(std::vector<MultiSlotType>* vec_ins,
const std::vector<MultiSlotType>& instance, const std::vector<MultiSlotType>& instance,
int index); int index);
virtual bool ParseOneInstance(std::vector<MultiSlotType>* instance); virtual bool ParseOneInstance(std::vector<MultiSlotType>* instance);
virtual bool ParseOneInstanceFromPipe(std::vector<MultiSlotType>* instance);
virtual void PutToFeedVec(const std::vector<MultiSlotType>& ins_vec); virtual void PutToFeedVec(const std::vector<MultiSlotType>& ins_vec);
private: private:
......
...@@ -13,12 +13,15 @@ See the License for the specific language governing permissions and ...@@ -13,12 +13,15 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/framework/executor_thread_worker.h" #include "paddle/fluid/framework/executor_thread_worker.h"
#include <stdio_ext.h>
#include <algorithm> #include <algorithm>
#include "google/protobuf/io/zero_copy_stream_impl.h" #include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/message.h" #include "google/protobuf/message.h"
#include "google/protobuf/text_format.h" #include "google/protobuf/text_format.h"
#include "gflags/gflags.h" #include "gflags/gflags.h"
#include "paddle/fluid/framework/common/fs.h"
#include "paddle/fluid/framework/common/shell.h"
#include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/feed_fetch_method.h"
#include "paddle/fluid/framework/feed_fetch_type.h" #include "paddle/fluid/framework/feed_fetch_type.h"
#include "paddle/fluid/framework/lod_rank_table.h" #include "paddle/fluid/framework/lod_rank_table.h"
...@@ -244,6 +247,8 @@ void ExecutorThreadWorker::TrainFilesWithTimer() { ...@@ -244,6 +247,8 @@ void ExecutorThreadWorker::TrainFilesWithTimer() {
platform::SetNumThreads(1); platform::SetNumThreads(1);
SetDevice(); SetDevice();
thread_reader_->Start(); thread_reader_->Start();
exit(0);
/*
std::vector<double> op_total_time; std::vector<double> op_total_time;
std::vector<std::string> op_name; std::vector<std::string> op_name;
for (auto& op : ops_) { for (auto& op : ops_) {
...@@ -287,13 +292,14 @@ void ExecutorThreadWorker::TrainFilesWithTimer() { ...@@ -287,13 +292,14 @@ void ExecutorThreadWorker::TrainFilesWithTimer() {
} }
timeline.Start(); timeline.Start();
} }
*/
} }
void ExecutorThreadWorker::TrainFiles() { void ExecutorThreadWorker::TrainFiles() {
platform::SetNumThreads(1); platform::SetNumThreads(1);
// todo: configurable // todo: configurable
SetDevice(); // SetDevice();
int fetch_var_num = fetch_var_names_.size(); int fetch_var_num = fetch_var_names_.size();
fetch_values_.clear(); fetch_values_.clear();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册