未验证 提交 8464c04d 编写于 作者: P pangengzheng 提交者: GitHub

Support compile pslib v3 (#50166)

* fix feature_value.h and feature_value.cu to support pslib

* code style

* align DistPsArch pre-stable branch
---------
Co-authored-by: NSławomir Siwek <slawomir.siwek@intel.com>
Co-authored-by: NTomasz Socha <tomasz.socha@intel.com>
Co-authored-by: Nheliqi <1101791222@qq.com>
Co-authored-by: Nzqw_1997 <118182234+zhengqiwen1997@users.noreply.github.com>
Co-authored-by: Njameszhang <zhangxiaoci@baidu.com>
Co-authored-by: Nxiaoguoguo626807 <100397923+xiaoguoguo626807@users.noreply.github.com>
Co-authored-by: NFeiyu Chan <chenfeiyu@baidu.com>
Co-authored-by: NGGBond8488 <33050871+GGBond8488@users.noreply.github.com>
Co-authored-by: Nsprouteer <89541335+sprouteer@users.noreply.github.com>
Co-authored-by: Njakpiase <jakpia21@gmail.com>
Co-authored-by: NJiabin Yang <360788950@qq.com>
Co-authored-by: Nlimingshu <61349199+JamesLim-sy@users.noreply.github.com>
Co-authored-by: Nzhangbopd <1299246947@qq.com>
Co-authored-by: N张春乔 <83450930+Liyulingyue@users.noreply.github.com>
Co-authored-by: NLiYuRio <63526175+LiYuRio@users.noreply.github.com>
Co-authored-by: N姜永久 <34344716+yjjiang11@users.noreply.github.com>
Co-authored-by: NYuang Liu <liuyuang@baidu.com>
Co-authored-by: Njiangcheng <thisjiang@qq.com>
Co-authored-by: Nronnywang <ronny1996@163.com>
Co-authored-by: Nsneaxiy <32832641+sneaxiy@users.noreply.github.com>
Co-authored-by: Nhouj04 <35131887+houj04@users.noreply.github.com>
Co-authored-by: Nzhangbo9674 <82555433+zhangbo9674@users.noreply.github.com>
Co-authored-by: Ngem5 <117625383+linsheng011@users.noreply.github.com>
Co-authored-by: Nwanghuancoder <wanghuan29@baidu.com>
Co-authored-by: NRyan <44900829+DrRyanHuang@users.noreply.github.com>
Co-authored-by: NRuibiao Chen <chenruibiao@baidu.com>
Co-authored-by: Nengineer1109 <jialiang.wang@xdxct.com>
Co-authored-by: NRedContritio <RedContritio@qq.com>
Co-authored-by: Nmjxs <52824616+kk-2000@users.noreply.github.com>
Co-authored-by: NYiqun Liu <Xreki@users.noreply.github.com>
Co-authored-by: N张正海 <65210872+ccsuzzh@users.noreply.github.com>
Co-authored-by: NHongyuJia <jiahongyu@baidu.com>
Co-authored-by: Npangyoki <pangyoki@126.com>
Co-authored-by: NLoneRanger <836253168@qq.com>
Co-authored-by: NTeFeng Chen <ctfeng66@163.com>
Co-authored-by: NLeo Guo <58431564+ZibinGuo@users.noreply.github.com>
Co-authored-by: Nxiaoting <31891223+tink2123@users.noreply.github.com>
Co-authored-by: N201716010711 <87008376+201716010711@users.noreply.github.com>
Co-authored-by: Nwangxiaoning <71813629+wangxn12138@users.noreply.github.com>
Co-authored-by: NYuanle Liu <yuanlehome@163.com>
Co-authored-by: MarDino's avatarZZK <359521840@qq.com>
Co-authored-by: Nzhangkaihuo <zhangkaihuo@baidu.com>
Co-authored-by: NRoc <30228238+sljlp@users.noreply.github.com>
Co-authored-by: NPuQing <me@puqing.work>
Co-authored-by: NZhang Jun <ewalker@live.cn>
Co-authored-by: NCharles-hit <56987902+Charles-hit@users.noreply.github.com>
Co-authored-by: Nniuliling123 <51102941+niuliling123@users.noreply.github.com>
Co-authored-by: Nwenbin <wang3323032@qq.com>
Co-authored-by: Nwangshengxiang <121413869+shengxiangwang@users.noreply.github.com>
Co-authored-by: NBo Zhang <105368690+zhangbopd@users.noreply.github.com>
Co-authored-by: NAurelius84 <zhangliujie@baidu.com>
Co-authored-by: Nzxcd <228587199@qq.com>
Co-authored-by: Nzhoutianzi666 <39978853+zhoutianzi666@users.noreply.github.com>
Co-authored-by: Ngouzil <66515297+gouzil@users.noreply.github.com>
Co-authored-by: Nzhangyikun02 <48021248+zhangyk0314@users.noreply.github.com>
Co-authored-by: NHui Zhang <zhtclz@foxmail.com>
Co-authored-by: NWang Bojun <105858416+wwbitejotunn@users.noreply.github.com>
Co-authored-by: NGuanghua Yu <742925032@qq.com>
Co-authored-by: NYUNSHEN XIE <1084314248@qq.com>
Co-authored-by: NZhong Hui <zhonghui.net@gmail.com>
Co-authored-by: Nrisemeup1 <62429225+risemeup1@users.noreply.github.com>
Co-authored-by: Nliuruyan <44316842+liuruyan@users.noreply.github.com>
Co-authored-by: NLeo Chen <chenqiuliang@baidu.com>
Co-authored-by: NYuanRisheng <yuanrisheng@baidu.com>
Co-authored-by: Nwuhuachaocoding <77733235+wuhuachaocoding@users.noreply.github.com>
Co-authored-by: NCcc <52520497+juncaipeng@users.noreply.github.com>
上级 35d7d1f0
......@@ -23,7 +23,10 @@ set(LEVELDB_LIBRARIES
"${LEVELDB_INSTALL_DIR}/lib/libleveldb.a"
CACHE FILEPATH "leveldb library." FORCE)
include_directories(${LEVELDB_INCLUDE_DIR})
set(LEVELDN_CXXFLAGS "-fPIC")
if(WITH_HETERPS AND WITH_PSLIB)
set(LEVELDN_CXXFLAGS "${LEVELDN_CXXFLAGS} -D_GLIBCXX_USE_CXX11_ABI=0")
endif()
ExternalProject_Add(
extern_leveldb
${EXTERNAL_PROJECT_LOG_ARGS}
......@@ -32,7 +35,8 @@ ExternalProject_Add(
GIT_TAG v1.18
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
BUILD_COMMAND CXXFLAGS=-fPIC make -j ${NUM_OF_PROCESSOR} libleveldb.a
BUILD_COMMAND export "CXXFLAGS=${LEVELDN_CXXFLAGS}" && make -j
${NUM_OF_PROCESSOR} libleveldb.a
INSTALL_COMMAND
mkdir -p ${LEVELDB_INSTALL_DIR}/lib/ && cp
${LEVELDB_PREFIX_DIR}/src/extern_leveldb/libleveldb.a ${LEVELDB_LIBRARIES}
......@@ -40,7 +44,6 @@ ExternalProject_Add(
${LEVELDB_INSTALL_DIR}/
BUILD_IN_SOURCE 1
BUILD_BYPRODUCTS ${LEVELDB_LIBRARIES})
add_dependencies(extern_leveldb snappy)
add_library(leveldb STATIC IMPORTED GLOBAL)
......
......@@ -6,7 +6,7 @@ proto_library(interceptor_message_proto SRCS interceptor_message.proto)
if(WITH_ARM_BRPC)
set(BRPC_DEPS arm_brpc snappy gflags glog)
elseif(WITH_DISTRIBUTE)
elseif(WITH_DISTRIBUTE AND NOT WITH_PSLIB)
set(BRPC_DEPS
brpc
ssl
......
......@@ -73,7 +73,7 @@ bool MessageBus::IsInit() const { return is_init_; }
MessageBus::~MessageBus() {
VLOG(3) << "Message bus releases resource.";
#if defined(PADDLE_WITH_DISTRIBUTE)
#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB)
server_.Stop(1000);
server_.Join();
#endif
......@@ -94,7 +94,7 @@ bool MessageBus::Send(int64_t dst_rank,
true,
platform::errors::PreconditionNotMet(
"Using message bus since it has not been initialized."));
#if defined(PADDLE_WITH_DISTRIBUTE)
#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB)
int retry_time = 0; // message bus will retry sending for 10 times
while (retry_time < 10) {
++retry_time;
......@@ -179,7 +179,7 @@ void MessageBus::ListenPort() {
LOG(INFO) << "No need listen to port since training on single card.";
return;
}
#if defined(PADDLE_WITH_DISTRIBUTE)
#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB)
// function keep listen the port and handle the message
PADDLE_ENFORCE_EQ(
server_.AddService(&message_service_, brpc::SERVER_DOESNT_OWN_SERVICE),
......@@ -209,7 +209,7 @@ void MessageBus::ListenPort() {
#endif
}
#if defined(PADDLE_WITH_DISTRIBUTE)
#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB)
bool MessageBus::SendInterRank(int64_t dst_rank,
const InterceptorMessage& interceptor_message) {
const auto& dst_addr = GetAddr(dst_rank);
......
......@@ -20,7 +20,7 @@
#include <thread>
#include <unordered_map>
#if defined(PADDLE_WITH_DISTRIBUTE)
#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB)
#include "brpc/channel.h"
#include "brpc/server.h"
#include "paddle/fluid/distributed/fleet_executor/message_service.h"
......@@ -63,7 +63,7 @@ class MessageBus final {
const std::string& GetAddr(int64_t rank) const;
#if defined(PADDLE_WITH_DISTRIBUTE)
#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB)
// send the message inter rank (dst is different rank with src)
bool SendInterRank(int64_t dst_rank,
const InterceptorMessage& interceptor_message);
......@@ -79,7 +79,7 @@ class MessageBus final {
// the ip needs to be listened
std::string addr_;
#if defined(PADDLE_WITH_DISTRIBUTE)
#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB)
MessageServiceImpl message_service_;
// brpc server
brpc::Server server_;
......
......@@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#if defined(PADDLE_WITH_DISTRIBUTE)
#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB)
#include "paddle/fluid/distributed/fleet_executor/message_service.h"
#include "brpc/server.h"
......
......@@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#if defined(PADDLE_WITH_DISTRIBUTE)
#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB)
#pragma once
#include "brpc/server.h"
......
......@@ -51,7 +51,9 @@ cc_test_old(
scope
device_context)
if(WITH_DISTRIBUTE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL))
if(WITH_DISTRIBUTE
AND NOT WITH_PSLIB
AND NOT (WITH_ASCEND OR WITH_ASCEND_CL))
set_source_files_properties(
interceptor_ping_pong_with_brpc_test.cc
PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
......
......@@ -18,9 +18,6 @@ limitations under the License. */
#include "paddle/fluid/distributed/ps/service/communicator/communicator.h"
#include "paddle/fluid/distributed/ps/table/table.h"
#if defined PADDLE_WITH_HETERPS && defined PADDLE_WITH_PSCORE
#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"
#endif
namespace paddle {
namespace distributed {
......@@ -131,13 +128,6 @@ void FleetWrapper::InitWorker(const std::string& dist_desc,
worker_ptr_ = std::shared_ptr<paddle::distributed::PSClient>(
paddle::distributed::PSClientFactory::Create(ps_param));
worker_ptr_->Configure(ps_param, dense_pull_regions, ps_env_, index);
#if defined PADDLE_WITH_HETERPS && defined PADDLE_WITH_PSCORE
VLOG(3) << "FleetWrapper::InitWorker InitializeGPUServer";
auto* accessor = worker_ptr_->GetTableAccessor(0);
auto ps_gpu_wrapper = paddle::framework::PSGPUWrapper::GetInstance();
ps_gpu_wrapper->InitializeGPUServer(ps_param);
ps_gpu_wrapper->SetTableAccessor(accessor);
#endif
}
} else {
VLOG(3) << "Client can be initialized only once";
......
......@@ -286,6 +286,12 @@ class FleetWrapper {
int to_client_id,
const std::string& msg);
std::string GetDistDesc() const {
CHECK(is_initialized_ == true)
<< "fleetwrapper should be initialized first!!!";
return dist_desc_;
}
// FleetWrapper singleton
static std::shared_ptr<FleetWrapper> GetInstance() {
if (NULL == s_instance_) {
......@@ -321,6 +327,7 @@ class FleetWrapper {
private:
static std::shared_ptr<FleetWrapper> s_instance_;
std::string dist_desc_;
paddle::distributed::PaddlePSEnvironment ps_env_;
size_t GetAbsoluteSum(size_t start,
size_t end,
......
......@@ -23,9 +23,11 @@ limitations under the License. */
#include <thrust/shuffle.h>
#include <sstream>
#include "cub/cub.cuh"
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
#include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_node.h"
#include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_utils.h"
#include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h"
#endif
#include "paddle/fluid/framework/fleet/heter_ps/hashtable.h"
#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"
#include "paddle/fluid/framework/io/fs.h"
......@@ -435,6 +437,7 @@ __global__ void CopyDuplicateKeys(int64_t *dist_tensor,
}
}
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
int GraphDataGenerator::AcquireInstance(BufState *state) {
if (state->GetNextStep()) {
DEBUG_STATE(state);
......@@ -2938,6 +2941,7 @@ void GraphDataGenerator::SetConfig(
infer_node_type_ = graph_config.infer_node_type();
}
}
#endif
void GraphDataGenerator::DumpWalkPath(std::string dump_path, size_t dump_rate) {
#ifdef _LINUX
......
......@@ -78,6 +78,7 @@ void FleetWrapper::InitWorker(const std::string& dist_desc,
const_cast<uint64_t*>(host_sign_list.data()),
node_num,
index);
dist_desc_ = dist_desc;
is_initialized_ = true;
} else {
VLOG(3) << "Worker can be initialized only once";
......@@ -1462,9 +1463,12 @@ void FleetWrapper::SetDate(const uint64_t table_id, const std::string& date) {
#endif
}
void FleetWrapper::PrintTableStat(const uint64_t table_id) {
void FleetWrapper::PrintTableStat(uint64_t table_id,
uint32_t pass_id,
size_t threshold) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id);
auto ret =
pslib_ptr_->_worker_ptr->print_table_stat(table_id, pass_id, threshold);
ret.wait();
int32_t err_code = ret.get();
if (err_code == -1) {
......
......@@ -307,7 +307,7 @@ class FleetWrapper {
std::vector<std::string> table_var_list,
bool load_combine);
void PrintTableStat(const uint64_t table_id);
void PrintTableStat(uint64_t table_id, uint32_t pass_id, size_t threshold);
void SetFileNumOneShard(const uint64_t table_id, int file_num);
// mode = 0, load all feature
// mode = 1, load delta feature, which means load diff
......@@ -381,6 +381,13 @@ class FleetWrapper {
void Confirm();
// revert all the updated params in the current pass
void Revert();
std::string GetDistDesc() const {
CHECK(is_initialized_ == true)
<< "fleetwrapper should be initialized first!!!";
return dist_desc_;
}
// FleetWrapper singleton
static std::shared_ptr<FleetWrapper> GetInstance() {
{
......@@ -402,6 +409,7 @@ class FleetWrapper {
private:
static std::shared_ptr<FleetWrapper> s_instance_;
std::string dist_desc_;
static std::mutex ins_mutex;
#ifdef PADDLE_WITH_PSLIB
std::map<uint64_t, std::vector<paddle::ps::Region>> _regions;
......
......@@ -496,6 +496,10 @@ void AccessorWrapper<GPUAccessor>::CopyForPushDedupImpl(
template class AccessorWrapper<CommonFeatureValueAccessor>;
#endif
#ifdef PADDLE_WITH_PSLIB
template class AccessorWrapper<CommonFeatureValueAccessor>;
#endif
} // namespace framework
} // namespace paddle
#endif
......@@ -28,6 +28,11 @@ limitations under the License. */
#include "paddle/fluid/distributed/ps/table/depends/feature_value.h"
#endif
#ifdef PADDLE_WITH_PSLIB
#include "downpour_accessor.h" // NOLINT
#include "pslib.h" // NOLINT
#endif
namespace paddle {
namespace framework {
#define MF_DIM 8
......@@ -272,13 +277,13 @@ class CommonFeatureValueAccessor {
return 0;
}
#ifdef PADDLE_WITH_PSCORE
// // build阶段从cpu_val赋值给gpu_val
__host__ void BuildFill(
float* gpu_val,
void* cpu,
paddle::distributed::ValueAccessor* cpu_table_accessor,
int mf_dim) {
#ifdef PADDLE_WITH_PSCORE
paddle::distributed::CtrDymfAccessor* cpu_accessor =
dynamic_cast<paddle::distributed::CtrDymfAccessor*>(cpu_table_accessor);
paddle::distributed::FixedFeatureValue* cpu_ptr =
......@@ -324,14 +329,76 @@ class CommonFeatureValueAccessor {
gpu_val[x] = 0;
}
}
}
#endif
#ifdef PADDLE_WITH_PSLIB
// build阶段从cpu_val赋值给gpu_val
template <typename ShowClickType>
__host__ void BuildFill(float* gpu_val,
void* _cpu_val,
::paddle::ps::ValueAccessor* _cpu_accessor,
int mf_dim) {
auto* cpu_accessor =
dynamic_cast<::paddle::ps::DownpourCtrDymfTplAccessor<ShowClickType>*>(
_cpu_accessor);
auto* cpu_val =
reinterpret_cast<::paddle::ps::DownpourFixedFeatureValue*>(_cpu_val);
float* ptr_val = cpu_val->data();
size_t cpu_dim = cpu_val->size();
gpu_val[common_feature_value.DeltaScoreIndex()] =
ptr_val[cpu_accessor->get_delta_score_index()];
gpu_val[common_feature_value.ShowIndex()] = cpu_accessor->get_show(ptr_val);
gpu_val[common_feature_value.ClickIndex()] =
cpu_accessor->get_click(ptr_val);
gpu_val[common_feature_value.SlotIndex()] =
ptr_val[cpu_accessor->get_slot_index()];
// lr
gpu_val[common_feature_value.EmbedWIndex()] =
ptr_val[cpu_accessor->get_embed_w_index()];
// cpu_ptr
*(reinterpret_cast<uint64_t*>(
gpu_val + common_feature_value.CpuPtrIndex())) = (uint64_t)(cpu_val);
// lr_g2sum
// for dymf && adagrad, embed_dim = 1
for (int i = 0; i < common_feature_value.EmbedDim(); i++) {
gpu_val[common_feature_value.EmbedG2SumIndex() + i] =
ptr_val[cpu_accessor->get_embed_g2sum_index() + i];
}
ptr_val[cpu_accessor->get_mf_dim_index()] = static_cast<float>(mf_dim);
gpu_val[common_feature_value.MfDimIndex()] = static_cast<float>(mf_dim);
constexpr int n = 2 * (sizeof(ShowClickType) / sizeof(float) - 1);
if (cpu_dim > 8 + n) {
gpu_val[common_feature_value.MfSizeIndex()] =
common_feature_value.MFSize(mf_dim) / sizeof(float);
for (int x = 0; x < static_cast<int>(common_feature_value.MFSize(mf_dim) /
sizeof(float));
x++) {
gpu_val[common_feature_value.EmbedxG2SumIndex() + x] =
ptr_val[8 + n + x];
}
} else {
gpu_val[common_feature_value.MfSizeIndex()] = 0;
for (int i = 0; i < mf_dim + common_feature_value.EmbedXDim(); i++) {
gpu_val[common_feature_value.EmbedxG2SumIndex() + i] = 0;
}
}
}
#endif
#ifdef PADDLE_WITH_PSCORE
// dump_to_cpu阶段从gpu_val赋值给cpu_val
__host__ void DumpFill(float* gpu_val,
paddle::distributed::ValueAccessor* cpu_table_accessor,
int mf_dim) {
#ifdef PADDLE_WITH_PSCORE
paddle::distributed::CtrDymfAccessor* cpu_accessor =
dynamic_cast<paddle::distributed::CtrDymfAccessor*>(cpu_table_accessor);
......@@ -371,8 +438,64 @@ class CommonFeatureValueAccessor {
gpu_val[common_feature_value.EmbedxG2SumIndex() + x];
}
}
}
#endif
#ifdef PADDLE_WITH_PSLIB
// dump_to_cpu阶段从gpu_val赋值给cpu_val
// gpu_val is firstly copied to mem
// so gpu_val is in mem, not in hbm
template <typename ShowClickType>
__host__ void DumpFill(float* gpu_val,
::paddle::ps::ValueAccessor* _cpu_accessor,
int mf_dim) {
auto* cpu_accessor =
dynamic_cast<::paddle::ps::DownpourCtrDymfTplAccessor<ShowClickType>*>(
_cpu_accessor);
uint64_t cpu_addr = *reinterpret_cast<uint64_t*>(
gpu_val + common_feature_value.CpuPtrIndex());
auto* downpour_value = (::paddle::ps::DownpourFixedFeatureValue*)cpu_addr;
int downpour_value_size = downpour_value->size();
constexpr int n = 2 * (sizeof(ShowClickType) / sizeof(float) - 1);
if (static_cast<int>(gpu_val[common_feature_value.MfSizeIndex()]) > 0 &&
downpour_value_size == 8 + n) {
int mf_size =
common_feature_value.MFSize(mf_dim) /
sizeof(
float); // mf_size = gpu_val[common_feature_value.MfSizeIndex()];
downpour_value->resize(downpour_value_size + mf_size);
}
float* cpu_val = downpour_value->data();
cpu_val[cpu_accessor->get_delta_score_index()] =
gpu_val[common_feature_value.DeltaScoreIndex()];
*reinterpret_cast<ShowClickType*>(cpu_val +
cpu_accessor->get_show_index()) =
(ShowClickType)gpu_val[common_feature_value.ShowIndex()];
*reinterpret_cast<ShowClickType*>(cpu_val +
cpu_accessor->get_click_index()) =
(ShowClickType)gpu_val[common_feature_value.ClickIndex()];
cpu_val[cpu_accessor->get_embed_w_index()] =
gpu_val[common_feature_value.EmbedWIndex()];
cpu_val[cpu_accessor->get_slot_index()] =
gpu_val[common_feature_value.SlotIndex()];
// for dymf && adagrad, embed_dim = 1
for (int i = 0; i < common_feature_value.EmbedDim(); i++) {
cpu_val[cpu_accessor->get_embed_g2sum_index() + i] =
gpu_val[common_feature_value.EmbedG2SumIndex() + i];
}
if (static_cast<int>(gpu_val[common_feature_value.MfSizeIndex()]) > 0) {
for (int x = 0; x < static_cast<int>(common_feature_value.MFSize(mf_dim) /
sizeof(float));
x++) {
cpu_val[x + 8 + n] =
gpu_val[common_feature_value.EmbedxG2SumIndex() + x];
}
}
}
#endif
// dy_mf_fill_dvals_kernel 阶段 gpukernel
// 中从src_val赋值给dest_val
......@@ -667,6 +790,7 @@ class VirtualAccessor {
virtual size_t GetPullValueSize(int& mf_dim) = 0; // NOLINT
#ifdef PADDLE_WITH_PSCORE
virtual void BuildFill(void* gpu_val,
void* cpu_val,
paddle::distributed::ValueAccessor* cpu_table_accessor,
......@@ -675,6 +799,22 @@ class VirtualAccessor {
virtual void DumpFill(float* gpu_val,
paddle::distributed::ValueAccessor* cpu_table_accessor,
int mf_dim) = 0;
#endif
#ifdef PADDLE_WITH_PSLIB
virtual void BuildFill(
float* gpu_val,
void* cpu_val,
paddle::ps::ValueAccessor* cpu_accessor,
int mf_dim,
const std::string& accessor_type = "DownpourCtrDymfAccessor") = 0;
virtual void DumpFill(
float* gpu_val,
::paddle::ps::ValueAccessor* cpu_accessor,
int mf_dim,
const std::string& accessor_type = "DownpourCtrDymfAccessor") = 0;
#endif
virtual void CopyForPull(const paddle::platform::Place& place,
uint64_t** gpu_keys,
......@@ -771,6 +911,7 @@ class AccessorWrapper : public VirtualAccessor {
GPUAccessor* AccessorPtr() { return &gpu_accessor_; }
#ifdef PADDLE_WITH_PSCORE
virtual void BuildFill(void* gpu_val,
void* cpu_val,
paddle::distributed::ValueAccessor* cpu_table_accessor,
......@@ -784,6 +925,36 @@ class AccessorWrapper : public VirtualAccessor {
int mf_dim) {
gpu_accessor_.DumpFill(gpu_val, cpu_table_accessor, mf_dim);
}
#endif
#ifdef PADDLE_WITH_PSLIB
virtual void BuildFill(
float* gpu_val,
void* cpu_val,
paddle::ps::ValueAccessor* cpu_accessor,
int mf_dim,
const std::string& accessor_type = "DownpourCtrDymfAccessor") {
if (accessor_type == "DownpourCtrDymfAccessor") {
gpu_accessor_.template BuildFill<float>(
gpu_val, cpu_val, cpu_accessor, mf_dim);
} else if (accessor_type == "DownpourCtrDoubleDymfAccessor") {
gpu_accessor_.template BuildFill<double>(
gpu_val, cpu_val, cpu_accessor, mf_dim);
}
}
virtual void DumpFill(
float* gpu_val,
paddle::ps::ValueAccessor* cpu_accessor,
int mf_dim,
const std::string& accessor_type = "DownpourCtrDymfAccessor") {
if (accessor_type == "DownpourCtrDymfAccessor") {
gpu_accessor_.template DumpFill<float>(gpu_val, cpu_accessor, mf_dim);
} else if (accessor_type == "DownpourCtrDoubleDymfAccessor") {
gpu_accessor_.template DumpFill<double>(gpu_val, cpu_accessor, mf_dim);
}
}
#endif
virtual void CopyForPull(const paddle::platform::Place& place,
uint64_t** gpu_keys,
......
......@@ -370,56 +370,6 @@ template <typename KeyType, typename ValType>
template <typename StreamType>
void HashTable<KeyType, ValType>::dump_to_cpu(int devid, StreamType stream) {
container_->prefetch(cudaCpuDeviceId, stream);
std::vector<std::thread> threads;
size_t num = container_->size();
KeyType unuse_key = std::numeric_limits<KeyType>::max();
thrust::pair<KeyType, ValType>* kv = container_->data();
int thread_num = 8;
int len_per_thread = num / thread_num;
int remain = num % thread_num;
int begin = 0;
auto dump_func = [unuse_key, kv](int left, int right) {
for (int i = left; i < right; i++) {
if (kv[i].first == unuse_key) {
continue;
}
ValType& gpu_val = kv[i].second;
#ifdef PADDLE_WITH_PSLIB
auto* downpour_value =
(paddle::ps::DownpourFixedFeatureValue*)(gpu_val.cpu_ptr);
int downpour_value_size = downpour_value->size();
if (gpu_val.mf_size > 0 && downpour_value_size == 7) {
downpour_value->resize(gpu_val.mf_size + downpour_value_size);
}
float* cpu_val = downpour_value->data();
// cpu_val[0] = 0;
cpu_val[1] = gpu_val.delta_score;
cpu_val[2] = gpu_val.show;
cpu_val[3] = gpu_val.clk;
cpu_val[4] = gpu_val.lr;
cpu_val[5] = gpu_val.lr_g2sum;
cpu_val[6] = gpu_val.slot;
if (gpu_val.mf_size > 0) {
for (int x = 0; x < gpu_val.mf_size; x++) {
cpu_val[x + 7] = gpu_val.mf[x];
}
}
#endif
}
};
for (int i = 0; i < thread_num; i++) {
threads.push_back(std::thread(
dump_func, begin, begin + len_per_thread + (i < remain ? 1 : 0)));
begin += len_per_thread + (i < remain ? 1 : 0);
}
for (std::thread& t : threads) {
t.join();
}
// container_->prefetch(devid, stream);
}
template <typename KeyType, typename ValType>
......
......@@ -35,7 +35,9 @@ limitations under the License. */
#include "paddle/fluid/framework/data_set.h"
#include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_utils.h"
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
#include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h"
#endif
#include "paddle/fluid/platform/timer.h"
#if defined(PADDLE_WITH_PSCORE)
#include "paddle/fluid/distributed/ps/table/depends/feature_value.h"
......@@ -196,12 +198,17 @@ void PSGPUWrapper::add_key_to_gputask(std::shared_ptr<HeterContext> gpu_task) {
VLOG(1) << "GpuPs task add keys cost " << timeline.ElapsedSec()
<< " seconds.";
timeline.Start();
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
size_t slot_num = static_cast<size_t>(slot_num_for_pull_feature_);
// no slot_fea mode and whole_hbm mode, only keep one unique_sort action
if (slot_num > 0 && FLAGS_gpugraph_storage_mode !=
paddle::framework::GpuGraphStorageMode::WHOLE_HBM) {
gpu_task->UniqueKeys();
}
#endif
#ifdef PADDLE_WITH_PSLIB
gpu_task->UniqueKeys();
#endif
timeline.Pause();
VLOG(1) << "GpuPs task unique cost " << timeline.ElapsedSec() << " seconds.";
}
......@@ -364,8 +371,6 @@ void PSGPUWrapper::add_slot_feature(std::shared_ptr<HeterContext> gpu_task) {
// 8卡数据分片
size_t device_num = heter_devices_.size();
std::vector<std::thread> threads;
size_t slot_num = static_cast<size_t>(
slot_num_for_pull_feature_); // node slot 9008 in slot_vector
auto& local_dim_keys = gpu_task->feature_dim_keys_; // [shard_num, 0, keys]]
double divide_nodeid_cost = 0;
double get_feature_id_cost = 0;
......@@ -431,15 +436,19 @@ void PSGPUWrapper::add_slot_feature(std::shared_ptr<HeterContext> gpu_task) {
threads.clear();
time_stage.Pause();
divide_nodeid_cost = time_stage.ElapsedSec();
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
gpu_task->sub_graph_feas =
reinterpret_cast<void*>(new std::vector<GpuPsCommGraphFea>);
std::vector<GpuPsCommGraphFea>& sub_graph_feas =
*((std::vector<GpuPsCommGraphFea>*)gpu_task->sub_graph_feas);
#endif
std::vector<std::vector<uint64_t>> feature_ids(device_num);
std::vector<uint64_t*> feature_list(device_num);
std::vector<size_t> feature_list_size(device_num);
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
size_t batch = 40000;
size_t slot_num = static_cast<size_t>(
slot_num_for_pull_feature_); // node slot 9008 in slot_vector
time_stage.Start();
if (FLAGS_gpugraph_storage_mode ==
paddle::framework::GpuGraphStorageMode::MEM_EMB_AND_GPU_GRAPH) {
......@@ -535,6 +544,7 @@ void PSGPUWrapper::add_slot_feature(std::shared_ptr<HeterContext> gpu_task) {
}
time_stage.Pause();
get_feature_id_cost = time_stage.ElapsedSec();
#endif
size_t feature_num = 0;
for (size_t i = 0; i < device_num; i++) {
feature_num += feature_list_size[i];
......@@ -650,12 +660,16 @@ void PSGPUWrapper::add_slot_feature(std::shared_ptr<HeterContext> gpu_task) {
void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
platform::Timer timeline;
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
if (slot_num_for_pull_feature_ > 0 &&
FLAGS_gpugraph_storage_mode !=
paddle::framework::GpuGraphStorageMode::WHOLE_HBM) {
add_slot_feature(gpu_task);
}
#endif
#ifdef PADDLE_WITH_PSLIB
add_slot_feature(gpu_task);
#endif
resize_gputask(gpu_task);
platform::Timer time_stage;
......@@ -680,12 +694,6 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
// auto& device_mutex = gpu_task->mutex_;
std::vector<std::thread> threads(thread_keys_shard_num_);
#ifdef PADDLE_WITH_PSLIB
auto fleet_ptr = FleetWrapper::GetInstance();
#endif
#ifdef PADDLE_WITH_PSCORE
auto fleet_ptr = paddle::distributed::FleetWrapper::GetInstance();
#endif
#if (defined PADDLE_WITH_PSLIB) && (defined PADDLE_WITH_HETERPS)
// get day_id: day nums from 1970
......@@ -696,100 +704,100 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
b.tm_min = b.tm_hour = b.tm_sec = 0;
std::time_t seconds_from_1970 = std::mktime(&b);
int day_id = seconds_from_1970 / 86400;
fleet_ptr->pslib_ptr_->_worker_ptr->set_day_id(table_id_, day_id);
fleet_ptr_->pslib_ptr_->_worker_ptr->set_day_id(table_id_, day_id);
#endif
timeline.Start();
auto ptl_dynamic_mf_func =
[this, &local_dim_keys, &local_dim_ptr, &fleet_ptr, &gpu_task](int i,
int j) {
size_t key_size = local_dim_keys[i][j].size();
int32_t status = -1;
int32_t cnt = 0;
auto ptl_dynamic_mf_func = [this, &local_dim_keys, &local_dim_ptr, &gpu_task](
int i, int j) {
size_t key_size = local_dim_keys[i][j].size();
int32_t status = -1;
int32_t cnt = 0;
#ifdef PADDLE_WITH_PSLIB
while (true) {
auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
i,
reinterpret_cast<char**>(local_dim_ptr[i][j].data()),
this->table_id_,
local_dim_keys[i][j].data(),
key_size);
bool flag = true;
tt.wait();
try {
status = tt.get();
} catch (const std::future_error& e) {
VLOG(0) << "Caught a future_error with code" << e.code()
<< ", Message:" << e.what();
}
if (status != 0) {
VLOG(0) << "fleet pull sparse failed, status[" << status << "]";
sleep(sleep_seconds_before_fail_exit_);
flag = false;
cnt++;
}
if (cnt > 3) {
VLOG(0) << "fleet pull sparse failed, retry 3 times";
exit(-1);
}
while (true) {
auto tt = fleet_ptr_->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
i,
reinterpret_cast<char**>(local_dim_ptr[i][j].data()),
this->table_id_,
local_dim_keys[i][j].data(),
key_size,
gpu_task->pass_id_);
bool flag = true;
tt.wait();
try {
status = tt.get();
} catch (const std::future_error& e) {
VLOG(0) << "Caught a future_error with code" << e.code()
<< ", Message:" << e.what();
}
if (status != 0) {
VLOG(0) << "fleet pull sparse failed, status[" << status << "]";
sleep(sleep_seconds_before_fail_exit_);
flag = false;
cnt++;
}
if (cnt > 3) {
VLOG(0) << "fleet pull sparse failed, retry 3 times";
exit(-1);
}
if (flag) {
break;
}
}
if (flag) {
break;
}
}
#endif
#ifdef PADDLE_WITH_PSCORE
while (true) {
auto tt = fleet_ptr->worker_ptr_->PullSparsePtr(
i,
reinterpret_cast<char**>(local_dim_ptr[i][j].data()),
this->table_id_,
local_dim_keys[i][j].data(),
key_size,
gpu_task->pass_id_,
j);
bool flag = true;
tt.wait();
try {
status = tt.get();
} catch (const std::future_error& e) {
VLOG(0) << "Caught a future_error with code" << e.code()
<< ", Message:" << e.what();
}
if (status != 0) {
VLOG(0) << "fleet pull sparse failed, status[" << status << "]";
sleep(sleep_seconds_before_fail_exit_);
flag = false;
cnt++;
}
if (cnt > 3) {
VLOG(0) << "fleet pull sparse failed, retry 3 times";
exit(-1);
}
while (true) {
auto tt = fleet_ptr_->worker_ptr_->PullSparsePtr(
i,
reinterpret_cast<char**>(local_dim_ptr[i][j].data()),
this->table_id_,
local_dim_keys[i][j].data(),
key_size,
gpu_task->pass_id_,
j);
bool flag = true;
tt.wait();
try {
status = tt.get();
} catch (const std::future_error& e) {
VLOG(0) << "Caught a future_error with code" << e.code()
<< ", Message:" << e.what();
}
if (status != 0) {
VLOG(0) << "fleet pull sparse failed, status[" << status << "]";
sleep(sleep_seconds_before_fail_exit_);
flag = false;
cnt++;
}
if (cnt > 3) {
VLOG(0) << "fleet pull sparse failed, retry 3 times";
exit(-1);
}
if (flag) {
break;
}
}
if (flag) {
break;
}
}
#endif
if (status != 0) {
LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]";
sleep(300);
exit(-1);
} else {
VLOG(1) << "FleetWrapper Pull sparse to local done with table size: "
<< local_dim_keys[i][j].size();
}
if (multi_node_) {
// filter rank data
FilterPull(gpu_task, i, j);
}
};
if (status != 0) {
LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]";
sleep(300);
exit(-1);
} else {
VLOG(1) << "FleetWrapper Pull sparse to local done with table size: "
<< local_dim_keys[i][j].size();
}
if (multi_node_) {
// filter rank data
FilterPull(gpu_task, i, j);
}
};
threads.resize(thread_keys_shard_num_ * multi_mf_dim_);
......@@ -838,7 +846,7 @@ void PSGPUWrapper::MergePull(std::shared_ptr<HeterContext> gpu_task) {
if (!multi_node_) {
return;
}
#ifdef PADDLE_WITH_GPU_GRAPH
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
platform::Timer timeline;
timeline.Start();
// barrier
......@@ -853,10 +861,9 @@ void PSGPUWrapper::MergePull(std::shared_ptr<HeterContext> gpu_task) {
auto barrier_span = timeline.ElapsedSec();
timeline.Start();
auto fleet_ptr = paddle::distributed::FleetWrapper::GetInstance();
std::vector<std::future<void>> task_futures;
for (int dim_id = 0; dim_id < multi_mf_dim_; ++dim_id) {
auto pass_values = fleet_ptr->worker_ptr_->TakePassSparseReferedValues(
auto pass_values = fleet_ptr_->worker_ptr_->TakePassSparseReferedValues(
table_id_, gpu_task->pass_id_, dim_id);
if (pass_values == nullptr) {
continue;
......@@ -1225,56 +1232,6 @@ void PSGPUWrapper::PrepareGPUTask(std::shared_ptr<HeterContext> gpu_task) {
task_futures.clear();
}
VLOG(0) << "prefix done";
auto prepare_dev_value_func = [device_num,
&prefix_sum,
&device_keys,
&device_vals,
&device_task_keys,
&device_task_ptrs](int dev, int shard_id) {
#ifdef PADDLE_WITH_PSLIB
auto& task_ptrs = device_task_ptrs[shard_id];
for (int j = 0; j < len; ++j) {
device_keys[dev][cur + j] = task_keys[dev][j];
float* ptr_val = task_ptrs[dev][j]->data();
FeatureValue& val = device_vals[dev][cur + j];
size_t dim = task_ptrs[dev][j]->size();
val.delta_score = ptr_val[1];
val.show = ptr_val[2];
val.clk = ptr_val[3];
val.slot = ptr_val[6];
val.lr = ptr_val[4];
val.lr_g2sum = ptr_val[5];
val.cpu_ptr = (uint64_t)(task_ptrs[dev][j]);
if (dim > 7) {
val.mf_size = MF_DIM + 1;
for (int x = 0; x < val.mf_size; x++) {
val.mf[x] = ptr_val[x + 7];
}
} else {
val.mf_size = 0;
for (int x = 0; x < MF_DIM + 1; x++) {
val.mf[x] = 0;
}
}
}
#endif
VLOG(3) << "GpuPs build hbmps done";
};
if (!multi_mf_dim_) {
for (int i = 0; i < thread_keys_shard_num_; i++) {
for (int j = 0; j < device_num; j++) {
task_futures.emplace_back(
hbm_thread_pool_[i]->enqueue(prepare_dev_value_func, j, i));
}
}
for (auto& f : task_futures) {
f.wait();
}
task_futures.clear();
}
timeline.Pause();
VLOG(0) << "passid=" << gpu_task->pass_id_
<< ", GpuPs prepare for build hbm cost " << timeline.ElapsedSec()
......@@ -1322,90 +1279,57 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
<< " s.";
stagetime.Start();
auto build_dynamic_mf_func = [this, &gpu_task, &accessor_wrapper_ptr](
const int i,
const size_t tid,
const size_t once_gpu_copy) {
// VLOG(0) << "begin build_dynamic_mf_func tid=" << tid << ", i=" << i;
for (int j = 0; j < multi_mf_dim_; j++) {
auto& device_dim_ptrs = gpu_task->device_dim_ptr_[i][j];
int mf_dim = this->index_dim_vec_[j];
size_t feature_value_size =
accessor_wrapper_ptr->GetFeatureValueSize(mf_dim);
size_t len = device_dim_ptrs.size();
size_t start = tid * once_gpu_copy;
while (start < len) {
size_t real_len =
(len - start) > once_gpu_copy ? once_gpu_copy : (len - start);
size_t end = start + real_len;
std::shared_ptr<char> build_values(
new char[feature_value_size * real_len],
[](char* p) { delete[] p; });
char* test_build_values = build_values.get();
for (size_t k = start; k < end; k++) {
auto build_dynamic_mf_func =
[this, &gpu_task, &accessor_wrapper_ptr](
const int i, const size_t tid, const size_t once_gpu_copy) {
// VLOG(0) << "begin build_dynamic_mf_func tid=" << tid << ", i=" <<
// i;
for (int j = 0; j < multi_mf_dim_; j++) {
auto& device_dim_ptrs = gpu_task->device_dim_ptr_[i][j];
int mf_dim = this->index_dim_vec_[j];
size_t feature_value_size =
accessor_wrapper_ptr->GetFeatureValueSize(mf_dim);
size_t len = device_dim_ptrs.size();
size_t start = tid * once_gpu_copy;
while (start < len) {
size_t real_len =
(len - start) > once_gpu_copy ? once_gpu_copy : (len - start);
size_t end = start + real_len;
std::shared_ptr<char> build_values(
new char[feature_value_size * real_len],
[](char* p) { delete[] p; });
char* test_build_values = build_values.get();
for (size_t k = start; k < end; k++) {
#ifdef PADDLE_WITH_PSCORE
void* val = reinterpret_cast<float*>(
test_build_values + (k - start) * feature_value_size);
accessor_wrapper_ptr->BuildFill(
val, device_dim_ptrs[k], cpu_table_accessor_, mf_dim);
#endif
#ifdef PADDLE_WITH_PSLIB
float* val = reinterpret_cast<float*>(
test_build_values + (k - start) * feature_value_size);
float* ptr_val = device_dim_ptrs[k]->data();
size_t dim = device_dim_ptrs[k]->size();
val->delta_score =
ptr_val[paddle::ps::DownpourCtrDymfAccessor::
DownpourCtrDymfFeatureValue::delta_score_index()];
val->show = ptr_val[paddle::ps::DownpourCtrDymfAccessor::
DownpourCtrDymfFeatureValue::show_index()];
val->clk = ptr_val[paddle::ps::DownpourCtrDymfAccessor::
DownpourCtrDymfFeatureValue::click_index()];
val->slot = static_cast<int>(
ptr_val[paddle::ps::DownpourCtrDymfAccessor::
DownpourCtrDymfFeatureValue::slot_index()]);
val->lr = ptr_val[paddle::ps::DownpourCtrDymfAccessor::
DownpourCtrDymfFeatureValue::embed_w_index()];
val->lr_g2sum =
ptr_val[paddle::ps::DownpourCtrDymfAccessor::
DownpourCtrDymfFeatureValue::embed_g2sum_index()];
// TODO(xuefeng) set mf_dim while using DownpourCtrDymfAccessor
ptr_val[paddle::ps::DownpourCtrDymfAccessor::
DownpourCtrDymfFeatureValue::mf_dim_index()] =
static_cast<float>(mf_dim);
val->mf_dim = mf_dim;
if (dim > 8) { // CpuPS alreay expand as mf_dim
val->mf_size = mf_dim + 1;
for (int x = 0; x < val->mf_dim + 1; x++) {
val->mf[x] = ptr_val[x + 8];
}
} else {
val->mf_size = 0;
for (int x = 0; x < val->mf_dim + 1; x++) {
val->mf[x] = 0;
float* val = reinterpret_cast<float*>(
test_build_values + (k - start) * feature_value_size);
accessor_wrapper_ptr->BuildFill(val,
device_dim_ptrs[k],
cpu_table_accessor_,
mf_dim,
accessor_class_);
#endif
}
task_info task;
task.build_values = build_values;
task.offset = start;
task.device_id = i;
task.multi_mf_dim = j;
task.start = 0;
task.end = static_cast<int>(real_len);
cpu_reday_channels_[i]->Put(task);
// step
start = start + (once_gpu_copy * cpu_device_thread_num_);
}
VLOG(5) << "build " << k << " : "
<< feature_value_accessor_.ParseToString(
val,
feature_value_accessor_.common_feature_value.Dim(
mf_dim));
#endif
#ifdef PADDLE_WITH_PSCORE
void* val = reinterpret_cast<float*>(
test_build_values + (k - start) * feature_value_size);
accessor_wrapper_ptr->BuildFill(
val, device_dim_ptrs[k], cpu_table_accessor_, mf_dim);
#endif
}
task_info task;
task.build_values = build_values;
task.offset = start;
task.device_id = i;
task.multi_mf_dim = j;
task.start = 0;
task.end = static_cast<int>(real_len);
cpu_reday_channels_[i]->Put(task);
// step
start = start + (once_gpu_copy * cpu_device_thread_num_);
}
}
// VLOG(0) << "end build_dynamic_mf_func tid=" << tid << ", i=" << i;
};
// VLOG(0) << "end build_dynamic_mf_func tid=" << tid << ", i=" << i;
};
auto build_dymf_hbm_pool = [this,
&gpu_task,
......@@ -1447,10 +1371,12 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
HeterPs_->show_one_table(i);
}
}
stagetime.Pause();
auto build_span = stagetime.ElapsedSec();
stagetime.Start();
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
// build feature table
if (slot_num_for_pull_feature_ > 0 &&
(FLAGS_gpugraph_storage_mode == paddle::framework::GpuGraphStorageMode::
......@@ -1463,6 +1389,7 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
(std::vector<GpuPsCommGraphFea>*)gpu_task->sub_graph_feas;
gpu_graph_ptr->build_gpu_graph_fea((*tmp)[i], i);
}
#endif
stagetime.Pause();
auto build_feature_span = stagetime.ElapsedSec();
......@@ -1525,6 +1452,7 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
f.wait();
}
gpu_task_futures.clear();
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
if (FLAGS_gpugraph_storage_mode == paddle::framework::GpuGraphStorageMode::
MEM_EMB_FEATURE_AND_GPU_GRAPH ||
FLAGS_gpugraph_storage_mode == paddle::framework::GpuGraphStorageMode::
......@@ -1534,6 +1462,7 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
delete tmp;
gpu_task->sub_graph_feas = NULL;
}
#endif
stagetime.Pause();
VLOG(1) << " build_dymf_hbm_pool "
<< " cost " << stagetime.ElapsedSec() << " s.";
......@@ -1709,6 +1638,7 @@ void PSGPUWrapper::EndPass() {
}
void PSGPUWrapper::SparseTableToHbm() {
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
std::shared_ptr<HeterContext> gpu_task = gpu_task_pool_.Get();
gpu_task->Reset();
size_t device_num = heter_devices_.size();
......@@ -1739,6 +1669,7 @@ void PSGPUWrapper::SparseTableToHbm() {
BuildGPUTask(gpu_task);
current_task_ = gpu_task;
hbm_sparse_table_initialized_ = true;
#endif
}
void PSGPUWrapper::HbmToSparseTable() {
......@@ -1824,22 +1755,22 @@ void PSGPUWrapper::HbmToSparseTable() {
auto& device_keys =
this->current_task_
->device_dim_keys_[task.device_id][task.multi_mf_dim];
char* test_build_values = task.build_values.get();
int mf_dim = this->index_dim_vec_[task.multi_mf_dim];
size_t feature_value_size =
accessor_wrapper_ptr->GetFeatureValueSize(mf_dim);
uint64_t unuse_key = std::numeric_limits<uint64_t>::max();
for (int i = task.start; i < task.end; ++i) {
if (device_keys[i + task.offset] == unuse_key) {
continue;
}
size_t local_offset = i * feature_value_size;
float* gpu_val =
reinterpret_cast<float*>(test_build_values + local_offset);
#ifdef PADDLE_WITH_PSLIB
// TODO(lxsbupt): PSLIB DumpFill
#endif
#ifdef PADDLE_WITH_PSCORE
int mf_dim = this->index_dim_vec_[task.multi_mf_dim];
char* test_build_values = task.build_values.get();
size_t feature_value_size =
accessor_wrapper_ptr->GetFeatureValueSize(mf_dim);
size_t local_offset = i * feature_value_size;
float* gpu_val =
reinterpret_cast<float*>(test_build_values + local_offset);
accessor_wrapper_ptr->DumpFill(gpu_val, cpu_table_accessor_, mf_dim);
#endif
}
......@@ -1886,9 +1817,11 @@ void PSGPUWrapper::HbmToSparseTable() {
}
void PSGPUWrapper::DumpToMem() {
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
if (FLAGS_gpugraph_storage_mode == GpuGraphStorageMode::WHOLE_HBM) {
this->HbmToSparseTable();
}
#endif
}
void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
......
......@@ -15,6 +15,7 @@ limitations under the License. */
#pragma once
#ifdef PADDLE_WITH_HETERPS
#include <google/protobuf/text_format.h>
#include <atomic>
#include <ctime>
#include <map>
......@@ -35,7 +36,9 @@ limitations under the License. */
#include "paddle/fluid/distributed/ps/thirdparty/round_robin.h"
#include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/framework/fleet/heter_context.h"
#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH)
#include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h"
#endif
#include "paddle/fluid/framework/fleet/heter_ps/heter_ps_base.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h"
#include "paddle/fluid/framework/heter_util.h"
......@@ -59,9 +62,7 @@ limitations under the License. */
#include "paddle/fluid/distributed/the_one_ps.pb.h"
#endif
#ifdef PADDLE_WITH_PSLIB
#include "afs_api.h" // NOLINT
#endif
#ifdef PADDLE_WITH_PSLIB
#include "afs_api.h" // NOLINT
#include "downpour_accessor.h" // NOLINT
#endif
#include "paddle/fluid/framework/fleet/heter_ps/log_patch.h"
......@@ -354,6 +355,14 @@ class PSGPUWrapper {
// start build cpu&gpu ps thread
start_build_thread();
}
#ifdef PADDLE_WITH_PSCORE
cpu_table_accessor_ = fleet_ptr_->worker_ptr_->GetTableAccessor(0);
#endif
#ifdef PADDLE_WITH_PSLIB
cpu_table_accessor_ =
fleet_ptr_->pslib_ptr_->_worker_ptr->table_accessor(0);
#endif
InitializeGPUServer(fleet_ptr_->GetDistDesc());
}
void SetSparseSGD(float nonclk_coeff,
......@@ -425,7 +434,9 @@ class PSGPUWrapper {
}
}
void InitializeGPUServer(paddle::distributed::PSParameter ps_param) {
void InitializeGPUServer(const std::string& dist_desc) {
paddle::distributed::PSParameter ps_param;
google::protobuf::TextFormat::ParseFromString(dist_desc, &ps_param);
auto sparse_table =
ps_param.server_param().downpour_server_param().downpour_table_param(0);
// set build thread_num and shard_num
......@@ -478,6 +489,176 @@ class PSGPUWrapper {
}
#endif
#ifdef PADDLE_WITH_PSLIB
void add_sparse_optimizer(
std::unordered_map<std::string, float>& config, // NOLINT
const paddle::SparseCommonSGDRuleParameter& sgd_param,
const std::string& prefix = "") {
auto optimizer_name = sgd_param.name();
if (optimizer_name == "naive") {
config[prefix + "optimizer_type"] = 0;
config[prefix + "learning_rate"] = sgd_param.naive().learning_rate();
config[prefix + "initial_range"] = sgd_param.naive().initial_range();
if (sgd_param.naive().weight_bounds_size() == 2) {
config[prefix + "min_bound"] = sgd_param.naive().weight_bounds()[0];
config[prefix + "max_bound"] = sgd_param.naive().weight_bounds()[1];
}
} else if (optimizer_name == "adagrad") {
config[prefix + "optimizer_type"] = 1;
config[prefix + "learning_rate"] = sgd_param.adagrad().learning_rate();
config[prefix + "initial_range"] = sgd_param.adagrad().initial_range();
config[prefix + "initial_g2sum"] = sgd_param.adagrad().initial_g2sum();
if (sgd_param.adagrad().weight_bounds_size() == 2) {
config[prefix + "min_bound"] = sgd_param.adagrad().weight_bounds()[0];
config[prefix + "max_bound"] = sgd_param.adagrad().weight_bounds()[1];
}
} else if (optimizer_name == "std_adagrad") {
config[prefix + "optimizer_type"] = 2;
config[prefix + "learning_rate"] = sgd_param.adagrad().learning_rate();
config[prefix + "initial_range"] = sgd_param.adagrad().initial_range();
config[prefix + "initial_g2sum"] = sgd_param.adagrad().initial_g2sum();
if (sgd_param.adagrad().weight_bounds_size() == 2) {
config[prefix + "min_bound"] = sgd_param.adagrad().weight_bounds()[0];
config[prefix + "max_bound"] = sgd_param.adagrad().weight_bounds()[1];
}
} else if (optimizer_name == "adam") {
config[prefix + "optimizer_type"] = 3;
config[prefix + "learning_rate"] = sgd_param.adam().learning_rate();
config[prefix + "initial_range"] = sgd_param.adam().initial_range();
if (sgd_param.adam().weight_bounds_size() == 2) {
config[prefix + "min_bound"] = sgd_param.adam().weight_bounds()[0];
config[prefix + "max_bound"] = sgd_param.adam().weight_bounds()[1];
}
}
}
void InitializeGPUServer(const std::string& dist_desc) {
// optimizer config for hbmps
paddle::PSParameter ps_param;
google::protobuf::TextFormat::ParseFromString(dist_desc, &ps_param);
auto sparse_table =
ps_param.server_param().downpour_server_param().downpour_table_param(0);
auto sparse_table_accessor = sparse_table.accessor();
auto sparse_table_accessor_parameter =
sparse_table_accessor.downpour_accessor_param();
accessor_class_ = sparse_table_accessor.accessor_class();
// NOTE(zhangminxu): gpups' sparse table optimizer config,
// now only support single sparse table
// auto sparse_table = param_.sparse_table(0);
std::unordered_map<std::string, float> config;
if (accessor_class_ == "DownpourFeatureValueAccessor" ||
accessor_class_ == "DownpourCtrAccessor" ||
accessor_class_ == "DownpourCtrDoubleAccessor") {
config["nonclk_coeff"] = sparse_table_accessor_parameter.nonclk_coeff();
config["clk_coeff"] = sparse_table_accessor_parameter.click_coeff();
config["learning_rate"] =
sparse_table_accessor.sparse_sgd_param().learning_rate();
config["initial_g2sum"] =
sparse_table_accessor.sparse_sgd_param().initial_g2sum();
config["initial_range"] =
sparse_table_accessor.sparse_sgd_param().initial_range();
if (sparse_table_accessor.sparse_sgd_param().weight_bounds_size() == 2) {
config["min_bound"] =
sparse_table_accessor.sparse_sgd_param().weight_bounds()[0];
config["max_bound"] =
sparse_table_accessor.sparse_sgd_param().weight_bounds()[1];
}
// NOTE(zhangminxu): for DownpourCtrAccessor & DownpourCtrDoubleAccessor,
// optimizer config for embed_w & embedx_w is the same
config["mf_create_thresholds"] =
sparse_table_accessor.embedx_threshold(); // default = 10
config["mf_learning_rate"] = config["learning_rate"];
config["mf_initial_g2sum"] = config["initial_g2sum"];
config["mf_initial_range"] = config["initial_range"];
config["mf_min_bound"] = config["min_bound"];
config["mf_max_bound"] = config["max_bound"];
config["mf_embedx_dim"] =
sparse_table_accessor.embedx_dim(); // default = 8
} else if (accessor_class_ == "DownpourSparseValueAccessor") {
auto optimizer_name =
sparse_table_accessor.sparse_commonsgd_param().name();
if (optimizer_name == "naive") {
config["learning_rate"] = sparse_table_accessor.sparse_commonsgd_param()
.naive()
.learning_rate();
config["initial_range"] = sparse_table_accessor.sparse_commonsgd_param()
.naive()
.initial_range();
if (sparse_table_accessor.sparse_commonsgd_param()
.naive()
.weight_bounds_size() == 2) {
config["min_bound"] = sparse_table_accessor.sparse_commonsgd_param()
.naive()
.weight_bounds()[0];
config["max_bound"] = sparse_table_accessor.sparse_commonsgd_param()
.naive()
.weight_bounds()[1];
}
} else if (optimizer_name == "adagrad") {
config["learning_rate"] = sparse_table_accessor.sparse_commonsgd_param()
.adagrad()
.learning_rate();
config["initial_range"] = sparse_table_accessor.sparse_commonsgd_param()
.adagrad()
.initial_range();
config["initial_g2sum"] = sparse_table_accessor.sparse_commonsgd_param()
.adagrad()
.initial_g2sum();
if (sparse_table_accessor.sparse_commonsgd_param()
.adagrad()
.weight_bounds_size() == 2) {
config["min_bound"] = sparse_table_accessor.sparse_commonsgd_param()
.adagrad()
.weight_bounds()[0];
config["max_bound"] = sparse_table_accessor.sparse_commonsgd_param()
.adagrad()
.weight_bounds()[1];
}
} else if (optimizer_name == "adam") {
config["learning_rate"] = sparse_table_accessor.sparse_commonsgd_param()
.adam()
.learning_rate();
config["initial_range"] = sparse_table_accessor.sparse_commonsgd_param()
.adam()
.initial_range();
if (sparse_table_accessor.sparse_commonsgd_param()
.adam()
.weight_bounds_size() == 2) {
config["min_bound"] = sparse_table_accessor.sparse_commonsgd_param()
.adam()
.weight_bounds()[0];
config["max_bound"] = sparse_table_accessor.sparse_commonsgd_param()
.adam()
.weight_bounds()[1];
}
}
} else if (accessor_class_ == "DownpourUnitAccessor" ||
accessor_class_ == "DownpourDoubleUnitAccessor" ||
accessor_class_ == "DownpourCtrDymfAccessor" ||
accessor_class_ == "DownpourCtrDoubleDymfAccessor") {
config["nonclk_coeff"] = sparse_table_accessor_parameter.nonclk_coeff();
config["clk_coeff"] = sparse_table_accessor_parameter.click_coeff();
config["mf_create_thresholds"] = sparse_table_accessor.embedx_threshold();
// optimizer config for embed_w and embedx
add_sparse_optimizer(config, sparse_table_accessor.embed_sgd_param());
add_sparse_optimizer(
config, sparse_table_accessor.embedx_sgd_param(), "mf_");
config["mf_embedx_dim"] =
sparse_table_accessor.embedx_dim(); // default = 8
}
config["sparse_shard_num"] = sparse_table.shard_num();
GlobalAccessorFactory::GetInstance().Init(accessor_class_);
GlobalAccessorFactory::GetInstance().GetAccessorWrapper()->Configure(
config);
InitializeGPUServer(config);
}
#endif
void InitializeGPUServer(std::unordered_map<std::string, float> config) {
float nonclk_coeff = (config.find("nonclk_coeff") == config.end())
? 1.0
......@@ -708,11 +889,6 @@ class PSGPUWrapper {
const std::string& conf);
#endif
#ifdef PADDLE_WITH_PSCORE
void SetTableAccessor(paddle::distributed::ValueAccessor* accessor) {
cpu_table_accessor_ = accessor;
}
#endif
// for node rank
int PartitionKeyForRank(const uint64_t& key) {
return ((key / device_num_) % node_size_);
......@@ -782,9 +958,16 @@ class PSGPUWrapper {
std::string accessor_class_;
std::unordered_map<std::string, float> fleet_config_;
#ifdef PADDLE_WITH_PSCORE
std::shared_ptr<paddle::distributed::FleetWrapper> fleet_ptr_ =
paddle::distributed::FleetWrapper::GetInstance();
paddle::distributed::ValueAccessor* cpu_table_accessor_;
#endif
#ifdef PADDLE_WITH_PSLIB
std::shared_ptr<FleetWrapper> fleet_ptr_ = FleetWrapper::GetInstance();
paddle::ps::ValueAccessor* cpu_table_accessor_;
#endif
#ifdef PADDLE_WITH_CUDA
std::vector<MemoryPool*> mem_pools_;
std::vector<HBMMemoryPoolFix*> hbm_pools_; // in multi mfdim, one table need
......
......@@ -14,6 +14,7 @@ limitations under the License. */
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/operators/isfinite_op.h"
#include "paddle/fluid/platform/cpu_helper.h"
#include "paddle/fluid/platform/lodtensor_printer.h"
#include "paddle/fluid/string/string_helper.h"
......
......@@ -36,10 +36,6 @@ limitations under the License. */
#include "paddle/fluid/operators/reader/blocking_queue.h"
#include "paddle/phi/backends/dynload/port.h"
#ifdef PADDLE_WITH_PSLIB
#include "proto/the_one_ps.pb.h"
#endif
namespace paddle {
namespace framework {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册