未验证 提交 cadc4e6a 编写于 作者: T Thunderbrook 提交者: GitHub

[HeterPS] So Parser (#40750)

* So Parser

* add macro

* add macro

* slotrecord

* add macro

* code format
上级 2b53c68c
......@@ -18,6 +18,7 @@ limitations under the License. */
#endif
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"
#ifdef _LINUX
#include <stdio_ext.h>
#include <sys/mman.h>
......@@ -555,10 +556,13 @@ void InMemoryDataFeed<T>::LoadIntoMemory() {
template <typename T>
void InMemoryDataFeed<T>::LoadIntoMemoryFromSo() {
#ifdef _LINUX
#if (defined _LINUX) && (defined PADDLE_WITH_HETERPS) && \
(defined PADDLE_WITH_PSLIB)
VLOG(3) << "LoadIntoMemoryFromSo() begin, thread_id=" << thread_id_;
int buf_len = 1024 * 1024 * 10;
char* buf = (char*)malloc(buf_len + 10);
auto ps_gpu_ptr = PSGPUWrapper::GetInstance();
string::LineFileReader reader;
paddle::framework::CustomParser* parser =
global_dlmanager_pool().Load(so_parser_name_, slot_conf_);
......@@ -566,34 +570,34 @@ void InMemoryDataFeed<T>::LoadIntoMemoryFromSo() {
while (this->PickOneFile(&filename)) {
VLOG(3) << "PickOneFile, filename=" << filename
<< ", thread_id=" << thread_id_;
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
CHECK(this->fp_ != nullptr);
__fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
paddle::framework::ChannelWriter<T> writer(input_channel_);
T instance;
platform::Timer timeline;
timeline.Start();
while (1) {
if (!reader.getline(&*(fp_.get()))) {
break;
} else {
const char* str = reader.get();
ParseOneInstanceFromSo(str, &instance, parser);
if (ps_gpu_ptr->UseAfsApi()) {
auto afs_reader = ps_gpu_ptr->OpenReader(filename);
int read_len = 0;
char* cursor = buf;
int remain = 0;
while ((read_len = afs_reader->read(cursor, buf_len - remain)) > 0) {
std::vector<T> instances;
read_len += remain;
remain = ParseInstanceFromSo(read_len, buf, &instances, parser);
input_channel_->Write(std::move(instances));
instances = std::vector<T>();
if (remain) {
memmove(buf, buf + read_len - remain, remain);
}
cursor = buf + remain;
}
writer << std::move(instance);
instance = T();
} else {
VLOG(0) << "Should Call InitAfsApi First";
}
writer.Flush();
timeline.Pause();
VLOG(3) << "LoadIntoMemoryFromSo() read all lines, file=" << filename
<< ", cost time=" << timeline.ElapsedSec()
<< " seconds, thread_id=" << thread_id_;
}
free(buf);
VLOG(3) << "LoadIntoMemoryFromSo() end, thread_id=" << thread_id_;
#endif
}
......@@ -1088,10 +1092,11 @@ void MultiSlotInMemoryDataFeed::GetMsgFromLogKey(const std::string& log_key,
*rank = (uint32_t)strtoul(rank_str.c_str(), NULL, 16);
}
void MultiSlotInMemoryDataFeed::ParseOneInstanceFromSo(const char* str,
Record* instance,
CustomParser* parser) {
parser->ParseOneInstance(str, instance);
int MultiSlotInMemoryDataFeed::ParseInstanceFromSo(
int len, const char* str, std::vector<Record>* instances,
CustomParser* parser) {
// VLOG(0) << "parser: " << parser;
return parser->ParseInstance(len, str, instances);
}
bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe(Record* instance) {
......@@ -2078,7 +2083,8 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByLib(void) {
}
void SlotRecordInMemoryDataFeed::LoadIntoMemoryByFile(void) {
#ifdef _LINUX
#if (defined _LINUX) && (defined PADDLE_WITH_HETERPS) && \
(defined PADDLE_WITH_PSLIB)
paddle::framework::CustomParser* parser =
global_dlmanager_pool().Load(so_parser_name_, all_slots_info_);
CHECK(parser != nullptr);
......@@ -2111,21 +2117,31 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByFile(void) {
int lines = 0;
bool is_ok = true;
auto ps_gpu_ptr = PSGPUWrapper::GetInstance();
do {
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
CHECK(this->fp_ != nullptr);
__fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
is_ok = parser->ParseFileInstance(
[this](char* buf, int len) {
return fread(buf, sizeof(char), len, this->fp_.get());
},
pull_record_func, lines);
if (!is_ok) {
LOG(WARNING) << "parser error, filename=" << filename
<< ", lines=" << lines;
if (ps_gpu_ptr->UseAfsApi()) {
auto afs_reader = ps_gpu_ptr->OpenReader(filename);
is_ok = parser->ParseFileInstance(
[this, afs_reader](char* buf, int len) {
return afs_reader->read(buf, len);
},
pull_record_func, lines);
} else {
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
CHECK(this->fp_ != nullptr);
__fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
is_ok = parser->ParseFileInstance(
[this](char* buf, int len) {
return fread(buf, sizeof(char), len, this->fp_.get());
},
pull_record_func, lines);
if (!is_ok) {
LOG(WARNING) << "parser error, filename=" << filename
<< ", lines=" << lines;
}
}
} while (!is_ok);
timeline.Pause();
......
......@@ -388,8 +388,12 @@ class CustomParser {
CustomParser() {}
virtual ~CustomParser() {}
virtual void Init(const std::vector<SlotConf>& slots) = 0;
virtual bool Init(const std::vector<AllSlotInfo>& slots);
virtual bool Init(const std::vector<AllSlotInfo>& slots) = 0;
virtual void ParseOneInstance(const char* str, Record* instance) = 0;
virtual int ParseInstance(int len, const char* str,
std::vector<Record>* instances) {
return 0;
};
virtual bool ParseOneInstance(
const std::string& line,
std::function<void(std::vector<SlotRecord>&, int)>
......@@ -451,7 +455,7 @@ class DLManager {
handle.module = dlopen(name.c_str(), RTLD_NOW);
if (handle.module == nullptr) {
VLOG(0) << "Create so of " << name << " fail";
VLOG(0) << "Create so of " << name << " fail, " << dlerror();
return nullptr;
}
......@@ -730,6 +734,11 @@ class InMemoryDataFeed : public DataFeed {
virtual bool ParseOneInstanceFromPipe(T* instance) = 0;
virtual void ParseOneInstanceFromSo(const char* str, T* instance,
CustomParser* parser) {}
virtual int ParseInstanceFromSo(int len, const char* str,
std::vector<T>* instances,
CustomParser* parser) {
return 0;
}
virtual void PutToFeedVec(const std::vector<T>& ins_vec) = 0;
virtual void PutToFeedVec(const T* ins_vec, int num) = 0;
......@@ -1104,7 +1113,10 @@ class MultiSlotInMemoryDataFeed : public InMemoryDataFeed<Record> {
virtual bool ParseOneInstance(Record* instance);
virtual bool ParseOneInstanceFromPipe(Record* instance);
virtual void ParseOneInstanceFromSo(const char* str, Record* instance,
CustomParser* parser);
CustomParser* parser){};
virtual int ParseInstanceFromSo(int len, const char* str,
std::vector<Record>* instances,
CustomParser* parser);
virtual void PutToFeedVec(const std::vector<Record>& ins_vec);
virtual void GetMsgFromLogKey(const std::string& log_key, uint64_t* search_id,
uint32_t* cmatch, uint32_t* rank);
......
......@@ -39,7 +39,19 @@ namespace framework {
std::shared_ptr<PSGPUWrapper> PSGPUWrapper::s_instance_ = NULL;
bool PSGPUWrapper::is_initialized_ = false;
#ifdef PADDLE_WITH_PSLIB
void PSGPUWrapper::InitAfsApi(const std::string& fs_name,
const std::string& fs_user,
const std::string& pass_wd,
const std::string& conf) {
int ret = afs_handler_.init(fs_name.c_str(), fs_user.c_str(), pass_wd.c_str(),
conf.c_str());
if (ret != 0) {
LOG(ERROR) << "AFS Init Error";
}
use_afs_api_ = 1;
}
#endif
void PSGPUWrapper::PreBuildTask(std::shared_ptr<HeterContext> gpu_task) {
VLOG(3) << "PSGPUWrapper::BuildGPUPSTask begin";
platform::Timer timeline;
......
......@@ -45,6 +45,9 @@ limitations under the License. */
#ifdef PADDLE_WITH_PSCORE
#include "paddle/fluid/distributed/ps/service/communicator/communicator.h"
#endif
#ifdef PADDLE_WITH_PSLIB
#include "afs_api.h"
#endif
namespace paddle {
namespace framework {
......@@ -303,9 +306,24 @@ class PSGPUWrapper {
void ShowOneTable(int index) { HeterPs_->show_one_table(index); }
int UseAfsApi() { return use_afs_api_; }
#ifdef PADDLE_WITH_PSLIB
std::shared_ptr<paddle::ps::AfsReader> OpenReader(
const std::string& filename) {
return afs_handler_.open_reader(filename);
}
void InitAfsApi(const std::string& fs_name, const std::string& fs_user,
const std::string& pass_wd, const std::string& conf);
#endif
private:
static std::shared_ptr<PSGPUWrapper> s_instance_;
Dataset* dataset_;
#ifdef PADDLE_WITH_PSLIB
paddle::ps::AfsApiWrapper afs_handler_;
#endif
std::unordered_map<
uint64_t, std::vector<std::unordered_map<uint64_t, std::vector<float>>>>
local_tables_;
......@@ -341,6 +359,7 @@ class PSGPUWrapper {
int year_;
int month_;
int day_;
int use_afs_api_ = 0;
std::vector<MemoryPool*> mem_pools_;
std::vector<HBMMemoryPool*> hbm_pools_; // in multi mfdim, one table need hbm
......
......@@ -56,6 +56,10 @@ void BindPSGPUWrapper(py::module* m) {
py::call_guard<py::gil_scoped_release>())
.def("load_into_memory", &framework::PSGPUWrapper::LoadIntoMemory,
py::call_guard<py::gil_scoped_release>())
#ifdef PADDLE_WITH_PSLIB
.def("init_afs_api", &framework::PSGPUWrapper::InitAfsApi,
py::call_guard<py::gil_scoped_release>())
#endif
.def("finalize", &framework::PSGPUWrapper::Finalize,
py::call_guard<py::gil_scoped_release>());
} // end PSGPUWrapper
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册