From 8f7f3ac9f2a0209959d0fe3bd8c8f50744f03b64 Mon Sep 17 00:00:00 2001 From: danleifeng <52735331+danleifeng@users.noreply.github.com> Date: Thu, 26 May 2022 12:42:32 +0800 Subject: [PATCH] [GPUPS]fix dymf gpups pscore (#42991) --- paddle/fluid/framework/data_set.cc | 9 +- .../fleet/heter_ps/hashtable_kernel.cu | 11 ++- .../fluid/framework/fleet/ps_gpu_wrapper.cc | 92 +++++++++++++++++-- .../distributed/passes/ps_trainer_pass.py | 6 +- .../fleet/parameter_server/ir/trainer_pass.py | 8 +- 5 files changed, 101 insertions(+), 25 deletions(-) diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index de563330d6..0c762ab2e7 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -320,12 +320,11 @@ static int compute_thread_batch_nccl( thread_avg_batch_num = static_cast(offset.size() / thr_num); #ifdef PADDLE_WITH_GLOO auto gloo_wrapper = paddle::framework::GlooWrapper::GetInstance(); - if (!gloo_wrapper->IsInitialized()) { - VLOG(0) << "GLOO is not inited"; - gloo_wrapper->Init(); - } - if (gloo_wrapper->Size() > 1) { + if (!gloo_wrapper->IsInitialized()) { + VLOG(0) << "GLOO is not inited"; + gloo_wrapper->Init(); + } // adjust batch num per thread for NCCL std::vector thread_avg_batch_num_vec(1, thread_avg_batch_num); std::vector total_instance_num_vec(1, total_instance_num); diff --git a/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu b/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu index f5807d2fd7..6b0141f546 100644 --- a/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu +++ b/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu @@ -341,6 +341,8 @@ template class HashTable; template class HashTable; template class HashTable; template class HashTable; +template class HashTable; +template class HashTable; template class HashTable; template class HashTable; template class HashTable; @@ -367,6 +369,8 @@ template void HashTable::get(const long* d_keys, cudaStream_t stream); template void HashTable::get( const long* d_keys, unsigned int* d_vals, size_t len, cudaStream_t stream); +template void HashTable::get( + const unsigned long* d_keys, long* d_vals, size_t len, cudaStream_t stream); // template void // HashTable::get( // const unsigned long* d_keys, char* d_vals, size_t len, cudaStream_t @@ -402,10 +406,9 @@ template void HashTable::insert( const long* d_keys, const unsigned int* d_vals, size_t len, cudaStream_t stream); -// template void HashTable::insert< -// cudaStream_t>(const unsigned long* d_keys, size_t len, char* pool, -// size_t start_index, cudaStream_t stream); +template void HashTable::insert( + const unsigned long* d_keys, const long* d_vals, size_t len, + cudaStream_t stream); template void HashTable:: dump_to_cpu(int devid, cudaStream_t stream); diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc index 18eec174fe..ac08e37aec 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc @@ -28,11 +28,16 @@ limitations under the License. */ #ifdef PADDLE_WITH_HETERPS +#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" + #include #include -#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" #include "paddle/fluid/platform/timer.h" +#if defined(PADDLE_WITH_PSCORE) +#include "paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h" +#include "paddle/fluid/distributed/ps/table/depends/feature_value.h" +#endif namespace paddle { namespace framework { @@ -292,10 +297,10 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { auto ptl_dynamic_mf_func = [this, &local_dim_keys, &local_dim_ptr, &fleet_ptr](int i, int j) { -#ifdef PADDLE_WITH_PSLIB size_t key_size = local_dim_keys[i][j].size(); int32_t status = -1; int32_t cnt = 0; +#ifdef PADDLE_WITH_PSLIB while (true) { auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr( i, reinterpret_cast(local_dim_ptr[i][j].data()), @@ -325,6 +330,38 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { break; } } +#endif +#ifdef PADDLE_WITH_PSCORE + while (true) { + auto tt = fleet_ptr->worker_ptr_->PullSparsePtr( + reinterpret_cast(local_dim_ptr[i][j].data()), this->table_id_, + local_dim_keys[i][j].data(), key_size); + bool flag = true; + + tt.wait(); + + try { + status = tt.get(); + } catch (const std::future_error& e) { + VLOG(0) << "Caught a future_error with code" << e.code() + << ", Message:" << e.what(); + } + if (status != 0) { + VLOG(0) << "fleet pull sparse failed, status[" << status << "]"; + sleep(sleep_seconds_before_fail_exit_); + flag = false; + cnt++; + } + if (cnt > 3) { + VLOG(0) << "fleet pull sparse failed, retry 3 times"; + exit(-1); + } + + if (flag) { + break; + } + } +#endif if (status != 0) { LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]"; sleep(300); @@ -333,7 +370,6 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { VLOG(0) << "FleetWrapper Pull sparse to local done with table size: " << local_dim_keys[i][j].size(); } -#endif }; threads.resize(thread_keys_shard_num_ * multi_mf_dim_); @@ -369,10 +405,16 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { &local_dim_ptr, &device_dim_keys, &device_dim_ptr, &device_dim_mutex](int i, int j) { -#ifdef PADDLE_WITH_PSLIB std::vector> task_keys(device_num); +#ifdef PADDLE_WITH_PSLIB std::vector> task_ptrs( device_num); +#endif + +#ifdef PADDLE_WITH_PSCORE + std::vector> task_ptrs( + device_num); +#endif for (size_t k = 0; k < local_dim_keys[i][j].size(); k++) { int shard = local_dim_keys[i][j][k] % device_num; task_keys[shard].push_back(local_dim_keys[i][j][k]); @@ -391,7 +433,6 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { } device_dim_mutex[dev][j]->unlock(); } -#endif }; auto build_func = [device_num, record_status, &pass_values, &local_keys, &local_ptr, &device_task_keys, &device_task_ptrs](int i) { @@ -629,12 +670,26 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr gpu_task) { val->lr_g2sum = ptr_val[paddle::ps::DownpourCtrDymfAccessor:: DownpourCtrDymfFeatureValue::embed_g2sum_index()]; - val->cpu_ptr = (uint64_t)(device_dim_ptrs[k]); - // TODO(xuefeng) set mf_dim while using DownpourCtrDymfAccessor ptr_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue:: mf_dim_index()] = float(mf_dim); val->mf_dim = mf_dim; +#endif +#ifdef PADDLE_WITH_PSCORE + paddle::distributed::CtrDymfAccessor accessor; + val->delta_score = + ptr_val[accessor.common_feature_value.DeltaScoreIndex()]; + val->show = ptr_val[accessor.common_feature_value.ShowIndex()]; + val->clk = ptr_val[accessor.common_feature_value.ClickIndex()]; + val->slot = int(ptr_val[accessor.common_feature_value.SlotIndex()]); + val->lr = ptr_val[accessor.common_feature_value.EmbedWIndex()]; + val->lr_g2sum = ptr_val[accessor.common_feature_value.EmbedG2SumIndex()]; + + val->cpu_ptr = (uint64_t)(device_dim_ptrs[k]); + + // TODO(xuefeng) set mf_dim while using DownpourCtrDymfAccessor + ptr_val[accessor.common_feature_value.MfDimIndex()] = float(mf_dim); + val->mf_dim = mf_dim; #endif if (dim > 8) { // CpuPS alreay expand as mf_dim val->mf_size = mf_dim + 1; @@ -802,7 +857,6 @@ void PSGPUWrapper::EndPass() { cudaMemcpyDeviceToHost); CHECK(len == hbm_pool->capacity()); -#ifdef PADDLE_WITH_PSLIB uint64_t unuse_key = std::numeric_limits::max(); for (size_t i = 0; i < len; ++i) { if (device_keys[i] == unuse_key) { @@ -810,6 +864,7 @@ void PSGPUWrapper::EndPass() { } size_t offset = i * feature_value_size; FeatureValue* gpu_val = (FeatureValue*)(test_build_values + offset); +#ifdef PADDLE_WITH_PSLIB auto* downpour_value = (paddle::ps::DownpourFixedFeatureValue*)(gpu_val->cpu_ptr); int downpour_value_size = downpour_value->size(); @@ -829,13 +884,32 @@ void PSGPUWrapper::EndPass() { embed_g2sum_index()] = gpu_val->lr_g2sum; cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue:: slot_index()] = gpu_val->slot; +#endif +#ifdef PADDLE_WITH_PSCORE + auto* downpour_value = + (paddle::distributed::FixedFeatureValue*)(gpu_val->cpu_ptr); + int downpour_value_size = downpour_value->size(); + if (gpu_val->mf_size > 0 && downpour_value_size == 8) { + downpour_value->resize(gpu_val->mf_dim + 1 + downpour_value_size); + } + float* cpu_val = downpour_value->data(); + + paddle::distributed::CtrDymfAccessor accessor; + cpu_val[accessor.common_feature_value.DeltaScoreIndex()] = + gpu_val->delta_score; + cpu_val[accessor.common_feature_value.ShowIndex()] = gpu_val->show; + cpu_val[accessor.common_feature_value.ClickIndex()] = gpu_val->clk; + cpu_val[accessor.common_feature_value.EmbedWIndex()] = gpu_val->lr; + cpu_val[accessor.common_feature_value.EmbedG2SumIndex()] = + gpu_val->lr_g2sum; + cpu_val[accessor.common_feature_value.SlotIndex()] = gpu_val->slot; +#endif if (gpu_val->mf_size > 0) { for (int x = 0; x < gpu_val->mf_dim + 1; x++) { cpu_val[x + 8] = gpu_val->mf[x]; } } } -#endif free(test_build_values); }; if (multi_mf_dim_) { diff --git a/python/paddle/distributed/passes/ps_trainer_pass.py b/python/paddle/distributed/passes/ps_trainer_pass.py index 0792a1eddc..6112a9a1f4 100755 --- a/python/paddle/distributed/passes/ps_trainer_pass.py +++ b/python/paddle/distributed/passes/ps_trainer_pass.py @@ -375,12 +375,12 @@ class DistributedOpsPass(PassBase): if attrs['use_ps_gpu']: _program.global_block()._insert_op( index=distributed_idx, - type="pull_box_sparse", + type="pull_gpups_sparse", inputs={"Ids": inputs, 'W': w}, outputs={"Out": outputs}, attrs={ - "size": w.shape[1], + "size": [w.shape[1] for i in inputs], "is_distributed": True, "is_sparse": True }) @@ -679,7 +679,7 @@ class PsGpuPass(PassBase): lookup_table_grad_var[name] = 1 for idx, op in list(enumerate(program.global_block().ops)): - if op.type == "pull_box_sparse": + if op.type == "pull_box_sparse" or op.type == "pull_gpups_sparse": continue for key_name in op.input_names: for var in op.input(key_name): diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py index 2c09abac9e..51e89cc301 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py @@ -293,12 +293,12 @@ def distributed_ops_pass(program, config, use_ps_gpu=False): if use_ps_gpu: program.global_block()._insert_op( index=distributed_idx, - type="pull_box_sparse", + type="pull_gpups_sparse", inputs={"Ids": inputs, 'W': w}, outputs={"Out": outputs}, attrs={ - "size": w.shape[1], + "size": [w.shape[1] for i in inputs], "is_distributed": True, "is_sparse": True }) @@ -576,7 +576,7 @@ def ps_gpu_pass(program): op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName() backward = core.op_proto_and_checker_maker.OpRole.Backward for op in program.global_block().ops: - if op.type != "pull_box_sparse": + if op.type != "pull_box_sparse" and op.type != "pull_gpups_sparse": continue grad_op_desc, op_grad_to_var = core.get_grad_op_desc( op.desc, cpt.to_text(set()), []) @@ -599,7 +599,7 @@ def ps_gpu_pass(program): lookup_table_grad_var[name] = 1 for idx, op in list(enumerate(program.global_block().ops)): - if op.type == "pull_box_sparse": + if op.type == "pull_box_sparse" or op.type == "pull_gpups_sparse": continue for key_name in op.input_names: for var in op.input(key_name): -- GitLab