diff --git a/paddle/fluid/framework/fleet/CMakeLists.txt b/paddle/fluid/framework/fleet/CMakeLists.txt index c3304e3f9021d69cf6e0fef77c236acb536fcf56..2e9104f40cc603914799202c96ee0ecd543633f5 100644 --- a/paddle/fluid/framework/fleet/CMakeLists.txt +++ b/paddle/fluid/framework/fleet/CMakeLists.txt @@ -12,15 +12,19 @@ else() endif(WITH_PSLIB) if(WITH_HETERPS) - if(WITH_NCCL) + if(WITH_NCCL AND WITH_GPU) nv_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc DEPS heter_ps gloo_wrapper ${BRPC_DEPS}) add_subdirectory(heter_ps) + elseif(WITH_XPU_KP) + xpu_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.kps ps_gpu_wrapper.cc + DEPS heter_ps gloo_wrapper ${BRPC_DEPS}) + add_subdirectory(heter_ps) elseif(WITH_RCCL) hip_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc DEPS heter_ps gloo_wrapper ${BRPC_DEPS}) add_subdirectory(heter_ps) - endif(WITH_NCCL) + endif() else() cc_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cc DEPS gloo_wrapper) endif(WITH_HETERPS) diff --git a/paddle/fluid/framework/fleet/heter_context.h b/paddle/fluid/framework/fleet/heter_context.h old mode 100755 new mode 100644 diff --git a/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt b/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt index cac366d6b22a1480eb75904e968d81c4cd43b72f..08bd3ce9ee6ffa46e48c4c721b254c0d02a3d2ab 100644 --- a/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt +++ b/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt @@ -21,6 +21,14 @@ IF(WITH_GPU) # target_link_libraries(test_sample_rate graph_gpu_ps) endif() ENDIF() +IF(WITH_XPU_KP) + SET(HETERPS_DEPS device_context) + xpu_library(heter_comm_kernel SRCS heter_comm_kernel.h heter_comm_kernel.kps feature_value.h) + xpu_library(hashtable_kernel SRCS hashtable.h hashtable_kernel.kps) + cc_library(heter_comm SRCS heter_comm.h heter_resource.cc DEPS ${HETERPS_DEPS} heter_comm_kernel hashtable_kernel) + cc_library(heter_ps SRCS heter_ps.cc DEPS heter_comm) + # xpu_library(heter_comm SRCS heter_comm.h heter_comm_kernel.kps feature_value.h heter_resource.cc heter_resource.h hashtable.h mem_pool.h DEPS ${HETERPS_DEPS}) +ENDIF() IF(WITH_ROCM) hip_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS cub device_context) hip_test(test_heter_comm SRCS feature_value.h DEPS heter_comm) diff --git a/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.kps b/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.kps index 55edf883271b95a27d054f313211e3a078c864ae..e879d817b14dd1345c2f865c50bafe2581d130b4 100644 --- a/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.kps +++ b/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.kps @@ -48,7 +48,7 @@ __device__ void update_lr(float& w, float& g2sum, float g, // NOLINT GM2LM(optimizer_config::learning_rate, &local_learning_rate, sizeof(float)); GM2LM(optimizer_config::initial_g2sum, &local_initial_g2sum, sizeof(float)); GM2LM(optimizer_config::min_bound, &local_min_bound, sizeof(float)); - GM2LM(optimizr_config::max_bound, &local_max_bound, sizeof(float)); + GM2LM(optimizer_config::max_bound, &local_max_bound, sizeof(float)); double add_g2sum = 0; double ratio = local_learning_rate * @@ -136,7 +136,7 @@ __device__ void update_value(ValType& val, const GradType& grad) { // NOLINT template __global__ void insert_kernel(Table* table, const KeyType* const keys, - const ValType* const vals, size_t len) { + const ValType* const vals, long long len) { int cid = core_id(); int ncores = core_num(); if (cid >= ncores) { @@ -164,7 +164,7 @@ 
__global__ void insert_kernel(Table* table, const KeyType* const keys, template __global__ void search_kernel(Table* table, const KeyType* const keys, - ValType* const vals, size_t len) { + ValType* const vals, long long len) { int cid = core_id(); int ncores = core_num(); if (cid >= ncores) { @@ -194,7 +194,7 @@ __global__ void search_kernel(Table* table, const KeyType* const keys, template __global__ void update_kernel(Table* table, const KeyType* const keys, - const GradType* const grads, size_t len) { + const GradType* const grads, long long len) { int cid = core_id(); int ncores = core_num(); if (cid >= ncores) { @@ -251,7 +251,10 @@ void HashTable::get(const KeyType* d_keys, ValType* d_vals, if (len == 0) { return; } - search_kernel<<<4, 64, stream>>>(container_, d_keys, d_vals, len); + long long c_len = (long long)len; + search_kernel><<<4, 64, stream>>>( + container_, d_keys, d_vals, c_len); } template @@ -272,7 +275,10 @@ void HashTable::insert(const KeyType* d_keys, if (len == 0) { return; } - insert_kernel<<<4, 64, stream>>>(container_, d_keys, d_vals, len); + long long c_len = (long long)len; + insert_kernel><<<4, 64, stream>>>( + container_, d_keys, d_vals, c_len); } template @@ -289,7 +295,10 @@ void HashTable::update(const KeyType* d_keys, if (len == 0) { return; } - update_kernel<<<4, 64, stream>>>(container_, d_keys, d_grads, len); + long long c_len = (long long)len; + update_kernel, + GradType><<<4, 64, stream>>>(container_, d_keys, d_grads, + c_len); } template diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_comm.h b/paddle/fluid/framework/fleet/heter_ps/heter_comm.h index 419bd716eb304738915adb2e74d08c9dd275bb95..42e00defcb0c0c583946c04def4a893e299e76a4 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_comm.h +++ b/paddle/fluid/framework/fleet/heter_ps/heter_comm.h @@ -158,6 +158,7 @@ class HeterComm { #endif std::shared_ptr all_keys_mem; std::shared_ptr all_grads_mem; + KeyType* all_keys; GradType* all_grads; @@ -228,5 +229,7 @@ class HeterComm { } // end namespace framework } // end namespace paddle + #include "paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h" + #endif diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h b/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h index 1e66b3cb250313850e79407846339e30f7525b14..e0cef022d3145b48d9e5561dd0de3f708912081e 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h +++ b/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h @@ -411,7 +411,6 @@ void HeterComm::merge_grad( auto d_merge_keys = memory::Alloc(place, len * sizeof(KeyType)); KeyType* d_merge_keys_ptr = reinterpret_cast(d_merge_keys->ptr()); - auto d_merge_grads = memory::Alloc(place, len * sizeof(GradType)); GradType* d_merge_grads_ptr = reinterpret_cast(d_merge_grads->ptr()); @@ -1065,7 +1064,6 @@ void HeterComm::end_pass() { // platform::CUDADeviceGuard guard(dev_id); // tables_[index]->dump_to_cpu(dev_id, stream); //} - } // end namespace framework } // end namespace paddle #endif diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_comm_kernel.kps b/paddle/fluid/framework/fleet/heter_ps/heter_comm_kernel.kps index a1923a7f6019b6ce69d1fa70f02a760ead5ca507..44eae528325dd1dd65179b056076e0132996c2d5 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_comm_kernel.kps +++ b/paddle/fluid/framework/fleet/heter_ps/heter_comm_kernel.kps @@ -291,17 +291,21 @@ void HeterCommKernel::sort_pairs(void* d_temp_storage, bool debug_synchronous) {} template +void HeterCommKernel::reduce_by_key(void* 
d_temp_storage, + size_t& temp_storage_bytes, // NOLINT + KeysInputIteratorT d_keys_in, + UniqueOutputIteratorT d_unique_out, + ValuesInputIteratorT d_values_in, + AggregatesOutputIteratorT d_aggregates_out, + NumRunsOutputIteratorT d_num_runs_out, + int num_items, StreamType stream, + bool debug_synchronous) {} template void HeterCommKernel::fill_idx( int* idx, long long len, const XPUStream& stream); + template void HeterCommKernel::calc_shard_offset( int* idx, int* left, int* right, long long len, int total_devs, const XPUStream& stream); diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_ps.cc b/paddle/fluid/framework/fleet/heter_ps/heter_ps.cc new file mode 100644 index 0000000000000000000000000000000000000000..3d375209ed14e91156dea7748bf3e656f1258457 --- /dev/null +++ b/paddle/fluid/framework/fleet/heter_ps/heter_ps.cc @@ -0,0 +1,61 @@ +/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/framework/fleet/heter_ps/heter_ps.h" +#include + +#ifdef PADDLE_WITH_HETERPS + +namespace paddle { +namespace framework { + +HeterPsBase* HeterPsBase::get_instance( + size_t capacity, std::shared_ptr resource) { + return new HeterPs(capacity, resource); +} + +HeterPs::HeterPs(size_t capacity, std::shared_ptr resource) { + comm_ = + std::make_shared>( + capacity, resource); +} + +HeterPs::~HeterPs() {} + +void HeterPs::pull_sparse(int num, FeatureKey* d_keys, FeatureValue* d_vals, + size_t len) { + comm_->pull_sparse(num, d_keys, d_vals, len); +} + +void HeterPs::build_ps(int num, FeatureKey* h_keys, FeatureValue* h_vals, + size_t len, size_t chunk_size, int stream_num) { + comm_->build_ps(num, h_keys, h_vals, len, chunk_size, stream_num); +} + +int HeterPs::get_index_by_devid(int devid) { + return comm_->get_index_by_devid(devid); +} + +void HeterPs::end_pass() { comm_->end_pass(); } + +void HeterPs::show_one_table(int gpu_num) { comm_->show_one_table(gpu_num); } + +void HeterPs::push_sparse(int num, FeatureKey* d_keys, + FeaturePushValue* d_grads, size_t len) { + // comm_->push_sparse_multi_node(num, d_keys, d_grads, len, opt_); +} + +} // end namespace framework +} // end namespace paddle +#endif diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_resource.h b/paddle/fluid/framework/fleet/heter_ps/heter_resource.h index 164fca22768006a8872b0eee511e9ca5ed4562d1..badc7e98a65181645ce3b4945ec80acc786a7e17 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_resource.h +++ b/paddle/fluid/framework/fleet/heter_ps/heter_resource.h @@ -94,6 +94,39 @@ using DevPlace = platform::XPUPlace; using AnyDeviceGuard = platform::XPUDeviceGuard; #endif +#elif defined(PADDLE_WITH_XPU_KP) +class XPUResource { + public: + XPUResource(std::vector& device_id, int index); // NOLINT + virtual ~XPUResource(); + XPUResource(const XPUResource&) = delete; + XPUResource& operator=(const XPUResource&) = delete; + + int dev_id() const { return dev_id_; } + int index() const { return index_; } + XPUStream local_stream(int num) { return 
local_streams_[num]; } + XPUStream remote_stream(int num) { return remote_streams_[num]; } + XPUStream comm_stream(int num) { return comm_streams_[num]; } + + int dev_id_; + int index_; + std::vector dev_ids_; + std::vector remote_streams_; + std::vector local_streams_; + std::vector comm_streams_; +}; +#endif + +#if defined(PADDLE_WITH_CUDA) +using DevResource = GPUResource; +using DevPlace = platform::CUDAPlace; +using AnyDeviceGuard = platform::CUDADeviceGuard; +#elif defined(PADDLE_WITH_XPU_KP) +using DevResource = XPUResource; +using DevPlace = platform::XPUPlace; +using AnyDeviceGuard = platform::XPUDeviceGuard; +#endif + class HeterPsResource { public: explicit HeterPsResource(const std::vector& dev_ids); diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc old mode 100755 new mode 100644 index 75f5c24af5a9961bafbc7296299c55bd46bce3ef..d894b71d2c3fd5a782e6d31a8599e1fba9f350c0 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc @@ -31,6 +31,7 @@ limitations under the License. */ #include #include +#include "paddle/fluid/framework/data_set.h" #include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" #include "paddle/fluid/platform/timer.h" @@ -496,115 +497,114 @@ void PSGPUWrapper::BuildPull(std::shared_ptr gpu_task) { } #endif }; - auto build_func = [device_num, record_status, &pass_values, &local_keys, - &local_ptr, &device_keys, &device_vals, - &device_mutex](int i) { - std::vector> task_keys(device_num); + auto build_func = + [device_num, record_status, &pass_values, &local_keys, &local_ptr, + &device_keys, &device_vals, &device_mutex](int i) { + std::vector> task_keys(device_num); #ifdef PADDLE_WITH_PSLIB - std::vector> task_ptrs( - device_num); + std::vector> + task_ptrs(device_num); #endif #ifdef PADDLE_WITH_PSCORE - std::vector> task_ptrs( - device_num); + std::vector> + task_ptrs(device_num); #endif - for (size_t j = 0; j < local_keys[i].size(); j++) { - int shard = local_keys[i][j] % device_num; - task_keys[shard].push_back(local_keys[i][j]); - task_ptrs[shard].push_back(local_ptr[i][j]); - } + for (size_t j = 0; j < local_keys[i].size(); j++) { + int shard = local_keys[i][j] % device_num; + task_keys[shard].push_back(local_keys[i][j]); + task_ptrs[shard].push_back(local_ptr[i][j]); + } #ifdef PADDLE_WITH_PSLIB - if (record_status) { - size_t local_keys_size = local_keys.size(); - size_t pass_values_size = pass_values.size(); - for (size_t j = 0; j < pass_values_size; j += local_keys_size) { - auto& shard_values = pass_values[j]; - for (size_t pair_idx = 0; pair_idx < pass_values[j].size(); - pair_idx++) { - auto& cur_pair = shard_values[pair_idx]; - int shard = cur_pair.first % device_num; - task_keys[shard].push_back(cur_pair.first); - task_ptrs[shard].push_back( - (paddle::ps::DownpourFixedFeatureValue*)cur_pair.second); + if (record_status) { + size_t local_keys_size = local_keys.size(); + size_t pass_values_size = pass_values.size(); + for (size_t j = 0; j < pass_values_size; j += local_keys_size) { + auto& shard_values = pass_values[j]; + for (size_t pair_idx = 0; pair_idx < pass_values[j].size(); + pair_idx++) { + auto& cur_pair = shard_values[pair_idx]; + int shard = cur_pair.first % device_num; + task_keys[shard].push_back(cur_pair.first); + task_ptrs[shard].push_back( + (paddle::ps::DownpourFixedFeatureValue*)cur_pair.second); + } + } } - } - } #endif - for (int dev = 0; dev < device_num; dev++) { - device_mutex[dev]->lock(); + for (int dev = 0; dev < 
device_num; dev++) { + device_mutex[dev]->lock(); - int len = task_keys[dev].size(); - int cur = device_keys[dev].size(); - device_keys[dev].resize(device_keys[dev].size() + len); - device_vals[dev].resize(device_vals[dev].size() + len); + int len = task_keys[dev].size(); + int cur = device_keys[dev].size(); + device_keys[dev].resize(device_keys[dev].size() + len); + device_vals[dev].resize(device_vals[dev].size() + len); #ifdef PADDLE_WITH_PSLIB - for (int j = 0; j < len; ++j) { - device_keys[dev][cur + j] = task_keys[dev][j]; - float* ptr_val = task_ptrs[dev][j]->data(); - FeatureValue& val = device_vals[dev][cur + j]; - size_t dim = task_ptrs[dev][j]->size(); - - val.delta_score = ptr_val[1]; - val.show = ptr_val[2]; - val.clk = ptr_val[3]; - val.slot = ptr_val[6]; - val.lr = ptr_val[4]; - val.lr_g2sum = ptr_val[5]; - val.cpu_ptr = (uint64_t)(task_ptrs[dev][j]); - - if (dim > 7) { - val.mf_size = MF_DIM + 1; - for (int x = 0; x < val.mf_size; x++) { - val.mf[x] = ptr_val[x + 7]; + for (int j = 0; j < len; ++j) { + device_keys[dev][cur + j] = task_keys[dev][j]; + float* ptr_val = task_ptrs[dev][j]->data(); + FeatureValue& val = device_vals[dev][cur + j]; + size_t dim = task_ptrs[dev][j]->size(); + + val.delta_score = ptr_val[1]; + val.show = ptr_val[2]; + val.clk = ptr_val[3]; + val.slot = ptr_val[6]; + val.lr = ptr_val[4]; + val.lr_g2sum = ptr_val[5]; + val.cpu_ptr = (uint64_t)(task_ptrs[dev][j]); + + if (dim > 7) { + val.mf_size = MF_DIM + 1; + for (int x = 0; x < val.mf_size; x++) { + val.mf[x] = ptr_val[x + 7]; + } + } else { + val.mf_size = 0; + for (int x = 0; x < MF_DIM + 1; x++) { + val.mf[x] = 0; + } + } } - } else { - val.mf_size = 0; - for (int x = 0; x < MF_DIM + 1; x++) { - val.mf[x] = 0; - } - } - } #endif #ifdef PADDLE_WITH_PSCORE - for (int j = 0; j < len; ++j) { - device_keys[dev][cur + j] = task_keys[dev][j]; - float* ptr_val = task_ptrs[dev][j]->data(); - FeatureValue& val = device_vals[dev][cur + j]; - size_t dim = task_ptrs[dev][j]->size(); - val.delta_score = ptr_val[2]; - val.show = ptr_val[3]; - val.clk = ptr_val[4]; - val.slot = ptr_val[0]; - val.lr = ptr_val[5]; - val.lr_g2sum = ptr_val[6]; - val.cpu_ptr = (uint64_t)(task_ptrs[dev][j]); - - if (dim > 7) { - val.mf_size = MF_DIM + 1; - for (int x = 0; x < val.mf_size; x++) { - val.mf[x] = ptr_val[x + 7]; - } - } else { - val.mf_size = 0; - for (int x = 0; x < MF_DIM + 1; x++) { - val.mf[x] = 0; + for (int j = 0; j < len; ++j) { + device_keys[dev][cur + j] = task_keys[dev][j]; + float* ptr_val = task_ptrs[dev][j]->data(); + FeatureValue& val = device_vals[dev][cur + j]; + size_t dim = task_ptrs[dev][j]->size(); + val.delta_score = ptr_val[2]; + val.show = ptr_val[3]; + val.clk = ptr_val[4]; + val.slot = ptr_val[0]; + val.lr = ptr_val[5]; + val.lr_g2sum = ptr_val[6]; + val.cpu_ptr = (uint64_t)(task_ptrs[dev][j]); + + if (dim > 7) { + val.mf_size = MF_DIM + 1; + for (int x = 0; x < val.mf_size; x++) { + val.mf[x] = ptr_val[x + 7]; + } + } else { + val.mf_size = 0; + for (int x = 0; x < MF_DIM + 1; x++) { + val.mf[x] = 0; + } + } } +#endif + VLOG(3) << "GpuPs build hbmps done"; } } -#endif - VLOG(3) << "GpuPs build hbmps done"; - - device_mutex[dev]->unlock(); - } - }; if (!multi_mf_dim_) { for (size_t i = 0; i < threads.size(); i++) { threads[i] = std::thread(build_func, i); } - } else { + } + else { for (int i = 0; i < thread_keys_shard_num_; i++) { for (int j = 0; j < multi_mf_dim_; j++) { threads[i * multi_mf_dim_ + j] = @@ -653,7 +653,9 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr gpu_task) { } std::vector 
threads(device_num); HeterPs_ = HeterPsBase::get_instance(size_max, resource_); +#ifdef PADDLE_WITH_CUDA HeterPs_->set_nccl_comm_and_size(inner_comms_, inter_comms_, node_size_); +#endif auto build_func = [this, &gpu_task, &feature_keys_count](int i) { VLOG(3) << "building table: " << i; this->HeterPs_->build_ps(i, gpu_task->device_keys_[i].data(), @@ -791,18 +793,27 @@ void PSGPUWrapper::PullSparse(const paddle::platform::Place& place, const std::vector& values, const std::vector& slot_lengths, const int hidden_size) { - VLOG(3) << "Begine Gpu Ps PullSparse"; platform::Timer all_timer; platform::Timer pull_gpups_timer; all_timer.Start(); int64_t total_length = std::accumulate(slot_lengths.begin(), slot_lengths.end(), 0UL); +#ifdef PADDLE_WITH_CUDA + VLOG(3) << "Begine Gpu Ps PullSparse"; auto buf = memory::Alloc(place, total_length * sizeof(FeatureValue)); FeatureValue* total_values_gpu = reinterpret_cast(buf->ptr()); +#endif +#ifdef PADDLE_WITH_XPU_KP + VLOG(3) << "Begine Xpu Ps PullSparse"; + FeatureValue* total_values_gpu = nullptr; + xpu_malloc(reinterpret_cast(&total_values_gpu), + total_length * sizeof(FeatureValue)); +#endif if (platform::is_cpu_place(place)) { PADDLE_THROW(platform::errors::Unimplemented( "Warning:: CPUPlace is not supported in GpuPs now.")); } else if (platform::is_gpu_place(place)) { +#ifdef PADDLE_WITH_CUDA VLOG(3) << "Begin copy keys, key_num[" << total_length << "]"; int device_id = place.GetDeviceId(); int devid_2_index = HeterPs_->get_index_by_devid(device_id); @@ -842,9 +853,63 @@ void PSGPUWrapper::PullSparse(const paddle::platform::Place& place, this->CopyForPull(place, gpu_keys, values, total_values_gpu, gpu_len, static_cast(slot_lengths.size()), hidden_size, total_length); +#endif + } else if (platform::is_xpu_place(place)) { +#ifdef PADDLE_WITH_XPU_KP + VLOG(3) << "Begin copy keys, key_num[" << total_length << "]"; + int device_id = place.GetDeviceId(); + int devid_2_index = HeterPs_->get_index_by_devid(device_id); + LoDTensor& total_keys_tensor = keys_tensor[devid_2_index]; + uint64_t* total_keys = reinterpret_cast( + total_keys_tensor.mutable_data({total_length, 1}, place)); + + // construct slot_level lod info + auto slot_lengths_lod = slot_lengths; + for (size_t i = 1; i < slot_lengths_lod.size(); i++) { + slot_lengths_lod[i] += slot_lengths_lod[i - 1]; + } + + uint64_t* buf_key = nullptr; + int64_t* buf_length = nullptr; + PADDLE_ENFORCE_EQ(xpu_malloc(reinterpret_cast(&buf_key), + keys.size() * sizeof(uint64_t*)), + XPU_SUCCESS, platform::errors::ResourceExhausted( + "XPU has no enough memory")); + PADDLE_ENFORCE_EQ(xpu_malloc(reinterpret_cast(&buf_length), + slot_lengths.size() * sizeof(int64_t)), + XPU_SUCCESS, platform::errors::ResourceExhausted( + "XPU has no enough memory")); + + uint64_t** xpu_keys = reinterpret_cast(&buf_key); + int64_t* xpu_len = reinterpret_cast(buf_length); + PADDLE_ENFORCE_XPU_SUCCESS(xpu_memcpy(xpu_keys, keys.data(), + keys.size() * sizeof(uint64_t*), + XPU_HOST_TO_DEVICE)); + PADDLE_ENFORCE_XPU_SUCCESS(xpu_memcpy(xpu_len, slot_lengths_lod.data(), + slot_lengths.size() * sizeof(int64_t), + XPU_HOST_TO_DEVICE)); + + this->CopyKeys(place, xpu_keys, total_keys, xpu_len, + static_cast(slot_lengths.size()), + static_cast(total_length)); + VLOG(3) << "Begin call PullSparseGPU in GPUPS, dev: " << devid_2_index + << " len: " << total_length; + pull_gpups_timer.Start(); + HeterPs_->pull_sparse(devid_2_index, total_keys, total_values_gpu, + static_cast(total_length)); + // PADDLE_ENFORCE_EQ(ret, 0, 
platform::errors::PreconditionNotMet( + // "PullSparseGPU failed in GPUPS.")); + pull_gpups_timer.Pause(); + + VLOG(3) << "Begin Copy result to tensor, total_length[" << total_length + << "]"; + this->CopyForPull(place, xpu_keys, values, total_values_gpu, xpu_len, + static_cast(slot_lengths.size()), hidden_size, + total_length); +#endif } else { PADDLE_THROW(platform::errors::PreconditionNotMet( - "GpuPs: PullSparse Only Support CUDAPlace Now.")); + "GpuPs/XpuPs: PullSparse Only Support CUDAPlace or XPUPlace Now.")); } all_timer.Pause(); VLOG(3) << "GpuPs PullSparse total costs: " << all_timer.ElapsedSec() @@ -859,15 +924,23 @@ void PSGPUWrapper::PushSparseGrad(const paddle::platform::Place& place, const std::vector& grad_values, const std::vector& slot_lengths, const int hidden_size, const int batch_size) { - VLOG(3) << "Begin GPUPS PushSparseGrad"; platform::Timer all_timer; platform::Timer push_gpups_timer; all_timer.Start(); int64_t total_length = std::accumulate(slot_lengths.begin(), slot_lengths.end(), 0UL); +#ifdef PADDLE_WITH_CUDA + VLOG(3) << "Begin GPUPS PushSparseGrad"; auto buf = memory::Alloc(place, total_length * sizeof(FeaturePushValue)); FeaturePushValue* total_grad_values_gpu = reinterpret_cast(buf->ptr()); +#endif +#ifdef PADDLE_WITH_XPU_KP + VLOG(3) << "Begine Xpu Ps PushSparseGrad"; + FeaturePushValue* total_grad_values_gpu = nullptr; + xpu_malloc(reinterpret_cast(&total_grad_values_gpu), + total_length * sizeof(FeaturePushValue)); +#endif if (platform::is_cpu_place(place)) { PADDLE_THROW(platform::errors::Unimplemented( "Warning:: CPUPlace is not supported in GPUPS now.")); @@ -887,6 +960,22 @@ void PSGPUWrapper::PushSparseGrad(const paddle::platform::Place& place, HeterPs_->push_sparse(devid_2_index, total_keys, total_grad_values_gpu, static_cast(total_length)); push_gpups_timer.Pause(); + } else if (platform::is_xpu_place(place)) { + int device_id = place.GetDeviceId(); + int devid_2_index = HeterPs_->get_index_by_devid(device_id); + LoDTensor& cached_total_keys_tensor = keys_tensor[devid_2_index]; + uint64_t* total_keys = + reinterpret_cast(cached_total_keys_tensor.data()); + VLOG(3) << "Begin copy grad tensor to xpups struct"; + this->CopyForPush(place, grad_values, total_grad_values_gpu, slot_lengths, + hidden_size, total_length, batch_size); + + VLOG(3) << "Begin call PushSparseXPU in XPUPS, dev: " << devid_2_index + << " len: " << total_length; + push_gpups_timer.Start(); + HeterPs_->push_sparse(devid_2_index, total_keys, total_grad_values_gpu, + static_cast(total_length)); + push_gpups_timer.Pause(); } else { PADDLE_THROW(platform::errors::PreconditionNotMet( "GPUPS: PushSparseGrad Only Support CUDAPlace Now.")); @@ -897,7 +986,6 @@ void PSGPUWrapper::PushSparseGrad(const paddle::platform::Place& place, << " s"; VLOG(3) << "End PushSparseGrad"; } - } // end namespace framework } // end namespace paddle #endif diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cu b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cu index 6a78a617b1fef6df94ab3b6d44016f558c68b889..cf7d98db27e843941cee87197e42cbc8589131e7 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cu +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cu @@ -105,6 +105,8 @@ __global__ void PushCopy(FeaturePushValue* dest, float** src, int64_t* len, } } +PSGPUWrapper::~PSGPUWrapper() { delete HeterPs_; } + void PSGPUWrapper::CopyForPull(const paddle::platform::Place& place, uint64_t** gpu_keys, const std::vector& values, diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h 
b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h old mode 100755 new mode 100644 index 9145dda5f68c2ce03814e6e2017503746d72b663..d3dd4612e304b136c4f0591ac89982f0d1fa370d --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h @@ -30,16 +30,22 @@ limitations under the License. */ #include "paddle/fluid/framework/fleet/gloo_wrapper.h" #endif #include "paddle/fluid/distributed/ps/thirdparty/round_robin.h" -#include "paddle/fluid/framework/data_set.h" +#include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/fleet/heter_context.h" #include "paddle/fluid/framework/fleet/heter_ps/heter_ps_base.h" #include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h" +#include "paddle/fluid/framework/heter_util.h" +#ifdef PADDLE_WITH_CUDA #include "paddle/fluid/framework/fleet/heter_ps/mem_pool.h" +#include "paddle/fluid/platform/device/gpu/gpu_info.h" +#include "paddle/fluid/platform/dynload/nccl.h" +#endif +#ifdef PADDLE_WITH_XPU_KP +#include "paddle/fluid/platform/device/xpu/enforce_xpu.h" +#endif #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/framework/variable_helper.h" -#include "paddle/fluid/platform/device/gpu/gpu_info.h" -#include "paddle/fluid/platform/dynload/nccl.h" #include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN #include "paddle/fluid/platform/place.h" #ifdef PADDLE_WITH_PSCORE @@ -55,9 +61,36 @@ namespace framework { #define TYPEALIGN(ALIGNVAL, LEN) \ (((uint64_t)(LEN) + ((ALIGNVAL)-1)) & ~((uint64_t)((ALIGNVAL)-1))) +class Dataset; + +#ifdef PADDLE_WITH_PSLIB +class AfsWrapper { + public: + AfsWrapper() {} + virtual ~AfsWrapper() {} + void init(const std::string& fs_name, const std::string& fs_user, + const std::string& pass_wd, const std::string& conf); + int remove(const std::string& path); + int mkdir(const std::string& path); + std::vector list(const std::string& path); + + int exist(const std::string& path); + int upload(const std::string& local_file, const std::string& afs_file); + + int download(const std::string& local_file, const std::string& afs_file); + + int touchz(const std::string& path); + std::string cat(const std::string& path); + int mv(const std::string& old_path, const std::string& dest_path); + + private: + paddle::ps::AfsApiWrapper afs_handler_; +}; +#endif + class PSGPUWrapper { public: - virtual ~PSGPUWrapper() { delete HeterPs_; } + virtual ~PSGPUWrapper(); PSGPUWrapper() { HeterPs_ = NULL; @@ -131,6 +164,7 @@ class PSGPUWrapper { PADDLE_THROW( platform::errors::Unavailable("heter ps need compile with GLOO")); #endif +#ifdef PADDLE_WITH_CUDA if (multi_node_) { int dev_size = dev_ids.size(); // init inner comm @@ -166,6 +200,7 @@ class PSGPUWrapper { platform::errors::Unavailable("heter ps need compile with GLOO")); #endif } +#endif heter_devices_ = dev_ids; data_ready_channel_->Open(); data_ready_channel_->SetCapacity(3); @@ -233,7 +268,11 @@ class PSGPUWrapper { ? 
1.0 : config["mf_max_bound"]; for (size_t i = 0; i < heter_devices_.size(); i++) { +#ifdef PADDLE_WITH_CUDA PADDLE_ENFORCE_GPU_SUCCESS(cudaSetDevice(heter_devices_[i])); +#elif defined(PADDLE_WITH_XPU_KP) + PADDLE_ENFORCE_XPU_SUCCESS(xpu_set_device(heter_devices_[i])); +#endif this->SetSparseSGD(nonclk_coeff, clk_coeff, min_bound, max_bound, learning_rate, initial_g2sum, initial_range); this->SetEmbedxSGD(mf_create_thresholds, mf_learning_rate, @@ -241,6 +280,7 @@ class PSGPUWrapper { mf_max_bound); } } + void SetDate(int year, int month, int day) { year_ = year; month_ = month; @@ -268,6 +308,7 @@ class PSGPUWrapper { slot_offset_vector_ = slot_offset_vector; } +#ifdef PADDLE_WITH_CUDA void SetSlotDimVector(const std::vector& slot_mf_dim_vector) { slot_mf_dim_vector_ = slot_mf_dim_vector; assert(slot_mf_dim_vector_.size() == slot_vector_.size()); @@ -301,6 +342,7 @@ class PSGPUWrapper { grad_type_size_ = TYPEALIGN(8, sizeof(FeaturePushValue) + (max_mf_dim_ * sizeof(float))); } +#endif void ShowOneTable(int index) { HeterPs_->show_one_table(index); } @@ -342,9 +384,11 @@ class PSGPUWrapper { int multi_node_{0}; int node_size_; uint64_t table_id_; +#ifdef PADDLE_WITH_CUDA std::vector inner_comms_; std::vector inter_comms_; std::vector inter_ncclids_; +#endif std::vector heter_devices_; std::unordered_set gpu_ps_config_keys_; HeterObjectPool gpu_task_pool_; @@ -359,9 +403,11 @@ class PSGPUWrapper { int day_; int use_afs_api_ = 0; +#ifdef PADDLE_WITH_CUDA std::vector mem_pools_; std::vector hbm_pools_; // in multi mfdim, one table need hbm // pools of totol dims number +#endif std::shared_ptr< paddle::framework::ChannelObject>> diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.kps b/paddle/fluid/framework/fleet/ps_gpu_wrapper.kps new file mode 100644 index 0000000000000000000000000000000000000000..6d69ae0136d68e4c42026d50ddc24bf45350c194 --- /dev/null +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.kps @@ -0,0 +1,339 @@ +/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#ifdef PADDLE_WITH_HETERPS +#include // NOLINT +#include +#include +#include +#include +#include "paddle/fluid/framework/fleet/heter_ps/optimizer_conf.h" +#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "xpu/kernel/cluster_header.h" // NOLINT +#include "xpu/kernel/debug.h" // NOLINT +#include "xpu/kernel/math.h" // NOLINT +#include "xpu/kernel/simd.h" + +namespace paddle { +namespace framework { + +__global__ void PullCopy(float** dest, const FeatureValue* src, + const long long* len, int hidden, int slot_num, + int total_len, unsigned long long** keys) { + int cid = core_id(); + int ncores = core_num(); + if (cid >= ncores) { + return; + } + int thread_id = ncores * cluster_id() + cid; + int nthreads = ncores * cluster_num(); + __local__ int64_t local_len[slot_num]; + GM2LM(len, local_len, slot_num * sizeof(int64_t)); + + for (int i = thread_id; i < slot_num; i += nthreads) { + // max core local memory = 8KB + // slot's max memory size = slot_len * sizeof(FeatureValue) + int slot_len = i ? local_len[i] - local_len[i - 1] : local_len[0]; + int read_len = min(roundup_div(1024 * 8, sizeof(FeatureValue)), slot_len); + int dest_len = i ? local_len[i - 1] : 0; + __local__ FeatureValue local_slot_vals[read_len]; + __local__ float local_dest_vals[read_len * hidden]; + __local__ uint64_t local_slot_keys[read_len]; + + // copy read_len (length) of slots' val to LM + for (int k = 0; k < slot_len; k += read_len) { + int real_read_len = min(read_len, slot_len - k); + GM2LM(src + dest_len + k, local_slot_vals, + real_read_len * sizeof(FeatureValue)); + GM2LM(keys[i] + k, local_slot_keys, real_read_len * sizeof(uint64_t)); + for (int j = 0; j < real_read_len; j++) { + if (local_slot_keys[j] == 0) { + local_dest_vals[j * hidden] = 0; + local_dest_vals[j * hidden + 1] = 0; + local_dest_vals[j * hidden + 2] = 0; + } else { + local_dest_vals[j * hidden] = local_slot_vals[j].show; + local_dest_vals[j * hidden + 1] = local_slot_vals[j].clk; + local_dest_vals[j * hidden + 2] = local_slot_vals[j].lr; + } + + if (local_slot_vals[j].mf_size == 0 || local_slot_keys[j] == 0) { + for (int m = 0; m < hidden - 3; m++) { + local_dest_vals[j * hidden + 3 + m] = 0; + } + } else { + for (int m = 0; m < hidden - 3; m++) { + local_dest_vals[j * hidden + 3 + m] = local_slot_vals[j].mf[1 + m]; + } + } + } + LM2GM(local_dest_vals, dest[i] + k * hidden, + real_read_len * hidden * sizeof(float)); + } + } +} + +__global__ void CopyKeysKernel(unsigned long long** src_keys, + unsigned long long* dest_total_keys, + const long long* len, int slot_num, + int total_len) { + int cid = core_id(); + int ncores = core_num(); + if (cid >= ncores) { + return; + } + int thread_id = ncores * cluster_id() + cid; + int nthreads = ncores * cluster_num(); + __local__ int64_t local_len[slot_num]; + GM2LM(len, local_len, slot_num * sizeof(int64_t)); + + for (int i = thread_id; i < slot_num; i += nthreads) { + // max core local memory = 8KB + int slot_len = i ? local_len[i] - local_len[i - 1] : local_len[0]; + int read_len = min(slot_len, 1024); + int dest_len = i ? 
local_len[i - 1] : 0; + __local__ uint64_t local_slot_keys[read_len]; + + for (int k = 0; k < slot_len; k += read_len) { + int real_read_len = min(read_len, slot_len - k); + GM2LM(src_keys[i] + k, local_slot_keys, real_read_len * sizeof(uint64_t)); + LM2GM(local_slot_keys, dest_total_keys + dest_len + k, + real_read_len * sizeof(uint64_t)); + } + } +} + +__global__ void PushCopy(FeaturePushValue* dest, float** src, long long* len, + int hidden, int slot_num, int total_len, int bs, + int* slot_vector) { + int cid = core_id(); + int ncores = core_num(); + if (cid >= ncores) { + return; + } + int thread_id = ncores * cluster_id() + cid; + int nthreads = ncores * cluster_num(); + __local__ int64_t local_len[slot_num]; + __local__ int local_slot[slot_num]; + GM2LM(len, local_len, slot_num * sizeof(int64_t)); + GM2LM(slot_vector, local_slot, slot_num * sizeof(int)); + + for (int i = thread_id; i < slot_num; i += nthreads) { + int slot_len = i ? local_len[i] - local_len[i - 1] : local_len[0]; + + // max core local memory = 8KB + // slot's max memory size = slot_len * hidden * 8 + int read_len = min(roundup_div(1024, hidden), slot_len); + int dest_len = i ? local_len[i - 1] : 0; + __local__ float local_slot_grads[read_len * hidden]; + __local__ FeaturePushValue local_dest_grads[read_len]; + + // copy read_len(length) of slots' grad to LM + for (int k = 0; k < slot_len; k += read_len) { + int real_read_len = min(read_len, slot_len - k); + GM2LM(src[i] + k * hidden, local_slot_grads, + real_read_len * hidden * sizeof(float)); + // copy from slots' grad to total grad + for (int j = 0; j < real_read_len; j++) { + local_dest_grads[j].slot = local_slot[i]; + local_dest_grads[j].show = local_slot_grads[j * hidden]; + local_dest_grads[j].clk = local_slot_grads[j * hidden + 1]; + local_dest_grads[j].lr_g = local_slot_grads[j * hidden + 2] * -1. * bs; + for (int m = 0; m < hidden - 3; m++) { + local_dest_grads[j].mf_g[m] = + local_slot_grads[j * hidden + 3 + m] * -1. 
* bs; + } + } + LM2GM(local_dest_grads, dest + dest_len + k, + real_read_len * sizeof(FeaturePushValue)); + } + } +} + +PSGPUWrapper::~PSGPUWrapper() { + delete HeterPs_; + xpu_free((void*)optimizer_config::nonclk_coeff); + xpu_free((void*)optimizer_config::clk_coeff); + xpu_free((void*)optimizer_config::min_bound); + xpu_free((void*)optimizer_config::max_bound); + xpu_free((void*)optimizer_config::learning_rate); + xpu_free((void*)optimizer_config::initial_g2sum); + xpu_free((void*)optimizer_config::initial_range); + + xpu_free((void*)optimizer_config::mf_create_thresholds); + xpu_free((void*)optimizer_config::mf_learning_rate); + xpu_free((void*)optimizer_config::mf_initial_g2sum); + xpu_free((void*)optimizer_config::mf_initial_range); + xpu_free((void*)optimizer_config::mf_min_bound); + xpu_free((void*)optimizer_config::mf_max_bound); +} + +void PSGPUWrapper::CopyForPull(const paddle::platform::Place& place, + uint64_t** gpu_keys, + const std::vector& values, + const FeatureValue* total_values_gpu, + const int64_t* gpu_len, const int slot_num, + const int hidden_size, + const int64_t total_length) { + XPUStream stream = nullptr; + auto dev_ctx = platform::DeviceContextPool::Instance().Get(place); + stream = static_cast(dev_ctx) + ->x_context() + ->xpu_stream; + float* buf_value = nullptr; + xpu_malloc(reinterpret_cast(&buf_value), + values.size() * sizeof(float*)); + float** gpu_values = reinterpret_cast(&buf_value); + xpu_memcpy(gpu_values, values.data(), values.size() * sizeof(float*), + XPU_HOST_TO_DEVICE); + + unsigned long long** c_keys = (unsigned long long**)gpu_keys; + const long long* c_len = (const long long*)gpu_len; + PullCopy<<<2, 64, stream>>>(gpu_values, total_values_gpu, c_len, hidden_size, + slot_num, total_length, c_keys); + + xpu_wait(stream); +} + +void PSGPUWrapper::CopyKeys(const paddle::platform::Place& place, + uint64_t** origin_keys, uint64_t* total_keys, + const int64_t* gpu_len, int slot_num, + int total_len) { + XPUStream stream = nullptr; + auto dev_ctx = platform::DeviceContextPool::Instance().Get(place); + stream = static_cast(dev_ctx) + ->x_context() + ->xpu_stream; + unsigned long long** o_keys = (unsigned long long**)origin_keys; + unsigned long long* t_keys = (unsigned long long*)total_keys; + const long long* c_len = (const long long*)gpu_len; + CopyKeysKernel<<<2, 64, stream>>>(o_keys, t_keys, c_len, slot_num, total_len); + xpu_wait(stream); +} + +void PSGPUWrapper::CopyForPush(const paddle::platform::Place& place, + const std::vector& grad_values, + FeaturePushValue* total_grad_values_gpu, + const std::vector& slot_lengths, + const int hidden_size, + const int64_t total_length, + const int batch_size) { + XPUStream stream = nullptr; + auto dev_ctx = platform::DeviceContextPool::Instance().Get(place); + stream = static_cast(dev_ctx) + ->x_context() + ->xpu_stream; + auto slot_lengths_lod = slot_lengths; + for (size_t i = 1; i < slot_lengths_lod.size(); i++) { + slot_lengths_lod[i] += slot_lengths_lod[i - 1]; + } + + float* buf_grad_value = nullptr; + int64_t* buf_length = nullptr; + int* buf_slot_vector = nullptr; + + xpu_malloc(reinterpret_cast(&buf_grad_value), + grad_values.size() * sizeof(float*)); + xpu_malloc(reinterpret_cast(&buf_length), + slot_lengths.size() * sizeof(int64_t)); + xpu_malloc(reinterpret_cast(&buf_slot_vector), + slot_lengths_lod.size() * sizeof(int)); + + float** gpu_values = reinterpret_cast(&buf_grad_value); + int64_t* gpu_len = reinterpret_cast(buf_length); + int* d_slot_vector = reinterpret_cast(buf_slot_vector); + 
xpu_memcpy(gpu_values, grad_values.data(), + grad_values.size() * sizeof(float*), XPU_HOST_TO_DEVICE); + xpu_memcpy(gpu_len, slot_lengths_lod.data(), + slot_lengths.size() * sizeof(int64_t), XPU_HOST_TO_DEVICE); + xpu_memcpy(d_slot_vector, slot_vector_.data(), + slot_lengths_lod.size() * sizeof(int), XPU_HOST_TO_DEVICE); + + long long* c_len = (long long*)gpu_len; + PushCopy<<<2, 64, stream>>>(total_grad_values_gpu, gpu_values, c_len, + hidden_size, slot_lengths.size(), total_length, + batch_size, d_slot_vector); + xpu_wait(stream); +} + +void PSGPUWrapper::SetSparseSGD(float nonclk_coeff, float clk_coeff, + float min_bound, float max_bound, + float learning_rate, float initial_g2sum, + float initial_range) { + xpu_malloc(reinterpret_cast(&optimizer_config::nonclk_coeff), + sizeof(float)); + xpu_malloc(reinterpret_cast(&optimizer_config::clk_coeff), + sizeof(float)); + xpu_malloc(reinterpret_cast(&optimizer_config::min_bound), + sizeof(float)); + xpu_malloc(reinterpret_cast(&optimizer_config::max_bound), + sizeof(float)); + xpu_malloc(reinterpret_cast(&optimizer_config::learning_rate), + sizeof(float)); + xpu_malloc(reinterpret_cast(&optimizer_config::initial_g2sum), + sizeof(float)); + xpu_malloc(reinterpret_cast(&optimizer_config::initial_range), + sizeof(float)); + + xpu_memcpy((void*)optimizer_config::nonclk_coeff, &nonclk_coeff, + sizeof(float), XPU_HOST_TO_DEVICE); + xpu_memcpy((void*)optimizer_config::clk_coeff, &clk_coeff, sizeof(float), + XPU_HOST_TO_DEVICE); + xpu_memcpy((void*)optimizer_config::min_bound, &min_bound, sizeof(float), + XPU_HOST_TO_DEVICE); + xpu_memcpy((void*)optimizer_config::max_bound, &max_bound, sizeof(float), + XPU_HOST_TO_DEVICE); + xpu_memcpy((void*)optimizer_config::learning_rate, &learning_rate, + sizeof(float), XPU_HOST_TO_DEVICE); + xpu_memcpy((void*)optimizer_config::initial_g2sum, &initial_g2sum, + sizeof(float), XPU_HOST_TO_DEVICE); + xpu_memcpy((void*)optimizer_config::initial_range, &initial_range, + sizeof(float), XPU_HOST_TO_DEVICE); +} + +void PSGPUWrapper::SetEmbedxSGD(float mf_create_thresholds, + float mf_learning_rate, float mf_initial_g2sum, + float mf_initial_range, float mf_min_bound, + float mf_max_bound) { + xpu_malloc(reinterpret_cast(&optimizer_config::mf_create_thresholds), + sizeof(float)); + xpu_malloc(reinterpret_cast(&optimizer_config::mf_learning_rate), + sizeof(float)); + xpu_malloc(reinterpret_cast(&optimizer_config::mf_initial_g2sum), + sizeof(float)); + xpu_malloc(reinterpret_cast(&optimizer_config::mf_initial_range), + sizeof(float)); + xpu_malloc(reinterpret_cast(&optimizer_config::mf_min_bound), + sizeof(float)); + xpu_malloc(reinterpret_cast(&optimizer_config::mf_max_bound), + sizeof(float)); + + xpu_memcpy((void*)optimizer_config::mf_create_thresholds, + &mf_create_thresholds, sizeof(float), XPU_HOST_TO_DEVICE); + xpu_memcpy((void*)optimizer_config::mf_initial_g2sum, &mf_initial_g2sum, + sizeof(float), XPU_HOST_TO_DEVICE); + xpu_memcpy((void*)optimizer_config::mf_initial_range, &mf_initial_range, + sizeof(float), XPU_HOST_TO_DEVICE); + xpu_memcpy((void*)optimizer_config::mf_min_bound, &mf_min_bound, + sizeof(float), XPU_HOST_TO_DEVICE); + xpu_memcpy((void*)optimizer_config::mf_max_bound, &mf_max_bound, + sizeof(float), XPU_HOST_TO_DEVICE); + xpu_memcpy((void*)optimizer_config::mf_learning_rate, &mf_learning_rate, + sizeof(float), XPU_HOST_TO_DEVICE); +} + +} // end namespace framework +} // end namespace paddle +#endif diff --git a/paddle/fluid/framework/ps_gpu_trainer.cc b/paddle/fluid/framework/ps_gpu_trainer.cc index 
e0cf860e5bc7b94872e612112a4d5977571db489..e4004c2fbf3b56731b460b74f5a6ed0eaaedd25b 100644 --- a/paddle/fluid/framework/ps_gpu_trainer.cc +++ b/paddle/fluid/framework/ps_gpu_trainer.cc @@ -25,7 +25,9 @@ limitations under the License. */ #include "paddle/fluid/framework/trainer.h" #if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ (defined PADDLE_WITH_PSLIB) +#ifdef PADDLE_WITH_CUDA #include "paddle/fluid/platform/cuda_device_guard.h" +#endif namespace paddle { namespace framework { @@ -56,7 +58,12 @@ void PSGPUTrainer::Initialize(const TrainerDesc& trainer_desc, std::vector dev_ids; for (int i = 0; i < place_num; ++i) { int num = trainer_desc.worker_places(i); +#ifdef PADDLE_WITH_CUDA platform::CUDAPlace place = platform::CUDAPlace(num); +#endif +#ifdef PADDLE_WITH_XPU_KP + platform::XPUPlace place = platform::XPUPlace(num); +#endif places_.push_back(place); dev_ids.push_back(num); } diff --git a/paddle/fluid/framework/ps_gpu_worker.cc b/paddle/fluid/framework/ps_gpu_worker.cc index dc8935587e99c68f1ea0166372b98625cc4d9273..323b2c4803afd584cabd0ae56bd3c8d993006db1 100644 --- a/paddle/fluid/framework/ps_gpu_worker.cc +++ b/paddle/fluid/framework/ps_gpu_worker.cc @@ -20,7 +20,9 @@ limitations under the License. */ #if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \ (defined PADDLE_WITH_PSLIB) +#ifdef PADDLE_WITH_CUDA #include "paddle/fluid/platform/cuda_device_guard.h" +#endif #if defined _WIN32 || defined __APPLE__ #else diff --git a/paddle/fluid/operators/pull_box_sparse_op.cc b/paddle/fluid/operators/pull_box_sparse_op.cc index 90e4fc9da0d6182db8cd91ddefd15f385560bdbc..22b43910e6967d5aa8236bfc0b5a952fa4b681e4 100644 --- a/paddle/fluid/operators/pull_box_sparse_op.cc +++ b/paddle/fluid/operators/pull_box_sparse_op.cc @@ -132,5 +132,7 @@ REGISTER_OPERATOR(pull_box_sparse, ops::PullBoxSparseOp, ops::PushBoxSparseOpMaker, ops::PushBoxSparseOpMaker); REGISTER_OPERATOR(push_box_sparse, ops::PushBoxSparseOp); -REGISTER_OP_CPU_KERNEL(pull_box_sparse, ops::PullBoxSparseCPUKernel) -REGISTER_OP_CPU_KERNEL(push_box_sparse, ops::PushBoxSparseCPUKernel) +REGISTER_OP_CPU_KERNEL(pull_box_sparse, ops::PullBoxSparseCPUKernel); +REGISTER_OP_CPU_KERNEL(push_box_sparse, ops::PushBoxSparseCPUKernel); +REGISTER_OP_XPU_KERNEL(pull_box_sparse, ops::PullBoxSparseXPUKernel); +REGISTER_OP_XPU_KERNEL(push_box_sparse, ops::PushBoxSparseXPUKernel); diff --git a/paddle/fluid/operators/pull_box_sparse_op.cu b/paddle/fluid/operators/pull_box_sparse_op.cu index 96a1b1c08b79c2b564a701484bf06cf3d9c3a2bc..e3407dd3b2e8ba912a71f1cb419b9395b8382504 100644 --- a/paddle/fluid/operators/pull_box_sparse_op.cu +++ b/paddle/fluid/operators/pull_box_sparse_op.cu @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
- #include "paddle/fluid/operators/pull_box_sparse_op.h" #include "paddle/fluid/platform/device/gpu/gpu_info.h" #include "paddle/fluid/platform/device/gpu/gpu_primitives.h" @@ -38,7 +37,7 @@ class PushBoxSparseCUDAKernel : public framework::OpKernel { }; } // namespace operators } // namespace paddle - namespace ops = paddle::operators; -REGISTER_OP_CUDA_KERNEL(pull_box_sparse, ops::PullBoxSparseCUDAKernel) -REGISTER_OP_CUDA_KERNEL(push_box_sparse, ops::PushBoxSparseCUDAKernel) + +REGISTER_OP_CUDA_KERNEL(pull_box_sparse, ops::PullBoxSparseCUDAKernel); +REGISTER_OP_CUDA_KERNEL(push_box_sparse, ops::PushBoxSparseCUDAKernel); diff --git a/paddle/fluid/operators/pull_box_sparse_op.h b/paddle/fluid/operators/pull_box_sparse_op.h index 77021b8961db552f1e850eb04914bf2a963aeb7b..2bde9725abdca191d9e9891ca37299b4851c02f5 100644 --- a/paddle/fluid/operators/pull_box_sparse_op.h +++ b/paddle/fluid/operators/pull_box_sparse_op.h @@ -114,5 +114,21 @@ class PushBoxSparseCPUKernel : public framework::OpKernel { PushBoxSparseFunctor(ctx); } }; + +template +class PullBoxSparseXPUKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext &ctx) const override { + PullBoxSparseFunctor(ctx); + } +}; + +template +class PushBoxSparseXPUKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext &ctx) const override { + PushBoxSparseFunctor(ctx); + } +}; } // namespace operators } // namespace paddle diff --git a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc index fe1f27226bad4b95bb85e7f67dbd940a2058b92a..d1eef8b39754f63d31b64861990726907e6fb314 100644 --- a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc +++ b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc @@ -25,6 +25,7 @@ limitations under the License. */ #include #include +#include "paddle/fluid/framework/data_set.h" #include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" #include "paddle/fluid/pybind/ps_gpu_wrapper_py.h" @@ -39,8 +40,10 @@ void BindPSGPUWrapper(py::module* m) { .def(py::init([]() { return framework::PSGPUWrapper::GetInstance(); })) .def("set_slot_vector", &framework::PSGPUWrapper::SetSlotVector, py::call_guard()) +#ifdef PADDLE_WITH_CUDA .def("set_slot_dim_vector", &framework::PSGPUWrapper::SetSlotDimVector, py::call_guard()) +#endif .def("set_slot_offset_vector", &framework::PSGPUWrapper::SetSlotOffsetVector, py::call_guard())