Commit bd0f8f8d (unverified)
PaddlePaddle / Paddle
Authored on Jun 04, 2018 by Yancey; committed via GitHub on Jun 04, 2018

Merge pull request #11054 from velconia/update_simple_distranspiler

Merge simple_dist_transpiler.py into distribute_transpiler.py

Parent commits: d3e99aee, 0ed461df
Showing 8 changed files with 207 additions and 333 deletions (+207, -333)
python/paddle/fluid/__init__.py                                        +2    -2
python/paddle/fluid/tests/unittests/test_dist_transpiler.py            +3    -54
python/paddle/fluid/tests/unittests/test_simple_dist_transpiler.py     +80   -0
python/paddle/fluid/tests/unittests/test_slice_var.py                  +6    -6
python/paddle/fluid/tests/unittests/transpiler_test.py                 +73   -0
python/paddle/fluid/transpiler/__init__.py                             +2    -3
python/paddle/fluid/transpiler/distribute_transpiler.py                +41   -14
python/paddle/fluid/transpiler/distribute_transpiler_simple.py         +0    -254
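With SimpleDistributeTranspiler removed, its behavior is reached through DistributeTranspiler's new slice_var_up flag, which the new test test_simple_dist_transpiler.py exercises. The snippet below is a minimal migration sketch, not code from this commit; the surrounding program/optimizer setup, the endpoints, and the trainer count are illustrative only.

import paddle.fluid as fluid

# Build a program with an optimizer as usual (omitted here), then transpile.
# slice_var_up=False keeps every parameter/gradient in a single block instead
# of slicing it across parameter servers, which is what the old
# SimpleDistributeTranspiler did.
t = fluid.DistributeTranspiler()
t.transpile(
    0,                                          # trainer_id
    program=fluid.default_main_program(),       # assumed to contain optimize ops
    pservers="127.0.0.1:6174,127.0.0.1:6175",   # illustrative endpoints
    trainers=2,
    slice_var_up=False)
trainer_prog = t.get_trainer_program()
pserver_prog = t.get_pserver_program("127.0.0.1:6174")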
python/paddle/fluid/__init__.py

@@ -44,8 +44,8 @@ import transpiler
 from param_attr import ParamAttr, WeightNormParamAttr
 from data_feeder import DataFeeder
 from core import LoDTensor, CPUPlace, CUDAPlace, CUDAPinnedPlace
-from transpiler import DistributeTranspiler, SimpleDistributeTranspiler, \
-    InferenceTranspiler, memory_optimize, release_memory
+from transpiler import DistributeTranspiler, InferenceTranspiler, \
+    memory_optimize, release_memory
 from concurrency import (Go, make_channel, channel_send, channel_recv,
                          channel_close, Select)
 from lod_tensor import create_lod_tensor, create_random_int_lodtensor
python/paddle/fluid/tests/unittests/test_dist_transpiler.py

@@ -12,40 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

 import unittest
 import paddle.fluid as fluid
 import paddle.fluid.core as core
 import paddle.fluid.layers as layers
 from paddle.fluid.transpiler.distribute_transpiler import delete_ops
 import numpy

+from transpiler_test import TranspilerTest

-class TestDistTranspiler(unittest.TestCase):
+
+class TestDistTranspiler(TranspilerTest):
     def setUp(self):
-        self.trainer_id = 0
-        self.trainers = 2
-        self.pservers = 2
-        self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
         self.current_pserver_ep = "127.0.0.1:6174"

-    def net_conf(self):
-        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
-        y_predict = fluid.layers.fc(input=x,
-                                    size=1000,
-                                    act=None,
-                                    param_attr=fluid.ParamAttr(name='fc_w'))
-        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
-        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
-        avg_cost = fluid.layers.mean(cost)
-        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
-        optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)
-        return optimize_ops, params_grads
-
     def test_transpiler(self):
         trainer = self.get_trainer()
         pserver, startup = self.get_pserver(self.current_pserver_ep)

@@ -70,14 +46,6 @@ class TestDistTranspiler(unittest.TestCase):
         fc_w_var = startup.global_block().var("fc_w.block1")
         self.assertEqual(fc_w_var.shape, (500, 1000))

-    def get_main_program(self):
-        main = fluid.Program()
-        with fluid.program_guard(main):
-            self.net_conf()
-        return main
-
     def get_expect_trainer_ops(self):
         trainer = fluid.Program()

@@ -92,25 +60,6 @@ class TestDistTranspiler(unittest.TestCase):
         ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars")
         return ops

-    def get_trainer(self):
-        return self._transpiler_instance().get_trainer_program()
-
-    def get_pserver(self, ep):
-        t = self._transpiler_instance()
-        pserver = t.get_pserver_program(ep)
-        startup = t.get_startup_program(ep, pserver)
-        return pserver, startup
-
-    def _transpiler_instance(self):
-        main = self.get_main_program()
-        t = fluid.DistributeTranspiler()
-        t.transpile(self.trainer_id,
-                    program=main,
-                    pservers=self.pserver_eps,
-                    trainers=self.trainers)
-        return t
-

 if __name__ == "__main__":
     unittest.main()
python/paddle/fluid/tests/unittests/test_simple_dist_transpiler.py (new file, 0 → 100644)

# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest  # needed by the __main__ block below; missing in the dumped file

import numpy as np

import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import delete_ops

from transpiler_test import TranspilerTest


class TestSimpleDistTranspiler(TranspilerTest):
    def setUp(self):
        self.current_pserver_ep = "127.0.0.1:6175"

    def test_simple_transpiler(self):
        np.random.seed(1)

        trainer = self.get_trainer()
        pserver, startup = self.get_pserver(self.current_pserver_ep)
        self.assertEqual([op.type for op in trainer.global_block().ops],
                         self.get_expect_trainer_ops())

        self.assertEqual(len(pserver.blocks), 2)
        # block0: listen_and_serv
        self.assertEqual([op.type for op in pserver.blocks[0].ops],
                         ["listen_and_serv"])
        # block1: optimize pass
        self.assertEqual([op.type for op in pserver.blocks[1].ops],
                         ["sum", "scale", "sgd"])

        # confirm startup program
        self.assertEqual([op.type for op in startup.global_block().ops],
                         ["fill_constant", "uniform_random", "uniform_random"])

        # the variable #fc_w will NOT be splited
        fc_w_var = startup.global_block().var("fc_w@GRAD")
        self.assertEqual(fc_w_var.shape, (1000, 1000))

        fc_w_var = startup.global_block().var("fc_w@GRAD.trainer_0")
        self.assertEqual(fc_w_var.shape, (1000, 1000))

    def get_expect_trainer_ops(self):
        trainer = fluid.Program()

        with fluid.program_guard(trainer):
            optimize_ops, params_grads = self.net_conf()

        delete_ops(trainer.global_block(), optimize_ops)
        ops = [op.type for op in trainer.global_block().ops] + [
            "send_vars", "send_barrier", "recv", "recv", "fetch_barrier"
        ]
        ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars")
        return ops

    def _transpiler_instance(self):
        main = self.get_main_program()
        t = fluid.DistributeTranspiler()
        t.transpile(
            self.trainer_id,
            program=main,
            pservers=self.pserver_eps,
            trainers=self.trainers,
            slice_var_up=False)
        return t


if __name__ == "__main__":
    unittest.main()
python/paddle/fluid/tests/unittests/test_split_var.py → python/paddle/fluid/tests/unittests/test_slice_var.py (renamed)

@@ -14,14 +14,14 @@
 import math
 import unittest
-from paddle.fluid.transpiler.distribute_transpiler import split_variable
+from paddle.fluid.transpiler.distribute_transpiler import slice_variable
 import paddle.fluid as fluid
 import paddle.fluid.core as core
 import random


-class TestSplitVar(unittest.TestCase):
-    def check_split_output(self, shapes, expected_sizes, min_size):
+class TestSliceVar(unittest.TestCase):
+    def check_slice_output(self, shapes, expected_sizes, min_size):
         var_list = []
         program = fluid.Program()
         for shape in shapes:

@@ -31,7 +31,7 @@ class TestSplitVar(unittest.TestCase):
                 # dtype=core.VarDesc.VarType.LOD_TENSOR,
                 shape=shape)
             var_list.append(var)
-        blocks = split_variable(var_list, 10, min_size)
+        blocks = slice_variable(var_list, 10, min_size)
         all_sizes = []
         for s in expected_sizes:
             for s2 in s:

@@ -49,7 +49,7 @@ class TestSplitVar(unittest.TestCase):
                           [1150, 1150, 1150, 1150, 1150, 1150, 1100]]

-        self.check_split_output(shapes, expected_sizes, 1024)
+        self.check_slice_output(shapes, expected_sizes, 1024)

     def test_check_output_8k(self):
         shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10],

@@ -57,7 +57,7 @@ class TestSplitVar(unittest.TestCase):
         expected_sizes = [[15], [1024], [10976, 10976], [8160], [8000],
                           [35937, 35937, 35937, 35937, 35937, 35937]]

-        self.check_split_output(shapes, expected_sizes, 8192)
+        self.check_slice_output(shapes, expected_sizes, 8192)


 if __name__ == '__main__':
python/paddle/fluid/tests/unittests/transpiler_test.py (new file, 0 → 100644)

# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import numpy as np

import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers


class TranspilerTest(unittest.TestCase):
    @classmethod
    def setUpClass(self):
        self.trainer_id = 0
        self.trainers = 2
        self.pservers = 2
        self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"

    def net_conf(self):
        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')

        y_predict = fluid.layers.fc(input=x,
                                    size=1000,
                                    act=None,
                                    param_attr=fluid.ParamAttr(name='fc_w'))

        y = fluid.layers.data(name='y', shape=[1], dtype='float32')

        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_cost = fluid.layers.mean(cost)
        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)

        optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)
        return optimize_ops, params_grads

    def get_main_program(self):
        main = fluid.Program()

        with fluid.program_guard(main):
            self.net_conf()

        return main

    def get_trainer(self):
        return self._transpiler_instance().get_trainer_program()

    def get_pserver(self, ep):
        t = self._transpiler_instance()
        pserver = t.get_pserver_program(ep)
        startup = t.get_startup_program(ep, pserver)
        return pserver, startup

    def _transpiler_instance(self):
        main = self.get_main_program()
        t = fluid.DistributeTranspiler()
        t.transpile(
            self.trainer_id,
            program=main,
            pservers=self.pserver_eps,
            trainers=self.trainers)
        return t
python/paddle/fluid/transpiler/__init__.py

@@ -15,10 +15,9 @@
 from distribute_transpiler import DistributeTranspiler
 from inference_transpiler import InferenceTranspiler
 from memory_optimization_transpiler import memory_optimize, release_memory
-from distribute_transpiler_simple import SimpleDistributeTranspiler
 from ps_dispatcher import HashName, RoundRobin

 __all__ = [
-    "DistributeTranspiler", "InferenceTranspiler", "SimpleDistributeTranspiler",
-    "memory_optimize", "release_memory", "HashName", "RoundRobin"
+    "DistributeTranspiler", "InferenceTranspiler", "memory_optimize",
+    "release_memory", "HashName", "RoundRobin"
 ]
python/paddle/fluid/transpiler/distribute_transpiler.py

@@ -39,6 +39,7 @@ Steps to transpile pserver:
 from __future__ import print_function

 import math
+import numpy as np

 from ps_dispatcher import RoundRobin, HashName, PSDispatcher
 from .. import core, framework

@@ -70,7 +71,7 @@ def same_or_split_var(p_name, var_name):
     return p_name == var_name or p_name.startswith(var_name + ".block")


-def split_variable(var_list, service_count, min_block_size=8192):
+def slice_variable(var_list, slice_count, min_block_size=8192):
     """
     We may need to split dense tensor to one or more blocks and put
     them equally onto parameter server. One block is a sub-tensor

@@ -78,25 +79,25 @@ def split_variable(var_list, service_count, min_block_size=8192):
     We need to have a minimal block size so that the calculations in
     the parameter server side can gain better performance. By default
-    minimum block size 8K elements (maybe 16bit or 32bit or 64bit).
+    minimum block size 8K elements (maybe 16bit or 32bit or 64bit).

     Args:
         var_list (list): List of variables.
-        service_count (int): Numel of pserver services. A pserver may have two
-            or more listening ports.
+        slice_count (int): Numel of count that variables will be sliced, which
+            could be the pserver services' count.
         min_block_size (int): Minimum splitted block size.
     Returns:
-        blocks (list[(varname, block_id, current_block_size)]): A list
+        blocks (list[(varname, block_id, current_block_size)]): A list
             of VarBlocks. Each VarBlock specifies a shard of the var.
     """
     blocks = []
     for var in var_list:
-        split_count = service_count
+        split_count = slice_count
         var_numel = reduce(lambda x, y: x * y, var.shape)
         max_pserver_count = int(math.floor(var_numel / float(min_block_size)))
         if max_pserver_count == 0:
             max_pserver_count = 1
-        if max_pserver_count < service_count:
+        if max_pserver_count < slice_count:
             split_count = max_pserver_count
         block_size = int(math.ceil(var_numel / float(split_count)))
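To make the block arithmetic concrete, here is a standalone sketch that mirrors only the arithmetic visible in this hunk; it is not the Paddle function itself and ignores any later adjustments the real slice_variable may apply. It reproduces the two 10976-element blocks that test_slice_var.py expects for a [28, 784] variable with the default 8192-element minimum.

import math

def sketch_slice_sizes(shape, slice_count, min_block_size=8192):
    # Total number of elements in the variable.
    numel = 1
    for dim in shape:
        numel *= dim
    # Cap the block count so each block holds at least min_block_size elements.
    max_count = max(1, int(math.floor(numel / float(min_block_size))))
    count = min(slice_count, max_count)
    block_size = int(math.ceil(numel / float(count)))
    # All blocks get block_size elements except a possibly smaller last block.
    sizes = [block_size] * (count - 1)
    sizes.append(numel - block_size * (count - 1))
    return sizes

# 28 * 784 = 21952 elements; at most floor(21952 / 8192) = 2 blocks fit,
# so asking for 10 slices still yields two blocks of 10976 elements each.
print(sketch_slice_sizes([28, 784], slice_count=10))   # [10976, 10976]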
@@ -177,7 +178,7 @@ class DistributeTranspiler:
             for index in range(len(self.pserver_endpoints))
         ]

-    def _init_splited_vars(self, split_method):
+    def _init_splited_vars(self, slice_var_up):
         # update these mappings for further transpile:
         # 1. param_var_mapping: param var name -> [splited params vars]
         # 2. grad_var_mapping: grad var name -> [splited grads vars]

@@ -196,9 +197,19 @@ class DistributeTranspiler:
         self._update_dist_lookup_table_vars(param_list, grad_list,
                                             self.params_grads)

-        grad_blocks = split_variable(grad_list, len(self.pserver_endpoints))
-        param_blocks = split_variable(param_list, len(self.pserver_endpoints))
+        if slice_var_up:
+            # when we slice var up into blocks, we will slice the var according to
+            # pserver services' count. A pserver may have two or more listening ports.
+            grad_blocks = slice_variable(grad_list, len(self.pserver_endpoints))
+            param_blocks = slice_variable(param_list, len(self.pserver_endpoints))
+        else:
+            # when we do NOT slice var up into blocks, we will always slice params
+            # grads into one block.
+            grad_blocks = slice_variable(grad_list, 1)
+            param_blocks = slice_variable(param_list, 1)
         assert (len(grad_blocks) == len(param_blocks))
+
         # origin_varname -> [splited_var]
         self.param_var_mapping = self._create_vars_from_blocklist(
             self.origin_program, param_blocks)
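As a hedged illustration of what the branch above changes (the names, sizes, and "name:block:size" notation below are made up for illustration, not the transpiler's actual data structures): with slice_var_up=True a 1000x1000 fc_w served by two pservers is cut into two shards, while slice_var_up=False leaves it whole, which is why the new simple-transpiler test sees fc_w@GRAD keep its full (1000, 1000) shape.

# Toy illustration of the two branches for a hypothetical 1000x1000 parameter.
def toy_blocks(name, numel, count):
    size = numel // count
    return ["%s:%d:%d" % (name, block_id, size) for block_id in range(count)]

print(toy_blocks("fc_w", 1000 * 1000, 2))  # slice_var_up=True, 2 pservers
# ['fc_w:0:500000', 'fc_w:1:500000']
print(toy_blocks("fc_w", 1000 * 1000, 1))  # slice_var_up=False
# ['fc_w:0:1000000']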
@@ -229,6 +240,7 @@ class DistributeTranspiler:
                   program=None,
                   pservers="127.0.0.1:6174",
                   trainers=1,
+                  slice_var_up=True,
                   split_method=RoundRobin,
                   sync_mode=True):
         """

@@ -262,13 +274,27 @@ class DistributeTranspiler:
         self.has_distributed_lookup_table = self._has_distributed_lookup_table()

         # split and create vars, then put splited vars in dicts for later use.
-        self._init_splited_vars(split_method)
+        self._init_splited_vars(slice_var_up)

         # step 3.1: insert send op to send gradient vars to parameter servers
         ps_dispatcher.reset()
         send_vars = []
-        for orig_varname, splited_vars in self.grad_var_mapping.items():
+
+        # in general cases, the number of pservers is times of 2, and this
+        # will lead to uneven distribution among weights and bias:
+        #       fc_w@GRAD_trainer_0, fc_w@GRAD_trainer_1 --> pserver1
+        #       fc_b@GRAD_trainer_0, fc_b@GRAD_trainer_1 --> pserver2
+        # shuffle the map will avoid the uneven distribution above
+        grad_var_mapping_items = self.grad_var_mapping.items()
+        if not slice_var_up:
+            np.random.shuffle(grad_var_mapping_items)
+
+        for orig_varname, splited_vars in grad_var_mapping_items:
             eplist = ps_dispatcher.dispatch(splited_vars)
+
+            if not slice_var_up:
+                assert (len(splited_vars) == 1)
+
             if len(splited_vars) == 1:
                 orig_varname = splited_vars[0].name
                 index = find_op_by_output_arg(program.global_block(),
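The shuffle exists because round-robin dispatch over an ordered mapping keeps pairing the same kind of variable with the same pserver. Below is a small self-contained sketch of that effect; the gradient names and endpoints are made up for illustration and are not taken from this commit.

import numpy as np

grads = ["fc1_w@GRAD", "fc1_b@GRAD", "fc2_w@GRAD", "fc2_b@GRAD"]
endpoints = ["127.0.0.1:6174", "127.0.0.1:6175"]

def round_robin_dispatch(items):
    # Assign the i-th item to the (i mod #endpoints)-th endpoint.
    return {name: endpoints[i % len(endpoints)] for i, name in enumerate(items)}

print(round_robin_dispatch(grads))
# every *_w gradient lands on :6174 and every *_b gradient on :6175

np.random.shuffle(grads)   # what the transpiler does when slice_var_up is False
print(round_robin_dispatch(grads))
# after shuffling, the weight/bias-to-pserver pairing is randomized
# rather than systematic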
@@ -316,6 +342,7 @@ class DistributeTranspiler:
         for i, ep in enumerate(eplist):
             self.param_grad_ep_mapping[ep]["params"].append(recv_vars[i])
             self.param_grad_ep_mapping[ep]["grads"].append(send_vars[i])
+
         # step4: Concat the parameters splits together after recv.
         for varname, splited_var in self.param_var_mapping.iteritems():
             eps = []

@@ -788,8 +815,8 @@ class DistributeTranspiler:
             program (ProgramDesc): ProgramDesc which gradients blong.
             block_list (list[(varname, block_id, block_size)]): List of gradient blocks.
             add_trainer_suffix (Bool): Add trainer suffix to new variable's name if set True.
-        Returns:
-            var_mapping (dict(varname->[new_varname_variable])):A dict mapping
+        Returns:
+            var_mapping (dict(varname->[new_varname_variable])):A dict mapping
                 from original var name to each var split.
         """
python/paddle/fluid/transpiler/distribute_transpiler_simple.py (deleted, 100644 → 0; previous revision d3e99aee)

The entire file below is removed by this commit:

# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ..framework import Program, default_main_program, Parameter, Variable
from ..layer_helper import LayerHelper


def hash_name_to_server(params_grads, pserver_endpoints):
    """
    :param param_grads:
    :return: a map of pserver endpoint ->
        params -> [param list]
        grads -> [grad list]
    """

    def _hash_param(param_name, total):
        return hash(param_name) % total

    param_grad_map = dict()
    for param, grad in params_grads:
        if param.trainable is True and grad is not None:
            server_id = _hash_param(param.name, len(pserver_endpoints))
            server_for_param = pserver_endpoints[server_id]
            if not param_grad_map.has_key(server_for_param):
                param_grad_map[server_for_param] = {"params": [], "grads": []}

            param_grad_map[server_for_param]["params"].append(param)
            param_grad_map[server_for_param]["grads"].append(grad)

    return param_grad_map


def round_robin(params_grads, pserver_endpoints):
    assert (len(params_grads) > len(pserver_endpoints))

    param_grad_map = dict()
    pserver_idx = 0
    for param, grad in params_grads:
        if param.trainable is True:
            server_for_param = pserver_endpoints[pserver_idx]
            if not param_grad_map.has_key(server_for_param):
                param_grad_map[server_for_param] = {"params": [], "grads": []}

            param_grad_map[server_for_param]["params"].append(param)
            param_grad_map[server_for_param]["grads"].append(grad)

            pserver_idx += 1
            if pserver_idx >= len(pserver_endpoints):
                pserver_idx = 0
    return param_grad_map


class SimpleDistributeTranspiler:
    def transpile(self,
                  optimize_ops,
                  params_grads,
                  program=None,
                  pservers="127.0.0.1:6174",
                  trainers=1,
                  split_method=round_robin):
        """
            Transpile the program to a distributed data-parallelism programs.

            The main_program will be transform to use a remote parameter server
            to do parameter optimization. And the optimization graph will be put
            in to a parameter server program.

            Use different methods to split trainable varialbles to different
            parameter servers.

            Example to run:

            exe = fluid.Executor(place)
            t = fluid.DistributeTranspiler()
            t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)

            pserver_endpoint = os.getenv("PSERVER")
            if pserver_endpoint:
                pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
                exe.run(fluid.default_startup_program())
                exe.run(pserver_prog)
            else:
                feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
                exe.run(fluid.default_startup_program())

                for pass_id in range(PASS_NUM):
                    ...

            :param optimize_ops: op list of optimization, should be the
                                 return value of Optimizer.minimize
            :type optimize_ops: list
            :param program: program to optimize, default default_main_program
            :param pservers: parameter server endpoints like "m1:6174,m2:6174"
            :type pservers: string

            :return: return a list of programs
        """
        if program is None:
            program = default_main_program()
        self.program = program
        self.trainers = trainers
        self.optimize_ops = optimize_ops
        self._optimize_distributed(
            optimize_ops,
            program,
            params_grads,
            pservers=pservers,
            trainers=trainers,
            split_method=split_method)

    def _clone_param(self, block, v):
        assert isinstance(v, Parameter)
        new_p = Parameter(
            block=block,
            shape=v.shape,
            dtype=v.dtype,
            type=v.type,
            lod_level=v.lod_level,
            stop_gradient=v.stop_gradient,
            trainable=v.trainable,
            optimize_attr=v.optimize_attr,
            regularizer=v.regularizer,
            name=v.name)
        block.vars[new_p.name] = new_p

    def _clone_var(self, block, var):
        assert isinstance(var, Variable)
        return block.create_var(
            name=var.name,
            shape=var.shape,
            dtype=var.dtype,
            type=var.type,
            lod_level=var.lod_level,
            persistable=var.persistable)

    def _optimize_distributed(self, optimize_ops, program, params_and_grads,
                              **kwargs):
        if kwargs.has_key("split_method"):
            split_method = kwargs["split_method"]
        else:
            split_method = round_robin

        assert (callable(split_method))
        pserver_endpoints = kwargs["pservers"].split(",")
        self.param_grad_map = split_method(params_and_grads, pserver_endpoints)

        send_op_ordered_inputs = []
        send_op_ordered_outputs = []
        epmap = []
        for ep, v in self.param_grad_map.iteritems():
            send_op_ordered_inputs.extend(v["grads"])
            send_op_ordered_outputs.extend(v["params"])
            for i in v["grads"]:
                epmap.append(ep)

        send_op = program.global_block().append_op(
            type="send",
            inputs={"X": send_op_ordered_inputs
                    },  # inputs is a list of tensors to be send
            outputs={"Out": send_op_ordered_outputs},
            attrs={"endpoints": pserver_endpoints,
                   "epmap": epmap})

    def get_trainer_program(self):
        # remove optimize ops and add a send op to main_program
        self.program.global_block().delete_ops(self.optimize_ops)
        return self.program

    def _create_var_for_trainers(self, block, var, trainers):
        var_list = []
        for i in xrange(trainers):
            var_each = block.create_var(
                name="%s.trainer_%d" % (var.name, i),
                psersistable=var.persistable,
                dtype=var.dtype,
                shape=var.shape)
            var_list.append(var_each)
        return var_list

    def get_pserver_program(self, endpoint, optimize_ops):
        pserver_program = Program()
        for v in self.param_grad_map[endpoint]["params"]:
            self._clone_param(pserver_program.global_block(), v)

        optimize_sub_program = Program()
        grad_var_names = [
            var.name for var in self.param_grad_map[endpoint]["grads"]
        ]
        for opt_op in optimize_ops:
            for _, var in opt_op.inputs.iteritems():
                # NOTE: append operators to merge gradients from multiple
                # trainers. If trainers == 1, this is not needed.
                if self.trainers > 1 and var.name in grad_var_names:
                    vars2merge = self._create_var_for_trainers(
                        optimize_sub_program.global_block(), var,
                        self.trainers)
                    merged_var = optimize_sub_program.global_block().create_var(
                        name=var.name,
                        persistable=var.persistable,
                        dtype=var.dtype,
                        shape=var.shape)
                    optimize_sub_program.global_block().append_op(
                        type="sum",
                        inputs={"X": vars2merge},
                        outputs={"Out": merged_var})
                    optimize_sub_program.global_block().append_op(
                        type="scale",
                        inputs={"X": merged_var},
                        outputs={"Out": merged_var},
                        attrs={"scale": 1.0 / float(self.trainers)})
                else:
                    optimize_sub_program.global_block().create_var(
                        name=var.name,
                        persistable=var.persistable,
                        dtype=var.dtype,
                        shape=var.shape)

            if opt_op.inputs.has_key("Grad"):
                if opt_op.inputs["Grad"].name in grad_var_names:
                    optimize_sub_program.global_block().append_op(
                        type=opt_op.type,
                        inputs=opt_op.inputs,
                        outputs=opt_op.outputs,
                        attrs=opt_op.attrs)
            else:
                optimize_sub_program.global_block().append_op(
                    type=opt_op.type,
                    inputs=opt_op.inputs,
                    outputs=opt_op.outputs,
                    attrs=opt_op.attrs)
        pserver_program.global_block().append_op(
            type="recv",
            inputs={"RX":
                    self.param_grad_map[endpoint]["grads"]},  # grads to recv
            outputs={},
            attrs={
                "OptimizeBlock": optimize_sub_program.global_block(),
                "endpoint": endpoint,
                "ParamList": [
                    p.name for p in self.param_grad_map[endpoint]["params"]
                ],
                "GradList": [
                    p.name for p in self.param_grad_map[endpoint]["grads"]
                ],
                "Trainers": self.trainers
            })
        pserver_program.sync_with_cpp()
        return pserver_program
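For reference, the placement policy that the deleted round_robin helper implemented can be sketched independently of Paddle; the parameter names and endpoints below are hypothetical and only illustrate the ordering behavior.

def round_robin_placement(param_names, endpoints):
    # Walk the parameters in order and hand each one to the next endpoint,
    # wrapping around, as the deleted helper did for (param, grad) pairs.
    placement = {ep: [] for ep in endpoints}
    idx = 0
    for name in param_names:
        placement[endpoints[idx]].append(name)
        idx = (idx + 1) % len(endpoints)
    return placement

print(round_robin_placement(
    ["fc_w", "fc_b", "emb", "proj_w"],
    ["127.0.0.1:6174", "127.0.0.1:6175"]))
# {'127.0.0.1:6174': ['fc_w', 'emb'], '127.0.0.1:6175': ['fc_b', 'proj_w']}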