diff --git a/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt b/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt index 189724a5455200bdfbd0497aee53bc949df412e9..17346f5fd939324e6c2d709fb09be2cb65669429 100644 --- a/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt +++ b/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt @@ -10,6 +10,8 @@ IF(WITH_GPU) nv_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h mem_pool.h DEPS ${HETERPS_DEPS}) nv_test(test_heter_comm SRCS feature_value.h DEPS heter_comm) nv_library(heter_ps SRCS heter_ps.cu DEPS heter_comm) + nv_library(graph_gpu_ps SRCS graph_gpu_ps_table.h DEPS heter_comm) + nv_test(test_graph_comm SRCS test_graph.cu DEPS graph_gpu_ps) ENDIF() IF(WITH_ROCM) hip_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS cub device_context) diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h new file mode 100644 index 0000000000000000000000000000000000000000..a6508bf96c00f835da4aee79503f16fa5451e794 --- /dev/null +++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h @@ -0,0 +1,144 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include "heter_comm.h" +#include "paddle/fluid/platform/enforce.h" +#ifdef PADDLE_WITH_HETERPS +namespace paddle { +namespace framework { +struct GpuPsGraphNode { + int64_t node_id; + int neighbor_size, neighbor_offset; + // this node's neighbor is stored on [neighbor_offset,neighbor_offset + + // neighbor_size) of int64_t *neighbor_list; +}; + +struct GpuPsCommGraph { + int64_t *neighbor_list; + GpuPsGraphNode *node_list; + int neighbor_size, node_size; + // the size of neighbor array and graph_node_list array + GpuPsCommGraph() + : neighbor_list(NULL), node_list(NULL), neighbor_size(0), node_size(0) {} + GpuPsCommGraph(int64_t *neighbor_list_, GpuPsGraphNode *node_list_, + int neighbor_size_, int node_size_) + : neighbor_list(neighbor_list_), + node_list(node_list_), + neighbor_size(neighbor_size_), + node_size(node_size_) {} +}; + +/* +suppose we have a graph like this + +0----3-----5----7 + \ |\ |\ + 17 8 9 1 2 + +we save the nodes in arbitrary order, +in this example,the order is +[0,5,1,2,7,3,8,9,17] +let us name this array u_id; +we record each node's neighbors: +0:3,17 +5:3,7 +1:7 +2:7 +7:1,2,5 +3:0,5,8,9 +8:3 +9:3 +17:0 + +by concatenating each node's neighbor_list in the order we save the node id. +we get [3,17,3,7,7,7,1,2,5,0,5,8,9,3,3,0] +this is the neighbor_list of GpuPsCommGraph +given this neighbor_list and the order to save node id, +we know, +node 0's neighbors are in the range [0,1] of neighbor_list +node 5's neighbors are in the range [2,3] of neighbor_list +node 1's neighbors are in the range [4,4] of neighbor_list +node 2:[5,5] +node 7:[6,6] +node 3:[9,12] +node 8:[13,13] +node 9:[14,14] +node 17:[15,15] +... +by the above information, +we generate a node_list:GpuPsGraphNode *graph_node_list in GpuPsCommGraph +of size 9, +where node_list[i].id = u_id[i] +then we have: +node_list[0]-> node_id:0, neighbor_size:2, neighbor_offset:0 +node_list[1]-> node_id:5, neighbor_size:2, neighbor_offset:2 +node_list[2]-> node_id:1, neighbor_size:1, neighbor_offset:4 +node_list[3]-> node_id:2, neighbor_size:1, neighbor_offset:5 +node_list[4]-> node_id:7, neighbor_size:3, neighbor_offset:6 +node_list[5]-> node_id:3, neighbor_size:4, neighbor_offset:9 +node_list[6]-> node_id:8, neighbor_size:1, neighbor_offset:13 +node_list[7]-> node_id:9, neighbor_size:1, neighbor_offset:14 +node_list[8]-> node_id:17, neighbor_size:1, neighbor_offset:15 +*/ +struct NeighborSampleResult { + int64_t *val; + int *actual_sample_size, sample_size, key_size; + NeighborSampleResult(int _sample_size, int _key_size) + : sample_size(_sample_size), key_size(_key_size) { + actual_sample_size = NULL; + val = NULL; + }; + ~NeighborSampleResult() { + if (val != NULL) cudaFree(val); + if (actual_sample_size != NULL) cudaFree(actual_sample_size); + } +}; + +struct NodeQueryResult { + int64_t *val; + int actual_sample_size; + NodeQueryResult() { + val = NULL; + actual_sample_size = 0; + }; + ~NodeQueryResult() { + if (val != NULL) cudaFree(val); + } +}; +class GpuPsGraphTable : public HeterComm { + public: + GpuPsGraphTable(std::shared_ptr resource) + : HeterComm(1, resource) { + load_factor_ = 0.25; + } + void build_graph_from_cpu(std::vector &cpu_node_list); + NodeQueryResult *graph_node_sample(int gpu_id, int sample_size); + NeighborSampleResult *graph_neighbor_sample(int gpu_id, int64_t *key, + int sample_size, int len); + NodeQueryResult *query_node_list(int gpu_id, int start, int query_size); + void clear_graph_info(); + void move_neighbor_sample_result_to_source_gpu(int gpu_id, int gpu_num, + int sample_size, int *h_left, + int *h_right, + int64_t *src_sample_res, + int *actual_sample_size); + + private: + std::vector gpu_graph_list; +}; +} +}; +#include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.h" +#endif diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.h b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.h new file mode 100644 index 0000000000000000000000000000000000000000..839c7e5468c6c6938c6b4cda3dd879c7366e7d6e --- /dev/null +++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.h @@ -0,0 +1,447 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#ifdef PADDLE_WITH_HETERPS +namespace paddle { +namespace framework { +/* +comment 0 +this kernel just serves as an example of how to sample nodes' neighbors. +feel free to modify it +index[0,len) saves the nodes' index +actual_size[0,len) is to save the sample size of each node. +for ith node in index, actual_size[i] = min(node i's neighbor size, sample size) +sample_result is to save the neighbor sampling result, its size is len * +sample_size; + +*/ + +__global__ void neighbor_sample_example(GpuPsCommGraph graph, int* index, + int* actual_size, + int64_t* sample_result, int sample_size, + int len) { + const size_t i = blockIdx.x * blockDim.x + threadIdx.x; + if (i < len) { + auto node_index = index[i]; + actual_size[i] = graph.node_list[node_index].neighbor_size < sample_size + ? graph.node_list[node_index].neighbor_size + : sample_size; + int offset = graph.node_list[node_index].neighbor_offset; + for (int j = 0; j < actual_size[i]; j++) { + sample_result[sample_size * i + j] = graph.neighbor_list[offset + j]; + } + } +} + +/* + comment 1 + + gpu i triggers a neighbor_sample task, + when this task is done, + this function is called to move the sample result on other gpu back + to gup i and aggragate the result. + the sample_result is saved on src_sample_res and the actual sample size for + each node is saved on actual_sample_size. + the number of actual sample_result for + key[x] (refer to comment 2 for definition of key) + is saved on actual_sample_size[x], since the neighbor size of key[x] might be + smaller than sample_size, + is saved on src_sample_res [x*sample_size, x*sample_size + + actual_sample_size[x]) + + since before each gpu runs the neighbor_sample task,the key array is shuffled, + but we have the idx array to save the original order. + when the gpu i gets all the sample results from other gpus, it relies on + idx array to recover the original order. + that's what fill_dvals does. + +*/ +void GpuPsGraphTable::move_neighbor_sample_result_to_source_gpu( + int gpu_id, int gpu_num, int sample_size, int* h_left, int* h_right, + int64_t* src_sample_res, int* actual_sample_size) { + for (int i = 0; i < gpu_num; i++) { + if (h_left[i] == -1 || h_right[i] == -1) { + continue; + } + auto shard_len = h_right[i] - h_left[i] + 1; + // int cur_step = path_[gpu_id][i].nodes_.size() - 1; + // auto& node = path_[gpu_id][i].nodes_[cur_step]; + auto& node = path_[gpu_id][i].nodes_.front(); + cudaMemcpyAsync( + reinterpret_cast(src_sample_res + h_left[i] * sample_size), + node.val_storage + sizeof(int64_t) * shard_len, + node.val_bytes_len - sizeof(int64_t) * shard_len, cudaMemcpyDefault, + node.out_stream); + cudaMemcpyAsync(reinterpret_cast(actual_sample_size + h_left[i]), + node.val_storage + sizeof(int) * shard_len, + sizeof(int) * shard_len, cudaMemcpyDefault, + node.out_stream); + } + for (int i = 0; i < gpu_num; ++i) { + if (h_left[i] == -1 || h_right[i] == -1) { + continue; + } + auto& node = path_[gpu_id][i].nodes_.front(); + cudaStreamSynchronize(node.out_stream); + } +} + +/* +TODO: +how to optimize it to eliminate the for loop +*/ +__global__ void fill_dvalues(int64_t* d_shard_vals, int64_t* d_vals, + int* d_shard_actual_sample_size, + int* d_actual_sample_size, int* idx, + int sample_size, int len) { + const size_t i = blockIdx.x * blockDim.x + threadIdx.x; + if (i < len) { + d_actual_sample_size[idx[i]] = d_shard_actual_sample_size[i]; + // d_vals[idx[i]] = d_shard_vals[i]; + for (int j = 0; j < sample_size; j++) { + d_vals[idx[i] * sample_size + j] = d_shard_vals[i * sample_size + j]; + } + } +} + +__global__ void node_query_example(GpuPsCommGraph graph, int start, int size, + int64_t* res) { + const size_t i = blockIdx.x * blockDim.x + threadIdx.x; + if (i < size) { + res[i] = graph.node_list[start + i].node_id; + } +} + +void GpuPsGraphTable::clear_graph_info() { + if (tables_.size()) { + for (auto table : tables_) delete table; + } + tables_.clear(); + for (auto graph : gpu_graph_list) { + if (graph.neighbor_list != NULL) { + cudaFree(graph.neighbor_list); + } + if (graph.node_list != NULL) { + cudaFree(graph.node_list); + } + } + gpu_graph_list.clear(); +} +/* +the parameter std::vector cpu_graph_list is generated by cpu. +it saves the graph to be saved on each gpu. + +for the ith GpuPsCommGraph, any the node's key satisfies that key % gpu_number +== i + +In this function, memory is allocated on each gpu to save the graphs, +gpu i saves the ith graph from cpu_graph_list +*/ + +void GpuPsGraphTable::build_graph_from_cpu( + std::vector& cpu_graph_list) { + PADDLE_ENFORCE_EQ( + cpu_graph_list.size(), resource_->total_gpu(), + platform::errors::InvalidArgument("the cpu node list size doesn't match " + "the number of gpu on your machine.")); + clear_graph_info(); + for (int i = 0; i < cpu_graph_list.size(); i++) { + platform::CUDADeviceGuard guard(resource_->dev_id(i)); + gpu_graph_list.push_back(GpuPsCommGraph()); + auto table = + new Table(std::max(1, cpu_graph_list[i].node_size) / load_factor_); + tables_.push_back(table); + if (cpu_graph_list[i].node_size > 0) { + std::vector keys; + std::vector offset; + cudaMalloc((void**)&gpu_graph_list[i].node_list, + cpu_graph_list[i].node_size * sizeof(GpuPsGraphNode)); + cudaMemcpy(gpu_graph_list[i].node_list, cpu_graph_list[i].node_list, + cpu_graph_list[i].node_size * sizeof(GpuPsGraphNode), + cudaMemcpyHostToDevice); + for (int j = 0; j < cpu_graph_list[i].node_size; j++) { + keys.push_back(cpu_graph_list[i].node_list[j].node_id); + offset.push_back(j); + } + build_ps(i, keys.data(), offset.data(), keys.size(), 1024, 8); + gpu_graph_list[i].node_size = cpu_graph_list[i].node_size; + } else { + gpu_graph_list[i].node_list = NULL; + gpu_graph_list[i].node_size = 0; + } + if (cpu_graph_list[i].neighbor_size) { + cudaMalloc((void**)&gpu_graph_list[i].neighbor_list, + cpu_graph_list[i].neighbor_size * sizeof(int64_t)); + cudaMemcpy(gpu_graph_list[i].neighbor_list, + cpu_graph_list[i].neighbor_list, + cpu_graph_list[i].neighbor_size * sizeof(int64_t), + cudaMemcpyHostToDevice); + gpu_graph_list[i].neighbor_size = cpu_graph_list[i].neighbor_size; + } else { + gpu_graph_list[i].neighbor_list = NULL; + gpu_graph_list[i].neighbor_size = 0; + } + } + cudaDeviceSynchronize(); +} +NeighborSampleResult* GpuPsGraphTable::graph_neighbor_sample(int gpu_id, + int64_t* key, + int sample_size, + int len) { + /* + comment 2 + this function shares some kernels with heter_comm_inl.h + arguments definitions: + gpu_id:the id of gpu. + len:how many keys are used,(the length of array key) + sample_size:how many neighbors should be sampled for each node in key. + + the code below shuffle the key array to make the keys + that belong to a gpu-card stay together, + the shuffled result is saved on d_shard_keys, + if ith element in d_shard_keys_ptr is + from jth element in the original key array, then idx[i] = j, + idx could be used to recover the original array. + if keys in range [a,b] belong to ith-gpu, then h_left[i] = a, h_right[i] = + b, + if no keys are allocated for ith-gpu, then h_left[i] == h_right[i] == -1 + + for example, suppose key = [0,1,2,3,4,5,6,7,8], gpu_num = 2 + when we run this neighbor_sample function, + the key is shuffled to [0,2,4,6,8,1,3,5,7] + the first part (0,2,4,6,8) % 2 == 0,thus should be handled by gpu 0, + the rest part should be handled by gpu1, because (1,3,5,7) % 2 == 1, + h_left = [0,5],h_right = [4,8] + + */ + NeighborSampleResult* result = new NeighborSampleResult(sample_size, len); + if (len == 0) { + return result; + } + cudaMalloc((void**)&result->val, len * sample_size * sizeof(int64_t)); + cudaMalloc((void**)&result->actual_sample_size, len * sizeof(int)); + int* actual_sample_size = result->actual_sample_size; + int64_t* val = result->val; + int total_gpu = resource_->total_gpu(); + int dev_id = resource_->dev_id(gpu_id); + platform::CUDAPlace place = platform::CUDAPlace(dev_id); + platform::CUDADeviceGuard guard(dev_id); + auto stream = resource_->local_stream(gpu_id, 0); + + int grid_size = (len - 1) / block_size_ + 1; + + int h_left[total_gpu]; // NOLINT + int h_right[total_gpu]; // NOLINT + + auto d_left = memory::Alloc(place, total_gpu * sizeof(int)); + auto d_right = memory::Alloc(place, total_gpu * sizeof(int)); + int* d_left_ptr = reinterpret_cast(d_left->ptr()); + int* d_right_ptr = reinterpret_cast(d_right->ptr()); + + cudaMemsetAsync(d_left_ptr, -1, total_gpu * sizeof(int), stream); + cudaMemsetAsync(d_right_ptr, -1, total_gpu * sizeof(int), stream); + // + auto d_idx = memory::Alloc(place, len * sizeof(int)); + int* d_idx_ptr = reinterpret_cast(d_idx->ptr()); + + auto d_shard_keys = memory::Alloc(place, len * sizeof(int64_t)); + int64_t* d_shard_keys_ptr = reinterpret_cast(d_shard_keys->ptr()); + auto d_shard_vals = memory::Alloc(place, len * sizeof(int64_t)); + int64_t* d_shard_vals_ptr = reinterpret_cast(d_shard_vals->ptr()); + auto d_shard_actual_sample_size = memory::Alloc(place, len * sizeof(int)); + int* d_shard_actual_sample_size_ptr = + reinterpret_cast(d_shard_actual_sample_size->ptr()); + + split_input_to_shard(key, d_idx_ptr, len, d_left_ptr, d_right_ptr, gpu_id); + + fill_shard_key<<>>(d_shard_keys_ptr, key, + d_idx_ptr, len); + + cudaStreamSynchronize(stream); + + cudaMemcpy(h_left, d_left_ptr, total_gpu * sizeof(int), + cudaMemcpyDeviceToHost); + cudaMemcpy(h_right, d_right_ptr, total_gpu * sizeof(int), + cudaMemcpyDeviceToHost); + + for (int i = 0; i < total_gpu; ++i) { + int shard_len = h_left[i] == -1 ? 0 : h_right[i] - h_left[i] + 1; + if (shard_len == 0) { + continue; + } + /* + comment 3 + shard_len denotes the size of keys on i-th gpu here, + when we sample on i-th gpu, we allocate shard_len * (1 + sample_size) + int64_t units + of memory, we use alloc_mem_i to denote it, the range [0,shard_len) is saved + for the respective nodes' indexes + and acutal sample_size. + with nodes' indexes we could get the nodes to sample. + since size of int64_t is 8 bits, while size of int is 4, + the range of [0,shard_len) contains shard_len * 2 int uinits; + The values of the first half of this range will be updated by + the k-v map on i-th-gpu. + The second half of this range is saved for actual sample size of each node. + For node x, + its sampling result is saved on the range + [shard_len + sample_size * x,shard_len + sample_size * x + + actual_sample_size_of_x) + of alloc_mem_i, actual_sample_size_of_x equals ((int + *)alloc_mem_i)[shard_len + x] + */ + create_storage(gpu_id, i, shard_len * sizeof(int64_t), + shard_len * (1 + sample_size) * sizeof(int64_t)); + } + walk_to_dest(gpu_id, total_gpu, h_left, h_right, d_shard_keys_ptr, NULL); + + for (int i = 0; i < total_gpu; ++i) { + if (h_left[i] == -1) { + continue; + } + // auto& node = path_[gpu_id][i].nodes_.back(); + auto& node = path_[gpu_id][i].nodes_.front(); + cudaStreamSynchronize(node.in_stream); + platform::CUDADeviceGuard guard(resource_->dev_id(i)); + // use the key-value map to update alloc_mem_i[0,shard_len) + tables_[i]->rwlock_->RDLock(); + tables_[i]->get(reinterpret_cast(node.key_storage), + reinterpret_cast(node.val_storage), + h_right[i] - h_left[i] + 1, + resource_->remote_stream(i, gpu_id)); + } + for (int i = 0; i < total_gpu; ++i) { + if (h_left[i] == -1) { + continue; + } + // cudaStreamSynchronize(resource_->remote_stream(i, num)); + // tables_[i]->rwlock_->UNLock(); + platform::CUDADeviceGuard guard(resource_->dev_id(i)); + auto& node = path_[gpu_id][i].nodes_.front(); + auto shard_len = h_right[i] - h_left[i] + 1; + auto graph = gpu_graph_list[i]; + int* res_array = reinterpret_cast(node.val_storage); + int* actual_size_array = res_array + shard_len; + int64_t* sample_array = (int64_t*)(res_array + shard_len * 2); + neighbor_sample_example<<remote_stream(i, gpu_id)>>>( + graph, res_array, actual_size_array, sample_array, sample_size, + shard_len); + } + + for (int i = 0; i < total_gpu; ++i) { + if (h_left[i] == -1) { + continue; + } + cudaStreamSynchronize(resource_->remote_stream(i, gpu_id)); + tables_[i]->rwlock_->UNLock(); + } + // walk_to_src(num, total_gpu, h_left, h_right, d_shard_vals_ptr); + move_neighbor_sample_result_to_source_gpu(gpu_id, total_gpu, sample_size, + h_left, h_right, d_shard_vals_ptr, + d_shard_actual_sample_size_ptr); + + fill_dvalues<<>>( + d_shard_vals_ptr, val, d_shard_actual_sample_size_ptr, actual_sample_size, + d_idx_ptr, sample_size, len); + cudaStreamSynchronize(stream); + for (int i = 0; i < total_gpu; ++i) { + int shard_len = h_left[i] == -1 ? 0 : h_right[i] - h_left[i] + 1; + if (shard_len == 0) { + continue; + } + destroy_storage(gpu_id, i); + } + return result; +} + +NodeQueryResult* GpuPsGraphTable::graph_node_sample(int gpu_id, + int sample_size) {} + +NodeQueryResult* GpuPsGraphTable::query_node_list(int gpu_id, int start, + int query_size) { + NodeQueryResult* result = new NodeQueryResult(); + if (query_size <= 0) return result; + int& actual_size = result->actual_sample_size; + actual_size = 0; + cudaMalloc((void**)&result->val, query_size * sizeof(int64_t)); + int64_t* val = result->val; + int dev_id = resource_->dev_id(gpu_id); + platform::CUDADeviceGuard guard(dev_id); + std::vector idx, gpu_begin_pos, local_begin_pos, sample_size; + int size = 0; + /* + if idx[i] = a, gpu_begin_pos[i] = p1, + gpu_local_begin_pos[i] = p2; + sample_size[i] = s; + then on gpu a, the nodes of positions [p1,p1 + s) should be returned + and saved from the p2 position on the sample_result array + + for example: + suppose + gpu 0 saves [0,2,4,6,8], gpu1 saves [1,3,5,7] + start = 3, query_size = 5 + we know [6,8,1,3,5] should be returned; + idx = [0,1] + gpu_begin_pos = [3,0] + local_begin_pos = [0,3] + sample_size = [2,3] + + */ + for (int i = 0; i < gpu_graph_list.size() && query_size != 0; i++) { + auto graph = gpu_graph_list[i]; + if (graph.node_size == 0) { + continue; + } + if (graph.node_size + size > start) { + int cur_size = min(query_size, graph.node_size + size - start); + query_size -= cur_size; + idx.emplace_back(i); + gpu_begin_pos.emplace_back(start - size); + local_begin_pos.emplace_back(actual_size); + start += cur_size; + actual_size += cur_size; + sample_size.emplace_back(cur_size); + create_storage(gpu_id, i, 1, cur_size * sizeof(int64_t)); + } + size += graph.node_size; + } + for (int i = 0; i < idx.size(); i++) { + int dev_id_i = resource_->dev_id(idx[i]); + platform::CUDADeviceGuard guard(dev_id_i); + auto& node = path_[gpu_id][idx[i]].nodes_.front(); + int grid_size = (sample_size[i] - 1) / block_size_ + 1; + node_query_example<<remote_stream(idx[i], gpu_id)>>>( + gpu_graph_list[idx[i]], gpu_begin_pos[i], sample_size[i], + (int64_t*)node.val_storage); + } + + for (int i = 0; i < idx.size(); i++) { + cudaStreamSynchronize(resource_->remote_stream(idx[i], gpu_id)); + auto& node = path_[gpu_id][idx[i]].nodes_.front(); + cudaMemcpyAsync(reinterpret_cast(val + local_begin_pos[i]), + node.val_storage, node.val_bytes_len, cudaMemcpyDefault, + node.out_stream); + } + for (int i = 0; i < idx.size(); i++) { + auto& node = path_[gpu_id][idx[i]].nodes_.front(); + cudaStreamSynchronize(node.out_stream); + } + return result; +} +} +}; +#endif diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_comm.h b/paddle/fluid/framework/fleet/heter_ps/heter_comm.h index 7b43e68ff0151e03d426e11ed8266d25a125140a..1fca8cdf8bb801a57ec36ee957b27236f488a4b3 100644 --- a/paddle/fluid/framework/fleet/heter_ps/heter_comm.h +++ b/paddle/fluid/framework/fleet/heter_ps/heter_comm.h @@ -173,16 +173,18 @@ class HeterComm { void walk_to_src(int start_index, int gpu_num, int* h_left, int* h_right, ValType* src_val); - private: + protected: using Table = HashTable; - int block_size_{256}; - float load_factor_{0.75}; std::vector tables_; std::shared_ptr resource_; - CustomGradMerger merger_; - int topo_aware_{0}; std::vector> path_; + float load_factor_{0.75}; + int block_size_{256}; + + private: std::vector storage_; + CustomGradMerger merger_; + int topo_aware_{0}; int feanum_{1800 * 2048}; int multi_node_{0}; std::vector nccl_inner_comms_; diff --git a/paddle/fluid/framework/fleet/heter_ps/test_graph.cu b/paddle/fluid/framework/fleet/heter_ps/test_graph.cu new file mode 100644 index 0000000000000000000000000000000000000000..697e0ba2cdf3475d1e7ad48105bc55959461900f --- /dev/null +++ b/paddle/fluid/framework/fleet/heter_ps/test_graph.cu @@ -0,0 +1,112 @@ +/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include +#include "paddle/fluid/framework/fleet/heter_ps/feature_value.h" +#include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h" +#include "paddle/fluid/framework/fleet/heter_ps/heter_comm.h" +#include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h" +#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h" +#include "paddle/fluid/platform/cuda_device_guard.h" + +using namespace paddle::framework; +TEST(TEST_FLEET, graph_comm) { + int gpu_count = 3; + std::vector dev_ids; + dev_ids.push_back(0); + dev_ids.push_back(1); + dev_ids.push_back(2); + std::shared_ptr resource = + std::make_shared(dev_ids); + resource->enable_p2p(); + GpuPsGraphTable g(resource); + int node_count = 10; + std::vector> neighbors(node_count); + int ind = 0; + int64_t node_id = 0; + std::vector graph_list(gpu_count); + while (ind < node_count) { + int neighbor_size = ind + 1; + graph_list[ind % gpu_count].node_size++; + graph_list[ind % gpu_count].neighbor_size += neighbor_size; + while (neighbor_size--) { + neighbors[ind].push_back(node_id++); + } + ind++; + } + std::vector neighbor_offset(gpu_count, 0), node_index(gpu_count, 0); + for (int i = 0; i < graph_list.size(); i++) { + graph_list[i].node_list = new GpuPsGraphNode[graph_list[i].node_size]; + graph_list[i].neighbor_list = new int64_t[graph_list[i].neighbor_size]; + } + for (int i = 0; i < node_count; i++) { + ind = i % gpu_count; + graph_list[ind].node_list[node_index[ind]].node_id = i; + graph_list[ind].node_list[node_index[ind]].neighbor_offset = + neighbor_offset[ind]; + graph_list[ind].node_list[node_index[ind]].neighbor_size = + neighbors[i].size(); + for (auto x : neighbors[i]) { + graph_list[ind].neighbor_list[neighbor_offset[ind]++] = x; + } + node_index[ind]++; + } + g.build_graph_from_cpu(graph_list); + /* + gpu 0: + 0,3,6,9 + gpu 1: + 1,4,7 + gpu 2: + 2,5,8 + + query(2,6) returns nodes [6,9,1,4,7,2] + */ + int64_t answer[6] = {6, 9, 1, 4, 7, 2}; + int64_t *res = new int64_t[6]; + auto query_res = g.query_node_list(0, 2, 6); + cudaMemcpy(res, query_res->val, 48, cudaMemcpyDeviceToHost); + ASSERT_EQ(query_res->actual_sample_size, 6); + for (int i = 0; i < 6; i++) { + ASSERT_EQ(res[i], answer[i]); + } + delete[] res; + delete query_res; + /* + node x's neighbor list = [(1+x)*x/2,(1+x)*x/2 + 1,.....,(1+x)*x/2 + x] + so node 6's neighbors are [21,22...,27] + node 7's neighbors are [28,29,..35] + node 0's neighbors are [0] + query([7,0,6],sample_size=3) should return [28,29,30,0,x,x,21,22,23] + 6 --index-->2 + 0 --index--->0 + 7 --index-->2 + */ + int64_t cpu_key[3] = {7, 0, 6}; + void *key; + cudaMalloc((void **)&key, 3 * sizeof(int64_t)); + cudaMemcpy(key, cpu_key, 3 * sizeof(int64_t), cudaMemcpyHostToDevice); + auto neighbor_sample_res = g.graph_neighbor_sample(0, (int64_t *)key, 3, 3); + res = new int64_t[9]; + cudaMemcpy(res, neighbor_sample_res->val, 72, cudaMemcpyDeviceToHost); + int64_t expected_sample_val[] = {28, 29, 30, 0, -1, -1, 21, 22, 23}; + for (int i = 0; i < 9; i++) { + if (expected_sample_val[i] != -1) { + ASSERT_EQ(res[i], expected_sample_val[i]); + } + } + delete[] res; + delete neighbor_sample_res; +}