未验证 提交 f584d378 编写于 作者: S seemingwang 提交者: GitHub

reduce graph-engine cache memory (#37155)

* graph engine demo

* upload unsaved changes

* fix dependency error

* fix shard_num problem

* py client

* remove lock and graph-type

* add load direct graph

* add load direct graph

* add load direct graph

* batch random_sample

* batch_sample_k

* fix num_nodes size

* batch brpc

* batch brpc

* add test

* add test

* add load_nodes; change add_node function

* change sample return type to pair

* resolve conflict

* resolved conflict

* resolved conflict

* separate server and client

* merge pair type

* fix

* resolved conflict

* fixed segment fault; high-level VLOG for load edges and load nodes

* random_sample return 0

* rm useless loop

* test:load edge

* fix ret -1

* test: rm sample

* rm sample

* random_sample return future

* random_sample return int

* test fake node

* fixed here

* memory leak

* remove test code

* fix return problem

* add common_graph_table

* random sample node & test & change data structure from linked list to vector

* add common_graph_table

* sample with srand

* add node_types

* optimize nodes sample

* recover test

* random sample

* destruct weighted sampler

* GraphEdgeBlob

* WeightedGraphEdgeBlob to GraphEdgeBlob

* WeightedGraphEdgeBlob to GraphEdgeBlob

* pybind sample nodes api

* pull nodes with step

* fixed pull_graph_list bug; add test for pull_graph_list by step

* add graph table;name

* add graph table;name

* add pybind

* add pybind

* add FeatureNode

* add FeatureNode

* add FeatureNode Serialize

* add FeatureNode Serialize

* get_feat_node

* avoid local rpc

* fix get_node_feat

* fix get_node_feat

* remove log

* get_node_feat return  py:bytes

* merge develop with graph_engine

* fix threadpool.h head

* fix

* fix typo

* resolve conflict

* fix conflict

* recover lost content

* fix pybind of FeatureNode

* recover cmake

* recover tools

* resolve conflict

* resolve linking problem

* code style

* change test_server port

* fix code problems

* remove shard_num config

* remove redundant threads

* optimize start server

* remove logs

* fix code problems by reviewers' suggestions

* move graph files into a folder

* code style change

* remove graph operations from base table

* optimize get_feat function of graph engine

* fix long long count problem

* remove redundant graph files

* remove unused shell

* recover dropout_op_pass.h

* fix potential stack overflow when request number is too large & node add & node clear & node remove

* when sample k is larger than neighbor num, return directly

* using random seed generator of paddle to speed up

* fix bug of random sample k

* fix code style

* fix code style

* add remove graph to fleet_py.cc

* fix blocking_queue problem

* fix style

* fix

* recover capacity check

* add remove graph node; add set_feature

* add remove graph node; add set_feature

* add remove graph node; add set_feature

* add remove graph node; add set_feature

* fix distributed op combining problems

* optimize

* remove logs

* fix MultiSlotDataGenerator error

* cache for graph engine

* fix type compare error

* more test & fix thread terminating problem

* remove header

* change time interval of shrink

* use cache when sample nodes

* remove unused function

* change unique_ptr to shared_ptr

* simplify cache template

* cache api on client

* fix

* reduce sample threads when cache is not used

* reduce cache memory
Co-authored-by: Huang Zhengjie <270018958@qq.com>
Co-authored-by: Weiyue Su <weiyue.su@gmail.com>
Co-authored-by: suweiyue <suweiyue@baidu.com>
Co-authored-by: luobin06 <luobin06@baidu.com>
Co-authored-by: liweibin02 <liweibin02@baidu.com>
Co-authored-by: tangwei12 <tangwei12@baidu.com>
上级 0fc9919b
...@@ -123,6 +123,8 @@ class RandomSampleLRU { ...@@ -123,6 +123,8 @@ class RandomSampleLRU {
node_size = 0; node_size = 0;
node_head = node_end = NULL; node_head = node_end = NULL;
global_ttl = father->ttl; global_ttl = father->ttl;
extra_penalty = 0;
size_limit = (father->size_limit / father->shard_num + 1);
} }
~RandomSampleLRU() { ~RandomSampleLRU() {
...@@ -138,16 +140,16 @@ class RandomSampleLRU { ...@@ -138,16 +140,16 @@ class RandomSampleLRU {
return LRUResponse::blocked; return LRUResponse::blocked;
int init_node_size = node_size; int init_node_size = node_size;
try { try {
// pthread_rwlock_rdlock(&father->rwlock);
for (size_t i = 0; i < length; i++) { for (size_t i = 0; i < length; i++) {
auto iter = key_map.find(keys[i]); auto iter = key_map.find(keys[i]);
if (iter != key_map.end()) { if (iter != key_map.end()) {
res.emplace_back(keys[i], iter->second->data); res.emplace_back(keys[i], iter->second->data);
iter->second->ttl--; iter->second->ttl--;
if (iter->second->ttl == 0) { if (iter->second->ttl == 0) {
remove(iter->second, true);
} else {
remove(iter->second); remove(iter->second);
add_to_tail(iter->second); } else {
move_to_tail(iter->second);
} }
} }
} }
...@@ -168,14 +170,12 @@ class RandomSampleLRU { ...@@ -168,14 +170,12 @@ class RandomSampleLRU {
for (size_t i = 0; i < length; i++) { for (size_t i = 0; i < length; i++) {
auto iter = key_map.find(keys[i]); auto iter = key_map.find(keys[i]);
if (iter != key_map.end()) { if (iter != key_map.end()) {
move_to_tail(iter->second);
iter->second->ttl = global_ttl; iter->second->ttl = global_ttl;
remove(iter->second);
add_to_tail(iter->second);
iter->second->data = data[i]; iter->second->data = data[i];
} else { } else {
LRUNode<K, V> *temp = new LRUNode<K, V>(keys[i], data[i], global_ttl); LRUNode<K, V> *temp = new LRUNode<K, V>(keys[i], data[i], global_ttl);
add_to_tail(temp); add_new(temp);
key_map[keys[i]] = temp;
} }
} }
} catch (...) { } catch (...) {
...@@ -187,25 +187,34 @@ class RandomSampleLRU { ...@@ -187,25 +187,34 @@ class RandomSampleLRU {
father->handle_size_diff(node_size - init_node_size); father->handle_size_diff(node_size - init_node_size);
return LRUResponse::ok; return LRUResponse::ok;
} }
void remove(LRUNode<K, V> *node, bool del = false) { void remove(LRUNode<K, V> *node) {
if (node->pre) { fetch(node);
node->pre->next = node->next;
} else {
node_head = node->next;
}
if (node->next) {
node->next->pre = node->pre;
} else {
node_end = node->pre;
}
node_size--; node_size--;
if (del) {
delete node;
key_map.erase(node->key); key_map.erase(node->key);
delete node;
if (node_size >= size_limit) {
extra_penalty -= 1.0;
} }
} }
void add_to_tail(LRUNode<K, V> *node) { void move_to_tail(LRUNode<K, V> *node) {
fetch(node);
place_at_tail(node);
}
void add_new(LRUNode<K, V> *node) {
node->ttl = global_ttl;
place_at_tail(node);
node_size++;
key_map[node->key] = node;
if (node_size > size_limit) {
extra_penalty += penalty_inc;
if (extra_penalty >= 1.0) {
remove(node_head);
}
}
}
void place_at_tail(LRUNode<K, V> *node) {
if (node_end == NULL) { if (node_end == NULL) {
node_head = node_end = node; node_head = node_end = node;
node->next = node->pre = NULL; node->next = node->pre = NULL;
...@@ -215,25 +224,40 @@ class RandomSampleLRU { ...@@ -215,25 +224,40 @@ class RandomSampleLRU {
node->next = NULL; node->next = NULL;
node_end = node; node_end = node;
} }
node_size++;
node->ms = std::chrono::duration_cast<std::chrono::milliseconds>( node->ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()); std::chrono::system_clock::now().time_since_epoch());
} }
void fetch(LRUNode<K, V> *node) {
if (node->pre) {
node->pre->next = node->next;
} else {
node_head = node->next;
}
if (node->next) {
node->next->pre = node->pre;
} else {
node_end = node->pre;
}
}
private: private:
std::unordered_map<K, LRUNode<K, V> *> key_map; std::unordered_map<K, LRUNode<K, V> *> key_map;
ScaledLRU<K, V> *father; ScaledLRU<K, V> *father;
size_t global_ttl; size_t global_ttl, size_limit;
int node_size; int node_size;
LRUNode<K, V> *node_head, *node_end; LRUNode<K, V> *node_head, *node_end;
friend class ScaledLRU<K, V>; friend class ScaledLRU<K, V>;
float extra_penalty;
const float penalty_inc = 0.75;
}; };
template <typename K, typename V> template <typename K, typename V>
class ScaledLRU { class ScaledLRU {
public: public:
ScaledLRU(size_t shard_num, size_t size_limit, size_t _ttl) ScaledLRU(size_t _shard_num, size_t size_limit, size_t _ttl)
: size_limit(size_limit), ttl(_ttl) { : size_limit(size_limit), ttl(_ttl) {
shard_num = _shard_num;
pthread_rwlock_init(&rwlock, NULL); pthread_rwlock_init(&rwlock, NULL);
stop = false; stop = false;
thread_pool.reset(new ::ThreadPool(1)); thread_pool.reset(new ::ThreadPool(1));
...@@ -244,12 +268,11 @@ class ScaledLRU { ...@@ -244,12 +268,11 @@ class ScaledLRU {
while (true) { while (true) {
{ {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
cv_.wait_for(lock, std::chrono::milliseconds(3000)); cv_.wait_for(lock, std::chrono::milliseconds(20000));
if (stop) { if (stop) {
return; return;
} }
} }
auto status = auto status =
thread_pool->enqueue([this]() -> int { return shrink(); }); thread_pool->enqueue([this]() -> int { return shrink(); });
status.wait(); status.wait();
...@@ -271,12 +294,11 @@ class ScaledLRU { ...@@ -271,12 +294,11 @@ class ScaledLRU {
} }
int shrink() { int shrink() {
int node_size = 0; int node_size = 0;
std::string t = "";
for (size_t i = 0; i < lru_pool.size(); i++) { for (size_t i = 0; i < lru_pool.size(); i++) {
node_size += lru_pool[i].node_size; node_size += lru_pool[i].node_size;
} }
if (node_size <= size_limit) return 0; if (node_size <= 1.2 * size_limit) return 0;
if (pthread_rwlock_wrlock(&rwlock) == 0) { if (pthread_rwlock_wrlock(&rwlock) == 0) {
try { try {
global_count = 0; global_count = 0;
...@@ -301,14 +323,16 @@ class ScaledLRU { ...@@ -301,14 +323,16 @@ class ScaledLRU {
q.push({next, remove_node.lru_pointer}); q.push({next, remove_node.lru_pointer});
} }
global_count--; global_count--;
remove_node.lru_pointer->key_map.erase(remove_node.node->key); remove_node.lru_pointer->remove(remove_node.node);
remove_node.lru_pointer->remove(remove_node.node, true); }
for (size_t i = 0; i < lru_pool.size(); i++) {
lru_pool[i].size_limit = lru_pool[i].node_size;
lru_pool[i].extra_penalty = 0;
} }
// VLOG(0)<<"after shrinking cache, cached nodes count = // VLOG(0)<<"after shrinking cache, cached nodes count =
// "<<global_count<<std::endl; // // "<<global_count<<std::endl;
} }
} catch (...) { } catch (...) {
// VLOG(0) << "shrink cache failed"<<std::endl;
pthread_rwlock_unlock(&rwlock); pthread_rwlock_unlock(&rwlock);
return -1; return -1;
} }
...@@ -320,7 +344,7 @@ class ScaledLRU { ...@@ -320,7 +344,7 @@ class ScaledLRU {
void handle_size_diff(int diff) { void handle_size_diff(int diff) {
if (diff != 0) { if (diff != 0) {
__sync_fetch_and_add(&global_count, diff); __sync_fetch_and_add(&global_count, diff);
if (global_count > int(1.5 * size_limit)) { if (global_count > int(1.25 * size_limit)) {
// VLOG(0)<<"global_count too large "<<global_count<<" enter start // VLOG(0)<<"global_count too large "<<global_count<<" enter start
// shrink task\n"; // shrink task\n";
thread_pool->enqueue([this]() -> int { return shrink(); }); thread_pool->enqueue([this]() -> int { return shrink(); });
...@@ -332,6 +356,7 @@ class ScaledLRU { ...@@ -332,6 +356,7 @@ class ScaledLRU {
private: private:
pthread_rwlock_t rwlock; pthread_rwlock_t rwlock;
size_t shard_num;
int global_count; int global_count;
size_t size_limit; size_t size_limit;
size_t ttl; size_t ttl;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册