Unverified commit 68babef1 authored by seemingwang, committed by GitHub

distribute labels evenly among partitions in graph engine (#42846)

* enable graph-engine to return all id

* change vector's dimension

* change vector's dimension

* enlarge returned ids dimensions

* add actual_val

* change vlog

* fix bug

* bug fix

* bug fix

* fix display test

* singleton of gpu_graph_wrapper

* change sample result's structure to fit training

* recover sample code

* fix

* secondary sample

* add graph partition

* fix pybind

* optimize buffer allocation

* fix node transfer problem

* remove log

* support 32G+ graph on single gpu

* remove logs

* fix

* fix

* fix cpu query

* display info

* remove log

* remove empty file

* distribute labeled data evenly in graph engine
Co-authored-by: DesmonDay <908660116@qq.com>
Parent 6b8efc45
@@ -246,7 +246,8 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) {
VLOG(0) << "no edges are detected,make partitions exits";
return;
}
const float a = 2.0, y = 1.25;
auto &weight_map = node_weight[0][idx];
const double a = 2.0, y = 1.25, weight_param = 1.0;
int64_t gb_size_by_discount = byte_size * 0.8 * device_len;
if (gb_size_by_discount <= 0) gb_size_by_discount = 1;
int part_len = total_memory_cost / gb_size_by_discount;
@@ -256,8 +257,9 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) {
<< " byte size = " << gb_size_by_discount;
partitions[idx].clear();
partitions[idx].resize(part_len);
std::vector<double> weight_cost(part_len, 0);
std::vector<int64_t> memory_remaining(part_len, gb_size_by_discount);
std::vector<float> score(part_len, 0);
std::vector<double> score(part_len, 0);
std::unordered_map<int64_t, int> id_map;
std::vector<rocksdb::Iterator *> iters;
for (int i = 0; i < task_pool_size_; i++) {
@@ -274,14 +276,15 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) {
continue;
}
std::string key = iters[next]->key().ToString();
int type_idx = *(int *)key.c_str();
int temp_idx = *(int *)(key.c_str() + sizeof(int));
if (temp_idx != idx) {
if (type_idx != 0 || temp_idx != idx) {
iters[next]->Next();
next++;
continue;
}
std::string value = iters[next]->value().ToString();
std::int64_t i_key = *(int64_t *)(key.c_str() + 8);
std::int64_t i_key = *(int64_t *)(key.c_str() + sizeof(int) * 2);
for (int i = 0; i < part_len; i++) {
if (memory_remaining[i] < (int64_t)value.size()) {
score[i] = -100000.0;
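The parsing above fixes two things at once: each SSD key is laid out as [int type][int idx][int64_t node_id], so the new type_idx guard skips entries whose leading type field is not 0 (the edge entries being partitioned here), and the node-id offset is now written as sizeof(int) * 2 instead of a hard-coded 8, matching the two leading int fields. A minimal sketch of that layout, helper names hypothetical:

#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical helpers mirroring the key layout parsed above:
// [ int type_id ][ int idx ][ int64_t node_id ]
std::string make_graph_key(int type_id, int idx, int64_t node_id) {
  std::string key(sizeof(int) * 2 + sizeof(int64_t), '\0');
  std::memcpy(&key[0], &type_id, sizeof(int));
  std::memcpy(&key[sizeof(int)], &idx, sizeof(int));
  std::memcpy(&key[sizeof(int) * 2], &node_id, sizeof(int64_t));
  return key;
}

void parse_graph_key(const std::string &key, int *type_id, int *idx,
                     int64_t *node_id) {
  std::memcpy(type_id, key.data(), sizeof(int));
  std::memcpy(idx, key.data() + sizeof(int), sizeof(int));
  std::memcpy(node_id, key.data() + sizeof(int) * 2, sizeof(int64_t));
}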
@@ -297,11 +300,22 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) {
score[index]++;
}
}
float base;
double base, weight_base = 0;
double w = 0;
bool has_weight = false;
if (weight_map.find(i_key) != weight_map.end()) {
w = weight_map[i_key];
has_weight = true;
}
int index = 0;
for (int i = 0; i < part_len; i++) {
base = gb_size_by_discount - memory_remaining[i];
score[i] -= a * y * std::pow(1.0 * base, y - 1);
base = gb_size_by_discount - memory_remaining[i] + value.size();
if (has_weight)
weight_base = weight_cost[i] + w * weight_param;
else {
weight_base = 0;
}
score[i] -= a * y * std::pow(1.0 * base, y - 1) + weight_base;
if (score[i] > score[index]) index = i;
VLOG(2) << "score" << i << " = " << score[i] << " memory left "
<< memory_remaining[i];
@@ -309,6 +323,7 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) {
id_map[i_key] = index;
partitions[idx][index].push_back(i_key);
memory_remaining[index] -= (int64_t)value.size();
if (has_weight) weight_cost[index] += w;
iters[next]->Next();
next++;
}
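The loop above is the greedy streaming partitioner at the heart of this change: a node's score for partition i starts from neighbor affinity (one point per neighbor already placed in i, with partitions that cannot fit the value forced to -100000), then subtracts the marginal memory cost a * y * base^(y-1) of an a * base^y growth curve, now evaluated with the node's own bytes included, and, new in this commit, also subtracts the label weight already accumulated in i (weight_cost[i] + w * weight_param) so labeled nodes end up spread evenly. A standalone sketch of just that scoring rule, names hypothetical and the capacity check omitted:

#include <cmath>
#include <cstdint>
#include <vector>

// affinity[i]   : neighbors of the node already placed in partition i
// used_bytes[i] : bytes already assigned to partition i
// weight_cost[i]: summed label weight already in partition i
int pick_partition(const std::vector<double> &affinity,
                   const std::vector<int64_t> &used_bytes,
                   const std::vector<double> &weight_cost,
                   int64_t node_bytes, double node_weight, bool has_weight) {
  const double a = 2.0, y = 1.25, weight_param = 1.0;
  std::vector<double> score(affinity);
  int best = 0;
  for (size_t i = 0; i < score.size(); ++i) {
    double base = static_cast<double>(used_bytes[i] + node_bytes);
    double weight_base =
        has_weight ? weight_cost[i] + node_weight * weight_param : 0.0;
    // marginal memory cost plus the label weight already sitting in i
    score[i] -= a * y * std::pow(base, y - 1) + weight_base;
    if (score[i] > score[best]) best = static_cast<int>(i);
  }
  return best;
}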
@@ -327,6 +342,38 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) {
next_partition = 0;
}
void GraphTable::export_partition_files(int idx, std::string file_path) {
int part_len = partitions[idx].size();
if (part_len == 0) return;
if (file_path == "") file_path = ".";
if (file_path[(int)file_path.size() - 1] != '/') {
file_path += "/";
}
std::vector<std::future<int>> tasks;
for (int i = 0; i < part_len; i++) {
tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue(
[&, i, idx, this]() -> int {
std::string output_path =
file_path + "partition_" + std::to_string(i);
std::ofstream ofs(output_path);
if (ofs.fail()) {
VLOG(0) << "creating " << output_path << " failed";
return 0;
}
for (auto x : partitions[idx][i]) {
auto str = std::to_string(x);
ofs.write(str.c_str(), str.size());
ofs.write("\n", 1);
}
ofs.close();
return 0;
}));
}
for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
}
void GraphTable::clear_graph(int idx) {
for (auto p : edge_shards[idx]) {
delete p;
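The new export_partition_files normalizes file_path (empty defaults to ".", a trailing '/' is appended if missing), then enqueues one task per partition round-robin on the shard task pool; task i writes file_path + "partition_" + i with one decimal node id per line. A hypothetical reader for that format:

#include <cstdint>
#include <fstream>
#include <string>
#include <vector>

// Reads one exported partition file back; assumes the
// "one id per line" format produced by export_partition_files above.
std::vector<int64_t> read_partition_file(const std::string &path) {
  std::vector<int64_t> ids;
  std::ifstream ifs(path);
  std::string line;
  while (std::getline(ifs, line)) {
    if (!line.empty()) ids.push_back(std::stoll(line));
  }
  return ids;
}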
@@ -1516,8 +1563,27 @@ int32_t GraphTable::Initialize(const TableParameter &config,
LOG(INFO) << "in graphTable initialize over";
return Initialize(graph);
}
void GraphTable::load_node_weight(int type_id, int idx, std::string path) {
auto paths = paddle::string::split_string<std::string>(path, ";");
int64_t count = 0;
auto &weight_map = node_weight[type_id][idx];
for (auto path : paths) {
std::ifstream file(path);
std::string line;
while (std::getline(file, line)) {
auto values = paddle::string::split_string<std::string>(line, "\t");
count++;
if (values.size() < 2) continue;
auto src_id = std::stoull(values[0]);
double weight = std::stod(values[1]);
weight_map[src_id] = weight;
}
}
}
int32_t GraphTable::Initialize(const GraphParameter &graph) {
task_pool_size_ = graph.task_pool_size();
#ifdef PADDLE_WITH_HETERPS
_db = NULL;
search_level = graph.search_level();
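load_node_weight, added above, accepts a ';'-separated list of file paths; each line contributes a tab-separated pair of node id and weight (parsed with std::stoull and std::stod), and lines with fewer than two fields are counted but otherwise skipped. A hypothetical input, file names and values illustrative:

// weights_a.txt, fields separated by a tab:
//   10001	0.75
//   10002	1.25
// Loaded for edge tables (type_id 0), edge type 0, two files at once:
table.load_node_weight(/*type_id=*/0, /*idx=*/0, "weights_a.txt;weights_b.txt");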
@@ -1603,6 +1669,8 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) {
VLOG(0) << "in init graph table shard idx = " << _shard_idx << " shard_start "
<< shard_start << " shard_end " << shard_end;
edge_shards.resize(id_to_edge.size());
node_weight.resize(2);
node_weight[0].resize(id_to_edge.size());
#ifdef PADDLE_WITH_HETERPS
partitions.resize(id_to_edge.size());
#endif
@@ -1611,6 +1679,7 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) {
edge_shards[k].push_back(new GraphShard());
}
}
node_weight[1].resize(id_to_feature.size());
feature_shards.resize(id_to_feature.size());
for (int k = 0; k < (int)feature_shards.size(); k++) {
for (size_t i = 0; i < shard_num_per_server; i++) {
......
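Reading the two resizes above together, the new node_weight member is shaped as follows: the outer index is a type id of fixed size 2, where slot 0 carries one id-to-weight map per edge type (sized like id_to_edge and consumed by make_partitions through node_weight[0][idx]) and slot 1 carries one map per feature/node type (sized like id_to_feature):

// node_weight[type_id][idx] maps node id -> label weight
//   type_id == 0 : one map per edge type    (id_to_edge.size())
//   type_id == 1 : one map per feature type (id_to_feature.size())
std::vector<std::vector<std::unordered_map<int64_t, double>>> node_weight;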
@@ -537,6 +537,7 @@ class GraphTable : public Table {
}
return 0;
}
virtual void load_node_weight(int type_id, int idx, std::string path);
#ifdef PADDLE_WITH_HETERPS
// virtual int32_t start_graph_sampling() {
// return this->graph_sampler->start_graph_sampling();
@@ -551,6 +552,7 @@ class GraphTable : public Table {
// return 0;
// }
virtual void make_partitions(int idx, int64_t gb_size, int device_len);
virtual void export_partition_files(int idx, std::string file_path);
virtual char *random_sample_neighbor_from_ssd(
int idx, int64_t id, int sample_size,
const std::shared_ptr<std::mt19937_64> rng, int &actual_size);
@@ -572,7 +574,6 @@ class GraphTable : public Table {
const std::string &edge_type);
int32_t load_next_partition(int idx);
void set_search_level(int search_level) { this->search_level = search_level; }
// virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); }
int search_level;
int64_t total_memory_cost;
std::vector<std::vector<std::vector<int64_t>>> partitions;
@@ -585,6 +586,7 @@ class GraphTable : public Table {
int task_pool_size_ = 24;
const int random_sample_nodes_ranges = 3;
std::vector<std::vector<std::unordered_map<int64_t, double>>> node_weight;
std::vector<std::vector<std::string>> feat_name;
std::vector<std::vector<std::string>> feat_dtype;
std::vector<std::vector<int32_t>> feat_shape;
......
@@ -261,6 +261,15 @@ NodeQueryResult GraphGpuWrapper::query_node_list(int gpu_id, int start,
return ((GpuPsGraphTable *)graph_table)
->query_node_list(gpu_id, start, query_size);
}
void GraphGpuWrapper::load_node_weight(int type_id, int idx, std::string path) {
return ((GpuPsGraphTable *)graph_table)
->cpu_graph_table->load_node_weight(type_id, idx, path);
}
void GraphGpuWrapper::export_partition_files(int idx, std::string file_path) {
return ((GpuPsGraphTable *)graph_table)
->cpu_graph_table->export_partition_files(idx, file_path);
}
#endif
}
};
@@ -43,6 +43,8 @@ class GraphGpuWrapper {
void load_node_file(std::string name, std::string filepath);
int32_t load_next_partition(int idx);
int32_t get_partition_num(int idx);
void load_node_weight(int type_id, int idx, std::string path);
void export_partition_files(int idx, std::string file_path);
std::vector<int64_t> get_partition(int idx, int num);
void make_partitions(int idx, int64_t byte_size, int device_len);
void make_complementary_graph(int idx, int64_t byte_size);
......
@@ -335,7 +335,6 @@ void BindGraphGpuWrapper(py::module* m) {
py::class_<GraphGpuWrapper, std::shared_ptr<GraphGpuWrapper>>(
*m, "GraphGpuWrapper")
.def(py::init([]() { return GraphGpuWrapper::GetInstance(); }))
// .def(py::init<>())
.def("neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample_v3)
.def("graph_neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample)
.def("set_device", &GraphGpuWrapper::set_device)
@@ -356,6 +355,8 @@ void BindGraphGpuWrapper(py::module* m) {
.def("init_search_level", &GraphGpuWrapper::init_search_level)
.def("get_partition_num", &GraphGpuWrapper::get_partition_num)
.def("get_partition", &GraphGpuWrapper::get_partition)
.def("load_node_weight", &GraphGpuWrapper::load_node_weight)
.def("export_partition_files", &GraphGpuWrapper::export_partition_files)
.def("load_node_file", &GraphGpuWrapper::load_node_file);
}
#endif
......
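Putting the new pieces together, a hypothetical C++ flow over the wrapper (setup such as set_device and edge/node file loading is omitted; GetInstance is assumed to return a shared pointer, as the pybind holder above suggests, and all paths and sizes are illustrative):

auto wrapper = GraphGpuWrapper::GetInstance();
// per-node label weights for edge table 0 (file format shown earlier)
wrapper->load_node_weight(/*type_id=*/0, /*idx=*/0, "weights_a.txt;weights_b.txt");
// greedily partition edge type 0 for 8 devices, ~16 GB budget per device
wrapper->make_partitions(/*idx=*/0, /*byte_size=*/16LL << 30, /*device_len=*/8);
// write ./parts/partition_<i>, one node id per line
wrapper->export_partition_files(/*idx=*/0, "./parts");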