From 272b7f1c17f822eb81d524a55efb634af0d10934 Mon Sep 17 00:00:00 2001
From: Fan Zhang
Date: Thu, 12 May 2022 18:55:45 +0800
Subject: [PATCH] Xpups dev (#42692)

* Adapt XPUPS - 1st version - 3.24
* Adapt XPUPS - update XPU PushSparse - 2nd version - 3.24
* Adapt XPUPS - add XPU PullSparseOp - 3rd version - 3.25
* refactor heter comm kernel
* update. test=develop
* Adapt XPUPS - modify by compilation - 4th version - 3.27
* update calc_shard_offset. test=develop
* update xpu kernel. test=develop
* update args of calc_shard_offset
* update. test=develop
* remove customGradMerger
* update. test=develop
* heter_comm update
* heter_comm update
* update calc_shard_offset. test=develop
* heter_comm update
* update args of calc_shard_offset
* update. test=develop
* remove customGradMerger
* update. test=develop
* fix. test=develop
* update. test=develop
* update. test=develop
* update optimizer kernel
* Adapt XPUPS - use WITH_XPU_KP and modify wrapper kernel function - 5th version - 3.30
* update. test=develop
* update pslib.cmake
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* Adapt XPUPS - modify by kp compilation - 6th version - 3.30
* update. test=develop
* update. test=develop
* update. test=develop
* update optimizer kernel
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* used by minxu
* update heter_comm_inl
* fix. test=develop
* Adapt XPUPS - modify by kp compilation - 7th version - 3.30
* fix. test=develop
* add optimizer kernel. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* 3.31 update
* Adapt XPUPS - update kp compilation path - 8th version - 3.31
* add optimizer kernel. test=develop
* fix kunlun not support size_t. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix kunlun not support size_t. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* update heter_comm_kernel.kps 3.31
* fix. test=develop
* fix. test=develop
* update heter_comm_kernel.kps 3.31
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* update heter_comm.h 3.31
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* update hashtable. test=develop
* update. test=develop
* Adapt XPUPS - update by kp compilation - 9th version - 4.1
* update hashtable. test=develop
* fix. test=develop
* update hashtable 4.1
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* Adapt XPUPS - update by kp compilation - 10th version - 4.1
* fix. test=develop
* fix. test=develop
* fix. test=develop
* update. test=develop
* modify by compilation 4.1
* update. test=develop
* update. test=develop
* fix. test=develop
* modify by compilation 4.1
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* modify by compilation 4.1
* fix. test=develop
* fix. test=develop
* fix. test=develop
* modify by compilation 4.1 19:30
* fix. test=develop
* update ps_gpu_wrapper.kps 4.1
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* Adapt XPUPS - update by kp compilation - 11th version - 4.1
* fix. test=develop
* Adapt XPUPS - update by kp compilation - 12th version - 4.2
* fix. test=develop
* fix. test=develop
* modify by compilation 4.2
* 4.2 update
* fix. test=develop
* template init. test=develop
* update 4.6
* fix. test=develop
* template init. test=develop
* 4.6 modify by compilation
* hashtable template init. test=develop
* hashtable template init. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* Adapt XPUPS - update by kp compilation - 13th version - 4.7
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* 4.11 update
* fix. test=develop
* fix. test=develop
* 4.11 update
* update by pre-commit
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* 4.12 update
* fix. test=develop
* Adapt XPUPS - update by kp compilation - 14th version - 4.13
* 4.13 update
* 4.14 update
* 4.14 update
* 4.14 update
* 4.14 modify by merged latest compilation
* retry CI 4.14
* 4.15 pass static check
* 4.15 modify by gpups CI
* 3.16 update by gpups CI - modify ps_gpu_wrapper.h
* 4.16 update
* 4.16 pass xpu compile
* 4.16 retry CI
* 4.16 update
* Adapt XPUPS - adapt BKCL comm for XPUPS - 4.24
* update by compilation
* Adapt XPUPS - register PSGPUTrainer for XPUPS - 4.25
* update device_worker_factory
* Adapt XPUPS - split heter_ps into .cu and .cc - 4.27
* Adapt XPUPS - register pull_box_sparse op under XPU_KP - 4.28
* update
* 5.7 modify ps_gpu_wrapper pull_sparse
* 5.11 update ps_gpu_wrapper CopyKeysKernel

Co-authored-by: zmxdream
---
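[Editor's note, not part of the original patch] The ps_gpu_wrapper.cc changes
below collapse the two backend-specific allocation paths (memory::Alloc under
PADDLE_WITH_CUDA, raw unchecked xpu_malloc under PADDLE_WITH_XPU_KP) into a
single placement-aware memory::Alloc call. They also fix a latent bug in the
XPU path: the old code cast &buf_key, the address of a host-side pointer
variable, instead of the device buffer itself; the new code takes the device
address from buf_key->ptr(). The sketch below shows the allocate-then-
reinterpret pattern the diff standardizes on. It is not Paddle source:
Allocation and Alloc are simplified host-memory stand-ins for paddle::memory's
allocator interface, and FeatureValue is a stand-in struct.

  // Hedged sketch, not Paddle source. Allocation/Alloc stand in for
  // paddle::memory's allocator; they return plain host memory here.
  #include <cstddef>
  #include <cstdint>
  #include <memory>
  #include <numeric>
  #include <vector>

  struct FeatureValue {  // stand-in for the real feature struct
    float show, clk, lr;
  };

  struct Allocation {  // stand-in for memory::Allocation
    explicit Allocation(size_t n) : p_(::operator new(n)) {}
    ~Allocation() { ::operator delete(p_); }
    Allocation(const Allocation&) = delete;
    Allocation& operator=(const Allocation&) = delete;
    void* ptr() const { return p_; }

   private:
    void* p_;
  };

  // Stand-in for memory::Alloc(place, size). The real allocator dispatches
  // on `place` (CUDAPlace / XPUPlace / CPUPlace), so callers need no #ifdef.
  std::unique_ptr<Allocation> Alloc(size_t size) {
    return std::make_unique<Allocation>(size);
  }

  int main() {
    std::vector<int64_t> slot_lengths = {3, 5, 2};
    int64_t total_length =
        std::accumulate(slot_lengths.begin(), slot_lengths.end(), int64_t{0});
    // The pattern the diff moves to: allocate once, then take a typed view.
    auto buf = Alloc(total_length * sizeof(FeatureValue));
    FeatureValue* total_values = reinterpret_cast<FeatureValue*>(buf->ptr());
    total_values[0] = {1.0f, 0.0f, 0.01f};  // host memory in this sketch
    return 0;
  }

Because the real memory::Alloc returns device memory for CUDAPlace/XPUPlace,
the call sites below no longer need compile-time branching between backends.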
 .../fluid/framework/fleet/ps_gpu_wrapper.cc  | 43 +++++--------
 .../fluid/framework/fleet/ps_gpu_wrapper.kps | 23 ++++++----
 2 files changed, 26 insertions(+), 40 deletions(-)

diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
index 64765c98fd..f512fcc7b9 100644
--- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
+++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -898,17 +898,9 @@ void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
   all_timer.Start();
   int64_t total_length =
       std::accumulate(slot_lengths.begin(), slot_lengths.end(), 0UL);
-#ifdef PADDLE_WITH_CUDA
-  VLOG(3) << "Begine Gpu Ps PullSparse";
+  VLOG(3) << "Begin Gpu/Xpu Ps PullSparse";
   auto buf = memory::Alloc(place, total_length * sizeof(FeatureValue));
   FeatureValue* total_values_gpu = reinterpret_cast<FeatureValue*>(buf->ptr());
-#endif
-#ifdef PADDLE_WITH_XPU_KP
-  VLOG(3) << "Begine Xpu Ps PullSparse";
-  FeatureValue* total_values_gpu = nullptr;
-  xpu_malloc(reinterpret_cast<void**>(&total_values_gpu),
-             total_length * sizeof(FeatureValue));
-#endif
   if (platform::is_cpu_place(place)) {
     PADDLE_THROW(platform::errors::Unimplemented(
         "Warning:: CPUPlace is not supported in GpuPs now."));
@@ -969,19 +961,11 @@ void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
       slot_lengths_lod[i] += slot_lengths_lod[i - 1];
     }
 
-    uint64_t* buf_key = nullptr;
-    int64_t* buf_length = nullptr;
-    PADDLE_ENFORCE_EQ(xpu_malloc(reinterpret_cast<void**>(&buf_key),
-                                 keys.size() * sizeof(uint64_t*)),
-                      XPU_SUCCESS, platform::errors::ResourceExhausted(
-                                       "XPU has no enough memory"));
-    PADDLE_ENFORCE_EQ(xpu_malloc(reinterpret_cast<void**>(&buf_length),
-                                 slot_lengths.size() * sizeof(int64_t)),
-                      XPU_SUCCESS, platform::errors::ResourceExhausted(
-                                       "XPU has no enough memory"));
-
-    uint64_t** xpu_keys = reinterpret_cast<uint64_t**>(&buf_key);
-    int64_t* xpu_len = reinterpret_cast<int64_t*>(buf_length);
+    auto buf_key = memory::Alloc(place, keys.size() * sizeof(uint64_t*));
+    auto buf_length =
+        memory::Alloc(place, slot_lengths.size() * sizeof(int64_t));
+    uint64_t** xpu_keys = reinterpret_cast<uint64_t**>(buf_key->ptr());
+    int64_t* xpu_len = reinterpret_cast<int64_t*>(buf_length->ptr());
     PADDLE_ENFORCE_XPU_SUCCESS(xpu_memcpy(xpu_keys, keys.data(),
                                           keys.size() * sizeof(uint64_t*),
                                           XPU_HOST_TO_DEVICE));
@@ -997,8 +981,6 @@ void PSGPUWrapper::PullSparse(const paddle::platform::Place& place,
     pull_gpups_timer.Start();
     HeterPs_->pull_sparse(devid_2_index, total_keys, total_values_gpu,
                           static_cast<int>(total_length));
-    // PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
-    //                    "PullSparseGPU failed in GPUPS."));
     pull_gpups_timer.Pause();
 
     VLOG(3) << "Begin Copy result to tensor, total_length[" << total_length
@@ -1029,22 +1011,16 @@ void PSGPUWrapper::PushSparseGrad(const paddle::platform::Place& place,
   all_timer.Start();
   int64_t total_length =
       std::accumulate(slot_lengths.begin(), slot_lengths.end(), 0UL);
-#ifdef PADDLE_WITH_CUDA
+  // #ifdef PADDLE_WITH_CUDA
   VLOG(3) << "Begin GPUPS PushSparseGrad";
   auto buf = memory::Alloc(place, total_length * sizeof(FeaturePushValue));
   FeaturePushValue* total_grad_values_gpu =
       reinterpret_cast<FeaturePushValue*>(buf->ptr());
-#endif
-#ifdef PADDLE_WITH_XPU_KP
-  VLOG(3) << "Begine Xpu Ps PushSparseGrad";
-  FeaturePushValue* total_grad_values_gpu = nullptr;
-  xpu_malloc(reinterpret_cast<void**>(&total_grad_values_gpu),
-             total_length * sizeof(FeaturePushValue));
-#endif
   if (platform::is_cpu_place(place)) {
     PADDLE_THROW(platform::errors::Unimplemented(
         "Warning:: CPUPlace is not supported in GPUPS now."));
   } else if (platform::is_gpu_place(place)) {
+#ifdef PADDLE_WITH_CUDA
     int device_id = place.GetDeviceId();
     int devid_2_index = HeterPs_->get_index_by_devid(device_id);
     LoDTensor& cached_total_keys_tensor = keys_tensor[devid_2_index];
@@ -1060,7 +1036,9 @@
     HeterPs_->push_sparse(devid_2_index, total_keys, total_grad_values_gpu,
                           static_cast<int>(total_length));
     push_gpups_timer.Pause();
+#endif
   } else if (platform::is_xpu_place(place)) {
+#ifdef PADDLE_WITH_XPU_KP
     int device_id = place.GetDeviceId();
     int devid_2_index = HeterPs_->get_index_by_devid(device_id);
     LoDTensor& cached_total_keys_tensor = keys_tensor[devid_2_index];
@@ -1076,6 +1054,7 @@
     HeterPs_->push_sparse(devid_2_index, total_keys, total_grad_values_gpu,
                           static_cast<int>(total_length));
     push_gpups_timer.Pause();
+#endif
   } else {
     PADDLE_THROW(platform::errors::PreconditionNotMet(
         "GPUPS: PushSparseGrad Only Support CUDAPlace Now."));
diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.kps b/paddle/fluid/framework/fleet/ps_gpu_wrapper.kps
index 28dd873a11..58b9f0f722 100644
--- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.kps
+++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.kps
@@ -84,7 +84,7 @@ __global__ void PullCopy(float** dest, const FeatureValue* src,
   }
 }
 
-__global__ void CopyKeysKernel(unsigned long long** src_keys,
+__global__ void CopyKeysKernel(unsigned long long* src_keys,
                                unsigned long long* dest_total_keys,
                                const long long* len, int slot_num,
                                int total_len) {
@@ -95,21 +95,27 @@ __global__ void CopyKeysKernel(unsigned long long** src_keys,
   }
   int thread_id = ncores * cluster_id() + cid;
   int nthreads = ncores * cluster_num();
-  __local__ int64_t local_len[slot_num];
-  GM2LM(len, local_len, slot_num * sizeof(int64_t));
+  __local__ long long local_len[slot_num];
+  GM2LM(len, local_len, slot_num * sizeof(long long));
+
+  __global_ptr__ unsigned long long* local_keys[slot_num];
+  GM2LM(src_keys, local_keys,
+        slot_num * sizeof(__global_ptr__ unsigned long long*));
 
   for (int i = thread_id; i < slot_num; i += nthreads) {
     // max core local memory = 8KB
     int slot_len = i ? local_len[i] - local_len[i - 1] : local_len[0];
-    int read_len = min(slot_len, 1024);
+    // int read_len = min(slot_len, 1024);
+    int read_len = 100;
     int dest_len = i ? local_len[i - 1] : 0;
-    __local__ uint64_t local_slot_keys[read_len];
+    __local__ unsigned long long local_slot_keys[read_len];
     for (int k = 0; k < slot_len; k += read_len) {
       int real_read_len = min(read_len, slot_len - k);
-      GM2LM(src_keys[i] + k, local_slot_keys, real_read_len * sizeof(uint64_t));
+      GM2LM(local_keys[i] + k, local_slot_keys,
+            real_read_len * sizeof(unsigned long long));
       LM2GM(local_slot_keys, dest_total_keys + dest_len + k,
-            real_read_len * sizeof(uint64_t));
+            real_read_len * sizeof(unsigned long long));
     }
   }
 }
 
@@ -199,7 +205,8 @@ void PSGPUWrapper::CopyKeys(const paddle::platform::Place& place,
   stream = static_cast<platform::XPUDeviceContext*>(dev_ctx)
                ->x_context()
                ->xpu_stream;
-  unsigned long long** o_keys = (unsigned long long**)origin_keys;
+  unsigned long long* o_keys =
+      reinterpret_cast<unsigned long long*>(origin_keys);
   unsigned long long* t_keys = (unsigned long long*)total_keys;
   const long long* c_len = (const long long*)gpu_len;
   CopyKeysKernel<<<2, 64, stream>>>(o_keys, t_keys, c_len, slot_num, total_len);
--
GitLab
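[Editor's note, not part of the original patch] Two things happen in the
ps_gpu_wrapper.kps hunks above. First, CopyKeysKernel now receives the
slot-pointer table as a flat device word array (unsigned long long* src_keys)
and stages it into core-local memory with GM2LM before indexing it; the
removed code indexed src_keys[i] directly, dereferencing a global-memory
pointer table from kernel code. Second, the copy tile shrinks from
min(slot_len, 1024) to a fixed 100 keys, which looks like a conservative cap
against the 8KB core-local memory budget noted in the kernel comment (100
keys occupy 800 bytes, leaving room for local_len and local_keys). Below is a
host-side C++ analog of the tiling loop. It is a sketch only: CopyKeysTiled
is a made-up name, memcpy stands in for GM2LM/LM2GM, and a single loop stands
in for the cluster/core grid; only the prefix-sum indexing and tiling mirror
the kernel.

  // Hedged host-side analog of the kernel's tiled copy (plain C++, not KP).
  #include <algorithm>
  #include <cassert>
  #include <cstring>
  #include <vector>

  void CopyKeysTiled(unsigned long long* const* src_keys,  // per-slot arrays
                     unsigned long long* dest_total_keys,  // packed output
                     const long long* len,  // inclusive prefix sums
                     int slot_num) {
    const int read_len = 100;  // tile size, as in the kernel's local buffer
    for (int i = 0; i < slot_num; ++i) {
      // Per-slot length and destination offset recovered from prefix sums,
      // exactly as the kernel derives slot_len / dest_len from local_len[].
      long long slot_len = i ? len[i] - len[i - 1] : len[0];
      long long dest_len = i ? len[i - 1] : 0;
      unsigned long long tile[read_len];  // stand-in for the __local__ buffer
      for (long long k = 0; k < slot_len; k += read_len) {
        long long real_read_len = std::min<long long>(read_len, slot_len - k);
        std::memcpy(tile, src_keys[i] + k,  // "GM2LM"
                    real_read_len * sizeof(unsigned long long));
        std::memcpy(dest_total_keys + dest_len + k, tile,  // "LM2GM"
                    real_read_len * sizeof(unsigned long long));
      }
    }
  }

  int main() {
    std::vector<unsigned long long> s0 = {1, 2, 3}, s1 = {4, 5};
    unsigned long long* slots[] = {s0.data(), s1.data()};
    const long long prefix[] = {3, 5};  // slot lengths 3 and 2, accumulated
    std::vector<unsigned long long> out(5);
    CopyKeysTiled(slots, out.data(), prefix, 2);
    assert(out[3] == 4 && out[4] == 5);  // slot 1 lands at offset prefix[0]
    return 0;
  }

As in the kernel, slot i's length is len[i] - len[i - 1] and its packed
destination offset is len[i - 1], with len[] holding inclusive prefix sums of
the per-slot lengths.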