未验证 提交 83ae1619 编写于 作者: S seemingwang 提交者: GitHub

test gpu graph engine's performance (#40775)

* extract sub-graph

* graph-engine merging

* fix

* fix

* fix heter-ps config

* test performance

* test performance

* test performance

* test

* test

* update bfs

* change cmake
上级 a8df3901
......@@ -219,13 +219,13 @@ message GraphParameter {
optional string gpups_graph_sample_class = 3
[ default = "CompleteGraphSampler" ];
optional string gpups_graph_sample_args = 4 [ default = "" ];
optional bool use_cache = 5 [ default = true ];
optional float cache_ratio = 6 [ default = 0.3 ];
optional bool use_cache = 5 [ default = false ];
optional int32 cache_size_limit = 6 [ default = 100000 ];
optional int32 cache_ttl = 7 [ default = 5 ];
optional GraphFeature graph_feature = 8;
optional string table_name = 9 [ default = "" ];
optional string table_type = 10 [ default = "" ];
optional int32 gpups_mode_shard_num = 11 [ default = 127 ];
optional int32 shard_num = 11 [ default = 127 ];
optional int32 gpu_num = 12 [ default = 1 ];
}
......
......@@ -138,7 +138,6 @@ int BasicBfsGraphSampler::run_graph_sampling() {
int init_size = 0;
//__sync_fetch_and_add
std::function<int(int, int64_t)> bfs = [&, this](int i, int id) -> int {
VLOG(0) << "in bfs " << i << " " << id;
if (this->status == GraphSamplerStatus::terminating) {
int task_left = __sync_sub_and_fetch(&task_size, 1);
if (task_left == 0) {
......@@ -148,13 +147,13 @@ int BasicBfsGraphSampler::run_graph_sampling() {
}
size_t ind = i % this->graph_table->task_pool_size_;
if (nodes_left[i] > 0) {
nodes_left[i]--;
auto iter = sample_neighbors_map[ind].find(id);
if (iter == sample_neighbors_map[ind].end()) {
sample_neighbors_map[ind][id] = std::vector<int64_t>();
iter = sample_neighbors_map[ind].find(id);
Node *node = graph_table->shards[i]->find_node(id);
if (node != NULL) {
nodes_left[i]--;
sample_neighbors_map[ind][id] = std::vector<int64_t>();
iter = sample_neighbors_map[ind].find(id);
size_t edge_fetch_size =
std::min((size_t) this->edge_num_for_each_node,
node->get_neighbor_size());
......@@ -179,11 +178,14 @@ int BasicBfsGraphSampler::run_graph_sampling() {
for (size_t i = 0; i < graph_table->shards.size(); ++i) {
std::vector<Node *> &v = graph_table->shards[i]->get_bucket();
if (v.size() > 0) {
init_size++;
__sync_add_and_fetch(&task_size, 1);
int64_t id = v[0]->get_id();
graph_table->_shards_task_pool[i % graph_table->task_pool_size_]
->enqueue(bfs, i, id);
int search_size = std::min(init_search_size, (int)v.size());
for (int k = 0; k < search_size; k++) {
init_size++;
__sync_add_and_fetch(&task_size, 1);
int64_t id = v[k]->get_id();
graph_table->_shards_task_pool[i % graph_table->task_pool_size_]
->enqueue(bfs, i, id);
}
} // if
}
if (init_size == 0) {
......@@ -301,10 +303,11 @@ void BasicBfsGraphSampler::init(size_t gpu_num, GraphTable *graph_table,
std::vector<std::string> args) {
this->gpu_num = gpu_num;
this->graph_table = graph_table;
node_num_for_each_shard = args.size() > 0 ? std::stoi(args[0]) : 10;
edge_num_for_each_node = args.size() > 1 ? std::stoi(args[1]) : 10;
rounds = args.size() > 2 ? std::stoi(args[2]) : 1;
interval = args.size() > 3 ? std::stoi(args[3]) : 60;
init_search_size = args.size() > 0 ? std::stoi(args[0]) : 10;
node_num_for_each_shard = args.size() > 1 ? std::stoi(args[1]) : 10;
edge_num_for_each_node = args.size() > 2 ? std::stoi(args[2]) : 10;
rounds = args.size() > 3 ? std::stoi(args[3]) : 1;
interval = args.size() > 4 ? std::stoi(args[4]) : 60;
}
#endif
......@@ -1092,11 +1095,6 @@ int32_t GraphTable::initialize(const GraphParameter &graph) {
#ifdef PADDLE_WITH_HETERPS
if (graph.gpups_mode()) {
gpups_mode = true;
if (shard_num == 0) {
shard_num = graph.gpups_mode_shard_num();
server_num = 1;
_shard_idx = 0;
}
auto *sampler =
CREATE_PSCORE_CLASS(GraphSampler, graph.gpups_graph_sample_class());
auto slices =
......@@ -1107,7 +1105,18 @@ int32_t GraphTable::initialize(const GraphParameter &graph) {
graph_sampler.reset(sampler);
}
#endif
if (shard_num == 0) {
server_num = 1;
_shard_idx = 0;
shard_num = graph.shard_num();
}
task_pool_size_ = graph.task_pool_size();
use_cache = graph.use_cache();
if (use_cache) {
cache_size_limit = graph.cache_size_limit();
cache_ttl = graph.cache_ttl();
make_neighbor_sample_cache((size_t)cache_size_limit, (size_t)cache_ttl);
}
_shards_task_pool.resize(task_pool_size_);
for (size_t i = 0; i < _shards_task_pool.size(); ++i) {
_shards_task_pool[i].reset(new ::ThreadPool(1));
......
......@@ -547,6 +547,8 @@ class GraphTable : public SparseTable {
std::unordered_set<int64_t> extra_nodes;
std::unordered_map<int64_t, size_t> extra_nodes_to_thread_index;
bool use_cache, use_duplicate_nodes;
int cache_size_limit;
int cache_ttl;
mutable std::mutex mutex_;
std::shared_ptr<pthread_rwlock_t> rw_lock;
#ifdef PADDLE_WITH_HETERPS
......@@ -593,7 +595,7 @@ class BasicBfsGraphSampler : public GraphSampler {
std::vector<std::vector<paddle::framework::GpuPsGraphNode>> sample_nodes;
std::vector<std::vector<int64_t>> sample_neighbors;
size_t gpu_num;
int node_num_for_each_shard, edge_num_for_each_node;
int init_search_size, node_num_for_each_shard, edge_num_for_each_node;
int rounds, interval;
std::vector<std::unordered_map<int64_t, std::vector<int64_t>>>
sample_neighbors_map;
......
......@@ -456,7 +456,7 @@ void RunBrpcPushSparse() {
pull_status.wait();
ASSERT_EQ(_vs[0].size(), vs1[0].size());
for (int j = 0; j < _vs[0].size(); j++) {
for (size_t j = 0; j < _vs[0].size(); j++) {
ASSERT_EQ(_vs[0][j], vs1[0][j]);
}
}
......
......@@ -86,7 +86,7 @@ void testGraphSample() {
#ifdef PADDLE_WITH_HETERPS
::paddle::distributed::GraphParameter table_proto;
table_proto.set_gpups_mode(true);
table_proto.set_gpups_mode_shard_num(127);
table_proto.set_shard_num(127);
table_proto.set_gpu_num(2);
distributed::GraphTable graph_table, graph_table1;
......@@ -113,7 +113,7 @@ void testGraphSample() {
::paddle::distributed::GraphParameter table_proto1;
table_proto1.set_gpups_mode(true);
table_proto1.set_gpups_mode_shard_num(127);
table_proto1.set_shard_num(127);
table_proto1.set_gpu_num(2);
table_proto1.set_gpups_graph_sample_class("BasicBfsGraphSampler");
table_proto1.set_gpups_graph_sample_args("5,5,1,1");
......
......@@ -13,6 +13,9 @@ IF(WITH_GPU)
nv_library(graph_gpu_ps SRCS graph_gpu_ps_table.h DEPS heter_comm table)
nv_test(test_graph_comm SRCS test_graph.cu DEPS graph_gpu_ps)
nv_test(test_cpu_graph_sample SRCS test_cpu_graph_sample.cu DEPS graph_gpu_ps)
#nv_test(test_sample_rate SRCS test_sample_rate.cu DEPS graph_gpu_ps)
# ADD_EXECUTABLE(test_sample_rate test_sample_rate.cu)
# target_link_libraries(test_sample_rate graph_gpu_ps)
ENDIF()
IF(WITH_ROCM)
hip_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS cub device_context)
......
......@@ -93,14 +93,17 @@ node_list[8]-> node_id:17, neighbor_size:1, neighbor_offset:15
// Result holder for a neighbor-sampling call.
//
// The three pointer members start out null and are expected to be filled in
// by whoever produces the result. The destructor hands every non-null pointer
// to cudaFree, so this object is the owner of those buffers.
//
//   val                 buffer of int64_t values (presumably the sampled
//                       neighbor ids — confirm against the producer).
//   actual_sample_size  buffer of int (presumably one count per key — confirm).
//   offset              buffer of int (presumably the start position of each
//                       key's samples inside `val` — confirm).
//   sample_size         value passed to the constructor as `_sample_size`.
//   key_size            value passed to the constructor as `_key_size`.
struct NeighborSampleResult {
  int64_t *val;
  int *actual_sample_size, sample_size, key_size;
  int *offset;

  NeighborSampleResult(int _sample_size, int _key_size)
      : val(nullptr),
        actual_sample_size(nullptr),
        sample_size(_sample_size),
        key_size(_key_size),
        offset(nullptr) {}

  // The destructor frees the buffers, so a member-wise copy would leave two
  // objects that both try to release the same pointers. Copying is therefore
  // disabled; pass the result around by pointer or reference instead.
  NeighborSampleResult(const NeighborSampleResult &) = delete;
  NeighborSampleResult &operator=(const NeighborSampleResult &) = delete;

  ~NeighborSampleResult() {
    if (val != nullptr) cudaFree(val);
    if (actual_sample_size != nullptr) cudaFree(actual_sample_size);
    if (offset != nullptr) cudaFree(offset);
  }
};
......
......@@ -71,10 +71,10 @@ TEST(TEST_FLEET, graph_sample) {
*/
::paddle::distributed::GraphParameter table_proto;
table_proto.set_gpups_mode(true);
table_proto.set_gpups_mode_shard_num(127);
table_proto.set_shard_num(127);
table_proto.set_gpu_num(3);
table_proto.set_gpups_graph_sample_class("BasicBfsGraphSampler");
table_proto.set_gpups_graph_sample_args("5,5,1,1");
table_proto.set_gpups_graph_sample_args("100,5,5,1,1");
prepare_file(edge_file_name, edges);
g.init_cpu_table(table_proto);
g.load(std::string(edge_file_name), std::string("e>"));
......@@ -93,16 +93,53 @@ TEST(TEST_FLEET, graph_sample) {
cudaMalloc((void **)&key, 3 * sizeof(int64_t));
cudaMemcpy(key, cpu_key, 3 * sizeof(int64_t), cudaMemcpyHostToDevice);
auto neighbor_sample_res = g.graph_neighbor_sample(0, (int64_t *)key, 3, 3);
int64_t *res = new int64_t[9];
cudaMemcpy(res, neighbor_sample_res->val, 72, cudaMemcpyDeviceToHost);
int64_t *res = new int64_t[7];
/*
cudaMemcpy(res, neighbor_sample_res->val, 56, cudaMemcpyDeviceToHost);
std::sort(res, res + 3);
std::sort(res + 6, res + 9);
int64_t expected_sample_val[] = {28, 29, 30, 0, -1, -1, 21, 22, 23};
for (int i = 0; i < 9; i++) {
std::sort(res + 4, res + 7);
//int64_t expected_sample_val[] = {28, 29, 30, 0, -1, -1, 21, 22, 23};
int64_t expected_sample_val[] = {28, 29, 30, 0, 21, 22, 23};
for (int i = 0; i < 7; i++) {
VLOG(0)<<i<<" "<<res[i];
if (expected_sample_val[i] != -1) {
ASSERT_EQ(res[i], expected_sample_val[i]);
}
}
delete[] res;
delete neighbor_sample_res;
*/
cudaMemcpy(res, neighbor_sample_res->val, 56, cudaMemcpyDeviceToHost);
int *actual_sample_size = new int[3];
cudaMemcpy(actual_sample_size, neighbor_sample_res->actual_sample_size, 12,
cudaMemcpyDeviceToHost); // 3, 1, 3
int *cumsum_sample_size = new int[3];
cudaMemcpy(cumsum_sample_size, neighbor_sample_res->offset, 12,
cudaMemcpyDeviceToHost); // 0, 3, 4
std::vector<std::vector<int64_t>> neighbors_;
std::vector<int64_t> neighbors_7 = {28, 29, 30, 31, 32, 33, 34, 35};
std::vector<int64_t> neighbors_0 = {0};
std::vector<int64_t> neighbors_6 = {21, 22, 23, 24, 25, 26, 27};
neighbors_.push_back(neighbors_7);
neighbors_.push_back(neighbors_0);
neighbors_.push_back(neighbors_6);
for (int i = 0; i < 3; i++) {
for (int j = cumsum_sample_size[i];
j < cumsum_sample_size[i] + actual_sample_size[i]; j++) {
bool flag = false;
for (int k = 0; k < neighbors_[i].size(); k++) {
if (res[j] == neighbors_[i][k]) {
flag = true;
break;
}
}
ASSERT_EQ(flag, true);
}
}
delete[] res;
delete[] actual_sample_size;
delete[] cumsum_sample_size;
delete neighbor_sample_res;
}
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <unistd.h>
#include <condition_variable> // NOLINT
#include <fstream>
#include <iomanip>
#include <string>
#include <thread> // NOLINT
#include <unordered_set>
#include <vector>
#include "google/protobuf/text_format.h"
#include <chrono>
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/service/env.h"
#include "paddle/fluid/distributed/ps/service/sendrecv.pb.h"
#include "paddle/fluid/distributed/ps/table/common_graph_table.h"
#include "paddle/fluid/distributed/ps/table/graph/graph_node.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/phi/kernels/funcs/math_function.h"
#include "paddle/fluid/framework/fleet/heter_ps/feature_value.h"
#include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_comm.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h"
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
using namespace paddle::framework;
namespace platform = paddle::platform;
namespace operators = paddle::operators;
namespace memory = paddle::memory;
namespace distributed = paddle::distributed;
// Path of the edge file to load; assigned in main() from argv[1], or set to
// `edge_file_name` after the built-in `edges` have been written there.
std::string input_file;
// Tunable parameters (the first, second and fourth can be overridden from the
// command line in main()):
//   fixed_key_size  number of node ids handled per batch (page size for
//                   pull_graph_list and batch size for each sampling call).
//   sample_size     sample size passed to random_sample_neighbors.
//   init_search_size  first field of the BasicBfsGraphSampler argument string.
//   bfs_sample_nodes_in_each_shard, bfs_sample_edges
//                   not referenced anywhere else in the visible part of this
//                   file.
int fixed_key_size = 100, sample_size = 100,
bfs_sample_nodes_in_each_shard = 10000, init_search_size = 1,
bfs_sample_edges = 20;
// Built-in fallback graph used when no input file is given. Each entry is
// three tab-separated fields (presumably source id, destination id and edge
// weight — confirm against the loader).
std::vector<std::string> edges = {
std::string("37\t45\t0.34"), std::string("37\t145\t0.31"),
std::string("37\t112\t0.21"), std::string("96\t48\t1.4"),
std::string("96\t247\t0.31"), std::string("96\t111\t1.21"),
std::string("59\t45\t0.34"), std::string("59\t145\t0.31"),
std::string("59\t122\t0.21"), std::string("97\t48\t0.34"),
std::string("97\t247\t0.31"), std::string("97\t111\t0.21")};
// odd id:96 48 122 112
// File that receives the built-in `edges` when main() gets no input path.
char edge_file_name[] = "test_edges.txt";
// Writes every element of `data` to the file named `file_name`, one element
// per line.
//
// file_name: path of the file to open for writing.
// data:      lines to write; each one is followed by a single '\n'.
//
// If the file cannot be opened, the stream is in a failed state, nothing is
// written and no error is reported to the caller.
void prepare_file(char file_name[], std::vector<std::string> data) {
  std::ofstream ofile;
  ofile.open(file_name);
  // Iterate by const reference to avoid copying each string, and emit '\n'
  // rather than std::endl so the stream is not flushed after every line; the
  // bytes that reach the file are the same.
  for (const auto &line : data) {
    ofile << line << '\n';
  }
  ofile.close();
}
// Times CPU neighbor sampling and cross-checks the node list of the GPU graph
// table against the CPU graph table. The whole body is compiled only when
// PADDLE_WITH_HETERPS is defined.
//
// Phase 1 (CPU):
//   * builds a GraphTable (gpups_mode off, 127 shards, task pool size 24) and
//     loads `input_file`;
//   * pages through the node list with pull_graph_list, `fixed_key_size`
//     entries at a time, collecting node ids into `ids` in a randomized order;
//   * starts 10 threads that repeatedly claim the next batch of up to
//     `fixed_key_size` ids and call random_sample_neighbors on it;
//   * prints the elapsed wall time in microseconds to stderr.
// Phase 2 (GPU):
//   * builds a GpuPsGraphTable over 8 devices using BasicBfsGraphSampler and
//     loads the same file;
//   * asserts that GPU 0 reports the same number of nodes as the CPU table and
//     that every id it reports is one the CPU table returned;
//   * queries the node list in 20 slices and logs each slice's size.
//
// Reads the globals `input_file`, `fixed_key_size`, `sample_size` and
// `init_search_size`. A failing ASSERT_EQ returns from this function early.
void testSampleRate() {
#ifdef PADDLE_WITH_HETERPS
  std::vector<int64_t> ids;
  int start = 0;
  pthread_rwlock_t rwlock;
  pthread_rwlock_init(&rwlock, NULL);
  {
    ::paddle::distributed::GraphParameter table_proto;
    table_proto.set_gpups_mode(false);
    table_proto.set_shard_num(127);
    table_proto.set_task_pool_size(24);
    std::cerr << "initializing begin";
    distributed::GraphTable graph_table;
    graph_table.initialize(table_proto);
    std::cerr << "initializing done";
    graph_table.load(input_file, std::string("e>"));
    int sample_actual_size = -1;
    int step = fixed_key_size, cur = 0;
    // Keep requesting pages until a request comes back empty.
    while (sample_actual_size != 0) {
      std::unique_ptr<char[]> buffer;
      graph_table.pull_graph_list(cur, step, buffer, sample_actual_size, false,
                                  1);
      int index = 0;
      // The buffer holds serialized nodes back to back; decode them one by
      // one, advancing by each node's serialized size.
      while (index < sample_actual_size) {
        paddle::distributed::FeatureNode node;
        node.recover_from_buffer(buffer.get() + index);
        index += node.get_size(false);
        // res.push_back(node);
        ids.push_back(node.get_id());
        // Swap the new id with a random earlier (or the same) position so
        // `ids` ends up in a randomized order.
        int swap_pos = rand() % ids.size();
        std::swap(ids[swap_pos], ids[(int)ids.size() - 1]);
      }
      cur = ids.size();
      // if (sample_actual_size == 0) break;
      // char *buff = buffer.get();
      // for (int i = 0; i < sample_actual_size/sizeof(int64_t); i++) {
      // ids.push_back(*((int64_t *)buff + i));
      // int swap_pos = rand() % ids.size();
      // std::swap(ids[swap_pos], ids[(int)ids.size() - 1]);
      // }
      // cur += sample_actual_size/sizeof(int64_t);
    }
    std::cerr << "load ids done" << std::endl;
    // One output slot per worker thread, so workers never write to the same
    // vector.
    std::vector<int64_t> sample_id[10], sample_neighbors[10];
    std::vector<int> actual_size[10];
    auto func = [&rwlock, &graph_table, &ids, &sample_id, &actual_size,
                 &sample_neighbors, &start](int i) {
      while (true) {
        int s, sn;
        bool exit = false;
        // `start` is the shared cursor into `ids`; claim the next batch while
        // holding the write lock.
        pthread_rwlock_wrlock(&rwlock);
        if (static_cast<size_t>(start) < ids.size()) {
          s = start;
          sn = ids.size() - start;
          sn = min(sn, fixed_key_size);
          start += sn;
        } else {
          exit = true;
        }
        pthread_rwlock_unlock(&rwlock);
        if (exit) break;
        std::vector<std::shared_ptr<char>> buffers(sn);
        std::vector<int> ac(sn);
        auto status = graph_table.random_sample_neighbors(
            ids.data() + s, sample_size, buffers, ac, false);
        for (int j = s; j < s + sn; j++) {
          sample_id[i].push_back(ids[j]);
          // `ac` is divided by sizeof(int64_t) to get an element count, so it
          // presumably holds byte sizes — confirm against
          // random_sample_neighbors.
          actual_size[i].push_back(ac[j - s] / sizeof(int64_t));
          int ss = ac[j - s] / sizeof(int64_t);
          for (int k = 0; k < ss; k++) {
            sample_neighbors[i].push_back(
                *((int64_t *)(buffers[j - s].get() + k * sizeof(int64_t))));
          }
        }
      }
      VLOG(0) << "func " << i << " returns ";
    };
    auto start1 = std::chrono::steady_clock::now();
    std::thread thr[10];
    for (int i = 0; i < 10; i++) {
      thr[i] = std::thread(func, i);
    }
    for (int i = 0; i < 10; i++) thr[i].join();
    auto end1 = std::chrono::steady_clock::now();
    auto tt =
        std::chrono::duration_cast<std::chrono::microseconds>(end1 - start1);
    std::cerr << "total time cost without cache is " << tt.count() << " us"
              << std::endl;
  }
  const int gpu_num = 8;
  ::paddle::distributed::GraphParameter table_proto;
  table_proto.set_gpups_mode(true);
  table_proto.set_shard_num(127);
  table_proto.set_gpu_num(gpu_num);
  table_proto.set_gpups_graph_sample_class("BasicBfsGraphSampler");
  table_proto.set_gpups_graph_sample_args(std::to_string(init_search_size) +
                                          ",100000000,10000000,1,1");
  std::vector<int> dev_ids;
  for (int i = 0; i < gpu_num; i++) {
    dev_ids.push_back(i);
  }
  std::shared_ptr<HeterPsResource> resource =
      std::make_shared<HeterPsResource>(dev_ids);
  resource->enable_p2p();
  GpuPsGraphTable g(resource);
  g.init_cpu_table(table_proto);
  g.load(std::string(input_file), std::string("e>"));
  // NOTE(review): the NodeQueryResult objects returned by query_node_list
  // (here and in the loop below) are never released in this function —
  // confirm whether the caller owns them and should delete them.
  NodeQueryResult *query_node_res;
  // Ask for more nodes than the CPU table reported so that any extra GPU-side
  // node would show up in the count comparison.
  query_node_res = g.query_node_list(0, 0, ids.size() + 10000);
  VLOG(0) << "gpu got " << query_node_res->actual_sample_size << " nodes ";
  VLOG(0) << "cpu got " << ids.size() << " nodes";
  ASSERT_EQ((int)query_node_res->actual_sample_size, (int)ids.size());
  // Host-side copy of the GPU node ids. A vector is used so the storage is
  // released on every exit path, including the early return of a failing
  // ASSERT_EQ.
  std::vector<int64_t> gpu_node_res(ids.size());
  cudaMemcpy(gpu_node_res.data(), query_node_res->val,
             ids.size() * sizeof(int64_t), cudaMemcpyDeviceToHost);
  std::unordered_set<int64_t> cpu_node_set, gpu_node_set;
  for (auto x : ids) {
    cpu_node_set.insert(x);
  }
  for (int i = 0; i < (int)query_node_res->actual_sample_size; i++) {
    auto x = gpu_node_res[i];
    ASSERT_EQ(cpu_node_set.find(x) != cpu_node_set.end(), true);
    gpu_node_set.insert(x);
  }
  VLOG(0) << " cpu_node_size = " << cpu_node_set.size();
  VLOG(0) << " gpu_node_size = " << gpu_node_set.size();
  ASSERT_EQ(cpu_node_set.size(), gpu_node_set.size());
  for (int i = 0; i < 20; i++) {
    int st = ids.size() / 20 * i;
    auto q = g.query_node_list(0, st, ids.size() / 20);
    VLOG(0) << " the " << i << "th iteration size = " << q->actual_sample_size;
  }
  // NodeQueryResult *query_node_list(int gpu_id, int start, int query_size);
  /*
  void *key;
  cudaMalloc((void **)&key, ids.size() * sizeof(int64_t));
  cudaMemcpy(key, ids.data(), ids.size() * sizeof(int64_t),
  cudaMemcpyHostToDevice);
  std::vector<NeighborSampleResult *> res[gpu_num];
  start = 0;
  auto func = [&rwlock, &g, &res, &start,
  &gpu_num, &ids, &key](int i) {
  while (true) {
  int s, sn;
  bool exit = false;
  pthread_rwlock_wrlock(&rwlock);
  if (start < ids.size()) {
  s = start;
  sn = ids.size() - start;
  sn = min(sn, fixed_key_size);
  start += sn;
  } else {
  exit = true;
  }
  pthread_rwlock_unlock(&rwlock);
  if (exit) break;
  auto r =
  g.graph_neighbor_sample(i, (int64_t *)(key + s), sample_size, sn);
  res[i].push_back(r);
  }
  };
  auto start1 = std::chrono::steady_clock::now();
  std::thread thr[gpu_num];
  for (int i = 0; i < gpu_num; i++) {
  thr[i] = std::thread(func, i);
  }
  for (int i = 0; i < gpu_num; i++) thr[i].join();
  auto end1 = std::chrono::steady_clock::now();
  auto tt =
  std::chrono::duration_cast<std::chrono::microseconds>(end1 - start1);
  std::cerr << "total time cost without cache is " << tt.count() << " us"
  << std::endl;
  */
  // All worker threads were joined above, so nothing holds the lock any more;
  // release it. (Skipped if an ASSERT_EQ above returned early.)
  pthread_rwlock_destroy(&rwlock);
#endif
}
// TEST(testSampleRate, Run) { testSampleRate(); }
// Program entry point. Every positional argument is optional:
//   argv[1]  path of the edge file; when absent, the built-in `edges` are
//            written to `edge_file_name` and that file is used instead
//   argv[2]  overrides fixed_key_size
//   argv[3]  overrides sample_size
//   argv[4]  overrides init_search_size
// Each setting is logged right after it has been resolved, then
// testSampleRate() is run.
int main(int argc, char *argv[]) {
  int arg_index = 0;
  while (arg_index < argc) {
    VLOG(0) << "Argument " << arg_index << " is "
            << std::string(argv[arg_index]);
    ++arg_index;
  }

  const bool has_input_path = argc > 1;
  if (!has_input_path) {
    // No file supplied: materialize the built-in edge list first.
    prepare_file(edge_file_name, edges);
  }
  input_file = has_input_path ? argv[1] : edge_file_name;
  VLOG(0) << "input_file is " << input_file;

  if (argc > 2) fixed_key_size = std::stoi(argv[2]);
  VLOG(0) << "sample_node_size for every batch is " << fixed_key_size;

  if (argc > 3) sample_size = std::stoi(argv[3]);
  VLOG(0) << "sample_size neighbor_size is " << sample_size;

  if (argc > 4) {
    init_search_size = std::stoi(argv[4]);
  }
  VLOG(0) << " init_search_size " << init_search_size;

  testSampleRate();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册