From 9fc11db74fd47bff98b341af2e255d3dc0cb19ca Mon Sep 17 00:00:00 2001
From: seemingwang
Date: Fri, 19 Nov 2021 14:54:04 +0800
Subject: [PATCH] optimize graph-engine sample api's data-transfer process (#37341)

* graph engine demo
* upload unsaved changes
* fix dependency error
* fix shard_num problem
* py client
* remove lock and graph-type
* add load direct graph
* add load direct graph
* add load direct graph
* batch random_sample
* batch_sample_k
* fix num_nodes size
* batch brpc
* batch brpc
* add test
* add test
* add load_nodes; change add_node function
* change sample return type to pair
* resolve conflict
* resolved conflict
* resolved conflict
* separate server and client
* merge pair type
* fix
* resolved conflict
* fixed segmentation fault; high-level VLOG for load edges and load nodes
* random_sample return 0
* rm useless loop
* test: load edge
* fix ret -1
* test: rm sample
* rm sample
* random_sample return future
* random_sample return int
* test fake node
* fixed here
* memory leak
* remove test code
* fix return problem
* add common_graph_table
* random sample node & test & change data structure from linked list to vector
* add common_graph_table
* sample with srand
* add node_types
* optimize nodes sample
* recover test
* random sample
* destruct weighted sampler
* GraphEdgeBlob
* WeightedGraphEdgeBlob to GraphEdgeBlob
* WeightedGraphEdgeBlob to GraphEdgeBlob
* pybind sample nodes api
* pull nodes with step
* fixed pull_graph_list bug; add test for pull_graph_list by step
* add graph table; name
* add graph table; name
* add pybind
* add pybind
* add FeatureNode
* add FeatureNode
* add FeatureNode Serialize
* add FeatureNode Serialize
* get_feat_node
* avoid local rpc
* fix get_node_feat
* fix get_node_feat
* remove log
* get_node_feat return py::bytes
* merge develop with graph_engine
* fix threadpool.h head
* fix
* fix typo
* resolve conflict
* fix conflict
* recover lost content
* fix pybind of FeatureNode
* recover cmake
* recover tools
* resolve conflict
* resolve linking problem
* code style
* change test_server port
* fix code problems
* remove shard_num config
* remove redundant threads
* optimize start server
* remove logs
* fix code problems by reviewers' suggestions
* move graph files into a folder
* code style change
* remove graph operations from base table
* optimize get_feat function of graph engine
* fix long long count problem
* remove redundant graph files
* remove unused shell
* recover dropout_op_pass.h
* fix potential stack overflow when request number is too large & node add & node clear & node remove
* when sample k is larger than neighbor num, return directly
* using random seed generator of paddle to speed up
* fix bug of random sample k
* fix code style
* fix code style
* add remove graph to fleet_py.cc
* fix blocking_queue problem
* fix style
* fix
* recover capacity check
* add remove graph node; add set_feature
* add remove graph node; add set_feature
* add remove graph node; add set_feature
* add remove graph node; add set_feature
* fix distributed op combining problems
* optimize
* remove logs
* fix MultiSlotDataGenerator error
* cache for graph engine
* fix type compare error
* more tests & fix thread terminating problem
* remove header
* change time interval of shrink
* use cache when sample nodes
* remove unused function
* change unique_ptr to shared_ptr
* simplify cache template
* cache api on client
* fix
* reduce sample threads when cache is not used
* reduce cache memory
* cache optimization
* remove test function
* remove extra fetch function
*
graph-engine data transfer optimization Co-authored-by: Huang Zhengjie <270018958@qq.com> Co-authored-by: Weiyue Su Co-authored-by: suweiyue Co-authored-by: luobin06 Co-authored-by: liweibin02 Co-authored-by: tangwei12 --- .../distributed/service/graph_brpc_client.cc | 43 ++++++++++----- .../distributed/service/graph_brpc_client.h | 3 +- .../distributed/service/graph_brpc_server.cc | 22 +++++--- .../distributed/service/graph_py_service.cc | 13 +++-- .../distributed/table/common_graph_table.cc | 20 ++++--- .../distributed/table/common_graph_table.h | 12 +++-- .../distributed/table/graph/graph_node.h | 1 + .../fluid/distributed/test/graph_node_test.cc | 53 ++++++++++--------- 8 files changed, 104 insertions(+), 63 deletions(-) diff --git a/paddle/fluid/distributed/service/graph_brpc_client.cc b/paddle/fluid/distributed/service/graph_brpc_client.cc index 13132740bb..c5ad4b0099 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/service/graph_brpc_client.cc @@ -304,10 +304,15 @@ std::future GraphBrpcClient::remove_graph_node( // char* &buffer,int &actual_size std::future GraphBrpcClient::batch_sample_neighbors( uint32_t table_id, std::vector node_ids, int sample_size, - std::vector>> &res, + // std::vector>> &res, + std::vector> &res, + std::vector> &res_weight, bool need_weight, int server_index) { if (server_index != -1) { res.resize(node_ids.size()); + if (need_weight) { + res_weight.resize(node_ids.size()); + } DownpourBrpcClosure *closure = new DownpourBrpcClosure(1, [&](void *done) { int ret = 0; auto *closure = (DownpourBrpcClosure *)done; @@ -331,11 +336,14 @@ std::future GraphBrpcClient::batch_sample_neighbors( int actual_size = actual_sizes[node_idx]; int start = 0; while (start < actual_size) { - res[node_idx].push_back( - {*(uint64_t *)(node_buffer + offset + start), - *(float *)(node_buffer + offset + start + - GraphNode::id_size)}); - start += GraphNode::id_size + GraphNode::weight_size; + res[node_idx].emplace_back( + *(uint64_t *)(node_buffer + offset + start)); + start += GraphNode::id_size; + if (need_weight) { + res_weight[node_idx].emplace_back( + *(float *)(node_buffer + offset + start)); + start += GraphNode::weight_size; + } } offset += actual_size; } @@ -352,6 +360,7 @@ std::future GraphBrpcClient::batch_sample_neighbors( closure->request(0)->add_params((char *)node_ids.data(), sizeof(uint64_t) * node_ids.size()); closure->request(0)->add_params((char *)&sample_size, sizeof(int)); + closure->request(0)->add_params((char *)&need_weight, sizeof(bool)); ; // PsService_Stub rpc_stub(get_cmd_channel(server_index)); GraphPsService_Stub rpc_stub = @@ -364,13 +373,18 @@ std::future GraphBrpcClient::batch_sample_neighbors( std::vector request2server; std::vector server2request(server_size, -1); res.clear(); + res_weight.clear(); for (int query_idx = 0; query_idx < node_ids.size(); ++query_idx) { int server_index = get_server_index_by_id(node_ids[query_idx]); if (server2request[server_index] == -1) { server2request[server_index] = request2server.size(); request2server.push_back(server_index); } - res.push_back(std::vector>()); + // res.push_back(std::vector>()); + res.push_back({}); + if (need_weight) { + res_weight.push_back({}); + } } size_t request_call_num = request2server.size(); std::vector> node_id_buckets(request_call_num); @@ -413,11 +427,14 @@ std::future GraphBrpcClient::batch_sample_neighbors( int actual_size = actual_sizes[node_idx]; int start = 0; while (start < actual_size) { - res[query_idx].push_back( - {*(uint64_t 
*)(node_buffer + offset + start), - *(float *)(node_buffer + offset + start + - GraphNode::id_size)}); - start += GraphNode::id_size + GraphNode::weight_size; + res[query_idx].emplace_back( + *(uint64_t *)(node_buffer + offset + start)); + start += GraphNode::id_size; + if (need_weight) { + res_weight[query_idx].emplace_back( + *(float *)(node_buffer + offset + start)); + start += GraphNode::weight_size; + } } offset += actual_size; } @@ -445,6 +462,8 @@ std::future GraphBrpcClient::batch_sample_neighbors( sizeof(uint64_t) * node_num); closure->request(request_idx) ->add_params((char *)&sample_size, sizeof(int)); + closure->request(request_idx) + ->add_params((char *)&need_weight, sizeof(bool)); // PsService_Stub rpc_stub(get_cmd_channel(server_index)); GraphPsService_Stub rpc_stub = getServiceStub(get_cmd_channel(server_index)); diff --git a/paddle/fluid/distributed/service/graph_brpc_client.h b/paddle/fluid/distributed/service/graph_brpc_client.h index c1083afb71..e3d2ff1d32 100644 --- a/paddle/fluid/distributed/service/graph_brpc_client.h +++ b/paddle/fluid/distributed/service/graph_brpc_client.h @@ -64,7 +64,8 @@ class GraphBrpcClient : public BrpcPsClient { // given a batch of nodes, sample graph_neighbors for each of them virtual std::future batch_sample_neighbors( uint32_t table_id, std::vector node_ids, int sample_size, - std::vector>>& res, + std::vector>& res, + std::vector>& res_weight, bool need_weight, int server_index = -1); virtual std::future pull_graph_list(uint32_t table_id, diff --git a/paddle/fluid/distributed/service/graph_brpc_server.cc b/paddle/fluid/distributed/service/graph_brpc_server.cc index 0aba2b9f44..094ecbbd40 100644 --- a/paddle/fluid/distributed/service/graph_brpc_server.cc +++ b/paddle/fluid/distributed/service/graph_brpc_server.cc @@ -378,19 +378,21 @@ int32_t GraphBrpcService::graph_random_sample_neighbors( Table *table, const PsRequestMessage &request, PsResponseMessage &response, brpc::Controller *cntl) { CHECK_TABLE_EXIST(table, request, response) - if (request.params_size() < 2) { + if (request.params_size() < 3) { set_response_code( response, -1, - "graph_random_sample request requires at least 2 arguments"); + "graph_random_sample_neighbors request requires at least 3 arguments"); return 0; } size_t node_num = request.params(0).size() / sizeof(uint64_t); uint64_t *node_data = (uint64_t *)(request.params(0).c_str()); int sample_size = *(uint64_t *)(request.params(1).c_str()); + bool need_weight = *(bool *)(request.params(2).c_str()); std::vector> buffers(node_num); std::vector actual_sizes(node_num, 0); ((GraphTable *)table) - ->random_sample_neighbors(node_data, sample_size, buffers, actual_sizes); + ->random_sample_neighbors(node_data, sample_size, buffers, actual_sizes, + need_weight); cntl->response_attachment().append(&node_num, sizeof(size_t)); cntl->response_attachment().append(actual_sizes.data(), @@ -454,16 +456,17 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers( brpc::Controller *cntl) { // sleep(5); CHECK_TABLE_EXIST(table, request, response) - if (request.params_size() < 2) { - set_response_code( - response, -1, - "graph_random_neighbors_sample request requires at least 2 arguments"); + if (request.params_size() < 3) { + set_response_code(response, -1, + "sample_neighbors_across_multi_servers request requires " + "at least 3 arguments"); return 0; } size_t node_num = request.params(0).size() / sizeof(uint64_t), size_of_size_t = sizeof(size_t); uint64_t *node_data = (uint64_t *)(request.params(0).c_str()); int 
sample_size = *(uint64_t *)(request.params(1).c_str()); + bool need_weight = *(uint64_t *)(request.params(2).c_str()); // std::vector res = ((GraphTable // *)table).filter_out_non_exist_nodes(node_data, sample_size); std::vector request2server; @@ -581,6 +584,8 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers( sizeof(uint64_t) * node_num); closure->request(request_idx) ->add_params((char *)&sample_size, sizeof(int)); + closure->request(request_idx) + ->add_params((char *)&need_weight, sizeof(bool)); PsService_Stub rpc_stub( ((GraphBrpcServer *)get_server())->get_cmd_channel(server_index)); // GraphPsService_Stub rpc_stub = @@ -592,7 +597,8 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers( if (server2request[rank] != -1) { ((GraphTable *)table) ->random_sample_neighbors(node_id_buckets.back().data(), sample_size, - local_buffers, local_actual_sizes); + local_buffers, local_actual_sizes, + need_weight); } local_promise.get()->set_value(0); if (remote_call_num == 0) func(closure); diff --git a/paddle/fluid/distributed/service/graph_py_service.cc b/paddle/fluid/distributed/service/graph_py_service.cc index 130a76a683..8d7a822321 100644 --- a/paddle/fluid/distributed/service/graph_py_service.cc +++ b/paddle/fluid/distributed/service/graph_py_service.cc @@ -295,11 +295,13 @@ GraphPyClient::batch_sample_neighbors(std::string name, std::vector node_ids, int sample_size, bool return_weight, bool return_edges) { - std::vector>> v; + // std::vector>> v; + std::vector> v; + std::vector> v1; if (this->table_id_map.count(name)) { uint32_t table_id = this->table_id_map[name]; - auto status = - worker_ptr->batch_sample_neighbors(table_id, node_ids, sample_size, v); + auto status = worker_ptr->batch_sample_neighbors( + table_id, node_ids, sample_size, v, v1, return_weight); status.wait(); } @@ -313,9 +315,10 @@ GraphPyClient::batch_sample_neighbors(std::string name, if (return_edges) res.first.push_back({}); for (size_t i = 0; i < v.size(); i++) { for (size_t j = 0; j < v[i].size(); j++) { - res.first[0].push_back(v[i][j].first); + // res.first[0].push_back(v[i][j].first); + res.first[0].push_back(v[i][j]); if (return_edges) res.first[2].push_back(node_ids[i]); - if (return_weight) res.second.push_back(v[i][j].second); + if (return_weight) res.second.push_back(v1[i][j]); } if (i == v.size() - 1) break; diff --git a/paddle/fluid/distributed/table/common_graph_table.cc b/paddle/fluid/distributed/table/common_graph_table.cc index 0c4a473570..b690d71eab 100644 --- a/paddle/fluid/distributed/table/common_graph_table.cc +++ b/paddle/fluid/distributed/table/common_graph_table.cc @@ -396,8 +396,8 @@ int32_t GraphTable::random_sample_nodes(int sample_size, } int32_t GraphTable::random_sample_neighbors( uint64_t *node_ids, int sample_size, - std::vector> &buffers, - std::vector &actual_sizes) { + std::vector> &buffers, std::vector &actual_sizes, + bool need_weight) { size_t node_num = buffers.size(); std::function char_del = [](char *c) { delete[] c; }; std::vector> tasks; @@ -407,7 +407,7 @@ int32_t GraphTable::random_sample_neighbors( for (size_t idx = 0; idx < node_num; ++idx) { index = get_thread_pool_index(node_ids[idx]); seq_id[index].emplace_back(idx); - id_list[index].emplace_back(node_ids[idx], sample_size); + id_list[index].emplace_back(node_ids[idx], sample_size, need_weight); } for (int i = 0; i < seq_id.size(); i++) { if (seq_id[i].size() == 0) continue; @@ -442,13 +442,15 @@ int32_t GraphTable::random_sample_neighbors( } std::shared_ptr &buffer = buffers[idx]; 
std::vector res = node->sample_k(sample_size, rng); - actual_size = res.size() * (Node::id_size + Node::weight_size); + actual_size = + res.size() * (need_weight ? (Node::id_size + Node::weight_size) + : Node::id_size); int offset = 0; uint64_t id; float weight; char *buffer_addr = new char[actual_size]; if (response == LRUResponse::ok) { - sample_keys.emplace_back(node_id, sample_size); + sample_keys.emplace_back(node_id, sample_size, need_weight); sample_res.emplace_back(actual_size, buffer_addr); buffer = sample_res.back().buffer; } else { @@ -456,11 +458,13 @@ int32_t GraphTable::random_sample_neighbors( } for (int &x : res) { id = node->get_neighbor_id(x); - weight = node->get_neighbor_weight(x); memcpy(buffer_addr + offset, &id, Node::id_size); offset += Node::id_size; - memcpy(buffer_addr + offset, &weight, Node::weight_size); - offset += Node::weight_size; + if (need_weight) { + weight = node->get_neighbor_weight(x); + memcpy(buffer_addr + offset, &weight, Node::weight_size); + offset += Node::weight_size; + } } } } diff --git a/paddle/fluid/distributed/table/common_graph_table.h b/paddle/fluid/distributed/table/common_graph_table.h index 77732fac06..9ca59db3bb 100644 --- a/paddle/fluid/distributed/table/common_graph_table.h +++ b/paddle/fluid/distributed/table/common_graph_table.h @@ -80,10 +80,14 @@ enum LRUResponse { ok = 0, blocked = 1, err = 2 }; struct SampleKey { uint64_t node_key; size_t sample_size; - SampleKey(uint64_t _node_key, size_t _sample_size) - : node_key(_node_key), sample_size(_sample_size) {} + bool is_weighted; + SampleKey(uint64_t _node_key, size_t _sample_size, bool _is_weighted) + : node_key(_node_key), + sample_size(_sample_size), + is_weighted(_is_weighted) {} bool operator==(const SampleKey &s) const { - return node_key == s.node_key && sample_size == s.sample_size; + return node_key == s.node_key && sample_size == s.sample_size && + is_weighted == s.is_weighted; } }; @@ -360,7 +364,7 @@ class GraphTable : public SparseTable { virtual int32_t random_sample_neighbors( uint64_t *node_ids, int sample_size, std::vector> &buffers, - std::vector &actual_sizes); + std::vector &actual_sizes, bool need_weight); int32_t random_sample_nodes(int sample_size, std::unique_ptr &buffers, int &actual_sizes); diff --git a/paddle/fluid/distributed/table/graph/graph_node.h b/paddle/fluid/distributed/table/graph/graph_node.h index 62c101ec02..b7a564ef7b 100644 --- a/paddle/fluid/distributed/table/graph/graph_node.h +++ b/paddle/fluid/distributed/table/graph/graph_node.h @@ -51,6 +51,7 @@ class Node { protected: uint64_t id; + bool is_weighted; }; class GraphNode : public Node { diff --git a/paddle/fluid/distributed/test/graph_node_test.cc b/paddle/fluid/distributed/test/graph_node_test.cc index 9b55daa210..3a430d7a51 100644 --- a/paddle/fluid/distributed/test/graph_node_test.cc +++ b/paddle/fluid/distributed/test/graph_node_test.cc @@ -107,15 +107,16 @@ void testFeatureNodeSerializeFloat64() { void testSingleSampleNeighboor( std::shared_ptr& worker_ptr_) { - std::vector>> vs; + std::vector> vs; + std::vector> vs1; auto pull_status = worker_ptr_->batch_sample_neighbors( - 0, std::vector(1, 37), 4, vs); + 0, std::vector(1, 37), 4, vs, vs1, true); pull_status.wait(); std::unordered_set s; std::unordered_set s1 = {112, 45, 145}; for (auto g : vs[0]) { - s.insert(g.first); + s.insert(g); } ASSERT_EQ(s.size(), 3); for (auto g : s) { @@ -124,19 +125,21 @@ void testSingleSampleNeighboor( s.clear(); s1.clear(); vs.clear(); + vs1.clear(); pull_status = 
worker_ptr_->batch_sample_neighbors( - 0, std::vector(1, 96), 4, vs); + 0, std::vector(1, 96), 4, vs, vs1, true); pull_status.wait(); s1 = {111, 48, 247}; for (auto g : vs[0]) { - s.insert(g.first); + s.insert(g); } ASSERT_EQ(s.size(), 3); for (auto g : s) { ASSERT_EQ(true, s1.find(g) != s1.end()); } vs.clear(); - pull_status = worker_ptr_->batch_sample_neighbors(0, {96, 37}, 4, vs, 0); + pull_status = + worker_ptr_->batch_sample_neighbors(0, {96, 37}, 4, vs, vs1, true, 0); pull_status.wait(); ASSERT_EQ(vs.size(), 2); } @@ -194,14 +197,16 @@ void testAddNode( } void testBatchSampleNeighboor( std::shared_ptr& worker_ptr_) { - std::vector>> vs; + std::vector> vs; + std::vector> vs1; std::vector v = {37, 96}; - auto pull_status = worker_ptr_->batch_sample_neighbors(0, v, 4, vs); + auto pull_status = + worker_ptr_->batch_sample_neighbors(0, v, 4, vs, vs1, false); pull_status.wait(); std::unordered_set s; std::unordered_set s1 = {112, 45, 145}; for (auto g : vs[0]) { - s.insert(g.first); + s.insert(g); } ASSERT_EQ(s.size(), 3); for (auto g : s) { @@ -211,7 +216,7 @@ void testBatchSampleNeighboor( s1.clear(); s1 = {111, 48, 247}; for (auto g : vs[1]) { - s.insert(g.first); + s.insert(g); } ASSERT_EQ(s.size(), 3); for (auto g : s) { @@ -221,10 +226,6 @@ void testBatchSampleNeighboor( void testCache(); void testGraphToBuffer(); -// std::string nodes[] = {std::string("37\taa\t45;0.34\t145;0.31\t112;0.21"), -// std::string("96\tfeature\t48;1.4\t247;0.31\t111;1.21"), -// std::string("59\ttreat\t45;0.34\t145;0.31\t112;0.21"), -// std::string("97\tfood\t48;1.4\t247;0.31\t111;1.21")}; std::string edges[] = { std::string("37\t45\t0.34"), std::string("37\t145\t0.31"), @@ -427,15 +428,16 @@ void RunBrpcPushSparse() { worker_ptr_->load(0, std::string(edge_file_name), std::string("e>")); srand(time(0)); pull_status.wait(); - std::vector>> vs; + std::vector> _vs; + std::vector> vs; testSampleNodes(worker_ptr_); sleep(5); testSingleSampleNeighboor(worker_ptr_); testBatchSampleNeighboor(worker_ptr_); pull_status = worker_ptr_->batch_sample_neighbors( - 0, std::vector(1, 10240001024), 4, vs); + 0, std::vector(1, 10240001024), 4, _vs, vs, true); pull_status.wait(); - ASSERT_EQ(0, vs[0].size()); + ASSERT_EQ(0, _vs[0].size()); paddle::distributed::GraphTable* g = (paddle::distributed::GraphTable*)pserver_ptr_->table(0); size_t ttl = 6; @@ -444,18 +446,19 @@ void RunBrpcPushSparse() { while (round--) { vs.clear(); pull_status = worker_ptr_->batch_sample_neighbors( - 0, std::vector(1, 37), 1, vs); + 0, std::vector(1, 37), 1, _vs, vs, false); pull_status.wait(); for (int i = 0; i < ttl; i++) { - std::vector>> vs1; + std::vector> vs1; + std::vector> vs2; pull_status = worker_ptr_->batch_sample_neighbors( - 0, std::vector(1, 37), 1, vs1); + 0, std::vector(1, 37), 1, vs1, vs2, false); pull_status.wait(); - ASSERT_EQ(vs[0].size(), vs1[0].size()); + ASSERT_EQ(_vs[0].size(), vs1[0].size()); - for (int j = 0; j < vs[0].size(); j++) { - ASSERT_EQ(vs[0][j].first, vs1[0][j].first); + for (int j = 0; j < _vs[0].size(); j++) { + ASSERT_EQ(_vs[0][j], vs1[0][j]); } } } @@ -639,7 +642,7 @@ void testCache() { strcpy(str, "54321"); ::paddle::distributed::SampleResult* result = new ::paddle::distributed::SampleResult(5, str); - ::paddle::distributed::SampleKey skey = {6, 1}; + ::paddle::distributed::SampleKey skey = {6, 1, false}; std::vector> r; @@ -695,4 +698,4 @@ void testGraphToBuffer() { VLOG(0) << s1.get_feature(0); } -TEST(RunBrpcPushSparse, Run) { RunBrpcPushSparse(); } +TEST(RunBrpcPushSparse, Run) { RunBrpcPushSparse(); } \ 
No newline at end of file
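
The core of this change is the sampling response layout: for each queried node the server now packs a run of neighbor ids and, only when need_weight is true, a weight after each id; the client walks that run using the per-node actual_sizes. The standalone sketch below (not part of the patch) packs and unpacks one such run. It assumes id_size is sizeof(uint64_t) and weight_size is sizeof(float), which matches the uint64_t/float casts in the parsing loops above, and it reuses the neighbors of node 37 from the test edge data.

#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

int main() {
  // Neighbor ids and weights of node 37 from the test edge data in this patch.
  std::vector<uint64_t> ids = {45, 145, 112};
  std::vector<float> weights = {0.34f, 0.31f, 0.21f};
  const bool need_weight = true;

  // Assumed sizes for this sketch: id_size = sizeof(uint64_t), weight_size = sizeof(float).
  const size_t id_size = sizeof(uint64_t), weight_size = sizeof(float);

  // Pack one node's sample, as GraphTable::random_sample_neighbors now does:
  // the id, then the weight only when need_weight is set.
  size_t actual_size =
      ids.size() * (need_weight ? id_size + weight_size : id_size);
  std::vector<char> buffer(actual_size);
  size_t offset = 0;
  for (size_t i = 0; i < ids.size(); ++i) {
    std::memcpy(buffer.data() + offset, &ids[i], id_size);
    offset += id_size;
    if (need_weight) {
      std::memcpy(buffer.data() + offset, &weights[i], weight_size);
      offset += weight_size;
    }
  }

  // Unpack it again, mirroring the client-side loop in batch_sample_neighbors.
  std::vector<uint64_t> out_ids;
  std::vector<float> out_weights;
  size_t start = 0;
  while (start < actual_size) {
    uint64_t id;
    std::memcpy(&id, buffer.data() + start, id_size);
    out_ids.push_back(id);
    start += id_size;
    if (need_weight) {
      float w;
      std::memcpy(&w, buffer.data() + start, weight_size);
      out_weights.push_back(w);
      start += weight_size;
    }
  }

  for (size_t i = 0; i < out_ids.size(); ++i) {
    std::cout << out_ids[i];
    if (need_weight) std::cout << " " << out_weights[i];
    std::cout << "\n";
  }
  return 0;
}

With those sizes, an unweighted sample moves 8 bytes per sampled neighbor instead of 12, roughly a third less response data, which is the data-transfer saving the subject line refers to.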
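Because sampling results are cached, the cache key (SampleKey in common_graph_table.h) gains an is_weighted flag: the same node and sample_size now map to two differently laid-out buffers, and a weighted buffer served for an unweighted request (or vice versa) would be misparsed. A small self-contained illustration follows; the hash functor is only for this sketch, the real cache may hash differently.

#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

struct SampleKey {
  uint64_t node_key;
  size_t sample_size;
  bool is_weighted;
  SampleKey(uint64_t _node_key, size_t _sample_size, bool _is_weighted)
      : node_key(_node_key),
        sample_size(_sample_size),
        is_weighted(_is_weighted) {}
  bool operator==(const SampleKey &s) const {
    return node_key == s.node_key && sample_size == s.sample_size &&
           is_weighted == s.is_weighted;
  }
};

// Illustrative hash only: combines the three fields so weighted and
// unweighted entries for the same node never collide into one slot.
struct SampleKeyHash {
  size_t operator()(const SampleKey &k) const {
    return std::hash<uint64_t>()(k.node_key) ^ (k.sample_size << 1) ^
           (static_cast<size_t>(k.is_weighted) << 2);
  }
};

int main() {
  std::unordered_map<SampleKey, std::string, SampleKeyHash> cache;
  cache[{37, 4, true}] = "ids+weights buffer";
  cache[{37, 4, false}] = "ids-only buffer";
  // Two distinct entries: the flag keeps the two layouts apart.
  std::cout << cache.size() << "\n";  // prints 2
  return 0;
}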
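On the wire, the sample request also grows a third parameter: the client appends need_weight as a raw bool after the node-id blob and sample_size, and both server handlers now reject requests with fewer than three params before reading it back. A rough sketch of that round trip, with plain std::string standing in for the brpc request params (the real code goes through PsRequestMessage::add_params and params(i)):

#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

int main() {
  // Client side: params[0] = node ids, params[1] = sample_size, params[2] = need_weight.
  std::vector<uint64_t> node_ids = {37, 96};
  int sample_size = 4;
  bool need_weight = false;
  std::vector<std::string> params(3);
  params[0].assign((char *)node_ids.data(), sizeof(uint64_t) * node_ids.size());
  params[1].assign((char *)&sample_size, sizeof(int));
  params[2].assign((char *)&need_weight, sizeof(bool));

  // Server side: a request with fewer than 3 params is now rejected up front.
  if (params.size() < 3) return -1;
  size_t node_num = params[0].size() / sizeof(uint64_t);
  int got_sample_size = *(int *)(params[1].c_str());
  bool got_need_weight = *(bool *)(params[2].c_str());
  std::cout << node_num << " " << got_sample_size << " " << got_need_weight << "\n";
  return 0;
}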