Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
b2c9efef
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b2c9efef
编写于
11月 28, 2018
作者:
Q
Qiao Longfei
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add more unit test for lookup_remote_table
test=develop
上级
40f68b13
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
69 addition
and
13 deletion
+69
-13
python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py
...ddle/fluid/tests/unittests/test_lookup_remote_table_op.py
+69
-13
未找到文件。
python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py
浏览文件 @
b2c9efef
...
...
@@ -27,7 +27,7 @@ from paddle.fluid.op import Operator
from
paddle.fluid.framework
import
Program
,
program_guard
def
run_pserver
(
use_cuda
,
sync_mode
):
def
run_pserver
(
pserver_id
,
use_cuda
,
sync_mode
):
scope
=
fluid
.
core
.
Scope
()
program
=
Program
()
with
fluid
.
scope_guard
(
scope
):
...
...
@@ -36,7 +36,10 @@ def run_pserver(use_cuda, sync_mode):
place
=
fluid
.
CUDAPlace
(
0
)
if
use_cuda
else
fluid
.
CPUPlace
()
# create and initialize Param Variable
param
=
scope
.
var
(
'table'
).
get_tensor
()
param_array
=
np
.
full
((
10
,
8
),
5.0
).
astype
(
"float32"
)
param_array
=
np
.
ones
((
10
,
8
)).
astype
(
"float32"
)
for
i
in
range
(
len
(
param_array
)):
param_array
[
i
]
*=
param_array
[
i
]
*
i
+
pserver_id
*
10
param
.
set
(
param_array
,
place
)
optimize_block
=
program
.
_create_block
(
program
.
global_block
().
idx
)
...
...
@@ -60,8 +63,8 @@ class TestListenAndServOp(unittest.TestCase):
def
setUp
(
self
):
self
.
ps_timeout
=
5
def
_start_pserver
(
self
,
use_cuda
,
sync_mode
,
pserver_func
):
p
=
Process
(
target
=
pserver_func
,
args
=
(
use_cuda
,
sync_mode
))
def
_start_pserver
(
self
,
pserver_id
,
use_cuda
,
sync_mode
,
pserver_func
):
p
=
Process
(
target
=
pserver_func
,
args
=
(
pserver_id
,
use_cuda
,
sync_mode
))
p
.
daemon
=
True
p
.
start
()
return
p
...
...
@@ -85,7 +88,7 @@ class TestListenAndServOp(unittest.TestCase):
port
=
int
(
f
.
read
().
strip
())
return
port
def
_run_lookup_table_op
(
self
,
place
,
port
):
def
_run_lookup_table_op
_one_pserver
(
self
,
place
,
port
):
scope
=
fluid
.
core
.
Scope
()
program
=
Program
()
with
fluid
.
scope_guard
(
scope
):
...
...
@@ -96,15 +99,17 @@ class TestListenAndServOp(unittest.TestCase):
param
.
set
(
param_array
,
place
)
ids
=
scope
.
var
(
'Ids'
).
get_tensor
()
ids_array
=
np
.
array
([[
1
.0
],
[
2.0
]]).
astype
(
"int64"
)
ids_array
=
np
.
array
([[
1
],
[
2
],
[
5
]]).
astype
(
"int64"
)
ids
.
set
(
ids_array
,
place
)
ids
.
set_lod
([[
0
,
1
,
2
]])
ids_lod
=
[[
0
,
1
,
2
,
3
]]
ids
.
set_lod
(
ids_lod
)
out
=
scope
.
var
(
'Out'
).
get_tensor
()
emaps
=
[
'127.0.0.1:'
+
str
(
port
)]
table_names
=
[
'table'
]
height_sections
=
[
10
]
# create and run sgd operator
lookup_table_op
=
Operator
(
"lookup_table"
,
...
...
@@ -120,24 +125,75 @@ class TestListenAndServOp(unittest.TestCase):
# get and compare result
result_array
=
np
.
array
(
out
)
print
(
result_array
)
self
.
assertEqual
(
out
.
lod
(),
ids_lod
)
self
.
assertEqual
(
list
(
result_array
.
shape
),
[
len
(
ids_array
),
8
])
for
i
in
range
(
len
(
ids_array
)):
id
=
ids_array
[
i
][
0
]
self
.
assertTrue
((
result_array
[
i
]
==
id
).
all
())
self
.
assertTrue
((
result_array
[
0
]
==
5
).
all
())
self
.
assertTrue
((
result_array
[
0
]
==
5
).
all
())
def
_run_lookup_table_op_two_pserver
(
self
,
place
,
port0
,
port1
):
scope
=
fluid
.
core
.
Scope
()
program
=
Program
()
with
fluid
.
scope_guard
(
scope
):
with
program_guard
(
program
,
startup_program
=
Program
()):
# create and initialize Param Variable
param
=
scope
.
var
(
'W'
).
get_tensor
()
param_array
=
np
.
full
((
10
,
8
),
1.0
).
astype
(
"float32"
)
param
.
set
(
param_array
,
place
)
ids
=
scope
.
var
(
'Ids'
).
get_tensor
()
ids_array
=
np
.
array
([[
1
],
[
2
],
[
11
],
[
13
]]).
astype
(
"int64"
)
ids
.
set
(
ids_array
,
place
)
ids_lod
=
[[
0
,
2
,
3
,
4
]]
ids
.
set_lod
(
ids_lod
)
out
=
scope
.
var
(
'Out'
).
get_tensor
()
emaps
=
[
'127.0.0.1:'
+
str
(
port0
),
'127.0.0.1:'
+
str
(
port1
)]
table_names
=
[
'table'
,
'table'
]
height_sections
=
[
10
,
20
]
# create and run sgd operator
lookup_table_op
=
Operator
(
"lookup_table"
,
W
=
'W'
,
Ids
=
'Ids'
,
Out
=
'Out'
,
remote_prefetch
=
True
,
epmap
=
emaps
,
table_names
=
table_names
,
height_sections
=
height_sections
)
lookup_table_op
.
run
(
scope
,
place
)
# get and compare result
result_array
=
np
.
array
(
out
)
self
.
assertEqual
(
out
.
lod
(),
ids_lod
)
self
.
assertEqual
(
list
(
result_array
.
shape
),
[
len
(
ids_array
),
8
])
for
i
in
range
(
len
(
ids_array
)):
id
=
ids_array
[
i
][
0
]
self
.
assertTrue
((
result_array
[
i
]
==
id
).
all
())
def
test_lookup_remote_table
(
self
):
# run pserver on CPU in sync mode
p1
=
self
.
_start_pserver
(
False
,
True
,
run_pserver
)
p0
=
self
.
_start_pserver
(
0
,
False
,
True
,
run_pserver
)
self
.
_wait_ps_ready
(
p0
.
pid
)
port0
=
self
.
_get_pserver_port
(
p0
.
pid
)
p1
=
self
.
_start_pserver
(
1
,
False
,
True
,
run_pserver
)
self
.
_wait_ps_ready
(
p1
.
pid
)
port
=
self
.
_get_pserver_port
(
p1
.
pid
)
port
1
=
self
.
_get_pserver_port
(
p1
.
pid
)
places
=
[
core
.
CPUPlace
()]
# if core.is_compiled_with_cuda():
# places.append(core.CUDAPlace(0))
for
place
in
places
:
self
.
_run_lookup_table_op
(
place
,
port
)
self
.
_run_lookup_table_op_one_pserver
(
place
,
port0
)
self
.
_run_lookup_table_op_two_pserver
(
place
,
port0
,
port1
)
# raise SIGTERM to pserver
os
.
kill
(
p0
.
pid
,
signal
.
SIGINT
)
p0
.
join
()
os
.
kill
(
p1
.
pid
,
signal
.
SIGINT
)
p1
.
join
()
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录