未验证 提交 27a5f52b 编写于 作者: T Thunderbrook 提交者: GitHub

[HeterPs] fix allocation (#37476)

* auc temp

* cuballocator

* code format

* code format
上级 5b962bd9
...@@ -16,6 +16,7 @@ limitations under the License. */ ...@@ -16,6 +16,7 @@ limitations under the License. */
#include <thread> #include <thread>
#include <vector> #include <vector>
#include "cub/cub.cuh" #include "cub/cub.cuh"
#include "cub/util_allocator.cuh"
#include "hashtable.h" #include "hashtable.h"
#include "heter_resource.h" #include "heter_resource.h"
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h" #include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
...@@ -163,9 +164,9 @@ class HeterComm { ...@@ -163,9 +164,9 @@ class HeterComm {
}; };
void init_path(); void init_path();
void create_storage(
int start_index, int end_index, int keylen, int vallen, void create_storage(int start_index, int end_index, int keylen, int vallen);
std::vector<std::shared_ptr<memory::Allocation>>& local_strorage); void destroy_storage(int start_index, int end_index);
void walk_to_dest(int start_index, int gpu_num, int* h_left, int* h_right, void walk_to_dest(int start_index, int gpu_num, int* h_left, int* h_right,
KeyType* src_key, GradType* src_val); KeyType* src_key, GradType* src_val);
void walk_to_src(int start_index, int gpu_num, int* h_left, int* h_right, void walk_to_src(int start_index, int gpu_num, int* h_left, int* h_right,
...@@ -178,7 +179,7 @@ class HeterComm { ...@@ -178,7 +179,7 @@ class HeterComm {
std::vector<Table*> tables_; std::vector<Table*> tables_;
std::shared_ptr<HeterPsResource> resource_; std::shared_ptr<HeterPsResource> resource_;
CustomGradMerger merger_; CustomGradMerger merger_;
int topo_aware_{1}; int topo_aware_{0};
std::vector<std::vector<Path>> path_; std::vector<std::vector<Path>> path_;
std::vector<LocalStorage> storage_; std::vector<LocalStorage> storage_;
int feanum_{1800 * 2048}; int feanum_{1800 * 2048};
...@@ -186,6 +187,7 @@ class HeterComm { ...@@ -186,6 +187,7 @@ class HeterComm {
std::vector<ncclComm_t> nccl_inner_comms_; std::vector<ncclComm_t> nccl_inner_comms_;
std::vector<ncclComm_t> nccl_inter_comms_; std::vector<ncclComm_t> nccl_inter_comms_;
int node_size_; int node_size_;
std::vector<std::shared_ptr<cub::CachingDeviceAllocator>> allocators_;
}; };
} // end namespace framework } // end namespace framework
......
...@@ -100,6 +100,8 @@ HeterComm<KeyType, ValType, GradType>::HeterComm( ...@@ -100,6 +100,8 @@ HeterComm<KeyType, ValType, GradType>::HeterComm(
storage_.resize(resource_->total_gpu()); storage_.resize(resource_->total_gpu());
for (int i = 0; i < resource_->total_gpu(); ++i) { for (int i = 0; i < resource_->total_gpu(); ++i) {
platform::CUDADeviceGuard guard(resource_->dev_id(i)); platform::CUDADeviceGuard guard(resource_->dev_id(i));
allocators_.push_back(std::make_shared<cub::CachingDeviceAllocator>(
8, 1, (unsigned int)-1, (size_t)-1, false, false));
auto table = new Table(capacity / load_factor_); auto table = new Table(capacity / load_factor_);
tables_.push_back(table); tables_.push_back(table);
if (multi_node_) { if (multi_node_) {
...@@ -115,14 +117,14 @@ void HeterComm<KeyType, ValType, GradType>::init_path() { ...@@ -115,14 +117,14 @@ void HeterComm<KeyType, ValType, GradType>::init_path() {
path_.resize(total_gpu); path_.resize(total_gpu);
if (!topo_aware_) { if (!topo_aware_) {
VLOG(3) << "init path without topo aware"; VLOG(0) << "init path without topo aware";
for (int i = 0; i < total_gpu; ++i) { for (int i = 0; i < total_gpu; ++i) {
path_[i].resize(total_gpu); path_[i].resize(total_gpu);
for (int j = 0; j < total_gpu; ++j) { for (int j = 0; j < total_gpu; ++j) {
auto& nodes = path_[i][j].nodes_; auto& nodes = path_[i][j].nodes_;
nodes.resize(1); nodes.resize(1);
nodes[0].in_stream = resource_->comm_stream(i, j); nodes[0].in_stream = resource_->comm_stream(i, j);
nodes[0].out_stream = resource_->comm_stream(j, i); nodes[0].out_stream = resource_->comm_stream(i, j);
nodes[0].key_storage = NULL; nodes[0].key_storage = NULL;
nodes[0].val_storage = NULL; nodes[0].val_storage = NULL;
nodes[0].sync = 0; nodes[0].sync = 0;
...@@ -130,7 +132,7 @@ void HeterComm<KeyType, ValType, GradType>::init_path() { ...@@ -130,7 +132,7 @@ void HeterComm<KeyType, ValType, GradType>::init_path() {
} }
} }
} else { } else {
VLOG(3) << "init path with topo aware"; VLOG(0) << "init path with topo aware";
for (int i = 0; i < total_gpu; ++i) { for (int i = 0; i < total_gpu; ++i) {
path_[i].resize(total_gpu); path_[i].resize(total_gpu);
for (int j = 0; j < total_gpu; ++j) { for (int j = 0; j < total_gpu; ++j) {
...@@ -163,26 +165,41 @@ void HeterComm<KeyType, ValType, GradType>::init_path() { ...@@ -163,26 +165,41 @@ void HeterComm<KeyType, ValType, GradType>::init_path() {
} }
template <typename KeyType, typename ValType, typename GradType> template <typename KeyType, typename ValType, typename GradType>
void HeterComm<KeyType, ValType, GradType>::create_storage( void HeterComm<KeyType, ValType, GradType>::create_storage(int start_index,
int start_index, int end_index, int keylen, int vallen, int end_index,
std::vector<std::shared_ptr<memory::Allocation>>& local_storage) { int keylen,
int vallen) {
auto& allocator = allocators_[start_index];
auto& nodes = path_[start_index][end_index].nodes_; auto& nodes = path_[start_index][end_index].nodes_;
for (size_t i = 0; i < nodes.size(); ++i) { for (size_t i = 0; i < nodes.size(); ++i) {
platform::CUDADeviceGuard guard(resource_->dev_id(nodes[i].gpu_num)); platform::CUDADeviceGuard guard(resource_->dev_id(nodes[i].gpu_num));
platform::CUDAPlace remote_place = allocator->DeviceAllocate(
platform::CUDAPlace(resource_->dev_id(nodes[i].gpu_num)); resource_->dev_id(nodes[i].gpu_num), (void**)&(nodes[i].key_storage),
auto key_mem = memory::AllocShared(remote_place, keylen); keylen, resource_->remote_stream(nodes[i].gpu_num, start_index));
local_storage.push_back(key_mem); allocator->DeviceAllocate(
nodes[i].key_storage = reinterpret_cast<char*>(key_mem->ptr()); resource_->dev_id(nodes[i].gpu_num), (void**)&(nodes[i].val_storage),
vallen, resource_->remote_stream(nodes[i].gpu_num, start_index));
auto val_mem = memory::AllocShared(remote_place, vallen);
local_storage.push_back(val_mem);
nodes[i].val_storage = reinterpret_cast<char*>(val_mem->ptr());
nodes[i].key_bytes_len = keylen; nodes[i].key_bytes_len = keylen;
nodes[i].val_bytes_len = vallen; nodes[i].val_bytes_len = vallen;
} }
} }
// Releases the per-hop key/val staging buffers for the transfer path
// start_index -> end_index, previously obtained from the per-source-GPU
// cub::CachingDeviceAllocator (allocators_[start_index]) in create_storage().
// DeviceFree returns each block to the allocator's cache rather than calling
// cudaFree directly, avoiding repeated raw allocations on the hot path.
template <typename KeyType, typename ValType, typename GradType>
void HeterComm<KeyType, ValType, GradType>::destroy_storage(int start_index,
                                                            int end_index) {
  auto& allocator = allocators_[start_index];
  auto& nodes = path_[start_index][end_index].nodes_;
  for (size_t i = 0; i < nodes.size(); ++i) {
    // Make the device that owns this hop's buffers current before freeing.
    platform::CUDADeviceGuard guard(resource_->dev_id(nodes[i].gpu_num));
    allocator->DeviceFree(resource_->dev_id(nodes[i].gpu_num),
                          nodes[i].key_storage);
    allocator->DeviceFree(resource_->dev_id(nodes[i].gpu_num),
                          nodes[i].val_storage);
  }
}
template <typename KeyType, typename ValType, typename GradType> template <typename KeyType, typename ValType, typename GradType>
void HeterComm<KeyType, ValType, GradType>::walk_to_dest( void HeterComm<KeyType, ValType, GradType>::walk_to_dest(
int start_index, int gpu_num, int* h_left, int* h_right, KeyType* src_key, int start_index, int gpu_num, int* h_left, int* h_right, KeyType* src_key,
...@@ -482,8 +499,8 @@ void HeterComm<KeyType, ValType, GradType>::pull_sparse(int num, ...@@ -482,8 +499,8 @@ void HeterComm<KeyType, ValType, GradType>::pull_sparse(int num,
int* d_left_ptr = reinterpret_cast<int*>(d_left->ptr()); int* d_left_ptr = reinterpret_cast<int*>(d_left->ptr());
int* d_right_ptr = reinterpret_cast<int*>(d_right->ptr()); int* d_right_ptr = reinterpret_cast<int*>(d_right->ptr());
cudaMemset(d_left_ptr, -1, total_gpu * sizeof(int)); cudaMemsetAsync(d_left_ptr, -1, total_gpu * sizeof(int), stream);
cudaMemset(d_right_ptr, -1, total_gpu * sizeof(int)); cudaMemsetAsync(d_right_ptr, -1, total_gpu * sizeof(int), stream);
// //
auto d_idx = memory::AllocShared(place, len * sizeof(int)); auto d_idx = memory::AllocShared(place, len * sizeof(int));
int* d_idx_ptr = reinterpret_cast<int*>(d_idx->ptr()); int* d_idx_ptr = reinterpret_cast<int*>(d_idx->ptr());
...@@ -505,15 +522,13 @@ void HeterComm<KeyType, ValType, GradType>::pull_sparse(int num, ...@@ -505,15 +522,13 @@ void HeterComm<KeyType, ValType, GradType>::pull_sparse(int num,
cudaMemcpy(h_right, d_right_ptr, total_gpu * sizeof(int), cudaMemcpy(h_right, d_right_ptr, total_gpu * sizeof(int),
cudaMemcpyDeviceToHost); cudaMemcpyDeviceToHost);
std::vector<std::shared_ptr<memory::Allocation>> local_storage;
for (int i = 0; i < total_gpu; ++i) { for (int i = 0; i < total_gpu; ++i) {
int shard_len = h_right[i] - h_left[i] + 1; int shard_len = h_right[i] - h_left[i] + 1;
if (shard_len == 0) { if (shard_len == 0) {
continue; continue;
} }
create_storage(num, i, shard_len * sizeof(KeyType), create_storage(num, i, shard_len * sizeof(KeyType),
shard_len * sizeof(ValType), local_storage); shard_len * sizeof(ValType));
} }
walk_to_dest(num, total_gpu, h_left, h_right, d_shard_keys_ptr, NULL); walk_to_dest(num, total_gpu, h_left, h_right, d_shard_keys_ptr, NULL);
...@@ -533,6 +548,9 @@ void HeterComm<KeyType, ValType, GradType>::pull_sparse(int num, ...@@ -533,6 +548,9 @@ void HeterComm<KeyType, ValType, GradType>::pull_sparse(int num,
} }
for (int i = 0; i < total_gpu; ++i) { for (int i = 0; i < total_gpu; ++i) {
cudaStreamSynchronize(resource_->remote_stream(i, num)); cudaStreamSynchronize(resource_->remote_stream(i, num));
if (h_left[i] == -1) {
continue;
}
tables_[i]->rwlock_->UNLock(); tables_[i]->rwlock_->UNLock();
} }
...@@ -546,6 +564,9 @@ void HeterComm<KeyType, ValType, GradType>::pull_sparse(int num, ...@@ -546,6 +564,9 @@ void HeterComm<KeyType, ValType, GradType>::pull_sparse(int num,
fill_dvals<<<grid_size, block_size_, 0, stream>>>(d_shard_vals_ptr, d_vals, fill_dvals<<<grid_size, block_size_, 0, stream>>>(d_shard_vals_ptr, d_vals,
d_idx_ptr, len); d_idx_ptr, len);
cudaStreamSynchronize(stream); cudaStreamSynchronize(stream);
for (int i = 0; i < total_gpu; ++i) {
destroy_storage(num, i);
}
} }
template <typename KeyType, typename ValType, typename GradType> template <typename KeyType, typename ValType, typename GradType>
...@@ -572,8 +593,8 @@ void HeterComm<KeyType, ValType, GradType>::push_sparse(int gpu_num, ...@@ -572,8 +593,8 @@ void HeterComm<KeyType, ValType, GradType>::push_sparse(int gpu_num,
int* d_left_ptr = reinterpret_cast<int*>(d_left->ptr()); int* d_left_ptr = reinterpret_cast<int*>(d_left->ptr());
int* d_right_ptr = reinterpret_cast<int*>(d_right->ptr()); int* d_right_ptr = reinterpret_cast<int*>(d_right->ptr());
cudaMemset(d_left_ptr, -1, total_gpu * sizeof(int)); cudaMemsetAsync(d_left_ptr, -1, total_gpu * sizeof(int), stream);
cudaMemset(d_right_ptr, -1, total_gpu * sizeof(int)); cudaMemsetAsync(d_right_ptr, -1, total_gpu * sizeof(int), stream);
// //
auto d_idx = memory::AllocShared(place, len * sizeof(int)); auto d_idx = memory::AllocShared(place, len * sizeof(int));
int* d_idx_ptr = reinterpret_cast<int*>(d_idx->ptr()); int* d_idx_ptr = reinterpret_cast<int*>(d_idx->ptr());
...@@ -603,14 +624,13 @@ void HeterComm<KeyType, ValType, GradType>::push_sparse(int gpu_num, ...@@ -603,14 +624,13 @@ void HeterComm<KeyType, ValType, GradType>::push_sparse(int gpu_num,
cudaMemcpy(h_right, d_right_ptr, total_gpu * sizeof(int), cudaMemcpy(h_right, d_right_ptr, total_gpu * sizeof(int),
cudaMemcpyDeviceToHost); cudaMemcpyDeviceToHost);
std::vector<std::shared_ptr<memory::Allocation>> local_storage;
for (int i = 0; i < total_gpu; ++i) { for (int i = 0; i < total_gpu; ++i) {
int shard_len = h_right[i] - h_left[i] + 1; int shard_len = h_right[i] - h_left[i] + 1;
if (h_left[i] == -1 || h_right[i] == -1) { if (h_left[i] == -1 || h_right[i] == -1) {
continue; continue;
} }
create_storage(gpu_num, i, shard_len * sizeof(KeyType), create_storage(gpu_num, i, shard_len * sizeof(KeyType),
shard_len * sizeof(GradType), local_storage); shard_len * sizeof(GradType));
} }
walk_to_dest(gpu_num, total_gpu, h_left, h_right, d_shard_keys_ptr, walk_to_dest(gpu_num, total_gpu, h_left, h_right, d_shard_keys_ptr,
...@@ -632,7 +652,12 @@ void HeterComm<KeyType, ValType, GradType>::push_sparse(int gpu_num, ...@@ -632,7 +652,12 @@ void HeterComm<KeyType, ValType, GradType>::push_sparse(int gpu_num,
} }
for (int i = 0; i < total_gpu; ++i) { for (int i = 0; i < total_gpu; ++i) {
cudaStreamSynchronize(resource_->remote_stream(i, gpu_num)); cudaStreamSynchronize(resource_->remote_stream(i, gpu_num));
tables_[i]->rwlock_->UNLock(); if (h_left[i] != -1) {
tables_[i]->rwlock_->UNLock();
}
}
for (int i = 0; i < total_gpu; ++i) {
destroy_storage(gpu_num, i);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册