未验证 提交 94a3789f 编写于 作者: H hutuxian 提交者: GitHub

Add AfsAPI in PaddleBox (#23419)

* Introduces AfsAPI to resolve slow downloading.
* Mainly used in PaddleBox
上级 d7dd4e1d
......@@ -370,8 +370,17 @@ void InMemoryDataFeed<T>::LoadIntoMemory() {
while (this->PickOneFile(&filename)) {
VLOG(3) << "PickOneFile, filename=" << filename
<< ", thread_id=" << thread_id_;
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
#ifdef PADDLE_WITH_BOX_PS
if (BoxWrapper::GetInstance()->UseAfsApi()) {
this->fp_ = BoxWrapper::GetInstance()->afs_manager->GetFile(
filename, this->pipe_command_);
} else {
#endif
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
#ifdef PADDLE_WITH_BOX_PS
}
#endif
CHECK(this->fp_ != nullptr);
__fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
paddle::framework::ChannelWriter<T> writer(input_channel_);
......
......@@ -27,6 +27,7 @@ namespace framework {
// Out-of-class definitions of BoxWrapper's static data members.
std::shared_ptr<BoxWrapper> BoxWrapper::s_instance_ = nullptr;
cudaStream_t BoxWrapper::stream_list_[8];
std::shared_ptr<boxps::BoxPSBase> BoxWrapper::boxps_ptr_ = nullptr;
// Starts out null; BoxWrapper::InitAfsAPI() assigns a newly allocated manager.
AfsManager* BoxWrapper::afs_manager = nullptr;
void BasicAucCalculator::compute() {
double* table[2] = {&_table[0][0], &_table[1][0]};
......
......@@ -15,7 +15,13 @@ limitations under the License. */
#pragma once
#ifdef PADDLE_WITH_BOX_PS
#include <afs_filesystem.h>
#include <boxps_public.h>
#include <dirent.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#endif
#include <glog/logging.h>
#include <algorithm>
......@@ -36,6 +42,7 @@ limitations under the License. */
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/timer.h"
#include "paddle/fluid/string/string_helper.h"
// Size (1 MiB) of the chunk buffer used when streaming a file out of AFS.
// The expansion is parenthesized so that the macro behaves as a single value
// inside larger expressions such as `x % BUF_SIZE` or `x / BUF_SIZE`.
#define BUF_SIZE (1024 * 1024)
namespace paddle {
namespace framework {
......@@ -120,6 +127,203 @@ class BasicAucCalculator {
std::mutex _table_mutex;
};
// Thin wrapper around a single afs::Reader obtained from an AfsFileSystem.
// The wrapper does not own the file system; it only owns the reader it opened
// and closes that reader when it is destroyed or when Open() is called again.
class AfsStreamFile {
 public:
  // afsfile: file system used to open and close readers. It must stay valid
  // for as long as this object is used.
  explicit AfsStreamFile(afs::AfsFileSystem* afsfile)
      : afsfile_(afsfile), reader_(nullptr) {}

  virtual ~AfsStreamFile() { CloseReader(); }

  // The wrapper owns reader_; a copy would close the same reader twice.
  AfsStreamFile(const AfsStreamFile&) = delete;
  AfsStreamFile& operator=(const AfsStreamFile&) = delete;

  // Opens `path` for reading.
  // Returns -1 when `path` is null and 0 on success; raises
  // PreconditionNotMet when the file system fails to provide a reader.
  virtual int Open(const char* path) {
    if (path == nullptr) {
      return -1;
    }
    // Release a reader left over from an earlier Open() so it is not leaked.
    CloseReader();
    reader_ = afsfile_->OpenReader(path);
    PADDLE_ENFORCE_NE(reader_, nullptr,
                      platform::errors::PreconditionNotMet(
                          "OpenReader for file[%s] failed.", path));
    return 0;
  }

  // Forwards to the reader's Read(buf, len) and returns its result
  // (presumably the number of bytes read; callers loop while it is > 0).
  // Returns -1 when no reader is open instead of dereferencing a null pointer.
  virtual int Read(char* buf, int len) {
    if (reader_ == nullptr) {
      return -1;
    }
    return reader_->Read(buf, len);
  }

 private:
  // Closes the current reader, if any, and forgets it.
  void CloseReader() {
    if (reader_ != nullptr) {
      afsfile_->CloseReader(reader_);
      reader_ = nullptr;
    }
  }

  afs::AfsFileSystem* afsfile_;
  afs::Reader* reader_;
};
// Owns one afs::AfsFileSystem connection and exposes remote files as FILE*
// streams: the file content is read through the AFS API on a background
// thread, written to the stdin of a shell command, and the command's stdout is
// what the caller reads.
class AfsManager {
 public:
  // Creates the AFS client, then initializes and connects it.
  //
  // fs_name:   forwarded to afs::AfsFileSystem.
  // fs_ugi:    credentials in the form "user,password", split at the first
  //            comma.
  // conf_path: forwarded to afs::AfsFileSystem.
  //
  // Raises PreconditionNotMet when Init() or Connect() returns non-zero.
  AfsManager(const std::string& fs_name, const std::string& fs_ugi,
             const std::string& conf_path) {
    // NOTE(review): when fs_ugi contains no comma, find() returns npos and
    // both `user` and `pwd` become the whole string - confirm this is intended.
    auto split = fs_ugi.find(",");
    std::string user = fs_ugi.substr(0, split);
    std::string pwd = fs_ugi.substr(split + 1);
    _afshandler = new afs::AfsFileSystem(fs_name.c_str(), user.c_str(),
                                         pwd.c_str(), conf_path.c_str());
    // The password is deliberately kept out of the log.
    VLOG(0) << "AFSAPI Init: user: " << user;
    int ret = _afshandler->Init(true, true);
    PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                  "Called AFSAPI Init Interface Failed."));
    ret = _afshandler->Connect();
    PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                  "Called AFSAPI Connect Interface Failed"));
  }

  // Disconnects and destroys the AFS client.
  virtual ~AfsManager() {
    if (_afshandler != nullptr) {
      _afshandler->DisConnect();
      _afshandler->Destroy();
      delete _afshandler;
      _afshandler = nullptr;
    }
  }

  // Thread entry point: streams the AFS file `path` into `wfp` in chunks of
  // BUF_SIZE bytes and closes `wfp` when done, so the consumer sees
  // end-of-file.
  //
  // path:        AFS path to read.
  // wfp:         write end of the pipe; this function takes ownership of it.
  // _afshandler: connected file system used to open the reader.
  //
  // Raises PreconditionNotMet when the file cannot be opened. GetFile() runs
  // this on a detached thread, where an escaping exception terminates the
  // process.
  static void ReadFromAfs(const std::string& path, FILE* wfp,
                          afs::AfsFileSystem* _afshandler) {
    // RAII ownership: wfp is closed and the reader released on every path.
    std::unique_ptr<FILE, int (*)(FILE*)> writer(wfp, &fclose);
    AfsStreamFile read_stream(_afshandler);
    int ret = read_stream.Open(path.c_str());
    PADDLE_ENFORCE_EQ(ret, 0,
                      platform::errors::PreconditionNotMet(
                          "Called AFSAPI Open file %s Failed.", path.c_str()));
    // Zero-initialized like the former calloc() buffer, but released
    // automatically (the old code paired calloc() with delete).
    std::vector<char> buffer(BUF_SIZE + 2);
    int size = 0;
    while ((size = read_stream.Read(buffer.data(), BUF_SIZE)) > 0) {
      const size_t want = static_cast<size_t>(size);
      if (fwrite(buffer.data(), 1, want, wfp) != want) {
        // The consumer went away or the pipe is broken; further writes are
        // pointless.
        LOG(WARNING) << "short write to pipe while streaming afs file["
                     << path << "]";
        break;
      }
    }
    fflush(wfp);
  }

  // Starts `/bin/sh -c command` with its stdout and/or stdin connected to
  // pipes.
  //
  // command:  shell command line to execute.
  // fp_read:  out - stream over the child's stdout (set when `read` is true).
  // fp_write: out - stream over the child's stdin (set when `write` is true).
  // pid:      out - pid of the child process.
  // read:     create the stdout pipe.
  // write:    create the stdin pipe.
  //
  // Returns 0 on success. Failures are reported through LOG(FATAL); the
  // `return -1` statements only matter if LOG(FATAL) does not abort.
  int PopenBidirectionalInternal(const char* command,
                                 FILE*& fp_read,               // NOLINT
                                 FILE*& fp_write, pid_t& pid,  // NOLINT
                                 bool read,                    // NOLINT
                                 bool write) {
    // Serializes process creation across threads sharing this manager.
    std::lock_guard<std::mutex> g(g_flock);
    int fd_read[2] = {-1, -1};
    int fd_write[2] = {-1, -1};
    if (read) {
      if (pipe(fd_read) != 0) {
        LOG(FATAL) << "create read pipe failed";
        return -1;
      }
    }
    if (write) {
      if (pipe(fd_write) != 0) {
        LOG(FATAL) << "create write pipe failed";
        return -1;
      }
    }
    pid = vfork();
    if (pid < 0) {
      LOG(FATAL) << "fork failed";
      return -1;
    }
    if (pid == 0) {
      // Child: wire the pipes to stdout/stdin, then drop the originals.
      if (read) {
        if (-1 == dup2(fd_read[1], STDOUT_FILENO)) {
          LOG(FATAL) << "dup2 failed";
        }
        close(fd_read[1]);
        close(fd_read[0]);
      }
      if (write) {
        if (-1 == dup2(fd_write[0], STDIN_FILENO)) {
          LOG(FATAL) << "dup2 failed";
        }
        close(fd_write[0]);
        close(fd_write[1]);
      }
      // Close every inherited descriptor above stderr so the shell does not
      // keep the parent's files open.
      DIR* dir = opendir("/proc/self/fd");
      if (dir != nullptr) {
        // Skip the directory stream's own fd; closing it mid-iteration would
        // break readdir().
        const int dir_fd = dirfd(dir);
        struct dirent* item = nullptr;
        while ((item = readdir(dir)) != nullptr) {
          const int fd = atoi(item->d_name);
          if (fd >= 3 && fd != dir_fd) {
            (void)close(fd);
          }
        }
        closedir(dir);
      }
      execl("/bin/sh", "sh", "-c", command, static_cast<char*>(nullptr));
      // Only reached when execl() failed. _exit() is required after vfork():
      // exit() would run atexit handlers and flush stdio buffers that are
      // shared with the parent.
      _exit(127);
    } else {
      // Parent: keep only our ends of the pipes and wrap them in FILE*.
      if (read) {
        close(fd_read[1]);
        fcntl(fd_read[0], F_SETFD, FD_CLOEXEC);
        fp_read = fdopen(fd_read[0], "r");
        if (nullptr == fp_read) {
          LOG(FATAL) << "fdopen failed.";
          return -1;
        }
      }
      if (write) {
        close(fd_write[0]);
        fcntl(fd_write[1], F_SETFD, FD_CLOEXEC);
        fp_write = fdopen(fd_write[1], "w");
        if (nullptr == fp_write) {
          LOG(FATAL) << "fdopen failed.";
          return -1;
        }
      }
      return 0;
    }
  }

  // Opens the AFS file `path` and returns a stream over the output of
  // `pipe_command` fed with that file's content.
  //
  // A leading "afs:" prefix on `path` is stripped. The file is pumped into the
  // command by a detached thread that uses this manager's file system handle,
  // so the manager must outlive every stream it hands out. The returned
  // pointer's deleter waits for the child process and closes the stream.
  //
  // Raises PreconditionNotMet when the child process cannot be started.
  std::shared_ptr<FILE> GetFile(const std::string& path,
                                const std::string& pipe_command) {
    pid_t pid = 0;
    FILE* wfp = nullptr;
    FILE* rfp = nullptr;
    // Always use set -eo pipefail. Fail fast and be aware of exit codes.
    std::string cmd = "set -eo pipefail; " + pipe_command;
    int ret =
        PopenBidirectionalInternal(cmd.c_str(), rfp, wfp, pid, true, true);
    PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
                                  "Called PopenBidirectionalInternal Failed"));
    std::string filename(path);
    if (strncmp(filename.c_str(), "afs:", 4) == 0) {
      filename = filename.substr(4);
    }
    std::thread read_thread(&AfsManager::ReadFromAfs, filename, wfp,
                            _afshandler);
    read_thread.detach();
    return {rfp, [pid](FILE* fp) {
              int wstatus = -1;
              int ret = -1;
              do {
                ret = waitpid(pid, &wstatus, 0);
              } while (ret == -1 && errno == EINTR);
              // Snapshot errno now: fclose() and logging may overwrite it.
              const int wait_errno = (ret == -1) ? errno : 0;
              fclose(fp);
              const bool no_child = (wstatus == -1 && wait_errno == ECHILD);
              if (wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 ||
                  no_child) {
                VLOG(3) << "pclose_bidirectional pid[" << pid << "], status["
                        << wstatus << "]";
              } else {
                LOG(WARNING) << "pclose_bidirectional pid[" << pid << "]"
                             << ", ret[" << ret << "] shell open fail";
              }
              if (no_child) {
                LOG(WARNING) << "errno is ECHILD";
              }
            }};
  }

 private:
  afs::AfsFileSystem* _afshandler;
  std::mutex g_flock;
};
class BoxWrapper {
public:
// Virtual destructor with no cleanup of its own.
virtual ~BoxWrapper() = default;
......@@ -224,6 +428,14 @@ class BoxWrapper {
return s_instance_;
}
void InitAfsAPI(const std::string& fs_name, const std::string& fs_ugi,
const std::string& conf_path) {
afs_manager = new AfsManager(fs_name, fs_ugi, conf_path);
use_afs_api_ = true;
}
bool UseAfsApi() const { return use_afs_api_; }
const std::unordered_set<std::string>& GetOmitedSlot() const {
return slot_name_omited_in_feedpass_;
}
......@@ -521,6 +733,10 @@ class BoxWrapper {
std::vector<std::string> metric_name_list_;
std::vector<int> slot_vector_;
std::vector<LoDTensor> keys_tensor; // Cache for pull_sparse
// Set to true by InitAfsAPI(); reported by UseAfsApi().
bool use_afs_api_ = false;
public:
// Manager allocated by InitAfsAPI(); static, so shared by all users of
// BoxWrapper.
static AfsManager* afs_manager;
};
#endif
......
......@@ -81,6 +81,8 @@ void BindBoxWrapper(py::module* m) {
py::call_guard<py::gil_scoped_release>())
.def("flip_pass_flag", &framework::BoxWrapper::FlipPassFlag,
py::call_guard<py::gil_scoped_release>())
.def("init_afs_api", &framework::BoxWrapper::InitAfsAPI,
py::call_guard<py::gil_scoped_release>())
.def("finalize", &framework::BoxWrapper::Finalize,
py::call_guard<py::gil_scoped_release>());
} // end BoxWrapper
......
......@@ -1488,11 +1488,13 @@ All parameter, weight, gradient are variables in Paddle.
m.def("is_compiled_with_mkldnn", IsCompiledWithMKLDNN);
m.def("is_compiled_with_brpc", IsCompiledWithBrpc);
m.def("is_compiled_with_dist", IsCompiledWithDIST);
m.def("run_cmd", [](const std::string &cmd, int time_out = -1,
int sleep_inter = -1) -> const std::string {
return paddle::framework::shell_get_command_output(cmd, time_out,
sleep_inter);
});
m.def("run_cmd",
[](const std::string &cmd, int time_out = -1,
int sleep_inter = -1) -> const std::string {
return paddle::framework::shell_get_command_output(cmd, time_out,
sleep_inter);
},
py::arg("cmd"), py::arg("time_out") = -1, py::arg("sleep_inter") = -1);
#ifdef PADDLE_WITH_CUDA
m.def("is_float16_supported", [](const platform::CUDAPlace &place) -> bool {
// Only GPUs with Compute Capability >= 53 support float16
......
......@@ -76,6 +76,16 @@ class TestTranspile(unittest.TestCase):
print(e)
class TestRunCmd(unittest.TestCase):
    """Test cases for core.run_cmd."""

    def test_run_cmd(self):
        """run_cmd works with default and with explicit optional arguments."""
        # "echo $?" prints the exit status of `ls` as the last output line.
        cmd = "ls; echo $?"
        ret1 = int(core.run_cmd(cmd).strip().split('\n')[-1])
        ret2 = int(core.run_cmd(cmd, -1, -1).strip().split('\n')[-1])
        # assertEqual reports the actual value on failure, unlike assertTrue.
        self.assertEqual(ret1, 0)
        self.assertEqual(ret2, 0)
class TestBoxPSPreload(unittest.TestCase):
""" TestCases for BoxPS Preload """
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册