未验证 提交 b61a6e71 编写于 作者: S seemingwang 提交者: GitHub

fix node transfer problem (#42674)

* enable graph-engine to return all id

* change vector's dimension

* change vector's dimension

* enlarge returned ids dimensions

* add actual_val

* change vlog

* fix bug

* bug fix

* bug fix

* fix display test

* singleton of gpu_graph_wrapper

* change sample result's structure to fit training

* recover sample code

* fix

* secondary sample

* add graph partition

* fix pybind

* optimize buffer allocation

* fix node transfer problem

* remove log

* support 32G+ graph on single gpu

* remove logs

* fix

* fix

* fix cpu query

* display info

* remove log

* remove empyt file
Co-authored-by: NDesmonDay <908660116@qq.com>
上级 2eacef49
...@@ -80,7 +80,7 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph( ...@@ -80,7 +80,7 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
} }
for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get(); for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
paddle::framework::GpuPsCommGraph res; paddle::framework::GpuPsCommGraph res;
unsigned int tot_len = 0; int64_t tot_len = 0;
for (int i = 0; i < task_pool_size_; i++) { for (int i = 0; i < task_pool_size_; i++) {
tot_len += edge_array[i].size(); tot_len += edge_array[i].size();
} }
...@@ -88,8 +88,8 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph( ...@@ -88,8 +88,8 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
// res.node_size = ids.size(); // res.node_size = ids.size();
// res.neighbor_list = new int64_t[tot_len]; // res.neighbor_list = new int64_t[tot_len];
// res.node_list = new paddle::framework::GpuPsGraphNode[ids.size()]; // res.node_list = new paddle::framework::GpuPsGraphNode[ids.size()];
res.init_on_cpu(tot_len, (unsigned int)ids.size()); res.init_on_cpu(tot_len, ids.size());
unsigned int offset = 0, ind = 0; int64_t offset = 0, ind = 0;
for (int i = 0; i < task_pool_size_; i++) { for (int i = 0; i < task_pool_size_; i++) {
for (int j = 0; j < (int)node_array[i].size(); j++) { for (int j = 0; j < (int)node_array[i].size(); j++) {
res.node_list[ind] = node_array[i][j]; res.node_list[ind] = node_array[i][j];
...@@ -126,8 +126,8 @@ int32_t GraphTable::add_node_to_ssd(int type_id, int idx, int64_t src_id, ...@@ -126,8 +126,8 @@ int32_t GraphTable::add_node_to_ssd(int type_id, int idx, int64_t src_id,
_db->put(src_id % shard_num % task_pool_size_, ch, _db->put(src_id % shard_num % task_pool_size_, ch,
sizeof(int) * 2 + sizeof(int64_t), (char *)data, len); sizeof(int) * 2 + sizeof(int64_t), (char *)data, len);
} }
_db->flush(src_id % shard_num % task_pool_size_); // _db->flush(src_id % shard_num % task_pool_size_);
std::string x; // std::string x;
// if (_db->get(src_id % shard_num % task_pool_size_, ch, sizeof(int64_t) + // if (_db->get(src_id % shard_num % task_pool_size_, ch, sizeof(int64_t) +
// 2 * sizeof(int), x) ==0){ // 2 * sizeof(int), x) ==0){
// VLOG(0)<<"put result"; // VLOG(0)<<"put result";
...@@ -135,6 +135,18 @@ int32_t GraphTable::add_node_to_ssd(int type_id, int idx, int64_t src_id, ...@@ -135,6 +135,18 @@ int32_t GraphTable::add_node_to_ssd(int type_id, int idx, int64_t src_id,
// VLOG(0)<<"get an id "<<*((int64_t *)(x.c_str() + i)); // VLOG(0)<<"get an id "<<*((int64_t *)(x.c_str() + i));
// } // }
//} //}
// if(src_id == 429){
// str = "";
// _db->get(src_id % shard_num % task_pool_size_, ch,
// sizeof(int) * 2 + sizeof(int64_t), str);
// int64_t *stored_data = ((int64_t *)str.c_str());
// int n = str.size() / sizeof(int64_t);
// VLOG(0)<<"429 has "<<n<<"neighbors";
// for(int i =0;i< n;i++){
// VLOG(0)<<"get an id "<<*((int64_t *)(str.c_str() +
// i*sizeof(int64_t)));
// }
// }
} }
return 0; return 0;
} }
...@@ -146,6 +158,7 @@ char *GraphTable::random_sample_neighbor_from_ssd( ...@@ -146,6 +158,7 @@ char *GraphTable::random_sample_neighbor_from_ssd(
return NULL; return NULL;
} }
std::string str; std::string str;
VLOG(2) << "sample ssd for key " << id;
char ch[sizeof(int) * 2 + sizeof(int64_t)]; char ch[sizeof(int) * 2 + sizeof(int64_t)];
memset(ch, 0, sizeof(int)); memset(ch, 0, sizeof(int));
memcpy(ch + sizeof(int), &idx, sizeof(int)); memcpy(ch + sizeof(int), &idx, sizeof(int));
...@@ -178,6 +191,9 @@ char *GraphTable::random_sample_neighbor_from_ssd( ...@@ -178,6 +191,9 @@ char *GraphTable::random_sample_neighbor_from_ssd(
memcpy(buff + i * Node::id_size, &data[pos], Node::id_size); memcpy(buff + i * Node::id_size, &data[pos], Node::id_size);
// res.push_back(data[pos]); // res.push_back(data[pos]);
} }
for (int i = 0; i < actual_size; i += 8) {
VLOG(2) << "sampled an neighbor " << *(int64_t *)&buff[i];
}
return buff; return buff;
} }
actual_size = 0; actual_size = 0;
...@@ -376,7 +392,7 @@ int32_t GraphTable::load_edges_to_ssd(const std::string &path, ...@@ -376,7 +392,7 @@ int32_t GraphTable::load_edges_to_ssd(const std::string &path,
} }
int32_t GraphTable::dump_edges_to_ssd(int idx) { int32_t GraphTable::dump_edges_to_ssd(int idx) {
VLOG(0) << "calling dump edges to ssd"; VLOG(2) << "calling dump edges to ssd";
const int64_t fixed_size = 10000; const int64_t fixed_size = 10000;
// std::vector<int64_t> edge_array[task_pool_size_]; // std::vector<int64_t> edge_array[task_pool_size_];
std::vector<std::unordered_map<int64_t, int>> count(task_pool_size_); std::vector<std::unordered_map<int64_t, int>> count(task_pool_size_);
...@@ -387,9 +403,9 @@ int32_t GraphTable::dump_edges_to_ssd(int idx) { ...@@ -387,9 +403,9 @@ int32_t GraphTable::dump_edges_to_ssd(int idx) {
[&, i, this]() -> int64_t { [&, i, this]() -> int64_t {
int64_t cost = 0; int64_t cost = 0;
std::vector<Node *> &v = shards[i]->get_bucket(); std::vector<Node *> &v = shards[i]->get_bucket();
std::vector<int64_t> s;
size_t ind = i % this->task_pool_size_; size_t ind = i % this->task_pool_size_;
for (size_t j = 0; j < v.size(); j++) { for (size_t j = 0; j < v.size(); j++) {
std::vector<int64_t> s;
for (int k = 0; k < v[j]->get_neighbor_size(); k++) { for (int k = 0; k < v[j]->get_neighbor_size(); k++) {
s.push_back(v[j]->get_neighbor_id(k)); s.push_back(v[j]->get_neighbor_id(k));
} }
...@@ -405,7 +421,7 @@ int32_t GraphTable::dump_edges_to_ssd(int idx) { ...@@ -405,7 +421,7 @@ int32_t GraphTable::dump_edges_to_ssd(int idx) {
} }
int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) { int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) {
VLOG(0) << "make_complementary_graph"; VLOG(0) << "make_complementary_graph";
const int64_t fixed_size = 10000; const int64_t fixed_size = byte_size / 8;
// std::vector<int64_t> edge_array[task_pool_size_]; // std::vector<int64_t> edge_array[task_pool_size_];
std::vector<std::unordered_map<int64_t, int>> count(task_pool_size_); std::vector<std::unordered_map<int64_t, int>> count(task_pool_size_);
std::vector<std::future<int>> tasks; std::vector<std::future<int>> tasks;
...@@ -416,7 +432,7 @@ int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) { ...@@ -416,7 +432,7 @@ int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) {
std::vector<Node *> &v = shards[i]->get_bucket(); std::vector<Node *> &v = shards[i]->get_bucket();
size_t ind = i % this->task_pool_size_; size_t ind = i % this->task_pool_size_;
for (size_t j = 0; j < v.size(); j++) { for (size_t j = 0; j < v.size(); j++) {
size_t location = v[j]->get_id(); // size_t location = v[j]->get_id();
for (int k = 0; k < v[j]->get_neighbor_size(); k++) { for (int k = 0; k < v[j]->get_neighbor_size(); k++) {
count[ind][v[j]->get_neighbor_id(k)]++; count[ind][v[j]->get_neighbor_id(k)]++;
} }
...@@ -424,19 +440,12 @@ int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) { ...@@ -424,19 +440,12 @@ int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) {
return 0; return 0;
})); }));
} }
for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
std::unordered_map<int64_t, int> final_count; std::unordered_map<int64_t, int> final_count;
std::map<int, std::vector<int64_t>> count_to_id; std::map<int, std::vector<int64_t>> count_to_id;
std::vector<int64_t> buffer; std::vector<int64_t> buffer;
for (auto p : edge_shards[idx]) { clear_graph(idx);
delete p;
}
edge_shards[idx].clear();
for (size_t i = 0; i < shard_num_per_server; i++) {
edge_shards[idx].push_back(new GraphShard());
}
for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
for (int i = 0; i < task_pool_size_; i++) { for (int i = 0; i < task_pool_size_; i++) {
for (auto &p : count[i]) { for (auto &p : count[i]) {
final_count[p.first] = final_count[p.first] + p.second; final_count[p.first] = final_count[p.first] + p.second;
...@@ -447,13 +456,13 @@ int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) { ...@@ -447,13 +456,13 @@ int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) {
count_to_id[p.second].push_back(p.first); count_to_id[p.second].push_back(p.first);
VLOG(2) << p.first << " appear " << p.second << " times"; VLOG(2) << p.first << " appear " << p.second << " times";
} }
// std::map<int,std::vector<int64_t>>::iterator iter= count_to_id.rbegin();
auto iter = count_to_id.rbegin(); auto iter = count_to_id.rbegin();
while (iter != count_to_id.rend() && byte_size > 0) { while (iter != count_to_id.rend() && byte_size > 0) {
for (auto x : iter->second) { for (auto x : iter->second) {
buffer.push_back(x); buffer.push_back(x);
if (buffer.size() >= fixed_size) { if (buffer.size() >= fixed_size) {
int64_t res = load_graph_to_memory_from_ssd(idx, buffer); int64_t res = load_graph_to_memory_from_ssd(idx, buffer);
buffer.clear();
byte_size -= res; byte_size -= res;
} }
if (byte_size <= 0) break; if (byte_size <= 0) break;
...@@ -1265,13 +1274,14 @@ int32_t GraphTable::random_sample_neighbors( ...@@ -1265,13 +1274,14 @@ int32_t GraphTable::random_sample_neighbors(
if (node == nullptr) { if (node == nullptr) {
#ifdef PADDLE_WITH_HETERPS #ifdef PADDLE_WITH_HETERPS
if (search_level == 2) { if (search_level == 2) {
VLOG(2) << "enter sample from ssd"; VLOG(2) << "enter sample from ssd for node_id " << node_id;
char *buffer_addr = random_sample_neighbor_from_ssd( char *buffer_addr = random_sample_neighbor_from_ssd(
idx, node_id, sample_size, rng, actual_size); idx, node_id, sample_size, rng, actual_size);
if (actual_size != 0) { if (actual_size != 0) {
std::shared_ptr<char> &buffer = buffers[idx]; std::shared_ptr<char> &buffer = buffers[idy];
buffer.reset(buffer_addr, char_del); buffer.reset(buffer_addr, char_del);
} }
VLOG(2) << "actual sampled size from ssd = " << actual_sizes[idy];
continue; continue;
} }
#endif #endif
......
...@@ -13,11 +13,10 @@ IF(WITH_GPU) ...@@ -13,11 +13,10 @@ IF(WITH_GPU)
nv_test(test_heter_comm SRCS feature_value.h DEPS heter_comm) nv_test(test_heter_comm SRCS feature_value.h DEPS heter_comm)
nv_library(heter_ps SRCS heter_ps.cu DEPS heter_comm) nv_library(heter_ps SRCS heter_ps.cu DEPS heter_comm)
if(WITH_PSCORE) if(WITH_PSCORE)
nv_library(graph_gpu_ps SRCS graph_gpu_ps_table.h DEPS heter_comm table hashtable_kernel) nv_library(graph_gpu_ps SRCS graph_gpu_ps_table_inl.cu DEPS heter_comm table hashtable_kernel)
nv_library(graph_sampler SRCS graph_sampler_inl.h DEPS graph_gpu_ps) nv_library(graph_sampler SRCS graph_sampler_inl.h DEPS graph_gpu_ps)
nv_library(graph_gpu_wrapper SRCS graph_gpu_wrapper.cu DEPS heter_comm table heter_comm_kernel hashtable_kernel heter_ps ${HETERPS_DEPS} graph_gpu_ps)
nv_test(test_cpu_query SRCS test_cpu_query.cu DEPS heter_comm table heter_comm_kernel hashtable_kernel heter_ps ${HETERPS_DEPS}) nv_test(test_cpu_query SRCS test_cpu_query.cu DEPS heter_comm table heter_comm_kernel hashtable_kernel heter_ps ${HETERPS_DEPS} graph_gpu_ps graph_gpu_wrapper)
nv_library(graph_gpu_wrapper SRCS graph_gpu_wrapper.cu DEPS heter_comm table heter_comm_kernel hashtable_kernel heter_ps ${HETERPS_DEPS})
#ADD_EXECUTABLE(test_sample_rate test_sample_rate.cu) #ADD_EXECUTABLE(test_sample_rate test_sample_rate.cu)
#target_link_libraries(test_sample_rate heter_comm table heter_comm_kernel hashtable_kernel heter_ps ${HETERPS_DEPS}) #target_link_libraries(test_sample_rate heter_comm table heter_comm_kernel hashtable_kernel heter_ps ${HETERPS_DEPS})
#nv_test(test_sample_rate SRCS test_sample_rate.cu DEPS heter_comm table heter_comm_kernel hashtable_kernel heter_ps ${HETERPS_DEPS}) #nv_test(test_sample_rate SRCS test_sample_rate.cu DEPS heter_comm table heter_comm_kernel hashtable_kernel heter_ps ${HETERPS_DEPS})
......
...@@ -24,7 +24,7 @@ namespace paddle { ...@@ -24,7 +24,7 @@ namespace paddle {
namespace framework { namespace framework {
struct GpuPsGraphNode { struct GpuPsGraphNode {
int64_t node_id; int64_t node_id;
unsigned int neighbor_size, neighbor_offset; int64_t neighbor_size, neighbor_offset;
// this node's neighbor is stored on [neighbor_offset,neighbor_offset + // this node's neighbor is stored on [neighbor_offset,neighbor_offset +
// neighbor_size) of int64_t *neighbor_list; // neighbor_size) of int64_t *neighbor_list;
}; };
...@@ -32,17 +32,17 @@ struct GpuPsGraphNode { ...@@ -32,17 +32,17 @@ struct GpuPsGraphNode {
struct GpuPsCommGraph { struct GpuPsCommGraph {
int64_t *neighbor_list; int64_t *neighbor_list;
GpuPsGraphNode *node_list; GpuPsGraphNode *node_list;
unsigned int neighbor_size, node_size; int64_t neighbor_size, node_size;
// the size of neighbor array and graph_node_list array // the size of neighbor array and graph_node_list array
GpuPsCommGraph() GpuPsCommGraph()
: neighbor_list(NULL), node_list(NULL), neighbor_size(0), node_size(0) {} : neighbor_list(NULL), node_list(NULL), neighbor_size(0), node_size(0) {}
GpuPsCommGraph(int64_t *neighbor_list_, GpuPsGraphNode *node_list_, GpuPsCommGraph(int64_t *neighbor_list_, GpuPsGraphNode *node_list_,
unsigned int neighbor_size_, unsigned int node_size_) int64_t neighbor_size_, int64_t node_size_)
: neighbor_list(neighbor_list_), : neighbor_list(neighbor_list_),
node_list(node_list_), node_list(node_list_),
neighbor_size(neighbor_size_), neighbor_size(neighbor_size_),
node_size(node_size_) {} node_size(node_size_) {}
void init_on_cpu(unsigned int neighbor_size, unsigned int node_size) { void init_on_cpu(int64_t neighbor_size, int64_t node_size) {
this->neighbor_size = neighbor_size; this->neighbor_size = neighbor_size;
this->node_size = node_size; this->node_size = node_size;
this->neighbor_list = new int64_t[neighbor_size]; this->neighbor_list = new int64_t[neighbor_size];
...@@ -208,12 +208,43 @@ struct NeighborSampleResult { ...@@ -208,12 +208,43 @@ struct NeighborSampleResult {
delete[] ac_size; delete[] ac_size;
VLOG(0) << " ------------------"; VLOG(0) << " ------------------";
} }
NeighborSampleResult(){}; std::vector<int64_t> get_sampled_graph(NeighborSampleQuery q) {
~NeighborSampleResult() { std::vector<int64_t> graph;
// if (val != NULL) cudaFree(val); int64_t *sample_keys = new int64_t[q.len];
// if (actual_sample_size != NULL) cudaFree(actual_sample_size); std::string key_str;
// if (offset != NULL) cudaFree(offset); cudaMemcpy(sample_keys, q.key, q.len * sizeof(int64_t),
cudaMemcpyDeviceToHost);
int64_t *res = new int64_t[sample_size * key_size];
cudaMemcpy(res, val, sample_size * key_size * sizeof(int64_t),
cudaMemcpyDeviceToHost);
int *ac_size = new int[key_size];
cudaMemcpy(ac_size, actual_sample_size, key_size * sizeof(int),
cudaMemcpyDeviceToHost); // 3, 1, 3
int total_sample_size = 0;
for (int i = 0; i < key_size; i++) {
total_sample_size += ac_size[i];
}
int64_t *res2 = new int64_t[total_sample_size]; // r
cudaMemcpy(res2, actual_val, total_sample_size * sizeof(int64_t),
cudaMemcpyDeviceToHost); // r
int start = 0;
for (int i = 0; i < key_size; i++) {
graph.push_back(sample_keys[i]);
graph.push_back(ac_size[i]);
for (int j = 0; j < ac_size[i]; j++) {
graph.push_back(res2[start + j]);
}
start += ac_size[i]; // r
}
delete[] res;
delete[] res2; // r
delete[] ac_size;
delete[] sample_keys;
return graph;
} }
NeighborSampleResult(){};
~NeighborSampleResult() {}
}; };
struct NodeQueryResult { struct NodeQueryResult {
......
...@@ -23,15 +23,17 @@ ...@@ -23,15 +23,17 @@
#ifdef PADDLE_WITH_HETERPS #ifdef PADDLE_WITH_HETERPS
namespace paddle { namespace paddle {
namespace framework { namespace framework {
class GpuPsGraphTable : public HeterComm<int64_t, unsigned int, int> { class GpuPsGraphTable : public HeterComm<int64_t, int64_t, int> {
public: public:
GpuPsGraphTable(std::shared_ptr<HeterPsResource> resource, int topo_aware) GpuPsGraphTable(std::shared_ptr<HeterPsResource> resource, int topo_aware)
: HeterComm<int64_t, unsigned int, int>(1, resource) { : HeterComm<int64_t, int64_t, int>(1, resource) {
load_factor_ = 0.25; load_factor_ = 0.25;
rw_lock.reset(new pthread_rwlock_t()); rw_lock.reset(new pthread_rwlock_t());
gpu_num = resource_->total_device(); gpu_num = resource_->total_device();
memset(global_device_map, -1, sizeof(global_device_map));
for (int i = 0; i < gpu_num; i++) { for (int i = 0; i < gpu_num; i++) {
gpu_graph_list.push_back(GpuPsCommGraph()); gpu_graph_list.push_back(GpuPsCommGraph());
global_device_map[resource_->dev_id(i)] = i;
sample_status.push_back(NULL); sample_status.push_back(NULL);
tables_.push_back(NULL); tables_.push_back(NULL);
} }
...@@ -98,27 +100,20 @@ class GpuPsGraphTable : public HeterComm<int64_t, unsigned int, int> { ...@@ -98,27 +100,20 @@ class GpuPsGraphTable : public HeterComm<int64_t, unsigned int, int> {
NeighborSampleResult graph_neighbor_sample_v2(int gpu_id, int64_t *key, NeighborSampleResult graph_neighbor_sample_v2(int gpu_id, int64_t *key,
int sample_size, int len, int sample_size, int len,
bool cpu_query_switch); bool cpu_query_switch);
void init_sample_status();
void free_sample_status();
NodeQueryResult query_node_list(int gpu_id, int start, int query_size); NodeQueryResult query_node_list(int gpu_id, int start, int query_size);
void clear_graph_info(); void clear_graph_info();
void display_sample_res(void *key, void *val, int len, int sample_len);
void move_neighbor_sample_result_to_source_gpu(int gpu_id, int gpu_num, void move_neighbor_sample_result_to_source_gpu(int gpu_id, int gpu_num,
int sample_size, int *h_left, int sample_size, int *h_left,
int *h_right, int *h_right,
int64_t *src_sample_res, int64_t *src_sample_res,
int *actual_sample_size); int *actual_sample_size);
// void move_neighbor_sample_result_to_source_gpu(
// int gpu_id, int gpu_num, int *h_left, int *h_right,
// int64_t *src_sample_res, thrust::host_vector<int> &total_sample_size);
// void move_neighbor_sample_size_to_source_gpu(int gpu_id, int gpu_num,
// int *h_left, int *h_right,
// int *actual_sample_size,
// int *total_sample_size);
int init_cpu_table(const paddle::distributed::GraphParameter &graph); int init_cpu_table(const paddle::distributed::GraphParameter &graph);
// int load(const std::string &path, const std::string &param);
// virtual int32_t end_graph_sampling() {
// return cpu_graph_table->end_graph_sampling();
// }
int gpu_num; int gpu_num;
std::vector<GpuPsCommGraph> gpu_graph_list; std::vector<GpuPsCommGraph> gpu_graph_list;
int global_device_map[32];
std::vector<int *> sample_status; std::vector<int *> sample_status;
const int parallel_sample_size = 1; const int parallel_sample_size = 1;
const int dim_y = 256; const int dim_y = 256;
...@@ -130,5 +125,5 @@ class GpuPsGraphTable : public HeterComm<int64_t, unsigned int, int> { ...@@ -130,5 +125,5 @@ class GpuPsGraphTable : public HeterComm<int64_t, unsigned int, int> {
}; };
} }
}; };
#include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.h" //#include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.h"
#endif #endif
...@@ -18,41 +18,8 @@ ...@@ -18,41 +18,8 @@
namespace paddle { namespace paddle {
namespace framework { namespace framework {
#ifdef PADDLE_WITH_HETERPS #ifdef PADDLE_WITH_HETERPS
std::string nodes[] = {
std::string("user\t37\ta 0.34\tb 13 14\tc hello\td abc"),
std::string("user\t96\ta 0.31\tb 15 10\tc 96hello\td abcd"),
std::string("user\t59\ta 0.11\tb 11 14"),
std::string("user\t97\ta 0.11\tb 12 11"),
std::string("item\t45\ta 0.21"),
std::string("item\t145\ta 0.21"),
std::string("item\t112\ta 0.21"),
std::string("item\t48\ta 0.21"),
std::string("item\t247\ta 0.21"),
std::string("item\t111\ta 0.21"),
std::string("item\t46\ta 0.21"),
std::string("item\t146\ta 0.21"),
std::string("item\t122\ta 0.21"),
std::string("item\t49\ta 0.21"),
std::string("item\t248\ta 0.21"),
std::string("item\t113\ta 0.21")};
char node_file_name[] = "nodes.txt";
std::vector<std::string> user_feature_name = {"a", "b", "c", "d"};
std::vector<std::string> item_feature_name = {"a"};
std::vector<std::string> user_feature_dtype = {"float32", "int32", "string",
"string"};
std::vector<std::string> item_feature_dtype = {"float32"};
std::vector<int> user_feature_shape = {1, 2, 1, 1};
std::vector<int> item_feature_shape = {1};
void prepare_file(char file_name[]) {
std::ofstream ofile;
ofile.open(file_name);
for (auto x : nodes) {
ofile << x << std::endl;
}
ofile.close();
}
std::shared_ptr<GraphGpuWrapper> GraphGpuWrapper::s_instance_(nullptr);
void GraphGpuWrapper::set_device(std::vector<int> ids) { void GraphGpuWrapper::set_device(std::vector<int> ids) {
for (auto device_id : ids) { for (auto device_id : ids) {
device_id_mapping.push_back(device_id); device_id_mapping.push_back(device_id);
...@@ -205,96 +172,35 @@ void GraphGpuWrapper::upload_batch(int idx, ...@@ -205,96 +172,35 @@ void GraphGpuWrapper::upload_batch(int idx,
// g->build_graph_from_cpu(vec); // g->build_graph_from_cpu(vec);
} }
void GraphGpuWrapper::initialize() { // void GraphGpuWrapper::test() {
std::vector<int> device_id_mapping; // int64_t cpu_key[3] = {0, 1, 2};
for (int i = 0; i < 2; i++) device_id_mapping.push_back(i); // void *key;
int gpu_num = device_id_mapping.size(); // platform::CUDADeviceGuard guard(0);
::paddle::distributed::GraphParameter table_proto; // cudaMalloc((void **)&key, 3 * sizeof(int64_t));
table_proto.add_edge_types("u2u"); // cudaMemcpy(key, cpu_key, 3 * sizeof(int64_t), cudaMemcpyHostToDevice);
table_proto.add_node_types("user"); // auto neighbor_sample_res =
table_proto.add_node_types("item"); // ((GpuPsGraphTable *)graph_table)
::paddle::distributed::GraphFeature *g_f = table_proto.add_graph_feature(); // ->graph_neighbor_sample(0, (int64_t *)key, 2, 3);
// int64_t *res = new int64_t[7];
for (int i = 0; i < user_feature_name.size(); i++) { // cudaMemcpy(res, neighbor_sample_res.val, 3 * 2 * sizeof(int64_t),
g_f->add_name(user_feature_name[i]); // cudaMemcpyDeviceToHost);
g_f->add_dtype(user_feature_dtype[i]); // int *actual_sample_size = new int[3];
g_f->add_shape(user_feature_shape[i]); // cudaMemcpy(actual_sample_size, neighbor_sample_res.actual_sample_size,
} // 3 * sizeof(int),
::paddle::distributed::GraphFeature *g_f1 = table_proto.add_graph_feature(); // cudaMemcpyDeviceToHost); // 3, 1, 3
for (int i = 0; i < item_feature_name.size(); i++) {
g_f1->add_name(item_feature_name[i]);
g_f1->add_dtype(item_feature_dtype[i]);
g_f1->add_shape(item_feature_shape[i]);
}
prepare_file(node_file_name);
table_proto.set_shard_num(24);
std::shared_ptr<HeterPsResource> resource = // //{0,9} or {9,0} is expected for key 0
std::make_shared<HeterPsResource>(device_id_mapping); // //{0,2} or {2,0} is expected for key 1
resource->enable_p2p(); // //{1,3} or {3,1} is expected for key 2
GpuPsGraphTable *g = new GpuPsGraphTable(resource, 1); // for (int i = 0; i < 3; i++) {
g->init_cpu_table(table_proto); // VLOG(0) << "actual sample size for " << i << " is "
graph_table = (char *)g; // << actual_sample_size[i];
g->cpu_graph_table->Load(node_file_name, "nuser"); // for (int j = 0; j < actual_sample_size[i]; j++) {
g->cpu_graph_table->Load(node_file_name, "nitem"); // VLOG(0) << "sampled an neighbor for node" << i << " : " << res[i * 2 +
std::remove(node_file_name); // j];
std::vector<paddle::framework::GpuPsCommGraph> vec; // }
std::vector<int64_t> node_ids; // }
node_ids.push_back(37); // }
node_ids.push_back(96);
std::vector<std::vector<std::string>> node_feat(2,
std::vector<std::string>(2));
std::vector<std::string> feature_names;
feature_names.push_back(std::string("c"));
feature_names.push_back(std::string("d"));
g->cpu_graph_table->get_node_feat(0, node_ids, feature_names, node_feat);
VLOG(0) << "get_node_feat: " << node_feat[0][0];
VLOG(0) << "get_node_feat: " << node_feat[0][1];
VLOG(0) << "get_node_feat: " << node_feat[1][0];
VLOG(0) << "get_node_feat: " << node_feat[1][1];
int n = 10;
std::vector<int64_t> ids0, ids1;
for (int i = 0; i < n; i++) {
g->cpu_graph_table->add_comm_edge(0, i, (i + 1) % n);
g->cpu_graph_table->add_comm_edge(0, i, (i - 1 + n) % n);
if (i % 2 == 0) ids0.push_back(i);
}
g->cpu_graph_table->build_sampler(0);
ids1.push_back(5);
vec.push_back(g->cpu_graph_table->make_gpu_ps_graph(0, ids0));
vec.push_back(g->cpu_graph_table->make_gpu_ps_graph(0, ids1));
vec[0].display_on_cpu();
vec[1].display_on_cpu();
g->build_graph_from_cpu(vec);
}
void GraphGpuWrapper::test() {
int64_t cpu_key[3] = {0, 1, 2};
void *key;
platform::CUDADeviceGuard guard(0);
cudaMalloc((void **)&key, 3 * sizeof(int64_t));
cudaMemcpy(key, cpu_key, 3 * sizeof(int64_t), cudaMemcpyHostToDevice);
auto neighbor_sample_res =
((GpuPsGraphTable *)graph_table)
->graph_neighbor_sample(0, (int64_t *)key, 2, 3);
int64_t *res = new int64_t[7];
cudaMemcpy(res, neighbor_sample_res.val, 3 * 2 * sizeof(int64_t),
cudaMemcpyDeviceToHost);
int *actual_sample_size = new int[3];
cudaMemcpy(actual_sample_size, neighbor_sample_res.actual_sample_size,
3 * sizeof(int),
cudaMemcpyDeviceToHost); // 3, 1, 3
//{0,9} or {9,0} is expected for key 0
//{0,2} or {2,0} is expected for key 1
//{1,3} or {3,1} is expected for key 2
for (int i = 0; i < 3; i++) {
VLOG(0) << "actual sample size for " << i << " is "
<< actual_sample_size[i];
for (int j = 0; j < actual_sample_size[i]; j++) {
VLOG(0) << "sampled an neighbor for node" << i << " : " << res[i * 2 + j];
}
}
}
NeighborSampleResult GraphGpuWrapper::graph_neighbor_sample_v3( NeighborSampleResult GraphGpuWrapper::graph_neighbor_sample_v3(
NeighborSampleQuery q, bool cpu_switch) { NeighborSampleQuery q, bool cpu_switch) {
return ((GpuPsGraphTable *)graph_table) return ((GpuPsGraphTable *)graph_table)
...@@ -314,7 +220,6 @@ std::vector<int64_t> GraphGpuWrapper::graph_neighbor_sample( ...@@ -314,7 +220,6 @@ std::vector<int64_t> GraphGpuWrapper::graph_neighbor_sample(
auto neighbor_sample_res = auto neighbor_sample_res =
((GpuPsGraphTable *)graph_table) ((GpuPsGraphTable *)graph_table)
->graph_neighbor_sample(gpu_id, cuda_key, sample_size, key.size()); ->graph_neighbor_sample(gpu_id, cuda_key, sample_size, key.size());
int *actual_sample_size = new int[key.size()]; int *actual_sample_size = new int[key.size()];
cudaMemcpy(actual_sample_size, neighbor_sample_res.actual_sample_size, cudaMemcpy(actual_sample_size, neighbor_sample_res.actual_sample_size,
key.size() * sizeof(int), key.size() * sizeof(int),
...@@ -323,7 +228,6 @@ std::vector<int64_t> GraphGpuWrapper::graph_neighbor_sample( ...@@ -323,7 +228,6 @@ std::vector<int64_t> GraphGpuWrapper::graph_neighbor_sample(
for (int i = 0; i < key.size(); i++) { for (int i = 0; i < key.size(); i++) {
cumsum += actual_sample_size[i]; cumsum += actual_sample_size[i];
} }
/* VLOG(0) << "cumsum " << cumsum; */
std::vector<int64_t> cpu_key, res; std::vector<int64_t> cpu_key, res;
cpu_key.resize(key.size() * sample_size); cpu_key.resize(key.size() * sample_size);
...@@ -340,11 +244,18 @@ std::vector<int64_t> GraphGpuWrapper::graph_neighbor_sample( ...@@ -340,11 +244,18 @@ std::vector<int64_t> GraphGpuWrapper::graph_neighbor_sample(
/* for(int i = 0;i < res.size();i ++) { */ /* for(int i = 0;i < res.size();i ++) { */
/* VLOG(0) << i << " " << res[i]; */ /* VLOG(0) << i << " " << res[i]; */
/* } */ /* } */
delete[] actual_sample_size;
cudaFree(cuda_key); cudaFree(cuda_key);
return res; return res;
} }
void GraphGpuWrapper::init_sample_status() {
((GpuPsGraphTable *)graph_table)->init_sample_status();
}
void GraphGpuWrapper::free_sample_status() {
((GpuPsGraphTable *)graph_table)->free_sample_status();
}
NodeQueryResult GraphGpuWrapper::query_node_list(int gpu_id, int start, NodeQueryResult GraphGpuWrapper::query_node_list(int gpu_id, int start,
int query_size) { int query_size) {
return ((GpuPsGraphTable *)graph_table) return ((GpuPsGraphTable *)graph_table)
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#pragma once
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
...@@ -22,10 +23,13 @@ namespace framework { ...@@ -22,10 +23,13 @@ namespace framework {
#ifdef PADDLE_WITH_HETERPS #ifdef PADDLE_WITH_HETERPS
class GraphGpuWrapper { class GraphGpuWrapper {
public: public:
static GraphGpuWrapper* GetInstance() { static std::shared_ptr<GraphGpuWrapper> GetInstance() {
static GraphGpuWrapper wrapper; if (NULL == s_instance_) {
return &wrapper; s_instance_.reset(new paddle::framework::GraphGpuWrapper());
}
return s_instance_;
} }
static std::shared_ptr<GraphGpuWrapper> s_instance_;
void initialize(); void initialize();
void test(); void test();
void set_device(std::vector<int> ids); void set_device(std::vector<int> ids);
...@@ -53,6 +57,8 @@ class GraphGpuWrapper { ...@@ -53,6 +57,8 @@ class GraphGpuWrapper {
std::vector<int64_t>& key, std::vector<int64_t>& key,
int sample_size); int sample_size);
void init_sample_status();
void free_sample_status();
std::unordered_map<std::string, int> edge_to_id, feature_to_id; std::unordered_map<std::string, int> edge_to_id, feature_to_id;
std::vector<std::string> id_to_feature, id_to_edge; std::vector<std::string> id_to_feature, id_to_edge;
std::vector<std::unordered_map<std::string, int>> table_feat_mapping; std::vector<std::unordered_map<std::string, int>> table_feat_mapping;
...@@ -62,7 +68,7 @@ class GraphGpuWrapper { ...@@ -62,7 +68,7 @@ class GraphGpuWrapper {
::paddle::distributed::GraphParameter table_proto; ::paddle::distributed::GraphParameter table_proto;
std::vector<int> device_id_mapping; std::vector<int> device_id_mapping;
int search_level = 1; int search_level = 1;
char* graph_table; void* graph_table;
}; };
#endif #endif
} }
......
...@@ -320,6 +320,7 @@ void HashTable<KeyType, ValType>::update(const KeyType* d_keys, ...@@ -320,6 +320,7 @@ void HashTable<KeyType, ValType>::update(const KeyType* d_keys,
template class HashTable<unsigned long, paddle::framework::FeatureValue>; template class HashTable<unsigned long, paddle::framework::FeatureValue>;
template class HashTable<long, int>; template class HashTable<long, int>;
template class HashTable<long, long>;
template class HashTable<long, unsigned long>; template class HashTable<long, unsigned long>;
template class HashTable<long, unsigned int>; template class HashTable<long, unsigned int>;
...@@ -334,6 +335,9 @@ template void HashTable<long, int>::get<cudaStream_t>(const long* d_keys, ...@@ -334,6 +335,9 @@ template void HashTable<long, int>::get<cudaStream_t>(const long* d_keys,
template void HashTable<long, unsigned long>::get<cudaStream_t>( template void HashTable<long, unsigned long>::get<cudaStream_t>(
const long* d_keys, unsigned long* d_vals, size_t len, cudaStream_t stream); const long* d_keys, unsigned long* d_vals, size_t len, cudaStream_t stream);
template void HashTable<long, long>::get<cudaStream_t>(const long* d_keys,
long* d_vals, size_t len,
cudaStream_t stream);
template void HashTable<long, unsigned int>::get<cudaStream_t>( template void HashTable<long, unsigned int>::get<cudaStream_t>(
const long* d_keys, unsigned int* d_vals, size_t len, cudaStream_t stream); const long* d_keys, unsigned int* d_vals, size_t len, cudaStream_t stream);
// template void // template void
...@@ -350,6 +354,10 @@ template void HashTable<long, int>::insert<cudaStream_t>(const long* d_keys, ...@@ -350,6 +354,10 @@ template void HashTable<long, int>::insert<cudaStream_t>(const long* d_keys,
const int* d_vals, const int* d_vals,
size_t len, size_t len,
cudaStream_t stream); cudaStream_t stream);
template void HashTable<long, long>::insert<cudaStream_t>(const long* d_keys,
const long* d_vals,
size_t len,
cudaStream_t stream);
template void HashTable<long, unsigned long>::insert<cudaStream_t>( template void HashTable<long, unsigned long>::insert<cudaStream_t>(
const long* d_keys, const unsigned long* d_vals, size_t len, const long* d_keys, const unsigned long* d_vals, size_t len,
......
...@@ -193,9 +193,10 @@ void HeterComm<KeyType, ValType, GradType>::walk_to_dest(int start_index, ...@@ -193,9 +193,10 @@ void HeterComm<KeyType, ValType, GradType>::walk_to_dest(int start_index,
memory_copy(dst_place, node.key_storage, src_place, memory_copy(dst_place, node.key_storage, src_place,
reinterpret_cast<char*>(src_key + h_left[i]), reinterpret_cast<char*>(src_key + h_left[i]),
node.key_bytes_len, node.in_stream); node.key_bytes_len, node.in_stream);
#if defined(PADDLE_WITH_CUDA) // adapt for gpu-graph // #if defined(PADDLE_WITH_CUDA) // adapt for gpu-graph
cudaMemsetAsync(node.val_storage, -1, node.val_bytes_len, node.in_stream); // cudaMemsetAsync(node.val_storage, -1, node.val_bytes_len,
#endif // node.in_stream);
// #endif
if (need_copy_val) { if (need_copy_val) {
memory_copy(dst_place, node.val_storage, src_place, memory_copy(dst_place, node.val_storage, src_place,
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <vector> #include <vector>
#include "paddle/fluid/framework/fleet/heter_ps/feature_value.h" #include "paddle/fluid/framework/fleet/heter_ps/feature_value.h"
#include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h" #include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h"
#include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_comm.h" #include "paddle/fluid/framework/fleet/heter_ps/heter_comm.h"
#include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h" #include "paddle/fluid/framework/fleet/heter_ps/heter_resource.h"
#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h" #include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
...@@ -235,4 +236,9 @@ TEST(TEST_FLEET, test_cpu_cache) { ...@@ -235,4 +236,9 @@ TEST(TEST_FLEET, test_cpu_cache) {
} }
index++; index++;
} }
auto iter = paddle::framework::GraphGpuWrapper::GetInstance();
std::vector<int> device;
device.push_back(0);
device.push_back(1);
iter->set_device(device);
} }
...@@ -327,16 +327,15 @@ void BindNeighborSampleResult(py::module* m) { ...@@ -327,16 +327,15 @@ void BindNeighborSampleResult(py::module* m) {
.def("initialize", &NeighborSampleResult::initialize) .def("initialize", &NeighborSampleResult::initialize)
.def("get_len", &NeighborSampleResult::get_len) .def("get_len", &NeighborSampleResult::get_len)
.def("get_val", &NeighborSampleResult::get_actual_val) .def("get_val", &NeighborSampleResult::get_actual_val)
.def("get_sampled_graph", &NeighborSampleResult::get_sampled_graph)
.def("display", &NeighborSampleResult::display); .def("display", &NeighborSampleResult::display);
} }
void BindGraphGpuWrapper(py::module* m) { void BindGraphGpuWrapper(py::module* m) {
py::class_<GraphGpuWrapper>(*m, "GraphGpuWrapper") py::class_<GraphGpuWrapper, std::shared_ptr<GraphGpuWrapper>>(
// nit<>()) *m, "GraphGpuWrapper")
//.def("test", &GraphGpuWrapper::test) .def(py::init([]() { return GraphGpuWrapper::GetInstance(); }))
//.def(py::init([]() { return framework::GraphGpuWrapper::GetInstance(); // .def(py::init<>())
//}))
.def(py::init<>())
.def("neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample_v3) .def("neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample_v3)
.def("graph_neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample) .def("graph_neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample)
.def("set_device", &GraphGpuWrapper::set_device) .def("set_device", &GraphGpuWrapper::set_device)
...@@ -347,6 +346,8 @@ void BindGraphGpuWrapper(py::module* m) { ...@@ -347,6 +346,8 @@ void BindGraphGpuWrapper(py::module* m) {
.def("load_edge_file", &GraphGpuWrapper::load_edge_file) .def("load_edge_file", &GraphGpuWrapper::load_edge_file)
.def("upload_batch", &GraphGpuWrapper::upload_batch) .def("upload_batch", &GraphGpuWrapper::upload_batch)
.def("get_all_id", &GraphGpuWrapper::get_all_id) .def("get_all_id", &GraphGpuWrapper::get_all_id)
.def("init_sample_status", &GraphGpuWrapper::init_sample_status)
.def("free_sample_status", &GraphGpuWrapper::free_sample_status)
.def("load_next_partition", &GraphGpuWrapper::load_next_partition) .def("load_next_partition", &GraphGpuWrapper::load_next_partition)
.def("make_partitions", &GraphGpuWrapper::make_partitions) .def("make_partitions", &GraphGpuWrapper::make_partitions)
.def("make_complementary_graph", .def("make_complementary_graph",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册