diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.cc b/paddle/fluid/distributed/ps/table/common_graph_table.cc
index b326870a3a7b0e7d28417b8d18d002e195fa9d54..dcce46270d02671d2f26ab20e5ac127a72b1a636 100644
--- a/paddle/fluid/distributed/ps/table/common_graph_table.cc
+++ b/paddle/fluid/distributed/ps/table/common_graph_table.cc
@@ -73,7 +73,7 @@ int CompleteGraphSampler::run_graph_sampling() {
   }
   for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
   tasks.clear();
-  for (size_t i = 0; i < gpu_num; i++) {
+  for (int i = 0; i < gpu_num; i++) {
     tasks.push_back(
         graph_table->_shards_task_pool[i % graph_table->task_pool_size_]
             ->enqueue([&, i, this]() -> int {
@@ -101,7 +101,7 @@
     pthread_rwlock_unlock(rw_lock);
     return 0;
   }
-  for (size_t i = 0; i < gpu_num; i++) {
+  for (int i = 0; i < gpu_num; i++) {
     sample_res[i].node_list = sample_nodes[i].data();
     sample_res[i].neighbor_list = sample_neighbors[i].data();
     sample_res[i].node_size = sample_nodes[i].size();
@@ -136,8 +136,7 @@ int BasicBfsGraphSampler::run_graph_sampling() {
   int task_size = 0;
   std::vector<std::future<int>> tasks;
   int init_size = 0;
-  //__sync_fetch_and_add
-  std::function<int(int, int64_t)> bfs = [&, this](int i, int id) -> int {
+  std::function<int(int, int64_t)> bfs = [&, this](int i, int64_t id) -> int {
     if (this->status == GraphSamplerStatus::terminating) {
       int task_left = __sync_sub_and_fetch(&task_size, 1);
       if (task_left == 0) {
@@ -196,7 +195,7 @@
     pthread_rwlock_unlock(rw_lock);
     return 0;
   }
-  std::cout << "bfs over" << std::endl;
+  VLOG(0) << "BasicBfsGraphSampler finishes the graph searching task";
   sample_nodes.clear();
   sample_neighbors.clear();
   sample_res.clear();
@@ -244,7 +243,7 @@
     pthread_rwlock_unlock(rw_lock);
     return 0;
   }
-  for (size_t i = 0; i < gpu_num; i++) {
+  for (size_t i = 0; i < (size_t)gpu_num; i++) {
     tasks.push_back(
         graph_table->_shards_task_pool[i % graph_table->task_pool_size_]
             ->enqueue([&, i, this]() -> int {
@@ -253,19 +252,15 @@
                 return 0;
              }
              int total_offset = 0;
-              size_t ind = i % graph_table->task_pool_size_;
              for (int j = 0; j < this->graph_table->task_pool_size_; j++) {
-                for (size_t k = 0; k < sample_nodes_ex[j][ind].size(); k++) {
-                  sample_nodes[i].push_back(sample_nodes_ex[j][ind][k]);
+                for (size_t k = 0; k < sample_nodes_ex[j][i].size(); k++) {
+                  sample_nodes[i].push_back(sample_nodes_ex[j][i][k]);
                  sample_nodes[i].back().neighbor_offset += total_offset;
-                  // neighbor_offset[i].push_back(total_offset +
-                  // neighbor_offset_ex[j][i][k]);
                }
-                size_t neighbor_size = sample_neighbors_ex[j][ind].size();
+                size_t neighbor_size = sample_neighbors_ex[j][i].size();
                total_offset += neighbor_size;
                for (size_t k = 0; k < neighbor_size; k++) {
-                  sample_neighbors[ind].push_back(
-                      sample_neighbors_ex[j][ind][k]);
+                  sample_neighbors[i].push_back(sample_neighbors_ex[j][i][k]);
                }
              }
              return 0;
@@ -276,9 +271,7 @@
     pthread_rwlock_unlock(rw_lock);
     return 0;
   }
-  // int64_t total_neighbors =
-  // std::accumulate(shard_neighbor_size.begin(),shard_neighbor_size.end(),0);
-  for (size_t i = 0; i < gpu_num; i++) {
+  for (int i = 0; i < gpu_num; i++) {
     sample_res[i].node_list = sample_nodes[i].data();
     sample_res[i].neighbor_list = sample_neighbors[i].data();
     sample_res[i].node_size = sample_nodes[i].size();
@@ -683,7 +676,7 @@ int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge) {
   sort(index.begin(), index.end(), [&](int &a, int &b) {
     return has_alloc[a] - alloc[a] < has_alloc[b] - alloc[b];
   });
-  int left = 0, right = index.size() - 1;
+  int left = 0, right = (int)index.size() - 1;
   while (left < right) {
     if (has_alloc[index[right]] - alloc[index[right]] == 0) break;
     int x = std::min(alloc[index[left]] - has_alloc[index[left]],
@@ -1152,7 +1145,7 @@ int32_t GraphTable::initialize(const GraphParameter &graph) {
   shard_end = shard_start + shard_num_per_server;
   VLOG(0) << "in init graph table shard idx = " << _shard_idx << " shard_start "
           << shard_start << " shard_end " << shard_end;
-  for (int i = 0; i < shard_num_per_server; i++) {
+  for (size_t i = 0; i < shard_num_per_server; i++) {
     shards.push_back(new GraphShard());
   }
   use_duplicate_nodes = false;
diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.h b/paddle/fluid/distributed/ps/table/common_graph_table.h
index 4c97cea23eaa277a81538e9c8aeacd6478bc9c51..72600b42b828247a84a344a606cbc08fe8e2b3ef 100644
--- a/paddle/fluid/distributed/ps/table/common_graph_table.h
+++ b/paddle/fluid/distributed/ps/table/common_graph_table.h
@@ -204,7 +204,7 @@ class RandomSampleLRU {
   }

   void process_redundant(int process_size) {
-    size_t length = std::min(remove_count, process_size);
+    int length = std::min(remove_count, process_size);
     while (length--) {
       remove(node_head);
       remove_count--;
@@ -306,12 +306,10 @@ class ScaledLRU {

     if ((size_t)node_size <= size_t(1.1 * size_limit) + 1) return 0;
     if (pthread_rwlock_wrlock(&rwlock) == 0) {
-      // VLOG(0)<<"in shrink\n";
       global_count = 0;
       for (size_t i = 0; i < lru_pool.size(); i++) {
         global_count += lru_pool[i].node_size - lru_pool[i].remove_count;
       }
-      // VLOG(0)<<"global_count "<<global_count<<"\n";
       if ((size_t)global_count > size_limit) {
         size_t remove = global_count - size_limit;
         for (size_t i = 0; i < lru_pool.size(); i++) {
@@ -319,7 +317,6 @@
           lru_pool[i].remove_count +=
               1.0 * (lru_pool[i].node_size - lru_pool[i].remove_count) /
               global_count * remove;
-          // VLOG(0)<<"remove_count "<<lru_pool[i].remove_count<<"\n";
         }
       }
       pthread_rwlock_unlock(&rwlock);
@@ -332,7 +329,6 @@
     if (diff != 0) {
       __sync_fetch_and_add(&global_count, diff);
       if (global_count > int(1.25 * size_limit)) {
-        // VLOG(0)<<"global_count too large "<<global_count<<" enter start shrink task\n";
         thread_pool->enqueue([this]() -> int { return shrink(); });
       }
     }
diff --git a/paddle/fluid/distributed/test/graph_node_test.cc b/paddle/fluid/distributed/test/graph_node_test.cc
index a3f3c48581d6195569747cdf03ac389979caf7df..e55d39cd4834d425025a8084eb88982ef543a6f1 100644
--- a/paddle/fluid/distributed/test/graph_node_test.cc
+++ b/paddle/fluid/distributed/test/graph_node_test.cc
@@ -649,11 +649,12 @@ void testCache() {
   ASSERT_EQ((int)r.size(), 0);

   st.insert(0, &skey, result, 1);
-  for (int i = 0; i < st.get_ttl(); i++) {
+  for (size_t i = 0; i < st.get_ttl(); i++) {
     st.query(0, &skey, 1, r);
     ASSERT_EQ((int)r.size(), 1);
     char* p = (char*)r[0].second.buffer.get();
-    for (int j = 0; j < r[0].second.actual_size; j++) ASSERT_EQ(p[j], str[j]);
+    for (size_t j = 0; j < r[0].second.actual_size; j++)
+      ASSERT_EQ(p[j], str[j]);
     r.clear();
   }
   st.query(0, &skey, 1, r);
@@ -662,22 +663,24 @@
   strcpy(str, "54321678");
   result = new ::paddle::distributed::SampleResult(strlen(str), str);
   st.insert(0, &skey, result, 1);
-  for (int i = 0; i < st.get_ttl() / 2; i++) {
+  for (size_t i = 0; i < st.get_ttl() / 2; i++) {
     st.query(0, &skey, 1, r);
     ASSERT_EQ((int)r.size(), 1);
     char* p = (char*)r[0].second.buffer.get();
-    for (int j = 0; j < r[0].second.actual_size; j++) ASSERT_EQ(p[j], str[j]);
+    for (size_t j = 0; j < r[0].second.actual_size; j++)
+      ASSERT_EQ(p[j], str[j]);
     r.clear();
   }
   str = new char[18];
   strcpy(str, "343332d4321");
   result = new ::paddle::distributed::SampleResult(strlen(str), str);
   st.insert(0, &skey, result, 1);
-  for (int i = 0; i < st.get_ttl(); i++) {
+  for (size_t i = 0; i < st.get_ttl(); i++) {
     st.query(0, &skey, 1, r);
     ASSERT_EQ((int)r.size(), 1);
     char* p = (char*)r[0].second.buffer.get();
-    for (int j = 0; j < r[0].second.actual_size; j++) ASSERT_EQ(p[j], str[j]);
+    for (size_t j = 0; j < r[0].second.actual_size; j++)
+      ASSERT_EQ(p[j], str[j]);
     r.clear();
   }
   st.query(0, &skey, 1, r);
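
Reviewer note (not part of the patch): most hunks above silence -Wsign-compare warnings by making each loop index agree in signedness with its bound. A minimal standalone sketch of the warning and of the two fix styles the patch uses; the file name, function names, and driver below are illustrative only:

// sign_compare_sketch.cc -- illustration only, not from the patch.
// Build: g++ -Wall -Wextra -Wsign-compare sign_compare_sketch.cc
#include <vector>

int sum_indices(int gpu_num) {
  int total = 0;
  // Before the patch: `for (size_t i = 0; i < gpu_num; i++)` compares a
  // size_t index against a signed bound; the bound is converted to size_t
  // at every iteration, and a negative gpu_num would wrap to a huge
  // unsigned value, so the compiler warns.

  // Fix style 1 (the gpu_num loops): give the index the bound's type.
  for (int i = 0; i < gpu_num; i++) total += i;

  // Fix style 2 (`right = (int)index.size() - 1`): cast the unsigned
  // bound once, keeping signed arithmetic where the algorithm needs it.
  std::vector<int> index(4, 1);
  for (int i = 0; i < (int)index.size(); i++) total += index[i];
  return total;
}

int main() { return sum_indices(3) == 7 ? 0 : 1; }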
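The ScaledLRU hunks only strip dead debug comments; the surrounding shrink logic splits the eviction quota across the per-thread LRUs in proportion to their live node counts. A simplified sketch of that arithmetic, with invented values and the rwlock/thread-pool machinery omitted:

// Proportional eviction as in ScaledLRU::shrink(); numbers are made up.
#include <cstdio>
#include <vector>

int main() {
  const size_t size_limit = 100;
  // Live nodes per sub-LRU, i.e. node_size - remove_count in the patch.
  std::vector<size_t> live = {80, 30, 40};

  size_t global_count = 0;
  for (size_t i = 0; i < live.size(); i++) global_count += live[i];

  if (global_count > size_limit) {
    size_t remove = global_count - size_limit;  // 50 nodes over budget
    for (size_t i = 0; i < live.size(); i++) {
      // Mirrors: remove_count += 1.0 * live / global_count * remove;
      double share = 1.0 * live[i] / global_count * remove;
      std::printf("lru %zu marks ~%.1f of %zu nodes for removal\n",
                  i, share, live[i]);
    }
  }
  return 0;
}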