diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
index 64765c98fd04b8b48035b8197407a0c775212563..f512fcc7b9fdbee8baad3d9241cc48305580c83b 100644
--- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
+++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -898,17 +898,9 @@ void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
   all_timer.Start();
   int64_t total_length =
       std::accumulate(slot_lengths.begin(), slot_lengths.end(), 0UL);
-#ifdef PADDLE_WITH_CUDA
-  VLOG(3) << "Begine Gpu Ps PullSparse";
+  VLOG(3) << "Begin Gpu/Xpu Ps PullSparse";
   auto buf = memory::Alloc(place, total_length * sizeof(FeatureValue));
   FeatureValue* total_values_gpu = reinterpret_cast<FeatureValue*>(buf->ptr());
-#endif
-#ifdef PADDLE_WITH_XPU_KP
-  VLOG(3) << "Begine Xpu Ps PullSparse";
-  FeatureValue* total_values_gpu = nullptr;
-  xpu_malloc(reinterpret_cast<void**>(&total_values_gpu),
-             total_length * sizeof(FeatureValue));
-#endif
   if (platform::is_cpu_place(place)) {
     PADDLE_THROW(platform::errors::Unimplemented(
         "Warning:: CPUPlace is not supported in GpuPs now."));
@@ -969,19 +961,11 @@ void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
       slot_lengths_lod[i] += slot_lengths_lod[i - 1];
     }
 
-    uint64_t* buf_key = nullptr;
-    int64_t* buf_length = nullptr;
-    PADDLE_ENFORCE_EQ(xpu_malloc(reinterpret_cast<void**>(&buf_key),
-                                 keys.size() * sizeof(uint64_t*)),
-                      XPU_SUCCESS, platform::errors::ResourceExhausted(
-                                       "XPU has no enough memory"));
-    PADDLE_ENFORCE_EQ(xpu_malloc(reinterpret_cast<void**>(&buf_length),
-                                 slot_lengths.size() * sizeof(int64_t)),
-                      XPU_SUCCESS, platform::errors::ResourceExhausted(
-                                       "XPU has no enough memory"));
-
-    uint64_t** xpu_keys = reinterpret_cast<uint64_t**>(&buf_key);
-    int64_t* xpu_len = reinterpret_cast<int64_t*>(buf_length);
+    auto buf_key = memory::Alloc(place, keys.size() * sizeof(uint64_t*));
+    auto buf_length =
+        memory::Alloc(place, slot_lengths.size() * sizeof(int64_t));
+    uint64_t** xpu_keys = reinterpret_cast<uint64_t**>(buf_key->ptr());
+    int64_t* xpu_len = reinterpret_cast<int64_t*>(buf_length->ptr());
     PADDLE_ENFORCE_XPU_SUCCESS(xpu_memcpy(xpu_keys, keys.data(),
                                           keys.size() * sizeof(uint64_t*),
                                           XPU_HOST_TO_DEVICE));
@@ -997,8 +981,6 @@ void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
     pull_gpups_timer.Start();
     HeterPs_->pull_sparse(devid_2_index, total_keys, total_values_gpu,
                           static_cast<int>(total_length));
-    // PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
-    //                    "PullSparseGPU failed in GPUPS."));
     pull_gpups_timer.Pause();
 
     VLOG(3) << "Begin Copy result to tensor, total_length[" << total_length
