Unverified · Commit a60d93fb, authored by Qi Li, committed by GitHub

[ROCM] update fluid framework for rocm (part2), test=develop (#31010)

Parent 565354f6
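Throughout the diff, CUDA-only types (cudaStream_t, cudaEvent_t) are replaced by backend-neutral gpuStream_t / gpuEvent_t aliases, and ROCm code paths are guarded by PADDLE_WITH_HIP / PADDLE_WITH_RCCL. As a minimal sketch of the aliasing pattern this relies on (the real definitions live in Paddle's platform headers and are not part of this patch, so the exact form below is an assumption):

// Assumed sketch only -- illustrates the gpuStream_t / gpuEvent_t aliasing
// pattern this patch relies on; the actual Paddle definitions may differ.
#ifdef PADDLE_WITH_HIP
#include <hip/hip_runtime.h>
using gpuStream_t = hipStream_t;   // ROCm stream handle
using gpuEvent_t = hipEvent_t;     // ROCm event handle
#else
#include <cuda_runtime.h>
using gpuStream_t = cudaStream_t;  // CUDA stream handle
using gpuEvent_t = cudaEvent_t;    // CUDA event handle
#endif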
@@ -4,6 +4,10 @@ if(WITH_PSLIB)
 nv_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
 DEPS heter_ps)
 add_subdirectory(heter_ps)
+elseif(WITH_RCCL)
+hip_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
+DEPS heter_ps)
+add_subdirectory(heter_ps)
 else()
 cc_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cc)
 endif(WITH_NCCL)
@@ -12,11 +16,16 @@ else()
 cc_library(ps_gpu_wrapper SRCS ps_gpu_wrapper.cc)
 endif(WITH_PSLIB)
-if(WITH_NCCL)
+if(WITH_NCCL OR WITH_RCCL)
 cc_library(nccl_wrapper SRCS nccl_wrapper.cc DEPS framework_proto variable_helper scope)
 endif()
 if(WITH_BOX_PS)
+if(WITH_GPU)
 nv_library(box_wrapper SRCS box_wrapper.cc box_wrapper.cu DEPS framework_proto lod_tensor box_ps)
+endif()
+if(WITH_ROCM)
+hip_library(box_wrapper SRCS box_wrapper.cc box_wrapper.cu DEPS framework_proto lod_tensor box_ps)
+endif()
 else()
 cc_library(box_wrapper SRCS box_wrapper.cc DEPS framework_proto lod_tensor)
 endif(WITH_BOX_PS)
......
@@ -25,7 +25,7 @@ namespace paddle {
 namespace framework {
 std::shared_ptr<BoxWrapper> BoxWrapper::s_instance_ = nullptr;
-cudaStream_t BoxWrapper::stream_list_[8];
+gpuStream_t BoxWrapper::stream_list_[8];
 std::shared_ptr<boxps::BoxPSBase> BoxWrapper::boxps_ptr_ = nullptr;
 AfsManager* BoxWrapper::afs_manager = nullptr;
 int BoxWrapper::embedx_dim_ = 8;
......
@@ -142,8 +142,13 @@ void BoxWrapper::CopyForPull(const paddle::platform::Place& place,
 ->stream();
 auto buf_value = memory::AllocShared(place, values.size() * sizeof(float*));
 float** gpu_values = reinterpret_cast<float**>(buf_value->ptr());
+#ifdef PADDLE_WITH_HIP
+hipMemcpy(gpu_values, values.data(), values.size() * sizeof(float*),
+hipMemcpyHostToDevice);
+#else
 cudaMemcpy(gpu_values, values.data(), values.size() * sizeof(float*),
 cudaMemcpyHostToDevice);
+#endif
 #define EMBEDX_CASE(i, ...) \
 case i: { \
 constexpr size_t EmbedxDim = i; \
@@ -155,6 +160,19 @@ void BoxWrapper::CopyForPull(const paddle::platform::Place& place,
 } \
 } break
+#ifdef PADDLE_WITH_HIP
+#define EXPAND_EMBED_PULL_CASE(i, ...) \
+case i: { \
+constexpr size_t ExpandDim = i; \
+hipLaunchKernelGGL( \
+PullCopy<EmbedxDim, ExpandDim>, dim3((total_length + 512 - 1) / 512), \
+dim3(512), 0, stream, gpu_values, \
+reinterpret_cast<boxps::FeatureValueGpu<EmbedxDim, ExpandDim>*>( \
+total_values_gpu), \
+gpu_len, hidden_size, expand_embed_dim, slot_num, total_length, \
+gpu_keys); \
+} break
+#else
 #define EXPAND_EMBED_PULL_CASE(i, ...) \
 case i: { \
 constexpr size_t ExpandDim = i; \
@@ -166,6 +184,7 @@ void BoxWrapper::CopyForPull(const paddle::platform::Place& place,
 gpu_len, hidden_size, expand_embed_dim, slot_num, total_length, \
 gpu_keys); \
 } break
+#endif
 switch (hidden_size - 3) {
 EMBEDX_CASE(8, EXPAND_EMBED_PULL_CASE(0); EXPAND_EMBED_PULL_CASE(8);
@@ -187,9 +206,16 @@ void BoxWrapper::CopyKeys(const paddle::platform::Place& place,
 platform::DeviceContextPool::Instance().Get(
 BOOST_GET_CONST(platform::CUDAPlace, place)))
 ->stream();
+#ifdef PADDLE_WITH_HIP
+hipLaunchKernelGGL(CopyKeysKernel, dim3((total_len + 512 - 1) / 512),
+dim3(512), 0, stream, origin_keys, total_keys, gpu_len,
+slot_num, total_len);
+hipStreamSynchronize(stream);
+#else
 CopyKeysKernel<<<(total_len + 512 - 1) / 512, 512, 0, stream>>>(
 origin_keys, total_keys, gpu_len, slot_num, total_len);
 cudaStreamSynchronize(stream);
+#endif
 }
 void BoxWrapper::CopyForPush(const paddle::platform::Place& place,
@@ -217,12 +243,21 @@ void BoxWrapper::CopyForPush(const paddle::platform::Place& place,
 int64_t* gpu_len = reinterpret_cast<int64_t*>(buf_length->ptr());
 int* d_slot_vector = reinterpret_cast<int*>(buf_slot_vector->ptr());
+#ifdef PADDLE_WITH_HIP
+hipMemcpy(gpu_values, grad_values.data(), grad_values.size() * sizeof(float*),
+hipMemcpyHostToDevice);
+hipMemcpy(gpu_len, slot_lengths_lod.data(),
+slot_lengths.size() * sizeof(int64_t), hipMemcpyHostToDevice);
+hipMemcpy(d_slot_vector, slot_vector_.data(),
+slot_lengths_lod.size() * sizeof(int), hipMemcpyHostToDevice);
+#else
 cudaMemcpy(gpu_values, grad_values.data(),
 grad_values.size() * sizeof(float*), cudaMemcpyHostToDevice);
 cudaMemcpy(gpu_len, slot_lengths_lod.data(),
 slot_lengths.size() * sizeof(int64_t), cudaMemcpyHostToDevice);
 cudaMemcpy(d_slot_vector, slot_vector_.data(),
 slot_lengths_lod.size() * sizeof(int), cudaMemcpyHostToDevice);
+#endif
 #define EMBEDX_CASE(i, ...) \
 case i: { \
@@ -235,6 +270,18 @@ void BoxWrapper::CopyForPush(const paddle::platform::Place& place,
 } \
 } break
+#ifdef PADDLE_WITH_HIP
+#define EXPAND_EMBED_PUSH_CASE(i, ...) \
+case i: { \
+constexpr size_t ExpandDim = i; \
+hipLaunchKernelGGL(PushCopy<EmbedxDim, ExpandDim>, \
+dim3((total_length + 512 - 1) / 512), dim3(512), 0, stream, \
+reinterpret_cast<boxps::FeaturePushValueGpu<EmbedxDim, ExpandDim>*>( \
+total_grad_values_gpu), \
+gpu_values, gpu_len, hidden_size, expand_embed_dim, \
+slot_lengths.size(), total_length, batch_size, d_slot_vector); \
+} break
+#else
 #define EXPAND_EMBED_PUSH_CASE(i, ...) \
 case i: { \
 constexpr size_t ExpandDim = i; \
@@ -245,6 +292,7 @@ void BoxWrapper::CopyForPush(const paddle::platform::Place& place,
 gpu_values, gpu_len, hidden_size, expand_embed_dim, \
 slot_lengths.size(), total_length, batch_size, d_slot_vector); \
 } break
+#endif
 switch (hidden_size - 3) {
 EMBEDX_CASE(8, EXPAND_EMBED_PUSH_CASE(0); EXPAND_EMBED_PUSH_CASE(8);
......
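The HIP branches above replace CUDA's triple-chevron launch syntax with hipLaunchKernelGGL, which takes the kernel, grid/block dimensions, dynamic shared-memory size, and stream as explicit arguments. A minimal sketch of that mapping (MyKernel and its arguments are illustrative, not symbols from this patch):

// Hypothetical kernel, used only to show the launch-syntax mapping.
__global__ void MyKernel(const int* in, int* out, int n);

void LaunchMyKernel(const int* in, int* out, int n, gpuStream_t stream) {
  dim3 grid((n + 512 - 1) / 512);  // same grid/block math as the kernels above
  dim3 block(512);
#ifdef PADDLE_WITH_HIP
  // HIP: kernel and launch configuration are passed as ordinary arguments.
  hipLaunchKernelGGL(MyKernel, grid, block, 0, stream, in, out, n);
#else
  // CUDA: the familiar <<<grid, block, shared_mem, stream>>> syntax.
  MyKernel<<<grid, block, 0, stream>>>(in, out, n);
#endif
}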
@@ -396,7 +396,7 @@ class BoxWrapper {
 const std::string& model_path) {
 if (nullptr != s_instance_) {
 VLOG(3) << "Begin InitializeGPU";
-std::vector<cudaStream_t*> stream_list;
+std::vector<gpuStream_t*> stream_list;
 for (int i = 0; i < platform::GetCUDADeviceCount(); ++i) {
 VLOG(3) << "before get context i[" << i << "]";
 platform::CUDADeviceContext* context =
@@ -542,8 +542,12 @@ class BoxWrapper {
 auto* gpu_data = gpu_tensor.data<T>();
 auto len = gpu_tensor.numel();
 data->resize(len);
+#ifdef PADDLE_WITH_HIP
+hipMemcpy(data->data(), gpu_data, sizeof(T) * len, hipMemcpyDeviceToHost);
+#else
 cudaMemcpy(data->data(), gpu_data, sizeof(T) * len,
 cudaMemcpyDeviceToHost);
+#endif
 }
 static inline std::pair<int, int> parse_cmatch_rank(uint64_t x) {
 // first 32 bit store cmatch and second 32 bit store rank
@@ -819,7 +823,7 @@ class BoxWrapper {
 }
 private:
-static cudaStream_t stream_list_[8];
+static gpuStream_t stream_list_[8];
 static std::shared_ptr<boxps::BoxPSBase> boxps_ptr_;
 boxps::PSAgentBase* p_agent_ = nullptr;
 // TODO(hutuxian): magic number, will add a config to specify
......
@@ -43,7 +43,7 @@ void BoxWrapper::PullSparseCase(const paddle::platform::Place& place,
 PADDLE_THROW(platform::errors::Unimplemented(
 "Warning:: CPUPlace is not supported in PaddleBox now."));
 } else if (platform::is_gpu_place(place)) {
-#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
+#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && !defined(_WIN32)
 VLOG(3) << "Begin copy keys, key_num[" << total_length << "]";
 int device_id = BOOST_GET_CONST(platform::CUDAPlace, place).GetDeviceId();
 LoDTensor& total_keys_tensor = keys_tensor[device_id];
@@ -60,11 +60,17 @@ void BoxWrapper::PullSparseCase(const paddle::platform::Place& place,
 memory::AllocShared(place, slot_lengths.size() * sizeof(int64_t));
 uint64_t** gpu_keys = reinterpret_cast<uint64_t**>(buf_key->ptr());
 int64_t* gpu_len = reinterpret_cast<int64_t*>(buf_length->ptr());
+#ifdef PADDLE_WITH_HIP
+hipMemcpy(gpu_keys, keys.data(), keys.size() * sizeof(uint64_t*),
+hipMemcpyHostToDevice);
+hipMemcpy(gpu_len, slot_lengths_lod.data(),
+slot_lengths.size() * sizeof(int64_t), hipMemcpyHostToDevice);
+#else
 cudaMemcpy(gpu_keys, keys.data(), keys.size() * sizeof(uint64_t*),
 cudaMemcpyHostToDevice);
 cudaMemcpy(gpu_len, slot_lengths_lod.data(),
 slot_lengths.size() * sizeof(int64_t), cudaMemcpyHostToDevice);
+#endif
 this->CopyKeys(place, gpu_keys, total_keys, gpu_len,
 static_cast<int>(slot_lengths.size()),
 static_cast<int>(total_length));
@@ -124,7 +130,7 @@ void BoxWrapper::PushSparseGradCase(
 PADDLE_THROW(platform::errors::Unimplemented(
 "Warning:: CPUPlace is not supported in PaddleBox now."));
 } else if (platform::is_gpu_place(place)) {
-#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
+#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && !defined(_WIN32)
 int device_id = BOOST_GET_CONST(platform::CUDAPlace, place).GetDeviceId();
 LoDTensor& cached_total_keys_tensor = keys_tensor[device_id];
 uint64_t* total_keys =
......
@@ -698,13 +698,14 @@ void FleetWrapper::PushDenseVarsSync(
 Scope* scope, const uint64_t table_id,
 const std::vector<std::string>& var_names) {}
-#if (defined PADDLE_WITH_CUDA) && (defined PADDLE_WITH_PSLIB)
+#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
+(defined PADDLE_WITH_PSLIB)
 void FleetWrapper::PushDenseVarsAsync(
 const Scope& scope, const uint64_t table_id,
 const std::vector<std::string>& var_names,
 std::vector<::std::future<int32_t>>* push_sparse_status,
 float scale_datanorm, int batch_size, const paddle::platform::Place& place,
-cudaStream_t stream, cudaEvent_t event) {
+gpuStream_t stream, gpuEvent_t event) {
 std::vector<paddle::ps::Region> regions;
 for (auto& t : var_names) {
 Variable* var = scope.FindVar(t);
@@ -719,8 +720,13 @@ void FleetWrapper::PushDenseVarsAsync(
 memory::Copy(platform::CUDAPinnedPlace(), pin_g,
 BOOST_GET_CONST(platform::CUDAPlace, place), g_data,
 sizeof(float) * count, stream);
+#ifdef PADDLE_WITH_HIP
+PADDLE_ENFORCE_CUDA_SUCCESS(hipEventRecord(event, stream));
+hipEventSynchronize(event);
+#else
 PADDLE_ENFORCE_CUDA_SUCCESS(cudaEventRecord(event, stream));
 cudaEventSynchronize(event);
+#endif
 float* g = pin_g;
 if (scale_datanorm >= 0) {
......
@@ -152,14 +152,14 @@ class FleetWrapper {
 // Push dense variables to server in async mode
 // Param<in>: scope, table_id, var_names, scale_datanorm, batch_size
 // Param<out>: push_sparse_status
-#ifdef PADDLE_WITH_CUDA
+#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
 void PushDenseVarsAsync(
 const Scope& scope, const uint64_t table_id,
 const std::vector<std::string>& var_names,
 std::vector<::std::future<int32_t>>* push_sparse_status,
 float scale_datanorm, int batch_size,
-const paddle::platform::Place& place, cudaStream_t stream,
-cudaEvent_t event);
+const paddle::platform::Place& place, gpuStream_t stream,
+gpuEvent_t event);
 #endif
 #ifdef PADDLE_WITH_XPU
 void PushDenseVarsAsync(
......
@@ -14,7 +14,8 @@ limitations under the License. */
 #pragma once
-#if (defined PADDLE_WITH_NCCL) && (defined PADDLE_WITH_PSLIB)
+#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
+(defined PADDLE_WITH_PSLIB)
 #include <algorithm>
 #include <map>
......
-nv_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc
-heter_resource.h hashtable.h DEPS cub device_context)
-nv_test(test_heter_comm SRCS test_heter_comm.cu feature_value.h DEPS
-heter_comm)
-nv_library(heter_ps SRCS heter_ps.cu DEPS heter_comm)
+IF(WITH_GPU)
+nv_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS cub device_context)
+nv_test(test_heter_comm SRCS test_heter_comm.cu feature_value.h DEPS heter_comm)
+nv_library(heter_ps SRCS heter_ps.cu DEPS heter_comm)
+ENDIF()
+IF(WITH_ROCM)
+hip_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS cub device_context)
+hip_test(test_heter_comm SRCS test_heter_comm.cu feature_value.h DEPS heter_comm)
+hip_library(heter_ps SRCS heter_ps.cu DEPS heter_comm)
+ENDIF()
@@ -45,15 +45,15 @@ class HashTable {
 HashTable(const HashTable&) = delete;
 HashTable& operator=(const HashTable&) = delete;
 void insert(const KeyType* d_keys, const ValType* d_vals, size_t len,
-cudaStream_t stream);
+gpuStream_t stream);
 void get(const KeyType* d_keys, ValType* d_vals, size_t len,
-cudaStream_t stream);
+gpuStream_t stream);
 void show();
 void dump_to_cpu(int devid, cudaStream_t stream);
 template <typename GradType, typename Sgd>
 void update(const KeyType* d_keys, const GradType* d_grads, size_t len,
-Sgd sgd, cudaStream_t stream);
+Sgd sgd, gpuStream_t stream);
 private:
 TableContainer<KeyType, ValType>* container_;
......
@@ -87,7 +87,7 @@ void HashTable<KeyType, ValType>::show() {
 template <typename KeyType, typename ValType>
 void HashTable<KeyType, ValType>::get(const KeyType* d_keys, ValType* d_vals,
-size_t len, cudaStream_t stream) {
+size_t len, gpuStream_t stream) {
 if (len == 0) {
 return;
 }
@@ -99,7 +99,7 @@ void HashTable<KeyType, ValType>::get(const KeyType* d_keys, ValType* d_vals,
 template <typename KeyType, typename ValType>
 void HashTable<KeyType, ValType>::insert(const KeyType* d_keys,
 const ValType* d_vals, size_t len,
-cudaStream_t stream) {
+gpuStream_t stream) {
 if (len == 0) {
 return;
 }
@@ -147,7 +147,7 @@ template <typename KeyType, typename ValType>
 template <typename GradType, typename Sgd>
 void HashTable<KeyType, ValType>::update(const KeyType* d_keys,
 const GradType* d_grads, size_t len,
-Sgd sgd, cudaStream_t stream) {
+Sgd sgd, gpuStream_t stream) {
 if (len == 0) {
 return;
 }
......
@@ -25,7 +25,7 @@ __global__ void fill_idx(T* idx, size_t len) {
 }
 template <typename T>
-void show_tensor(T* input, size_t len, cudaStream_t stream, std::string name) {
+void show_tensor(T* input, size_t len, gpuStream_t stream, std::string name) {
 T tmp[len];
 cudaMemcpyAsync(&tmp, input, sizeof(T) * len, cudaMemcpyDeviceToHost, stream);
 cudaStreamSynchronize(stream);
@@ -270,7 +270,7 @@ void HeterComm<KeyType, ValType, GradType>::build_ps(int num, KeyType* h_keys,
 std::vector<std::shared_ptr<memory::Allocation>> d_key_bufs;
 std::vector<std::shared_ptr<memory::Allocation>> d_val_bufs;
-cudaStream_t streams[stream_num];
+gpuStream_t streams[stream_num];
 for (int i = 0; i < stream_num; ++i) {
 PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamCreate(&(streams[i])));
 auto d_k_buf = memory::AllocShared(place, chunk_size * sizeof(KeyType));
......
@@ -34,16 +34,16 @@ class GPUResource {
 int dev_id() const { return dev_id_; }
 int index() const { return index_; }
-cudaStream_t local_stream(int num) { return local_streams_[num]; }
-cudaStream_t remote_stream() { return remote_stream_; }
-cudaStream_t comm_stream(int num) { return comm_streams_[num]; }
+gpuStream_t local_stream(int num) { return local_streams_[num]; }
+gpuStream_t remote_stream() { return remote_stream_; }
+gpuStream_t comm_stream(int num) { return comm_streams_[num]; }
 int dev_id_;
 int index_;
 std::vector<int> dev_ids_;
-cudaStream_t remote_stream_;
-std::vector<cudaStream_t> local_streams_;
-std::vector<cudaStream_t> comm_streams_;
+gpuStream_t remote_stream_;
+std::vector<gpuStream_t> local_streams_;
+std::vector<gpuStream_t> comm_streams_;
 };
 class HeterPsResource {
@@ -56,9 +56,9 @@ class HeterPsResource {
 int total_gpu();
 int get_index_by_devid(int devid);
 int dev_id(int num);
-cudaStream_t local_stream(int gpu_num, int stream_num);
-cudaStream_t remote_stream(int gpu_num);
-cudaStream_t comm_stream(int gpu_num, int stream_num);
+gpuStream_t local_stream(int gpu_num, int stream_num);
+gpuStream_t remote_stream(int gpu_num);
+gpuStream_t comm_stream(int gpu_num, int stream_num);
 std::vector<std::shared_ptr<GPUResource>> resources_;
 std::vector<int> dev_ids_;
......
@@ -114,7 +114,7 @@ void HeterWrapper::SerializeToReq(const std::string& varname, Scope* scope,
 memcpy(data_ptr, tensor->data<void>(),
 tensor->numel() * SizeOfType(tensor->type()));
 } else {
-#ifdef PADDLE_WITH_CUDA
+#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
 memory::Copy(platform::CPUPlace(), data_ptr,
 BOOST_GET_CONST(platform::CUDAPlace, tensor->place()),
 tensor->data<void>(),
@@ -129,11 +129,11 @@ void HeterWrapper::SerializeToReq(const std::string& varname, Scope* scope,
 }
 }
-#ifdef PADDLE_WITH_CUDA
+#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
 void HeterWrapper::DeSerializeToTensor(Scope* scope,
 const VariableMessage& req_var,
 platform::Place place,
-cudaStream_t stream) {
+gpuStream_t stream) {
 // const VariableMessage& req_var = request->vars();
 auto* var = scope->FindVar(req_var.varname());
 auto* tensor = var->GetMutable<LoDTensor>();
@@ -157,7 +157,7 @@ void HeterWrapper::DeSerializeToTensor(Scope* scope,
 void* tensor_data =
 tensor->mutable_data(place, ToVarType(req_var.data_type()));
-#ifdef PADDLE_WITH_CUDA
+#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
 memory::Copy(BOOST_GET_CONST(platform::CUDAPlace, place), tensor_data,
 platform::CPUPlace(), req_var.data().data(),
 tensor->numel() * SizeOfType(tensor->type()), stream);
......
@@ -86,9 +86,9 @@ class HeterWrapper {
 framework::proto::VarType::Type ToVarType(VariableMessage::Type type);
-#ifdef PADDLE_WITH_CUDA
+#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
 void DeSerializeToTensor(Scope* scope, const VariableMessage& req_var,
-platform::Place place, cudaStream_t stream);
+platform::Place place, gpuStream_t stream);
 #endif
 void DeSerializeToTensor(Scope* scope, const VariableMessage& req_var,
 platform::Place place);
......
@@ -21,7 +21,7 @@ std::shared_ptr<NCCLWrapper> NCCLWrapper::s_instance_ = NULL;
 bool NCCLWrapper::is_initialized_ = false;
 void NCCLWrapper::InitNCCL() {
-#if defined(PADDLE_WITH_NCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
 PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclCommInitRank(
 &(nccl_info_.comm_), nccl_info_.global_ranks_, nccl_info_.nccl_id_,
 nccl_info_.my_global_rank_));
@@ -30,14 +30,14 @@ void NCCLWrapper::InitNCCL() {
 }
 void NCCLWrapper::SetNCCLId(const NCCLInfo& nccl_info) {
-#if defined(PADDLE_WITH_NCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
 nccl_info_.nccl_id_ = nccl_info.nccl_id_;
 #endif
 return;
 }
 NCCLInfo NCCLWrapper::GetNCCLId() {
-#if defined(PADDLE_WITH_NCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
 PADDLE_ENFORCE_CUDA_SUCCESS(
 platform::dynload::ncclGetUniqueId(&(nccl_info_.nccl_id_)));
 #endif
@@ -46,19 +46,23 @@ NCCLInfo NCCLWrapper::GetNCCLId() {
 void NCCLWrapper::SetRankInfo(const int local_rank, const int global_rank,
 const int ranks) {
-#if defined(PADDLE_WITH_NCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
 nccl_info_.local_rank_ = local_rank;
 nccl_info_.my_global_rank_ = global_rank;
 nccl_info_.global_ranks_ = ranks;
 platform::SetDeviceId(local_rank);
+#ifdef PADDLE_WITH_RCCL
+PADDLE_ENFORCE_CUDA_SUCCESS(hipStreamCreate(&(nccl_info_.stream_)));
+#else
 PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamCreate(&(nccl_info_.stream_)));
+#endif
 #endif
 return;
 }
 void NCCLWrapper::SyncVar(const int root_rank, const Scope& scope,
 const std::vector<std::string>& var_names) {
-#if defined(PADDLE_WITH_NCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
 for (auto& name : var_names) {
 auto var = scope.FindVar(name);
 LoDTensor* tensor = var->GetMutable<LoDTensor>();
@@ -66,7 +70,11 @@ void NCCLWrapper::SyncVar(const int root_rank, const Scope& scope,
 PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclBcast(
 reinterpret_cast<void*>(tensor->data<float>()), total_size, ncclFloat,
 root_rank, nccl_info_.comm_, nccl_info_.stream_));
+#ifdef PADDLE_WITH_RCCL
+hipStreamSynchronize(nccl_info_.stream_);
+#else
 cudaStreamSynchronize(nccl_info_.stream_);
+#endif
 }
 #endif
 return;
......
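RCCL keeps the NCCL C API (ncclCommInitRank, ncclGetUniqueId, ncclBcast, ...), which is why nccl_wrapper.cc can share the collective calls between back ends and only switch the stream creation/synchronization calls. A hedged sketch of that pattern (broadcast_floats and its parameters are illustrative names, not Paddle symbols):

// Sketch: one broadcast helper compiled against either NCCL (CUDA) or RCCL (ROCm).
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
void broadcast_floats(float* data, size_t count, int root, ncclComm_t comm,
                      gpuStream_t stream) {
  // Same collective call for both back ends; RCCL mirrors the NCCL API.
  ncclBcast(reinterpret_cast<void*>(data), count, ncclFloat, root, comm,
            stream);
#ifdef PADDLE_WITH_RCCL
  hipStreamSynchronize(stream);   // ROCm stream wait
#else
  cudaStreamSynchronize(stream);  // CUDA stream wait
#endif
}
#endif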
@@ -25,9 +25,12 @@ limitations under the License. */
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/framework/variable_helper.h"
-#if defined(PADDLE_WITH_NCCL)
+#ifdef PADDLE_WITH_NCCL
 #include "paddle/fluid/platform/dynload/nccl.h"
 #endif
+#ifdef PADDLE_WITH_RCCL
+#include "paddle/fluid/platform/dynload/rccl.h"
+#endif
 #include "paddle/fluid/platform/macros.h"  // for DISABLE_COPY_AND_ASSIGN
 namespace paddle {
@@ -48,10 +51,10 @@ class NCCLInfo {
 int local_rank_;
 int global_ranks_;
 int my_global_rank_;
-#if defined(PADDLE_WITH_NCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
 ncclUniqueId nccl_id_;
 ncclComm_t comm_;
-cudaStream_t stream_;
+gpuStream_t stream_;
 #endif
 };
......
@@ -26,7 +26,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
-#if (defined PADDLE_WITH_NCCL) && (defined PADDLE_WITH_PSLIB)
+#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
+(defined PADDLE_WITH_PSLIB)
 #include <algorithm>
 #include <deque>
......
@@ -14,7 +14,8 @@ limitations under the License. */
 #pragma once
-#if (defined PADDLE_WITH_NCCL) && (defined PADDLE_WITH_PSLIB)
+#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
+(defined PADDLE_WITH_PSLIB)
 #include <atomic>
 #include <ctime>
......