未验证 提交 a3917625 编写于 作者: S seemingwang 提交者: GitHub

graph partition (#42472)

* enable graph-engine to return all id (#42319)

* enable graph-engine to return all id

* change vector's dimension

* change vector's dimension

* enlarge returned ids dimensions

* change sample result's structure to fit training (#42426)

* enable graph-engine to return all id

* change vector's dimension

* change vector's dimension

* enlarge returned ids dimensions

* add actual_val

* change vlog

* fix bug

* bug fix

* bug fix

* fix display test

* singleton of gpu_graph_wrapper

* change sample result's structure to fit training

* recover sample code

* fix

* secondary sample

* add graph partition

* fix pybind
Co-authored-by: NDesmonDay <908660116@qq.com>
Co-authored-by: NDesmonDay <908660116@qq.com>
上级 a5745864
......@@ -28,6 +28,22 @@ namespace paddle {
namespace distributed {
#ifdef PADDLE_WITH_HETERPS
int32_t GraphTable::Load_to_ssd(const std::string &path,
const std::string &param) {
bool load_edge = (param[0] == 'e');
bool load_node = (param[0] == 'n');
if (load_edge) {
bool reverse_edge = (param[1] == '<');
std::string edge_type = param.substr(2);
return this->load_edges_to_ssd(path, reverse_edge, edge_type);
}
if (load_node) {
std::string node_type = param.substr(1);
return this->load_nodes(path, node_type);
}
return 0;
}
paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
int idx, std::vector<int64_t> ids) {
std::vector<std::vector<int64_t>> bags(task_pool_size_);
......@@ -38,11 +54,11 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
std::vector<std::future<int>> tasks;
std::vector<int64_t> edge_array[task_pool_size_];
std::vector<paddle::framework::GpuPsGraphNode> node_array[task_pool_size_];
for (int i = 0; i < (int)bags.size(); i++) {
for (size_t i = 0; i < bags.size(); i++) {
if (bags[i].size() > 0) {
tasks.push_back(_shards_task_pool[i]->enqueue([&, i, this]() -> int {
paddle::framework::GpuPsGraphNode x;
for (int j = 0; j < (int)bags[i].size(); j++) {
for (size_t j = 0; j < bags[i].size(); j++) {
Node *v = find_node(0, idx, bags[i][j]);
x.node_id = bags[i][j];
if (v == NULL) {
......@@ -53,7 +69,7 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
x.neighbor_size = v->get_neighbor_size();
x.neighbor_offset = edge_array[i].size();
node_array[i].push_back(x);
for (int k = 0; k < x.neighbor_size; k++) {
for (size_t k = 0; k < x.neighbor_size; k++) {
edge_array[i].push_back(v->get_neighbor_id(k));
}
}
......@@ -64,27 +80,29 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
}
for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
paddle::framework::GpuPsCommGraph res;
int tot_len = 0;
unsigned int tot_len = 0;
for (int i = 0; i < task_pool_size_; i++) {
tot_len += (int)edge_array[i].size();
}
res.neighbor_size = tot_len;
res.node_size = ids.size();
res.neighbor_list = new int64_t[tot_len];
res.node_list = new paddle::framework::GpuPsGraphNode[ids.size()];
int offset = 0, ind = 0;
tot_len += edge_array[i].size();
}
// res.neighbor_size = tot_len;
// res.node_size = ids.size();
// res.neighbor_list = new int64_t[tot_len];
// res.node_list = new paddle::framework::GpuPsGraphNode[ids.size()];
res.init_on_cpu(tot_len, (unsigned int)ids.size());
unsigned int offset = 0, ind = 0;
for (int i = 0; i < task_pool_size_; i++) {
for (int j = 0; j < (int)node_array[i].size(); j++) {
res.node_list[ind] = node_array[i][j];
res.node_list[ind++].neighbor_offset += offset;
}
for (int j = 0; j < (int)edge_array[i].size(); j++) {
for (size_t j = 0; j < edge_array[i].size(); j++) {
res.neighbor_list[offset + j] = edge_array[i][j];
}
offset += edge_array[i].size();
}
return res;
}
int32_t GraphTable::add_node_to_ssd(int type_id, int idx, int64_t src_id,
char *data, int len) {
if (_db != NULL) {
......@@ -92,8 +110,31 @@ int32_t GraphTable::add_node_to_ssd(int type_id, int idx, int64_t src_id,
memcpy(ch, &type_id, sizeof(int));
memcpy(ch + sizeof(int), &idx, sizeof(int));
memcpy(ch + sizeof(int) * 2, &src_id, sizeof(int64_t));
_db->put(src_id % shard_num % task_pool_size_, ch,
sizeof(int) * 2 + sizeof(int64_t), (char *)data, len);
std::string str;
if (_db->get(src_id % shard_num % task_pool_size_, ch,
sizeof(int) * 2 + sizeof(int64_t), str) == 0) {
int64_t *stored_data = ((int64_t *)str.c_str());
int n = str.size() / sizeof(int64_t);
char *new_data = new char[n * sizeof(int64_t) + len];
memcpy(new_data, stored_data, n * sizeof(int64_t));
memcpy(new_data + n * sizeof(int64_t), data, len);
_db->put(src_id % shard_num % task_pool_size_, ch,
sizeof(int) * 2 + sizeof(int64_t), (char *)new_data,
n * sizeof(int64_t) + len);
delete[] new_data;
} else {
_db->put(src_id % shard_num % task_pool_size_, ch,
sizeof(int) * 2 + sizeof(int64_t), (char *)data, len);
}
_db->flush(src_id % shard_num % task_pool_size_);
std::string x;
// if (_db->get(src_id % shard_num % task_pool_size_, ch, sizeof(int64_t) +
// 2 * sizeof(int), x) ==0){
// VLOG(0)<<"put result";
// for(int i = 0;i < x.size();i+=8){
// VLOG(0)<<"get an id "<<*((int64_t *)(x.c_str() + i));
// }
//}
}
return 0;
}
......@@ -109,8 +150,8 @@ char *GraphTable::random_sample_neighbor_from_ssd(
memset(ch, 0, sizeof(int));
memcpy(ch + sizeof(int), &idx, sizeof(int));
memcpy(ch + sizeof(int) * 2, &id, sizeof(int64_t));
if (_db->get(id % shard_num % task_pool_size_, ch, sizeof(uint64_t), str) ==
0) {
if (_db->get(id % shard_num % task_pool_size_, ch,
sizeof(int) * 2 + sizeof(int64_t), str) == 0) {
int64_t *data = ((int64_t *)str.c_str());
int n = str.size() / sizeof(int64_t);
std::unordered_map<int, int> m;
......@@ -142,7 +183,298 @@ char *GraphTable::random_sample_neighbor_from_ssd(
actual_size = 0;
return NULL;
}
int64_t GraphTable::load_graph_to_memory_from_ssd(int idx,
std::vector<int64_t> &ids) {
std::vector<std::vector<int64_t>> bags(task_pool_size_);
for (auto x : ids) {
int location = x % shard_num % task_pool_size_;
bags[location].push_back(x);
}
std::vector<std::future<int>> tasks;
std::vector<int64_t> count(task_pool_size_, 0);
for (size_t i = 0; i < bags.size(); i++) {
if (bags[i].size() > 0) {
tasks.push_back(_shards_task_pool[i]->enqueue([&, i, idx, this]() -> int {
char ch[sizeof(int) * 2 + sizeof(int64_t)];
memset(ch, 0, sizeof(int));
memcpy(ch + sizeof(int), &idx, sizeof(int));
for (size_t k = 0; k < bags[i].size(); k++) {
auto v = bags[i][k];
memcpy(ch + sizeof(int) * 2, &v, sizeof(int64_t));
std::string str;
if (_db->get(i, ch, sizeof(int) * 2 + sizeof(int64_t), str) == 0) {
count[i] += (int64_t)str.size();
for (int j = 0; j < str.size(); j += sizeof(int64_t)) {
int64_t id = *(int64_t *)(str.c_str() + j);
add_comm_edge(idx, v, id);
}
}
}
return 0;
}));
}
}
for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
int64_t tot = 0;
for (auto x : count) tot += x;
return tot;
}
void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) {
VLOG(2) << "start to make graph partitions , byte_size = " << byte_size
<< " total memory cost = " << total_memory_cost;
if (total_memory_cost == 0) {
VLOG(0) << "no edges are detected,make partitions exits";
return;
}
const float a = 2.0, y = 1.25;
int64_t gb_size_by_discount = byte_size * 0.8 * device_len;
if (gb_size_by_discount <= 0) gb_size_by_discount = 1;
int part_len = total_memory_cost / gb_size_by_discount;
if (part_len == 0) part_len = 1;
VLOG(2) << "part_len = " << part_len
<< " byte size = " << gb_size_by_discount;
partitions[idx].clear();
partitions[idx].resize(part_len);
std::vector<int64_t> memory_remaining(part_len, gb_size_by_discount);
std::vector<float> score(part_len, 0);
std::unordered_map<int64_t, int> id_map;
std::vector<rocksdb::Iterator *> iters;
for (int i = 0; i < task_pool_size_; i++) {
iters.push_back(_db->get_iterator(i));
iters[i]->SeekToFirst();
}
int next = 0;
while (iters.size()) {
if (next >= iters.size()) {
next = 0;
}
if (!iters[next]->Valid()) {
iters.erase(iters.begin() + next);
continue;
}
std::string key = iters[next]->key().ToString();
int temp_idx = *(int *)(key.c_str() + sizeof(int));
if (temp_idx != idx) {
iters[next]->Next();
next++;
continue;
}
std::string value = iters[next]->value().ToString();
std::int64_t i_key = *(int64_t *)(key.c_str() + 8);
for (int i = 0; i < part_len; i++) {
if (memory_remaining[i] < (int64_t)value.size()) {
score[i] = -100000.0;
} else {
score[i] = 0;
}
}
for (int j = 0; j < value.size(); j += sizeof(int64_t)) {
int64_t v = *((int64_t *)(value.c_str() + j));
int index = -1;
if (id_map.find(v) != id_map.end()) {
index = id_map[v];
score[index]++;
}
}
float base;
int index = 0;
for (int i = 0; i < part_len; i++) {
base = gb_size_by_discount - memory_remaining[i];
score[i] -= a * y * std::pow(1.0 * base, y - 1);
if (score[i] > score[index]) index = i;
VLOG(2) << "score" << i << " = " << score[i] << " memory left "
<< memory_remaining[i];
}
id_map[i_key] = index;
partitions[idx][index].push_back(i_key);
memory_remaining[index] -= (int64_t)value.size();
iters[next]->Next();
next++;
}
for (int i = 0; i < part_len; i++) {
if (partitions[idx][i].size() == 0) {
partitions[idx].erase(partitions[idx].begin() + i);
i--;
part_len--;
continue;
}
VLOG(2) << " partition " << i << " size = " << partitions[idx][i].size();
for (auto x : partitions[idx][i]) {
VLOG(2) << "find a id " << x;
}
}
next_partition = 0;
}
void GraphTable::clear_graph(int idx) {
for (auto p : edge_shards[idx]) {
delete p;
}
edge_shards[idx].clear();
for (size_t i = 0; i < shard_num_per_server; i++) {
edge_shards[idx].push_back(new GraphShard());
}
}
int32_t GraphTable::load_next_partition(int idx) {
if (next_partition >= partitions[idx].size()) {
VLOG(0) << "partition iteration is done";
return -1;
}
clear_graph(idx);
load_graph_to_memory_from_ssd(idx, partitions[idx][next_partition]);
next_partition++;
return 0;
}
int32_t GraphTable::load_edges_to_ssd(const std::string &path,
bool reverse_edge,
const std::string &edge_type) {
int idx = 0;
if (edge_type == "") {
VLOG(0) << "edge_type not specified, loading edges to " << id_to_edge[0]
<< " part";
} else {
if (edge_to_id.find(edge_type) == edge_to_id.end()) {
VLOG(0) << "edge_type " << edge_type
<< " is not defined, nothing will be loaded";
return 0;
}
idx = edge_to_id[edge_type];
}
total_memory_cost = 0;
auto paths = paddle::string::split_string<std::string>(path, ";");
int64_t count = 0;
std::string sample_type = "random";
bool is_weighted = false;
int valid_count = 0;
for (auto path : paths) {
std::ifstream file(path);
std::string line;
while (std::getline(file, line)) {
VLOG(0) << "get a line from file " << line;
auto values = paddle::string::split_string<std::string>(line, "\t");
count++;
if (values.size() < 2) continue;
auto src_id = std::stoll(values[0]);
auto dist_ids = paddle::string::split_string<std::string>(values[1], ";");
std::vector<int64_t> dist_data;
for (auto x : dist_ids) {
dist_data.push_back(std::stoll(x));
total_memory_cost += sizeof(int64_t);
}
add_node_to_ssd(0, idx, src_id, (char *)dist_data.data(),
(int)(dist_data.size() * sizeof(int64_t)));
}
}
VLOG(0) << "total memory cost = " << total_memory_cost << " bytes";
return 0;
}
int32_t GraphTable::dump_edges_to_ssd(int idx) {
VLOG(0) << "calling dump edges to ssd";
const int64_t fixed_size = 10000;
// std::vector<int64_t> edge_array[task_pool_size_];
std::vector<std::unordered_map<int64_t, int>> count(task_pool_size_);
std::vector<std::future<int64_t>> tasks;
auto &shards = edge_shards[idx];
for (size_t i = 0; i < shards.size(); ++i) {
tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue(
[&, i, this]() -> int64_t {
int64_t cost = 0;
std::vector<Node *> &v = shards[i]->get_bucket();
std::vector<int64_t> s;
size_t ind = i % this->task_pool_size_;
for (size_t j = 0; j < v.size(); j++) {
for (int k = 0; k < v[j]->get_neighbor_size(); k++) {
s.push_back(v[j]->get_neighbor_id(k));
}
cost += v[j]->get_neighbor_size() * sizeof(int64_t);
add_node_to_ssd(0, idx, v[j]->get_id(), (char *)s.data(),
s.size() * sizeof(int64_t));
}
return cost;
}));
}
for (size_t i = 0; i < tasks.size(); i++) total_memory_cost += tasks[i].get();
return 0;
}
int32_t GraphTable::make_complementary_graph(int idx, int64_t byte_size) {
VLOG(0) << "make_complementary_graph";
const int64_t fixed_size = 10000;
// std::vector<int64_t> edge_array[task_pool_size_];
std::vector<std::unordered_map<int64_t, int>> count(task_pool_size_);
std::vector<std::future<int>> tasks;
auto &shards = edge_shards[idx];
for (size_t i = 0; i < shards.size(); ++i) {
tasks.push_back(
_shards_task_pool[i % task_pool_size_]->enqueue([&, i, this]() -> int {
std::vector<Node *> &v = shards[i]->get_bucket();
size_t ind = i % this->task_pool_size_;
for (size_t j = 0; j < v.size(); j++) {
size_t location = v[j]->get_id();
for (int k = 0; k < v[j]->get_neighbor_size(); k++) {
count[ind][v[j]->get_neighbor_id(k)]++;
}
}
return 0;
}));
}
std::unordered_map<int64_t, int> final_count;
std::map<int, std::vector<int64_t>> count_to_id;
std::vector<int64_t> buffer;
for (auto p : edge_shards[idx]) {
delete p;
}
edge_shards[idx].clear();
for (size_t i = 0; i < shard_num_per_server; i++) {
edge_shards[idx].push_back(new GraphShard());
}
for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
for (int i = 0; i < task_pool_size_; i++) {
for (auto &p : count[i]) {
final_count[p.first] = final_count[p.first] + p.second;
}
count[i].clear();
}
for (auto &p : final_count) {
count_to_id[p.second].push_back(p.first);
VLOG(2) << p.first << " appear " << p.second << " times";
}
// std::map<int,std::vector<int64_t>>::iterator iter= count_to_id.rbegin();
auto iter = count_to_id.rbegin();
while (iter != count_to_id.rend() && byte_size > 0) {
for (auto x : iter->second) {
buffer.push_back(x);
if (buffer.size() >= fixed_size) {
int64_t res = load_graph_to_memory_from_ssd(idx, buffer);
byte_size -= res;
}
if (byte_size <= 0) break;
}
iter++;
}
if (byte_size > 0 && buffer.size() > 0) {
int64_t res = load_graph_to_memory_from_ssd(idx, buffer);
byte_size -= res;
}
std::string sample_type = "random";
for (auto &shard : edge_shards[idx]) {
auto bucket = shard->get_bucket();
for (size_t i = 0; i < bucket.size(); i++) {
bucket[i]->build_sampler(sample_type);
}
}
return 0;
}
#endif
/*
int CompleteGraphSampler::run_graph_sampling() {
pthread_rwlock_t *rw_lock = graph_table->rw_lock.get();
......@@ -700,9 +1032,11 @@ int32_t GraphTable::build_sampler(int idx, std::string sample_type) {
}
int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge,
const std::string &edge_type) {
// #ifdef PADDLE_WITH_HETERPS
// if (gpups_mode) pthread_rwlock_rdlock(rw_lock.get());
// #endif
#ifdef PADDLE_WITH_HETERPS
// if (gpups_mode) pthread_rwlock_rdlock(rw_lock.get());
if (search_level == 2) total_memory_cost = 0;
const int64_t fixed_load_edges = 1000000;
#endif
int idx = 0;
if (edge_type == "") {
VLOG(0) << "edge_type not specified, loading edges to " << id_to_edge[0]
......@@ -715,6 +1049,7 @@ int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge,
}
idx = edge_to_id[edge_type];
}
auto paths = paddle::string::split_string<std::string>(path, ";");
int64_t count = 0;
std::string sample_type = "random";
......@@ -756,13 +1091,33 @@ int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge,
edge_shards[idx][index]->add_graph_node(src_id)->build_edges(is_weighted);
edge_shards[idx][index]->add_neighbor(src_id, dst_id, weight);
valid_count++;
#ifdef PADDLE_WITH_HETERPS
// if (gpups_mode) pthread_rwlock_rdlock(rw_lock.get());
if (count > fixed_load_edges && search_level == 2) {
dump_edges_to_ssd(idx);
VLOG(0) << "dumping edges to ssd, edge count is reset to 0";
clear_graph(idx);
count = 0;
}
#endif
}
}
VLOG(0) << valid_count << "/" << count << " edges are loaded successfully in "
<< path;
// Build Sampler j
// Build Sampler j
#ifdef PADDLE_WITH_HETERPS
// if (gpups_mode) pthread_rwlock_rdlock(rw_lock.get());
if (search_level == 2) {
if (count > 0) {
dump_edges_to_ssd(idx);
VLOG(0) << "dumping edges to ssd, edge count is reset to 0";
clear_graph(idx);
count = 0;
}
return 0;
}
#endif
for (auto &shard : edge_shards[idx]) {
auto bucket = shard->get_bucket();
for (size_t i = 0; i < bucket.size(); i++) {
......@@ -892,7 +1247,6 @@ int32_t GraphTable::random_sample_neighbors(
scaled_lru->query(i, id_list[i].data(), id_list[i].size(), r);
}
int index = 0;
uint32_t idx;
std::vector<SampleResult> sample_res;
std::vector<SampleKey> sample_keys;
auto &rng = _shards_task_rng_pool[i];
......@@ -911,6 +1265,7 @@ int32_t GraphTable::random_sample_neighbors(
if (node == nullptr) {
#ifdef PADDLE_WITH_HETERPS
if (search_level == 2) {
VLOG(2) << "enter sample from ssd";
char *buffer_addr = random_sample_neighbor_from_ssd(
idx, node_id, sample_size, rng, actual_size);
if (actual_size != 0) {
......@@ -1060,6 +1415,26 @@ std::pair<int32_t, std::string> GraphTable::parse_feature(
return std::make_pair<int32_t, std::string>(-1, "");
}
std::vector<std::vector<int64_t>> GraphTable::get_all_id(int type_id, int idx,
int slice_num) {
std::vector<std::vector<int64_t>> res(slice_num);
auto &search_shards = type_id == 0 ? edge_shards[idx] : feature_shards[idx];
std::vector<std::future<std::vector<int64_t>>> tasks;
for (int i = 0; i < search_shards.size(); i++) {
tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue(
[&search_shards, i]() -> std::vector<int64_t> {
return search_shards[i]->get_all_id();
}));
}
for (size_t i = 0; i < tasks.size(); ++i) {
tasks[i].wait();
}
for (size_t i = 0; i < tasks.size(); i++) {
auto ids = tasks[i].get();
for (auto &id : ids) res[id % slice_num].push_back(id);
}
return res;
}
int32_t GraphTable::pull_graph_list(int type_id, int idx, int start,
int total_size,
std::unique_ptr<char[]> &buffer,
......@@ -1218,6 +1593,9 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) {
VLOG(0) << "in init graph table shard idx = " << _shard_idx << " shard_start "
<< shard_start << " shard_end " << shard_end;
edge_shards.resize(id_to_edge.size());
#ifdef PADDLE_WITH_HETERPS
partitions.resize(id_to_edge.size());
#endif
for (int k = 0; k < (int)edge_shards.size(); k++) {
for (size_t i = 0; i < shard_num_per_server; i++) {
edge_shards[k].push_back(new GraphShard());
......
......@@ -63,7 +63,13 @@ class GraphShard {
}
return res;
}
std::vector<int64_t> get_all_id() {
std::vector<int64_t> res;
for (int i = 0; i < (int)bucket.size(); i++) {
res.push_back(bucket[i]->get_id());
}
return res;
}
GraphNode *add_graph_node(int64_t id);
GraphNode *add_graph_node(Node *node);
FeatureNode *add_feature_node(int64_t id);
......@@ -420,6 +426,10 @@ class GraphTable : public Table {
use_cache = false;
shard_num = 0;
rw_lock.reset(new pthread_rwlock_t());
#ifdef PADDLE_WITH_HETERPS
next_partition = 0;
total_memory_cost = 0;
#endif
}
virtual ~GraphTable();
......@@ -465,6 +475,8 @@ class GraphTable : public Table {
int32_t load_edges(const std::string &path, bool reverse,
const std::string &edge_type);
std::vector<std::vector<int64_t>> get_all_id(int type, int idx,
int slice_num);
int32_t load_nodes(const std::string &path, std::string node_type);
int32_t add_graph_node(int idx, std::vector<int64_t> &id_list,
......@@ -513,7 +525,7 @@ class GraphTable : public Table {
const std::vector<std::vector<std::string>> &res);
size_t get_server_num() { return server_num; }
void clear_graph(int idx);
virtual int32_t make_neighbor_sample_cache(size_t size_limit, size_t ttl) {
{
std::unique_lock<std::mutex> lock(mutex_);
......@@ -538,6 +550,7 @@ class GraphTable : public Table {
// graph_sampler->set_graph_sample_callback(callback);
// return 0;
// }
virtual void make_partitions(int idx, int64_t gb_size, int device_len);
virtual char *random_sample_neighbor_from_ssd(
int idx, int64_t id, int sample_size,
const std::shared_ptr<std::mt19937_64> rng, int &actual_size);
......@@ -545,8 +558,25 @@ class GraphTable : public Table {
char *data, int len);
virtual paddle::framework::GpuPsCommGraph make_gpu_ps_graph(
int idx, std::vector<int64_t> ids);
int32_t Load_to_ssd(const std::string &path, const std::string &param);
int64_t load_graph_to_memory_from_ssd(int idx, std::vector<int64_t> &ids);
int32_t make_complementary_graph(int idx, int64_t byte_size);
int32_t dump_edges_to_ssd(int idx);
int32_t get_partition_num(int idx) { return partitions[idx].size(); }
std::vector<int64_t> get_partition(int idx, int index) {
if (idx >= partitions.size() || index >= partitions[idx].size())
return std::vector<int64_t>();
return partitions[idx][index];
}
int32_t load_edges_to_ssd(const std::string &path, bool reverse_edge,
const std::string &edge_type);
int32_t load_next_partition(int idx);
void set_search_level(int search_level) { this->search_level = search_level; }
// virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); }
int search_level;
int64_t total_memory_cost;
std::vector<std::vector<std::vector<int64_t>>> partitions;
int next_partition;
#endif
virtual int32_t add_comm_edge(int idx, int64_t src_id, int64_t dst_id);
virtual int32_t build_sampler(int idx, std::string sample_type = "random");
......
......@@ -24,7 +24,7 @@ namespace paddle {
namespace framework {
struct GpuPsGraphNode {
int64_t node_id;
int neighbor_size, neighbor_offset;
unsigned int neighbor_size, neighbor_offset;
// this node's neighbor is stored on [neighbor_offset,neighbor_offset +
// neighbor_size) of int64_t *neighbor_list;
};
......@@ -32,28 +32,38 @@ struct GpuPsGraphNode {
struct GpuPsCommGraph {
int64_t *neighbor_list;
GpuPsGraphNode *node_list;
int neighbor_size, node_size;
unsigned int neighbor_size, node_size;
// the size of neighbor array and graph_node_list array
GpuPsCommGraph()
: neighbor_list(NULL), node_list(NULL), neighbor_size(0), node_size(0) {}
GpuPsCommGraph(int64_t *neighbor_list_, GpuPsGraphNode *node_list_,
int neighbor_size_, int node_size_)
unsigned int neighbor_size_, unsigned int node_size_)
: neighbor_list(neighbor_list_),
node_list(node_list_),
neighbor_size(neighbor_size_),
node_size(node_size_) {}
void init_on_cpu(unsigned int neighbor_size, unsigned int node_size) {
this->neighbor_size = neighbor_size;
this->node_size = node_size;
this->neighbor_list = new int64_t[neighbor_size];
this->node_list = new paddle::framework::GpuPsGraphNode[node_size];
}
void release_on_cpu() {
delete[] neighbor_list;
delete[] node_list;
}
void display_on_cpu() {
VLOG(0) << "neighbor_size = " << neighbor_size;
VLOG(0) << "node_size = " << node_size;
for (int i = 0; i < neighbor_size; i++) {
for (size_t i = 0; i < neighbor_size; i++) {
VLOG(0) << "neighbor " << i << " " << neighbor_list[i];
}
for (int i = 0; i < node_size; i++) {
for (size_t i = 0; i < node_size; i++) {
VLOG(0) << "node i " << node_list[i].node_id
<< " neighbor_size = " << node_list[i].neighbor_size;
std::string str;
int offset = node_list[i].neighbor_offset;
for (int j = 0; j < node_list[i].neighbor_size; j++) {
for (size_t j = 0; j < node_list[i].neighbor_size; j++) {
if (j > 0) str += ",";
str += std::to_string(neighbor_list[j + offset]);
}
......@@ -139,12 +149,18 @@ struct NeighborSampleQuery {
};
struct NeighborSampleResult {
int64_t *val;
int64_t *actual_val;
int *actual_sample_size, sample_size, key_size;
int total_sample_size;
std::shared_ptr<memory::Allocation> val_mem, actual_sample_size_mem;
std::shared_ptr<memory::Allocation> actual_val_mem;
int64_t *get_val() { return val; }
int64_t get_actual_val() { return (int64_t)actual_val; }
int *get_actual_sample_size() { return actual_sample_size; }
int get_sample_size() { return sample_size; }
int get_key_size() { return key_size; }
void set_total_sample_size(int s) { total_sample_size = s; }
int get_len() { return total_sample_size; }
void initialize(int _sample_size, int _key_size, int dev_id) {
sample_size = _sample_size;
key_size = _key_size;
......@@ -165,18 +181,30 @@ struct NeighborSampleResult {
int *ac_size = new int[key_size];
cudaMemcpy(ac_size, actual_sample_size, key_size * sizeof(int),
cudaMemcpyDeviceToHost); // 3, 1, 3
int total_sample_size = 0;
for (int i = 0; i < key_size; i++) {
total_sample_size += ac_size[i];
}
int64_t *res2 = new int64_t[total_sample_size]; // r
cudaMemcpy(res2, actual_val, total_sample_size * sizeof(int64_t),
cudaMemcpyDeviceToHost); // r
int start = 0;
for (int i = 0; i < key_size; i++) {
VLOG(0) << "actual sample size for " << i << "th key is " << ac_size[i];
VLOG(0) << "sampled neighbors are ";
std::string neighbor;
std::string neighbor, neighbor2;
for (int j = 0; j < ac_size[i]; j++) {
if (neighbor.size() > 0) neighbor += ";";
neighbor += std::to_string(res[i * sample_size + j]);
// if (neighbor.size() > 0) neighbor += ";";
if (neighbor2.size() > 0) neighbor2 += ";"; // r
// neighbor += std::to_string(res[i * sample_size + j]);
neighbor2 += std::to_string(res2[start + j]); // r
}
VLOG(0) << neighbor;
VLOG(0) << neighbor << " " << neighbor2;
start += ac_size[i]; // r
}
delete[] res;
delete[] res2; // r
delete[] ac_size;
VLOG(0) << " ------------------";
}
......
......@@ -23,13 +23,18 @@
#ifdef PADDLE_WITH_HETERPS
namespace paddle {
namespace framework {
class GpuPsGraphTable : public HeterComm<int64_t, int, int> {
class GpuPsGraphTable : public HeterComm<int64_t, unsigned int, int> {
public:
GpuPsGraphTable(std::shared_ptr<HeterPsResource> resource, int topo_aware)
: HeterComm<int64_t, int, int>(1, resource) {
: HeterComm<int64_t, unsigned int, int>(1, resource) {
load_factor_ = 0.25;
rw_lock.reset(new pthread_rwlock_t());
gpu_num = resource_->total_device();
for (int i = 0; i < gpu_num; i++) {
gpu_graph_list.push_back(GpuPsCommGraph());
sample_status.push_back(NULL);
tables_.push_back(NULL);
}
cpu_table_status = -1;
if (topo_aware) {
int total_gpu = resource_->total_device();
......@@ -82,6 +87,8 @@ class GpuPsGraphTable : public HeterComm<int64_t, int, int> {
// end_graph_sampling();
// }
}
void build_graph_on_single_gpu(GpuPsCommGraph &g, int gpu_id);
void clear_graph_info(int gpu_id);
void build_graph_from_cpu(std::vector<GpuPsCommGraph> &cpu_node_list);
NodeQueryResult graph_node_sample(int gpu_id, int sample_size);
NeighborSampleResult graph_neighbor_sample_v3(NeighborSampleQuery q,
......
......@@ -13,6 +13,8 @@
// limitations under the License.
#include <thrust/device_vector.h>
#include <thrust/reduce.h>
#include <thrust/scan.h>
#include <functional>
#pragma once
#ifdef PADDLE_WITH_HETERPS
......@@ -30,10 +32,11 @@ sample_result is to save the neighbor sampling result, its size is len *
sample_size;
*/
__global__ void get_cpu_id_index(int64_t* key, int* val, int64_t* cpu_key,
int* sum, int* index, int len) {
__global__ void get_cpu_id_index(int64_t* key, unsigned int* val,
int64_t* cpu_key, int* sum, int* index,
int len) {
CUDA_KERNEL_LOOP(i, len) {
if (val[i] == -1) {
if (val[i] == ((unsigned int)-1)) {
int old = atomicAdd(sum, 1);
cpu_key[old] = key[i];
index[old] = i;
......@@ -43,9 +46,9 @@ __global__ void get_cpu_id_index(int64_t* key, int* val, int64_t* cpu_key,
template <int WARP_SIZE, int BLOCK_WARPS, int TILE_SIZE>
__global__ void neighbor_sample_example_v2(GpuPsCommGraph graph,
int* node_index, int* actual_size,
int64_t* res, int sample_len,
int n) {
unsigned int* node_index,
int* actual_size, int64_t* res,
int sample_len, int n) {
assert(blockDim.x == WARP_SIZE);
assert(blockDim.y == BLOCK_WARPS);
......@@ -55,7 +58,7 @@ __global__ void neighbor_sample_example_v2(GpuPsCommGraph graph,
curand_init(blockIdx.x, threadIdx.y * WARP_SIZE + threadIdx.x, 0, &rng);
while (i < last_idx) {
if (node_index[i] == -1) {
if (node_index[i] == (unsigned int)(-1)) {
actual_size[i] = 0;
i += BLOCK_WARPS;
continue;
......@@ -92,13 +95,14 @@ __global__ void neighbor_sample_example_v2(GpuPsCommGraph graph,
}
}
__global__ void neighbor_sample_example(GpuPsCommGraph graph, int* node_index,
__global__ void neighbor_sample_example(GpuPsCommGraph graph,
unsigned int* node_index,
int* actual_size, int64_t* res,
int sample_len, int* sample_status,
int n, int from) {
int id = blockIdx.x * blockDim.y + threadIdx.y;
if (id < n) {
if (node_index[id] == -1) {
if (node_index[id] == (unsigned int)(-1)) {
actual_size[id] = 0;
return;
}
......@@ -374,6 +378,18 @@ __global__ void fill_dvalues(int64_t* d_shard_vals, int64_t* d_vals,
}
}
__global__ void fill_actual_vals(int64_t* vals, int64_t* actual_vals,
int* actual_sample_size,
int* cumsum_actual_sample_size,
int sample_size, int len) {
const size_t i = blockIdx.x * blockDim.x + threadIdx.x;
if (i < len) {
for (int j = 0; j < actual_sample_size[i]; j++) {
actual_vals[cumsum_actual_sample_size[i] + j] = vals[sample_size * i + j];
}
}
}
__global__ void node_query_example(GpuPsCommGraph graph, int start, int size,
int64_t* res) {
const size_t i = blockIdx.x * blockDim.x + threadIdx.x;
......@@ -382,6 +398,18 @@ __global__ void node_query_example(GpuPsCommGraph graph, int start, int size,
}
}
void GpuPsGraphTable::clear_graph_info(int gpu_id) {
if (tables_.size() && tables_[gpu_id] != NULL) {
delete tables_[gpu_id];
}
auto& graph = gpu_graph_list[gpu_id];
if (graph.neighbor_list != NULL) {
cudaFree(graph.neighbor_list);
}
if (graph.node_list != NULL) {
cudaFree(graph.node_list);
}
}
void GpuPsGraphTable::clear_graph_info() {
if (tables_.size()) {
for (auto table : tables_) delete table;
......@@ -406,6 +434,46 @@ In this function, memory is allocated on each gpu to save the graphs,
gpu i saves the ith graph from cpu_graph_list
*/
void GpuPsGraphTable::build_graph_on_single_gpu(GpuPsCommGraph& g, int i) {
clear_graph_info(i);
platform::CUDADeviceGuard guard(resource_->dev_id(i));
// platform::CUDADeviceGuard guard(i);
gpu_graph_list[i] = GpuPsCommGraph();
sample_status[i] = NULL;
tables_[i] = new Table(std::max((unsigned int)1, g.node_size) / load_factor_);
if (g.node_size > 0) {
std::vector<int64_t> keys;
std::vector<unsigned int> offset;
cudaMalloc((void**)&gpu_graph_list[i].node_list,
g.node_size * sizeof(GpuPsGraphNode));
cudaMemcpy(gpu_graph_list[i].node_list, g.node_list,
g.node_size * sizeof(GpuPsGraphNode), cudaMemcpyHostToDevice);
for (unsigned int j = 0; j < g.node_size; j++) {
keys.push_back(g.node_list[j].node_id);
offset.push_back(j);
}
build_ps(i, keys.data(), offset.data(), keys.size(), 1024, 8);
gpu_graph_list[i].node_size = g.node_size;
} else {
build_ps(i, NULL, NULL, 0, 1024, 8);
gpu_graph_list[i].node_list = NULL;
gpu_graph_list[i].node_size = 0;
}
if (g.neighbor_size) {
int* addr;
cudaMalloc((void**)&addr, g.neighbor_size * sizeof(int));
cudaMemset(addr, 0, g.neighbor_size * sizeof(int));
sample_status[i] = addr;
cudaMalloc((void**)&gpu_graph_list[i].neighbor_list,
g.neighbor_size * sizeof(int64_t));
cudaMemcpy(gpu_graph_list[i].neighbor_list, g.neighbor_list,
g.neighbor_size * sizeof(int64_t), cudaMemcpyHostToDevice);
gpu_graph_list[i].neighbor_size = g.neighbor_size;
} else {
gpu_graph_list[i].neighbor_list = NULL;
gpu_graph_list[i].neighbor_size = 0;
}
}
void GpuPsGraphTable::build_graph_from_cpu(
std::vector<GpuPsCommGraph>& cpu_graph_list) {
VLOG(0) << "in build_graph_from_cpu cpu_graph_list size = "
......@@ -418,20 +486,21 @@ void GpuPsGraphTable::build_graph_from_cpu(
for (int i = 0; i < cpu_graph_list.size(); i++) {
platform::CUDADeviceGuard guard(resource_->dev_id(i));
// platform::CUDADeviceGuard guard(i);
gpu_graph_list.push_back(GpuPsCommGraph());
sample_status.push_back(NULL);
auto table =
new Table(std::max(1, cpu_graph_list[i].node_size) / load_factor_);
tables_.push_back(table);
gpu_graph_list[i] = GpuPsCommGraph();
sample_status[i] = NULL;
// auto table =
// new Table(std::max(1, cpu_graph_list[i].node_size) / load_factor_);
tables_[i] = new Table(
std::max((unsigned int)1, cpu_graph_list[i].node_size) / load_factor_);
if (cpu_graph_list[i].node_size > 0) {
std::vector<int64_t> keys;
std::vector<int> offset;
std::vector<unsigned int> offset;
cudaMalloc((void**)&gpu_graph_list[i].node_list,
cpu_graph_list[i].node_size * sizeof(GpuPsGraphNode));
cudaMemcpy(gpu_graph_list[i].node_list, cpu_graph_list[i].node_list,
cpu_graph_list[i].node_size * sizeof(GpuPsGraphNode),
cudaMemcpyHostToDevice);
for (int j = 0; j < cpu_graph_list[i].node_size; j++) {
for (unsigned int j = 0; j < cpu_graph_list[i].node_size; j++) {
keys.push_back(cpu_graph_list[i].node_list[j].node_id);
offset.push_back(j);
}
......@@ -597,15 +666,15 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample(int gpu_id,
// use the key-value map to update alloc_mem_i[0,shard_len)
// tables_[i]->rwlock_->RDLock();
tables_[i]->get(reinterpret_cast<int64_t*>(node.key_storage),
reinterpret_cast<int*>(node.val_storage),
reinterpret_cast<unsigned int*>(node.val_storage),
h_right[i] - h_left[i] + 1,
resource_->remote_stream(i, gpu_id));
// node.in_stream);
int shard_len = h_right[i] - h_left[i] + 1;
auto graph = gpu_graph_list[i];
int* id_array = reinterpret_cast<int*>(node.val_storage);
int* actual_size_array = id_array + shard_len;
int64_t* sample_array = (int64_t*)(id_array + shard_len * 2);
unsigned int* id_array = reinterpret_cast<unsigned int*>(node.val_storage);
int* actual_size_array = (int*)(id_array + shard_len);
int64_t* sample_array = (int64_t*)(actual_size_array + shard_len);
int sample_grid_size = (shard_len - 1) / dim_y + 1;
dim3 block(parallel_sample_size, dim_y);
dim3 grid(sample_grid_size);
......@@ -738,6 +807,8 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_v2(
if (shard_len == 0) {
continue;
}
// create_storage(gpu_id, i, shard_len * sizeof(int64_t),
// shard_len * (1 + sample_size) * sizeof(int64_t));
create_storage(gpu_id, i, shard_len * sizeof(int64_t),
shard_len * (1 + sample_size) * sizeof(int64_t));
}
......@@ -760,15 +831,18 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_v2(
platform::CUDADeviceGuard guard(resource_->dev_id(i));
// If not found, val is -1.
tables_[i]->get(reinterpret_cast<int64_t*>(node.key_storage),
reinterpret_cast<int*>(node.val_storage),
reinterpret_cast<unsigned int*>(node.val_storage),
h_right[i] - h_left[i] + 1,
resource_->remote_stream(i, gpu_id));
auto shard_len = h_right[i] - h_left[i] + 1;
auto graph = gpu_graph_list[i];
int* id_array = reinterpret_cast<int*>(node.val_storage);
int* actual_size_array = id_array + shard_len;
int64_t* sample_array = (int64_t*)(id_array + shard_len * 2);
// int* id_array = reinterpret_cast<int*>(node.val_storage);
// int* actual_size_array = id_array + shard_len;
// int64_t* sample_array = (int64_t*)(id_array + shard_len * 2);
unsigned int* id_array = reinterpret_cast<unsigned int*>(node.val_storage);
int* actual_size_array = (int*)(id_array + shard_len);
int64_t* sample_array = (int64_t*)(actual_size_array + shard_len);
constexpr int WARP_SIZE = 32;
constexpr int BLOCK_WARPS = 128 / WARP_SIZE;
constexpr int TILE_SIZE = BLOCK_WARPS * 16;
......@@ -846,6 +920,28 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_v2(
fill_dvalues<<<grid_size, block_size_, 0, stream>>>(
d_shard_vals_ptr, val, d_shard_actual_sample_size_ptr, actual_sample_size,
d_idx_ptr, sample_size, len);
{
platform::CUDAPlace place = platform::CUDAPlace(resource_->dev_id(gpu_id));
platform::CUDADeviceGuard guard(resource_->dev_id(gpu_id));
thrust::device_ptr<int> t_actual_sample_size(actual_sample_size);
int total_sample_size =
thrust::reduce(t_actual_sample_size, t_actual_sample_size + len);
result.actual_val_mem =
memory::AllocShared(place, total_sample_size * sizeof(int64_t));
result.actual_val = (int64_t*)(result.actual_val_mem)->ptr();
result.set_total_sample_size(total_sample_size);
thrust::device_vector<int> cumsum_actual_sample_size(len);
thrust::exclusive_scan(t_actual_sample_size, t_actual_sample_size + len,
cumsum_actual_sample_size.begin(), 0);
fill_actual_vals<<<grid_size, block_size_, 0, stream>>>(
val, result.actual_val, actual_sample_size,
thrust::raw_pointer_cast(cumsum_actual_sample_size.data()), sample_size,
len);
}
for (int i = 0; i < total_gpu; ++i) {
int shard_len = h_left[i] == -1 ? 0 : h_right[i] - h_left[i] + 1;
if (shard_len == 0) {
......@@ -868,13 +964,10 @@ NodeQueryResult GpuPsGraphTable::query_node_list(int gpu_id, int start,
if (query_size <= 0) return result;
int& actual_size = result.actual_sample_size;
actual_size = 0;
result.initialize(query_size, resource_->dev_id(gpu_id));
int64_t* val = result.val;
// int dev_id = resource_->dev_id(gpu_id);
// platform::CUDADeviceGuard guard(dev_id);
platform::CUDADeviceGuard guard(resource_->dev_id(gpu_id));
std::vector<int> idx, gpu_begin_pos, local_begin_pos, sample_size;
int size = 0;
std::vector<int> idx, gpu_begin_pos, local_begin_pos;
int sample_size;
/*
if idx[i] = a, gpu_begin_pos[i] = p1,
gpu_local_begin_pos[i] = p2;
......@@ -898,6 +991,31 @@ NodeQueryResult GpuPsGraphTable::query_node_list(int gpu_id, int start,
x2 = max(x1, x);
return y2 - x2;
};
auto graph = gpu_graph_list[gpu_id];
if (graph.node_size == 0) {
return result;
}
int x2, y2;
int len = range_check(start, start + query_size, 0, graph.node_size, x2, y2);
if (len == 0) {
return result;
}
int64_t* val;
sample_size = len;
result.initialize(len, resource_->dev_id(gpu_id));
actual_size = len;
val = result.val;
int dev_id_i = resource_->dev_id(gpu_id);
platform::CUDADeviceGuard guard(dev_id_i);
// platform::CUDADeviceGuard guard(i);
int grid_size = (len - 1) / block_size_ + 1;
node_query_example<<<grid_size, block_size_, 0,
resource_->remote_stream(gpu_id, gpu_id)>>>(
gpu_graph_list[gpu_id], x2, len, (int64_t*)val);
cudaStreamSynchronize(resource_->remote_stream(gpu_id, gpu_id));
return result;
/*
for (int i = 0; i < gpu_graph_list.size() && query_size != 0; i++) {
auto graph = gpu_graph_list[i];
if (graph.node_size == 0) {
......@@ -943,6 +1061,7 @@ NodeQueryResult GpuPsGraphTable::query_node_list(int gpu_id, int start,
destroy_storage(gpu_id, x);
}
return result;
*/
}
}
};
......
......@@ -58,6 +58,11 @@ void GraphGpuWrapper::set_device(std::vector<int> ids) {
device_id_mapping.push_back(device_id);
}
}
std::vector<std::vector<int64_t>> GraphGpuWrapper::get_all_id(int type, int idx,
int slice_num) {
return ((GpuPsGraphTable *)graph_table)
->cpu_graph_table->get_all_id(type, idx, slice_num);
}
void GraphGpuWrapper::set_up_types(std::vector<std::string> &edge_types,
std::vector<std::string> &node_types) {
id_to_edge = edge_types;
......@@ -76,6 +81,32 @@ void GraphGpuWrapper::set_up_types(std::vector<std::string> &edge_types,
this->table_feat_conf_feat_shape.resize(node_types.size());
}
void GraphGpuWrapper::make_partitions(int idx, int64_t byte_size,
int device_len) {
((GpuPsGraphTable *)graph_table)
->cpu_graph_table->make_partitions(idx, byte_size, device_len);
}
int32_t GraphGpuWrapper::load_next_partition(int idx) {
return ((GpuPsGraphTable *)graph_table)
->cpu_graph_table->load_next_partition(idx);
}
void GraphGpuWrapper::set_search_level(int level) {
((GpuPsGraphTable *)graph_table)->cpu_graph_table->set_search_level(level);
}
std::vector<int64_t> GraphGpuWrapper::get_partition(int idx, int num) {
return ((GpuPsGraphTable *)graph_table)
->cpu_graph_table->get_partition(idx, num);
}
int32_t GraphGpuWrapper::get_partition_num(int idx) {
return ((GpuPsGraphTable *)graph_table)
->cpu_graph_table->get_partition_num(idx);
}
void GraphGpuWrapper::make_complementary_graph(int idx, int64_t byte_size) {
((GpuPsGraphTable *)graph_table)
->cpu_graph_table->make_complementary_graph(idx, byte_size);
}
void GraphGpuWrapper::load_edge_file(std::string name, std::string filepath,
bool reverse) {
// 'e' means load edge
......@@ -132,10 +163,11 @@ void GraphGpuWrapper::add_table_feat_conf(std::string table_name,
}
VLOG(0) << "add conf over";
}
void GraphGpuWrapper::init_search_level(int level) { search_level = level; }
void GraphGpuWrapper::init_service() {
table_proto.set_task_pool_size(24);
table_proto.set_search_level(search_level);
table_proto.set_table_name("cpu_graph_table");
table_proto.set_use_cache(false);
for (int i = 0; i < id_to_edge.size(); i++)
......@@ -161,11 +193,16 @@ void GraphGpuWrapper::init_service() {
void GraphGpuWrapper::upload_batch(int idx,
std::vector<std::vector<int64_t>> &ids) {
GpuPsGraphTable *g = (GpuPsGraphTable *)graph_table;
std::vector<paddle::framework::GpuPsCommGraph> vec;
// std::vector<paddle::framework::GpuPsCommGraph> vec;
for (int i = 0; i < ids.size(); i++) {
vec.push_back(g->cpu_graph_table->make_gpu_ps_graph(idx, ids[i]));
// vec.push_back(g->cpu_graph_table->make_gpu_ps_graph(idx, ids[i]));
GpuPsCommGraph sub_graph =
g->cpu_graph_table->make_gpu_ps_graph(idx, ids[i]);
g->build_graph_on_single_gpu(sub_graph, i);
sub_graph.release_on_cpu();
VLOG(0) << "sub graph on gpu " << i << " is built";
}
g->build_graph_from_cpu(vec);
// g->build_graph_from_cpu(vec);
}
void GraphGpuWrapper::initialize() {
......
......@@ -22,7 +22,10 @@ namespace framework {
#ifdef PADDLE_WITH_HETERPS
class GraphGpuWrapper {
public:
char* graph_table;
static GraphGpuWrapper* GetInstance() {
static GraphGpuWrapper wrapper;
return &wrapper;
}
void initialize();
void test();
void set_device(std::vector<int> ids);
......@@ -34,12 +37,22 @@ class GraphGpuWrapper {
std::string feat_dtype, int feat_shape);
void load_edge_file(std::string name, std::string filepath, bool reverse);
void load_node_file(std::string name, std::string filepath);
int32_t load_next_partition(int idx);
int32_t get_partition_num(int idx);
std::vector<int64_t> get_partition(int idx, int num);
void make_partitions(int idx, int64_t byte_size, int device_len);
void make_complementary_graph(int idx, int64_t byte_size);
void set_search_level(int level);
void init_search_level(int level);
std::vector<std::vector<int64_t>> get_all_id(int type, int idx,
int slice_num);
NodeQueryResult query_node_list(int gpu_id, int start, int query_size);
NeighborSampleResult graph_neighbor_sample_v3(NeighborSampleQuery q,
bool cpu_switch);
std::vector<int64_t> graph_neighbor_sample(int gpu_id,
std::vector<int64_t>& key,
int sample_size);
std::unordered_map<std::string, int> edge_to_id, feature_to_id;
std::vector<std::string> id_to_feature, id_to_edge;
std::vector<std::unordered_map<std::string, int>> table_feat_mapping;
......@@ -48,6 +61,8 @@ class GraphGpuWrapper {
std::vector<std::vector<int>> table_feat_conf_feat_shape;
::paddle::distributed::GraphParameter table_proto;
std::vector<int> device_id_mapping;
int search_level = 1;
char* graph_table;
};
#endif
}
......
......@@ -298,6 +298,8 @@ void HashTable<KeyType, ValType>::update(const KeyType* d_keys,
template class HashTable<unsigned long, paddle::framework::FeatureValue>;
template class HashTable<long, int>;
template class HashTable<long, unsigned long>;
template class HashTable<long, unsigned int>;
template void HashTable<unsigned long, paddle::framework::FeatureValue>::get<
cudaStream_t>(const unsigned long* d_keys,
......@@ -308,6 +310,10 @@ template void HashTable<long, int>::get<cudaStream_t>(const long* d_keys,
int* d_vals, size_t len,
cudaStream_t stream);
template void HashTable<long, unsigned long>::get<cudaStream_t>(
const long* d_keys, unsigned long* d_vals, size_t len, cudaStream_t stream);
template void HashTable<long, unsigned int>::get<cudaStream_t>(
const long* d_keys, unsigned int* d_vals, size_t len, cudaStream_t stream);
// template void
// HashTable<unsigned long, paddle::framework::FeatureValue>::get<cudaStream_t>(
// const unsigned long* d_keys, char* d_vals, size_t len, cudaStream_t
......@@ -323,6 +329,14 @@ template void HashTable<long, int>::insert<cudaStream_t>(const long* d_keys,
size_t len,
cudaStream_t stream);
template void HashTable<long, unsigned long>::insert<cudaStream_t>(
const long* d_keys, const unsigned long* d_vals, size_t len,
cudaStream_t stream);
template void HashTable<long, unsigned int>::insert<cudaStream_t>(
const long* d_keys, const unsigned int* d_vals, size_t len,
cudaStream_t stream);
// template void HashTable<unsigned long,
// paddle::framework::FeatureValue>::insert<
// cudaStream_t>(const unsigned long* d_keys, size_t len, char* pool,
......
......@@ -28,6 +28,16 @@ namespace platform = paddle::platform;
// paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
// std::vector<int64_t> ids)
std::string edges[] = {
std::string("0\t1"), std::string("0\t9"), std::string("1\t2"),
std::string("1\t0"), std::string("2\t1"), std::string("2\t3"),
std::string("3\t2"), std::string("3\t4"), std::string("4\t3"),
std::string("4\t5"), std::string("5\t4"), std::string("5\t6"),
std::string("6\t5"), std::string("6\t7"), std::string("7\t6"),
std::string("7\t8"),
};
char edge_file_name[] = "edges1.txt";
std::string nodes[] = {
std::string("user\t37\ta 0.34\tb 13 14\tc hello\td abc"),
std::string("user\t96\ta 0.31\tb 15 10\tc 96hello\td abcd"),
......@@ -53,12 +63,17 @@ std::vector<std::string> user_feature_dtype = {"float32", "int32", "string",
std::vector<std::string> item_feature_dtype = {"float32"};
std::vector<int> user_feature_shape = {1, 2, 1, 1};
std::vector<int> item_feature_shape = {1};
void prepare_file(char file_name[]) {
void prepare_file(char file_name[], bool load_edge) {
std::ofstream ofile;
ofile.open(file_name);
for (auto x : nodes) {
ofile << x << std::endl;
if (load_edge) {
for (auto x : edges) {
ofile << x << std::endl;
}
} else {
for (auto x : nodes) {
ofile << x << std::endl;
}
}
ofile.close();
}
......@@ -85,9 +100,10 @@ TEST(TEST_FLEET, test_cpu_cache) {
g_f1->add_dtype(item_feature_dtype[i]);
g_f1->add_shape(item_feature_shape[i]);
}
prepare_file(node_file_name);
prepare_file(node_file_name, false);
prepare_file(edge_file_name, true);
table_proto.set_shard_num(24);
table_proto.set_search_level(2);
std::shared_ptr<HeterPsResource> resource =
std::make_shared<HeterPsResource>(device_id_mapping);
resource->enable_p2p();
......@@ -120,11 +136,14 @@ TEST(TEST_FLEET, test_cpu_cache) {
}
g.cpu_graph_table->build_sampler(0);
ids1.push_back(5);
ids1.push_back(7);
vec.push_back(g.cpu_graph_table->make_gpu_ps_graph(0, ids0));
vec.push_back(g.cpu_graph_table->make_gpu_ps_graph(0, ids1));
vec[0].display_on_cpu();
vec[1].display_on_cpu();
g.build_graph_from_cpu(vec);
// g.build_graph_from_cpu(vec);
g.build_graph_on_single_gpu(vec[0], 0);
g.build_graph_on_single_gpu(vec[1], 1);
int64_t cpu_key[3] = {0, 1, 2};
/*
std::vector<std::shared_ptr<char>> buffers(3);
......@@ -136,20 +155,84 @@ TEST(TEST_FLEET, test_cpu_cache) {
}
*/
void *key;
platform::CUDADeviceGuard guard(0);
cudaMalloc((void **)&key, 3 * sizeof(int64_t));
cudaMemcpy(key, cpu_key, 3 * sizeof(int64_t), cudaMemcpyHostToDevice);
auto neighbor_sample_res =
g.graph_neighbor_sample_v2(0, (int64_t *)key, 2, 3, true);
neighbor_sample_res.display();
//{1,9} or {9,1} is expected for key 0
//{0,2} or {2,0} is expected for key 1
//{1,3} or {3,1} is expected for key 2
auto node_query_res = g.query_node_list(0, 0, 4);
node_query_res.display();
NeighborSampleQuery query;
query.initialize(0, node_query_res.get_val(), 2, node_query_res.get_len());
query.display();
auto c = g.graph_neighbor_sample_v3(query, false);
c.display();
int device_len = 2;
for (int i = 0; i < 2; i++) {
// platform::CUDADeviceGuard guard(i);
LOG(0) << "query on card " << i;
//{1,9} or {9,1} is expected for key 0
//{0,2} or {2,0} is expected for key 1
//{1,3} or {3,1} is expected for key 2
int step = 2;
int cur = 0;
while (true) {
auto node_query_res = g.query_node_list(i, cur, step);
node_query_res.display();
if (node_query_res.get_len() == 0) {
VLOG(0) << "no more ids,break";
break;
}
cur += node_query_res.get_len();
NeighborSampleQuery query;
query.initialize(i, node_query_res.get_val(), 1,
node_query_res.get_len());
query.display();
auto c = g.graph_neighbor_sample_v3(query, false);
c.display();
}
}
g.cpu_graph_table->set_search_level(2);
// g.cpu_graph_table->Load_to_ssd(edge_file_name,"e>u2u");
g.cpu_graph_table->Load(edge_file_name, "e>u2u");
g.cpu_graph_table->make_partitions(0, 64, 2);
int index = 0;
while (g.cpu_graph_table->load_next_partition(0) != -1) {
auto all_ids = g.cpu_graph_table->get_all_id(0, 0, device_len);
for (auto x : all_ids) {
for (auto y : x) {
VLOG(0) << "part " << index << " " << y;
}
}
for (int i = 0; i < all_ids.size(); i++) {
GpuPsCommGraph sub_graph =
g.cpu_graph_table->make_gpu_ps_graph(0, all_ids[i]);
g.build_graph_on_single_gpu(sub_graph, i);
VLOG(2) << "sub graph on gpu " << i << " is built";
}
VLOG(0) << "start to iterate gpu graph node";
g.cpu_graph_table->make_complementary_graph(0, 64);
for (int i = 0; i < 2; i++) {
// platform::CUDADeviceGuard guard(i);
LOG(0) << "query on card " << i;
int step = 2;
int cur = 0;
while (true) {
auto node_query_res = g.query_node_list(i, cur, step);
node_query_res.display();
if (node_query_res.get_len() == 0) {
VLOG(0) << "no more ids,break";
break;
}
cur += node_query_res.get_len();
NeighborSampleQuery query, q1;
query.initialize(i, node_query_res.get_val(), 4,
node_query_res.get_len());
query.display();
auto c = g.graph_neighbor_sample_v3(query, true);
c.display();
platform::CUDADeviceGuard guard(i);
int64_t *key;
VLOG(0) << "sample key 1 globally";
g.cpu_graph_table->set_search_level(2);
cudaMalloc((void **)&key, sizeof(int64_t));
int64_t t_key = 1;
cudaMemcpy(key, &t_key, sizeof(int64_t), cudaMemcpyHostToDevice);
q1.initialize(i, (int64_t)key, 2, 1);
auto d = g.graph_neighbor_sample_v3(q1, true);
d.display();
cudaFree(key);
g.cpu_graph_table->set_search_level(1);
}
}
index++;
}
}
......@@ -34,7 +34,6 @@ void MultiTrainer::Initialize(const TrainerDesc& trainer_desc,
mpi_rank_ = trainer_desc.mpi_rank();
mpi_size_ = trainer_desc.mpi_size();
dump_file_num_ = trainer_desc.dump_file_num();
for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
i++) {
need_merge_var_names_.push_back(
......
......@@ -325,14 +325,18 @@ void BindNeighborSampleResult(py::module* m) {
py::class_<NeighborSampleResult>(*m, "NeighborSampleResult")
.def(py::init<>())
.def("initialize", &NeighborSampleResult::initialize)
.def("get_len", &NeighborSampleResult::get_len)
.def("get_val", &NeighborSampleResult::get_actual_val)
.def("display", &NeighborSampleResult::display);
}
void BindGraphGpuWrapper(py::module* m) {
py::class_<GraphGpuWrapper>(*m, "GraphGpuWrapper")
.def(py::init<>())
// nit<>())
//.def("test", &GraphGpuWrapper::test)
.def("initialize", &GraphGpuWrapper::initialize)
//.def(py::init([]() { return framework::GraphGpuWrapper::GetInstance();
//}))
.def(py::init<>())
.def("neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample_v3)
.def("graph_neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample)
.def("set_device", &GraphGpuWrapper::set_device)
......@@ -342,6 +346,15 @@ void BindGraphGpuWrapper(py::module* m) {
.def("add_table_feat_conf", &GraphGpuWrapper::add_table_feat_conf)
.def("load_edge_file", &GraphGpuWrapper::load_edge_file)
.def("upload_batch", &GraphGpuWrapper::upload_batch)
.def("get_all_id", &GraphGpuWrapper::get_all_id)
.def("load_next_partition", &GraphGpuWrapper::load_next_partition)
.def("make_partitions", &GraphGpuWrapper::make_partitions)
.def("make_complementary_graph",
&GraphGpuWrapper::make_complementary_graph)
.def("set_search_level", &GraphGpuWrapper::set_search_level)
.def("init_search_level", &GraphGpuWrapper::init_search_level)
.def("get_partition_num", &GraphGpuWrapper::get_partition_num)
.def("get_partition", &GraphGpuWrapper::get_partition)
.def("load_node_file", &GraphGpuWrapper::load_node_file);
}
#endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册