// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include // NOLINT #include #include #include #include #include #include #include #include #include #include "paddle/fluid/distributed/ps/table/accessor.h" #include "paddle/fluid/distributed/ps/table/common_table.h" #include "paddle/fluid/distributed/ps/table/graph/class_macro.h" #include "paddle/fluid/distributed/ps/table/graph/graph_node.h" #include "paddle/fluid/string/string_helper.h" #include "paddle/phi/core/utils/rw_lock.h" #ifdef PADDLE_WITH_HETERPS #include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_node.h" #endif namespace paddle { namespace distributed { class GraphShard { public: size_t get_size(); GraphShard() {} ~GraphShard(); std::vector &get_bucket() { return bucket; } std::vector get_batch(int start, int end, int step); std::vector get_ids_by_range(int start, int end) { std::vector res; for (int i = start; i < end && i < (int)bucket.size(); i++) { res.push_back(bucket[i]->get_id()); } return res; } GraphNode *add_graph_node(int64_t id); GraphNode *add_graph_node(Node *node); FeatureNode *add_feature_node(int64_t id); Node *find_node(int64_t id); void delete_node(int64_t id); void clear(); void add_neighbor(int64_t id, int64_t dst_id, float weight); std::unordered_map &get_node_location() { return node_location; } private: std::unordered_map node_location; std::vector bucket; }; enum LRUResponse { ok = 0, blocked = 1, err = 2 }; struct SampleKey { int64_t node_key; size_t sample_size; bool is_weighted; SampleKey(int64_t _node_key, size_t _sample_size, bool _is_weighted) : node_key(_node_key), sample_size(_sample_size), is_weighted(_is_weighted) {} bool operator==(const SampleKey &s) const { return node_key == s.node_key && sample_size == s.sample_size && is_weighted == s.is_weighted; } }; class SampleResult { public: size_t actual_size; std::shared_ptr buffer; SampleResult(size_t _actual_size, std::shared_ptr &_buffer) : actual_size(_actual_size), buffer(_buffer) {} SampleResult(size_t _actual_size, char *_buffer) : actual_size(_actual_size), buffer(_buffer, [](char *p) { delete[] p; }) {} ~SampleResult() {} }; template class LRUNode { public: LRUNode(K _key, V _data, size_t _ttl) : key(_key), data(_data), ttl(_ttl) { next = pre = NULL; } K key; V data; size_t ttl; // time to live LRUNode *pre, *next; }; template class ScaledLRU; template class RandomSampleLRU { public: RandomSampleLRU(ScaledLRU *_father) { father = _father; remove_count = 0; node_size = 0; node_head = node_end = NULL; global_ttl = father->ttl; total_diff = 0; } ~RandomSampleLRU() { LRUNode *p; while (node_head != NULL) { p = node_head->next; delete node_head; node_head = p; } } LRUResponse query(K *keys, size_t length, std::vector> &res) { if (pthread_rwlock_tryrdlock(&father->rwlock) != 0) return LRUResponse::blocked; // pthread_rwlock_rdlock(&father->rwlock); int init_size = node_size - remove_count; process_redundant(length * 3); for (size_t i = 0; i < length; i++) { auto iter = key_map.find(keys[i]); if (iter != key_map.end()) { res.emplace_back(keys[i], iter->second->data); iter->second->ttl--; if (iter->second->ttl == 0) { remove(iter->second); if (remove_count != 0) remove_count--; } else { move_to_tail(iter->second); } } } total_diff += node_size - remove_count - init_size; if (total_diff >= 500 || total_diff < -500) { father->handle_size_diff(total_diff); total_diff = 0; } pthread_rwlock_unlock(&father->rwlock); return LRUResponse::ok; } LRUResponse insert(K *keys, V *data, size_t length) { if (pthread_rwlock_tryrdlock(&father->rwlock) != 0) return LRUResponse::blocked; // pthread_rwlock_rdlock(&father->rwlock); int init_size = node_size - remove_count; process_redundant(length * 3); for (size_t i = 0; i < length; i++) { auto iter = key_map.find(keys[i]); if (iter != key_map.end()) { move_to_tail(iter->second); iter->second->ttl = global_ttl; iter->second->data = data[i]; } else { LRUNode *temp = new LRUNode(keys[i], data[i], global_ttl); add_new(temp); } } total_diff += node_size - remove_count - init_size; if (total_diff >= 500 || total_diff < -500) { father->handle_size_diff(total_diff); total_diff = 0; } pthread_rwlock_unlock(&father->rwlock); return LRUResponse::ok; } void remove(LRUNode *node) { fetch(node); node_size--; key_map.erase(node->key); delete node; } void process_redundant(int process_size) { size_t length = std::min(remove_count, process_size); while (length--) { remove(node_head); remove_count--; } // std::cerr<<"after remove_count = "< *node) { fetch(node); place_at_tail(node); } void add_new(LRUNode *node) { node->ttl = global_ttl; place_at_tail(node); node_size++; key_map[node->key] = node; } void place_at_tail(LRUNode *node) { if (node_end == NULL) { node_head = node_end = node; node->next = node->pre = NULL; } else { node_end->next = node; node->pre = node_end; node->next = NULL; node_end = node; } } void fetch(LRUNode *node) { if (node->pre) { node->pre->next = node->next; } else { node_head = node->next; } if (node->next) { node->next->pre = node->pre; } else { node_end = node->pre; } } private: std::unordered_map *> key_map; ScaledLRU *father; size_t global_ttl, size_limit; int node_size, total_diff; LRUNode *node_head, *node_end; friend class ScaledLRU; int remove_count; }; template class ScaledLRU { public: ScaledLRU(size_t _shard_num, size_t size_limit, size_t _ttl) : size_limit(size_limit), ttl(_ttl) { shard_num = _shard_num; pthread_rwlock_init(&rwlock, NULL); stop = false; thread_pool.reset(new ::ThreadPool(1)); global_count = 0; lru_pool = std::vector>(shard_num, RandomSampleLRU(this)); shrink_job = std::thread([this]() -> void { while (true) { { std::unique_lock lock(mutex_); cv_.wait_for(lock, std::chrono::milliseconds(20000)); if (stop) { return; } } auto status = thread_pool->enqueue([this]() -> int { return shrink(); }); status.wait(); } }); shrink_job.detach(); } ~ScaledLRU() { std::unique_lock lock(mutex_); stop = true; cv_.notify_one(); } LRUResponse query(size_t index, K *keys, size_t length, std::vector> &res) { return lru_pool[index].query(keys, length, res); } LRUResponse insert(size_t index, K *keys, V *data, size_t length) { return lru_pool[index].insert(keys, data, length); } int shrink() { int node_size = 0; for (size_t i = 0; i < lru_pool.size(); i++) { node_size += lru_pool[i].node_size - lru_pool[i].remove_count; } if ((size_t)node_size <= size_t(1.1 * size_limit) + 1) return 0; if (pthread_rwlock_wrlock(&rwlock) == 0) { // VLOG(0)<"in shrink\n"; global_count = 0; for (size_t i = 0; i < lru_pool.size(); i++) { global_count += lru_pool[i].node_size - lru_pool[i].remove_count; } // VLOG(0)<<"global_count "< size_limit) { size_t remove = global_count - size_limit; for (size_t i = 0; i < lru_pool.size(); i++) { lru_pool[i].total_diff = 0; lru_pool[i].remove_count += 1.0 * (lru_pool[i].node_size - lru_pool[i].remove_count) / global_count * remove; // VLOG(0)< int(1.25 * size_limit)) { // VLOG(0)<<"global_count too large "<enqueue([this]() -> int { return shrink(); }); } } } size_t get_ttl() { return ttl; } private: pthread_rwlock_t rwlock; size_t shard_num; int global_count; size_t size_limit, total, hit; size_t ttl; bool stop; std::thread shrink_job; std::vector> lru_pool; mutable std::mutex mutex_; std::condition_variable cv_; std::shared_ptr<::ThreadPool> thread_pool; friend class RandomSampleLRU; }; #ifdef PADDLE_WITH_HETERPS enum GraphSamplerStatus { waiting = 0, running = 1, terminating = 2 }; class GraphTable; class GraphSampler { public: GraphSampler() { status = GraphSamplerStatus::waiting; thread_pool.reset(new ::ThreadPool(1)); callback = [](std::vector &res) { return; }; } virtual int run_graph_sampling() = 0; virtual int start_graph_sampling() { if (status != GraphSamplerStatus::waiting) { return -1; } std::promise prom; std::future fut = prom.get_future(); graph_sample_task_over = thread_pool->enqueue([&prom, this]() { prom.set_value(0); status = GraphSamplerStatus::running; return run_graph_sampling(); }); return fut.get(); } virtual void init(size_t gpu_num, GraphTable *graph_table, std::vector args) = 0; virtual void set_graph_sample_callback( std::function &)> callback) { this->callback = callback; } virtual int end_graph_sampling() { if (status == GraphSamplerStatus::running) { status = GraphSamplerStatus::terminating; return graph_sample_task_over.get(); } return -1; } virtual GraphSamplerStatus get_graph_sampler_status() { return status; } protected: std::function &)> callback; std::shared_ptr<::ThreadPool> thread_pool; GraphSamplerStatus status; std::future graph_sample_task_over; std::vector sample_res; }; #endif class GraphTable : public SparseTable { public: GraphTable() { use_cache = false; shard_num = 0; #ifdef PADDLE_WITH_HETERPS gpups_mode = false; #endif rw_lock.reset(new pthread_rwlock_t()); } virtual ~GraphTable(); virtual int32_t pull_graph_list(int start, int size, std::unique_ptr &buffer, int &actual_size, bool need_feature, int step); virtual int32_t random_sample_neighbors( int64_t *node_ids, int sample_size, std::vector> &buffers, std::vector &actual_sizes, bool need_weight); int32_t random_sample_nodes(int sample_size, std::unique_ptr &buffers, int &actual_sizes); virtual int32_t get_nodes_ids_by_ranges( std::vector> ranges, std::vector &res); virtual int32_t initialize() { return 0; } virtual int32_t initialize(const TableParameter &config, const FsClientParameter &fs_config); virtual int32_t initialize(const GraphParameter &config); int32_t load(const std::string &path, const std::string ¶m); int32_t load_graph_split_config(const std::string &path); int32_t load_edges(const std::string &path, bool reverse); int32_t load_nodes(const std::string &path, std::string node_type); int32_t add_graph_node(std::vector &id_list, std::vector &is_weight_list); int32_t remove_graph_node(std::vector &id_list); int32_t get_server_index_by_id(int64_t id); Node *find_node(int64_t id); virtual int32_t Pull(TableContext &context) { return 0; } virtual int32_t Push(TableContext &context) { return 0; } virtual int32_t pull_sparse(float *values, const PullSparseValue &pull_value) { return 0; } virtual int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) { return 0; } virtual int32_t clear_nodes(); virtual void clear() {} virtual int32_t flush() { return 0; } virtual int32_t shrink(const std::string ¶m) { return 0; } //指定保存路径 virtual int32_t save(const std::string &path, const std::string &converter) { return 0; } virtual int32_t initialize_shard() { return 0; } virtual int32_t set_shard(size_t shard_idx, size_t server_num) { _shard_idx = shard_idx; /* _shard_num is not used in graph_table, this following operation is for the purpose of being compatible with base class table. */ _shard_num = server_num; this->server_num = server_num; return 0; } virtual uint32_t get_thread_pool_index_by_shard_index(int64_t shard_index); virtual uint32_t get_thread_pool_index(int64_t node_id); virtual std::pair parse_feature(std::string feat_str); virtual int32_t get_node_feat(const std::vector &node_ids, const std::vector &feature_names, std::vector> &res); virtual int32_t set_node_feat( const std::vector &node_ids, const std::vector &feature_names, const std::vector> &res); size_t get_server_num() { return server_num; } virtual int32_t make_neighbor_sample_cache(size_t size_limit, size_t ttl) { { std::unique_lock lock(mutex_); if (use_cache == false) { scaled_lru.reset(new ScaledLRU( task_pool_size_, size_limit, ttl)); use_cache = true; } } return 0; } #ifdef PADDLE_WITH_HETERPS virtual int32_t start_graph_sampling() { return this->graph_sampler->start_graph_sampling(); } virtual int32_t end_graph_sampling() { return this->graph_sampler->end_graph_sampling(); } virtual int32_t set_graph_sample_callback( std::function &)> callback) { graph_sampler->set_graph_sample_callback(callback); return 0; } // virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); } #endif protected: std::vector shards, extra_shards; size_t shard_start, shard_end, server_num, shard_num_per_server, shard_num; int task_pool_size_ = 24; const int random_sample_nodes_ranges = 3; std::vector feat_name; std::vector feat_dtype; std::vector feat_shape; std::unordered_map feat_id_map; std::string table_name; std::string table_type; std::vector> _shards_task_pool; std::vector> _shards_task_rng_pool; std::shared_ptr> scaled_lru; std::unordered_set extra_nodes; std::unordered_map extra_nodes_to_thread_index; bool use_cache, use_duplicate_nodes; mutable std::mutex mutex_; std::shared_ptr rw_lock; #ifdef PADDLE_WITH_HETERPS // paddle::framework::GpuPsGraphTable gpu_graph_table; bool gpups_mode; // std::shared_ptr<::ThreadPool> graph_sample_pool; std::shared_ptr graph_sampler; REGISTER_GRAPH_FRIEND_CLASS(2, CompleteGraphSampler, BasicBfsGraphSampler) #endif }; #ifdef PADDLE_WITH_HETERPS REGISTER_PSCORE_REGISTERER(GraphSampler); class CompleteGraphSampler : public GraphSampler { public: CompleteGraphSampler() {} ~CompleteGraphSampler() {} // virtual pthread_rwlock_t *export_rw_lock(); virtual int run_graph_sampling(); virtual void init(size_t gpu_num, GraphTable *graph_table, std::vector args_); protected: GraphTable *graph_table; std::vector> sample_nodes; std::vector> sample_neighbors; // std::vector sample_res; // std::shared_ptr random; int gpu_num; }; class BasicBfsGraphSampler : public GraphSampler { public: BasicBfsGraphSampler() {} ~BasicBfsGraphSampler() {} // virtual pthread_rwlock_t *export_rw_lock(); virtual int run_graph_sampling(); virtual void init(size_t gpu_num, GraphTable *graph_table, std::vector args_); protected: GraphTable *graph_table; // std::vector> sample_nodes; std::vector> sample_nodes; std::vector> sample_neighbors; size_t gpu_num; int node_num_for_each_shard, edge_num_for_each_node; int rounds, interval; std::vector>> sample_neighbors_map; }; #endif } // namespace distributed }; // namespace paddle namespace std { template <> struct hash { size_t operator()(const paddle::distributed::SampleKey &s) const { return s.node_key ^ s.sample_size; } }; }