Unverified commit 8f7f3ac9 authored by danleifeng, committed by GitHub

[GPUPS]fix dymf gpups pscore (#42991)

Parent: 52ff3f48
@@ -320,12 +320,11 @@ static int compute_thread_batch_nccl(
   thread_avg_batch_num = static_cast<int>(offset.size() / thr_num);
 #ifdef PADDLE_WITH_GLOO
   auto gloo_wrapper = paddle::framework::GlooWrapper::GetInstance();
-  if (!gloo_wrapper->IsInitialized()) {
-    VLOG(0) << "GLOO is not inited";
-    gloo_wrapper->Init();
-  }
   if (gloo_wrapper->Size() > 1) {
+    if (!gloo_wrapper->IsInitialized()) {
+      VLOG(0) << "GLOO is not inited";
+      gloo_wrapper->Init();
+    }
     // adjust batch num per thread for NCCL
     std::vector<int> thread_avg_batch_num_vec(1, thread_avg_batch_num);
     std::vector<int64_t> total_instance_num_vec(1, total_instance_num);
......
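The hunk above moves GLOO initialization inside the `Size() > 1` branch, so a single-worker GPUPS run never pays for the rendezvous. A minimal sketch of that lazy-init pattern, using a toy CollectiveWrapper rather than Paddle's GlooWrapper (names here are illustrative, not the real API):

```cpp
// Minimal sketch (not Paddle code): only initialize the collective wrapper
// when more than one worker participates, mirroring the reordering above.
#include <iostream>

class CollectiveWrapper {  // stand-in for a GLOO-style wrapper
 public:
  static CollectiveWrapper& GetInstance() {
    static CollectiveWrapper inst;
    return inst;
  }
  bool IsInitialized() const { return initialized_; }
  void Init() { initialized_ = true; }      // expensive rendezvous in real life
  int Size() const { return world_size_; }  // known from config before Init
  void set_size(int n) { world_size_ = n; }

 private:
  bool initialized_ = false;
  int world_size_ = 1;
};

int main() {
  auto& gloo = CollectiveWrapper::GetInstance();
  gloo.set_size(1);  // single-process run: Init() is never reached
  if (gloo.Size() > 1) {
    if (!gloo.IsInitialized()) {
      gloo.Init();
    }
    // ... all-reduce the per-thread batch numbers here ...
  }
  std::cout << "initialized: " << std::boolalpha << gloo.IsInitialized() << "\n";
  return 0;
}
```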
@@ -341,6 +341,8 @@ template class HashTable<unsigned long, paddle::framework::FeatureValue*>;
 template class HashTable<long, int>;
 template class HashTable<unsigned long, int>;
 template class HashTable<unsigned long, unsigned long>;
+template class HashTable<unsigned long, long>;
+template class HashTable<unsigned long, long*>;
 template class HashTable<long, long>;
 template class HashTable<long, unsigned long>;
 template class HashTable<long, unsigned int>;

@@ -367,6 +369,8 @@ template void HashTable<long, long>::get<cudaStream_t>(const long* d_keys,
                                                         cudaStream_t stream);
 template void HashTable<long, unsigned int>::get<cudaStream_t>(
     const long* d_keys, unsigned int* d_vals, size_t len, cudaStream_t stream);
+template void HashTable<unsigned long, long>::get<cudaStream_t>(
+    const unsigned long* d_keys, long* d_vals, size_t len, cudaStream_t stream);
 // template void
 // HashTable<unsigned long, paddle::framework::FeatureValue>::get<cudaStream_t>(
 //     const unsigned long* d_keys, char* d_vals, size_t len, cudaStream_t

@@ -402,10 +406,9 @@ template void HashTable<long, unsigned int>::insert<cudaStream_t>(
     const long* d_keys, const unsigned int* d_vals, size_t len,
     cudaStream_t stream);
-// template void HashTable<unsigned long,
-// paddle::framework::FeatureValue>::insert<
-//     cudaStream_t>(const unsigned long* d_keys, size_t len, char* pool,
-//     size_t start_index, cudaStream_t stream);
+template void HashTable<unsigned long, long>::insert<cudaStream_t>(
+    const unsigned long* d_keys, const long* d_vals, size_t len,
+    cudaStream_t stream);
 template void HashTable<unsigned long, paddle::framework::FeatureValue>::
     dump_to_cpu<cudaStream_t>(int devid, cudaStream_t stream);
......
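The new `HashTable<unsigned long, long>` class and `get`/`insert` instantiations are needed because the template's member definitions live in the .cu file, so every key/value combination used elsewhere must be instantiated explicitly or the linker fails. A self-contained sketch of the same explicit-instantiation pattern, using a toy Table class rather than the real HashTable:

```cpp
// Minimal sketch (not the Paddle HashTable): explicit instantiation lets a
// template defined in one translation unit be linked from others without
// exposing the member definitions in the header.
#include <cstdio>
#include <unordered_map>

template <typename KeyType, typename ValType>
class Table {
 public:
  void insert(const KeyType& k, const ValType& v) { m_[k] = v; }
  bool get(const KeyType& k, ValType* out) const {
    auto it = m_.find(k);
    if (it == m_.end()) return false;
    *out = it->second;
    return true;
  }

 private:
  std::unordered_map<KeyType, ValType> m_;
};

// In a real split build these lines would live in the .cu/.cc that holds the
// member definitions; each key/value pairing used by callers (for example
// <unsigned long, long>) must be listed, or the symbols stay undefined.
template class Table<unsigned long, long>;
template class Table<long, unsigned long>;

int main() {
  Table<unsigned long, long> t;
  t.insert(42ul, -7l);
  long v = 0;
  if (t.get(42ul, &v)) std::printf("value = %ld\n", v);
  return 0;
}
```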
@@ -28,11 +28,16 @@ limitations under the License. */
 #ifdef PADDLE_WITH_HETERPS
-#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"
 #include <algorithm>
 #include <deque>
+#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"
 #include "paddle/fluid/platform/timer.h"
+#if defined(PADDLE_WITH_PSCORE)
+#include "paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h"
+#include "paddle/fluid/distributed/ps/table/depends/feature_value.h"
+#endif
 namespace paddle {
 namespace framework {
@@ -292,10 +297,10 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
   auto ptl_dynamic_mf_func = [this, &local_dim_keys, &local_dim_ptr,
                               &fleet_ptr](int i, int j) {
-#ifdef PADDLE_WITH_PSLIB
     size_t key_size = local_dim_keys[i][j].size();
     int32_t status = -1;
     int32_t cnt = 0;
+#ifdef PADDLE_WITH_PSLIB
     while (true) {
       auto tt = fleet_ptr->pslib_ptr_->_worker_ptr->pull_sparse_ptr(
           i, reinterpret_cast<char**>(local_dim_ptr[i][j].data()),

@@ -325,6 +330,38 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
         break;
       }
     }
+#endif
+#ifdef PADDLE_WITH_PSCORE
+    while (true) {
+      auto tt = fleet_ptr->worker_ptr_->PullSparsePtr(
+          reinterpret_cast<char**>(local_dim_ptr[i][j].data()), this->table_id_,
+          local_dim_keys[i][j].data(), key_size);
+      bool flag = true;
+      tt.wait();
+      try {
+        status = tt.get();
+      } catch (const std::future_error& e) {
+        VLOG(0) << "Caught a future_error with code" << e.code()
+                << ", Message:" << e.what();
+      }
+      if (status != 0) {
+        VLOG(0) << "fleet pull sparse failed, status[" << status << "]";
+        sleep(sleep_seconds_before_fail_exit_);
+        flag = false;
+        cnt++;
+      }
+      if (cnt > 3) {
+        VLOG(0) << "fleet pull sparse failed, retry 3 times";
+        exit(-1);
+      }
+      if (flag) {
+        break;
+      }
+    }
+#endif
     if (status != 0) {
       LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]";
       sleep(300);
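The added PADDLE_WITH_PSCORE branch wraps the asynchronous PullSparsePtr call in a bounded retry loop: wait on the returned future, re-issue on a non-zero status, and abort after three failed attempts. A minimal sketch of that retry pattern with std::async standing in for the PS client (FakePullSparse and kMaxRetries are illustrative names, not Paddle APIs):

```cpp
// Minimal sketch (hypothetical client): retry a future-returning pull a fixed
// number of times before giving up, mirroring the loop added above.
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

int FakePullSparse(int attempt) {
  // Pretend the service only succeeds on the third attempt.
  return attempt < 3 ? -1 : 0;
}

int main() {
  const int kMaxRetries = 3;
  int status = -1;
  int cnt = 0;
  while (true) {
    std::future<int> fut =
        std::async(std::launch::async, FakePullSparse, cnt + 1);
    fut.wait();
    bool ok = true;
    try {
      status = fut.get();
    } catch (const std::future_error& e) {
      std::cerr << "future_error code " << e.code() << ": " << e.what() << "\n";
    }
    if (status != 0) {
      std::cerr << "pull failed, status[" << status << "], retrying\n";
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      ok = false;
      ++cnt;
    }
    if (cnt > kMaxRetries) {
      std::cerr << "pull failed after " << kMaxRetries << " retries\n";
      return -1;
    }
    if (ok) break;
  }
  std::cout << "pull succeeded with status " << status << "\n";
  return 0;
}
```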
@@ -333,7 +370,6 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
       VLOG(0) << "FleetWrapper Pull sparse to local done with table size: "
               << local_dim_keys[i][j].size();
     }
-#endif
   };
   threads.resize(thread_keys_shard_num_ * multi_mf_dim_);

@@ -369,10 +405,16 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
                           &local_dim_ptr, &device_dim_keys,
                           &device_dim_ptr,
                           &device_dim_mutex](int i, int j) {
-#ifdef PADDLE_WITH_PSLIB
     std::vector<std::vector<FeatureKey>> task_keys(device_num);
+#ifdef PADDLE_WITH_PSLIB
     std::vector<std::vector<paddle::ps::DownpourFixedFeatureValue*>> task_ptrs(
         device_num);
+#endif
+#ifdef PADDLE_WITH_PSCORE
+    std::vector<std::vector<paddle::distributed::FixedFeatureValue*>> task_ptrs(
+        device_num);
+#endif
     for (size_t k = 0; k < local_dim_keys[i][j].size(); k++) {
       int shard = local_dim_keys[i][j][k] % device_num;
       task_keys[shard].push_back(local_dim_keys[i][j][k]);
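Both the PSLIB and PSCORE variants of task_ptrs feed the same sharding loop, which routes each key to a device by `key % device_num`. A small standalone sketch of that routing rule (variable names are illustrative):

```cpp
// Minimal sketch: distribute feature keys across devices by key % device_num,
// the same rule used in the lambda above.
#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  const int device_num = 4;
  std::vector<uint64_t> local_keys = {11, 12, 13, 14, 100, 101, 4096};

  std::vector<std::vector<uint64_t>> task_keys(device_num);
  for (uint64_t key : local_keys) {
    int shard = static_cast<int>(key % device_num);  // device that owns this key
    task_keys[shard].push_back(key);
  }

  for (int dev = 0; dev < device_num; ++dev) {
    std::printf("device %d gets %zu keys\n", dev, task_keys[dev].size());
  }
  return 0;
}
```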
@@ -391,7 +433,6 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
       }
       device_dim_mutex[dev][j]->unlock();
     }
-#endif
   };
   auto build_func = [device_num, record_status, &pass_values, &local_keys,
                      &local_ptr, &device_task_keys, &device_task_ptrs](int i) {
@@ -629,12 +670,26 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
     val->lr_g2sum =
         ptr_val[paddle::ps::DownpourCtrDymfAccessor::
                     DownpourCtrDymfFeatureValue::embed_g2sum_index()];
+    val->cpu_ptr = (uint64_t)(device_dim_ptrs[k]);
     // TODO(xuefeng) set mf_dim while using DownpourCtrDymfAccessor
     ptr_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::
                 mf_dim_index()] = float(mf_dim);
     val->mf_dim = mf_dim;
+#endif
+#ifdef PADDLE_WITH_PSCORE
+    paddle::distributed::CtrDymfAccessor accessor;
+    val->delta_score =
+        ptr_val[accessor.common_feature_value.DeltaScoreIndex()];
+    val->show = ptr_val[accessor.common_feature_value.ShowIndex()];
+    val->clk = ptr_val[accessor.common_feature_value.ClickIndex()];
+    val->slot = int(ptr_val[accessor.common_feature_value.SlotIndex()]);
+    val->lr = ptr_val[accessor.common_feature_value.EmbedWIndex()];
+    val->lr_g2sum = ptr_val[accessor.common_feature_value.EmbedG2SumIndex()];
+    val->cpu_ptr = (uint64_t)(device_dim_ptrs[k]);
+    // TODO(xuefeng) set mf_dim while using DownpourCtrDymfAccessor
+    ptr_val[accessor.common_feature_value.MfDimIndex()] = float(mf_dim);
+    val->mf_dim = mf_dim;
 #endif
     if (dim > 8) {  // CpuPS alreay expand as mf_dim
       val->mf_size = mf_dim + 1;
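In the new PSCORE branch, the flat CPU feature value is addressed through CtrDymfAccessor index getters instead of the hard-coded PSLIB offsets. A toy sketch of that idea, with a made-up ToyAccessor whose indices are assumptions for illustration, not the real CtrDymfAccessor layout:

```cpp
// Minimal sketch: a CPU feature value is a flat float array, and an accessor
// exposes named index getters so the GPU-side struct can be filled without
// hard-coded offsets.
#include <cstdio>
#include <vector>

struct ToyAccessor {
  // Assumed layout of the fixed prefix of the flat value.
  int DeltaScoreIndex() const { return 0; }
  int ShowIndex() const { return 1; }
  int ClickIndex() const { return 2; }
  int EmbedWIndex() const { return 3; }
  int EmbedG2SumIndex() const { return 4; }
  int SlotIndex() const { return 5; }
  int MfDimIndex() const { return 6; }
};

struct GpuValue {
  float delta_score, show, clk, lr, lr_g2sum;
  int slot, mf_dim;
};

int main() {
  ToyAccessor accessor;
  std::vector<float> ptr_val = {0.1f, 5.f, 1.f, 0.02f, 0.3f, 7.f, 0.f};

  GpuValue val{};
  val.delta_score = ptr_val[accessor.DeltaScoreIndex()];
  val.show = ptr_val[accessor.ShowIndex()];
  val.clk = ptr_val[accessor.ClickIndex()];
  val.lr = ptr_val[accessor.EmbedWIndex()];
  val.lr_g2sum = ptr_val[accessor.EmbedG2SumIndex()];
  val.slot = static_cast<int>(ptr_val[accessor.SlotIndex()]);

  int mf_dim = 8;  // chosen per table in real code
  ptr_val[accessor.MfDimIndex()] = static_cast<float>(mf_dim);
  val.mf_dim = mf_dim;

  std::printf("show=%.1f clk=%.1f slot=%d mf_dim=%d\n", val.show, val.clk,
              val.slot, val.mf_dim);
  return 0;
}
```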
@@ -802,7 +857,6 @@ void PSGPUWrapper::EndPass() {
                cudaMemcpyDeviceToHost);
     CHECK(len == hbm_pool->capacity());
-#ifdef PADDLE_WITH_PSLIB
     uint64_t unuse_key = std::numeric_limits<uint64_t>::max();
     for (size_t i = 0; i < len; ++i) {
       if (device_keys[i] == unuse_key) {

@@ -810,6 +864,7 @@ void PSGPUWrapper::EndPass() {
       }
       size_t offset = i * feature_value_size;
       FeatureValue* gpu_val = (FeatureValue*)(test_build_values + offset);
+#ifdef PADDLE_WITH_PSLIB
       auto* downpour_value =
           (paddle::ps::DownpourFixedFeatureValue*)(gpu_val->cpu_ptr);
       int downpour_value_size = downpour_value->size();

@@ -829,13 +884,32 @@ void PSGPUWrapper::EndPass() {
                   embed_g2sum_index()] = gpu_val->lr_g2sum;
       cpu_val[paddle::ps::DownpourCtrDymfAccessor::DownpourCtrDymfFeatureValue::
                   slot_index()] = gpu_val->slot;
+#endif
+#ifdef PADDLE_WITH_PSCORE
+      auto* downpour_value =
+          (paddle::distributed::FixedFeatureValue*)(gpu_val->cpu_ptr);
+      int downpour_value_size = downpour_value->size();
+      if (gpu_val->mf_size > 0 && downpour_value_size == 8) {
+        downpour_value->resize(gpu_val->mf_dim + 1 + downpour_value_size);
+      }
+      float* cpu_val = downpour_value->data();
+      paddle::distributed::CtrDymfAccessor accessor;
+      cpu_val[accessor.common_feature_value.DeltaScoreIndex()] =
+          gpu_val->delta_score;
+      cpu_val[accessor.common_feature_value.ShowIndex()] = gpu_val->show;
+      cpu_val[accessor.common_feature_value.ClickIndex()] = gpu_val->clk;
+      cpu_val[accessor.common_feature_value.EmbedWIndex()] = gpu_val->lr;
+      cpu_val[accessor.common_feature_value.EmbedG2SumIndex()] =
+          gpu_val->lr_g2sum;
+      cpu_val[accessor.common_feature_value.SlotIndex()] = gpu_val->slot;
+#endif
       if (gpu_val->mf_size > 0) {
         for (int x = 0; x < gpu_val->mf_dim + 1; x++) {
           cpu_val[x + 8] = gpu_val->mf[x];
         }
       }
     }
-#endif
     free(test_build_values);
   };
   if (multi_mf_dim_) {
......
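EndPass mirrors BuildGPUTask in reverse: each GPU FeatureValue is written back into its CPU-side value through the accessor indices, and the CPU vector is first grown by mf_dim + 1 floats when an mf part exists. A simplified, self-contained sketch of that write-back, where the fixed 8-float prefix and the index order are assumptions for illustration rather than the real FixedFeatureValue layout:

```cpp
// Minimal sketch: copy GPU-side statistics back into a flat CPU feature value,
// resizing the CPU vector only when the GPU side holds an mf part.
#include <cstdio>
#include <vector>

struct GpuValue {
  float delta_score = 0.5f, show = 9.f, clk = 2.f, lr = 0.01f, lr_g2sum = 0.2f;
  int slot = 3;
  int mf_dim = 8;
  int mf_size = 9;  // mf_dim + 1 when the mf part exists
  std::vector<float> mf = std::vector<float>(9, 0.125f);
};

int main() {
  GpuValue gpu_val;
  std::vector<float> cpu_val(8, 0.f);  // fixed non-mf prefix: 8 floats here

  // Grow the CPU value only if the GPU side learned an mf part and the CPU
  // side still holds just the fixed prefix.
  if (gpu_val.mf_size > 0 && cpu_val.size() == 8) {
    cpu_val.resize(8 + gpu_val.mf_dim + 1);
  }

  // Write the scalar statistics back through (assumed) fixed indices 0..5.
  cpu_val[0] = gpu_val.delta_score;
  cpu_val[1] = gpu_val.show;
  cpu_val[2] = gpu_val.clk;
  cpu_val[3] = gpu_val.lr;
  cpu_val[4] = gpu_val.lr_g2sum;
  cpu_val[5] = static_cast<float>(gpu_val.slot);

  // Copy the mf part after the fixed prefix.
  if (gpu_val.mf_size > 0) {
    for (int x = 0; x < gpu_val.mf_dim + 1; x++) {
      cpu_val[x + 8] = gpu_val.mf[x];
    }
  }
  std::printf("cpu_val size = %zu, cpu_val[8] = %.3f\n", cpu_val.size(),
              cpu_val[8]);
  return 0;
}
```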
@@ -375,12 +375,12 @@ class DistributedOpsPass(PassBase):
         if attrs['use_ps_gpu']:
             _program.global_block()._insert_op(
                 index=distributed_idx,
-                type="pull_box_sparse",
+                type="pull_gpups_sparse",
                 inputs={"Ids": inputs,
                         'W': w},
                 outputs={"Out": outputs},
                 attrs={
-                    "size": w.shape[1],
+                    "size": [w.shape[1] for i in inputs],
                     "is_distributed": True,
                     "is_sparse": True
                 })

@@ -679,7 +679,7 @@ class PsGpuPass(PassBase):
                     lookup_table_grad_var[name] = 1
         for idx, op in list(enumerate(program.global_block().ops)):
-            if op.type == "pull_box_sparse":
+            if op.type == "pull_box_sparse" or op.type == "pull_gpups_sparse":
                 continue
             for key_name in op.input_names:
                 for var in op.input(key_name):
......
@@ -293,12 +293,12 @@ def distributed_ops_pass(program, config, use_ps_gpu=False):
         if use_ps_gpu:
             program.global_block()._insert_op(
                 index=distributed_idx,
-                type="pull_box_sparse",
+                type="pull_gpups_sparse",
                 inputs={"Ids": inputs,
                         'W': w},
                 outputs={"Out": outputs},
                 attrs={
-                    "size": w.shape[1],
+                    "size": [w.shape[1] for i in inputs],
                     "is_distributed": True,
                     "is_sparse": True
                 })

@@ -576,7 +576,7 @@ def ps_gpu_pass(program):
     op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
     backward = core.op_proto_and_checker_maker.OpRole.Backward
     for op in program.global_block().ops:
-        if op.type != "pull_box_sparse":
+        if op.type != "pull_box_sparse" and op.type != "pull_gpups_sparse":
             continue
         grad_op_desc, op_grad_to_var = core.get_grad_op_desc(
             op.desc, cpt.to_text(set()), [])

@@ -599,7 +599,7 @@ def ps_gpu_pass(program):
             lookup_table_grad_var[name] = 1
     for idx, op in list(enumerate(program.global_block().ops)):
-        if op.type == "pull_box_sparse":
+        if op.type == "pull_box_sparse" or op.type == "pull_gpups_sparse":
             continue
         for key_name in op.input_names:
             for var in op.input(key_name):
......