From 94a3789fd0aa7beec34398a0ea09b57b96e19761 Mon Sep 17 00:00:00 2001 From: hutuxian Date: Fri, 10 Apr 2020 12:24:36 +0800 Subject: [PATCH] Add AfsAPI in PaddleBox (#23419) * Involves AfsAPI to resolve slow downloading. * Mainly used in PaddleBox --- paddle/fluid/framework/data_feed.cc | 13 +- paddle/fluid/framework/fleet/box_wrapper.cc | 1 + paddle/fluid/framework/fleet/box_wrapper.h | 216 ++++++++++++++++++ paddle/fluid/pybind/box_helper_py.cc | 2 + paddle/fluid/pybind/pybind.cc | 12 +- .../fluid/tests/unittests/test_boxps.py | 10 + 6 files changed, 247 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 9e31b581a01..12e517dec62 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -370,8 +370,17 @@ void InMemoryDataFeed::LoadIntoMemory() { while (this->PickOneFile(&filename)) { VLOG(3) << "PickOneFile, filename=" << filename << ", thread_id=" << thread_id_; - int err_no = 0; - this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); +#ifdef PADDLE_WITH_BOX_PS + if (BoxWrapper::GetInstance()->UseAfsApi()) { + this->fp_ = BoxWrapper::GetInstance()->afs_manager->GetFile( + filename, this->pipe_command_); + } else { +#endif + int err_no = 0; + this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); +#ifdef PADDLE_WITH_BOX_PS + } +#endif CHECK(this->fp_ != nullptr); __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER); paddle::framework::ChannelWriter writer(input_channel_); diff --git a/paddle/fluid/framework/fleet/box_wrapper.cc b/paddle/fluid/framework/fleet/box_wrapper.cc index 3e1d4558b84..9d894d2ef38 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.cc +++ b/paddle/fluid/framework/fleet/box_wrapper.cc @@ -27,6 +27,7 @@ namespace framework { std::shared_ptr BoxWrapper::s_instance_ = nullptr; cudaStream_t BoxWrapper::stream_list_[8]; std::shared_ptr BoxWrapper::boxps_ptr_ = nullptr; +AfsManager* BoxWrapper::afs_manager = nullptr; void BasicAucCalculator::compute() { double* table[2] = {&_table[0][0], &_table[1][0]}; diff --git a/paddle/fluid/framework/fleet/box_wrapper.h b/paddle/fluid/framework/fleet/box_wrapper.h index b2a2718bbc1..8f4eebd7c07 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.h +++ b/paddle/fluid/framework/fleet/box_wrapper.h @@ -15,7 +15,13 @@ limitations under the License. */ #pragma once #ifdef PADDLE_WITH_BOX_PS +#include #include +#include +#include +#include +#include +#include #endif #include #include @@ -36,6 +42,7 @@ limitations under the License. */ #include "paddle/fluid/platform/place.h" #include "paddle/fluid/platform/timer.h" #include "paddle/fluid/string/string_helper.h" +#define BUF_SIZE 1024 * 1024 namespace paddle { namespace framework { @@ -120,6 +127,203 @@ class BasicAucCalculator { std::mutex _table_mutex; }; +class AfsStreamFile { + public: + explicit AfsStreamFile(afs::AfsFileSystem* afsfile) + : afsfile_(afsfile), reader_(nullptr) {} + virtual ~AfsStreamFile() { + if (reader_ != NULL) { + afsfile_->CloseReader(reader_); + reader_ = NULL; + } + } + virtual int Open(const char* path) { + if (path == NULL) { + return -1; + } + reader_ = afsfile_->OpenReader(path); + PADDLE_ENFORCE_NE(reader_, nullptr, + platform::errors::PreconditionNotMet( + "OpenReader for file[%s] failed.", path)); + return 0; + } + virtual int Read(char* buf, int len) { + int ret = reader_->Read(buf, len); + return ret; + } + + private: + afs::AfsFileSystem* afsfile_; + afs::Reader* reader_; +}; + +class AfsManager { + public: + AfsManager(const std::string& fs_name, const std::string& fs_ugi, + const std::string& conf_path) { + auto split = fs_ugi.find(","); + std::string user = fs_ugi.substr(0, split); + std::string pwd = fs_ugi.substr(split + 1); + _afshandler = new afs::AfsFileSystem(fs_name.c_str(), user.c_str(), + pwd.c_str(), conf_path.c_str()); + VLOG(0) << "AFSAPI Init: user: " << user << ", pwd: " << pwd; + int ret = _afshandler->Init(true, true); + PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( + "Called AFSAPI Init Interface Failed.")); + ret = _afshandler->Connect(); + PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( + "Called AFSAPI Connect Interface Failed")); + } + virtual ~AfsManager() { + if (_afshandler != NULL) { + _afshandler->DisConnect(); + _afshandler->Destroy(); + delete _afshandler; + _afshandler = nullptr; + } + } + static void ReadFromAfs(const std::string& path, FILE* wfp, + afs::AfsFileSystem* _afshandler) { + AfsStreamFile* read_stream = new AfsStreamFile(_afshandler); + int ret = read_stream->Open(path.c_str()); + PADDLE_ENFORCE_EQ(ret, 0, + platform::errors::PreconditionNotMet( + "Called AFSAPI Open file %s Failed.", path.c_str())); + char* _buff = static_cast(calloc(BUF_SIZE + 2, sizeof(char))); + int size = 0; + while ((size = read_stream->Read(_buff, BUF_SIZE)) > 0) { + fwrite(_buff, 1, size, wfp); + } + fflush(wfp); + fclose(wfp); + delete _buff; + delete read_stream; + } + int PopenBidirectionalInternal(const char* command, + FILE*& fp_read, // NOLINT + FILE*& fp_write, pid_t& pid, // NOLINT + bool read, // NOLINT + bool write) { + std::lock_guard g(g_flock); + int fd_read[2]; + int fd_write[2]; + if (read) { + if (pipe(fd_read) != 0) { + LOG(FATAL) << "create read pipe failed"; + return -1; + } + } + if (write) { + if (pipe(fd_write) != 0) { + LOG(FATAL) << "create write pipe failed"; + return -1; + } + } + pid = vfork(); + if (pid < 0) { + LOG(FATAL) << "fork failed"; + return -1; + } + if (pid == 0) { + if (read) { + if (-1 == dup2(fd_read[1], STDOUT_FILENO)) { + LOG(FATAL) << "dup2 failed"; + } + close(fd_read[1]); + close(fd_read[0]); + } + + if (write) { + if (-1 == dup2(fd_write[0], STDIN_FILENO)) { + LOG(FATAL) << "dup2 failed"; + } + close(fd_write[0]); + close(fd_write[1]); + } + + struct dirent* item; + DIR* dir = opendir("/proc/self/fd"); + while ((item = readdir(dir)) != NULL) { + int fd = atoi(item->d_name); + if (fd >= 3) { + (void)close(fd); + } + } + + closedir(dir); + + execl("/bin/sh", "sh", "-c", command, NULL); + exit(127); + } else { + if (read) { + close(fd_read[1]); + fcntl(fd_read[0], F_SETFD, FD_CLOEXEC); + fp_read = fdopen(fd_read[0], "r"); + if (0 == fp_read) { + LOG(FATAL) << "fdopen failed."; + return -1; + } + } + + if (write) { + close(fd_write[0]); + fcntl(fd_write[1], F_SETFD, FD_CLOEXEC); + fp_write = fdopen(fd_write[1], "w"); + if (0 == fp_write) { + LOG(FATAL) << "fdopen failed."; + return -1; + } + } + return 0; + } + } + std::shared_ptr GetFile(const std::string& path, + const std::string& pipe_command) { + pid_t pid = 0; + FILE* wfp = NULL; + FILE* rfp = NULL; + + // Always use set -eo pipefail. Fail fast and be aware of exit codes. + std::string cmd = "set -eo pipefail; " + pipe_command; + int ret = + PopenBidirectionalInternal(cmd.c_str(), rfp, wfp, pid, true, true); + + PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet( + "Called PopenBidirectionalInternal Failed")); + std::string filename(path); + if (strncmp(filename.c_str(), "afs:", 4) == 0) { + filename = filename.substr(4); + } + std::thread read_thread(&AfsManager::ReadFromAfs, filename, wfp, + _afshandler); + read_thread.detach(); + return {rfp, [pid, cmd](FILE* rfp) { + int wstatus = -1; + int ret = -1; + do { + ret = waitpid(pid, &wstatus, 0); + } while (ret == -1 && errno == EINTR); + + fclose(rfp); + if (wstatus == 0 || wstatus == (128 + SIGPIPE) * 256 || + (wstatus == -1 && errno == ECHILD)) { + VLOG(3) << "pclose_bidirectional pid[" << pid << "], status[" + << wstatus << "]"; + } else { + LOG(WARNING) << "pclose_bidirectional pid[" << pid << "]" + << ", ret[" << ret << "] shell open fail"; + } + if (wstatus == -1 && errno == ECHILD) { + LOG(WARNING) << "errno is ECHILD"; + } + }}; + } + + private: + afs::AfsFileSystem* _afshandler; + std::mutex g_flock; +}; + class BoxWrapper { public: virtual ~BoxWrapper() {} @@ -224,6 +428,14 @@ class BoxWrapper { return s_instance_; } + void InitAfsAPI(const std::string& fs_name, const std::string& fs_ugi, + const std::string& conf_path) { + afs_manager = new AfsManager(fs_name, fs_ugi, conf_path); + use_afs_api_ = true; + } + + bool UseAfsApi() const { return use_afs_api_; } + const std::unordered_set& GetOmitedSlot() const { return slot_name_omited_in_feedpass_; } @@ -521,6 +733,10 @@ class BoxWrapper { std::vector metric_name_list_; std::vector slot_vector_; std::vector keys_tensor; // Cache for pull_sparse + bool use_afs_api_ = false; + + public: + static AfsManager* afs_manager; }; #endif diff --git a/paddle/fluid/pybind/box_helper_py.cc b/paddle/fluid/pybind/box_helper_py.cc index 287de7e6be3..c2b6a73bbd4 100644 --- a/paddle/fluid/pybind/box_helper_py.cc +++ b/paddle/fluid/pybind/box_helper_py.cc @@ -81,6 +81,8 @@ void BindBoxWrapper(py::module* m) { py::call_guard()) .def("flip_pass_flag", &framework::BoxWrapper::FlipPassFlag, py::call_guard()) + .def("init_afs_api", &framework::BoxWrapper::InitAfsAPI, + py::call_guard()) .def("finalize", &framework::BoxWrapper::Finalize, py::call_guard()); } // end BoxWrapper diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index a99be84e19b..ca3be55d4e5 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -1488,11 +1488,13 @@ All parameter, weight, gradient are variables in Paddle. m.def("is_compiled_with_mkldnn", IsCompiledWithMKLDNN); m.def("is_compiled_with_brpc", IsCompiledWithBrpc); m.def("is_compiled_with_dist", IsCompiledWithDIST); - m.def("run_cmd", [](const std::string &cmd, int time_out = -1, - int sleep_inter = -1) -> const std::string { - return paddle::framework::shell_get_command_output(cmd, time_out, - sleep_inter); - }); + m.def("run_cmd", + [](const std::string &cmd, int time_out = -1, + int sleep_inter = -1) -> const std::string { + return paddle::framework::shell_get_command_output(cmd, time_out, + sleep_inter); + }, + py::arg("cmd"), py::arg("time_out") = -1, py::arg("sleep_inter") = -1); #ifdef PADDLE_WITH_CUDA m.def("is_float16_supported", [](const platform::CUDAPlace &place) -> bool { // Only GPUs with Compute Capability >= 53 support float16 diff --git a/python/paddle/fluid/tests/unittests/test_boxps.py b/python/paddle/fluid/tests/unittests/test_boxps.py index 563ccc2b8af..17e378115a3 100644 --- a/python/paddle/fluid/tests/unittests/test_boxps.py +++ b/python/paddle/fluid/tests/unittests/test_boxps.py @@ -76,6 +76,16 @@ class TestTranspile(unittest.TestCase): print(e) +class TestRunCmd(unittest.TestCase): + """ TestCases for run_cmd""" + + def test_run_cmd(self): + ret1 = int(core.run_cmd("ls; echo $?").strip().split('\n')[-1]) + ret2 = int(core.run_cmd("ls; echo $?", -1, -1).strip().split('\n')[-1]) + self.assertTrue(ret1 == 0) + self.assertTrue(ret2 == 0) + + class TestBoxPSPreload(unittest.TestCase): """ TestCases for BoxPS Preload """ -- GitLab