Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
e91292c6
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
e91292c6
编写于
3月 28, 2022
作者:
S
seemingwang
提交者:
GitHub
3月 28, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
reduce graph-engine warnings (#41015)
* fix dependency * reduce warnings
上级
630f5b89
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
22 addition
and
31 deletion
+22
-31
paddle/fluid/distributed/ps/table/common_graph_table.cc
paddle/fluid/distributed/ps/table/common_graph_table.cc
+12
-19
paddle/fluid/distributed/ps/table/common_graph_table.h
paddle/fluid/distributed/ps/table/common_graph_table.h
+1
-6
paddle/fluid/distributed/test/graph_node_test.cc
paddle/fluid/distributed/test/graph_node_test.cc
+9
-6
未找到文件。
paddle/fluid/distributed/ps/table/common_graph_table.cc
浏览文件 @
e91292c6
...
...
@@ -73,7 +73,7 @@ int CompleteGraphSampler::run_graph_sampling() {
}
for
(
size_t
i
=
0
;
i
<
tasks
.
size
();
i
++
)
tasks
[
i
].
get
();
tasks
.
clear
();
for
(
size_
t
i
=
0
;
i
<
gpu_num
;
i
++
)
{
for
(
in
t
i
=
0
;
i
<
gpu_num
;
i
++
)
{
tasks
.
push_back
(
graph_table
->
_shards_task_pool
[
i
%
graph_table
->
task_pool_size_
]
->
enqueue
([
&
,
i
,
this
]()
->
int
{
...
...
@@ -101,7 +101,7 @@ int CompleteGraphSampler::run_graph_sampling() {
pthread_rwlock_unlock
(
rw_lock
);
return
0
;
}
for
(
size_
t
i
=
0
;
i
<
gpu_num
;
i
++
)
{
for
(
in
t
i
=
0
;
i
<
gpu_num
;
i
++
)
{
sample_res
[
i
].
node_list
=
sample_nodes
[
i
].
data
();
sample_res
[
i
].
neighbor_list
=
sample_neighbors
[
i
].
data
();
sample_res
[
i
].
node_size
=
sample_nodes
[
i
].
size
();
...
...
@@ -136,8 +136,7 @@ int BasicBfsGraphSampler::run_graph_sampling() {
int
task_size
=
0
;
std
::
vector
<
std
::
future
<
int
>>
tasks
;
int
init_size
=
0
;
//__sync_fetch_and_add
std
::
function
<
int
(
int
,
int64_t
)
>
bfs
=
[
&
,
this
](
int
i
,
int
id
)
->
int
{
std
::
function
<
int
(
int
,
int64_t
)
>
bfs
=
[
&
,
this
](
int
i
,
int64_t
id
)
->
int
{
if
(
this
->
status
==
GraphSamplerStatus
::
terminating
)
{
int
task_left
=
__sync_sub_and_fetch
(
&
task_size
,
1
);
if
(
task_left
==
0
)
{
...
...
@@ -196,7 +195,7 @@ int BasicBfsGraphSampler::run_graph_sampling() {
pthread_rwlock_unlock
(
rw_lock
);
return
0
;
}
std
::
cout
<<
"bfs over"
<<
std
::
endl
;
VLOG
(
0
)
<<
"BasicBfsGraphSampler finishes the graph searching task"
;
sample_nodes
.
clear
();
sample_neighbors
.
clear
();
sample_res
.
clear
();
...
...
@@ -244,7 +243,7 @@ int BasicBfsGraphSampler::run_graph_sampling() {
pthread_rwlock_unlock
(
rw_lock
);
return
0
;
}
for
(
size_t
i
=
0
;
i
<
gpu_num
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
(
size_t
)
gpu_num
;
i
++
)
{
tasks
.
push_back
(
graph_table
->
_shards_task_pool
[
i
%
graph_table
->
task_pool_size_
]
->
enqueue
([
&
,
i
,
this
]()
->
int
{
...
...
@@ -253,19 +252,15 @@ int BasicBfsGraphSampler::run_graph_sampling() {
return
0
;
}
int
total_offset
=
0
;
size_t
ind
=
i
%
graph_table
->
task_pool_size_
;
for
(
int
j
=
0
;
j
<
this
->
graph_table
->
task_pool_size_
;
j
++
)
{
for
(
size_t
k
=
0
;
k
<
sample_nodes_ex
[
j
][
i
nd
].
size
();
k
++
)
{
sample_nodes
[
i
].
push_back
(
sample_nodes_ex
[
j
][
i
nd
][
k
]);
for
(
size_t
k
=
0
;
k
<
sample_nodes_ex
[
j
][
i
].
size
();
k
++
)
{
sample_nodes
[
i
].
push_back
(
sample_nodes_ex
[
j
][
i
][
k
]);
sample_nodes
[
i
].
back
().
neighbor_offset
+=
total_offset
;
// neighbor_offset[i].push_back(total_offset +
// neighbor_offset_ex[j][i][k]);
}
size_t
neighbor_size
=
sample_neighbors_ex
[
j
][
i
nd
].
size
();
size_t
neighbor_size
=
sample_neighbors_ex
[
j
][
i
].
size
();
total_offset
+=
neighbor_size
;
for
(
size_t
k
=
0
;
k
<
neighbor_size
;
k
++
)
{
sample_neighbors
[
ind
].
push_back
(
sample_neighbors_ex
[
j
][
ind
][
k
]);
sample_neighbors
[
i
].
push_back
(
sample_neighbors_ex
[
j
][
i
][
k
]);
}
}
return
0
;
...
...
@@ -276,9 +271,7 @@ int BasicBfsGraphSampler::run_graph_sampling() {
pthread_rwlock_unlock
(
rw_lock
);
return
0
;
}
// int64_t total_neighbors =
// std::accumulate(shard_neighbor_size.begin(),shard_neighbor_size.end(),0);
for
(
size_t
i
=
0
;
i
<
gpu_num
;
i
++
)
{
for
(
int
i
=
0
;
i
<
gpu_num
;
i
++
)
{
sample_res
[
i
].
node_list
=
sample_nodes
[
i
].
data
();
sample_res
[
i
].
neighbor_list
=
sample_neighbors
[
i
].
data
();
sample_res
[
i
].
node_size
=
sample_nodes
[
i
].
size
();
...
...
@@ -683,7 +676,7 @@ int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge) {
sort
(
index
.
begin
(),
index
.
end
(),
[
&
](
int
&
a
,
int
&
b
)
{
return
has_alloc
[
a
]
-
alloc
[
a
]
<
has_alloc
[
b
]
-
alloc
[
b
];
});
int
left
=
0
,
right
=
index
.
size
()
-
1
;
int
left
=
0
,
right
=
(
int
)
index
.
size
()
-
1
;
while
(
left
<
right
)
{
if
(
has_alloc
[
index
[
right
]]
-
alloc
[
index
[
right
]]
==
0
)
break
;
int
x
=
std
::
min
(
alloc
[
index
[
left
]]
-
has_alloc
[
index
[
left
]],
...
...
@@ -1152,7 +1145,7 @@ int32_t GraphTable::initialize(const GraphParameter &graph) {
shard_end
=
shard_start
+
shard_num_per_server
;
VLOG
(
0
)
<<
"in init graph table shard idx = "
<<
_shard_idx
<<
" shard_start "
<<
shard_start
<<
" shard_end "
<<
shard_end
;
for
(
in
t
i
=
0
;
i
<
shard_num_per_server
;
i
++
)
{
for
(
size_
t
i
=
0
;
i
<
shard_num_per_server
;
i
++
)
{
shards
.
push_back
(
new
GraphShard
());
}
use_duplicate_nodes
=
false
;
...
...
paddle/fluid/distributed/ps/table/common_graph_table.h
浏览文件 @
e91292c6
...
...
@@ -204,7 +204,7 @@ class RandomSampleLRU {
}
void
process_redundant
(
int
process_size
)
{
size_
t
length
=
std
::
min
(
remove_count
,
process_size
);
in
t
length
=
std
::
min
(
remove_count
,
process_size
);
while
(
length
--
)
{
remove
(
node_head
);
remove_count
--
;
...
...
@@ -306,12 +306,10 @@ class ScaledLRU {
if
((
size_t
)
node_size
<=
size_t
(
1.1
*
size_limit
)
+
1
)
return
0
;
if
(
pthread_rwlock_wrlock
(
&
rwlock
)
==
0
)
{
// VLOG(0)<"in shrink\n";
global_count
=
0
;
for
(
size_t
i
=
0
;
i
<
lru_pool
.
size
();
i
++
)
{
global_count
+=
lru_pool
[
i
].
node_size
-
lru_pool
[
i
].
remove_count
;
}
// VLOG(0)<<"global_count "<<global_count<<"\n";
if
((
size_t
)
global_count
>
size_limit
)
{
size_t
remove
=
global_count
-
size_limit
;
for
(
size_t
i
=
0
;
i
<
lru_pool
.
size
();
i
++
)
{
...
...
@@ -319,7 +317,6 @@ class ScaledLRU {
lru_pool
[
i
].
remove_count
+=
1.0
*
(
lru_pool
[
i
].
node_size
-
lru_pool
[
i
].
remove_count
)
/
global_count
*
remove
;
// VLOG(0)<<i<<" "<<lru_pool[i].remove_count<<std::endl;
}
}
pthread_rwlock_unlock
(
&
rwlock
);
...
...
@@ -332,8 +329,6 @@ class ScaledLRU {
if
(
diff
!=
0
)
{
__sync_fetch_and_add
(
&
global_count
,
diff
);
if
(
global_count
>
int
(
1.25
*
size_limit
))
{
// VLOG(0)<<"global_count too large "<<global_count<<" enter start
// shrink task\n";
thread_pool
->
enqueue
([
this
]()
->
int
{
return
shrink
();
});
}
}
...
...
paddle/fluid/distributed/test/graph_node_test.cc
浏览文件 @
e91292c6
...
...
@@ -649,11 +649,12 @@ void testCache() {
ASSERT_EQ
((
int
)
r
.
size
(),
0
);
st
.
insert
(
0
,
&
skey
,
result
,
1
);
for
(
in
t
i
=
0
;
i
<
st
.
get_ttl
();
i
++
)
{
for
(
size_
t
i
=
0
;
i
<
st
.
get_ttl
();
i
++
)
{
st
.
query
(
0
,
&
skey
,
1
,
r
);
ASSERT_EQ
((
int
)
r
.
size
(),
1
);
char
*
p
=
(
char
*
)
r
[
0
].
second
.
buffer
.
get
();
for
(
int
j
=
0
;
j
<
r
[
0
].
second
.
actual_size
;
j
++
)
ASSERT_EQ
(
p
[
j
],
str
[
j
]);
for
(
size_t
j
=
0
;
j
<
r
[
0
].
second
.
actual_size
;
j
++
)
ASSERT_EQ
(
p
[
j
],
str
[
j
]);
r
.
clear
();
}
st
.
query
(
0
,
&
skey
,
1
,
r
);
...
...
@@ -662,22 +663,24 @@ void testCache() {
strcpy
(
str
,
"54321678"
);
result
=
new
::
paddle
::
distributed
::
SampleResult
(
strlen
(
str
),
str
);
st
.
insert
(
0
,
&
skey
,
result
,
1
);
for
(
in
t
i
=
0
;
i
<
st
.
get_ttl
()
/
2
;
i
++
)
{
for
(
size_
t
i
=
0
;
i
<
st
.
get_ttl
()
/
2
;
i
++
)
{
st
.
query
(
0
,
&
skey
,
1
,
r
);
ASSERT_EQ
((
int
)
r
.
size
(),
1
);
char
*
p
=
(
char
*
)
r
[
0
].
second
.
buffer
.
get
();
for
(
int
j
=
0
;
j
<
r
[
0
].
second
.
actual_size
;
j
++
)
ASSERT_EQ
(
p
[
j
],
str
[
j
]);
for
(
size_t
j
=
0
;
j
<
r
[
0
].
second
.
actual_size
;
j
++
)
ASSERT_EQ
(
p
[
j
],
str
[
j
]);
r
.
clear
();
}
str
=
new
char
[
18
];
strcpy
(
str
,
"343332d4321"
);
result
=
new
::
paddle
::
distributed
::
SampleResult
(
strlen
(
str
),
str
);
st
.
insert
(
0
,
&
skey
,
result
,
1
);
for
(
in
t
i
=
0
;
i
<
st
.
get_ttl
();
i
++
)
{
for
(
size_
t
i
=
0
;
i
<
st
.
get_ttl
();
i
++
)
{
st
.
query
(
0
,
&
skey
,
1
,
r
);
ASSERT_EQ
((
int
)
r
.
size
(),
1
);
char
*
p
=
(
char
*
)
r
[
0
].
second
.
buffer
.
get
();
for
(
int
j
=
0
;
j
<
r
[
0
].
second
.
actual_size
;
j
++
)
ASSERT_EQ
(
p
[
j
],
str
[
j
]);
for
(
size_t
j
=
0
;
j
<
r
[
0
].
second
.
actual_size
;
j
++
)
ASSERT_EQ
(
p
[
j
],
str
[
j
]);
r
.
clear
();
}
st
.
query
(
0
,
&
skey
,
1
,
r
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录