// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include // NOLINT #include #include #include #include #include #include #include #include #include #include "paddle/fluid/distributed/table/accessor.h" #include "paddle/fluid/distributed/table/common_table.h" #include "paddle/fluid/distributed/table/graph/graph_node.h" #include "paddle/fluid/framework/rw_lock.h" #include "paddle/fluid/string/string_helper.h" namespace paddle { namespace distributed { class GraphShard { public: size_t get_size(); GraphShard() {} GraphShard(int shard_num) { this->shard_num = shard_num; } ~GraphShard(); std::vector &get_bucket() { return bucket; } std::vector get_batch(int start, int end, int step); std::vector get_ids_by_range(int start, int end) { std::vector res; for (int i = start; i < end && i < (int)bucket.size(); i++) { res.push_back(bucket[i]->get_id()); } return res; } GraphNode *add_graph_node(uint64_t id); FeatureNode *add_feature_node(uint64_t id); Node *find_node(uint64_t id); void delete_node(uint64_t id); void clear(); void add_neighboor(uint64_t id, uint64_t dst_id, float weight); std::unordered_map get_node_location() { return node_location; } private: std::unordered_map node_location; int shard_num; std::vector bucket; }; enum LRUResponse { ok = 0, blocked = 1, err = 2 }; struct SampleKey { uint64_t node_key; size_t sample_size; SampleKey(uint64_t _node_key, size_t _sample_size) : node_key(_node_key), sample_size(_sample_size) { // std::cerr<<"in constructor of samplekey\n"; } bool operator==(const SampleKey &s) const { return node_key == s.node_key && sample_size == s.sample_size; } }; class SampleResult { public: size_t actual_size; std::shared_ptr buffer; SampleResult(size_t _actual_size, std::shared_ptr &_buffer) : actual_size(_actual_size), buffer(_buffer) {} SampleResult(size_t _actual_size, char *_buffer) : actual_size(_actual_size), buffer(_buffer, [](char *p) { delete[] p; }) {} ~SampleResult() {} }; template class LRUNode { public: LRUNode(K _key, V _data, size_t _ttl) : key(_key), data(_data), ttl(_ttl) { next = pre = NULL; } std::chrono::milliseconds ms; // the last hit time K key; V data; size_t ttl; // time to live LRUNode *pre, *next; }; template class ScaledLRU; template class RandomSampleLRU { public: RandomSampleLRU(ScaledLRU *_father) : father(_father) { node_size = 0; node_head = node_end = NULL; global_ttl = father->ttl; } ~RandomSampleLRU() { LRUNode *p; while (node_head != NULL) { p = node_head->next; delete node_head; node_head = p; } } LRUResponse query(K *keys, size_t length, std::vector> &res) { if (pthread_rwlock_tryrdlock(&father->rwlock) != 0) return LRUResponse::blocked; int init_node_size = node_size; try { for (size_t i = 0; i < length; i++) { auto iter = key_map.find(keys[i]); if (iter != key_map.end()) { res.push_back({keys[i], iter->second->data}); iter->second->ttl--; if (iter->second->ttl == 0) { remove(iter->second, true); } else { remove(iter->second); add_to_tail(iter->second); } } } } catch (...) { pthread_rwlock_unlock(&father->rwlock); father->handle_size_diff(node_size - init_node_size); return LRUResponse::err; } pthread_rwlock_unlock(&father->rwlock); father->handle_size_diff(node_size - init_node_size); return LRUResponse::ok; } LRUResponse insert(K *keys, V *data, size_t length) { if (pthread_rwlock_tryrdlock(&father->rwlock) != 0) return LRUResponse::blocked; int init_node_size = node_size; try { for (size_t i = 0; i < length; i++) { auto iter = key_map.find(keys[i]); if (iter != key_map.end()) { iter->second->ttl = global_ttl; remove(iter->second); add_to_tail(iter->second); iter->second->data = data[i]; } else { LRUNode *temp = new LRUNode(keys[i], data[i], global_ttl); add_to_tail(temp); key_map[keys[i]] = temp; } } } catch (...) { pthread_rwlock_unlock(&father->rwlock); father->handle_size_diff(node_size - init_node_size); return LRUResponse::err; } pthread_rwlock_unlock(&father->rwlock); father->handle_size_diff(node_size - init_node_size); return LRUResponse::ok; } void remove(LRUNode *node, bool del = false) { if (node->pre) { node->pre->next = node->next; } else { node_head = node->next; } if (node->next) { node->next->pre = node->pre; } else { node_end = node->pre; } node_size--; if (del) { delete node; key_map.erase(node->key); } } void add_to_tail(LRUNode *node) { if (node_end == NULL) { node_head = node_end = node; node->next = node->pre = NULL; } else { node_end->next = node; node->pre = node_end; node->next = NULL; node_end = node; } node_size++; node->ms = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()); } private: std::unordered_map *> key_map; ScaledLRU *father; size_t global_ttl; int node_size; LRUNode *node_head, *node_end; friend class ScaledLRU; }; template class ScaledLRU { public: ScaledLRU(size_t shard_num, size_t size_limit, size_t _ttl) : size_limit(size_limit), ttl(_ttl) { pthread_rwlock_init(&rwlock, NULL); stop = false; thread_pool.reset(new ::ThreadPool(1)); global_count = 0; lru_pool = std::vector>(shard_num, RandomSampleLRU(this)); shrink_job = std::thread([this]() -> void { while (true) { { std::unique_lock lock(mutex_); cv_.wait_for(lock, std::chrono::milliseconds(3000)); if (stop) { return; } } // shrink(); // std::cerr<<"shrink job in queue\n"; auto status = thread_pool->enqueue([this]() -> int { return shrink(); }); status.wait(); } }); shrink_job.detach(); } ~ScaledLRU() { std::unique_lock lock(mutex_); // std::cerr<<"cancel shrink job\n"; stop = true; cv_.notify_one(); // pthread_cancel(shrink_job.native_handle()); } LRUResponse query(size_t index, K *keys, size_t length, std::vector> &res) { return lru_pool[index].query(keys, length, res); } LRUResponse insert(size_t index, K *keys, V *data, size_t length) { return lru_pool[index].insert(keys, data, length); } int shrink() { int node_size = 0; std::string t = ""; for (size_t i = 0; i < lru_pool.size(); i++) { node_size += lru_pool[i].node_size; // t += std::to_string(i) + "->" + std::to_string(lru_pool[i].node_size) + // " "; } // std::cout<, std::greater> q; for (size_t i = 0; i < lru_pool.size(); i++) { if (lru_pool[i].node_size > 0) { global_count += lru_pool[i].node_size; q.push({lru_pool[i].node_head, &lru_pool[i]}); } } if (global_count > size_limit) { // std::cout<<"before shrinking cache, cached nodes count = // "<next; if (next) { q.push({next, remove_node.lru_pointer}); } global_count--; remove_node.lru_pointer->key_map.erase(remove_node.node->key); remove_node.lru_pointer->remove(remove_node.node, true); } // std::cout<<"after shrinking cache, cached nodes count = // "< int(1.5 * size_limit)) { // std::cout<<"global_count too large "<enqueue([this]() -> int { return shrink(); }); } } } size_t get_ttl() { return ttl; } private: pthread_rwlock_t rwlock; int global_count; size_t size_limit; size_t ttl; bool stop; std::thread shrink_job; std::vector> lru_pool; mutable std::mutex mutex_; std::condition_variable cv_; struct RemovedNode { LRUNode *node; RandomSampleLRU *lru_pointer; bool operator>(const RemovedNode &a) const { return node->ms > a.node->ms; } }; std::shared_ptr<::ThreadPool> thread_pool; friend class RandomSampleLRU; }; class GraphTable : public SparseTable { public: GraphTable() { use_cache = false; } virtual ~GraphTable() {} virtual int32_t pull_graph_list(int start, int size, std::unique_ptr &buffer, int &actual_size, bool need_feature, int step); virtual int32_t random_sample_neighbors( uint64_t *node_ids, int sample_size, std::vector> &buffers, std::vector &actual_sizes); int32_t random_sample_nodes(int sample_size, std::unique_ptr &buffers, int &actual_sizes); virtual int32_t get_nodes_ids_by_ranges( std::vector> ranges, std::vector &res); virtual int32_t initialize(); int32_t load(const std::string &path, const std::string ¶m); int32_t load_edges(const std::string &path, bool reverse); int32_t load_nodes(const std::string &path, std::string node_type); int32_t add_graph_node(std::vector &id_list, std::vector &is_weight_list); int32_t remove_graph_node(std::vector &id_list); int32_t get_server_index_by_id(uint64_t id); Node *find_node(uint64_t id); virtual int32_t pull_sparse(float *values, const PullSparseValue &pull_value) { return 0; } virtual int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) { return 0; } virtual int32_t clear_nodes(); virtual void clear() {} virtual int32_t flush() { return 0; } virtual int32_t shrink(const std::string ¶m) { return 0; } //指定保存路径 virtual int32_t save(const std::string &path, const std::string &converter) { return 0; } virtual int32_t initialize_shard() { return 0; } virtual uint32_t get_thread_pool_index_by_shard_index(uint64_t shard_index); virtual uint32_t get_thread_pool_index(uint64_t node_id); virtual std::pair parse_feature(std::string feat_str); virtual int32_t get_node_feat(const std::vector &node_ids, const std::vector &feature_names, std::vector> &res); virtual int32_t set_node_feat( const std::vector &node_ids, const std::vector &feature_names, const std::vector> &res); size_t get_server_num() { return server_num; } virtual int32_t make_neighbor_sample_cache(size_t size_limit, size_t ttl) { { std::unique_lock lock(mutex_); if (use_cache == false) { scaled_lru.reset(new ScaledLRU( shard_end - shard_start, size_limit, ttl)); use_cache = true; } } return 0; } protected: std::vector shards; size_t shard_start, shard_end, server_num, shard_num_per_server, shard_num; const int task_pool_size_ = 24; const int random_sample_nodes_ranges = 3; std::vector feat_name; std::vector feat_dtype; std::vector feat_shape; std::unordered_map feat_id_map; std::string table_name; std::string table_type; std::vector> _shards_task_pool; std::vector> _shards_task_rng_pool; std::shared_ptr> scaled_lru; bool use_cache; mutable std::mutex mutex_; }; } // namespace distributed }; // namespace paddle namespace std { template <> struct hash { size_t operator()(const paddle::distributed::SampleKey &s) const { return s.node_key ^ s.sample_size; } }; }