From 8464c04d00b3933cf6fd23935a5fb45c627506dd Mon Sep 17 00:00:00 2001 From: pangengzheng <117730991+pangengzheng@users.noreply.github.com> Date: Wed, 8 Feb 2023 11:40:28 +0800 Subject: [PATCH] Support compile pslib v3 (#50166) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix feature_value.h and feature_value.cu to support pslib * code style * align DistPsArch pre-stable branch --------- Co-authored-by: Sławomir Siwek Co-authored-by: Tomasz Socha Co-authored-by: heliqi <1101791222@qq.com> Co-authored-by: zqw_1997 <118182234+zhengqiwen1997@users.noreply.github.com> Co-authored-by: jameszhang Co-authored-by: xiaoguoguo626807 <100397923+xiaoguoguo626807@users.noreply.github.com> Co-authored-by: Feiyu Chan Co-authored-by: GGBond8488 <33050871+GGBond8488@users.noreply.github.com> Co-authored-by: sprouteer <89541335+sprouteer@users.noreply.github.com> Co-authored-by: jakpiase Co-authored-by: Jiabin Yang <360788950@qq.com> Co-authored-by: limingshu <61349199+JamesLim-sy@users.noreply.github.com> Co-authored-by: zhangbopd <1299246947@qq.com> Co-authored-by: 张春乔 <83450930+Liyulingyue@users.noreply.github.com> Co-authored-by: LiYuRio <63526175+LiYuRio@users.noreply.github.com> Co-authored-by: 姜永久 <34344716+yjjiang11@users.noreply.github.com> Co-authored-by: Yuang Liu Co-authored-by: jiangcheng Co-authored-by: ronnywang Co-authored-by: sneaxiy <32832641+sneaxiy@users.noreply.github.com> Co-authored-by: houj04 <35131887+houj04@users.noreply.github.com> Co-authored-by: zhangbo9674 <82555433+zhangbo9674@users.noreply.github.com> Co-authored-by: gem5 <117625383+linsheng011@users.noreply.github.com> Co-authored-by: wanghuancoder Co-authored-by: Ryan <44900829+DrRyanHuang@users.noreply.github.com> Co-authored-by: Ruibiao Chen Co-authored-by: engineer1109 Co-authored-by: RedContritio Co-authored-by: mjxs <52824616+kk-2000@users.noreply.github.com> Co-authored-by: Yiqun Liu Co-authored-by: 张正海 <65210872+ccsuzzh@users.noreply.github.com> Co-authored-by: HongyuJia Co-authored-by: pangyoki Co-authored-by: LoneRanger <836253168@qq.com> Co-authored-by: TeFeng Chen Co-authored-by: Leo Guo <58431564+ZibinGuo@users.noreply.github.com> Co-authored-by: xiaoting <31891223+tink2123@users.noreply.github.com> Co-authored-by: 201716010711 <87008376+201716010711@users.noreply.github.com> Co-authored-by: wangxiaoning <71813629+wangxn12138@users.noreply.github.com> Co-authored-by: Yuanle Liu Co-authored-by: ZZK <359521840@qq.com> Co-authored-by: zhangkaihuo Co-authored-by: Roc <30228238+sljlp@users.noreply.github.com> Co-authored-by: PuQing Co-authored-by: Zhang Jun Co-authored-by: Charles-hit <56987902+Charles-hit@users.noreply.github.com> Co-authored-by: niuliling123 <51102941+niuliling123@users.noreply.github.com> Co-authored-by: wenbin Co-authored-by: wangshengxiang <121413869+shengxiangwang@users.noreply.github.com> Co-authored-by: Bo Zhang <105368690+zhangbopd@users.noreply.github.com> Co-authored-by: Aurelius84 Co-authored-by: zxcd <228587199@qq.com> Co-authored-by: zhoutianzi666 <39978853+zhoutianzi666@users.noreply.github.com> Co-authored-by: gouzil <66515297+gouzil@users.noreply.github.com> Co-authored-by: zhangyikun02 <48021248+zhangyk0314@users.noreply.github.com> Co-authored-by: Hui Zhang Co-authored-by: Wang Bojun <105858416+wwbitejotunn@users.noreply.github.com> Co-authored-by: Guanghua Yu <742925032@qq.com> Co-authored-by: YUNSHEN XIE <1084314248@qq.com> Co-authored-by: Zhong Hui Co-authored-by: risemeup1 <62429225+risemeup1@users.noreply.github.com> Co-authored-by: liuruyan <44316842+liuruyan@users.noreply.github.com> Co-authored-by: Leo Chen Co-authored-by: YuanRisheng Co-authored-by: wuhuachaocoding <77733235+wuhuachaocoding@users.noreply.github.com> Co-authored-by: Ccc <52520497+juncaipeng@users.noreply.github.com> --- cmake/external/leveldb.cmake | 9 +- .../distributed/fleet_executor/CMakeLists.txt | 2 +- .../distributed/fleet_executor/message_bus.cc | 8 +- .../distributed/fleet_executor/message_bus.h | 6 +- .../fleet_executor/message_service.cc | 2 +- .../fleet_executor/message_service.h | 2 +- .../fleet_executor/test/CMakeLists.txt | 4 +- paddle/fluid/distributed/ps/wrapper/fleet.cc | 10 - paddle/fluid/distributed/ps/wrapper/fleet.h | 7 + paddle/fluid/framework/data_feed.cu | 4 + paddle/fluid/framework/fleet/fleet_wrapper.cc | 8 +- paddle/fluid/framework/fleet/fleet_wrapper.h | 10 +- .../framework/fleet/heter_ps/feature_value.cu | 4 + .../framework/fleet/heter_ps/feature_value.h | 175 +++++++- .../fleet/heter_ps/hashtable_kernel.cu | 50 --- .../fluid/framework/fleet/ps_gpu_wrapper.cc | 401 ++++++++---------- paddle/fluid/framework/fleet/ps_gpu_wrapper.h | 201 ++++++++- paddle/fluid/framework/ps_gpu_worker.cc | 1 + paddle/fluid/framework/trainer.h | 4 - 19 files changed, 582 insertions(+), 326 deletions(-) diff --git a/cmake/external/leveldb.cmake b/cmake/external/leveldb.cmake index b1f2345794..4eb630ad0c 100644 --- a/cmake/external/leveldb.cmake +++ b/cmake/external/leveldb.cmake @@ -23,7 +23,10 @@ set(LEVELDB_LIBRARIES "${LEVELDB_INSTALL_DIR}/lib/libleveldb.a" CACHE FILEPATH "leveldb library." FORCE) include_directories(${LEVELDB_INCLUDE_DIR}) - +set(LEVELDN_CXXFLAGS "-fPIC") +if(WITH_HETERPS AND WITH_PSLIB) + set(LEVELDN_CXXFLAGS "${LEVELDN_CXXFLAGS} -D_GLIBCXX_USE_CXX11_ABI=0") +endif() ExternalProject_Add( extern_leveldb ${EXTERNAL_PROJECT_LOG_ARGS} @@ -32,7 +35,8 @@ ExternalProject_Add( GIT_TAG v1.18 UPDATE_COMMAND "" CONFIGURE_COMMAND "" - BUILD_COMMAND CXXFLAGS=-fPIC make -j ${NUM_OF_PROCESSOR} libleveldb.a + BUILD_COMMAND export "CXXFLAGS=${LEVELDN_CXXFLAGS}" && make -j + ${NUM_OF_PROCESSOR} libleveldb.a INSTALL_COMMAND mkdir -p ${LEVELDB_INSTALL_DIR}/lib/ && cp ${LEVELDB_PREFIX_DIR}/src/extern_leveldb/libleveldb.a ${LEVELDB_LIBRARIES} @@ -40,7 +44,6 @@ ExternalProject_Add( ${LEVELDB_INSTALL_DIR}/ BUILD_IN_SOURCE 1 BUILD_BYPRODUCTS ${LEVELDB_LIBRARIES}) - add_dependencies(extern_leveldb snappy) add_library(leveldb STATIC IMPORTED GLOBAL) diff --git a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt index ff8ed811ee..2f9b0aed29 100755 --- a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt @@ -6,7 +6,7 @@ proto_library(interceptor_message_proto SRCS interceptor_message.proto) if(WITH_ARM_BRPC) set(BRPC_DEPS arm_brpc snappy gflags glog) -elseif(WITH_DISTRIBUTE) +elseif(WITH_DISTRIBUTE AND NOT WITH_PSLIB) set(BRPC_DEPS brpc ssl diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index d1a23cc575..a18ba321f8 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -73,7 +73,7 @@ bool MessageBus::IsInit() const { return is_init_; } MessageBus::~MessageBus() { VLOG(3) << "Message bus releases resource."; -#if defined(PADDLE_WITH_DISTRIBUTE) +#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB) server_.Stop(1000); server_.Join(); #endif @@ -94,7 +94,7 @@ bool MessageBus::Send(int64_t dst_rank, true, platform::errors::PreconditionNotMet( "Using message bus since it has not been initialized.")); -#if defined(PADDLE_WITH_DISTRIBUTE) +#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB) int retry_time = 0; // message bus will retry sending for 10 times while (retry_time < 10) { ++retry_time; @@ -179,7 +179,7 @@ void MessageBus::ListenPort() { LOG(INFO) << "No need listen to port since training on single card."; return; } -#if defined(PADDLE_WITH_DISTRIBUTE) +#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB) // function keep listen the port and handle the message PADDLE_ENFORCE_EQ( server_.AddService(&message_service_, brpc::SERVER_DOESNT_OWN_SERVICE), @@ -209,7 +209,7 @@ void MessageBus::ListenPort() { #endif } -#if defined(PADDLE_WITH_DISTRIBUTE) +#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB) bool MessageBus::SendInterRank(int64_t dst_rank, const InterceptorMessage& interceptor_message) { const auto& dst_addr = GetAddr(dst_rank); diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.h b/paddle/fluid/distributed/fleet_executor/message_bus.h index 481a64b71c..07ab6bb630 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.h +++ b/paddle/fluid/distributed/fleet_executor/message_bus.h @@ -20,7 +20,7 @@ #include #include -#if defined(PADDLE_WITH_DISTRIBUTE) +#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB) #include "brpc/channel.h" #include "brpc/server.h" #include "paddle/fluid/distributed/fleet_executor/message_service.h" @@ -63,7 +63,7 @@ class MessageBus final { const std::string& GetAddr(int64_t rank) const; -#if defined(PADDLE_WITH_DISTRIBUTE) +#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB) // send the message inter rank (dst is different rank with src) bool SendInterRank(int64_t dst_rank, const InterceptorMessage& interceptor_message); @@ -79,7 +79,7 @@ class MessageBus final { // the ip needs to be listened std::string addr_; -#if defined(PADDLE_WITH_DISTRIBUTE) +#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB) MessageServiceImpl message_service_; // brpc server brpc::Server server_; diff --git a/paddle/fluid/distributed/fleet_executor/message_service.cc b/paddle/fluid/distributed/fleet_executor/message_service.cc index 5a1f3bf34d..d99ca36ee4 100644 --- a/paddle/fluid/distributed/fleet_executor/message_service.cc +++ b/paddle/fluid/distributed/fleet_executor/message_service.cc @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#if defined(PADDLE_WITH_DISTRIBUTE) +#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB) #include "paddle/fluid/distributed/fleet_executor/message_service.h" #include "brpc/server.h" diff --git a/paddle/fluid/distributed/fleet_executor/message_service.h b/paddle/fluid/distributed/fleet_executor/message_service.h index 115732ea08..8f14fe0213 100644 --- a/paddle/fluid/distributed/fleet_executor/message_service.h +++ b/paddle/fluid/distributed/fleet_executor/message_service.h @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#if defined(PADDLE_WITH_DISTRIBUTE) +#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB) #pragma once #include "brpc/server.h" diff --git a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt index 466071e3be..582d6b7d05 100644 --- a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt @@ -51,7 +51,9 @@ cc_test_old( scope device_context) -if(WITH_DISTRIBUTE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL)) +if(WITH_DISTRIBUTE + AND NOT WITH_PSLIB + AND NOT (WITH_ASCEND OR WITH_ASCEND_CL)) set_source_files_properties( interceptor_ping_pong_with_brpc_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.cc b/paddle/fluid/distributed/ps/wrapper/fleet.cc index 077c21e263..93294bc84d 100644 --- a/paddle/fluid/distributed/ps/wrapper/fleet.cc +++ b/paddle/fluid/distributed/ps/wrapper/fleet.cc @@ -18,9 +18,6 @@ limitations under the License. */ #include "paddle/fluid/distributed/ps/service/communicator/communicator.h" #include "paddle/fluid/distributed/ps/table/table.h" -#if defined PADDLE_WITH_HETERPS && defined PADDLE_WITH_PSCORE -#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" -#endif namespace paddle { namespace distributed { @@ -131,13 +128,6 @@ void FleetWrapper::InitWorker(const std::string& dist_desc, worker_ptr_ = std::shared_ptr( paddle::distributed::PSClientFactory::Create(ps_param)); worker_ptr_->Configure(ps_param, dense_pull_regions, ps_env_, index); -#if defined PADDLE_WITH_HETERPS && defined PADDLE_WITH_PSCORE - VLOG(3) << "FleetWrapper::InitWorker InitializeGPUServer"; - auto* accessor = worker_ptr_->GetTableAccessor(0); - auto ps_gpu_wrapper = paddle::framework::PSGPUWrapper::GetInstance(); - ps_gpu_wrapper->InitializeGPUServer(ps_param); - ps_gpu_wrapper->SetTableAccessor(accessor); -#endif } } else { VLOG(3) << "Client can be initialized only once"; diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.h b/paddle/fluid/distributed/ps/wrapper/fleet.h index acf2c3f722..9bf6f3c84a 100644 --- a/paddle/fluid/distributed/ps/wrapper/fleet.h +++ b/paddle/fluid/distributed/ps/wrapper/fleet.h @@ -286,6 +286,12 @@ class FleetWrapper { int to_client_id, const std::string& msg); + std::string GetDistDesc() const { + CHECK(is_initialized_ == true) + << "fleetwrapper should be initialized first!!!"; + return dist_desc_; + } + // FleetWrapper singleton static std::shared_ptr GetInstance() { if (NULL == s_instance_) { @@ -321,6 +327,7 @@ class FleetWrapper { private: static std::shared_ptr s_instance_; + std::string dist_desc_; paddle::distributed::PaddlePSEnvironment ps_env_; size_t GetAbsoluteSum(size_t start, size_t end, diff --git a/paddle/fluid/framework/data_feed.cu b/paddle/fluid/framework/data_feed.cu index a2100e656d..e26bb36c95 100644 --- a/paddle/fluid/framework/data_feed.cu +++ b/paddle/fluid/framework/data_feed.cu @@ -23,9 +23,11 @@ limitations under the License. */ #include #include #include "cub/cub.cuh" +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) #include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_node.h" #include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_utils.h" #include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h" +#endif #include "paddle/fluid/framework/fleet/heter_ps/hashtable.h" #include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" #include "paddle/fluid/framework/io/fs.h" @@ -435,6 +437,7 @@ __global__ void CopyDuplicateKeys(int64_t *dist_tensor, } } +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) int GraphDataGenerator::AcquireInstance(BufState *state) { if (state->GetNextStep()) { DEBUG_STATE(state); @@ -2938,6 +2941,7 @@ void GraphDataGenerator::SetConfig( infer_node_type_ = graph_config.infer_node_type(); } } +#endif void GraphDataGenerator::DumpWalkPath(std::string dump_path, size_t dump_rate) { #ifdef _LINUX diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 3d557506ef..05433c1014 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -78,6 +78,7 @@ void FleetWrapper::InitWorker(const std::string& dist_desc, const_cast(host_sign_list.data()), node_num, index); + dist_desc_ = dist_desc; is_initialized_ = true; } else { VLOG(3) << "Worker can be initialized only once"; @@ -1462,9 +1463,12 @@ void FleetWrapper::SetDate(const uint64_t table_id, const std::string& date) { #endif } -void FleetWrapper::PrintTableStat(const uint64_t table_id) { +void FleetWrapper::PrintTableStat(uint64_t table_id, + uint32_t pass_id, + size_t threshold) { #ifdef PADDLE_WITH_PSLIB - auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id); + auto ret = + pslib_ptr_->_worker_ptr->print_table_stat(table_id, pass_id, threshold); ret.wait(); int32_t err_code = ret.get(); if (err_code == -1) { diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index b3af181e42..fb5cf91729 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -307,7 +307,7 @@ class FleetWrapper { std::vector table_var_list, bool load_combine); - void PrintTableStat(const uint64_t table_id); + void PrintTableStat(uint64_t table_id, uint32_t pass_id, size_t threshold); void SetFileNumOneShard(const uint64_t table_id, int file_num); // mode = 0, load all feature // mode = 1, load delta feature, which means load diff @@ -381,6 +381,13 @@ class FleetWrapper { void Confirm(); // revert all the updated params in the current pass void Revert(); + + std::string GetDistDesc() const { + CHECK(is_initialized_ == true) + << "fleetwrapper should be initialized first!!!"; + return dist_desc_; + } + // FleetWrapper singleton static std::shared_ptr GetInstance() { { @@ -402,6 +409,7 @@ class FleetWrapper { private: static std::shared_ptr s_instance_; + std::string dist_desc_; static std::mutex ins_mutex; #ifdef PADDLE_WITH_PSLIB std::map> _regions; diff --git a/paddle/fluid/framework/fleet/heter_ps/feature_value.cu b/paddle/fluid/framework/fleet/heter_ps/feature_value.cu index d7004a43cd..ea70338604 100644 --- a/paddle/fluid/framework/fleet/heter_ps/feature_value.cu +++ b/paddle/fluid/framework/fleet/heter_ps/feature_value.cu @@ -496,6 +496,10 @@ void AccessorWrapper::CopyForPushDedupImpl( template class AccessorWrapper; #endif +#ifdef PADDLE_WITH_PSLIB +template class AccessorWrapper; +#endif + } // namespace framework } // namespace paddle #endif diff --git a/paddle/fluid/framework/fleet/heter_ps/feature_value.h b/paddle/fluid/framework/fleet/heter_ps/feature_value.h index 8c6925e938..2478754b00 100644 --- a/paddle/fluid/framework/fleet/heter_ps/feature_value.h +++ b/paddle/fluid/framework/fleet/heter_ps/feature_value.h @@ -28,6 +28,11 @@ limitations under the License. */ #include "paddle/fluid/distributed/ps/table/depends/feature_value.h" #endif +#ifdef PADDLE_WITH_PSLIB +#include "downpour_accessor.h" // NOLINT +#include "pslib.h" // NOLINT +#endif + namespace paddle { namespace framework { #define MF_DIM 8 @@ -272,13 +277,13 @@ class CommonFeatureValueAccessor { return 0; } +#ifdef PADDLE_WITH_PSCORE // // build阶段从cpu_val赋值给gpu_val __host__ void BuildFill( float* gpu_val, void* cpu, paddle::distributed::ValueAccessor* cpu_table_accessor, int mf_dim) { -#ifdef PADDLE_WITH_PSCORE paddle::distributed::CtrDymfAccessor* cpu_accessor = dynamic_cast(cpu_table_accessor); paddle::distributed::FixedFeatureValue* cpu_ptr = @@ -324,14 +329,76 @@ class CommonFeatureValueAccessor { gpu_val[x] = 0; } } + } #endif + +#ifdef PADDLE_WITH_PSLIB + // build阶段从cpu_val赋值给gpu_val + template + __host__ void BuildFill(float* gpu_val, + void* _cpu_val, + ::paddle::ps::ValueAccessor* _cpu_accessor, + int mf_dim) { + auto* cpu_accessor = + dynamic_cast<::paddle::ps::DownpourCtrDymfTplAccessor*>( + _cpu_accessor); + auto* cpu_val = + reinterpret_cast<::paddle::ps::DownpourFixedFeatureValue*>(_cpu_val); + float* ptr_val = cpu_val->data(); + size_t cpu_dim = cpu_val->size(); + + gpu_val[common_feature_value.DeltaScoreIndex()] = + ptr_val[cpu_accessor->get_delta_score_index()]; + gpu_val[common_feature_value.ShowIndex()] = cpu_accessor->get_show(ptr_val); + gpu_val[common_feature_value.ClickIndex()] = + cpu_accessor->get_click(ptr_val); + + gpu_val[common_feature_value.SlotIndex()] = + ptr_val[cpu_accessor->get_slot_index()]; + + // lr + gpu_val[common_feature_value.EmbedWIndex()] = + ptr_val[cpu_accessor->get_embed_w_index()]; + + // cpu_ptr + *(reinterpret_cast( + gpu_val + common_feature_value.CpuPtrIndex())) = (uint64_t)(cpu_val); + + // lr_g2sum + // for dymf && adagrad, embed_dim = 1 + for (int i = 0; i < common_feature_value.EmbedDim(); i++) { + gpu_val[common_feature_value.EmbedG2SumIndex() + i] = + ptr_val[cpu_accessor->get_embed_g2sum_index() + i]; + } + + ptr_val[cpu_accessor->get_mf_dim_index()] = static_cast(mf_dim); + gpu_val[common_feature_value.MfDimIndex()] = static_cast(mf_dim); + constexpr int n = 2 * (sizeof(ShowClickType) / sizeof(float) - 1); + + if (cpu_dim > 8 + n) { + gpu_val[common_feature_value.MfSizeIndex()] = + common_feature_value.MFSize(mf_dim) / sizeof(float); + + for (int x = 0; x < static_cast(common_feature_value.MFSize(mf_dim) / + sizeof(float)); + x++) { + gpu_val[common_feature_value.EmbedxG2SumIndex() + x] = + ptr_val[8 + n + x]; + } + } else { + gpu_val[common_feature_value.MfSizeIndex()] = 0; + for (int i = 0; i < mf_dim + common_feature_value.EmbedXDim(); i++) { + gpu_val[common_feature_value.EmbedxG2SumIndex() + i] = 0; + } + } } +#endif +#ifdef PADDLE_WITH_PSCORE // dump_to_cpu阶段从gpu_val赋值给cpu_val __host__ void DumpFill(float* gpu_val, paddle::distributed::ValueAccessor* cpu_table_accessor, int mf_dim) { -#ifdef PADDLE_WITH_PSCORE paddle::distributed::CtrDymfAccessor* cpu_accessor = dynamic_cast(cpu_table_accessor); @@ -371,8 +438,64 @@ class CommonFeatureValueAccessor { gpu_val[common_feature_value.EmbedxG2SumIndex() + x]; } } + } #endif + +#ifdef PADDLE_WITH_PSLIB + // dump_to_cpu阶段从gpu_val赋值给cpu_val + // gpu_val is firstly copied to mem + // so gpu_val is in mem, not in hbm + template + __host__ void DumpFill(float* gpu_val, + ::paddle::ps::ValueAccessor* _cpu_accessor, + int mf_dim) { + auto* cpu_accessor = + dynamic_cast<::paddle::ps::DownpourCtrDymfTplAccessor*>( + _cpu_accessor); + uint64_t cpu_addr = *reinterpret_cast( + gpu_val + common_feature_value.CpuPtrIndex()); + auto* downpour_value = (::paddle::ps::DownpourFixedFeatureValue*)cpu_addr; + int downpour_value_size = downpour_value->size(); + constexpr int n = 2 * (sizeof(ShowClickType) / sizeof(float) - 1); + if (static_cast(gpu_val[common_feature_value.MfSizeIndex()]) > 0 && + downpour_value_size == 8 + n) { + int mf_size = + common_feature_value.MFSize(mf_dim) / + sizeof( + float); // mf_size = gpu_val[common_feature_value.MfSizeIndex()]; + downpour_value->resize(downpour_value_size + mf_size); + } + float* cpu_val = downpour_value->data(); + + cpu_val[cpu_accessor->get_delta_score_index()] = + gpu_val[common_feature_value.DeltaScoreIndex()]; + *reinterpret_cast(cpu_val + + cpu_accessor->get_show_index()) = + (ShowClickType)gpu_val[common_feature_value.ShowIndex()]; + *reinterpret_cast(cpu_val + + cpu_accessor->get_click_index()) = + (ShowClickType)gpu_val[common_feature_value.ClickIndex()]; + cpu_val[cpu_accessor->get_embed_w_index()] = + gpu_val[common_feature_value.EmbedWIndex()]; + cpu_val[cpu_accessor->get_slot_index()] = + gpu_val[common_feature_value.SlotIndex()]; + + // for dymf && adagrad, embed_dim = 1 + for (int i = 0; i < common_feature_value.EmbedDim(); i++) { + cpu_val[cpu_accessor->get_embed_g2sum_index() + i] = + gpu_val[common_feature_value.EmbedG2SumIndex() + i]; + } + + if (static_cast(gpu_val[common_feature_value.MfSizeIndex()]) > 0) { + for (int x = 0; x < static_cast(common_feature_value.MFSize(mf_dim) / + sizeof(float)); + x++) { + cpu_val[x + 8 + n] = + gpu_val[common_feature_value.EmbedxG2SumIndex() + x]; + } + } } +#endif // dy_mf_fill_dvals_kernel 阶段 gpukernel // 中从src_val赋值给dest_val @@ -667,6 +790,7 @@ class VirtualAccessor { virtual size_t GetPullValueSize(int& mf_dim) = 0; // NOLINT +#ifdef PADDLE_WITH_PSCORE virtual void BuildFill(void* gpu_val, void* cpu_val, paddle::distributed::ValueAccessor* cpu_table_accessor, @@ -675,6 +799,22 @@ class VirtualAccessor { virtual void DumpFill(float* gpu_val, paddle::distributed::ValueAccessor* cpu_table_accessor, int mf_dim) = 0; +#endif + +#ifdef PADDLE_WITH_PSLIB + virtual void BuildFill( + float* gpu_val, + void* cpu_val, + paddle::ps::ValueAccessor* cpu_accessor, + int mf_dim, + const std::string& accessor_type = "DownpourCtrDymfAccessor") = 0; + + virtual void DumpFill( + float* gpu_val, + ::paddle::ps::ValueAccessor* cpu_accessor, + int mf_dim, + const std::string& accessor_type = "DownpourCtrDymfAccessor") = 0; +#endif virtual void CopyForPull(const paddle::platform::Place& place, uint64_t** gpu_keys, @@ -771,6 +911,7 @@ class AccessorWrapper : public VirtualAccessor { GPUAccessor* AccessorPtr() { return &gpu_accessor_; } +#ifdef PADDLE_WITH_PSCORE virtual void BuildFill(void* gpu_val, void* cpu_val, paddle::distributed::ValueAccessor* cpu_table_accessor, @@ -784,6 +925,36 @@ class AccessorWrapper : public VirtualAccessor { int mf_dim) { gpu_accessor_.DumpFill(gpu_val, cpu_table_accessor, mf_dim); } +#endif + +#ifdef PADDLE_WITH_PSLIB + virtual void BuildFill( + float* gpu_val, + void* cpu_val, + paddle::ps::ValueAccessor* cpu_accessor, + int mf_dim, + const std::string& accessor_type = "DownpourCtrDymfAccessor") { + if (accessor_type == "DownpourCtrDymfAccessor") { + gpu_accessor_.template BuildFill( + gpu_val, cpu_val, cpu_accessor, mf_dim); + } else if (accessor_type == "DownpourCtrDoubleDymfAccessor") { + gpu_accessor_.template BuildFill( + gpu_val, cpu_val, cpu_accessor, mf_dim); + } + } + + virtual void DumpFill( + float* gpu_val, + paddle::ps::ValueAccessor* cpu_accessor, + int mf_dim, + const std::string& accessor_type = "DownpourCtrDymfAccessor") { + if (accessor_type == "DownpourCtrDymfAccessor") { + gpu_accessor_.template DumpFill(gpu_val, cpu_accessor, mf_dim); + } else if (accessor_type == "DownpourCtrDoubleDymfAccessor") { + gpu_accessor_.template DumpFill(gpu_val, cpu_accessor, mf_dim); + } + } +#endif virtual void CopyForPull(const paddle::platform::Place& place, uint64_t** gpu_keys, diff --git a/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu b/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu index a81a70d7e0..752c112394 100644 --- a/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu +++ b/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu @@ -370,56 +370,6 @@ template template void HashTable::dump_to_cpu(int devid, StreamType stream) { container_->prefetch(cudaCpuDeviceId, stream); - std::vector threads; - size_t num = container_->size(); - KeyType unuse_key = std::numeric_limits::max(); - thrust::pair* kv = container_->data(); - - int thread_num = 8; - int len_per_thread = num / thread_num; - int remain = num % thread_num; - int begin = 0; - - auto dump_func = [unuse_key, kv](int left, int right) { - for (int i = left; i < right; i++) { - if (kv[i].first == unuse_key) { - continue; - } - ValType& gpu_val = kv[i].second; -#ifdef PADDLE_WITH_PSLIB - auto* downpour_value = - (paddle::ps::DownpourFixedFeatureValue*)(gpu_val.cpu_ptr); - int downpour_value_size = downpour_value->size(); - if (gpu_val.mf_size > 0 && downpour_value_size == 7) { - downpour_value->resize(gpu_val.mf_size + downpour_value_size); - } - float* cpu_val = downpour_value->data(); - // cpu_val[0] = 0; - cpu_val[1] = gpu_val.delta_score; - cpu_val[2] = gpu_val.show; - cpu_val[3] = gpu_val.clk; - cpu_val[4] = gpu_val.lr; - cpu_val[5] = gpu_val.lr_g2sum; - cpu_val[6] = gpu_val.slot; - if (gpu_val.mf_size > 0) { - for (int x = 0; x < gpu_val.mf_size; x++) { - cpu_val[x + 7] = gpu_val.mf[x]; - } - } -#endif - } - }; - - for (int i = 0; i < thread_num; i++) { - threads.push_back(std::thread( - dump_func, begin, begin + len_per_thread + (i < remain ? 1 : 0))); - begin += len_per_thread + (i < remain ? 1 : 0); - } - for (std::thread& t : threads) { - t.join(); - } - - // container_->prefetch(devid, stream); } template diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc index 52fad986ff..7ab49bc025 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc @@ -35,7 +35,9 @@ limitations under the License. */ #include "paddle/fluid/framework/data_set.h" #include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_utils.h" +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) #include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h" +#endif #include "paddle/fluid/platform/timer.h" #if defined(PADDLE_WITH_PSCORE) #include "paddle/fluid/distributed/ps/table/depends/feature_value.h" @@ -196,12 +198,17 @@ void PSGPUWrapper::add_key_to_gputask(std::shared_ptr gpu_task) { VLOG(1) << "GpuPs task add keys cost " << timeline.ElapsedSec() << " seconds."; timeline.Start(); +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) size_t slot_num = static_cast(slot_num_for_pull_feature_); // no slot_fea mode and whole_hbm mode, only keep one unique_sort action if (slot_num > 0 && FLAGS_gpugraph_storage_mode != paddle::framework::GpuGraphStorageMode::WHOLE_HBM) { gpu_task->UniqueKeys(); } +#endif +#ifdef PADDLE_WITH_PSLIB + gpu_task->UniqueKeys(); +#endif timeline.Pause(); VLOG(1) << "GpuPs task unique cost " << timeline.ElapsedSec() << " seconds."; } @@ -364,8 +371,6 @@ void PSGPUWrapper::add_slot_feature(std::shared_ptr gpu_task) { // 8卡数据分片 size_t device_num = heter_devices_.size(); std::vector threads; - size_t slot_num = static_cast( - slot_num_for_pull_feature_); // node slot 9008 in slot_vector auto& local_dim_keys = gpu_task->feature_dim_keys_; // [shard_num, 0, keys]] double divide_nodeid_cost = 0; double get_feature_id_cost = 0; @@ -431,15 +436,19 @@ void PSGPUWrapper::add_slot_feature(std::shared_ptr gpu_task) { threads.clear(); time_stage.Pause(); divide_nodeid_cost = time_stage.ElapsedSec(); +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) gpu_task->sub_graph_feas = reinterpret_cast(new std::vector); std::vector& sub_graph_feas = *((std::vector*)gpu_task->sub_graph_feas); +#endif std::vector> feature_ids(device_num); std::vector feature_list(device_num); std::vector feature_list_size(device_num); +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) size_t batch = 40000; - + size_t slot_num = static_cast( + slot_num_for_pull_feature_); // node slot 9008 in slot_vector time_stage.Start(); if (FLAGS_gpugraph_storage_mode == paddle::framework::GpuGraphStorageMode::MEM_EMB_AND_GPU_GRAPH) { @@ -535,6 +544,7 @@ void PSGPUWrapper::add_slot_feature(std::shared_ptr gpu_task) { } time_stage.Pause(); get_feature_id_cost = time_stage.ElapsedSec(); +#endif size_t feature_num = 0; for (size_t i = 0; i < device_num; i++) { feature_num += feature_list_size[i]; @@ -650,12 +660,16 @@ void PSGPUWrapper::add_slot_feature(std::shared_ptr gpu_task) { void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { platform::Timer timeline; +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) if (slot_num_for_pull_feature_ > 0 && FLAGS_gpugraph_storage_mode != paddle::framework::GpuGraphStorageMode::WHOLE_HBM) { add_slot_feature(gpu_task); } - +#endif +#ifdef PADDLE_WITH_PSLIB + add_slot_feature(gpu_task); +#endif resize_gputask(gpu_task); platform::Timer time_stage; @@ -680,12 +694,6 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { // auto& device_mutex = gpu_task->mutex_; std::vector threads(thread_keys_shard_num_); -#ifdef PADDLE_WITH_PSLIB - auto fleet_ptr = FleetWrapper::GetInstance(); -#endif -#ifdef PADDLE_WITH_PSCORE - auto fleet_ptr = paddle::distributed::FleetWrapper::GetInstance(); -#endif #if (defined PADDLE_WITH_PSLIB) && (defined PADDLE_WITH_HETERPS) // get day_id: day nums from 1970 @@ -696,100 +704,100 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { b.tm_min = b.tm_hour = b.tm_sec = 0; std::time_t seconds_from_1970 = std::mktime(&b); int day_id = seconds_from_1970 / 86400; - fleet_ptr->pslib_ptr_->_worker_ptr->set_day_id(table_id_, day_id); + fleet_ptr_->pslib_ptr_->_worker_ptr->set_day_id(table_id_, day_id); #endif timeline.Start(); - auto ptl_dynamic_mf_func = - [this, &local_dim_keys, &local_dim_ptr, &fleet_ptr, &gpu_task](int i, - int j) { - size_t key_size = local_dim_keys[i][j].size(); - int32_t status = -1; - int32_t cnt = 0; + auto ptl_dynamic_mf_func = [this, &local_dim_keys, &local_dim_ptr, &gpu_task]( + int i, int j) { + size_t key_size = local_dim_keys[i][j].size(); + int32_t status = -1; + int32_t cnt = 0; #ifdef PADDLE_WITH_PSLIB - while (true) { - auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr( - i, - reinterpret_cast(local_dim_ptr[i][j].data()), - this->table_id_, - local_dim_keys[i][j].data(), - key_size); - bool flag = true; - - tt.wait(); - - try { - status = tt.get(); - } catch (const std::future_error& e) { - VLOG(0) << "Caught a future_error with code" << e.code() - << ", Message:" << e.what(); - } - if (status != 0) { - VLOG(0) << "fleet pull sparse failed, status[" << status << "]"; - sleep(sleep_seconds_before_fail_exit_); - flag = false; - cnt++; - } - if (cnt > 3) { - VLOG(0) << "fleet pull sparse failed, retry 3 times"; - exit(-1); - } + while (true) { + auto tt = fleet_ptr_->pslib_ptr_->_worker_ptr->pull_sparse_ptr( + i, + reinterpret_cast(local_dim_ptr[i][j].data()), + this->table_id_, + local_dim_keys[i][j].data(), + key_size, + gpu_task->pass_id_); + bool flag = true; + + tt.wait(); + + try { + status = tt.get(); + } catch (const std::future_error& e) { + VLOG(0) << "Caught a future_error with code" << e.code() + << ", Message:" << e.what(); + } + if (status != 0) { + VLOG(0) << "fleet pull sparse failed, status[" << status << "]"; + sleep(sleep_seconds_before_fail_exit_); + flag = false; + cnt++; + } + if (cnt > 3) { + VLOG(0) << "fleet pull sparse failed, retry 3 times"; + exit(-1); + } - if (flag) { - break; - } - } + if (flag) { + break; + } + } #endif #ifdef PADDLE_WITH_PSCORE - while (true) { - auto tt = fleet_ptr->worker_ptr_->PullSparsePtr( - i, - reinterpret_cast(local_dim_ptr[i][j].data()), - this->table_id_, - local_dim_keys[i][j].data(), - key_size, - gpu_task->pass_id_, - j); - bool flag = true; - - tt.wait(); - - try { - status = tt.get(); - } catch (const std::future_error& e) { - VLOG(0) << "Caught a future_error with code" << e.code() - << ", Message:" << e.what(); - } - if (status != 0) { - VLOG(0) << "fleet pull sparse failed, status[" << status << "]"; - sleep(sleep_seconds_before_fail_exit_); - flag = false; - cnt++; - } - if (cnt > 3) { - VLOG(0) << "fleet pull sparse failed, retry 3 times"; - exit(-1); - } + while (true) { + auto tt = fleet_ptr_->worker_ptr_->PullSparsePtr( + i, + reinterpret_cast(local_dim_ptr[i][j].data()), + this->table_id_, + local_dim_keys[i][j].data(), + key_size, + gpu_task->pass_id_, + j); + bool flag = true; + + tt.wait(); + + try { + status = tt.get(); + } catch (const std::future_error& e) { + VLOG(0) << "Caught a future_error with code" << e.code() + << ", Message:" << e.what(); + } + if (status != 0) { + VLOG(0) << "fleet pull sparse failed, status[" << status << "]"; + sleep(sleep_seconds_before_fail_exit_); + flag = false; + cnt++; + } + if (cnt > 3) { + VLOG(0) << "fleet pull sparse failed, retry 3 times"; + exit(-1); + } - if (flag) { - break; - } - } + if (flag) { + break; + } + } #endif - if (status != 0) { - LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]"; - sleep(300); - exit(-1); - } else { - VLOG(1) << "FleetWrapper Pull sparse to local done with table size: " - << local_dim_keys[i][j].size(); - } - if (multi_node_) { - // filter rank data - FilterPull(gpu_task, i, j); - } - }; + if (status != 0) { + LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]"; + sleep(300); + exit(-1); + } else { + VLOG(1) << "FleetWrapper Pull sparse to local done with table size: " + << local_dim_keys[i][j].size(); + } + if (multi_node_) { + // filter rank data + FilterPull(gpu_task, i, j); + } + }; threads.resize(thread_keys_shard_num_ * multi_mf_dim_); @@ -838,7 +846,7 @@ void PSGPUWrapper::MergePull(std::shared_ptr gpu_task) { if (!multi_node_) { return; } -#ifdef PADDLE_WITH_GPU_GRAPH +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) platform::Timer timeline; timeline.Start(); // barrier @@ -853,10 +861,9 @@ void PSGPUWrapper::MergePull(std::shared_ptr gpu_task) { auto barrier_span = timeline.ElapsedSec(); timeline.Start(); - auto fleet_ptr = paddle::distributed::FleetWrapper::GetInstance(); std::vector> task_futures; for (int dim_id = 0; dim_id < multi_mf_dim_; ++dim_id) { - auto pass_values = fleet_ptr->worker_ptr_->TakePassSparseReferedValues( + auto pass_values = fleet_ptr_->worker_ptr_->TakePassSparseReferedValues( table_id_, gpu_task->pass_id_, dim_id); if (pass_values == nullptr) { continue; @@ -1225,56 +1232,6 @@ void PSGPUWrapper::PrepareGPUTask(std::shared_ptr gpu_task) { task_futures.clear(); } VLOG(0) << "prefix done"; - auto prepare_dev_value_func = [device_num, - &prefix_sum, - &device_keys, - &device_vals, - &device_task_keys, - &device_task_ptrs](int dev, int shard_id) { -#ifdef PADDLE_WITH_PSLIB - auto& task_ptrs = device_task_ptrs[shard_id]; - - for (int j = 0; j < len; ++j) { - device_keys[dev][cur + j] = task_keys[dev][j]; - float* ptr_val = task_ptrs[dev][j]->data(); - FeatureValue& val = device_vals[dev][cur + j]; - size_t dim = task_ptrs[dev][j]->size(); - - val.delta_score = ptr_val[1]; - val.show = ptr_val[2]; - val.clk = ptr_val[3]; - val.slot = ptr_val[6]; - val.lr = ptr_val[4]; - val.lr_g2sum = ptr_val[5]; - val.cpu_ptr = (uint64_t)(task_ptrs[dev][j]); - - if (dim > 7) { - val.mf_size = MF_DIM + 1; - for (int x = 0; x < val.mf_size; x++) { - val.mf[x] = ptr_val[x + 7]; - } - } else { - val.mf_size = 0; - for (int x = 0; x < MF_DIM + 1; x++) { - val.mf[x] = 0; - } - } - } -#endif - VLOG(3) << "GpuPs build hbmps done"; - }; - if (!multi_mf_dim_) { - for (int i = 0; i < thread_keys_shard_num_; i++) { - for (int j = 0; j < device_num; j++) { - task_futures.emplace_back( - hbm_thread_pool_[i]->enqueue(prepare_dev_value_func, j, i)); - } - } - for (auto& f : task_futures) { - f.wait(); - } - task_futures.clear(); - } timeline.Pause(); VLOG(0) << "passid=" << gpu_task->pass_id_ << ", GpuPs prepare for build hbm cost " << timeline.ElapsedSec() @@ -1322,90 +1279,57 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr gpu_task) { << " s."; stagetime.Start(); - auto build_dynamic_mf_func = [this, &gpu_task, &accessor_wrapper_ptr]( - const int i, - const size_t tid, - const size_t once_gpu_copy) { - // VLOG(0) << "begin build_dynamic_mf_func tid=" << tid << ", i=" << i; - for (int j = 0; j < multi_mf_dim_; j++) { - auto& device_dim_ptrs = gpu_task->device_dim_ptr_[i][j]; - int mf_dim = this->index_dim_vec_[j]; - size_t feature_value_size = - accessor_wrapper_ptr->GetFeatureValueSize(mf_dim); - size_t len = device_dim_ptrs.size(); - size_t start = tid * once_gpu_copy; - while (start < len) { - size_t real_len = - (len - start) > once_gpu_copy ? once_gpu_copy : (len - start); - size_t end = start + real_len; - std::shared_ptr build_values( - new char[feature_value_size * real_len], - [](char* p) { delete[] p; }); - char* test_build_values = build_values.get(); - for (size_t k = start; k < end; k++) { + auto build_dynamic_mf_func = + [this, &gpu_task, &accessor_wrapper_ptr]( + const int i, const size_t tid, const size_t once_gpu_copy) { + // VLOG(0) << "begin build_dynamic_mf_func tid=" << tid << ", i=" << + // i; + for (int j = 0; j < multi_mf_dim_; j++) { + auto& device_dim_ptrs = gpu_task->device_dim_ptr_[i][j]; + int mf_dim = this->index_dim_vec_[j]; + size_t feature_value_size = + accessor_wrapper_ptr->GetFeatureValueSize(mf_dim); + size_t len = device_dim_ptrs.size(); + size_t start = tid * once_gpu_copy; + while (start < len) { + size_t real_len = + (len - start) > once_gpu_copy ? once_gpu_copy : (len - start); + size_t end = start + real_len; + std::shared_ptr build_values( + new char[feature_value_size * real_len], + [](char* p) { delete[] p; }); + char* test_build_values = build_values.get(); + for (size_t k = start; k < end; k++) { +#ifdef PADDLE_WITH_PSCORE + void* val = reinterpret_cast( + test_build_values + (k - start) * feature_value_size); + accessor_wrapper_ptr->BuildFill( + val, device_dim_ptrs[k], cpu_table_accessor_, mf_dim); +#endif #ifdef PADDLE_WITH_PSLIB - float* val = reinterpret_cast( - test_build_values + (k - start) * feature_value_size); - float* ptr_val = device_dim_ptrs[k]->data(); - size_t dim = device_dim_ptrs[k]->size(); - val->delta_score = - ptr_val[paddle::ps::DownpourCtrDymfAccessor:: - DownpourCtrDymfFeatureValue::delta_score_index()]; - val->show = ptr_val[paddle::ps::DownpourCtrDymfAccessor:: - DownpourCtrDymfFeatureValue::show_index()]; - val->clk = ptr_val[paddle::ps::DownpourCtrDymfAccessor:: - DownpourCtrDymfFeatureValue::click_index()]; - val->slot = static_cast( - ptr_val[paddle::ps::DownpourCtrDymfAccessor:: - DownpourCtrDymfFeatureValue::slot_index()]); - val->lr = ptr_val[paddle::ps::DownpourCtrDymfAccessor:: - DownpourCtrDymfFeatureValue::embed_w_index()]; - val->lr_g2sum = - ptr_val[paddle::ps::DownpourCtrDymfAccessor:: - DownpourCtrDymfFeatureValue::embed_g2sum_index()]; - // TODO(xuefeng) set mf_dim while using DownpourCtrDymfAccessor - ptr_val[paddle::ps::DownpourCtrDymfAccessor:: - DownpourCtrDymfFeatureValue::mf_dim_index()] = - static_cast(mf_dim); - val->mf_dim = mf_dim; - if (dim > 8) { // CpuPS alreay expand as mf_dim - val->mf_size = mf_dim + 1; - for (int x = 0; x < val->mf_dim + 1; x++) { - val->mf[x] = ptr_val[x + 8]; - } - } else { - val->mf_size = 0; - for (int x = 0; x < val->mf_dim + 1; x++) { - val->mf[x] = 0; + float* val = reinterpret_cast( + test_build_values + (k - start) * feature_value_size); + accessor_wrapper_ptr->BuildFill(val, + device_dim_ptrs[k], + cpu_table_accessor_, + mf_dim, + accessor_class_); +#endif } + task_info task; + task.build_values = build_values; + task.offset = start; + task.device_id = i; + task.multi_mf_dim = j; + task.start = 0; + task.end = static_cast(real_len); + cpu_reday_channels_[i]->Put(task); + // step + start = start + (once_gpu_copy * cpu_device_thread_num_); } - VLOG(5) << "build " << k << " : " - << feature_value_accessor_.ParseToString( - val, - feature_value_accessor_.common_feature_value.Dim( - mf_dim)); -#endif -#ifdef PADDLE_WITH_PSCORE - void* val = reinterpret_cast( - test_build_values + (k - start) * feature_value_size); - accessor_wrapper_ptr->BuildFill( - val, device_dim_ptrs[k], cpu_table_accessor_, mf_dim); -#endif } - task_info task; - task.build_values = build_values; - task.offset = start; - task.device_id = i; - task.multi_mf_dim = j; - task.start = 0; - task.end = static_cast(real_len); - cpu_reday_channels_[i]->Put(task); - // step - start = start + (once_gpu_copy * cpu_device_thread_num_); - } - } - // VLOG(0) << "end build_dynamic_mf_func tid=" << tid << ", i=" << i; - }; + // VLOG(0) << "end build_dynamic_mf_func tid=" << tid << ", i=" << i; + }; auto build_dymf_hbm_pool = [this, &gpu_task, @@ -1447,10 +1371,12 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr gpu_task) { HeterPs_->show_one_table(i); } } + stagetime.Pause(); auto build_span = stagetime.ElapsedSec(); stagetime.Start(); +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) // build feature table if (slot_num_for_pull_feature_ > 0 && (FLAGS_gpugraph_storage_mode == paddle::framework::GpuGraphStorageMode:: @@ -1463,6 +1389,7 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr gpu_task) { (std::vector*)gpu_task->sub_graph_feas; gpu_graph_ptr->build_gpu_graph_fea((*tmp)[i], i); } +#endif stagetime.Pause(); auto build_feature_span = stagetime.ElapsedSec(); @@ -1525,6 +1452,7 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr gpu_task) { f.wait(); } gpu_task_futures.clear(); +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) if (FLAGS_gpugraph_storage_mode == paddle::framework::GpuGraphStorageMode:: MEM_EMB_FEATURE_AND_GPU_GRAPH || FLAGS_gpugraph_storage_mode == paddle::framework::GpuGraphStorageMode:: @@ -1534,6 +1462,7 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr gpu_task) { delete tmp; gpu_task->sub_graph_feas = NULL; } +#endif stagetime.Pause(); VLOG(1) << " build_dymf_hbm_pool " << " cost " << stagetime.ElapsedSec() << " s."; @@ -1709,6 +1638,7 @@ void PSGPUWrapper::EndPass() { } void PSGPUWrapper::SparseTableToHbm() { +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) std::shared_ptr gpu_task = gpu_task_pool_.Get(); gpu_task->Reset(); size_t device_num = heter_devices_.size(); @@ -1739,6 +1669,7 @@ void PSGPUWrapper::SparseTableToHbm() { BuildGPUTask(gpu_task); current_task_ = gpu_task; hbm_sparse_table_initialized_ = true; +#endif } void PSGPUWrapper::HbmToSparseTable() { @@ -1824,22 +1755,22 @@ void PSGPUWrapper::HbmToSparseTable() { auto& device_keys = this->current_task_ ->device_dim_keys_[task.device_id][task.multi_mf_dim]; - char* test_build_values = task.build_values.get(); - int mf_dim = this->index_dim_vec_[task.multi_mf_dim]; - size_t feature_value_size = - accessor_wrapper_ptr->GetFeatureValueSize(mf_dim); uint64_t unuse_key = std::numeric_limits::max(); for (int i = task.start; i < task.end; ++i) { if (device_keys[i + task.offset] == unuse_key) { continue; } - size_t local_offset = i * feature_value_size; - float* gpu_val = - reinterpret_cast(test_build_values + local_offset); #ifdef PADDLE_WITH_PSLIB // TODO(lxsbupt): PSLIB DumpFill #endif #ifdef PADDLE_WITH_PSCORE + int mf_dim = this->index_dim_vec_[task.multi_mf_dim]; + char* test_build_values = task.build_values.get(); + size_t feature_value_size = + accessor_wrapper_ptr->GetFeatureValueSize(mf_dim); + size_t local_offset = i * feature_value_size; + float* gpu_val = + reinterpret_cast(test_build_values + local_offset); accessor_wrapper_ptr->DumpFill(gpu_val, cpu_table_accessor_, mf_dim); #endif } @@ -1886,9 +1817,11 @@ void PSGPUWrapper::HbmToSparseTable() { } void PSGPUWrapper::DumpToMem() { +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) if (FLAGS_gpugraph_storage_mode == GpuGraphStorageMode::WHOLE_HBM) { this->HbmToSparseTable(); } +#endif } void PSGPUWrapper::PullSparse(const paddle::platform::Place& place, diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h index 78136c9995..84076aad26 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h @@ -15,6 +15,7 @@ limitations under the License. */ #pragma once #ifdef PADDLE_WITH_HETERPS +#include #include #include #include @@ -35,7 +36,9 @@ limitations under the License. */ #include "paddle/fluid/distributed/ps/thirdparty/round_robin.h" #include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/fleet/heter_context.h" +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_GPU_GRAPH) #include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h" +#endif #include "paddle/fluid/framework/fleet/heter_ps/heter_ps_base.h" #include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h" #include "paddle/fluid/framework/heter_util.h" @@ -59,9 +62,7 @@ limitations under the License. */ #include "paddle/fluid/distributed/the_one_ps.pb.h" #endif #ifdef PADDLE_WITH_PSLIB -#include "afs_api.h" // NOLINT -#endif -#ifdef PADDLE_WITH_PSLIB +#include "afs_api.h" // NOLINT #include "downpour_accessor.h" // NOLINT #endif #include "paddle/fluid/framework/fleet/heter_ps/log_patch.h" @@ -354,6 +355,14 @@ class PSGPUWrapper { // start build cpu&gpu ps thread start_build_thread(); } +#ifdef PADDLE_WITH_PSCORE + cpu_table_accessor_ = fleet_ptr_->worker_ptr_->GetTableAccessor(0); +#endif +#ifdef PADDLE_WITH_PSLIB + cpu_table_accessor_ = + fleet_ptr_->pslib_ptr_->_worker_ptr->table_accessor(0); +#endif + InitializeGPUServer(fleet_ptr_->GetDistDesc()); } void SetSparseSGD(float nonclk_coeff, @@ -425,7 +434,9 @@ class PSGPUWrapper { } } - void InitializeGPUServer(paddle::distributed::PSParameter ps_param) { + void InitializeGPUServer(const std::string& dist_desc) { + paddle::distributed::PSParameter ps_param; + google::protobuf::TextFormat::ParseFromString(dist_desc, &ps_param); auto sparse_table = ps_param.server_param().downpour_server_param().downpour_table_param(0); // set build thread_num and shard_num @@ -478,6 +489,176 @@ class PSGPUWrapper { } #endif +#ifdef PADDLE_WITH_PSLIB + void add_sparse_optimizer( + std::unordered_map& config, // NOLINT + const paddle::SparseCommonSGDRuleParameter& sgd_param, + const std::string& prefix = "") { + auto optimizer_name = sgd_param.name(); + if (optimizer_name == "naive") { + config[prefix + "optimizer_type"] = 0; + config[prefix + "learning_rate"] = sgd_param.naive().learning_rate(); + config[prefix + "initial_range"] = sgd_param.naive().initial_range(); + if (sgd_param.naive().weight_bounds_size() == 2) { + config[prefix + "min_bound"] = sgd_param.naive().weight_bounds()[0]; + config[prefix + "max_bound"] = sgd_param.naive().weight_bounds()[1]; + } + } else if (optimizer_name == "adagrad") { + config[prefix + "optimizer_type"] = 1; + config[prefix + "learning_rate"] = sgd_param.adagrad().learning_rate(); + config[prefix + "initial_range"] = sgd_param.adagrad().initial_range(); + config[prefix + "initial_g2sum"] = sgd_param.adagrad().initial_g2sum(); + if (sgd_param.adagrad().weight_bounds_size() == 2) { + config[prefix + "min_bound"] = sgd_param.adagrad().weight_bounds()[0]; + config[prefix + "max_bound"] = sgd_param.adagrad().weight_bounds()[1]; + } + } else if (optimizer_name == "std_adagrad") { + config[prefix + "optimizer_type"] = 2; + config[prefix + "learning_rate"] = sgd_param.adagrad().learning_rate(); + config[prefix + "initial_range"] = sgd_param.adagrad().initial_range(); + config[prefix + "initial_g2sum"] = sgd_param.adagrad().initial_g2sum(); + if (sgd_param.adagrad().weight_bounds_size() == 2) { + config[prefix + "min_bound"] = sgd_param.adagrad().weight_bounds()[0]; + config[prefix + "max_bound"] = sgd_param.adagrad().weight_bounds()[1]; + } + } else if (optimizer_name == "adam") { + config[prefix + "optimizer_type"] = 3; + config[prefix + "learning_rate"] = sgd_param.adam().learning_rate(); + config[prefix + "initial_range"] = sgd_param.adam().initial_range(); + if (sgd_param.adam().weight_bounds_size() == 2) { + config[prefix + "min_bound"] = sgd_param.adam().weight_bounds()[0]; + config[prefix + "max_bound"] = sgd_param.adam().weight_bounds()[1]; + } + } + } + + void InitializeGPUServer(const std::string& dist_desc) { + // optimizer config for hbmps + paddle::PSParameter ps_param; + google::protobuf::TextFormat::ParseFromString(dist_desc, &ps_param); + auto sparse_table = + ps_param.server_param().downpour_server_param().downpour_table_param(0); + auto sparse_table_accessor = sparse_table.accessor(); + auto sparse_table_accessor_parameter = + sparse_table_accessor.downpour_accessor_param(); + accessor_class_ = sparse_table_accessor.accessor_class(); + + // NOTE(zhangminxu): gpups' sparse table optimizer config, + // now only support single sparse table + // auto sparse_table = param_.sparse_table(0); + std::unordered_map config; + if (accessor_class_ == "DownpourFeatureValueAccessor" || + accessor_class_ == "DownpourCtrAccessor" || + accessor_class_ == "DownpourCtrDoubleAccessor") { + config["nonclk_coeff"] = sparse_table_accessor_parameter.nonclk_coeff(); + config["clk_coeff"] = sparse_table_accessor_parameter.click_coeff(); + config["learning_rate"] = + sparse_table_accessor.sparse_sgd_param().learning_rate(); + config["initial_g2sum"] = + sparse_table_accessor.sparse_sgd_param().initial_g2sum(); + config["initial_range"] = + sparse_table_accessor.sparse_sgd_param().initial_range(); + if (sparse_table_accessor.sparse_sgd_param().weight_bounds_size() == 2) { + config["min_bound"] = + sparse_table_accessor.sparse_sgd_param().weight_bounds()[0]; + config["max_bound"] = + sparse_table_accessor.sparse_sgd_param().weight_bounds()[1]; + } + // NOTE(zhangminxu): for DownpourCtrAccessor & DownpourCtrDoubleAccessor, + // optimizer config for embed_w & embedx_w is the same + config["mf_create_thresholds"] = + sparse_table_accessor.embedx_threshold(); // default = 10 + config["mf_learning_rate"] = config["learning_rate"]; + config["mf_initial_g2sum"] = config["initial_g2sum"]; + config["mf_initial_range"] = config["initial_range"]; + config["mf_min_bound"] = config["min_bound"]; + config["mf_max_bound"] = config["max_bound"]; + config["mf_embedx_dim"] = + sparse_table_accessor.embedx_dim(); // default = 8 + + } else if (accessor_class_ == "DownpourSparseValueAccessor") { + auto optimizer_name = + sparse_table_accessor.sparse_commonsgd_param().name(); + if (optimizer_name == "naive") { + config["learning_rate"] = sparse_table_accessor.sparse_commonsgd_param() + .naive() + .learning_rate(); + config["initial_range"] = sparse_table_accessor.sparse_commonsgd_param() + .naive() + .initial_range(); + if (sparse_table_accessor.sparse_commonsgd_param() + .naive() + .weight_bounds_size() == 2) { + config["min_bound"] = sparse_table_accessor.sparse_commonsgd_param() + .naive() + .weight_bounds()[0]; + config["max_bound"] = sparse_table_accessor.sparse_commonsgd_param() + .naive() + .weight_bounds()[1]; + } + } else if (optimizer_name == "adagrad") { + config["learning_rate"] = sparse_table_accessor.sparse_commonsgd_param() + .adagrad() + .learning_rate(); + config["initial_range"] = sparse_table_accessor.sparse_commonsgd_param() + .adagrad() + .initial_range(); + config["initial_g2sum"] = sparse_table_accessor.sparse_commonsgd_param() + .adagrad() + .initial_g2sum(); + if (sparse_table_accessor.sparse_commonsgd_param() + .adagrad() + .weight_bounds_size() == 2) { + config["min_bound"] = sparse_table_accessor.sparse_commonsgd_param() + .adagrad() + .weight_bounds()[0]; + config["max_bound"] = sparse_table_accessor.sparse_commonsgd_param() + .adagrad() + .weight_bounds()[1]; + } + } else if (optimizer_name == "adam") { + config["learning_rate"] = sparse_table_accessor.sparse_commonsgd_param() + .adam() + .learning_rate(); + config["initial_range"] = sparse_table_accessor.sparse_commonsgd_param() + .adam() + .initial_range(); + if (sparse_table_accessor.sparse_commonsgd_param() + .adam() + .weight_bounds_size() == 2) { + config["min_bound"] = sparse_table_accessor.sparse_commonsgd_param() + .adam() + .weight_bounds()[0]; + config["max_bound"] = sparse_table_accessor.sparse_commonsgd_param() + .adam() + .weight_bounds()[1]; + } + } + } else if (accessor_class_ == "DownpourUnitAccessor" || + accessor_class_ == "DownpourDoubleUnitAccessor" || + accessor_class_ == "DownpourCtrDymfAccessor" || + accessor_class_ == "DownpourCtrDoubleDymfAccessor") { + config["nonclk_coeff"] = sparse_table_accessor_parameter.nonclk_coeff(); + config["clk_coeff"] = sparse_table_accessor_parameter.click_coeff(); + config["mf_create_thresholds"] = sparse_table_accessor.embedx_threshold(); + // optimizer config for embed_w and embedx + add_sparse_optimizer(config, sparse_table_accessor.embed_sgd_param()); + add_sparse_optimizer( + config, sparse_table_accessor.embedx_sgd_param(), "mf_"); + config["mf_embedx_dim"] = + sparse_table_accessor.embedx_dim(); // default = 8 + } + config["sparse_shard_num"] = sparse_table.shard_num(); + + GlobalAccessorFactory::GetInstance().Init(accessor_class_); + + GlobalAccessorFactory::GetInstance().GetAccessorWrapper()->Configure( + config); + + InitializeGPUServer(config); + } +#endif + void InitializeGPUServer(std::unordered_map config) { float nonclk_coeff = (config.find("nonclk_coeff") == config.end()) ? 1.0 @@ -708,11 +889,6 @@ class PSGPUWrapper { const std::string& conf); #endif -#ifdef PADDLE_WITH_PSCORE - void SetTableAccessor(paddle::distributed::ValueAccessor* accessor) { - cpu_table_accessor_ = accessor; - } -#endif // for node rank int PartitionKeyForRank(const uint64_t& key) { return ((key / device_num_) % node_size_); @@ -782,9 +958,16 @@ class PSGPUWrapper { std::string accessor_class_; std::unordered_map fleet_config_; #ifdef PADDLE_WITH_PSCORE + std::shared_ptr fleet_ptr_ = + paddle::distributed::FleetWrapper::GetInstance(); paddle::distributed::ValueAccessor* cpu_table_accessor_; #endif +#ifdef PADDLE_WITH_PSLIB + std::shared_ptr fleet_ptr_ = FleetWrapper::GetInstance(); + paddle::ps::ValueAccessor* cpu_table_accessor_; +#endif + #ifdef PADDLE_WITH_CUDA std::vector mem_pools_; std::vector hbm_pools_; // in multi mfdim, one table need diff --git a/paddle/fluid/framework/ps_gpu_worker.cc b/paddle/fluid/framework/ps_gpu_worker.cc index c59a14e96b..930d8d7d52 100644 --- a/paddle/fluid/framework/ps_gpu_worker.cc +++ b/paddle/fluid/framework/ps_gpu_worker.cc @@ -14,6 +14,7 @@ limitations under the License. */ #include "paddle/fluid/framework/device_worker.h" #include "paddle/fluid/framework/device_worker_factory.h" +#include "paddle/fluid/operators/isfinite_op.h" #include "paddle/fluid/platform/cpu_helper.h" #include "paddle/fluid/platform/lodtensor_printer.h" #include "paddle/fluid/string/string_helper.h" diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h index 27727440c4..08696e4112 100644 --- a/paddle/fluid/framework/trainer.h +++ b/paddle/fluid/framework/trainer.h @@ -36,10 +36,6 @@ limitations under the License. */ #include "paddle/fluid/operators/reader/blocking_queue.h" #include "paddle/phi/backends/dynload/port.h" -#ifdef PADDLE_WITH_PSLIB -#include "proto/the_one_ps.pb.h" -#endif - namespace paddle { namespace framework { -- GitLab