@@ -1029,22 +1011,16 @@ void PSGPUWrapper::PushSparseGrad(const paddle::platform::Place& place,
   all_timer.Start();
   int64_t total_length =
       std::accumulate(slot_lengths.begin(), slot_lengths.end(), 0UL);
-#ifdef PADDLE_WITH_CUDA
+  // Allocation below is shared by the CUDA and XPU code paths.
   VLOG(3) << "Begin GPUPS PushSparseGrad";
   auto buf = memory::Alloc(place, total_length * sizeof(FeaturePushValue));
   FeaturePushValue* total_grad_values_gpu =
       reinterpret_cast<FeaturePushValue*>(buf->ptr());
-#endif
-#ifdef PADDLE_WITH_XPU_KP
-  VLOG(3) << "Begine Xpu Ps PushSparseGrad";
-  FeaturePushValue* total_grad_values_gpu = nullptr;
-  xpu_malloc(reinterpret_cast<void**>(&total_grad_values_gpu),
-             total_length * sizeof(FeaturePushValue));
-#endif
   if (platform::is_cpu_place(place)) {
     PADDLE_THROW(platform::errors::Unimplemented(
         "Warning:: CPUPlace is not supported in GPUPS now."));
   } else if (platform::is_gpu_place(place)) {
+#ifdef PADDLE_WITH_CUDA
     int device_id = place.GetDeviceId();
     int devid_2_index = HeterPs_->get_index_by_devid(device_id);
     LoDTensor& cached_total_keys_tensor = keys_tensor[devid_2_index];
@@ -1060,7 +1036,9 @@ void PSGPUWrapper::PushSparseGrad(const paddle::platform::Place& place,
     HeterPs_->push_sparse(devid_2_index, total_keys, total_grad_values_gpu,
                           static_cast<int>(total_length));
     push_gpups_timer.Pause();
+#endif
   } else if (platform::is_xpu_place(place)) {
+#ifdef PADDLE_WITH_XPU_KP
     int device_id = place.GetDeviceId();
     int devid_2_index = HeterPs_->get_index_by_devid(device_id);
     LoDTensor& cached_total_keys_tensor = keys_tensor[devid_2_index];
@@ -1076,6 +1054,7 @@ void PSGPUWrapper::PushSparseGrad(const paddle::platform::Place& place,
     HeterPs_->push_sparse(devid_2_index, total_keys, total_grad_values_gpu,
                           static_cast<int>(total_length));
     push_gpups_timer.Pause();
+#endif
   } else {
     PADDLE_THROW(platform::errors::PreconditionNotMet(
         "GPUPS: PushSparseGrad Only Support CUDAPlace Now."));
diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.kps b/paddle/fluid/framework/fleet/ps_gpu_wrapper.kps
index 28dd873a117dc6a45e9cf36e5ddfea3d214dd07d..58b9f0f722f8cd2e0232821b43b73a92119fb611 100644
--- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.kps
+++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.kps
@@ -84,7 +84,7 @@ __global__ void PullCopy(float** dest, const FeatureValue* src,
   }
 }
 
-__global__ void CopyKeysKernel(unsigned long long** src_keys,
+__global__ void CopyKeysKernel(unsigned long long* src_keys,
                                unsigned long long* dest_total_keys,
                                const long long* len, int slot_num,
                                int total_len) {
@@ -95,21 +95,27 @@ __global__ void CopyKeysKernel(unsigned long long* src_keys,
   }
   int thread_id = ncores * cluster_id() + cid;
   int nthreads = ncores * cluster_num();
-  __local__ int64_t local_len[slot_num];
-  GM2LM(len, local_len, slot_num * sizeof(int64_t));
+  __local__ long long local_len[slot_num];
+  GM2LM(len, local_len, slot_num * sizeof(long long));
+
+  __global_ptr__ unsigned long long* local_keys[slot_num];
+  GM2LM(src_keys, local_keys,
+        slot_num * sizeof(__global_ptr__ unsigned long long*));
 
   for (int i = thread_id; i < slot_num; i += nthreads) {
     // max core local memory = 8KB
     int slot_len = i ? local_len[i] - local_len[i - 1] : local_len[0];
-    int read_len = min(slot_len, 1024);
+    // Keep well under LM capacity: local_len/local_keys share the 8KB budget.
+    int read_len = 100;
     int dest_len = i ? local_len[i - 1] : 0;
-    __local__ uint64_t local_slot_keys[read_len];
+    __local__ unsigned long long local_slot_keys[read_len];
     for (int k = 0; k < slot_len; k += read_len) {
       int real_read_len = min(read_len, slot_len - k);
-      GM2LM(src_keys[i] + k, local_slot_keys, real_read_len * sizeof(uint64_t));
+      GM2LM(local_keys[i] + k, local_slot_keys,
+            real_read_len * sizeof(unsigned long long));
       LM2GM(local_slot_keys, dest_total_keys + dest_len + k,
-            real_read_len * sizeof(uint64_t));
+            real_read_len * sizeof(unsigned long long));
     }
   }
 }
@@ -199,7 +205,8 @@ void PSGPUWrapper::CopyKeys(const paddle::platform::Place& place,
   stream = static_cast<platform::XPUDeviceContext*>(dev_ctx)
               ->x_context()
              ->xpu_stream;
-  unsigned long long** o_keys = (unsigned long long**)origin_keys;
+  unsigned long long* o_keys =
+      reinterpret_cast<unsigned long long*>(origin_keys);
   unsigned long long* t_keys = (unsigned long long*)total_keys;
   const long long* c_len = (const long long*)gpu_len;
   CopyKeysKernel<<<2, 64, stream>>>(o_keys, t_keys, c_len, slot_num, total_len);