Unverified commit 798670bb authored by danleifeng, committed by GitHub
Parent 1149a378
......@@ -241,3 +241,6 @@ endif()
if(WITH_CUSTOM_DEVICE AND NOT WIN32)
add_definitions(-DPADDLE_WITH_CUSTOM_DEVICE)
endif()
if(WITH_GPU_GRAPH)
add_definitions(-DPADDLE_WITH_GPU_GRAPH)
endif()
......@@ -144,10 +144,8 @@ int32_t GraphBrpcService::add_graph_node(Table *table,
int idx_ = *(int *)(request.params(0).c_str());
size_t node_num = request.params(1).size() / sizeof(int64_t);
int64_t *node_data = (int64_t *)(request.params(1).c_str());
// size_t node_num = request.params(0).size() / sizeof(int64_t);
// int64_t *node_data = (int64_t *)(request.params(0).c_str());
std::vector<int64_t> node_ids(node_data, node_data + node_num);
uint64_t *node_data = (uint64_t *)(request.params(1).c_str());
std::vector<uint64_t> node_ids(node_data, node_data + node_num);
std::vector<bool> is_weighted_list;
if (request.params_size() == 3) {
size_t weight_list_size = request.params(2).size() / sizeof(bool);
......@@ -179,11 +177,9 @@ int32_t GraphBrpcService::remove_graph_node(Table *table,
return 0;
}
int idx_ = *(int *)(request.params(0).c_str());
size_t node_num = request.params(1).size() / sizeof(int64_t);
int64_t *node_data = (int64_t *)(request.params(1).c_str());
// size_t node_num = request.params(0).size() / sizeof(int64_t);
// int64_t *node_data = (int64_t *)(request.params(0).c_str());
std::vector<int64_t> node_ids(node_data, node_data + node_num);
size_t node_num = request.params(1).size() / sizeof(uint64_t);
uint64_t *node_data = (uint64_t *)(request.params(1).c_str());
std::vector<uint64_t> node_ids(node_data, node_data + node_num);
((GraphTable *)table)->remove_graph_node(idx_, node_ids);
return 0;
......@@ -217,11 +213,6 @@ int32_t GraphBrpcService::Initialize() {
&GraphBrpcService::graph_set_node_feat;
_service_handler_map[PS_GRAPH_SAMPLE_NODES_FROM_ONE_SERVER] =
&GraphBrpcService::sample_neighbors_across_multi_servers;
// _service_handler_map[PS_GRAPH_USE_NEIGHBORS_SAMPLE_CACHE] =
// &GraphBrpcService::use_neighbors_sample_cache;
// _service_handler_map[PS_GRAPH_LOAD_GRAPH_SPLIT_CONFIG] =
// &GraphBrpcService::load_graph_split_config;
// shard initialization: the shard info of server_list can only be fetched from env after the server has started
InitializeShardInfo();
return 0;
......@@ -389,9 +380,6 @@ int32_t GraphBrpcService::pull_graph_list(Table *table,
int start = *(int *)(request.params(2).c_str());
int size = *(int *)(request.params(3).c_str());
int step = *(int *)(request.params(4).c_str());
// int start = *(int *)(request.params(0).c_str());
// int size = *(int *)(request.params(1).c_str());
// int step = *(int *)(request.params(2).c_str());
std::unique_ptr<char[]> buffer;
int actual_size;
((GraphTable *)table)
......@@ -414,14 +402,10 @@ int32_t GraphBrpcService::graph_random_sample_neighbors(
return 0;
}
int idx_ = *(int *)(request.params(0).c_str());
size_t node_num = request.params(1).size() / sizeof(int64_t);
int64_t *node_data = (int64_t *)(request.params(1).c_str());
int sample_size = *(int64_t *)(request.params(2).c_str());
size_t node_num = request.params(1).size() / sizeof(uint64_t);
uint64_t *node_data = (uint64_t *)(request.params(1).c_str());
int sample_size = *(int *)(request.params(2).c_str());
bool need_weight = *(bool *)(request.params(3).c_str());
// size_t node_num = request.params(0).size() / sizeof(int64_t);
// int64_t *node_data = (int64_t *)(request.params(0).c_str());
// int sample_size = *(int64_t *)(request.params(1).c_str());
// bool need_weight = *(bool *)(request.params(2).c_str());
std::vector<std::shared_ptr<char>> buffers(node_num);
std::vector<int> actual_sizes(node_num, 0);
((GraphTable *)table)
......@@ -443,7 +427,7 @@ int32_t GraphBrpcService::graph_random_sample_nodes(
brpc::Controller *cntl) {
int type_id = *(int *)(request.params(0).c_str());
int idx_ = *(int *)(request.params(1).c_str());
size_t size = *(int64_t *)(request.params(2).c_str());
size_t size = *(uint64_t *)(request.params(2).c_str());
// size_t size = *(int64_t *)(request.params(0).c_str());
std::unique_ptr<char[]> buffer;
int actual_size;
......@@ -470,11 +454,9 @@ int32_t GraphBrpcService::graph_get_node_feat(Table *table,
return 0;
}
int idx_ = *(int *)(request.params(0).c_str());
size_t node_num = request.params(1).size() / sizeof(int64_t);
int64_t *node_data = (int64_t *)(request.params(1).c_str());
// size_t node_num = request.params(0).size() / sizeof(int64_t);
// int64_t *node_data = (int64_t *)(request.params(0).c_str());
std::vector<int64_t> node_ids(node_data, node_data + node_num);
size_t node_num = request.params(1).size() / sizeof(uint64_t);
uint64_t *node_data = (uint64_t *)(request.params(1).c_str());
std::vector<uint64_t> node_ids(node_data, node_data + node_num);
std::vector<std::string> feature_names =
paddle::string::split_string<std::string>(request.params(2), "\t");
......@@ -511,21 +493,14 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers(
}
int idx_ = *(int *)(request.params(0).c_str());
size_t node_num = request.params(1).size() / sizeof(int64_t);
int64_t *node_data = (int64_t *)(request.params(1).c_str());
int sample_size = *(int64_t *)(request.params(2).c_str());
bool need_weight = *(int64_t *)(request.params(3).c_str());
// size_t node_num = request.params(0).size() / sizeof(int64_t),
// size_of_size_t = sizeof(size_t);
// int64_t *node_data = (int64_t *)(request.params(0).c_str());
// int sample_size = *(int64_t *)(request.params(1).c_str());
// bool need_weight = *(int64_t *)(request.params(2).c_str());
// std::vector<int64_t> res = ((GraphTable
// *)table).filter_out_non_exist_nodes(node_data, sample_size);
size_t node_num = request.params(1).size() / sizeof(uint64_t);
uint64_t *node_data = (uint64_t *)(request.params(1).c_str());
int sample_size = *(int *)(request.params(2).c_str());
bool need_weight = *(bool *)(request.params(3).c_str());
std::vector<int> request2server;
std::vector<int> server2request(server_size, -1);
std::vector<int64_t> local_id;
std::vector<uint64_t> local_id;
std::vector<int> local_query_idx;
size_t rank = GetRank();
for (size_t query_idx = 0; query_idx < node_num; ++query_idx) {
......@@ -548,7 +523,7 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers(
std::vector<std::shared_ptr<char>> local_buffers;
std::vector<int> local_actual_sizes;
std::vector<size_t> seq;
std::vector<std::vector<int64_t>> node_id_buckets(request_call_num);
std::vector<std::vector<uint64_t>> node_id_buckets(request_call_num);
std::vector<std::vector<int>> query_idx_buckets(request_call_num);
for (size_t query_idx = 0; query_idx < node_num; ++query_idx) {
int server_index =
......@@ -639,7 +614,7 @@ int32_t GraphBrpcService::sample_neighbors_across_multi_servers(
closure->request(request_idx)
->add_params((char *)node_id_buckets[request_idx].data(),
sizeof(int64_t) * node_num);
sizeof(uint64_t) * node_num);
closure->request(request_idx)
->add_params((char *)&sample_size, sizeof(int));
closure->request(request_idx)
......@@ -682,11 +657,9 @@ int32_t GraphBrpcService::graph_set_node_feat(Table *table,
}
int idx_ = *(int *)(request.params(0).c_str());
// size_t node_num = request.params(0).size() / sizeof(int64_t);
// int64_t *node_data = (int64_t *)(request.params(0).c_str());
size_t node_num = request.params(1).size() / sizeof(int64_t);
int64_t *node_data = (int64_t *)(request.params(1).c_str());
std::vector<int64_t> node_ids(node_data, node_data + node_num);
size_t node_num = request.params(1).size() / sizeof(uint64_t);
uint64_t *node_data = (uint64_t *)(request.params(1).c_str());
std::vector<uint64_t> node_ids(node_data, node_data + node_num);
// std::vector<std::string> feature_names =
// paddle::string::split_string<std::string>(request.params(1), "\t");
......
......@@ -18,7 +18,7 @@ set_source_files_properties(
cc_library(
graph_node
SRCS ${graphDir}/graph_node.cc
DEPS WeightedSampler)
DEPS WeightedSampler enforce)
set_source_files_properties(
memory_dense_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(
......
......@@ -58,33 +58,80 @@ class GraphShard {
~GraphShard();
std::vector<Node *> &get_bucket() { return bucket; }
std::vector<Node *> get_batch(int start, int end, int step);
std::vector<int64_t> get_ids_by_range(int start, int end) {
std::vector<int64_t> res;
void get_ids_by_range(int start, int end, std::vector<uint64_t> *res) {
res->reserve(res->size() + end - start);
for (int i = start; i < end && i < (int)bucket.size(); i++) {
res.push_back(bucket[i]->get_id());
res->emplace_back(bucket[i]->get_id());
}
return res;
}
std::vector<int64_t> get_all_id() {
std::vector<int64_t> res;
size_t get_all_id(std::vector<std::vector<uint64_t>> *shard_keys,
int slice_num) {
int bucket_num = bucket.size();
shard_keys->resize(slice_num);
for (int i = 0; i < slice_num; ++i) {
(*shard_keys)[i].reserve(bucket_num / slice_num);
}
for (int i = 0; i < bucket_num; i++) {
uint64_t k = bucket[i]->get_id();
(*shard_keys)[k % slice_num].emplace_back(k);
}
return bucket_num;
}
size_t get_all_neighbor_id(std::vector<std::vector<uint64_t>> *total_res,
int slice_num) {
std::vector<uint64_t> keys;
for (size_t i = 0; i < bucket.size(); i++) {
size_t neighbor_size = bucket[i]->get_neighbor_size();
size_t n = keys.size();
keys.resize(n + neighbor_size);
for (size_t j = 0; j < neighbor_size; j++) {
keys[n + j] = bucket[i]->get_neighbor_id(j);
}
}
return dedup2shard_keys(&keys, total_res, slice_num);
}
size_t get_all_feature_ids(std::vector<std::vector<uint64_t>> *total_res,
int slice_num) {
std::vector<uint64_t> keys;
for (int i = 0; i < (int)bucket.size(); i++) {
res.push_back(bucket[i]->get_id());
bucket[i]->get_feature_ids(&keys);
}
return dedup2shard_keys(&keys, total_res, slice_num);
}
size_t dedup2shard_keys(std::vector<uint64_t> *keys,
std::vector<std::vector<uint64_t>> *total_res,
int slice_num) {
size_t num = keys->size();
uint64_t last_key = 0;
// sort keys, then keep unique ones and bucket them by key % slice_num
std::sort(keys->begin(), keys->end());
total_res->resize(slice_num);
for (int shard_id = 0; shard_id < slice_num; ++shard_id) {
(*total_res)[shard_id].reserve(num / slice_num);
}
return res;
for (size_t i = 0; i < num; ++i) {
const uint64_t &k = (*keys)[i];
if (i > 0 && last_key == k) {
continue;
}
last_key = k;
(*total_res)[k % slice_num].push_back(k);
}
return num;
}
GraphNode *add_graph_node(int64_t id);
GraphNode *add_graph_node(uint64_t id);
GraphNode *add_graph_node(Node *node);
FeatureNode *add_feature_node(int64_t id);
Node *find_node(int64_t id);
void delete_node(int64_t id);
FeatureNode *add_feature_node(uint64_t id, bool is_overlap = true);
Node *find_node(uint64_t id);
void delete_node(uint64_t id);
void clear();
void add_neighbor(int64_t id, int64_t dst_id, float weight);
std::unordered_map<int64_t, int> &get_node_location() {
void add_neighbor(uint64_t id, uint64_t dst_id, float weight);
std::unordered_map<uint64_t, int> &get_node_location() {
return node_location;
}
private:
std::unordered_map<int64_t, int> node_location;
std::unordered_map<uint64_t, int> node_location;
std::vector<Node *> bucket;
};
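A minimal usage sketch of the new slice-based id collection above; the caller function, the slice count, and the include path are assumptions for illustration, and only GraphShard::get_all_id itself comes from this change:

#include <cstdint>
#include <cstdio>
#include <vector>

#include "paddle/fluid/distributed/ps/table/common_graph_table.h"  // assumed header for GraphShard

// Gather the node ids of one shard into hash-sliced buckets (id % slice_num)
// and report how many ids the shard held in total.
void collect_shard_ids_example(paddle::distributed::GraphShard &shard) {
  const int slice_num = 8;  // hypothetical slice count, e.g. one per device
  std::vector<std::vector<uint64_t>> shard_keys;
  size_t bucket_size = shard.get_all_id(&shard_keys, slice_num);
  size_t sliced = 0;
  for (auto &slice : shard_keys) sliced += slice.size();
  std::printf("bucket size = %zu, sliced ids = %zu\n", bucket_size, sliced);
}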
......@@ -92,11 +139,11 @@ enum LRUResponse { ok = 0, blocked = 1, err = 2 };
struct SampleKey {
int idx;
int64_t node_key;
uint64_t node_key;
size_t sample_size;
bool is_weighted;
SampleKey(int _idx,
int64_t _node_key,
uint64_t _node_key,
size_t _sample_size,
bool _is_weighted) {
idx = _idx;
......@@ -467,7 +514,7 @@ class GraphTable : public Table {
virtual int32_t random_sample_neighbors(
int idx,
int64_t *node_ids,
uint64_t *node_ids,
int sample_size,
std::vector<std::shared_ptr<char>> &buffers,
std::vector<int> &actual_sizes,
......@@ -483,30 +530,62 @@ class GraphTable : public Table {
int type_id,
int idx,
std::vector<std::pair<int, int>> ranges,
std::vector<int64_t> &res);
std::vector<uint64_t> &res);
virtual int32_t Initialize() { return 0; }
virtual int32_t Initialize(const TableParameter &config,
const FsClientParameter &fs_config);
virtual int32_t Initialize(const GraphParameter &config);
int32_t Load(const std::string &path, const std::string &param);
int32_t load_node_and_edge_file(std::string etype,
std::string ntype,
std::string epath,
std::string npath,
int part_num,
bool reverse);
std::string get_inverse_etype(std::string &etype);
int32_t load_edges(const std::string &path,
bool reverse,
const std::string &edge_type);
std::vector<std::vector<int64_t>> get_all_id(int type,
int idx,
int slice_num);
int32_t load_nodes(const std::string &path, std::string node_type);
int get_all_id(int type,
int slice_num,
std::vector<std::vector<uint64_t>> *output);
int get_all_neighbor_id(int type,
int slice_num,
std::vector<std::vector<uint64_t>> *output);
int get_all_id(int type,
int idx,
int slice_num,
std::vector<std::vector<uint64_t>> *output);
int get_all_neighbor_id(int type_id,
int id,
int slice_num,
std::vector<std::vector<uint64_t>> *output);
int get_all_feature_ids(int type,
int idx,
int slice_num,
std::vector<std::vector<uint64_t>> *output);
int32_t load_nodes(const std::string &path,
std::string node_type = std::string());
std::pair<uint64_t, uint64_t> parse_edge_file(const std::string &path,
int idx,
bool reverse);
std::pair<uint64_t, uint64_t> parse_node_file(const std::string &path,
const std::string &node_type,
int idx);
std::pair<uint64_t, uint64_t> parse_node_file(const std::string &path);
int32_t add_graph_node(int idx,
std::vector<int64_t> &id_list,
std::vector<uint64_t> &id_list,
std::vector<bool> &is_weight_list);
int32_t remove_graph_node(int idx, std::vector<int64_t> &id_list);
int32_t remove_graph_node(int idx, std::vector<uint64_t> &id_list);
int32_t get_server_index_by_id(int64_t id);
Node *find_node(int type_id, int idx, int64_t id);
int32_t get_server_index_by_id(uint64_t id);
Node *find_node(int type_id, int idx, uint64_t id);
Node *find_node(int type_id, uint64_t id);
virtual int32_t Pull(TableContext &context) { return 0; }
virtual int32_t Push(TableContext &context) { return 0; }
......@@ -531,19 +610,21 @@ class GraphTable : public Table {
this->server_num = server_num;
return 0;
}
virtual uint32_t get_thread_pool_index_by_shard_index(int64_t shard_index);
virtual uint32_t get_thread_pool_index(int64_t node_id);
virtual std::pair<int32_t, std::string> parse_feature(int idx,
std::string feat_str);
virtual uint32_t get_thread_pool_index_by_shard_index(uint64_t shard_index);
virtual uint32_t get_thread_pool_index(uint64_t node_id);
virtual int parse_feature(int idx,
const char *feat_str,
size_t len,
FeatureNode *node);
virtual int32_t get_node_feat(int idx,
const std::vector<int64_t> &node_ids,
const std::vector<uint64_t> &node_ids,
const std::vector<std::string> &feature_names,
std::vector<std::vector<std::string>> &res);
virtual int32_t set_node_feat(
int idx,
const std::vector<int64_t> &node_ids,
const std::vector<uint64_t> &node_ids,
const std::vector<std::string> &feature_names,
const std::vector<std::vector<std::string>> &res);
......@@ -578,22 +659,24 @@ class GraphTable : public Table {
virtual void export_partition_files(int idx, std::string file_path);
virtual char *random_sample_neighbor_from_ssd(
int idx,
int64_t id,
uint64_t id,
int sample_size,
const std::shared_ptr<std::mt19937_64> rng,
int &actual_size);
virtual int32_t add_node_to_ssd(
int type_id, int idx, int64_t src_id, char *data, int len);
int type_id, int idx, uint64_t src_id, char *data, int len);
virtual paddle::framework::GpuPsCommGraph make_gpu_ps_graph(
int idx, std::vector<int64_t> ids);
int idx, std::vector<uint64_t> ids);
virtual paddle::framework::GpuPsCommGraphFea make_gpu_ps_graph_fea(
std::vector<uint64_t> &node_ids, int slot_num);
int32_t Load_to_ssd(const std::string &path, const std::string &param);
int64_t load_graph_to_memory_from_ssd(int idx, std::vector<int64_t> &ids);
int64_t load_graph_to_memory_from_ssd(int idx, std::vector<uint64_t> &ids);
int32_t make_complementary_graph(int idx, int64_t byte_size);
int32_t dump_edges_to_ssd(int idx);
int32_t get_partition_num(int idx) { return partitions[idx].size(); }
std::vector<int64_t> get_partition(int idx, int index) {
if (idx >= partitions.size() || index >= partitions[idx].size())
return std::vector<int64_t>();
std::vector<uint64_t> get_partition(int idx, int index) {
if (idx >= (int)partitions.size() || index >= (int)partitions[idx].size())
return std::vector<uint64_t>();
return partitions[idx][index];
}
int32_t load_edges_to_ssd(const std::string &path,
......@@ -603,17 +686,20 @@ class GraphTable : public Table {
void set_search_level(int search_level) { this->search_level = search_level; }
int search_level;
int64_t total_memory_cost;
std::vector<std::vector<std::vector<int64_t>>> partitions;
std::vector<std::vector<std::vector<uint64_t>>> partitions;
int next_partition;
#endif
virtual int32_t add_comm_edge(int idx, int64_t src_id, int64_t dst_id);
virtual int32_t add_comm_edge(int idx, uint64_t src_id, uint64_t dst_id);
virtual int32_t build_sampler(int idx, std::string sample_type = "random");
void set_feature_separator(const std::string &ch);
std::vector<std::vector<GraphShard *>> edge_shards, feature_shards;
size_t shard_start, shard_end, server_num, shard_num_per_server, shard_num;
int task_pool_size_ = 24;
int load_thread_num = 160;
const int random_sample_nodes_ranges = 3;
std::vector<std::vector<std::unordered_map<int64_t, double>>> node_weight;
std::vector<std::vector<std::unordered_map<uint64_t, double>>> node_weight;
std::vector<std::vector<std::string>> feat_name;
std::vector<std::vector<std::string>> feat_dtype;
std::vector<std::vector<int32_t>> feat_shape;
......@@ -625,21 +711,24 @@ class GraphTable : public Table {
std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
std::vector<std::shared_ptr<std::mt19937_64>> _shards_task_rng_pool;
std::shared_ptr<::ThreadPool> load_node_edge_task_pool;
std::shared_ptr<ScaledLRU<SampleKey, SampleResult>> scaled_lru;
std::unordered_set<int64_t> extra_nodes;
std::unordered_map<int64_t, size_t> extra_nodes_to_thread_index;
std::unordered_set<uint64_t> extra_nodes;
std::unordered_map<uint64_t, size_t> extra_nodes_to_thread_index;
bool use_cache, use_duplicate_nodes;
int cache_size_limit;
int cache_ttl;
mutable std::mutex mutex_;
bool build_sampler_on_cpu;
std::shared_ptr<pthread_rwlock_t> rw_lock;
#ifdef PADDLE_WITH_HETERPS
// paddle::framework::GpuPsGraphTable gpu_graph_table;
paddle::distributed::RocksDBHandler *_db;
// std::shared_ptr<::ThreadPool> graph_sample_pool;
// std::shared_ptr<GraphSampler> graph_sampler;
// REGISTER_GRAPH_FRIEND_CLASS(2, CompleteGraphSampler, BasicBfsGraphSampler)
// std::shared_ptr<::ThreadPool> graph_sample_pool;
// std::shared_ptr<GraphSampler> graph_sampler;
// REGISTER_GRAPH_FRIEND_CLASS(2, CompleteGraphSampler, BasicBfsGraphSampler)
#endif
std::string feature_separator_ = std::string(" ");
};
/*
......@@ -657,7 +746,7 @@ class CompleteGraphSampler : public GraphSampler {
protected:
GraphTable *graph_table;
std::vector<std::vector<paddle::framework::GpuPsGraphNode>> sample_nodes;
std::vector<std::vector<int64_t>> sample_neighbors;
std::vector<std::vector<uint64_t>> sample_neighbors;
// std::vector<GpuPsCommGraph> sample_res;
// std::shared_ptr<std::mt19937_64> random;
int gpu_num;
......@@ -676,11 +765,11 @@ class BasicBfsGraphSampler : public GraphSampler {
GraphTable *graph_table;
// std::vector<std::vector<GpuPsGraphNode>> sample_nodes;
std::vector<std::vector<paddle::framework::GpuPsGraphNode>> sample_nodes;
std::vector<std::vector<int64_t>> sample_neighbors;
std::vector<std::vector<uint64_t>> sample_neighbors;
size_t gpu_num;
int init_search_size, node_num_for_each_shard, edge_num_for_each_node;
int rounds, interval;
std::vector<std::unordered_map<int64_t, std::vector<int64_t>>>
std::vector<std::unordered_map<uint64_t, std::vector<uint64_t>>>
sample_neighbors_map;
};
#endif
......
......@@ -16,10 +16,15 @@
#include <cstring>
#include <iostream>
#include <memory>
#include <set>
#include <sstream>
#include <vector>
#include "glog/logging.h"
#include "paddle/fluid/distributed/ps/table/graph/graph_weighted_sampler.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace distributed {
......@@ -30,6 +35,7 @@ class Node {
virtual ~Node() {}
static int id_size, int_size, weight_size;
uint64_t get_id() { return id; }
int64_t get_py_id() { return (int64_t)id; }
void set_id(uint64_t id) { this->id = id; }
virtual void build_edges(bool is_weighted) {}
......@@ -46,7 +52,11 @@ class Node {
virtual void to_buffer(char *buffer, bool need_feature);
virtual void recover_from_buffer(char *buffer);
virtual std::string get_feature(int idx) { return std::string(""); }
virtual void set_feature(int idx, std::string str) {}
virtual int get_feature_ids(std::vector<uint64_t> *res) const { return 0; }
virtual int get_feature_ids(int slot_idx, std::vector<uint64_t> *res) const {
return 0;
}
virtual void set_feature(int idx, const std::string &str) {}
virtual void set_feature_size(int size) {}
virtual int get_feature_size() { return 0; }
virtual size_t get_neighbor_size() { return 0; }
......@@ -95,7 +105,64 @@ class FeatureNode : public Node {
}
}
virtual void set_feature(int idx, std::string str) {
virtual int get_feature_ids(std::vector<uint64_t> *res) const {
PADDLE_ENFORCE_NOT_NULL(res,
paddle::platform::errors::InvalidArgument(
"get_feature_ids res should not be null"));
errno = 0;
for (auto &feature_item : feature) {
const uint64_t *feas = (const uint64_t *)(feature_item.c_str());
size_t num = feature_item.length() / sizeof(uint64_t);
CHECK((feature_item.length() % sizeof(uint64_t)) == 0)
<< "bad feature_item: [" << feature_item << "]";
size_t n = res->size();
res->resize(n + num);
for (size_t i = 0; i < num; ++i) {
(*res)[n + i] = feas[i];
}
}
PADDLE_ENFORCE_EQ(
errno,
0,
paddle::platform::errors::InvalidArgument(
"get_feature_ids get errno should be 0, but got %d.", errno));
return 0;
}
virtual int get_feature_ids(int slot_idx, std::vector<uint64_t> *res) const {
PADDLE_ENFORCE_NOT_NULL(res,
paddle::platform::errors::InvalidArgument(
"get_feature_ids res should not be null"));
res->clear();
errno = 0;
if (slot_idx < (int)this->feature.size()) {
const std::string &s = this->feature[slot_idx];
const uint64_t *feas = (const uint64_t *)(s.c_str());
size_t num = s.length() / sizeof(uint64_t);
CHECK((s.length() % sizeof(uint64_t)) == 0)
<< "bad feature_item: [" << s << "]";
res->resize(num);
for (size_t i = 0; i < num; ++i) {
(*res)[i] = feas[i];
}
}
PADDLE_ENFORCE_EQ(
errno,
0,
paddle::platform::errors::InvalidArgument(
"get_feature_ids get errno should be 0, but got %d.", errno));
return 0;
}
virtual std::string *mutable_feature(int idx) {
if (idx >= (int)this->feature.size()) {
this->feature.resize(idx + 1);
}
return &(this->feature[idx]);
}
virtual void set_feature(int idx, const std::string &str) {
if (idx >= (int)this->feature.size()) {
this->feature.resize(idx + 1);
}
......@@ -117,6 +184,23 @@ class FeatureNode : public Node {
return std::string(buffer, Tsize);
}
template <typename T>
static void parse_value_to_bytes(
std::vector<std::string>::iterator feat_str_begin,
std::vector<std::string>::iterator feat_str_end,
std::string *output) {
T v;
size_t feat_str_size = feat_str_end - feat_str_begin;
size_t Tsize = sizeof(T) * feat_str_size;
char buffer[Tsize] = {'\0'};
for (size_t i = 0; i < feat_str_size; i++) {
std::stringstream ss(*(feat_str_begin + i));
ss >> v;
std::memcpy(buffer + sizeof(T) * i, (char *)&v, sizeof(T));
}
output->assign(buffer, Tsize);
}
template <typename T>
static std::vector<T> parse_bytes_to_array(std::string feat_str) {
T v;
......@@ -131,8 +215,28 @@ class FeatureNode : public Node {
return out;
}
template <typename T>
static void parse_value_to_bytes(
std::vector<paddle::string::str_ptr>::iterator feat_str_begin,
std::vector<paddle::string::str_ptr>::iterator feat_str_end,
std::string *output) {
size_t feat_str_size = feat_str_end - feat_str_begin;
size_t Tsize = sizeof(T) * feat_str_size;
size_t num = output->length();
output->resize(num + Tsize);
T *fea_ptrs = (T *)(&(*output)[num]);
thread_local paddle::string::str_ptr_stream ss;
for (size_t i = 0; i < feat_str_size; i++) {
ss.reset(*(feat_str_begin + i));
ss >> fea_ptrs[i];
}
}
protected:
std::vector<std::string> feature;
};
} // namespace distributed
} // namespace paddle
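The packing helpers above store each textual feature value as sizeof(T) raw bytes inside a std::string. A standalone sketch of that byte layout (a re-implementation for illustration only, not the Paddle API itself):

#include <cstdint>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Pack each textual value as sizeof(T) raw bytes, mirroring the behaviour of
// FeatureNode::parse_value_to_bytes; unpack mirrors parse_bytes_to_array.
template <typename T>
std::string pack_values(const std::vector<std::string> &vals) {
  std::string out(vals.size() * sizeof(T), '\0');
  for (size_t i = 0; i < vals.size(); ++i) {
    T v{};
    std::stringstream ss(vals[i]);
    ss >> v;
    std::memcpy(&out[i * sizeof(T)], &v, sizeof(T));
  }
  return out;
}

template <typename T>
std::vector<T> unpack_values(const std::string &bytes) {
  std::vector<T> out(bytes.size() / sizeof(T));
  std::memcpy(out.data(), bytes.data(), out.size() * sizeof(T));
  return out;
}

int main() {
  std::string packed = pack_values<uint64_t>({"101", "202", "303"});
  for (uint64_t v : unpack_values<uint64_t>(packed)) std::cout << v << " ";
  std::cout << std::endl;  // prints: 101 202 303
  return 0;
}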
......@@ -41,14 +41,14 @@ namespace paddle {
namespace distributed {
int32_t MemorySparseTable::Initialize() {
_shards_task_pool.resize(_task_pool_size);
for (size_t i = 0; i < _shards_task_pool.size(); ++i) {
_shards_task_pool[i].reset(new ::ThreadPool(1));
}
auto& profiler = CostProfiler::instance();
profiler.register_profiler("pserver_sparse_update_all");
profiler.register_profiler("pserver_sparse_select_all");
InitializeValue();
_shards_task_pool.resize(_task_pool_size);
for (int i = 0; i < _shards_task_pool.size(); ++i) {
_shards_task_pool[i].reset(new ::ThreadPool(1));
}
VLOG(0) << "initalize MemorySparseTable succ";
return 0;
}
......@@ -65,9 +65,13 @@ int32_t MemorySparseTable::InitializeValue() {
_real_local_shard_num =
_real_local_shard_num < 0 ? 0 : _real_local_shard_num;
}
#ifdef PADDLE_WITH_HETERPS
_task_pool_size = _sparse_table_shard_num;
#endif
VLOG(1) << "memory sparse table _avg_local_shard_num: "
<< _avg_local_shard_num
<< " _real_local_shard_num: " << _real_local_shard_num;
<< " _real_local_shard_num: " << _real_local_shard_num
<< " _task_pool_size:" << _task_pool_size;
_local_shards.reset(new shard_type[_real_local_shard_num]);
......@@ -336,7 +340,11 @@ int32_t MemorySparseTable::Save(const std::string& dirname,
size_t file_start_idx = _avg_local_shard_num * _shard_idx;
#ifdef PADDLE_WITH_GPU_GRAPH
int thread_num = _real_local_shard_num;
#else
int thread_num = _real_local_shard_num < 20 ? _real_local_shard_num : 20;
#endif
omp_set_num_threads(thread_num);
#pragma omp parallel for schedule(dynamic)
for (int i = 0; i < _real_local_shard_num; ++i) {
......
......@@ -112,7 +112,7 @@ class MemorySparseTable : public Table {
virtual int32_t LoadPatch(const std::vector<std::string>& file_list,
int save_param);
const int _task_pool_size = 24;
int _task_pool_size = 24;
int _avg_local_shard_num;
int _real_local_shard_num;
int _sparse_table_shard_num;
......
......@@ -126,13 +126,20 @@ message TableParameter {
message TableAccessorParameter {
optional string accessor_class = 1;
optional uint32 fea_dim = 4 [ default = 11 ];
optional uint32 embedx_dim = 5 [ default = 8 ];
optional uint32 embedx_threshold = 6 [ default = 10 ];
optional uint32 fea_dim = 4 [ default = 11 ]; // field size of one value
optional uint32 embedx_dim = 5 [ default = 8 ]; // embedx feature size
optional uint32 embedx_threshold = 6
[ default = 10 ]; // embedx feature create threshold
optional CtrAccessorParameter ctr_accessor_param = 7;
repeated TableAccessorSaveParameter table_accessor_save_param = 8;
optional SparseCommonSGDRuleParameter embed_sgd_param = 10;
optional SparseCommonSGDRuleParameter embedx_sgd_param = 11;
optional GraphSGDParameter graph_sgd_param = 12;
}
message GraphSGDParameter {
optional uint32 nodeid_slot = 1 [ default = 9008 ];
optional float feature_learning_rate = 2 [ default = 0.05 ];
}
message CtrAccessorParameter {
......@@ -232,6 +239,7 @@ message GraphParameter {
optional string table_type = 9 [ default = "" ];
optional int32 shard_num = 10 [ default = 127 ];
optional int32 search_level = 11 [ default = 1 ];
optional bool build_sampler_on_cpu = 12 [ default = true ];
}
message GraphFeature {
......
......@@ -740,6 +740,19 @@ if(WITH_DISTRIBUTE)
set_source_files_properties(
heterxpu_trainer.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
elseif(WITH_PSCORE)
# cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
# dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
# heterxpu_trainer.cc heter_pipeline_trainer.cc
# data_feed.cc device_worker.cc hogwild_worker.cc hetercpu_worker.cc
# downpour_worker.cc downpour_lite_worker.cc downpour_worker_opt.cc data_feed.cu
# pull_dense_worker.cc section_worker.cc heter_section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
# device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
# index_sampler index_wrapper sampler index_dataset_proto
# lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper metrics lodtensor_printer feed_fetch_method
# graph_to_program_pass variable_helper timer monitor
# heter_service_proto fleet heter_server brpc fleet_executor
# graph_gpu_wrapper)
cc_library(
executor
SRCS executor.cc
......@@ -1001,21 +1014,41 @@ cc_library(
DEPS parallel_executor)
if(WITH_PSCORE)
get_property(RPC_DEPS GLOBAL PROPERTY RPC_DEPS)
cc_test(
dist_multi_trainer_test
SRCS dist_multi_trainer_test.cc
DEPS conditional_block_op executor gloo_wrapper ${RPC_DEPS})
cc_test(
heter_pipeline_trainer_test
SRCS heter_pipeline_trainer_test.cc
DEPS conditional_block_op
scale_op
heter_listen_and_serv_op
executor
heter_server
gloo_wrapper
eigen_function
${RPC_DEPS})
if(WITH_HETERPS)
cc_test(
dist_multi_trainer_test
SRCS dist_multi_trainer_test.cc
DEPS conditional_block_op executor gloo_wrapper ${RPC_DEPS}
graph_gpu_wrapper)
cc_test(
heter_pipeline_trainer_test
SRCS heter_pipeline_trainer_test.cc
DEPS conditional_block_op
scale_op
heter_listen_and_serv_op
executor
heter_server
gloo_wrapper
eigen_function
${RPC_DEPS}
graph_gpu_wrapper)
else()
cc_test(
dist_multi_trainer_test
SRCS dist_multi_trainer_test.cc
DEPS conditional_block_op executor gloo_wrapper ${RPC_DEPS})
cc_test(
heter_pipeline_trainer_test
SRCS heter_pipeline_trainer_test.cc
DEPS conditional_block_op
scale_op
heter_listen_and_serv_op
executor
heter_server
gloo_wrapper
eigen_function
${RPC_DEPS})
endif()
else()
cc_test(
dist_multi_trainer_test
......
......@@ -2108,6 +2108,9 @@ void SlotRecordInMemoryDataFeed::Init(const DataFeedDesc& data_feed_desc) {
} else {
so_parser_name_.clear();
}
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
gpu_graph_data_generator_.SetConfig(data_feed_desc);
#endif
}
void SlotRecordInMemoryDataFeed::LoadIntoMemory() {
......@@ -2644,6 +2647,9 @@ bool SlotRecordInMemoryDataFeed::Start() {
#if defined(PADDLE_WITH_CUDA) && defined(PADDLE_WITH_HETERPS)
CHECK(paddle::platform::is_gpu_place(this->place_));
pack_ = BatchGpuPackMgr().get(this->GetPlace(), used_slots_info_);
#endif
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
gpu_graph_data_generator_.AllocResource(this->place_, feed_vec_);
#endif
return true;
}
......@@ -2651,27 +2657,33 @@ bool SlotRecordInMemoryDataFeed::Start() {
int SlotRecordInMemoryDataFeed::Next() {
#ifdef _LINUX
this->CheckStart();
VLOG(3) << "enable heter next: " << offset_index_
<< " batch_offsets: " << batch_offsets_.size();
if (offset_index_ >= batch_offsets_.size()) {
VLOG(3) << "offset_index: " << offset_index_
if (!gpu_graph_mode_) {
VLOG(3) << "enable heter next: " << offset_index_
<< " batch_offsets: " << batch_offsets_.size();
return 0;
}
auto& batch = batch_offsets_[offset_index_++];
this->batch_size_ = batch.second;
VLOG(3) << "batch_size_=" << this->batch_size_
<< ", thread_id=" << thread_id_;
if (this->batch_size_ != 0) {
PutToFeedVec(&records_[batch.first], this->batch_size_);
if (offset_index_ >= batch_offsets_.size()) {
VLOG(3) << "offset_index: " << offset_index_
<< " batch_offsets: " << batch_offsets_.size();
return 0;
}
auto& batch = batch_offsets_[offset_index_++];
this->batch_size_ = batch.second;
VLOG(3) << "batch_size_=" << this->batch_size_
<< ", thread_id=" << thread_id_;
if (this->batch_size_ != 0) {
PutToFeedVec(&records_[batch.first], this->batch_size_);
} else {
VLOG(3) << "finish reading for heterps, batch size zero, thread_id="
<< thread_id_;
}
VLOG(3) << "enable heter next: " << offset_index_
<< " batch_offsets: " << batch_offsets_.size()
<< " baych_size: " << this->batch_size_;
} else {
VLOG(3) << "finish reading for heterps, batch size zero, thread_id="
<< thread_id_;
VLOG(3) << "datafeed in gpu graph mode";
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
this->batch_size_ = gpu_graph_data_generator_.GenerateBatch();
#endif
}
VLOG(3) << "enable heter next: " << offset_index_
<< " batch_offsets: " << batch_offsets_.size()
<< " baych_size: " << this->batch_size_;
return this->batch_size_;
#else
......
This diff is collapsed.
......@@ -23,6 +23,7 @@ limitations under the License. */
#include <future> // NOLINT
#include <memory>
#include <mutex> // NOLINT
#include <random>
#include <sstream>
#include <string>
#include <thread> // NOLINT
......@@ -42,6 +43,7 @@ limitations under the License. */
#include "paddle/fluid/platform/timer.h"
#include "paddle/fluid/string/string_helper.h"
#if defined(PADDLE_WITH_CUDA)
#include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_utils.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#endif
......@@ -56,6 +58,8 @@ namespace framework {
class DataFeedDesc;
class Scope;
class Variable;
class NeighborSampleResult;
class NodeQueryResult;
} // namespace framework
} // namespace paddle
......@@ -420,7 +424,6 @@ struct UsedSlotGpuType {
};
#if defined(PADDLE_WITH_CUDA) && defined(PADDLE_WITH_HETERPS)
#define CUDA_CHECK(val) CHECK(val == gpuSuccess)
template <typename T>
struct CudaBuffer {
T* cu_buffer;
......@@ -776,6 +779,202 @@ class DLManager {
std::map<std::string, DLHandle> handle_map_;
};
struct engine_wrapper_t {
std::default_random_engine engine;
#if !defined(_WIN32)
engine_wrapper_t() {
struct timespec tp;
clock_gettime(CLOCK_REALTIME, &tp);
double cur_time = tp.tv_sec + tp.tv_nsec * 1e-9;
static std::atomic<uint64_t> x(0);
std::seed_seq sseq = {x++, x++, x++, (uint64_t)(cur_time * 1000)};
engine.seed(sseq);
}
#endif
};
struct BufState {
int left;
int right;
int central_word;
int step;
engine_wrapper_t random_engine_;
int len;
int cursor;
int row_num;
int batch_size;
int walk_len;
std::vector<int>* window;
BufState() {}
~BufState() {}
void Init(int graph_batch_size,
int graph_walk_len,
std::vector<int>* graph_window) {
batch_size = graph_batch_size;
walk_len = graph_walk_len;
window = graph_window;
left = 0;
right = window->size() - 1;
central_word = -1;
step = -1;
len = 0;
cursor = 0;
row_num = 0;
for (size_t i = 0; i < graph_window->size(); i++) {
VLOG(2) << "graph_window[" << i << "] = " << (*graph_window)[i];
}
}
void Reset(int total_rows) {
cursor = 0;
row_num = total_rows;
int tmp_len = cursor + batch_size > row_num ? row_num - cursor : batch_size;
len = tmp_len;
central_word = -1;
step = -1;
GetNextCentrolWord();
}
int GetNextStep() {
step++;
if (step <= right && central_word + (*window)[step] < walk_len) {
return 1;
}
return 0;
}
void Debug() {
VLOG(2) << "left: " << left << " right: " << right
<< " central_word: " << central_word << " step: " << step
<< " cursor: " << cursor << " len: " << len
<< " row_num: " << row_num;
}
int GetNextCentrolWord() {
if (++central_word >= walk_len) {
return 0;
}
int window_size = window->size() / 2;
int random_window = random_engine_.engine() % window_size + 1;
left = window_size - random_window;
right = window_size + random_window - 1;
VLOG(2) << "random window: " << random_window << " window[" << left
<< "] = " << (*window)[left] << " window[" << right
<< "] = " << (*window)[right];
for (step = left; step <= right; step++) {
if (central_word + (*window)[step] >= 0) {
return 1;
}
}
return 0;
}
int GetNextBatch() {
cursor += len;
int tmp_len = cursor + batch_size > row_num ? row_num - cursor : batch_size;
if (tmp_len == 0) {
return 0;
}
len = tmp_len;
central_word = -1;
step = -1;
GetNextCentrolWord();
return tmp_len != 0;
}
};
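A standalone illustration of the sub-window selection performed by BufState::GetNextCentrolWord above: with 2*W window offsets, a random half-width r in [1, W] activates the offsets at indices [W - r, W + r - 1]. The concrete offset values below are assumptions:

#include <iostream>
#include <random>
#include <vector>

int main() {
  std::vector<int> window = {-3, -2, -1, 1, 2, 3};  // illustrative offsets, W = 3
  std::default_random_engine engine(std::random_device{}());
  int window_size = static_cast<int>(window.size()) / 2;
  int random_window = engine() % window_size + 1;  // r in [1, W]
  int left = window_size - random_window;
  int right = window_size + random_window - 1;
  std::cout << "active offsets window[" << left << ".." << right << "]:";
  for (int i = left; i <= right; ++i) std::cout << " " << window[i];
  std::cout << std::endl;  // e.g. r = 2 -> -2 -1 1 2
  return 0;
}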
class GraphDataGenerator {
public:
GraphDataGenerator(){};
virtual ~GraphDataGenerator(){};
void SetConfig(const paddle::framework::DataFeedDesc& data_feed_desc);
void AllocResource(const paddle::platform::Place& place,
std::vector<LoDTensor*> feed_vec);
int AcquireInstance(BufState* state);
int GenerateBatch();
int FillWalkBuf(std::shared_ptr<phi::Allocation> d_walk);
int FillFeatureBuf(uint64_t* d_walk, uint64_t* d_feature, size_t key_num);
int FillFeatureBuf(std::shared_ptr<phi::Allocation> d_walk,
std::shared_ptr<phi::Allocation> d_feature);
void FillOneStep(uint64_t* start_ids,
uint64_t* walk,
int len,
NeighborSampleResult& sample_res,
int cur_degree,
int step,
int* len_per_row);
int FillInsBuf();
void SetDeviceKeys(std::vector<uint64_t>* device_keys, int type) {
type_to_index_[type] = h_device_keys_.size();
h_device_keys_.push_back(device_keys);
}
protected:
int walk_degree_;
int walk_len_;
int window_;
int once_sample_startid_len_;
int gpuid_;
// start ids
// int64_t* device_keys_;
// size_t device_key_size_;
std::vector<std::vector<uint64_t>*> h_device_keys_;
std::unordered_map<int, int> type_to_index_;
// point to device_keys_
size_t cursor_;
size_t jump_rows_;
int64_t* id_tensor_ptr_;
int64_t* show_tensor_ptr_;
int64_t* clk_tensor_ptr_;
cudaStream_t stream_;
paddle::platform::Place place_;
std::vector<LoDTensor*> feed_vec_;
std::vector<size_t> offset_;
std::shared_ptr<phi::Allocation> d_prefix_sum_;
std::vector<std::shared_ptr<phi::Allocation>> d_device_keys_;
std::shared_ptr<phi::Allocation> d_walk_;
std::shared_ptr<phi::Allocation> d_feature_;
std::shared_ptr<phi::Allocation> d_len_per_row_;
std::shared_ptr<phi::Allocation> d_random_row_;
//
std::vector<std::shared_ptr<phi::Allocation>> d_sampleidx2rows_;
int cur_sampleidx2row_;
// record the keys to call graph_neighbor_sample
std::shared_ptr<phi::Allocation> d_sample_keys_;
int sample_keys_len_;
std::set<int> finish_node_type_;
std::unordered_map<int, size_t> node_type_start_;
std::vector<int> infer_node_type_start_;
std::shared_ptr<phi::Allocation> d_ins_buf_;
std::shared_ptr<phi::Allocation> d_feature_buf_;
std::shared_ptr<phi::Allocation> d_pair_num_;
std::shared_ptr<phi::Allocation> d_slot_tensor_ptr_;
std::shared_ptr<phi::Allocation> d_slot_lod_tensor_ptr_;
int ins_buf_pair_len_;
// size of a d_walk buf
size_t buf_size_;
int repeat_time_;
std::vector<int> window_step_;
BufState buf_state_;
int batch_size_;
int slot_num_;
int shuffle_seed_;
int debug_mode_;
std::vector<int> first_node_type_;
std::vector<std::vector<int>> meta_path_;
bool gpu_graph_training_;
};
class DataFeed {
public:
DataFeed() {
......@@ -838,6 +1037,14 @@ class DataFeed {
virtual void SetParseLogKey(bool parse_logkey) {}
virtual void SetEnablePvMerge(bool enable_pv_merge) {}
virtual void SetCurrentPhase(int current_phase) {}
virtual void SetDeviceKeys(std::vector<uint64_t>* device_keys, int type) {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
gpu_graph_data_generator_.SetDeviceKeys(device_keys, type);
#endif
}
virtual void SetGpuGraphMode(int gpu_graph_mode) {
gpu_graph_mode_ = gpu_graph_mode;
}
virtual void SetFileListMutex(std::mutex* mutex) {
mutex_for_pick_file_ = mutex;
}
......@@ -921,6 +1128,10 @@ class DataFeed {
// The input type of pipe reader, 0 for one sample, 1 for one batch
int input_type_;
int gpu_graph_mode_ = 0;
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
GraphDataGenerator gpu_graph_data_generator_;
#endif
};
// PrivateQueueDataFeed is the base virtual class for ohther DataFeeds.
......
......@@ -27,6 +27,19 @@ message MultiSlotDesc {
optional string uid_slot = 2;
}
message GraphConfig {
optional int32 walk_degree = 1 [ default = 1 ];
optional int32 walk_len = 2 [ default = 20 ];
optional int32 window = 3 [ default = 5 ];
optional int32 once_sample_startid_len = 4 [ default = 8000 ];
optional int32 sample_times_one_chunk = 5 [ default = 10 ];
optional int32 batch_size = 6 [ default = 1 ];
optional int32 debug_mode = 7 [ default = 0 ];
optional string first_node_type = 8;
optional string meta_path = 9;
optional bool gpu_graph_training = 10 [ default = true ];
}
message DataFeedDesc {
optional string name = 1;
optional int32 batch_size = 2 [ default = 32 ];
......@@ -37,4 +50,5 @@ message DataFeedDesc {
optional int32 pv_batch_size = 7 [ default = 32 ];
optional int32 input_type = 8 [ default = 0 ];
optional string so_parser_name = 9;
optional GraphConfig graph_config = 10;
}
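A hedged sketch of filling the new graph_config sub-message from C++ through the generated protobuf API; the setter names follow standard protobuf conventions, while the generated header path and the node-type / meta-path strings are assumptions:

#include <iostream>

#include "paddle/fluid/framework/data_feed.pb.h"  // assumed generated header

int main() {
  paddle::framework::DataFeedDesc desc;
  desc.set_name("graph_feed");  // hypothetical feed name
  auto *graph_conf = desc.mutable_graph_config();
  graph_conf->set_walk_degree(1);
  graph_conf->set_walk_len(20);
  graph_conf->set_window(5);
  graph_conf->set_batch_size(800);
  graph_conf->set_first_node_type("user");           // illustrative node type
  graph_conf->set_meta_path("user2item-item2user");  // illustrative meta path
  graph_conf->set_gpu_graph_training(true);
  std::cout << desc.DebugString();
  return 0;
}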
......@@ -14,6 +14,7 @@
#include "paddle/fluid/framework/data_set.h"
#include "gflags/gflags.h"
#include "google/protobuf/text_format.h"
#if (defined PADDLE_WITH_DISTRIBUTE) && (defined PADDLE_WITH_PSCORE)
#include "paddle/fluid/distributed/index_dataset/index_sampler.h"
......@@ -26,6 +27,7 @@
#ifdef PADDLE_WITH_PSCORE
#include "paddle/fluid/distributed/ps/wrapper/fleet.h"
#include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h"
#endif
#if defined _WIN32 || defined __APPLE__
......@@ -34,6 +36,8 @@
#endif
USE_INT_STAT(STAT_total_feasign_num_in_mem);
DECLARE_bool(graph_get_neighbor_id);
namespace paddle {
namespace framework {
......@@ -196,6 +200,16 @@ void DatasetImpl<T>::SetFeaEval(bool fea_eval, int record_candidate_size) {
<< " with record candidate size: " << record_candidate_size;
}
template <typename T>
void DatasetImpl<T>::SetGpuGraphMode(int is_graph_mode) {
gpu_graph_mode_ = is_graph_mode;
}
template <typename T>
int DatasetImpl<T>::GetGpuGraphMode() {
return gpu_graph_mode_;
}
template <typename T>
std::vector<paddle::framework::DataFeed*> DatasetImpl<T>::GetReaders() {
std::vector<paddle::framework::DataFeed*> ret;
......@@ -440,12 +454,91 @@ void DatasetImpl<T>::LoadIntoMemory() {
platform::Timer timeline;
timeline.Start();
std::vector<std::thread> load_threads;
for (int64_t i = 0; i < thread_num_; ++i) {
load_threads.push_back(std::thread(
&paddle::framework::DataFeed::LoadIntoMemory, readers_[i].get()));
}
for (std::thread& t : load_threads) {
t.join();
if (gpu_graph_mode_) {
VLOG(0) << "in gpu_graph_mode";
#ifdef PADDLE_WITH_HETERPS
graph_all_type_total_keys_.clear();
auto gpu_graph_ptr = GraphGpuWrapper::GetInstance();
auto node_to_id = gpu_graph_ptr->feature_to_id;
auto edge_to_id = gpu_graph_ptr->edge_to_id;
graph_all_type_total_keys_.resize(node_to_id.size());
int cnt = 0;
for (auto& iter : node_to_id) {
int node_idx = iter.second;
std::vector<std::vector<uint64_t>> gpu_graph_device_keys;
gpu_graph_ptr->get_all_id(
1, node_idx, thread_num_, &gpu_graph_device_keys);
auto& type_total_key = graph_all_type_total_keys_[cnt];
type_total_key.resize(thread_num_);
for (size_t i = 0; i < gpu_graph_device_keys.size(); i++) {
VLOG(2) << "node type: " << node_idx << ", gpu_graph_device_keys[" << i
<< "] = " << gpu_graph_device_keys[i].size();
for (size_t j = 0; j < gpu_graph_device_keys[i].size(); j++) {
gpu_graph_total_keys_.push_back(gpu_graph_device_keys[i][j]);
type_total_key[i].push_back(gpu_graph_device_keys[i][j]);
}
}
for (size_t i = 0; i < readers_.size(); i++) {
readers_[i]->SetDeviceKeys(&type_total_key[i], node_idx);
readers_[i]->SetGpuGraphMode(gpu_graph_mode_);
}
cnt++;
}
VLOG(2) << "begin add feature_id into gpu_graph_total_keys_ size["
<< gpu_graph_total_keys_.size() << "]";
for (auto& iter : node_to_id) {
std::vector<std::vector<uint64_t>> gpu_graph_device_keys;
int node_idx = iter.second;
gpu_graph_ptr->get_all_feature_ids(
1, node_idx, thread_num_, &gpu_graph_device_keys);
for (size_t i = 0; i < gpu_graph_device_keys.size(); i++) {
VLOG(2) << "begin node type: " << node_idx << ", gpu_graph_device_keys["
<< i << "] = " << gpu_graph_device_keys[i].size();
for (size_t j = 0; j < gpu_graph_device_keys[i].size(); j++) {
gpu_graph_total_keys_.push_back(gpu_graph_device_keys[i][j]);
}
VLOG(2) << "end node type: " << node_idx << ", gpu_graph_device_keys["
<< i << "] = " << gpu_graph_device_keys[i].size();
}
}
VLOG(2) << "end add feature_id into gpu_graph_total_keys_ size["
<< gpu_graph_total_keys_.size() << "]";
// FIX: trick for iterate edge table
for (auto& iter : edge_to_id) {
int edge_idx = iter.second;
std::vector<std::vector<uint64_t>> gpu_graph_device_keys;
gpu_graph_ptr->get_all_id(
0, edge_idx, thread_num_, &gpu_graph_device_keys);
for (size_t i = 0; i < gpu_graph_device_keys.size(); i++) {
VLOG(1) << "edge type: " << edge_idx << ", gpu_graph_device_keys[" << i
<< "] = " << gpu_graph_device_keys[i].size();
for (size_t j = 0; j < gpu_graph_device_keys[i].size(); j++) {
gpu_graph_total_keys_.push_back(gpu_graph_device_keys[i][j]);
}
}
if (FLAGS_graph_get_neighbor_id) {
std::vector<std::vector<uint64_t>> gpu_graph_neighbor_keys;
gpu_graph_ptr->get_all_neighbor_id(
0, edge_idx, thread_num_, &gpu_graph_neighbor_keys);
for (size_t i = 0; i < gpu_graph_neighbor_keys.size(); i++) {
for (size_t k = 0; k < gpu_graph_neighbor_keys[i].size(); k++) {
gpu_graph_total_keys_.push_back(gpu_graph_neighbor_keys[i][k]);
}
}
}
}
#endif
} else {
for (int64_t i = 0; i < thread_num_; ++i) {
load_threads.push_back(std::thread(
&paddle::framework::DataFeed::LoadIntoMemory, readers_[i].get()));
}
for (std::thread& t : load_threads) {
t.join();
}
}
input_channel_->Close();
int64_t in_chan_size = input_channel_->Size();
......
......@@ -165,6 +165,9 @@ class Dataset {
virtual std::vector<std::string> GetSlots() = 0;
virtual void SetGpuGraphMode(int is_graph_mode) = 0;
virtual int GetGpuGraphMode() = 0;
protected:
virtual int ReceiveFromClient(int msg_type,
int client_id,
......@@ -213,6 +216,8 @@ class DatasetImpl : public Dataset {
virtual std::pair<std::string, std::string> GetHdfsConfig() {
return std::make_pair(fs_name_, fs_ugi_);
}
virtual void SetGpuGraphMode(int is_graph_mode);
virtual int GetGpuGraphMode();
virtual std::string GetDownloadCmd();
virtual const paddle::framework::DataFeedDesc& GetDataFeedDesc() {
return data_feed_desc_;
......@@ -272,7 +277,9 @@ class DatasetImpl : public Dataset {
return multi_consume_channel_;
}
}
std::vector<uint64_t>& GetGpuGraphTotalKeys() {
return gpu_graph_total_keys_;
}
Channel<T>& GetInputChannelRef() { return input_channel_; }
protected:
......@@ -333,6 +340,10 @@ class DatasetImpl : public Dataset {
std::vector<T> input_records_; // only for paddleboxdatafeed
std::vector<std::string> use_slots_;
bool enable_heterps_ = false;
int gpu_graph_mode_ = 0;
// std::vector<std::vector<int64_t>> gpu_graph_device_keys_;
std::vector<std::vector<std::vector<uint64_t>>> graph_all_type_total_keys_;
std::vector<uint64_t> gpu_graph_total_keys_;
};
// use std::vector<MultiSlotType> or Record as data type
......
......@@ -14,8 +14,8 @@ limitations under the License. */
#include "paddle/fluid/framework/device_worker.h"
#include <chrono>
#include "paddle/fluid/framework/convert_utils.h"
namespace phi {
class DenseTensor;
} // namespace phi
......@@ -32,48 +32,179 @@ void DeviceWorker::SetDataFeed(DataFeed* data_feed) {
}
template <typename T>
std::string PrintLodTensorType(Tensor* tensor, int64_t start, int64_t end) {
std::string PrintLodTensorType(Tensor* tensor,
int64_t start,
int64_t end,
char separator = ',',
bool need_leading_separator = true) {
auto count = tensor->numel();
if (start < 0 || end > count) {
VLOG(3) << "access violation";
return "access violation";
}
if (start >= end) return "";
std::ostringstream os;
if (!need_leading_separator) {
os << tensor->data<T>()[start];
start++;
}
for (int64_t i = start; i < end; i++) {
os << ":" << tensor->data<T>()[i];
// os << ":" << tensor->data<T>()[i];
os << separator << tensor->data<T>()[i];
}
return os.str();
}
template <typename T>
void PrintLodTensorType(Tensor* tensor,
int64_t start,
int64_t end,
std::string& out_val,
char separator = ',',
bool need_leading_separator = true) {
auto count = tensor->numel();
if (start < 0 || end > count) {
VLOG(3) << "access violation";
out_val += "access violation";
return;
}
if (start >= end) return;
if (!need_leading_separator) {
out_val += std::to_string(tensor->data<T>()[start]);
// os << tensor->data<T>()[start];
start++;
}
for (int64_t i = start; i < end; i++) {
// os << ":" << tensor->data<T>()[i];
// os << separator << tensor->data<T>()[i];
out_val += separator;
out_val += std::to_string(tensor->data<T>()[i]);
}
}
std::string PrintLodTensorIntType(Tensor* tensor, int64_t start, int64_t end) {
#define FLOAT_EPS 1e-8
#define MAX_FLOAT_BUFF_SIZE 40
template <>
void PrintLodTensorType<float>(Tensor* tensor,
int64_t start,
int64_t end,
std::string& out_val,
char separator,
bool need_leading_separator) {
char buf[MAX_FLOAT_BUFF_SIZE];
auto count = tensor->numel();
if (start < 0 || end > count) {
VLOG(3) << "access violation";
out_val += "access violation";
return;
}
if (start >= end) return;
for (int64_t i = start; i < end; i++) {
if (i != start || need_leading_separator) out_val += separator;
if (tensor->data<float>()[i] > -FLOAT_EPS &&
tensor->data<float>()[i] < FLOAT_EPS)
out_val += "0";
else {
sprintf(buf, "%.9f", tensor->data<float>()[i]);
out_val += buf;
}
}
}
std::string PrintLodTensorIntType(Tensor* tensor,
int64_t start,
int64_t end,
char separator = ',',
bool need_leading_separator = true) {
auto count = tensor->numel();
if (start < 0 || end > count) {
VLOG(3) << "access violation";
return "access violation";
}
if (start >= end) return "";
std::ostringstream os;
if (!need_leading_separator) {
os << static_cast<uint64_t>(tensor->data<int64_t>()[start]);
start++;
}
for (int64_t i = start; i < end; i++) {
os << ":" << static_cast<uint64_t>(tensor->data<int64_t>()[i]);
// os << ":" << static_cast<uint64_t>(tensor->data<int64_t>()[i]);
os << separator << static_cast<uint64_t>(tensor->data<int64_t>()[i]);
}
return os.str();
}
std::string PrintLodTensor(Tensor* tensor, int64_t start, int64_t end) {
void PrintLodTensorIntType(Tensor* tensor,
int64_t start,
int64_t end,
std::string& out_val,
char separator = ',',
bool need_leading_separator = true) {
auto count = tensor->numel();
if (start < 0 || end > count) {
VLOG(3) << "access violation";
out_val += "access violation";
return;
}
if (start >= end) return;
if (!need_leading_separator) {
out_val +=
std::to_string(static_cast<uint64_t>(tensor->data<int64_t>()[start]));
start++;
}
for (int64_t i = start; i < end; i++) {
// os << ":" << static_cast<uint64_t>(tensor->data<int64_t>()[i]);
// os << separator << static_cast<uint64_t>(tensor->data<int64_t>()[i]);
out_val += separator;
out_val +=
std::to_string(static_cast<uint64_t>(tensor->data<int64_t>()[i]));
}
// return os.str();
}
std::string PrintLodTensor(Tensor* tensor,
int64_t start,
int64_t end,
char separator,
bool need_leading_separator) {
std::string out_val;
if (framework::TransToProtoVarType(tensor->dtype()) == proto::VarType::FP32) {
out_val = PrintLodTensorType<float>(tensor, start, end);
out_val = PrintLodTensorType<float>(
tensor, start, end, separator, need_leading_separator);
} else if (framework::TransToProtoVarType(tensor->dtype()) ==
proto::VarType::INT64) {
out_val = PrintLodTensorIntType(tensor, start, end);
out_val = PrintLodTensorIntType(
tensor, start, end, separator, need_leading_separator);
} else if (framework::TransToProtoVarType(tensor->dtype()) ==
proto::VarType::FP64) {
out_val = PrintLodTensorType<double>(tensor, start, end);
out_val = PrintLodTensorType<double>(
tensor, start, end, separator, need_leading_separator);
} else {
out_val = "unsupported type";
}
return out_val;
}
void PrintLodTensor(Tensor* tensor,
int64_t start,
int64_t end,
std::string& out_val,
char separator,
bool need_leading_separator) {
if (framework::TransToProtoVarType(tensor->dtype()) == proto::VarType::FP32) {
PrintLodTensorType<float>(
tensor, start, end, out_val, separator, need_leading_separator);
} else if (framework::TransToProtoVarType(tensor->dtype()) ==
proto::VarType::INT64) {
PrintLodTensorIntType(
tensor, start, end, out_val, separator, need_leading_separator);
} else if (framework::TransToProtoVarType(tensor->dtype()) ==
proto::VarType::FP64) {
PrintLodTensorType<double>(
tensor, start, end, out_val, separator, need_leading_separator);
} else {
out_val += "unsupported type";
}
}
std::pair<int64_t, int64_t> GetTensorBound(LoDTensor* tensor, int index) {
auto& dims = tensor->dims();
if (tensor->lod().size() != 0) {
......@@ -122,6 +253,11 @@ void DeviceWorker::DumpParam(const Scope& scope, const int batch_id) {
}
void DeviceWorker::InitRandomDumpConfig(const TrainerDesc& desc) {
bool is_dump_in_simple_mode = desc.is_dump_in_simple_mode();
if (is_dump_in_simple_mode) {
dump_mode_ = 3;
return;
}
bool enable_random_dump = desc.enable_random_dump();
if (!enable_random_dump) {
dump_mode_ = 0;
......@@ -140,16 +276,124 @@ void DeviceWorker::DumpField(const Scope& scope,
int dump_interval) { // dump_mode: 0: no random,
// 1: random with insid hash,
// 2: random with random
// number
// 3: simple mode using multi-threads, for gpugraphps-mode
auto start1 = std::chrono::steady_clock::now();
size_t batch_size = device_reader_->GetCurBatchSize();
auto& ins_id_vec = device_reader_->GetInsIdVec();
auto& ins_content_vec = device_reader_->GetInsContentVec();
if (ins_id_vec.size() > 0) {
if (dump_mode_ == 3) {
batch_size = std::string::npos;
bool has_valid_batch = false;
for (auto& field : *dump_fields_) {
Variable* var = scope.FindVar(field);
if (var == nullptr) {
VLOG(0) << "Note: field[" << field
<< "] cannot be find in scope, so it was skipped.";
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
if (!tensor->IsInitialized()) {
VLOG(0) << "Note: field[" << field
<< "] is not initialized, so it was skipped.";
continue;
}
auto& dims = tensor->dims();
if (dims.size() == 2 && dims[0] > 0) {
batch_size = std::min(batch_size, static_cast<size_t>(dims[0]));
// VLOG(0)<<"in dump field ---> "<<field<<" dim_size = "<<dims[0]<<"
// "<<dims[1]<<" batch_size = "<<batch_size;
has_valid_batch = true;
}
}
if (!has_valid_batch) return;
} else if (ins_id_vec.size() > 0) {
batch_size = ins_id_vec.size();
}
std::vector<std::string> ars(batch_size);
std::vector<bool> hit(batch_size, false);
if (dump_mode_ == 3) {
if (dump_fields_ == NULL || (*dump_fields_).size() == 0) {
return;
}
auto set_output_str = [&, this](
size_t begin, size_t end, LoDTensor* tensor) {
std::pair<int64_t, int64_t> bound;
auto& dims = tensor->dims();
for (size_t i = begin; i < end; ++i) {
bound = {i * dims[1], (i + 1) * dims[1]};
// auto bound = GetTensorBound(tensor, i);
if (ars[i].size() > 0) ars[i] += "\t";
// ars[i] += '[';
PrintLodTensor(tensor, bound.first, bound.second, ars[i], ' ', false);
// ars[i] += ']';
// ars[i] += "<" + PrintLodTensor(tensor, bound.first, bound.second, '
// ', false) + ">";
}
};
std::vector<std::thread> threads(tensor_iterator_thread_num);
for (auto& field : *dump_fields_) {
Variable* var = scope.FindVar(field);
if (var == nullptr) {
VLOG(0) << "Note: field[" << field
<< "] cannot be find in scope, so it was skipped.";
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
if (!tensor->IsInitialized()) {
VLOG(0) << "Note: field[" << field
<< "] is not initialized, so it was skipped.";
continue;
}
framework::LoDTensor cpu_tensor;
if (platform::is_gpu_place(tensor->place())) {
TensorCopySync(*tensor, platform::CPUPlace(), &cpu_tensor);
cpu_tensor.set_lod(tensor->lod());
tensor = &cpu_tensor;
}
auto& dims = tensor->dims();
if (dims.size() != 2 || dims[0] <= 0) {
VLOG(0) << "Note: field[" << field
<< "] cannot pass check, so it was "
"skipped. Maybe the dimension is "
"wrong ";
VLOG(0) << dims.size() << " " << dims[0] << " * " << dims[1];
continue;
}
size_t acutal_thread_num =
std::min((size_t)batch_size, tensor_iterator_thread_num);
for (size_t i = 0; i < acutal_thread_num; i++) {
size_t average_size = batch_size / acutal_thread_num;
size_t begin =
average_size * i + std::min(batch_size % acutal_thread_num, i);
size_t end =
begin + average_size + (i < batch_size % acutal_thread_num ? 1 : 0);
threads[i] = std::thread(set_output_str, begin, end, tensor);
}
for (size_t i = 0; i < acutal_thread_num; i++) threads[i].join();
}
auto end1 = std::chrono::steady_clock::now();
auto tt =
std::chrono::duration_cast<std::chrono::microseconds>(end1 - start1);
VLOG(1) << "writing a batch takes " << tt.count() << " us";
size_t acutal_thread_num =
std::min((size_t)batch_size, tensor_iterator_thread_num);
for (size_t i = 0; i < acutal_thread_num; i++) {
size_t average_size = batch_size / acutal_thread_num;
size_t begin =
average_size * i + std::min(batch_size % acutal_thread_num, i);
size_t end =
begin + average_size + (i < batch_size % acutal_thread_num ? 1 : 0);
for (size_t j = begin + 1; j < end; j++) {
if (ars[begin].size() > 0 && ars[j].size() > 0) ars[begin] += "\n";
ars[begin] += ars[j];
}
if (ars[begin].size() > 0) writer_ << ars[begin];
}
return;
}
std::vector<bool> hit(batch_size, false);
std::default_random_engine engine(0);
std::uniform_int_distribution<size_t> dist(0U, INT_MAX);
for (size_t i = 0; i < batch_size; i++) {
......@@ -206,6 +450,7 @@ void DeviceWorker::DumpField(const Scope& scope,
ars[i] += PrintLodTensor(tensor, bound.first, bound.second);
}
}
// #pragma omp parallel for
for (size_t i = 0; i < ars.size(); i++) {
if (ars[i].length() == 0) {
......
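A standalone check of the balanced work split used in DumpField's dump_mode_ == 3 path above: thread i receives batch_size / n rows plus one extra row when i < batch_size % n, so the [begin, end) ranges tile the batch without gaps. The concrete sizes are assumptions:

#include <algorithm>
#include <cstddef>
#include <iostream>

int main() {
  size_t batch_size = 10, thread_num = 4;  // illustrative sizes
  size_t n = std::min(batch_size, thread_num);
  for (size_t i = 0; i < n; ++i) {
    size_t average_size = batch_size / n;
    size_t begin = average_size * i + std::min(batch_size % n, i);
    size_t end = begin + average_size + (i < batch_size % n ? 1 : 0);
    std::cout << "thread " << i << ": [" << begin << ", " << end << ")\n";
  }
  // prints: [0, 3) [3, 6) [6, 8) [8, 10)
  return 0;
}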
......@@ -31,6 +31,7 @@ limitations under the License. */
#include "paddle/fluid/distributed/ps/wrapper/fleet.h"
#endif
#include <map>
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/executor_gc_helper.h"
#include "paddle/fluid/framework/heter_util.h"
......@@ -59,7 +60,17 @@ class Scope;
namespace paddle {
namespace framework {
std::string PrintLodTensor(Tensor* tensor, int64_t start, int64_t end);
std::string PrintLodTensor(Tensor* tensor,
int64_t start,
int64_t end,
char separator = ',',
bool need_leading_separator = false);
void PrintLodTensor(Tensor* tensor,
int64_t start,
int64_t end,
std::string& output_str,
char separator = ',',
bool need_leading_separator = false);
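// The overloads above extend PrintLodTensor with a configurable separator and
// an appending variant that writes into an existing output string, so callers
// can reuse one buffer when dumping large batches.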
std::pair<int64_t, int64_t> GetTensorBound(LoDTensor* tensor, int index);
bool CheckValidOutput(LoDTensor* tensor, size_t batch_size);
......@@ -230,6 +241,7 @@ class DeviceWorker {
int dump_mode_ = 0;
int dump_interval_ = 10000;
ChannelWriter<std::string> writer_;
const size_t tensor_iterator_thread_num = 16;
platform::DeviceContext* dev_ctx_ = nullptr;
};
......@@ -772,7 +784,6 @@ class HeterSectionWorker : public DeviceWorker {
static uint64_t batch_id_;
uint64_t total_ins_num_ = 0;
platform::DeviceContext* dev_ctx_ = nullptr;
bool debug_ = false;
std::vector<double> op_total_time_;
std::vector<std::string> op_name_;
......
......@@ -29,7 +29,7 @@ TEST(LodTensor, PrintLodTensor) {
std::string res = PrintLodTensor(&tensor1, -1, 2);
ASSERT_EQ(res, "access violation");
res = PrintLodTensor(&tensor1, 0, 2);
ASSERT_EQ(res, ":0.2:0.5");
ASSERT_EQ(res, "0.2,0.5");
LoDTensor tensor2;
tensor2.Resize({2});
......@@ -39,7 +39,7 @@ TEST(LodTensor, PrintLodTensor) {
res = PrintLodTensor(&tensor2, -1, 2);
ASSERT_EQ(res, "access violation");
res = PrintLodTensor(&tensor2, 0, 2);
ASSERT_EQ(res, ":1:2");
ASSERT_EQ(res, "1,2");
LoDTensor tensor3;
tensor3.Resize({2});
......@@ -47,7 +47,40 @@ TEST(LodTensor, PrintLodTensor) {
tensor3.data<double>()[0] = 0.1;
tensor3.data<double>()[1] = 0.2;
res = PrintLodTensor(&tensor3, 0, 2);
ASSERT_EQ(res, ":0.1:0.2");
ASSERT_EQ(res, "0.1,0.2");
LoDTensor tensor4;
tensor4.Resize({2});
tensor4.mutable_data<double>(platform::CPUPlace());
tensor4.data<double>()[0] = 0.1;
tensor4.data<double>()[1] = 0.2;
res = "";
PrintLodTensor(&tensor4, 0, 2, res);
// ASSERT_EQ(res, "0.1,0.2");
LoDTensor tensor5;
tensor5.Resize({2});
tensor5.mutable_data<int64_t>(platform::CPUPlace());
tensor5.data<int64_t>()[0] = 1;
tensor5.data<int64_t>()[1] = 2;
res = "";
PrintLodTensor(&tensor5, -1, 2, res);
ASSERT_EQ(res, "access violation");
res = "";
PrintLodTensor(&tensor5, 0, 2, res);
ASSERT_EQ(res, "1,2");
LoDTensor tensor6;
tensor6.Resize({2});
tensor6.mutable_data<float>(platform::CPUPlace());
tensor6.data<float>()[0] = 0.2;
tensor6.data<float>()[1] = 0.5;
res = "";
PrintLodTensor(&tensor6, -1, 2, res);
// ASSERT_EQ(res, "access violation");
res = "";
PrintLodTensor(&tensor6, 0, 2, res);
// ASSERT_EQ(res, "0.2,0.5");
}
TEST(LodTensor, GetTensorBound) {
......
......@@ -207,6 +207,12 @@ message TableAccessorParameter {
repeated TableAccessorSaveParameter table_accessor_save_param = 8;
optional SGDParameter embed_sgd_param = 10;
optional SGDParameter embedx_sgd_param = 11;
optional GraphSGDParameter graph_sgd_param = 12;
}
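// GraphSGDParameter carries GPU-graph specific optimizer settings. The field
// semantics below are inferred from the defaults and are not documented in
// this diff: nodeid_slot is presumably the slot id reserved for node-id
// features, and feature_learning_rate the learning rate applied to graph
// feature embeddings.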
message GraphSGDParameter {
optional uint32 nodeid_slot = 1 [ default = 9008 ];
optional float feature_learning_rate = 2 [ default = 0.05 ];
}
message SGDParameter {
......
......@@ -51,6 +51,8 @@
}
#endif
DECLARE_bool(gpugraph_enable_hbm_table_collision_stat);
// TODO: can we do this more efficiently?
__inline__ __device__ int8_t atomicCAS(int8_t* address,
int8_t compare,
......@@ -330,8 +332,7 @@ template <typename Key,
Key unused_key,
typename Hasher = default_hash<Key>,
typename Equality = equal_to<Key>,
typename Allocator = managed_allocator<thrust::pair<Key, Element>>,
bool count_collisions = false>
typename Allocator = managed_allocator<thrust::pair<Key, Element>>>
class concurrent_unordered_map : public managed {
public:
using size_type = size_t;
......@@ -363,9 +364,12 @@ class concurrent_unordered_map : public managed {
m_allocator(a),
m_hashtbl_size(n),
m_hashtbl_capacity(n),
m_collisions(0),
m_unused_element(
unused_element) { // allocate the raw data of hash table:
m_unused_element(unused_element),
m_enable_collision_stat(false),
m_insert_times(0),
m_insert_collisions(0),
m_query_times(0),
m_query_collisions(0) { // allocate the raw data of hash table:
// m_hashtbl_values,pre-alloc it on current GPU if UM.
m_hashtbl_values = m_allocator.allocate(m_hashtbl_capacity);
constexpr int block_size = 128;
......@@ -390,9 +394,9 @@ class concurrent_unordered_map : public managed {
// Initialize kernel, set all entry to unused <K,V>
init_hashtbl<<<((m_hashtbl_size - 1) / block_size) + 1, block_size>>>(
m_hashtbl_values, m_hashtbl_size, unused_key, m_unused_element);
// CUDA_RT_CALL( cudaGetLastError() );
CUDA_RT_CALL(cudaStreamSynchronize(0));
CUDA_RT_CALL(cudaGetLastError());
m_enable_collision_stat = FLAGS_gpugraph_enable_hbm_table_collision_stat;
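    // Collision statistics are opt-in via the
    // gpugraph_enable_hbm_table_collision_stat flag, so the insert/find hot
    // paths only pay for the extra atomics when the stat is enabled.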
}
~concurrent_unordered_map() {
......@@ -572,11 +576,16 @@ class concurrent_unordered_map : public managed {
// TODO: How to handle data types less than 32 bits?
if (keys_equal(unused_key, old_key) || keys_equal(insert_key, old_key)) {
update_existing_value(existing_value, x, op);
insert_success = true;
if (m_enable_collision_stat) {
atomicAdd(&m_insert_times, 1);
}
break;
}
if (m_enable_collision_stat) {
atomicAdd(&m_insert_collisions, 1);
}
current_index = (current_index + 1) % hashtbl_size;
current_hash_bucket = &(hashtbl_values[current_index]);
}
......@@ -614,9 +623,9 @@ std::numeric_limits<mapped_type>::is_integer && sizeof(unsigned long long int)
reinterpret_cast<unsigned long long
int*>(tmp_it), unused, value ); if ( old_val == unused ) { it = tmp_it;
}
else if ( count_collisions )
else if ( m_enable_collision_stat )
{
atomicAdd( &m_collisions, 1 );
atomicAdd( &m_insert_collisions, 1 );
}
} else {
const key_type old_key = atomicCAS( &(tmp_it->first), unused_key,
......@@ -625,9 +634,9 @@ x.first );
(m_hashtbl_values+hash_tbl_idx)->second = x.second;
it = tmp_it;
}
else if ( count_collisions )
else if ( m_enable_collision_stat )
{
atomicAdd( &m_collisions, 1 );
atomicAdd( &m_insert_collisions, 1 );
}
}
#else
......@@ -648,8 +657,7 @@ x.second );
}
*/
__forceinline__ __host__ __device__ const_iterator
find(const key_type& k) const {
__forceinline__ __device__ const_iterator find(const key_type& k) {
size_type key_hash = m_hf(k);
size_type hash_tbl_idx = key_hash % m_hashtbl_size;
......@@ -667,10 +675,17 @@ x.second );
begin_ptr = m_hashtbl_values + m_hashtbl_size;
break;
}
if (m_enable_collision_stat) {
atomicAdd(&m_query_collisions, 1);
}
hash_tbl_idx = (hash_tbl_idx + 1) % m_hashtbl_size;
++counter;
}
if (m_enable_collision_stat) {
atomicAdd(&m_query_times, 1);
}
return const_iterator(
m_hashtbl_values, m_hashtbl_values + m_hashtbl_size, begin_ptr);
}
......@@ -770,7 +785,7 @@ x.second );
int assign_async(const concurrent_unordered_map& other,
cudaStream_t stream = 0) {
m_collisions = other.m_collisions;
m_insert_collisions = other.m_insert_collisions;
if (other.m_hashtbl_size <= m_hashtbl_capacity) {
m_hashtbl_size = other.m_hashtbl_size;
} else {
......@@ -795,10 +810,15 @@ x.second );
0,
stream>>>(
m_hashtbl_values, m_hashtbl_size, unused_key, m_unused_element);
if (count_collisions) m_collisions = 0;
if (m_enable_collision_stat) {
m_insert_times = 0;
m_insert_collisions = 0;
m_query_times = 0;
m_query_collisions = 0;
}
}
unsigned long long get_num_collisions() const { return m_collisions; }
unsigned long long get_num_collisions() const { return m_insert_collisions; }
void print() {
for (size_type i = 0; i < 5; ++i) {
......@@ -850,6 +870,21 @@ x.second );
return it;
}
__host__ void print_collision(int id) {
if (m_enable_collision_stat) {
printf(
"collision stat for hbm table %d, insert(%lu:%lu:%.2f), "
"query(%lu:%lu:%.2f)\n",
id,
m_insert_times,
m_insert_collisions,
m_insert_collisions / (double)m_insert_times,
m_query_times,
m_query_collisions,
m_query_collisions / (double)m_query_times);
}
}
private:
const hasher m_hf;
const key_equal m_equal;
......@@ -862,7 +897,11 @@ x.second );
size_type m_hashtbl_capacity;
value_type* m_hashtbl_values;
unsigned long long m_collisions;
bool m_enable_collision_stat;
uint64_t m_insert_times;
uint64_t m_insert_collisions;
uint64_t m_query_times;
uint64_t m_query_collisions;
};
#endif // CONCURRENT_UNORDERED_MAP_CUH
......@@ -13,11 +13,16 @@ limitations under the License. */
#ifdef PADDLE_WITH_HETERPS
#include "paddle/fluid/framework/fleet/heter_ps/feature_value.h"
#include "paddle/fluid/platform/device/gpu/gpu_primitives.h"
namespace paddle {
namespace framework {
template <typename FVAccessor>
const int CUDA_NUM_THREADS = platform::PADDLE_CUDA_NUM_THREADS;
#define GET_BLOCK(N) ((N + CUDA_NUM_THREADS - 1) / CUDA_NUM_THREADS)
#define CUDA_BLOCK(N) GET_BLOCK(N), CUDA_NUM_THREADS, 0
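// GET_BLOCK(N) computes how many blocks of CUDA_NUM_THREADS threads are
// needed to cover N elements; CUDA_BLOCK(N) expands to the <grid, block,
// shared-mem> launch arguments used below (0 bytes of dynamic shared memory).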
template <typename GPUAccessor>
__global__ void PullCopy(float** dest,
const float* src,
const int64_t* len,
......@@ -26,7 +31,7 @@ __global__ void PullCopy(float** dest,
uint64_t** keys,
uint64_t max_val_size,
int* gpu_dim,
FVAccessor feature_value_accessor) {
GPUAccessor gpu_accessor) {
CUDA_KERNEL_LOOP(i, total_len) {
int low = 0;
int high = slot_num - 1;
......@@ -42,12 +47,62 @@ __global__ void PullCopy(float** dest,
float* feature_value_ptr =
(float*)((char*)src + uint64_t(i) * uint64_t(max_val_size));
int mf_dim = gpu_dim[x] - 3;
feature_value_accessor.Select(
gpu_accessor.Select(
dest[x] + y * (mf_dim + 3), feature_value_ptr, keys[x] + y, mf_dim);
}
}
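// PullDedupCopy scatters deduplicated pulled values back to the per-slot
// output tensors: one thread handles one (key, hidden-dim element) pair,
// restore_idx maps a key's original position to its row in the deduplicated
// value buffer, and keys equal to 0 are filled with zeros.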
template <typename FVAccessor>
template <typename TAccess>
__global__ void PullDedupCopy(const size_t N,
const uint64_t* total_keys,
float** dest,
const float* src,
const int64_t* slot_lens,
uint64_t max_val_size,
const int* slot_dims,
const int hidden,
const int* key2slot,
const uint32_t* restore_idx,
TAccess accessor) {
CUDA_KERNEL_LOOP(idx, N) {
int i = idx / hidden;
int off = idx % hidden;
int x = key2slot[i];
int y = i - slot_lens[x];
assert(slot_dims[x] == hidden);
float* dest_ptr = dest[x] + y * hidden;
// 0 key fill zero
if (total_keys[i] == 0) {
*(dest_ptr + off) = 0;
return;
}
float* src_ptr = (float*)((char*)src + uint64_t(restore_idx[i]) *
uint64_t(max_val_size));
switch (off) {
case 0:
*(dest_ptr + off) = src_ptr[accessor.ShowIndex()];
break;
case 1:
*(dest_ptr + off) = src_ptr[accessor.ClickIndex()];
break;
case 2:
*(dest_ptr + off) = src_ptr[accessor.EmbedWIndex()];
break;
default:
if (src_ptr[accessor.MfSizeIndex()] == 0) {
*(dest_ptr + off) = 0;
} else {
*(dest_ptr + off) = src_ptr[accessor.EmbedxWIndex() + off - 3];
}
break;
}
}
}
template <typename GPUAccessor>
__global__ void PushCopyWithPool(float* dest,
float** src,
int64_t* len,
......@@ -57,7 +112,7 @@ __global__ void PushCopyWithPool(float* dest,
int* slot_vector,
int* mf_dim_vector,
size_t grad_value_size,
FVAccessor feature_value_accessor) {
GPUAccessor gpu_accessor) {
CUDA_KERNEL_LOOP(i, total_len) {
int low = 0;
int high = slot_num - 1;
......@@ -72,24 +127,167 @@ __global__ void PushCopyWithPool(float* dest,
int y = i - (x ? len[low - 1] : 0);
float* cur = (float*)((char*)dest + i * grad_value_size);
cur[feature_value_accessor.common_push_value.SlotIndex()] =
(float)slot_vector[x];
cur[gpu_accessor.common_push_value.SlotIndex()] = (float)slot_vector[x];
int mf_dim = mf_dim_vector[x];
cur[feature_value_accessor.common_push_value.MfDimIndex()] = mf_dim;
cur[gpu_accessor.common_push_value.MfDimIndex()] = mf_dim;
cur[feature_value_accessor.common_push_value.ShowIndex()] =
cur[gpu_accessor.common_push_value.ShowIndex()] =
*(src[x] + y * (mf_dim + 3));
cur[feature_value_accessor.common_push_value.ClickIndex()] =
cur[gpu_accessor.common_push_value.ClickIndex()] =
*(src[x] + y * (mf_dim + 3) + 1);
cur[feature_value_accessor.common_push_value.EmbedGIndex()] =
cur[gpu_accessor.common_push_value.EmbedGIndex()] =
*(src[x] + y * (mf_dim + 3) + 2) * -1. * bs;
for (int j = 0; j < mf_dim; j++) {
cur[feature_value_accessor.common_push_value.EmbedxGIndex() + j] =
cur[gpu_accessor.common_push_value.EmbedxGIndex() + j] =
*(src[x] + y * (mf_dim + 3) + 3 + j) * -1. * bs;
}
}
}
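// PushMergeCopyAtomic accumulates per-instance gradients directly into the
// deduplicated gradient buffer: d_restore_idx maps each key occurrence to its
// deduplicated row, show/click/embed_g/embedx_g are combined with atomic
// adds, and the off == 0 lane also writes the slot id and mf_dim (identical
// across duplicates of the same key).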
template <typename TAccess>
__global__ void PushMergeCopyAtomic(const size_t N,
const uint64_t* total_keys,
float* dest,
float** src,
const int hidden,
const int bs,
const int* slot_vector,
const int* slot_dims,
const int64_t* slot_lens,
const int* key2slot,
const uint32_t* d_restore_idx,
size_t grad_value_size,
TAccess accessor) {
CUDA_KERNEL_LOOP(idx, N) {
int i = idx / hidden;
int off = idx % hidden;
// filter 0 keys
if (total_keys[i] == 0) {
return;
}
int x = key2slot[i];
int y = i - slot_lens[x];
const float* ptr = src[x] + y * hidden;
float* cur = (float*)((char*)dest + d_restore_idx[i] * grad_value_size);
int mf_dim = slot_dims[x] - 3;
switch (off) {
case 0:
cur[accessor.SlotIndex()] = (float)slot_vector[x];
cur[accessor.MfDimIndex()] = mf_dim;
paddle::platform::CudaAtomicAdd(&cur[accessor.ShowIndex()],
*(ptr + off));
break;
case 1:
paddle::platform::CudaAtomicAdd(&cur[accessor.ClickIndex()],
*(ptr + off));
break;
case 2:
paddle::platform::CudaAtomicAdd(&cur[accessor.EmbedGIndex()],
*(ptr + off) * -1. * bs);
break;
default:
int embedx_idx = off - 3;
if (mf_dim < embedx_idx) {
return;
}
paddle::platform::CudaAtomicAdd(
&cur[accessor.EmbedxGIndex() + embedx_idx],
*(ptr + off) * -1. * bs);
break;
}
}
}
#define SUM_GRAD_VALUE \
for (uint32_t j = 0; j < count; ++j) { \
const uint32_t& pos = d_sort_idx[start + j]; \
const int& x = key2slot[pos]; \
y = pos - slot_lens[x]; \
val += *(reinterpret_cast<float*>(src[x] + y * hidden + off)); \
}
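// SUM_GRAD_VALUE folds the gradient column `off` over all duplicate
// occurrences of the current key: `start` and `count` come from d_sort_offset
// and d_sort_cnt, duplicate positions come from d_sort_idx, and the macro
// accumulates into a local `val` using `src`, `key2slot`, `slot_lens`,
// `hidden`, `off` and `y` from the enclosing scope.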
template <typename TAccess>
__global__ void PushMergeCopy(const size_t N,
const uint64_t* total_keys,
float* dest,
float** src,
const int hidden,
const int bs,
const int* slot_vector,
const int* slot_dims,
const int64_t* slot_lens,
const int* key2slot,
const uint32_t* d_sort_idx,
const uint32_t* d_sort_offset,
const uint32_t* d_sort_cnt,
size_t grad_value_size,
TAccess accessor) {
CUDA_KERNEL_LOOP(idx, N) {
int i = idx / hidden;
int off = idx % hidden;
// filter 0 keys
float* cur = (float*)((char*)dest + i * grad_value_size);
if (total_keys[i] == 0) {
switch (off) {
case 0:
cur[accessor.SlotIndex()] = 0;
cur[accessor.MfDimIndex()] = 0;
cur[accessor.ShowIndex()] = 0.0;
break;
case 1:
cur[accessor.ClickIndex()] = 0.0;
break;
case 2:
cur[accessor.EmbedGIndex()] = 0.0;
break;
default:
cur[accessor.EmbedxGIndex() + off - 3] = 0.0;
break;
}
return;
}
const uint32_t& start = d_sort_offset[i];
const uint32_t& count = d_sort_cnt[i];
const uint32_t& pos = d_sort_idx[start];
const int& x = key2slot[pos];
int y = pos - slot_lens[x];
int mf_dim = slot_dims[x] - 3;
double val = 0.0;
switch (off) {
case 0:
cur[accessor.SlotIndex()] = (float)slot_vector[x];
cur[accessor.MfDimIndex()] = mf_dim;
SUM_GRAD_VALUE
cur[accessor.ShowIndex()] = val;
break;
case 1:
SUM_GRAD_VALUE
cur[accessor.ClickIndex()] = val;
break;
case 2:
SUM_GRAD_VALUE
cur[accessor.EmbedGIndex()] = val * -1. * bs;
break;
default:
int embedx_idx = off - 3;
if (mf_dim < embedx_idx) {
cur[accessor.EmbedxGIndex() + embedx_idx] = 0.0;
return;
}
SUM_GRAD_VALUE
cur[accessor.EmbedxGIndex() + embedx_idx] = val * -1. * bs;
break;
}
}
}
template <typename GPUAccessor>
void AccessorWrapper<GPUAccessor>::CopyForPullImpl(
const paddle::platform::Place& place,
......@@ -183,6 +381,118 @@ void AccessorWrapper<GPUAccessor>::CopyForPushImpl(
cudaStreamSynchronize(stream);
}
template <typename GPUAccessor>
void AccessorWrapper<GPUAccessor>::CopyForPullDedupImpl(
const paddle::platform::Place& place,
const uint64_t* total_keys,
float** gpu_values,
const float* total_values_gpu,
const int64_t* slot_lens,
const int* key2slot,
const int hidden_size,
const int64_t total_length,
const int* slot_dims,
const uint32_t* gpu_restore_idx,
int pull_value_size) {
auto stream = dynamic_cast<paddle::platform::CUDADeviceContext*>(
paddle::platform::DeviceContextPool::Instance().Get(place))
->stream();
size_t N = total_length * hidden_size;
PullDedupCopy<<<CUDA_BLOCK(N), stream>>>(N,
total_keys,
gpu_values,
total_values_gpu,
slot_lens,
pull_value_size,
slot_dims,
hidden_size,
key2slot,
gpu_restore_idx,
gpu_accessor_.common_pull_value);
cudaStreamSynchronize(stream);
}
template <typename GPUAccessor>
void AccessorWrapper<GPUAccessor>::CopyForPushDedupImpl(
const paddle::platform::Place& place,
const uint64_t* total_keys,
float** grad_values,
float* total_grad_values_gpu,
const int* slots,
const int64_t* slot_lens,
const int hidden_size,
const int64_t total_length,
const int64_t dedup_length,
const int batch_size,
const int* slot_dims,
const int* key2slot,
const uint32_t* d_restore_idx,
const size_t grad_value_size) {
auto stream = dynamic_cast<paddle::platform::CUDADeviceContext*>(
paddle::platform::DeviceContextPool::Instance().Get(place))
->stream();
cudaMemsetAsync(
total_grad_values_gpu, 0, dedup_length * grad_value_size, stream);
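  // Clear the deduplicated gradient buffer first; PushMergeCopyAtomic then
  // accumulates every duplicate key's gradient into it with atomic adds.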
size_t N = total_length * hidden_size;
PushMergeCopyAtomic<<<CUDA_BLOCK(N), stream>>>(
N,
total_keys,
total_grad_values_gpu,
grad_values,
hidden_size,
batch_size,
slots,
slot_dims,
slot_lens,
key2slot,
d_restore_idx,
grad_value_size,
gpu_accessor_.common_push_value);
cudaStreamSynchronize(stream);
}
template <typename GPUAccessor>
void AccessorWrapper<GPUAccessor>::CopyForPushDedupImpl(
const paddle::platform::Place& place,
const uint64_t* total_keys,
float** grad_values,
float* total_grad_values_gpu,
const int* slots,
const int64_t* slot_lens,
const int hidden_size,
const int64_t total_length,
const int64_t dedup_length,
const int batch_size,
const int* slot_dims,
const int* key2slot,
const uint32_t* gpu_sort_idx,
const uint32_t* gpu_sort_offset,
const uint32_t* gpu_sort_lens,
const size_t grad_value_size) {
auto stream = dynamic_cast<paddle::platform::CUDADeviceContext*>(
paddle::platform::DeviceContextPool::Instance().Get(place))
->stream();
// merge all grad to one
size_t N = dedup_length * hidden_size;
PushMergeCopy<<<CUDA_BLOCK(N), stream>>>(N,
total_keys,
total_grad_values_gpu,
grad_values,
hidden_size,
batch_size,
slots,
slot_dims,
slot_lens,
key2slot,
gpu_sort_idx,
gpu_sort_offset,
gpu_sort_lens,
grad_value_size,
gpu_accessor_.common_push_value);
cudaStreamSynchronize(stream);
}
#ifdef PADDLE_WITH_PSCORE
template class AccessorWrapper<CommonFeatureValueAccessor>;
#endif
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <cuda.h>
#include <cuda_runtime.h>
#include <device_launch_parameters.h>
#include <stdio.h>
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace framework {
#define CUDA_CHECK(cmd) \
do { \
cudaError_t e = cmd; \
CHECK(e == cudaSuccess) << "Cuda failure " << __FILE__ << ":" << __LINE__ \
<< " " << cudaGetErrorString(e) << std::endl; \
} while (0)
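// RAII guard: remembers the device that was current at construction and
// restores it on destruction, so the helpers below can switch devices freely.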
class CudaDeviceRestorer {
public:
CudaDeviceRestorer() { cudaGetDevice(&dev_); }
~CudaDeviceRestorer() { cudaSetDevice(dev_); }
private:
int dev_;
};
inline void debug_gpu_memory_info(int gpu_id, const char* desc) {
CudaDeviceRestorer r;
size_t avail{0};
size_t total{0};
cudaSetDevice(gpu_id);
auto err = cudaMemGetInfo(&avail, &total);
PADDLE_ENFORCE_EQ(
err,
cudaSuccess,
platform::errors::InvalidArgument("cudaMemGetInfo failed!"));
VLOG(0) << "updatex gpu memory on device " << gpu_id << ", "
<< "avail=" << avail / 1024.0 / 1024.0 / 1024.0 << "g, "
<< "total=" << total / 1024.0 / 1024.0 / 1024.0 << "g, "
<< "use_rate=" << (total - avail) / double(total) << "%, "
<< "desc=" << desc;
}
inline void debug_gpu_memory_info(const char* desc) {
CudaDeviceRestorer r;
int device_num = 0;
auto err = cudaGetDeviceCount(&device_num);
PADDLE_ENFORCE_EQ(
err,
cudaSuccess,
platform::errors::InvalidArgument("cudaGetDeviceCount failed!"));
size_t avail{0};
size_t total{0};
for (int i = 0; i < device_num; ++i) {
cudaSetDevice(i);
auto err = cudaMemGetInfo(&avail, &total);
PADDLE_ENFORCE_EQ(
err,
cudaSuccess,
platform::errors::InvalidArgument("cudaMemGetInfo failed!"));
VLOG(0) << "update gpu memory on device " << i << ", "
<< "avail=" << avail / 1024.0 / 1024.0 / 1024.0 << "g, "
<< "total=" << total / 1024.0 / 1024.0 / 1024.0 << "g, "
<< "use_rate=" << (total - avail) / double(total) << "%, "
<< "desc=" << desc;
}
}
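// Illustrative call sites (the tag strings are hypothetical, not taken from
// this change):
//   debug_gpu_memory_info(gpu_id, "after building ps table");  // one device
//   debug_gpu_memory_info("after pull_sparse");                // all devices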
}; // namespace framework
}; // namespace paddle
......@@ -32,39 +32,76 @@ class GraphGpuWrapper {
}
static std::shared_ptr<GraphGpuWrapper> s_instance_;
void initialize();
void test();
void finalize();
void set_device(std::vector<int> ids);
void init_service();
void set_up_types(std::vector<std::string>& edge_type,
std::vector<std::string>& node_type);
void upload_batch(int idx, std::vector<std::vector<int64_t>>& ids);
void upload_batch(int type,
int idx,
int slice_num,
const std::string& edge_type);
void upload_batch(int type, int slice_num, int slot_num);
void add_table_feat_conf(std::string table_name,
std::string feat_name,
std::string feat_dtype,
int feat_shape);
void load_edge_file(std::string name, std::string filepath, bool reverse);
void load_node_file(std::string name, std::string filepath);
void load_node_and_edge(std::string etype,
std::string ntype,
std::string epath,
std::string npath,
int part_num,
bool reverse);
int32_t load_next_partition(int idx);
int32_t get_partition_num(int idx);
void load_node_weight(int type_id, int idx, std::string path);
void export_partition_files(int idx, std::string file_path);
std::vector<int64_t> get_partition(int idx, int num);
std::vector<uint64_t> get_partition(int idx, int num);
void make_partitions(int idx, int64_t byte_size, int device_len);
void make_complementary_graph(int idx, int64_t byte_size);
void set_search_level(int level);
void init_search_level(int level);
std::vector<std::vector<int64_t>> get_all_id(int type,
int idx,
int slice_num);
NodeQueryResult query_node_list(int gpu_id, int start, int query_size);
int get_all_id(int type,
int slice_num,
std::vector<std::vector<uint64_t>>* output);
int get_all_neighbor_id(int type,
int slice_num,
std::vector<std::vector<uint64_t>>* output);
int get_all_id(int type,
int idx,
int slice_num,
std::vector<std::vector<uint64_t>>* output);
int get_all_neighbor_id(int type,
int idx,
int slice_num,
std::vector<std::vector<uint64_t>>* output);
int get_all_feature_ids(int type,
int idx,
int slice_num,
std::vector<std::vector<uint64_t>>* output);
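  // The get_all_* overloads above appear to shard their results into
  // slice_num buckets so downstream upload/build steps can run in parallel;
  // this reading is based on the signatures only, as the implementations are
  // not part of this diff.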
NodeQueryResult query_node_list(int gpu_id,
int idx,
int start,
int query_size);
NeighborSampleResult graph_neighbor_sample_v3(NeighborSampleQuery q,
bool cpu_switch);
std::vector<int64_t> graph_neighbor_sample(int gpu_id,
std::vector<int64_t>& key,
int sample_size);
NeighborSampleResult graph_neighbor_sample(int gpu_id,
uint64_t* device_keys,
int walk_degree,
int len);
std::vector<uint64_t> graph_neighbor_sample(int gpu_id,
int idx,
std::vector<uint64_t>& key,
int sample_size);
void set_feature_separator(std::string ch);
int get_feature_of_nodes(int gpu_id,
uint64_t* d_walk,
uint64_t* d_offset,
uint32_t size,
int slot_num);
void init_sample_status();
void free_sample_status();
std::unordered_map<std::string, int> edge_to_id, feature_to_id;
std::vector<std::string> id_to_feature, id_to_edge;
std::vector<std::unordered_map<std::string, int>> table_feat_mapping;
......@@ -75,6 +112,9 @@ class GraphGpuWrapper {
std::vector<int> device_id_mapping;
int search_level = 1;
void* graph_table;
int upload_num = 8;
std::shared_ptr<::ThreadPool> upload_task_pool;
std::string feature_separator_ = std::string(" ");
};
#endif
} // namespace framework
......
......@@ -50,15 +50,16 @@ TEST(TEST_FLEET, graph_comm) {
}
std::vector<int> neighbor_offset(gpu_count, 0), node_index(gpu_count, 0);
for (int i = 0; i < graph_list.size(); i++) {
graph_list[i].node_list = new GpuPsGraphNode[graph_list[i].node_size];
graph_list[i].node_list = new uint64_t[graph_list[i].node_size];
graph_list[i].node_info_list = new GpuPsNodeInfo[graph_list[i].node_size];
graph_list[i].neighbor_list = new int64_t[graph_list[i].neighbor_size];
}
for (int i = 0; i < node_count; i++) {
ind = i % gpu_count;
graph_list[ind].node_list[node_index[ind]].node_id = i;
graph_list[ind].node_list[node_index[ind]].neighbor_offset =
graph_list[ind].node_list[node_index[ind]] = i;
graph_list[ind].node_info_list[node_index[ind]].neighbor_offset =
neighbor_offset[ind];
graph_list[ind].node_list[node_index[ind]].neighbor_size =
graph_list[ind].node_info_list[node_index[ind]].neighbor_size =
neighbors[i].size();
for (auto x : neighbors[i]) {
graph_list[ind].neighbor_list[neighbor_offset[ind]++] = x;
......
This diff is collapsed. (Nine additional changed files are not shown.)