From cadc4e6af8e8b1038e8f2b891a440c27c73252b0 Mon Sep 17 00:00:00 2001 From: Thunderbrook <52529258+Thunderbrook@users.noreply.github.com> Date: Mon, 28 Mar 2022 14:46:29 +0800 Subject: [PATCH] [HeterPS] So Parser (#40750) * So Parser * add macro * add macro * slotrecord * add macro * code format --- paddle/fluid/framework/data_feed.cc | 94 +++++++++++-------- paddle/fluid/framework/data_feed.h | 18 +++- .../fluid/framework/fleet/ps_gpu_wrapper.cc | 14 ++- paddle/fluid/framework/fleet/ps_gpu_wrapper.h | 19 ++++ paddle/fluid/pybind/ps_gpu_wrapper_py.cc | 4 + 5 files changed, 106 insertions(+), 43 deletions(-) diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index d2240c2110..330f5ea529 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -18,6 +18,7 @@ limitations under the License. */ #endif #include "paddle/fluid/framework/data_feed.h" +#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" #ifdef _LINUX #include #include @@ -555,10 +556,13 @@ void InMemoryDataFeed::LoadIntoMemory() { template void InMemoryDataFeed::LoadIntoMemoryFromSo() { -#ifdef _LINUX +#if (defined _LINUX) && (defined PADDLE_WITH_HETERPS) && \ + (defined PADDLE_WITH_PSLIB) VLOG(3) << "LoadIntoMemoryFromSo() begin, thread_id=" << thread_id_; + int buf_len = 1024 * 1024 * 10; + char* buf = (char*)malloc(buf_len + 10); + auto ps_gpu_ptr = PSGPUWrapper::GetInstance(); - string::LineFileReader reader; paddle::framework::CustomParser* parser = global_dlmanager_pool().Load(so_parser_name_, slot_conf_); @@ -566,34 +570,34 @@ void InMemoryDataFeed::LoadIntoMemoryFromSo() { while (this->PickOneFile(&filename)) { VLOG(3) << "PickOneFile, filename=" << filename << ", thread_id=" << thread_id_; - int err_no = 0; - this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); - CHECK(this->fp_ != nullptr); - __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER); - - paddle::framework::ChannelWriter writer(input_channel_); - T instance; platform::Timer timeline; timeline.Start(); - - while (1) { - if (!reader.getline(&*(fp_.get()))) { - break; - } else { - const char* str = reader.get(); - ParseOneInstanceFromSo(str, &instance, parser); + if (ps_gpu_ptr->UseAfsApi()) { + auto afs_reader = ps_gpu_ptr->OpenReader(filename); + int read_len = 0; + char* cursor = buf; + int remain = 0; + while ((read_len = afs_reader->read(cursor, buf_len - remain)) > 0) { + std::vector instances; + read_len += remain; + remain = ParseInstanceFromSo(read_len, buf, &instances, parser); + input_channel_->Write(std::move(instances)); + instances = std::vector(); + if (remain) { + memmove(buf, buf + read_len - remain, remain); + } + cursor = buf + remain; } - - writer << std::move(instance); - instance = T(); + } else { + VLOG(0) << "Should Call InitAfsApi First"; } - writer.Flush(); timeline.Pause(); VLOG(3) << "LoadIntoMemoryFromSo() read all lines, file=" << filename << ", cost time=" << timeline.ElapsedSec() << " seconds, thread_id=" << thread_id_; } + free(buf); VLOG(3) << "LoadIntoMemoryFromSo() end, thread_id=" << thread_id_; #endif } @@ -1088,10 +1092,11 @@ void MultiSlotInMemoryDataFeed::GetMsgFromLogKey(const std::string& log_key, *rank = (uint32_t)strtoul(rank_str.c_str(), NULL, 16); } -void MultiSlotInMemoryDataFeed::ParseOneInstanceFromSo(const char* str, - Record* instance, - CustomParser* parser) { - parser->ParseOneInstance(str, instance); +int MultiSlotInMemoryDataFeed::ParseInstanceFromSo( + int len, const char* str, std::vector* instances, + CustomParser* parser) { + // VLOG(0) << "parser: " << parser; + return parser->ParseInstance(len, str, instances); } bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe(Record* instance) { @@ -2078,7 +2083,8 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByLib(void) { } void SlotRecordInMemoryDataFeed::LoadIntoMemoryByFile(void) { -#ifdef _LINUX +#if (defined _LINUX) && (defined PADDLE_WITH_HETERPS) && \ + (defined PADDLE_WITH_PSLIB) paddle::framework::CustomParser* parser = global_dlmanager_pool().Load(so_parser_name_, all_slots_info_); CHECK(parser != nullptr); @@ -2111,21 +2117,31 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByFile(void) { int lines = 0; bool is_ok = true; + auto ps_gpu_ptr = PSGPUWrapper::GetInstance(); do { - int err_no = 0; - this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); - - CHECK(this->fp_ != nullptr); - __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER); - is_ok = parser->ParseFileInstance( - [this](char* buf, int len) { - return fread(buf, sizeof(char), len, this->fp_.get()); - }, - pull_record_func, lines); - - if (!is_ok) { - LOG(WARNING) << "parser error, filename=" << filename - << ", lines=" << lines; + if (ps_gpu_ptr->UseAfsApi()) { + auto afs_reader = ps_gpu_ptr->OpenReader(filename); + is_ok = parser->ParseFileInstance( + [this, afs_reader](char* buf, int len) { + return afs_reader->read(buf, len); + }, + pull_record_func, lines); + } else { + int err_no = 0; + this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_); + + CHECK(this->fp_ != nullptr); + __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER); + is_ok = parser->ParseFileInstance( + [this](char* buf, int len) { + return fread(buf, sizeof(char), len, this->fp_.get()); + }, + pull_record_func, lines); + + if (!is_ok) { + LOG(WARNING) << "parser error, filename=" << filename + << ", lines=" << lines; + } } } while (!is_ok); timeline.Pause(); diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index edf5e474af..eb6ed26880 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -388,8 +388,12 @@ class CustomParser { CustomParser() {} virtual ~CustomParser() {} virtual void Init(const std::vector& slots) = 0; - virtual bool Init(const std::vector& slots); + virtual bool Init(const std::vector& slots) = 0; virtual void ParseOneInstance(const char* str, Record* instance) = 0; + virtual int ParseInstance(int len, const char* str, + std::vector* instances) { + return 0; + }; virtual bool ParseOneInstance( const std::string& line, std::function&, int)> @@ -451,7 +455,7 @@ class DLManager { handle.module = dlopen(name.c_str(), RTLD_NOW); if (handle.module == nullptr) { - VLOG(0) << "Create so of " << name << " fail"; + VLOG(0) << "Create so of " << name << " fail, " << dlerror(); return nullptr; } @@ -730,6 +734,11 @@ class InMemoryDataFeed : public DataFeed { virtual bool ParseOneInstanceFromPipe(T* instance) = 0; virtual void ParseOneInstanceFromSo(const char* str, T* instance, CustomParser* parser) {} + virtual int ParseInstanceFromSo(int len, const char* str, + std::vector* instances, + CustomParser* parser) { + return 0; + } virtual void PutToFeedVec(const std::vector& ins_vec) = 0; virtual void PutToFeedVec(const T* ins_vec, int num) = 0; @@ -1104,7 +1113,10 @@ class MultiSlotInMemoryDataFeed : public InMemoryDataFeed { virtual bool ParseOneInstance(Record* instance); virtual bool ParseOneInstanceFromPipe(Record* instance); virtual void ParseOneInstanceFromSo(const char* str, Record* instance, - CustomParser* parser); + CustomParser* parser){}; + virtual int ParseInstanceFromSo(int len, const char* str, + std::vector* instances, + CustomParser* parser); virtual void PutToFeedVec(const std::vector& ins_vec); virtual void GetMsgFromLogKey(const std::string& log_key, uint64_t* search_id, uint32_t* cmatch, uint32_t* rank); diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc index 432e57107e..baf8144131 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc @@ -39,7 +39,19 @@ namespace framework { std::shared_ptr PSGPUWrapper::s_instance_ = NULL; bool PSGPUWrapper::is_initialized_ = false; - +#ifdef PADDLE_WITH_PSLIB +void PSGPUWrapper::InitAfsApi(const std::string& fs_name, + const std::string& fs_user, + const std::string& pass_wd, + const std::string& conf) { + int ret = afs_handler_.init(fs_name.c_str(), fs_user.c_str(), pass_wd.c_str(), + conf.c_str()); + if (ret != 0) { + LOG(ERROR) << "AFS Init Error"; + } + use_afs_api_ = 1; +} +#endif void PSGPUWrapper::PreBuildTask(std::shared_ptr gpu_task) { VLOG(3) << "PSGPUWrapper::BuildGPUPSTask begin"; platform::Timer timeline; diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h index ef5cd8466f..eb7bd6da1e 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h @@ -45,6 +45,9 @@ limitations under the License. */ #ifdef PADDLE_WITH_PSCORE #include "paddle/fluid/distributed/ps/service/communicator/communicator.h" #endif +#ifdef PADDLE_WITH_PSLIB +#include "afs_api.h" +#endif namespace paddle { namespace framework { @@ -303,9 +306,24 @@ class PSGPUWrapper { void ShowOneTable(int index) { HeterPs_->show_one_table(index); } + int UseAfsApi() { return use_afs_api_; } + +#ifdef PADDLE_WITH_PSLIB + std::shared_ptr OpenReader( + const std::string& filename) { + return afs_handler_.open_reader(filename); + } + + void InitAfsApi(const std::string& fs_name, const std::string& fs_user, + const std::string& pass_wd, const std::string& conf); +#endif + private: static std::shared_ptr s_instance_; Dataset* dataset_; +#ifdef PADDLE_WITH_PSLIB + paddle::ps::AfsApiWrapper afs_handler_; +#endif std::unordered_map< uint64_t, std::vector>>> local_tables_; @@ -341,6 +359,7 @@ class PSGPUWrapper { int year_; int month_; int day_; + int use_afs_api_ = 0; std::vector mem_pools_; std::vector hbm_pools_; // in multi mfdim, one table need hbm diff --git a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc index e8c338b3fd..fe1f27226b 100644 --- a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc +++ b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc @@ -56,6 +56,10 @@ void BindPSGPUWrapper(py::module* m) { py::call_guard()) .def("load_into_memory", &framework::PSGPUWrapper::LoadIntoMemory, py::call_guard()) +#ifdef PADDLE_WITH_PSLIB + .def("init_afs_api", &framework::PSGPUWrapper::InitAfsApi, + py::call_guard()) +#endif .def("finalize", &framework::PSGPUWrapper::Finalize, py::call_guard()); } // end PSGPUWrapper -- GitLab