diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.cc b/paddle/fluid/distributed/ps/table/common_graph_table.cc
index a9cd0021c85780f025b73e26a2589f0f6a41cc6f..a3fa80b3865e4ca54178b226bfdb1ccae234ed1d 100644
--- a/paddle/fluid/distributed/ps/table/common_graph_table.cc
+++ b/paddle/fluid/distributed/ps/table/common_graph_table.cc
@@ -28,6 +28,22 @@ namespace paddle {
 namespace distributed {
 
 #ifdef PADDLE_WITH_HETERPS
+int32_t GraphTable::Load_to_ssd(const std::string &path,
+                                const std::string &param) {
+  bool load_edge = (param[0] == 'e');
+  bool load_node = (param[0] == 'n');
+  if (load_edge) {
+    bool reverse_edge = (param[1] == '<');
+    std::string edge_type = param.substr(2);
+    return this->load_edges_to_ssd(path, reverse_edge, edge_type);
+  }
+  if (load_node) {
+    std::string node_type = param.substr(1);
+    return this->load_nodes(path, node_type);
+  }
+  return 0;
+}
+
 paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
     int idx, std::vector<int64_t> ids) {
   std::vector<std::vector<int64_t>> bags(task_pool_size_);
@@ -38,11 +54,11 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
   std::vector<std::future<int>> tasks;
   std::vector<int64_t> edge_array[task_pool_size_];
   std::vector<paddle::framework::GpuPsGraphNode> node_array[task_pool_size_];
-  for (int i = 0; i < (int)bags.size(); i++) {
+  for (size_t i = 0; i < bags.size(); i++) {
    if (bags[i].size() > 0) {
       tasks.push_back(_shards_task_pool[i]->enqueue([&, i, this]() -> int {
         paddle::framework::GpuPsGraphNode x;
-        for (int j = 0; j < (int)bags[i].size(); j++) {
+        for (size_t j = 0; j < bags[i].size(); j++) {
           Node *v = find_node(0, idx, bags[i][j]);
           x.node_id = bags[i][j];
           if (v == NULL) {
@@ -53,7 +69,7 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
             x.neighbor_size = v->get_neighbor_size();
             x.neighbor_offset = edge_array[i].size();
             node_array[i].push_back(x);
-            for (int k = 0; k < x.neighbor_size; k++) {
+            for (size_t k = 0; k < x.neighbor_size; k++) {
               edge_array[i].push_back(v->get_neighbor_id(k));
             }
           }
@@ -64,27 +80,29 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
   }
   for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
   paddle::framework::GpuPsCommGraph res;
-  int tot_len = 0;
+  unsigned int tot_len = 0;
   for (int i = 0; i < task_pool_size_; i++) {
-    tot_len += (int)edge_array[i].size();
-  }
-  res.neighbor_size = tot_len;
-  res.node_size = ids.size();
-  res.neighbor_list = new int64_t[tot_len];
-  res.node_list = new paddle::framework::GpuPsGraphNode[ids.size()];
-  int offset = 0, ind = 0;
+    tot_len += edge_array[i].size();
+  }
+  // res.neighbor_size = tot_len;
+  // res.node_size = ids.size();
+  // res.neighbor_list = new int64_t[tot_len];
+  // res.node_list = new paddle::framework::GpuPsGraphNode[ids.size()];
+  res.init_on_cpu(tot_len, (unsigned int)ids.size());
+  unsigned int offset = 0, ind = 0;
   for (int i = 0; i < task_pool_size_; i++) {
     for (int j = 0; j < (int)node_array[i].size(); j++) {
       res.node_list[ind] = node_array[i][j];
       res.node_list[ind++].neighbor_offset += offset;
     }
-    for (int j = 0; j < (int)edge_array[i].size(); j++) {
+    for (size_t j = 0; j < edge_array[i].size(); j++) {
       res.neighbor_list[offset + j] = edge_array[i][j];
     }
     offset += edge_array[i].size();
   }
   return res;
 }
+
 int32_t GraphTable::add_node_to_ssd(int type_id, int idx, int64_t src_id,
                                     char *data, int len) {
   if (_db != NULL) {
@@ -92,8 +110,31 @@ int32_t GraphTable::add_node_to_ssd(int type_id, int idx, int64_t src_id,
     memcpy(ch, &type_id, sizeof(int));
     memcpy(ch + sizeof(int), &idx, sizeof(int));
     memcpy(ch + sizeof(int) * 2, &src_id, sizeof(int64_t));
-    _db->put(src_id % shard_num % task_pool_size_, ch,
-             sizeof(int) * 2 + sizeof(int64_t), (char *)data, len);
+    std::string str;
+    if (_db->get(src_id % shard_num % task_pool_size_, ch,
+                 sizeof(int) * 2 + sizeof(int64_t), str) == 0) {
+      int64_t *stored_data = ((int64_t *)str.c_str());
+      int n = str.size() / sizeof(int64_t);
+      char *new_data = new char[n * sizeof(int64_t) + len];
+      memcpy(new_data, stored_data, n * sizeof(int64_t));
+      memcpy(new_data + n * sizeof(int64_t), data, len);
+      _db->put(src_id % shard_num % task_pool_size_, ch,
+               sizeof(int) * 2 + sizeof(int64_t), (char *)new_data,
+               n * sizeof(int64_t) + len);
+      delete[] new_data;
+    } else {
+      _db->put(src_id % shard_num % task_pool_size_, ch,
+               sizeof(int) * 2 + sizeof(int64_t), (char *)data, len);
+    }
+    _db->flush(src_id % shard_num % task_pool_size_);
+    std::string x;
+    // if (_db->get(src_id % shard_num % task_pool_size_, ch, sizeof(int64_t) +
+    //              2 * sizeof(int), x) == 0) {
+    //   VLOG(0) << "put result";
+    //   for (size_t i = 0; i < x.size(); i += 8) {
+    //     VLOG(0) << "get an id " << *((int64_t *)(x.c_str() + i));
+    //   }
+    // }
   }
   return 0;
 }
@@ -109,8 +150,8 @@ char *GraphTable::random_sample_neighbor_from_ssd(
   memset(ch, 0, sizeof(int));
   memcpy(ch + sizeof(int), &idx, sizeof(int));
   memcpy(ch + sizeof(int) * 2, &id, sizeof(int64_t));
-  if (_db->get(id % shard_num % task_pool_size_, ch, sizeof(uint64_t), str) ==
-      0) {
+  if (_db->get(id % shard_num % task_pool_size_, ch,
+               sizeof(int) * 2 + sizeof(int64_t), str) == 0) {
     int64_t *data = ((int64_t *)str.c_str());
     int n = str.size() / sizeof(int64_t);
     std::unordered_map<int, int> m;
@@ -142,7 +183,298 @@ char *GraphTable::random_sample_neighbor_from_ssd(
   actual_size = 0;
   return NULL;
 }
+
+int64_t GraphTable::load_graph_to_memory_from_ssd(int idx,
+                                                  std::vector<int64_t> &ids) {
+  std::vector<std::vector<int64_t>> bags(task_pool_size_);
+  for (auto x : ids) {
+    int location = x % shard_num % task_pool_size_;
+    bags[location].push_back(x);
+  }
+  std::vector<std::future<int>> tasks;
+  std::vector<int64_t> count(task_pool_size_, 0);
+  for (size_t i = 0; i < bags.size(); i++) {
+    if (bags[i].size() > 0) {
+      tasks.push_back(_shards_task_pool[i]->enqueue([&, i, idx, this]() -> int {
+        char ch[sizeof(int) * 2 + sizeof(int64_t)];
+        memset(ch, 0, sizeof(int));
+        memcpy(ch + sizeof(int), &idx, sizeof(int));
+        for (size_t k = 0; k < bags[i].size(); k++) {
+          auto v = bags[i][k];
+          memcpy(ch + sizeof(int) * 2, &v, sizeof(int64_t));
+          std::string str;
+          if (_db->get(i, ch, sizeof(int) * 2 + sizeof(int64_t), str) == 0) {
+            count[i] += (int64_t)str.size();
+            for (size_t j = 0; j < str.size(); j += sizeof(int64_t)) {
+              int64_t id = *(int64_t *)(str.c_str() + j);
+              add_comm_edge(idx, v, id);
+            }
+          }
+        }
+        return 0;
+      }));
+    }
+  }
+
+  for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
+  int64_t tot = 0;
+  for (auto x : count) tot += x;
+  return tot;
+}
+
+void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) {
+  VLOG(2) << "start to make graph partitions, byte_size = " << byte_size
+          << " total memory cost = " << total_memory_cost;
+  if (total_memory_cost == 0) {
+    VLOG(0) << "no edges are detected, make_partitions exits";
+    return;
+  }
+  const float a = 2.0, y = 1.25;
+  int64_t gb_size_by_discount = byte_size * 0.8 * device_len;
+  if (gb_size_by_discount <= 0) gb_size_by_discount = 1;
+  int part_len = total_memory_cost / gb_size_by_discount;
+  if (part_len == 0) part_len = 1;
+
+  VLOG(2) << "part_len = " << part_len
+          << " byte size = " << gb_size_by_discount;
+  partitions[idx].clear();
+  partitions[idx].resize(part_len);
+  std::vector<int64_t> memory_remaining(part_len, gb_size_by_discount);
+  std::vector<float> score(part_len, 0);
+  std::unordered_map<int64_t, int> id_map;
+  std::vector<rocksdb::Iterator *> iters;
+  for (int i = 0; i < task_pool_size_; i++) {
+    iters.push_back(_db->get_iterator(i));
+    iters[i]->SeekToFirst();
+  }
+  size_t next = 0;
+  while (iters.size()) {
+    if (next >= iters.size()) {
+      next = 0;
+    }
+    if (!iters[next]->Valid()) {
+      iters.erase(iters.begin() + next);
+      continue;
+    }
+    std::string key = iters[next]->key().ToString();
+    int temp_idx = *(int *)(key.c_str() + sizeof(int));
+    if (temp_idx != idx) {
+      iters[next]->Next();
+      next++;
+      continue;
+    }
+    std::string value = iters[next]->value().ToString();
+    std::int64_t i_key = *(int64_t *)(key.c_str() + 8);
+    for (int i = 0; i < part_len; i++) {
+      if (memory_remaining[i] < (int64_t)value.size()) {
+        score[i] = -100000.0;
+      } else {
+        score[i] = 0;
+      }
+    }
+    for (size_t j = 0; j < value.size(); j += sizeof(int64_t)) {
+      int64_t v = *((int64_t *)(value.c_str() + j));
+      if (id_map.find(v) != id_map.end()) {
+        score[id_map[v]]++;
+      }
+    }
+    float base;
+    int index = 0;
+    for (int i = 0; i < part_len; i++) {
+      base = gb_size_by_discount - memory_remaining[i];
+      score[i] -= a * y * std::pow(1.0 * base, y - 1);
+      if (score[i] > score[index]) index = i;
+      VLOG(2) << "score" << i << " = " << score[i] << " memory left "
+              << memory_remaining[i];
+    }
+    id_map[i_key] = index;
+    partitions[idx][index].push_back(i_key);
+    memory_remaining[index] -= (int64_t)value.size();
+    iters[next]->Next();
+    next++;
+  }
+  for (int i = 0; i < part_len; i++) {
+    if (partitions[idx][i].size() == 0) {
+      partitions[idx].erase(partitions[idx].begin() + i);
+      i--;
+      part_len--;
+      continue;
+    }
+    VLOG(2) << " partition " << i << " size = " << partitions[idx][i].size();
+    for (auto x : partitions[idx][i]) {
+      VLOG(2) << "find a id " << x;
+    }
+  }
+  next_partition = 0;
+}
+
+void GraphTable::clear_graph(int idx) {
+  for (auto p : edge_shards[idx]) {
+    delete p;
+  }
+  edge_shards[idx].clear();
+  for (size_t i = 0; i < shard_num_per_server; i++) {
+    edge_shards[idx].push_back(new GraphShard());
+  }
+}
+int32_t GraphTable::load_next_partition(int idx) {
+  if (next_partition >= (int)partitions[idx].size()) {
+    VLOG(0) << "partition iteration is done";
+    return -1;
+  }
+  clear_graph(idx);
+  load_graph_to_memory_from_ssd(idx, partitions[idx][next_partition]);
+  next_partition++;
+  return 0;
+}
+int32_t GraphTable::load_edges_to_ssd(const std::string &path,
+                                      bool reverse_edge,
+                                      const std::string &edge_type) {
+  int idx = 0;
+  if (edge_type == "") {
+    VLOG(0) << "edge_type not specified, loading edges to " << id_to_edge[0]
+            << " part";
+  } else {
+    if (edge_to_id.find(edge_type) == edge_to_id.end()) {
+      VLOG(0) << "edge_type " << edge_type
+              << " is not defined, nothing will be loaded";
+      return 0;
+    }
+    idx = edge_to_id[edge_type];
+  }
+  total_memory_cost = 0;
+  auto paths = paddle::string::split_string<std::string>(path, ";");
+  int64_t count = 0;
+  std::string sample_type = "random";
+  bool is_weighted = false;
+  int valid_count = 0;
+  for (auto path : paths) {
+    std::ifstream file(path);
+    std::string line;
+    while (std::getline(file, line)) {
+      VLOG(0) << "get a line from file " << line;
+      auto values = paddle::string::split_string<std::string>(line, "\t");
+      count++;
+      if (values.size() < 2) continue;
+      auto src_id = std::stoll(values[0]);
+      auto dist_ids = paddle::string::split_string<std::string>(values[1], ";");
+      std::vector<int64_t> dist_data;
+      for (auto x : dist_ids) {
+        dist_data.push_back(std::stoll(x));
+        total_memory_cost += sizeof(int64_t);
+      }
+      add_node_to_ssd(0, idx, src_id, (char *)dist_data.data(),
+                      (int)(dist_data.size() * sizeof(int64_t)));
+    }
+  }
+  VLOG(0) << "total memory cost = " << total_memory_cost << " bytes";
+  return 0;
+}
+
+int32_t GraphTable::dump_edges_to_ssd(int idx) {
+  VLOG(0) << "calling dump edges to ssd";
+  std::vector<std::future<int64_t>> tasks;
+  auto &shards = edge_shards[idx];
+  for (size_t i = 0; i < shards.size(); ++i) {
+    tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue(
+        [&, i, this]() -> int64_t {
+          int64_t cost = 0;
+          std::vector<Node *> &v = shards[i]->get_bucket();
+          std::vector<int64_t> s;
+          for (size_t j = 0; j < v.size(); j++) {
+            s.clear();  // each node writes only its own neighbor list
+            for (int k = 0; k < v[j]->get_neighbor_size(); k++) {
+              s.push_back(v[j]->get_neighbor_id(k));
+            }
+            cost += v[j]->get_neighbor_size() * sizeof(int64_t);
+            add_node_to_ssd(0, idx, v[j]->get_id(), (char *)s.data(),
+                            s.size() * sizeof(int64_t));
+          }
+          return cost;
+        }));
+  }
+  for (size_t i = 0; i < tasks.size(); i++) total_memory_cost += tasks[i].get();
+  return 0;
+}
+int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) {
+  VLOG(0) << "make_complementary_graph";
+  const int64_t fixed_size = 10000;
+  std::vector<std::unordered_map<int64_t, int>> count(task_pool_size_);
+  std::vector<std::future<int>> tasks;
+  auto &shards = edge_shards[idx];
+  for (size_t i = 0; i < shards.size(); ++i) {
+    tasks.push_back(
+        _shards_task_pool[i % task_pool_size_]->enqueue([&, i, this]() -> int {
+          std::vector<Node *> &v = shards[i]->get_bucket();
+          size_t ind = i % this->task_pool_size_;
+          for (size_t j = 0; j < v.size(); j++) {
+            for (int k = 0; k < v[j]->get_neighbor_size(); k++) {
+              count[ind][v[j]->get_neighbor_id(k)]++;
+            }
+          }
+          return 0;
+        }));
+  }
+  // join the counting tasks before the shards they read are torn down below
+  for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
+
+  std::unordered_map<int64_t, int> final_count;
+  std::map<int, std::vector<int64_t>> count_to_id;
+  std::vector<int64_t> buffer;
+  for (auto p : edge_shards[idx]) {
+    delete p;
+  }
+  edge_shards[idx].clear();
+  for (size_t i = 0; i < shard_num_per_server; i++) {
+    edge_shards[idx].push_back(new GraphShard());
+  }
+  for (int i = 0; i < task_pool_size_; i++) {
+    for (auto &p : count[i]) {
+      final_count[p.first] = final_count[p.first] + p.second;
+    }
+    count[i].clear();
+  }
+  for (auto &p : final_count) {
+    count_to_id[p.second].push_back(p.first);
+    VLOG(2) << p.first << " appear " << p.second << " times";
+  }
+  auto iter = count_to_id.rbegin();
+  while (iter != count_to_id.rend() && byte_size > 0) {
+    for (auto x : iter->second) {
+      buffer.push_back(x);
+      if (buffer.size() >= (size_t)fixed_size) {
+        int64_t res = load_graph_to_memory_from_ssd(idx, buffer);
+        byte_size -= res;
+        buffer.clear();
+      }
+      if (byte_size <= 0) break;
+    }
+    iter++;
+  }
+  if (byte_size > 0 && buffer.size() > 0) {
+    int64_t res = load_graph_to_memory_from_ssd(idx, buffer);
+    byte_size -= res;
+  }
+  std::string sample_type = "random";
+  for (auto &shard : edge_shards[idx]) {
+    auto bucket = shard->get_bucket();
+    for (size_t i = 0; i < bucket.size(); i++) {
+      bucket[i]->build_sampler(sample_type);
+    }
+  }
+  return 0;
+}
 #endif
+
 /*
 int CompleteGraphSampler::run_graph_sampling() {
   pthread_rwlock_t *rw_lock = graph_table->rw_lock.get();
@@ -700,9 +1032,11 @@ int32_t GraphTable::build_sampler(int idx, std::string sample_type) {
 }
 int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge,
                                const std::string &edge_type) {
-  // #ifdef PADDLE_WITH_HETERPS
-  // if (gpups_mode) pthread_rwlock_rdlock(rw_lock.get());
-  // #endif
+#ifdef PADDLE_WITH_HETERPS
+  // if (gpups_mode) pthread_rwlock_rdlock(rw_lock.get());
+  if (search_level == 2) total_memory_cost = 0;
+  const int64_t fixed_load_edges = 1000000;
+#endif
   int idx = 0;
   if (edge_type == "") {
     VLOG(0) << "edge_type not specified, loading edges to " << id_to_edge[0]
@@ -715,6 +1049,7 @@ int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge,
     }
     idx = edge_to_id[edge_type];
   }
+
   auto paths = paddle::string::split_string<std::string>(path, ";");
   int64_t count = 0;
   std::string sample_type = "random";
@@ -756,13 +1091,33 @@ int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge,
       edge_shards[idx][index]->add_graph_node(src_id)->build_edges(is_weighted);
       edge_shards[idx][index]->add_neighbor(src_id, dst_id, weight);
       valid_count++;
+#ifdef PADDLE_WITH_HETERPS
+      // if (gpups_mode) pthread_rwlock_rdlock(rw_lock.get());
+      if (count > fixed_load_edges && search_level == 2) {
+        dump_edges_to_ssd(idx);
+        VLOG(0) << "dumping edges to ssd, edge count is reset to 0";
+        clear_graph(idx);
+        count = 0;
+      }
+#endif
     }
   }
   VLOG(0) << valid_count << "/" << count << " edges are loaded successfully in "
           << path;
-  // Build Sampler j
-
+// Build Sampler j
+#ifdef PADDLE_WITH_HETERPS
+  // if (gpups_mode) pthread_rwlock_rdlock(rw_lock.get());
+  if (search_level == 2) {
+    if (count > 0) {
+      dump_edges_to_ssd(idx);
+      VLOG(0) << "dumping edges to ssd, edge count is reset to 0";
+      clear_graph(idx);
+      count = 0;
+    }
+    return 0;
+  }
+#endif
   for (auto &shard : edge_shards[idx]) {
     auto bucket = shard->get_bucket();
     for (size_t i = 0; i < bucket.size(); i++) {
@@ -892,7 +1247,6 @@ int32_t GraphTable::random_sample_neighbors(
         scaled_lru->query(i, id_list[i].data(), id_list[i].size(), r);
       }
       int index = 0;
-      uint32_t idx;
       std::vector<SampleResult> sample_res;
       std::vector<SampleKey> sample_keys;
       auto &rng = _shards_task_rng_pool[i];
@@ -911,6 +1265,7 @@ int32_t GraphTable::random_sample_neighbors(
         if (node == nullptr) {
 #ifdef PADDLE_WITH_HETERPS
           if (search_level == 2) {
+            VLOG(2) << "enter sample from ssd";
             char *buffer_addr = random_sample_neighbor_from_ssd(
                 idx, node_id, sample_size, rng, actual_size);
             if (actual_size != 0) {
@@ -1060,6 +1415,26 @@ std::pair<int32_t, std::string> GraphTable::parse_feature(
   return std::make_pair(-1, "");
 }
 
+std::vector<std::vector<int64_t>> GraphTable::get_all_id(int type_id, int idx,
+                                                         int slice_num) {
+  std::vector<std::vector<int64_t>> res(slice_num);
+  auto &search_shards = type_id == 0 ? edge_shards[idx] : feature_shards[idx];
+  std::vector<std::future<std::vector<int64_t>>> tasks;
+  for (size_t i = 0; i < search_shards.size(); i++) {
+    tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue(
+        [&search_shards, i]() -> std::vector<int64_t> {
+          return search_shards[i]->get_all_id();
+        }));
+  }
+  for (size_t i = 0; i < tasks.size(); ++i) {
+    tasks[i].wait();
+  }
+  for (size_t i = 0; i < tasks.size(); i++) {
+    auto ids = tasks[i].get();
+    for (auto &id : ids) res[id % slice_num].push_back(id);
+  }
+  return res;
+}
 int32_t GraphTable::pull_graph_list(int type_id, int idx, int start,
                                     int total_size,
                                     std::unique_ptr<char[]> &buffer,
@@ -1218,6 +1593,9 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) {
   VLOG(0) << "in init graph table shard idx = " << _shard_idx << " shard_start "
           << shard_start << " shard_end " << shard_end;
   edge_shards.resize(id_to_edge.size());
+#ifdef PADDLE_WITH_HETERPS
+  partitions.resize(id_to_edge.size());
+#endif
   for (int k = 0; k < (int)edge_shards.size(); k++) {
     for (size_t i = 0; i < shard_num_per_server; i++) {
       edge_shards[k].push_back(new GraphShard());
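Note on the heuristic above: GraphTable::make_partitions is a Fennel-style streaming partitioner. While iterating the RocksDB keys, each key is scored against every partition i as (number of its neighbors already placed in i) minus the marginal cost a*y*load_i^(y-1) of growing partition i, with a = 2.0 and y = 1.25; partitions without enough remaining memory are knocked out with a -100000 score. A standalone sketch of just that scoring step, with illustrative names that are not part of the patch:

    #include <cmath>
    #include <cstdint>
    #include <vector>

    // Picks the partition for one node given how many of its neighbors each
    // partition already holds and how many bytes each partition has consumed.
    int pick_partition(const std::vector<int64_t>& used,
                       const std::vector<int>& neighbor_hits,
                       int64_t capacity, int64_t value_bytes) {
      const float a = 2.0f, y = 1.25f;
      std::vector<float> score(used.size());
      int best = 0;
      for (size_t i = 0; i < used.size(); i++) {
        score[i] = (capacity - used[i] < value_bytes)
                       ? -100000.0f                 // no room: knock it out
                       : (float)neighbor_hits[i];   // locality reward
        // Marginal cost of growing partition i: d/dx (a * x^y) = a*y*x^(y-1).
        score[i] -= a * y * std::pow((float)used[i], y - 1);
        if (score[i] > score[best]) best = (int)i;
      }
      return best;
    }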
diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.h b/paddle/fluid/distributed/ps/table/common_graph_table.h
index 89a626ae943b02538212ca26b87a63b7ce1c6b06..b3a280bb2cefc0613d1b00b8565e4def42723dbb 100644
--- a/paddle/fluid/distributed/ps/table/common_graph_table.h
+++ b/paddle/fluid/distributed/ps/table/common_graph_table.h
@@ -63,7 +63,13 @@ class GraphShard {
     }
     return res;
   }
-
+  std::vector<int64_t> get_all_id() {
+    std::vector<int64_t> res;
+    for (int i = 0; i < (int)bucket.size(); i++) {
+      res.push_back(bucket[i]->get_id());
+    }
+    return res;
+  }
   GraphNode *add_graph_node(int64_t id);
   GraphNode *add_graph_node(Node *node);
   FeatureNode *add_feature_node(int64_t id);
@@ -420,6 +426,10 @@ class GraphTable : public Table {
     use_cache = false;
     shard_num = 0;
     rw_lock.reset(new pthread_rwlock_t());
+#ifdef PADDLE_WITH_HETERPS
+    next_partition = 0;
+    total_memory_cost = 0;
+#endif
   }
   virtual ~GraphTable();
@@ -465,6 +475,8 @@ class GraphTable : public Table {
   int32_t load_edges(const std::string &path, bool reverse,
                      const std::string &edge_type);
 
+  std::vector<std::vector<int64_t>> get_all_id(int type, int idx,
+                                               int slice_num);
   int32_t load_nodes(const std::string &path, std::string node_type);
 
   int32_t add_graph_node(int idx, std::vector<int64_t> &id_list,
@@ -513,7 +525,7 @@ class GraphTable : public Table {
                                const std::vector<std::vector<int64_t>> &res);
 
   size_t get_server_num() { return server_num; }
-
+  void clear_graph(int idx);
   virtual int32_t make_neighbor_sample_cache(size_t size_limit, size_t ttl) {
     {
       std::unique_lock<std::mutex> lock(mutex_);
@@ -538,6 +550,7 @@ class GraphTable : public Table {
   //   graph_sampler->set_graph_sample_callback(callback);
   //   return 0;
   // }
+  virtual void make_partitions(int idx, int64_t gb_size, int device_len);
   virtual char *random_sample_neighbor_from_ssd(
       int idx, int64_t id, int sample_size,
       const std::shared_ptr<std::mt19937_64> rng, int &actual_size);
@@ -545,8 +558,25 @@ class GraphTable : public Table {
                                   char *data, int len);
   virtual paddle::framework::GpuPsCommGraph make_gpu_ps_graph(
       int idx, std::vector<int64_t> ids);
+  int32_t Load_to_ssd(const std::string &path, const std::string &param);
+  int64_t load_graph_to_memory_from_ssd(int idx, std::vector<int64_t> &ids);
+  int32_t make_complementary_graph(int idx, int64_t byte_size);
+  int32_t dump_edges_to_ssd(int idx);
+  int32_t get_partition_num(int idx) { return partitions[idx].size(); }
+  std::vector<int64_t> get_partition(int idx, int index) {
+    if (idx >= (int)partitions.size() || index >= (int)partitions[idx].size())
+      return std::vector<int64_t>();
+    return partitions[idx][index];
+  }
+  int32_t load_edges_to_ssd(const std::string &path, bool reverse_edge,
+                            const std::string &edge_type);
+  int32_t load_next_partition(int idx);
+  void set_search_level(int search_level) { this->search_level = search_level; }
   // virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); }
   int search_level;
+  int64_t total_memory_cost;
+  std::vector<std::vector<std::vector<int64_t>>> partitions;
+  int next_partition;
 #endif
   virtual int32_t add_comm_edge(int idx, int64_t src_id, int64_t dst_id);
   virtual int32_t build_sampler(int idx, std::string sample_type = "random");
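A hedged usage sketch of the SSD-partition API declared above, assuming an already-configured GraphTable and an edge type named "u2u" (both assumptions). Per Load_to_ssd, the param string is 'e' for edges, then '<' only if the edge direction should be reversed, then the edge type:

    // Sketch only: iterate memory-sized windows of an SSD-resident graph.
    void iterate_partitions(paddle::distributed::GraphTable *table) {
      // 'e' = edges, '>' = keep direction, "u2u" = edge type.
      table->Load_to_ssd("edges.txt", "e>u2u");
      // Cut the edge set into windows of ~0.8 * 64MB * 2 devices each.
      table->make_partitions(0 /*idx*/, 64 * 1024 * 1024, 2 /*device_len*/);
      while (table->load_next_partition(0) != -1) {
        // Only the current window's edges are in memory here; slice its ids
        // across the two devices and build per-GPU sub-graphs from them.
        auto ids = table->get_all_id(0 /*edges*/, 0 /*idx*/, 2 /*slices*/);
        // ... make_gpu_ps_graph(0, ids[i]) per device, train, repeat ...
      }
    }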
partitions[idx][index]; + } + int32_t load_edges_to_ssd(const std::string &path, bool reverse_edge, + const std::string &edge_type); + int32_t load_next_partition(int idx); + void set_search_level(int search_level) { this->search_level = search_level; } // virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); } int search_level; + int64_t total_memory_cost; + std::vector>> partitions; + int next_partition; #endif virtual int32_t add_comm_edge(int idx, int64_t src_id, int64_t dst_id); virtual int32_t build_sampler(int idx, std::string sample_type = "random"); diff --git a/paddle/fluid/framework/fleet/heter_ps/gpu_graph_node.h b/paddle/fluid/framework/fleet/heter_ps/gpu_graph_node.h index a8fde3f36bc6d892e564f2308802ef79a64681a6..e7601edb0ca070ae92b9dd947c1888352434a8e3 100644 --- a/paddle/fluid/framework/fleet/heter_ps/gpu_graph_node.h +++ b/paddle/fluid/framework/fleet/heter_ps/gpu_graph_node.h @@ -24,7 +24,7 @@ namespace paddle { namespace framework { struct GpuPsGraphNode { int64_t node_id; - int neighbor_size, neighbor_offset; + unsigned int neighbor_size, neighbor_offset; // this node's neighbor is stored on [neighbor_offset,neighbor_offset + // neighbor_size) of int64_t *neighbor_list; }; @@ -32,28 +32,38 @@ struct GpuPsGraphNode { struct GpuPsCommGraph { int64_t *neighbor_list; GpuPsGraphNode *node_list; - int neighbor_size, node_size; + unsigned int neighbor_size, node_size; // the size of neighbor array and graph_node_list array GpuPsCommGraph() : neighbor_list(NULL), node_list(NULL), neighbor_size(0), node_size(0) {} GpuPsCommGraph(int64_t *neighbor_list_, GpuPsGraphNode *node_list_, - int neighbor_size_, int node_size_) + unsigned int neighbor_size_, unsigned int node_size_) : neighbor_list(neighbor_list_), node_list(node_list_), neighbor_size(neighbor_size_), node_size(node_size_) {} + void init_on_cpu(unsigned int neighbor_size, unsigned int node_size) { + this->neighbor_size = neighbor_size; + this->node_size = node_size; + this->neighbor_list = new int64_t[neighbor_size]; + this->node_list = new paddle::framework::GpuPsGraphNode[node_size]; + } + void release_on_cpu() { + delete[] neighbor_list; + delete[] node_list; + } void display_on_cpu() { VLOG(0) << "neighbor_size = " << neighbor_size; VLOG(0) << "node_size = " << node_size; - for (int i = 0; i < neighbor_size; i++) { + for (size_t i = 0; i < neighbor_size; i++) { VLOG(0) << "neighbor " << i << " " << neighbor_list[i]; } - for (int i = 0; i < node_size; i++) { + for (size_t i = 0; i < node_size; i++) { VLOG(0) << "node i " << node_list[i].node_id << " neighbor_size = " << node_list[i].neighbor_size; std::string str; int offset = node_list[i].neighbor_offset; - for (int j = 0; j < node_list[i].neighbor_size; j++) { + for (size_t j = 0; j < node_list[i].neighbor_size; j++) { if (j > 0) str += ","; str += std::to_string(neighbor_list[j + offset]); } @@ -139,12 +149,18 @@ struct NeighborSampleQuery { }; struct NeighborSampleResult { int64_t *val; + int64_t *actual_val; int *actual_sample_size, sample_size, key_size; + int total_sample_size; std::shared_ptr val_mem, actual_sample_size_mem; + std::shared_ptr actual_val_mem; int64_t *get_val() { return val; } + int64_t get_actual_val() { return (int64_t)actual_val; } int *get_actual_sample_size() { return actual_sample_size; } int get_sample_size() { return sample_size; } int get_key_size() { return key_size; } + void set_total_sample_size(int s) { total_sample_size = s; } + int get_len() { return total_sample_size; } void initialize(int _sample_size, int 
diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h
index 7e5aa402677674bb5fc31aed1953ec40b8db484d..8a0088114e2ec35cc6b90114a756a126940fb1b0 100644
--- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h
+++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h
@@ -23,13 +23,18 @@
 #ifdef PADDLE_WITH_HETERPS
 namespace paddle {
 namespace framework {
-class GpuPsGraphTable : public HeterComm<int64_t, int, int> {
+class GpuPsGraphTable : public HeterComm<int64_t, unsigned int, int> {
  public:
   GpuPsGraphTable(std::shared_ptr<HeterPsResource> resource, int topo_aware)
-      : HeterComm<int64_t, int, int>(1, resource) {
+      : HeterComm<int64_t, unsigned int, int>(1, resource) {
     load_factor_ = 0.25;
     rw_lock.reset(new pthread_rwlock_t());
     gpu_num = resource_->total_device();
+    for (int i = 0; i < gpu_num; i++) {
+      gpu_graph_list.push_back(GpuPsCommGraph());
+      sample_status.push_back(NULL);
+      tables_.push_back(NULL);
+    }
     cpu_table_status = -1;
     if (topo_aware) {
       int total_gpu = resource_->total_device();
@@ -82,6 +87,8 @@ class GpuPsGraphTable : public HeterComm<int64_t, unsigned int, int> {
     //     end_graph_sampling();
     // }
   }
+  void build_graph_on_single_gpu(GpuPsCommGraph &g, int gpu_id);
+  void clear_graph_info(int gpu_id);
   void build_graph_from_cpu(std::vector<GpuPsCommGraph> &cpu_node_list);
   NodeQueryResult graph_node_sample(int gpu_id, int sample_size);
   NeighborSampleResult graph_neighbor_sample_v3(NeighborSampleQuery q,
diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.h b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.h
index 1c59f318517d0ded14336f2095335ad493592a8d..605019cb607fc41bd32bd9053128fd4791bb2c40 100644
--- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.h
+++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.h
@@ -13,6 +13,8 @@
 // limitations under the License.
 #include <thrust/device_vector.h>
+#include <thrust/reduce.h>
+#include <thrust/scan.h>
 #include <functional>
 #pragma once
 #ifdef PADDLE_WITH_HETERPS
@@ -30,10 +32,11 @@
 sample_result is to save the neighbor sampling result, its size is len *
 sample_size;
 */
-__global__ void get_cpu_id_index(int64_t* key, int* val, int64_t* cpu_key,
-                                 int* sum, int* index, int len) {
+__global__ void get_cpu_id_index(int64_t* key, unsigned int* val,
+                                 int64_t* cpu_key, int* sum, int* index,
+                                 int len) {
   CUDA_KERNEL_LOOP(i, len) {
-    if (val[i] == -1) {
+    if (val[i] == ((unsigned int)-1)) {
       int old = atomicAdd(sum, 1);
       cpu_key[old] = key[i];
       index[old] = i;
@@ -43,9 +46,9 @@ __global__ void get_cpu_id_index(int64_t* key, unsigned int* val,
 
 template <int WARP_SIZE, int BLOCK_WARPS, int TILE_SIZE>
 __global__ void neighbor_sample_example_v2(GpuPsCommGraph graph,
-                                           int* node_index, int* actual_size,
-                                           int64_t* res, int sample_len,
-                                           int n) {
+                                           unsigned int* node_index,
+                                           int* actual_size, int64_t* res,
+                                           int sample_len, int n) {
   assert(blockDim.x == WARP_SIZE);
   assert(blockDim.y == BLOCK_WARPS);
 
@@ -55,7 +58,7 @@ __global__ void neighbor_sample_example_v2(GpuPsCommGraph graph,
   curand_init(blockIdx.x, threadIdx.y * WARP_SIZE + threadIdx.x, 0, &rng);
 
   while (i < last_idx) {
-    if (node_index[i] == -1) {
+    if (node_index[i] == (unsigned int)(-1)) {
       actual_size[i] = 0;
       i += BLOCK_WARPS;
       continue;
@@ -92,13 +95,14 @@ __global__ void neighbor_sample_example_v2(GpuPsCommGraph graph,
   }
 }
 
-__global__ void neighbor_sample_example(GpuPsCommGraph graph, int* node_index,
+__global__ void neighbor_sample_example(GpuPsCommGraph graph,
+                                        unsigned int* node_index,
                                         int* actual_size, int64_t* res,
                                         int sample_len, int* sample_status,
                                         int n, int from) {
   int id = blockIdx.x * blockDim.y + threadIdx.y;
   if (id < n) {
-    if (node_index[id] == -1) {
+    if (node_index[id] == (unsigned int)(-1)) {
       actual_size[id] = 0;
       return;
     }
@@ -374,6 +378,18 @@ __global__ void fill_dvalues(int64_t* d_shard_vals, int64_t* d_vals,
   }
 }
 
+__global__ void fill_actual_vals(int64_t* vals, int64_t* actual_vals,
+                                 int* actual_sample_size,
+                                 int* cumsum_actual_sample_size,
+                                 int sample_size, int len) {
+  const size_t i = blockIdx.x * blockDim.x + threadIdx.x;
+  if (i < len) {
+    for (int j = 0; j < actual_sample_size[i]; j++) {
+      actual_vals[cumsum_actual_sample_size[i] + j] = vals[sample_size * i + j];
+    }
+  }
+}
+
 __global__ void node_query_example(GpuPsCommGraph graph, int start, int size,
                                    int64_t* res) {
   const size_t i = blockIdx.x * blockDim.x + threadIdx.x;
@@ -382,6 +398,18 @@ __global__ void node_query_example(GpuPsCommGraph graph, int start, int size,
   }
 }
 
+void GpuPsGraphTable::clear_graph_info(int gpu_id) {
+  if (tables_.size() && tables_[gpu_id] != NULL) {
+    delete tables_[gpu_id];
+  }
+  auto& graph = gpu_graph_list[gpu_id];
+  if (graph.neighbor_list != NULL) {
+    cudaFree(graph.neighbor_list);
+  }
+  if (graph.node_list != NULL) {
+    cudaFree(graph.node_list);
+  }
+}
 void GpuPsGraphTable::clear_graph_info() {
   if (tables_.size()) {
     for (auto table : tables_) delete table;
@@ -406,6 +434,46 @@ In this function, memory is allocated on each gpu to save the graphs,
 gpu i saves the ith graph from cpu_graph_list
 */
+void GpuPsGraphTable::build_graph_on_single_gpu(GpuPsCommGraph& g, int i) {
+  clear_graph_info(i);
+  platform::CUDADeviceGuard guard(resource_->dev_id(i));
+  // platform::CUDADeviceGuard guard(i);
+  gpu_graph_list[i] = GpuPsCommGraph();
+  sample_status[i] = NULL;
+  tables_[i] = new Table(std::max((unsigned int)1, g.node_size) / load_factor_);
+  if (g.node_size > 0) {
+    std::vector<int64_t> keys;
+    std::vector<unsigned int> offset;
+    cudaMalloc((void**)&gpu_graph_list[i].node_list,
+               g.node_size * sizeof(GpuPsGraphNode));
+    cudaMemcpy(gpu_graph_list[i].node_list, g.node_list,
+               g.node_size * sizeof(GpuPsGraphNode), cudaMemcpyHostToDevice);
+    for (unsigned int j = 0; j < g.node_size; j++) {
+      keys.push_back(g.node_list[j].node_id);
+      offset.push_back(j);
+    }
+    build_ps(i, keys.data(), offset.data(), keys.size(), 1024, 8);
+    gpu_graph_list[i].node_size = g.node_size;
+  } else {
+    build_ps(i, NULL, NULL, 0, 1024, 8);
+    gpu_graph_list[i].node_list = NULL;
+    gpu_graph_list[i].node_size = 0;
+  }
+  if (g.neighbor_size) {
+    int* addr;
+    cudaMalloc((void**)&addr, g.neighbor_size * sizeof(int));
+    cudaMemset(addr, 0, g.neighbor_size * sizeof(int));
+    sample_status[i] = addr;
+    cudaMalloc((void**)&gpu_graph_list[i].neighbor_list,
+               g.neighbor_size * sizeof(int64_t));
+    cudaMemcpy(gpu_graph_list[i].neighbor_list, g.neighbor_list,
+               g.neighbor_size * sizeof(int64_t), cudaMemcpyHostToDevice);
+    gpu_graph_list[i].neighbor_size = g.neighbor_size;
+  } else {
+    gpu_graph_list[i].neighbor_list = NULL;
+    gpu_graph_list[i].neighbor_size = 0;
+  }
+}
 void GpuPsGraphTable::build_graph_from_cpu(
     std::vector<GpuPsCommGraph>& cpu_graph_list) {
   VLOG(0) << "in build_graph_from_cpu cpu_graph_list size = "
@@ -418,20 +486,21 @@ void GpuPsGraphTable::build_graph_from_cpu(
   for (int i = 0; i < cpu_graph_list.size(); i++) {
     platform::CUDADeviceGuard guard(resource_->dev_id(i));
     // platform::CUDADeviceGuard guard(i);
-    gpu_graph_list.push_back(GpuPsCommGraph());
-    sample_status.push_back(NULL);
-    auto table =
-        new Table(std::max(1, cpu_graph_list[i].node_size) / load_factor_);
-    tables_.push_back(table);
+    gpu_graph_list[i] = GpuPsCommGraph();
+    sample_status[i] = NULL;
+    // auto table =
+    //     new Table(std::max(1, cpu_graph_list[i].node_size) / load_factor_);
+    tables_[i] = new Table(
+        std::max((unsigned int)1, cpu_graph_list[i].node_size) / load_factor_);
     if (cpu_graph_list[i].node_size > 0) {
       std::vector<int64_t> keys;
-      std::vector<int> offset;
+      std::vector<unsigned int> offset;
       cudaMalloc((void**)&gpu_graph_list[i].node_list,
                  cpu_graph_list[i].node_size * sizeof(GpuPsGraphNode));
       cudaMemcpy(gpu_graph_list[i].node_list, cpu_graph_list[i].node_list,
                  cpu_graph_list[i].node_size * sizeof(GpuPsGraphNode),
                  cudaMemcpyHostToDevice);
-      for (int j = 0; j < cpu_graph_list[i].node_size; j++) {
+      for (unsigned int j = 0; j < cpu_graph_list[i].node_size; j++) {
         keys.push_back(cpu_graph_list[i].node_list[j].node_id);
         offset.push_back(j);
       }
@@ -597,15 +666,15 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample(int gpu_id,
     // use the key-value map to update alloc_mem_i[0,shard_len)
     // tables_[i]->rwlock_->RDLock();
     tables_[i]->get(reinterpret_cast<int64_t*>(node.key_storage),
-                    reinterpret_cast<int*>(node.val_storage),
+                    reinterpret_cast<unsigned int*>(node.val_storage),
                     h_right[i] - h_left[i] + 1,
                     resource_->remote_stream(i, gpu_id));
     // node.in_stream);
     int shard_len = h_right[i] - h_left[i] + 1;
     auto graph = gpu_graph_list[i];
-    int* id_array = reinterpret_cast<int*>(node.val_storage);
-    int* actual_size_array = id_array + shard_len;
-    int64_t* sample_array = (int64_t*)(id_array + shard_len * 2);
+    unsigned int* id_array = reinterpret_cast<unsigned int*>(node.val_storage);
+    int* actual_size_array = (int*)(id_array + shard_len);
+    int64_t* sample_array = (int64_t*)(actual_size_array + shard_len);
     int sample_grid_size = (shard_len - 1) / dim_y + 1;
     dim3 block(parallel_sample_size, dim_y);
     dim3 grid(sample_grid_size);
@@ -738,6 +807,8 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_v2(
     if (shard_len == 0) {
       continue;
     }
+    // create_storage(gpu_id, i, shard_len * sizeof(int64_t),
+    //                shard_len * (1 + sample_size) * sizeof(int64_t));
     create_storage(gpu_id, i, shard_len * sizeof(int64_t),
                    shard_len * (1 + sample_size) * sizeof(int64_t));
   }
@@ -760,15 +831,18 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_v2(
     platform::CUDADeviceGuard guard(resource_->dev_id(i));
     // If not found, val is -1.
     tables_[i]->get(reinterpret_cast<int64_t*>(node.key_storage),
-                    reinterpret_cast<int*>(node.val_storage),
+                    reinterpret_cast<unsigned int*>(node.val_storage),
                     h_right[i] - h_left[i] + 1,
                     resource_->remote_stream(i, gpu_id));
 
     auto shard_len = h_right[i] - h_left[i] + 1;
     auto graph = gpu_graph_list[i];
-    int* id_array = reinterpret_cast<int*>(node.val_storage);
-    int* actual_size_array = id_array + shard_len;
-    int64_t* sample_array = (int64_t*)(id_array + shard_len * 2);
+    // int* id_array = reinterpret_cast<int*>(node.val_storage);
+    // int* actual_size_array = id_array + shard_len;
+    // int64_t* sample_array = (int64_t*)(id_array + shard_len * 2);
+    unsigned int* id_array = reinterpret_cast<unsigned int*>(node.val_storage);
+    int* actual_size_array = (int*)(id_array + shard_len);
+    int64_t* sample_array = (int64_t*)(actual_size_array + shard_len);
     constexpr int WARP_SIZE = 32;
     constexpr int BLOCK_WARPS = 128 / WARP_SIZE;
     constexpr int TILE_SIZE = BLOCK_WARPS * 16;
@@ -846,6 +920,28 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_v2(
   fill_dvalues<<<grid_size, block_size_, 0, stream>>>(
       d_shard_vals_ptr, val, d_shard_actual_sample_size_ptr, actual_sample_size,
       d_idx_ptr, sample_size, len);
+
+  {
+    platform::CUDAPlace place = platform::CUDAPlace(resource_->dev_id(gpu_id));
+    platform::CUDADeviceGuard guard(resource_->dev_id(gpu_id));
+    thrust::device_ptr<int> t_actual_sample_size(actual_sample_size);
+    int total_sample_size =
+        thrust::reduce(t_actual_sample_size, t_actual_sample_size + len);
+    result.actual_val_mem =
+        memory::AllocShared(place, total_sample_size * sizeof(int64_t));
+    result.actual_val = (int64_t*)(result.actual_val_mem)->ptr();
+
+    result.set_total_sample_size(total_sample_size);
+
+    thrust::device_vector<int> cumsum_actual_sample_size(len);
+    thrust::exclusive_scan(t_actual_sample_size, t_actual_sample_size + len,
+                           cumsum_actual_sample_size.begin(), 0);
+    fill_actual_vals<<<grid_size, block_size_, 0, stream>>>(
+        val, result.actual_val, actual_sample_size,
+        thrust::raw_pointer_cast(cumsum_actual_sample_size.data()), sample_size,
+        len);
+  }
+
   for (int i = 0; i < total_gpu; ++i) {
     int shard_len = h_left[i] == -1 ? 0 : h_right[i] - h_left[i] + 1;
     if (shard_len == 0) {
@@ -868,13 +964,10 @@ NodeQueryResult GpuPsGraphTable::query_node_list(int gpu_id, int start,
   if (query_size <= 0) return result;
   int& actual_size = result.actual_sample_size;
   actual_size = 0;
-  result.initialize(query_size, resource_->dev_id(gpu_id));
-  int64_t* val = result.val;
   // int dev_id = resource_->dev_id(gpu_id);
   // platform::CUDADeviceGuard guard(dev_id);
-  platform::CUDADeviceGuard guard(resource_->dev_id(gpu_id));
-  std::vector<int> idx, gpu_begin_pos, local_begin_pos, sample_size;
-  int size = 0;
+  std::vector<int> idx, gpu_begin_pos, local_begin_pos;
+  int sample_size;
   /*
   if idx[i] = a, gpu_begin_pos[i] = p1, gpu_local_begin_pos[i] = p2;
@@ -898,6 +991,31 @@ NodeQueryResult GpuPsGraphTable::query_node_list(int gpu_id, int start,
     x2 = max(x1, x);
     return y2 - x2;
   };
+  auto graph = gpu_graph_list[gpu_id];
+  if (graph.node_size == 0) {
+    return result;
+  }
+  int x2, y2;
+  int len = range_check(start, start + query_size, 0, graph.node_size, x2, y2);
+
+  if (len == 0) {
+    return result;
+  }
+  int64_t* val;
+  sample_size = len;
+  result.initialize(len, resource_->dev_id(gpu_id));
+  actual_size = len;
+  val = result.val;
+  int dev_id_i = resource_->dev_id(gpu_id);
+  platform::CUDADeviceGuard guard(dev_id_i);
+  // platform::CUDADeviceGuard guard(i);
+  int grid_size = (len - 1) / block_size_ + 1;
+  node_query_example<<<grid_size, block_size_, 0,
+                       resource_->remote_stream(gpu_id, gpu_id)>>>(
+      gpu_graph_list[gpu_id], x2, len, (int64_t*)val);
+  cudaStreamSynchronize(resource_->remote_stream(gpu_id, gpu_id));
+  return result;
+  /*
   for (int i = 0; i < gpu_graph_list.size() && query_size != 0; i++) {
     auto graph = gpu_graph_list[i];
     if (graph.node_size == 0) {
@@ -943,6 +1061,7 @@ NodeQueryResult GpuPsGraphTable::query_node_list(int gpu_id, int start,
     destroy_storage(gpu_id, x);
   }
   return result;
+  */
 }
 }
 };
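The block added to graph_neighbor_sample_v2 above compacts the padded sample matrix with a reduce (total valid count) plus an exclusive scan (per-row write offsets) before fill_actual_vals copies each row over. The same transform on the CPU, as a minimal self-contained sketch (C++17):

    #include <cstdint>
    #include <numeric>
    #include <vector>

    // padded is a row-major [actual.size() x sample_size] matrix where row i
    // holds actual[i] valid entries; returns them packed back to back.
    std::vector<int64_t> compact(const std::vector<int64_t>& padded,
                                 const std::vector<int>& actual,
                                 int sample_size) {
      int len = (int)actual.size();
      std::vector<int> offset(len, 0);
      std::exclusive_scan(actual.begin(), actual.end(), offset.begin(), 0);
      std::vector<int64_t> out(std::accumulate(actual.begin(), actual.end(), 0));
      for (int i = 0; i < len; i++)       // mirrors the fill_actual_vals kernel
        for (int j = 0; j < actual[i]; j++)
          out[offset[i] + j] = padded[(size_t)i * sample_size + j];
      return out;
    }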
diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu
index b0899b4a7f5b3f810a9a1f15f213f1282c996467..93854d7f1ec3f6a6ffa454a4b65c8adcafd77002 100644
--- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu
+++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu
@@ -58,6 +58,11 @@ void GraphGpuWrapper::set_device(std::vector<int> ids) {
     device_id_mapping.push_back(device_id);
   }
 }
+std::vector<std::vector<int64_t>> GraphGpuWrapper::get_all_id(int type, int idx,
+                                                              int slice_num) {
+  return ((GpuPsGraphTable *)graph_table)
+      ->cpu_graph_table->get_all_id(type, idx, slice_num);
+}
 void GraphGpuWrapper::set_up_types(std::vector<std::string> &edge_types,
                                    std::vector<std::string> &node_types) {
   id_to_edge = edge_types;
@@ -76,6 +81,32 @@ void GraphGpuWrapper::set_up_types(std::vector<std::string> &edge_types,
   this->table_feat_conf_feat_shape.resize(node_types.size());
 }
 
+void GraphGpuWrapper::make_partitions(int idx, int64_t byte_size,
+                                      int device_len) {
+  ((GpuPsGraphTable *)graph_table)
+      ->cpu_graph_table->make_partitions(idx, byte_size, device_len);
+}
+int32_t GraphGpuWrapper::load_next_partition(int idx) {
+  return ((GpuPsGraphTable *)graph_table)
+      ->cpu_graph_table->load_next_partition(idx);
+}
+
+void GraphGpuWrapper::set_search_level(int level) {
+  ((GpuPsGraphTable *)graph_table)->cpu_graph_table->set_search_level(level);
+}
+
+std::vector<int64_t> GraphGpuWrapper::get_partition(int idx, int num) {
+  return ((GpuPsGraphTable *)graph_table)
+      ->cpu_graph_table->get_partition(idx, num);
+}
+int32_t GraphGpuWrapper::get_partition_num(int idx) {
+  return ((GpuPsGraphTable *)graph_table)
+      ->cpu_graph_table->get_partition_num(idx);
+}
+void GraphGpuWrapper::make_complementary_graph(int idx, int64_t byte_size) {
+  ((GpuPsGraphTable *)graph_table)
+      ->cpu_graph_table->make_complementary_graph(idx, byte_size);
+}
 void GraphGpuWrapper::load_edge_file(std::string name, std::string filepath,
                                      bool reverse) {
   // 'e' means load edge
@@ -132,10 +163,11 @@ void GraphGpuWrapper::add_table_feat_conf(std::string table_name,
   }
   VLOG(0) << "add conf over";
 }
+void GraphGpuWrapper::init_search_level(int level) { search_level = level; }
 
 void GraphGpuWrapper::init_service() {
   table_proto.set_task_pool_size(24);
-
+  table_proto.set_search_level(search_level);
   table_proto.set_table_name("cpu_graph_table");
   table_proto.set_use_cache(false);
   for (int i = 0; i < id_to_edge.size(); i++)
@@ -161,11 +193,16 @@ void GraphGpuWrapper::init_service() {
 void GraphGpuWrapper::upload_batch(int idx,
                                    std::vector<std::vector<int64_t>> &ids) {
   GpuPsGraphTable *g = (GpuPsGraphTable *)graph_table;
-  std::vector<GpuPsCommGraph> vec;
+  // std::vector<GpuPsCommGraph> vec;
   for (int i = 0; i < ids.size(); i++) {
-    vec.push_back(g->cpu_graph_table->make_gpu_ps_graph(idx, ids[i]));
+    // vec.push_back(g->cpu_graph_table->make_gpu_ps_graph(idx, ids[i]));
+    GpuPsCommGraph sub_graph =
+        g->cpu_graph_table->make_gpu_ps_graph(idx, ids[i]);
+    g->build_graph_on_single_gpu(sub_graph, i);
+    sub_graph.release_on_cpu();
+    VLOG(0) << "sub graph on gpu " << i << " is built";
   }
-  g->build_graph_from_cpu(vec);
+  // g->build_graph_from_cpu(vec);
 }
 
 void GraphGpuWrapper::initialize() {
diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h
index 6972551b896edab4445ff9b8d783e8f9dbd913db..b638311304773eeb131b662b15da1722160d9958 100644
--- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h
+++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h
@@ -22,7 +22,10 @@ namespace framework {
 #ifdef PADDLE_WITH_HETERPS
 class GraphGpuWrapper {
  public:
-  char* graph_table;
+  static GraphGpuWrapper* GetInstance() {
+    static GraphGpuWrapper wrapper;
+    return &wrapper;
+  }
   void initialize();
   void test();
   void set_device(std::vector<int> ids);
@@ -34,12 +37,22 @@ class GraphGpuWrapper {
                            std::string feat_dtype, int feat_shape);
   void load_edge_file(std::string name, std::string filepath, bool reverse);
   void load_node_file(std::string name, std::string filepath);
+  int32_t load_next_partition(int idx);
+  int32_t get_partition_num(int idx);
+  std::vector<int64_t> get_partition(int idx, int num);
+  void make_partitions(int idx, int64_t byte_size, int device_len);
+  void make_complementary_graph(int idx, int64_t byte_size);
+  void set_search_level(int level);
+  void init_search_level(int level);
+  std::vector<std::vector<int64_t>> get_all_id(int type, int idx,
+                                               int slice_num);
   NodeQueryResult query_node_list(int gpu_id, int start, int query_size);
   NeighborSampleResult graph_neighbor_sample_v3(NeighborSampleQuery q,
                                                 bool cpu_switch);
   std::vector<int64_t> graph_neighbor_sample(int gpu_id,
                                              std::vector<int64_t>& key,
                                              int sample_size);
+
   std::unordered_map<std::string, int> edge_to_id, feature_to_id;
   std::vector<std::string> id_to_feature, id_to_edge;
   std::vector<std::unordered_map<std::string, int>> table_feat_mapping;
@@ -48,6 +61,8 @@ class GraphGpuWrapper {
   std::vector<std::vector<int>> table_feat_conf_feat_shape;
   ::paddle::distributed::GraphParameter table_proto;
   std::vector<int> device_id_mapping;
+  int search_level = 1;
+  char* graph_table;
 };
 #endif
 }
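GraphGpuWrapper is now reachable as a process-wide singleton via GetInstance(). A hedged end-to-end sketch of the partition workflow through the wrapper (it assumes devices, types, and edge files were already configured, and relies on init_search_level running before init_service() so the level is copied into table_proto):

    void train_by_partition() {
      auto *w = paddle::framework::GraphGpuWrapper::GetInstance();
      w->init_search_level(2);  // 2 = SSD-backed search; read by init_service()
      // ... set_device / set_up_types / load_edge_file / init_service ...
      w->make_partitions(0, 64 * 1024 * 1024, 2);
      while (w->load_next_partition(0) != -1) {
        auto ids = w->get_all_id(0, 0, 2);  // slice current window over 2 GPUs
        w->upload_batch(0, ids);            // builds one sub-graph per GPU
        // ... sample / train on this window, then advance ...
      }
    }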
diff --git a/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu b/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu
index fc54be447fe1719a434a5e8896f903a04dc749ae..87b62c6d380a42793eafd88a84cb6c506fce4532 100644
--- a/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu
+++ b/paddle/fluid/framework/fleet/heter_ps/hashtable_kernel.cu
@@ -298,6 +298,8 @@ void HashTable<KeyType, ValType>::update(const KeyType* d_keys,
 
 template class HashTable<unsigned long, paddle::framework::FeatureValue>;
 template class HashTable<long, int>;
+template class HashTable<long, unsigned long>;
+template class HashTable<long, unsigned int>;
 
 template void HashTable<unsigned long, paddle::framework::FeatureValue>::get<
     cudaStream_t>(const unsigned long* d_keys,
@@ -308,6 +310,10 @@ template void HashTable<long, int>::get<cudaStream_t>(const long* d_keys,
                                                       int* d_vals, size_t len,
                                                       cudaStream_t stream);
+template void HashTable<long, unsigned long>::get<cudaStream_t>(
+    const long* d_keys, unsigned long* d_vals, size_t len, cudaStream_t stream);
+template void HashTable<long, unsigned int>::get<cudaStream_t>(
+    const long* d_keys, unsigned int* d_vals, size_t len, cudaStream_t stream);
 // template void
 // HashTable<unsigned long, paddle::framework::FeatureValue>::get<cudaStream_t>(
 //     const unsigned long* d_keys, char* d_vals, size_t len, cudaStream_t
@@ -323,6 +329,14 @@ template void HashTable<long, int>::insert<cudaStream_t>(const long* d_keys,
                                                          const int* d_vals,
                                                          size_t len,
                                                          cudaStream_t stream);
+template void HashTable<long, unsigned long>::insert<cudaStream_t>(
+    const long* d_keys, const unsigned long* d_vals, size_t len,
+    cudaStream_t stream);
+
+template void HashTable<long, unsigned int>::insert<cudaStream_t>(
+    const long* d_keys, const unsigned int* d_vals, size_t len,
+    cudaStream_t stream);
+
 // template void HashTable<unsigned long, paddle::framework::FeatureValue>::insert<
 //     cudaStream_t>(const unsigned long* d_keys, size_t len, char* pool,
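The new instantiations above exist because HashTable's member definitions live in this .cu file, so every key/value combination that other translation units link against — now including the graph table's long keys with unsigned long and unsigned int values — must be spelled out explicitly. A minimal illustration of the pattern with a toy type (not Paddle code):

    #include <cstddef>

    template <typename KeyType, typename ValType>
    struct HashTableSketch {
      template <typename StreamType>
      void get(const KeyType *keys, ValType *vals, size_t len, StreamType s) {
        // ... device-side lookup elided in this sketch ...
      }
    };

    // One line per combination the rest of the build uses; without these the
    // linker reports undefined symbols for get<...>.
    template struct HashTableSketch<long, unsigned int>;
    template void HashTableSketch<long, unsigned int>::get<int>(
        const long *, unsigned int *, size_t, int);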
diff --git a/paddle/fluid/framework/fleet/heter_ps/test_cpu_query.cu b/paddle/fluid/framework/fleet/heter_ps/test_cpu_query.cu
index f35a1c41bbe1d0903a1d5dfe7ee5e4e3cdc95f1f..b3a38a6dfde49717a722e80cacda02a70350ee63 100644
--- a/paddle/fluid/framework/fleet/heter_ps/test_cpu_query.cu
+++ b/paddle/fluid/framework/fleet/heter_ps/test_cpu_query.cu
@@ -28,6 +28,16 @@ namespace platform = paddle::platform;
 // paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
 //     std::vector<int64_t> ids)
 
+std::string edges[] = {
+    std::string("0\t1"), std::string("0\t9"), std::string("1\t2"),
+    std::string("1\t0"), std::string("2\t1"), std::string("2\t3"),
+    std::string("3\t2"), std::string("3\t4"), std::string("4\t3"),
+    std::string("4\t5"), std::string("5\t4"), std::string("5\t6"),
+    std::string("6\t5"), std::string("6\t7"), std::string("7\t6"),
+    std::string("7\t8"),
+};
+char edge_file_name[] = "edges1.txt";
+
 std::string nodes[] = {
     std::string("user\t37\ta 0.34\tb 13 14\tc hello\td abc"),
     std::string("user\t96\ta 0.31\tb 15 10\tc 96hello\td abcd"),
@@ -53,12 +63,17 @@ std::vector<std::string> user_feature_dtype = {"float32", "int32", "string",
 std::vector<std::string> item_feature_dtype = {"float32"};
 std::vector<int> user_feature_shape = {1, 2, 1, 1};
 std::vector<int> item_feature_shape = {1};
-void prepare_file(char file_name[]) {
+void prepare_file(char file_name[], bool load_edge) {
   std::ofstream ofile;
   ofile.open(file_name);
-
-  for (auto x : nodes) {
-    ofile << x << std::endl;
+  if (load_edge) {
+    for (auto x : edges) {
+      ofile << x << std::endl;
+    }
+  } else {
+    for (auto x : nodes) {
+      ofile << x << std::endl;
+    }
   }
   ofile.close();
 }
@@ -85,9 +100,10 @@ TEST(TEST_FLEET, test_cpu_cache) {
     g_f1->add_dtype(item_feature_dtype[i]);
     g_f1->add_shape(item_feature_shape[i]);
   }
-  prepare_file(node_file_name);
+  prepare_file(node_file_name, false);
+  prepare_file(edge_file_name, true);
   table_proto.set_shard_num(24);
-
+  table_proto.set_search_level(2);
   std::shared_ptr<HeterPsResource> resource =
       std::make_shared<HeterPsResource>(device_id_mapping);
   resource->enable_p2p();
@@ -120,11 +136,14 @@ TEST(TEST_FLEET, test_cpu_cache) {
   }
   g.cpu_graph_table->build_sampler(0);
   ids1.push_back(5);
+  ids1.push_back(7);
   vec.push_back(g.cpu_graph_table->make_gpu_ps_graph(0, ids0));
   vec.push_back(g.cpu_graph_table->make_gpu_ps_graph(0, ids1));
   vec[0].display_on_cpu();
   vec[1].display_on_cpu();
-  g.build_graph_from_cpu(vec);
+  // g.build_graph_from_cpu(vec);
+  g.build_graph_on_single_gpu(vec[0], 0);
+  g.build_graph_on_single_gpu(vec[1], 1);
   int64_t cpu_key[3] = {0, 1, 2};
   /*
   std::vector<std::shared_ptr<char>> buffers(3);
@@ -136,20 +155,84 @@ TEST(TEST_FLEET, test_cpu_cache) {
   }
   */
   void *key;
-  platform::CUDADeviceGuard guard(0);
-  cudaMalloc((void **)&key, 3 * sizeof(int64_t));
-  cudaMemcpy(key, cpu_key, 3 * sizeof(int64_t), cudaMemcpyHostToDevice);
-  auto neighbor_sample_res =
-      g.graph_neighbor_sample_v2(0, (int64_t *)key, 2, 3, true);
-  neighbor_sample_res.display();
-  //{1,9} or {9,1} is expected for key 0
-  //{0,2} or {2,0} is expected for key 1
-  //{1,3} or {3,1} is expected for key 2
-  auto node_query_res = g.query_node_list(0, 0, 4);
-  node_query_res.display();
-  NeighborSampleQuery query;
-  query.initialize(0, node_query_res.get_val(), 2, node_query_res.get_len());
-  query.display();
-  auto c = g.graph_neighbor_sample_v3(query, false);
-  c.display();
+  int device_len = 2;
+  for (int i = 0; i < 2; i++) {
+    // platform::CUDADeviceGuard guard(i);
+    VLOG(0) << "query on card " << i;
+    //{1,9} or {9,1} is expected for key 0
+    //{0,2} or {2,0} is expected for key 1
+    //{1,3} or {3,1} is expected for key 2
+    int step = 2;
+    int cur = 0;
+    while (true) {
+      auto node_query_res = g.query_node_list(i, cur, step);
+      node_query_res.display();
+      if (node_query_res.get_len() == 0) {
+        VLOG(0) << "no more ids, break";
+        break;
+      }
+      cur += node_query_res.get_len();
+      NeighborSampleQuery query;
+      query.initialize(i, node_query_res.get_val(), 1,
+                       node_query_res.get_len());
+      query.display();
+      auto c = g.graph_neighbor_sample_v3(query, false);
+      c.display();
+    }
+  }
+  g.cpu_graph_table->set_search_level(2);
+  // g.cpu_graph_table->Load_to_ssd(edge_file_name, "e>u2u");
+  g.cpu_graph_table->Load(edge_file_name, "e>u2u");
+  g.cpu_graph_table->make_partitions(0, 64, 2);
+  int index = 0;
+  while (g.cpu_graph_table->load_next_partition(0) != -1) {
+    auto all_ids = g.cpu_graph_table->get_all_id(0, 0, device_len);
+    for (auto x : all_ids) {
+      for (auto y : x) {
+        VLOG(0) << "part " << index << " " << y;
+      }
+    }
+    for (int i = 0; i < all_ids.size(); i++) {
+      GpuPsCommGraph sub_graph =
+          g.cpu_graph_table->make_gpu_ps_graph(0, all_ids[i]);
+      g.build_graph_on_single_gpu(sub_graph, i);
+      VLOG(2) << "sub graph on gpu " << i << " is built";
+    }
+    VLOG(0) << "start to iterate gpu graph node";
+    g.cpu_graph_table->make_complementary_graph(0, 64);
+    for (int i = 0; i < 2; i++) {
+      // platform::CUDADeviceGuard guard(i);
+      VLOG(0) << "query on card " << i;
+      int step = 2;
+      int cur = 0;
+      while (true) {
+        auto node_query_res = g.query_node_list(i, cur, step);
+        node_query_res.display();
+        if (node_query_res.get_len() == 0) {
+          VLOG(0) << "no more ids, break";
+          break;
+        }
+        cur += node_query_res.get_len();
+        NeighborSampleQuery query, q1;
+        query.initialize(i, node_query_res.get_val(), 4,
+                         node_query_res.get_len());
+        query.display();
+        auto c = g.graph_neighbor_sample_v3(query, true);
+        c.display();
+        platform::CUDADeviceGuard guard(i);
+        int64_t *key;
+        VLOG(0) << "sample key 1 globally";
+        g.cpu_graph_table->set_search_level(2);
+        cudaMalloc((void **)&key, sizeof(int64_t));
+        int64_t t_key = 1;
+        cudaMemcpy(key, &t_key, sizeof(int64_t), cudaMemcpyHostToDevice);
+        q1.initialize(i, (int64_t)key, 2, 1);
+        auto d = g.graph_neighbor_sample_v3(q1, true);
+        d.display();
+        cudaFree(key);
+        g.cpu_graph_table->set_search_level(1);
+      }
+    }
+    index++;
+  }
 }
diff --git a/paddle/fluid/framework/multi_trainer.cc b/paddle/fluid/framework/multi_trainer.cc
index 61cd7ad01696e1a34891b490f6bddd4713384cd0..7a83fdccc218c48859bf644d282601c828f4bb16 100644
--- a/paddle/fluid/framework/multi_trainer.cc
+++ b/paddle/fluid/framework/multi_trainer.cc
@@ -34,7 +34,6 @@ void MultiTrainer::Initialize(const TrainerDesc& trainer_desc,
   mpi_rank_ = trainer_desc.mpi_rank();
   mpi_size_ = trainer_desc.mpi_size();
   dump_file_num_ = trainer_desc.dump_file_num();
-
   for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
        i++) {
     need_merge_var_names_.push_back(
diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc
index b6d6844e99715908c775bb9c7d46972788941739..ae52b96d37ce36b10854dd36b933fb38ed321bef 100644
--- a/paddle/fluid/pybind/fleet_py.cc
+++ b/paddle/fluid/pybind/fleet_py.cc
@@ -325,14 +325,18 @@ void BindNeighborSampleResult(py::module* m) {
   py::class_<NeighborSampleResult>(*m, "NeighborSampleResult")
       .def(py::init<>())
       .def("initialize", &NeighborSampleResult::initialize)
+      .def("get_len", &NeighborSampleResult::get_len)
+      .def("get_val", &NeighborSampleResult::get_actual_val)
       .def("display", &NeighborSampleResult::display);
 }
 
 void BindGraphGpuWrapper(py::module* m) {
   py::class_<GraphGpuWrapper>(*m, "GraphGpuWrapper")
-      .def(py::init<>())
+      //.def(py::init<>())
       //.def("test", &GraphGpuWrapper::test)
-      .def("initialize", &GraphGpuWrapper::initialize)
+      //.def(py::init([]() { return framework::GraphGpuWrapper::GetInstance();
+      //}))
+      .def(py::init<>())
       .def("neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample_v3)
       .def("graph_neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample)
       .def("set_device", &GraphGpuWrapper::set_device)
@@ -342,6 +346,15 @@ void BindGraphGpuWrapper(py::module* m) {
       .def("add_table_feat_conf", &GraphGpuWrapper::add_table_feat_conf)
       .def("load_edge_file", &GraphGpuWrapper::load_edge_file)
       .def("upload_batch", &GraphGpuWrapper::upload_batch)
+      .def("get_all_id", &GraphGpuWrapper::get_all_id)
+      .def("load_next_partition", &GraphGpuWrapper::load_next_partition)
+      .def("make_partitions", &GraphGpuWrapper::make_partitions)
+      .def("make_complementary_graph",
+           &GraphGpuWrapper::make_complementary_graph)
+      .def("set_search_level", &GraphGpuWrapper::set_search_level)
+      .def("init_search_level", &GraphGpuWrapper::init_search_level)
+      .def("get_partition_num", &GraphGpuWrapper::get_partition_num)
+      .def("get_partition", &GraphGpuWrapper::get_partition)
       .def("load_node_file", &GraphGpuWrapper::load_node_file);
 }
 #endif
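One consequence of the binding change above: NeighborSampleResult.get_val on the Python side is now bound to get_actual_val, so it returns the device address of the compacted neighbor buffer as a plain integer, with get_len giving the element count. A hedged sketch of what a native consumer of that integer does (hypothetical helper, not Paddle API):

    #include <cuda_runtime.h>
    #include <cstdint>
    #include <vector>

    // addr: the integer from NeighborSampleResult::get_actual_val();
    // n: the value of NeighborSampleResult::get_len().
    std::vector<int64_t> copy_sampled_ids(int64_t addr, int n) {
      std::vector<int64_t> host(n);
      cudaMemcpy(host.data(), reinterpret_cast<const void *>(addr),
                 n * sizeof(int64_t), cudaMemcpyDeviceToHost);
      return host;
    }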