Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
add4b466
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
add4b466
编写于
11月 01, 2018
作者:
Q
Qiao Longfei
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
dist table only handle is_distributed table
上级
d186e743
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
68 addition
and
43 deletion
+68
-43
python/paddle/fluid/tests/unittests/test_dist_transpiler.py
python/paddle/fluid/tests/unittests/test_dist_transpiler.py
+64
-42
python/paddle/fluid/transpiler/distribute_transpiler.py
python/paddle/fluid/transpiler/distribute_transpiler.py
+4
-1
未找到文件。
python/paddle/fluid/tests/unittests/test_dist_transpiler.py
浏览文件 @
add4b466
...
...
@@ -411,12 +411,12 @@ class TestDistLookupTableBase(TranspilerTest):
self
.
emb_size
=
64
self
.
lookup_table_name
=
'shared_w'
def
emb_pool
(
ids
):
def
emb_pool
(
ids
,
table_name
,
is_distributed
):
emb
=
fluid
.
layers
.
embedding
(
input
=
ids
,
size
=
[
self
.
table_size
,
self
.
emb_size
],
dtype
=
'float32'
,
param_attr
=
self
.
lookup_table_name
,
# share parameter
param_attr
=
table_name
,
is_sparse
=
is_sparse
,
is_distributed
=
is_distributed
)
pool
=
fluid
.
layers
.
sequence_pool
(
input
=
emb
,
pool_type
=
'average'
)
...
...
@@ -426,9 +426,12 @@ class TestDistLookupTableBase(TranspilerTest):
name
=
'title_ids'
,
shape
=
[
1
],
dtype
=
'int64'
,
lod_level
=
1
)
brand_ids
=
fluid
.
layers
.
data
(
name
=
'brand_ids'
,
shape
=
[
1
],
dtype
=
'int64'
,
lod_level
=
1
)
title_emb
=
emb_pool
(
title_ids
)
brand_emb
=
emb_pool
(
brand_ids
)
fc0
=
fluid
.
layers
.
concat
(
input
=
[
title_emb
,
brand_emb
],
axis
=
1
)
profile_ids
=
fluid
.
layers
.
data
(
name
=
'brand_ids'
,
shape
=
[
1
],
dtype
=
'int64'
,
lod_level
=
1
)
title_emb
=
emb_pool
(
title_ids
,
self
.
lookup_table_name
,
is_distributed
)
brand_emb
=
emb_pool
(
brand_ids
,
self
.
lookup_table_name
,
is_distributed
)
profile_emb
=
emb_pool
(
profile_ids
,
"profile_emb"
,
False
)
fc0
=
fluid
.
layers
.
concat
(
input
=
[
title_emb
,
brand_emb
,
profile_emb
],
axis
=
1
)
predict
=
fluid
.
layers
.
fc
(
input
=
fc0
,
size
=
2
,
act
=
None
,
...
...
@@ -449,7 +452,7 @@ class TestLocalLookupTable(TestDistLookupTableBase):
def
transpiler_test_impl
(
self
):
pserver1
,
startup1
=
self
.
get_pserver
(
self
.
pserver1_ep
)
self
.
assertEqual
(
len
(
pserver1
.
blocks
),
3
)
self
.
assertEqual
(
len
(
pserver1
.
blocks
),
4
)
# 0 listen_and_serv
# 1 optimize for fc_w or fc_b adam
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
1
].
ops
],
...
...
@@ -459,16 +462,22 @@ class TestLocalLookupTable(TestDistLookupTableBase):
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
2
].
ops
],
[
"sum"
,
"scale"
,
"adam"
,
"scale"
,
"scale"
])
# 3 optimize for table 2 adam
# NOTE: if param is not selected rows, the grad will scaled to grad / trainer_num
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
3
].
ops
],
[
"sum"
,
"scale"
,
"adam"
,
"scale"
,
"scale"
])
trainer
,
_
=
self
.
get_trainer
()
self
.
assertEqual
(
len
(
trainer
.
blocks
),
1
)
ops
=
[
'lookup_table'
,
'sequence_pool'
,
'lookup_table'
,
'sequence_pool'
,
'concat'
,
'mul'
,
'elementwise_add'
,
'cross_entropy'
,
'mean'
,
'
lookup_table'
,
'sequence_pool'
,
'
concat'
,
'mul'
,
'elementwise_add'
,
'cross_entropy'
,
'mean'
,
'fill_constant'
,
'mean_grad'
,
'cross_entropy_grad'
,
'elementwise_add_grad'
,
'send'
,
'mul_grad'
,
'send'
,
'concat_grad'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'split_selected_rows'
,
'send'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'sum'
,
'split_selected_rows'
,
'send'
,
'send_barrier'
,
'recv'
,
'recv'
,
'recv'
,
'
fetch_barrier
'
,
'concat'
'send_barrier'
,
'recv'
,
'recv'
,
'recv'
,
'
recv'
,
'fetch_barrier'
,
'concat
'
,
'concat'
]
self
.
assertEqual
([
op
.
type
for
op
in
trainer
.
blocks
[
0
].
ops
],
ops
)
...
...
@@ -480,40 +489,42 @@ class TestDistLookupTable(TestDistLookupTableBase):
def
transpiler_test_impl
(
self
):
pserver1
,
startup1
=
self
.
get_pserver
(
self
.
pserver1_ep
)
self
.
assertEqual
(
len
(
pserver1
.
blocks
),
5
)
self
.
assertEqual
(
len
(
pserver1
.
blocks
),
6
)
# 0 listen_and_serv
# 1 optimize for fc_w or fc_b adam
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
1
].
ops
],
[
"sum"
,
"scale"
,
"adam"
,
"scale"
,
"scale"
])
#
2 optimize for table sgd
#
4 prefetch -> lookup_sparse_table for data0
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
2
].
ops
],
[
"sum"
,
"scale"
,
"adam"
,
"scale"
,
"scale"
])
# 2 optimize for table sgd
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
3
].
ops
],
[
"sum"
,
"sgd"
])
# 3 prefetch -> lookup_sparse_table for data0
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
3
].
ops
],
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
4
].
ops
],
[
"lookup_sparse_table"
])
#
4
save table
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
4
].
ops
],
[
"save"
])
#
5
save table
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
5
].
ops
],
[
"save"
])
trainer
,
trainer_startup
=
self
.
get_trainer
()
self
.
assertEqual
(
len
(
trainer
.
blocks
),
1
)
ops
=
[
'split_ids'
,
'prefetch'
,
'merge_ids'
,
'sequence_pool'
,
'sequence_pool'
,
'concat'
,
'mul'
,
'elementwise_add'
,
'sequence_pool'
,
'
lookup_table'
,
'sequence_pool'
,
'
concat'
,
'mul'
,
'elementwise_add'
,
'cross_entropy'
,
'mean'
,
'fill_constant'
,
'mean_grad'
,
'cross_entropy_grad'
,
'elementwise_add_grad'
,
'send'
,
'mul_grad'
,
'send'
,
'concat_grad'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
's
equence_pool_grad'
,
'lookup_table_grad'
,
'sum'
,
'split_ids
'
,
'se
nd'
,
'send_barrier'
,
'recv'
,
'recv'
,
'fetch_barrier'
]
's
plit_selected_rows'
,
'send'
,
'sequence_pool_grad'
,
'lookup_table_grad
'
,
'se
quence_pool_grad'
,
'lookup_table_grad'
,
'sum'
,
'split_ids'
,
'send'
,
'send_barrier'
,
'recv'
,
'recv'
,
'recv'
,
'fetch_barrier'
,
'concat'
]
self
.
assertEqual
([
op
.
type
for
op
in
trainer
.
blocks
[
0
].
ops
],
ops
)
startup_ops
=
[
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'fill_constant'
,
'
uniform_random'
,
'recv'
,
'recv
'
,
'f
etch_barrier'
,
'fake_init'
]
'fill_constant'
,
'fill_constant'
,
'
fill_constant'
,
'fill_constant
'
,
'f
ill_constant'
,
'fill_constant'
,
'uniform_random'
,
'uniform_random'
,
'recv'
,
'recv'
,
'recv'
,
'fetch_barrier'
,
'concat'
,
'fake_init'
]
self
.
assertEqual
([
op
.
type
for
op
in
trainer_startup
.
blocks
[
0
].
ops
],
startup_ops
)
...
...
@@ -526,7 +537,7 @@ class TestAsyncLocalLookupTable(TestDistLookupTableBase):
config
=
fluid
.
DistributeTranspilerConfig
()
pserver1
,
startup1
=
self
.
get_pserver
(
self
.
pserver1_ep
,
config
,
False
)
self
.
assertEqual
(
len
(
pserver1
.
blocks
),
3
)
self
.
assertEqual
(
len
(
pserver1
.
blocks
),
4
)
# 0 listen_and_serv
# 1 optimize for fc_w or fc_b adam
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
1
].
ops
],
...
...
@@ -535,17 +546,24 @@ class TestAsyncLocalLookupTable(TestDistLookupTableBase):
# NOTE: if param is not selected rows, the grad will scaled to grad / trainer_num
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
2
].
ops
],
[
"adam"
,
"scale"
,
"scale"
])
# 3 optimize for table adam
# NOTE: if param is not selected rows, the grad will scaled to grad / trainer_num
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
3
].
ops
],
[
"adam"
,
"scale"
,
"scale"
])
trainer
,
_
=
self
.
get_trainer
(
config
)
self
.
assertEqual
(
len
(
trainer
.
blocks
),
1
)
ops
=
[
'lookup_table'
,
'sequence_pool'
,
'lookup_table'
,
'sequence_pool'
,
'concat'
,
'mul'
,
'elementwise_add'
,
'cross_entropy'
,
'mean'
,
'fill_constant'
,
'mean_grad'
,
'cross_entropy_grad'
,
'elementwise_add_grad'
,
'send'
,
'mul_grad'
,
'send'
,
'concat_grad'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'sum'
,
'split_selected_rows'
,
'send'
,
'recv'
,
'recv'
,
'recv'
,
'concat'
'lookup_table'
,
'sequence_pool'
,
'concat'
,
'mul'
,
'elementwise_add'
,
'cross_entropy'
,
'mean'
,
'fill_constant'
,
'mean_grad'
,
'cross_entropy_grad'
,
'elementwise_add_grad'
,
'send'
,
'mul_grad'
,
'send'
,
'concat_grad'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'split_selected_rows'
,
'send'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'sum'
,
'split_selected_rows'
,
'send'
,
'recv'
,
'recv'
,
'recv'
,
'recv'
,
'concat'
,
'concat'
]
self
.
assertEqual
([
op
.
type
for
op
in
trainer
.
blocks
[
0
].
ops
],
ops
)
...
...
@@ -559,30 +577,34 @@ class TestAsyncDistLookupTable(TestDistLookupTableBase):
pserver1
,
startup1
=
self
.
get_pserver
(
self
.
pserver1_ep
,
config
,
False
)
self
.
assertEqual
(
len
(
pserver1
.
blocks
),
5
)
self
.
assertEqual
(
len
(
pserver1
.
blocks
),
6
)
# 0 listen_and_serv
# 1 optimize for fc_w or fc_b adam
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
1
].
ops
],
[
"adam"
,
"scale"
,
"scale"
])
# 2 optimize for table sgd
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
2
].
ops
],
[
"sgd"
])
# 3 prefetch -> lookup_sparse_table for data0
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
3
].
ops
],
# 2 optimize for table adam
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
2
].
ops
],
[
"adam"
,
"scale"
,
"scale"
])
# 3 optimize for table sgd
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
3
].
ops
],
[
"sgd"
])
# 4 prefetch -> lookup_sparse_table for data0
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
4
].
ops
],
[
"lookup_sparse_table"
])
#
4
save table
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
4
].
ops
],
[
"save"
])
#
5
save table
self
.
assertEqual
([
op
.
type
for
op
in
pserver1
.
blocks
[
5
].
ops
],
[
"save"
])
trainer
,
_
=
self
.
get_trainer
(
config
)
self
.
assertEqual
(
len
(
trainer
.
blocks
),
1
)
ops
=
[
'split_ids'
,
'prefetch'
,
'merge_ids'
,
'sequence_pool'
,
'sequence_pool'
,
'concat'
,
'mul'
,
'elementwise_add'
,
'cross_entropy'
,
'mean'
,
'fill_constant'
,
'mean_grad'
,
'cross_entropy_grad'
,
'elementwise_add_grad'
,
'send'
,
'mul_grad'
,
'send'
,
'concat_grad'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'sum'
,
'split_ids'
,
'send'
,
'recv'
,
'recv'
]
'sequence_pool'
,
'lookup_table'
,
'sequence_pool'
,
'concat'
,
'mul'
,
'elementwise_add'
,
'cross_entropy'
,
'mean'
,
'fill_constant'
,
'mean_grad'
,
'cross_entropy_grad'
,
'elementwise_add_grad'
,
'send'
,
'mul_grad'
,
'send'
,
'concat_grad'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'split_selected_rows'
,
'send'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'sequence_pool_grad'
,
'lookup_table_grad'
,
'sum'
,
'split_ids'
,
'send'
,
'recv'
,
'recv'
,
'recv'
,
'concat'
]
self
.
assertEqual
([
op
.
type
for
op
in
trainer
.
blocks
[
0
].
ops
],
ops
)
...
...
python/paddle/fluid/transpiler/distribute_transpiler.py
浏览文件 @
add4b466
...
...
@@ -1065,7 +1065,10 @@ to transpile() call.")
continue_search_lookup_table_op
=
False
all_ops
=
program
.
global_block
().
ops
for
op
in
all_ops
:
if
op
.
type
==
LOOKUP_TABLE_TYPE
:
if
op
.
type
==
LOOKUP_TABLE_TYPE
and
self
.
table_name
==
op
.
input
(
"W"
)[
0
]:
if
not
op
.
attr
(
'is_distributed'
):
raise
RuntimeError
(
"lookup_table_op that lookup an distributed embedding table"
"should set is_distributed to true"
)
continue_search_lookup_table_op
=
True
lookup_table_op_index
=
lookup_table_op_index
if
lookup_table_op_index
!=
-
1
else
list
(
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录