diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.cc b/paddle/fluid/distributed/ps/table/common_graph_table.cc index 88f0211160003bb6ec05f48a54abd8a23cefaa85..43dee275a3dc690dfc16cbc2149d47368ea66fb5 100644 --- a/paddle/fluid/distributed/ps/table/common_graph_table.cc +++ b/paddle/fluid/distributed/ps/table/common_graph_table.cc @@ -246,7 +246,8 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) { VLOG(0) << "no edges are detected,make partitions exits"; return; } - const float a = 2.0, y = 1.25; + auto &weight_map = node_weight[0][idx]; + const double a = 2.0, y = 1.25, weight_param = 1.0; int64_t gb_size_by_discount = byte_size * 0.8 * device_len; if (gb_size_by_discount <= 0) gb_size_by_discount = 1; int part_len = total_memory_cost / gb_size_by_discount; @@ -256,8 +257,9 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) { << " byte size = " << gb_size_by_discount; partitions[idx].clear(); partitions[idx].resize(part_len); + std::vector weight_cost(part_len, 0); std::vector memory_remaining(part_len, gb_size_by_discount); - std::vector score(part_len, 0); + std::vector score(part_len, 0); std::unordered_map id_map; std::vector iters; for (int i = 0; i < task_pool_size_; i++) { @@ -274,14 +276,15 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) { continue; } std::string key = iters[next]->key().ToString(); + int type_idx = *(int *)key.c_str(); int temp_idx = *(int *)(key.c_str() + sizeof(int)); - if (temp_idx != idx) { + if (type_idx != 0 || temp_idx != idx) { iters[next]->Next(); next++; continue; } std::string value = iters[next]->value().ToString(); - std::int64_t i_key = *(int64_t *)(key.c_str() + 8); + std::int64_t i_key = *(int64_t *)(key.c_str() + sizeof(int) * 2); for (int i = 0; i < part_len; i++) { if (memory_remaining[i] < (int64_t)value.size()) { score[i] = -100000.0; @@ -297,11 +300,22 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) { score[index]++; } } - float base; + double base, weight_base = 0; + double w = 0; + bool has_weight = false; + if (weight_map.find(i_key) != weight_map.end()) { + w = weight_map[i_key]; + has_weight = true; + } int index = 0; for (int i = 0; i < part_len; i++) { - base = gb_size_by_discount - memory_remaining[i]; - score[i] -= a * y * std::pow(1.0 * base, y - 1); + base = gb_size_by_discount - memory_remaining[i] + value.size(); + if (has_weight) + weight_base = weight_cost[i] + w * weight_param; + else { + weight_base = 0; + } + score[i] -= a * y * std::pow(1.0 * base, y - 1) + weight_base; if (score[i] > score[index]) index = i; VLOG(2) << "score" << i << " = " << score[i] << " memory left " << memory_remaining[i]; @@ -309,6 +323,7 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) { id_map[i_key] = index; partitions[idx][index].push_back(i_key); memory_remaining[index] -= (int64_t)value.size(); + if (has_weight) weight_cost[index] += w; iters[next]->Next(); next++; } @@ -327,6 +342,38 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) { next_partition = 0; } +void GraphTable::export_partition_files(int idx, std::string file_path) { + int part_len = partitions[idx].size(); + if (part_len == 0) return; + if (file_path == "") file_path = "."; + if (file_path[(int)file_path.size() - 1] != '/') { + file_path += "/"; + } + std::vector> tasks; + for (int i = 0; i < part_len; i++) { + tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue( + [&, i, idx, this]() -> int { + + std::string output_path = + file_path + "partition_" + std::to_string(i); + + std::ofstream ofs(output_path); + if (ofs.fail()) { + VLOG(0) << "creating " << output_path << " failed"; + return 0; + } + for (auto x : partitions[idx][i]) { + auto str = std::to_string(x); + ofs.write(str.c_str(), str.size()); + ofs.write("\n", 1); + } + ofs.close(); + return 0; + })); + } + + for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get(); +} void GraphTable::clear_graph(int idx) { for (auto p : edge_shards[idx]) { delete p; @@ -1516,8 +1563,27 @@ int32_t GraphTable::Initialize(const TableParameter &config, LOG(INFO) << "in graphTable initialize over"; return Initialize(graph); } + +void GraphTable::load_node_weight(int type_id, int idx, std::string path) { + auto paths = paddle::string::split_string(path, ";"); + int64_t count = 0; + auto &weight_map = node_weight[type_id][idx]; + for (auto path : paths) { + std::ifstream file(path); + std::string line; + while (std::getline(file, line)) { + auto values = paddle::string::split_string(line, "\t"); + count++; + if (values.size() < 2) continue; + auto src_id = std::stoull(values[0]); + double weight = std::stod(values[1]); + weight_map[src_id] = weight; + } + } +} int32_t GraphTable::Initialize(const GraphParameter &graph) { task_pool_size_ = graph.task_pool_size(); + #ifdef PADDLE_WITH_HETERPS _db = NULL; search_level = graph.search_level(); @@ -1603,6 +1669,8 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) { VLOG(0) << "in init graph table shard idx = " << _shard_idx << " shard_start " << shard_start << " shard_end " << shard_end; edge_shards.resize(id_to_edge.size()); + node_weight.resize(2); + node_weight[0].resize(id_to_edge.size()); #ifdef PADDLE_WITH_HETERPS partitions.resize(id_to_edge.size()); #endif @@ -1611,6 +1679,7 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) { edge_shards[k].push_back(new GraphShard()); } } + node_weight[1].resize(id_to_feature.size()); feature_shards.resize(id_to_feature.size()); for (int k = 0; k < (int)feature_shards.size(); k++) { for (size_t i = 0; i < shard_num_per_server; i++) { diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.h b/paddle/fluid/distributed/ps/table/common_graph_table.h index 2d869dc805a940a7151ba30a1abd1c431ff69f61..25bec5276e729371f4b40c6070904e2f34655784 100644 --- a/paddle/fluid/distributed/ps/table/common_graph_table.h +++ b/paddle/fluid/distributed/ps/table/common_graph_table.h @@ -537,6 +537,7 @@ class GraphTable : public Table { } return 0; } + virtual void load_node_weight(int type_id, int idx, std::string path); #ifdef PADDLE_WITH_HETERPS // virtual int32_t start_graph_sampling() { // return this->graph_sampler->start_graph_sampling(); @@ -551,6 +552,7 @@ class GraphTable : public Table { // return 0; // } virtual void make_partitions(int idx, int64_t gb_size, int device_len); + virtual void export_partition_files(int idx, std::string file_path); virtual char *random_sample_neighbor_from_ssd( int idx, int64_t id, int sample_size, const std::shared_ptr rng, int &actual_size); @@ -572,7 +574,6 @@ class GraphTable : public Table { const std::string &edge_type); int32_t load_next_partition(int idx); void set_search_level(int search_level) { this->search_level = search_level; } - // virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); } int search_level; int64_t total_memory_cost; std::vector>> partitions; @@ -585,6 +586,7 @@ class GraphTable : public Table { int task_pool_size_ = 24; const int random_sample_nodes_ranges = 3; + std::vector>> node_weight; std::vector> feat_name; std::vector> feat_dtype; std::vector> feat_shape; diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu index a3f2a0c5b92a913b391d9b2ef8ffed75a40a2007..c976bb67cb21e102f7e5fd4c18de20b3170d3c70 100644 --- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu +++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu @@ -261,6 +261,15 @@ NodeQueryResult GraphGpuWrapper::query_node_list(int gpu_id, int start, return ((GpuPsGraphTable *)graph_table) ->query_node_list(gpu_id, start, query_size); } +void GraphGpuWrapper::load_node_weight(int type_id, int idx, std::string path) { + return ((GpuPsGraphTable *)graph_table) + ->cpu_graph_table->load_node_weight(type_id, idx, path); +} + +void GraphGpuWrapper::export_partition_files(int idx, std::string file_path) { + return ((GpuPsGraphTable *)graph_table) + ->cpu_graph_table->export_partition_files(idx, file_path); +} #endif } }; diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h index d8b11682bc8c5d30b4402d592c4b4eed37dc7243..a34e752fc7ea7d4ff988f24a0eb9be4684f2ba1a 100644 --- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h @@ -43,6 +43,8 @@ class GraphGpuWrapper { void load_node_file(std::string name, std::string filepath); int32_t load_next_partition(int idx); int32_t get_partition_num(int idx); + void load_node_weight(int type_id, int idx, std::string path); + void export_partition_files(int idx, std::string file_path); std::vector get_partition(int idx, int num); void make_partitions(int idx, int64_t byte_size, int device_len); void make_complementary_graph(int idx, int64_t byte_size); diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc index 2549240aa15daac619334316f1a76c58fe626cdb..4ffb513671c56598356a0311ecec15772dcc917d 100644 --- a/paddle/fluid/pybind/fleet_py.cc +++ b/paddle/fluid/pybind/fleet_py.cc @@ -335,7 +335,6 @@ void BindGraphGpuWrapper(py::module* m) { py::class_>( *m, "GraphGpuWrapper") .def(py::init([]() { return GraphGpuWrapper::GetInstance(); })) - // .def(py::init<>()) .def("neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample_v3) .def("graph_neighbor_sample", &GraphGpuWrapper::graph_neighbor_sample) .def("set_device", &GraphGpuWrapper::set_device) @@ -356,6 +355,8 @@ void BindGraphGpuWrapper(py::module* m) { .def("init_search_level", &GraphGpuWrapper::init_search_level) .def("get_partition_num", &GraphGpuWrapper::get_partition_num) .def("get_partition", &GraphGpuWrapper::get_partition) + .def("load_node_weight", &GraphGpuWrapper::load_node_weight) + .def("export_partition_files", &GraphGpuWrapper::export_partition_files) .def("load_node_file", &GraphGpuWrapper::load_node_file); } #endif