Commit 44c64a64
Authored on Jun 04, 2018 by weixing02

    Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into doc_fix

Parents: f1facb7a, bd0f8f8d

Showing 8 changed files with 207 additions and 333 deletions (+207, -333)
Changed files:

- python/paddle/fluid/__init__.py (+2, -2)
- python/paddle/fluid/tests/unittests/test_dist_transpiler.py (+3, -54)
- python/paddle/fluid/tests/unittests/test_simple_dist_transpiler.py (+80, -0)
- python/paddle/fluid/tests/unittests/test_slice_var.py (+6, -6)
- python/paddle/fluid/tests/unittests/transpiler_test.py (+73, -0)
- python/paddle/fluid/transpiler/__init__.py (+2, -3)
- python/paddle/fluid/transpiler/distribute_transpiler.py (+41, -14)
- python/paddle/fluid/transpiler/distribute_transpiler_simple.py (+0, -254)
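Taken together, these changes rename `split_variable` to `slice_variable`, add a `slice_var_up` flag to `DistributeTranspiler.transpile()`, and remove `SimpleDistributeTranspiler`. The snippet below is a minimal sketch of how the new flag is driven, mirroring the test code later on this page; the network, endpoints, and trainer counts are the illustrative values used by the tests, not a prescribed setup.

```python
import paddle.fluid as fluid

# Build a small program to transpile (mirrors net_conf() in transpiler_test.py below).
main = fluid.Program()
with fluid.program_guard(main):
    x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
    y_predict = fluid.layers.fc(input=x,
                                size=1000,
                                act=None,
                                param_attr=fluid.ParamAttr(name='fc_w'))
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    cost = fluid.layers.square_error_cost(input=y_predict, label=y)
    avg_cost = fluid.layers.mean(cost)
    fluid.optimizer.SGD(learning_rate=0.1).minimize(avg_cost)

# slice_var_up (new in this commit) chooses between slicing large
# parameters/gradients across pserver endpoints (True, the default) and
# keeping every variable in a single block (False), as exercised by
# test_simple_dist_transpiler.py below.
t = fluid.DistributeTranspiler()
t.transpile(
    0,  # trainer id
    program=main,
    pservers="127.0.0.1:6174,127.0.0.1:6175",
    trainers=2,
    slice_var_up=False)
trainer_prog = t.get_trainer_program()
pserver_prog = t.get_pserver_program("127.0.0.1:6174")
startup_prog = t.get_startup_program("127.0.0.1:6174", pserver_prog)
```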
python/paddle/fluid/__init__.py

```diff
@@ -44,8 +44,8 @@ import transpiler
 from param_attr import ParamAttr, WeightNormParamAttr
 from data_feeder import DataFeeder
 from core import LoDTensor, CPUPlace, CUDAPlace, CUDAPinnedPlace
-from transpiler import DistributeTranspiler, SimpleDistributeTranspiler, \
-    InferenceTranspiler, memory_optimize, release_memory
+from transpiler import DistributeTranspiler, InferenceTranspiler, \
+    memory_optimize, release_memory
 from concurrency import (Go, make_channel, channel_send, channel_recv,
                          channel_close, Select)
 from lod_tensor import create_lod_tensor, create_random_int_lodtensor
```
python/paddle/fluid/tests/unittests/test_dist_transpiler.py

```diff
@@ -12,40 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import unittest
 import paddle.fluid as fluid
 import paddle.fluid.core as core
 import paddle.fluid.layers as layers
 from paddle.fluid.transpiler.distribute_transpiler import delete_ops
 import numpy
 
+from transpiler_test import TranspilerTest
+
 
-class TestDistTranspiler(unittest.TestCase):
+class TestDistTranspiler(TranspilerTest):
     def setUp(self):
-        self.trainer_id = 0
-        self.trainers = 2
-        self.pservers = 2
-        self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
         self.current_pserver_ep = "127.0.0.1:6174"
 
-    def net_conf(self):
-        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
-        y_predict = fluid.layers.fc(input=x,
-                                    size=1000,
-                                    act=None,
-                                    param_attr=fluid.ParamAttr(name='fc_w'))
-        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
-        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
-        avg_cost = fluid.layers.mean(cost)
-        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
-        optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)
-        return optimize_ops, params_grads
-
     def test_transpiler(self):
         trainer = self.get_trainer()
         pserver, startup = self.get_pserver(self.current_pserver_ep)
@@ -70,14 +46,6 @@ class TestDistTranspiler(unittest.TestCase):
         fc_w_var = startup.global_block().var("fc_w.block1")
         self.assertEqual(fc_w_var.shape, (500, 1000))
 
-    def get_main_program(self):
-        main = fluid.Program()
-        with fluid.program_guard(main):
-            self.net_conf()
-        return main
-
     def get_expect_trainer_ops(self):
         trainer = fluid.Program()
@@ -92,25 +60,6 @@ class TestDistTranspiler(unittest.TestCase):
         ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars")
         return ops
 
-    def get_trainer(self):
-        return self._transpiler_instance().get_trainer_program()
-
-    def get_pserver(self, ep):
-        t = self._transpiler_instance()
-        pserver = t.get_pserver_program(ep)
-        startup = t.get_startup_program(ep, pserver)
-        return pserver, startup
-
-    def _transpiler_instance(self):
-        main = self.get_main_program()
-        t = fluid.DistributeTranspiler()
-        t.transpile(
-            self.trainer_id,
-            program=main,
-            pservers=self.pserver_eps,
-            trainers=self.trainers)
-        return t
-
 
 if __name__ == "__main__":
     unittest.main()
```
python/paddle/fluid/tests/unittests/test_simple_dist_transpiler.py (new file, mode 100644)

```python
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest  # needed for the unittest.main() call at the bottom of this file

import numpy as np

import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import delete_ops

from transpiler_test import TranspilerTest


class TestSimpleDistTranspiler(TranspilerTest):
    def setUp(self):
        self.current_pserver_ep = "127.0.0.1:6175"

    def test_simple_transpiler(self):
        np.random.seed(1)

        trainer = self.get_trainer()
        pserver, startup = self.get_pserver(self.current_pserver_ep)
        self.assertEqual([op.type for op in trainer.global_block().ops],
                         self.get_expect_trainer_ops())

        self.assertEqual(len(pserver.blocks), 2)
        # block0: listen_and_serv
        self.assertEqual([op.type for op in pserver.blocks[0].ops],
                         ["listen_and_serv"])
        # block1: optimize pass
        self.assertEqual([op.type for op in pserver.blocks[1].ops],
                         ["sum", "scale", "sgd"])

        # confirm startup program
        self.assertEqual([op.type for op in startup.global_block().ops],
                         ["fill_constant", "uniform_random", "uniform_random"])

        # the variable #fc_w will NOT be splited
        fc_w_var = startup.global_block().var("fc_w@GRAD")
        self.assertEqual(fc_w_var.shape, (1000, 1000))

        fc_w_var = startup.global_block().var("fc_w@GRAD.trainer_0")
        self.assertEqual(fc_w_var.shape, (1000, 1000))

    def get_expect_trainer_ops(self):
        trainer = fluid.Program()

        with fluid.program_guard(trainer):
            optimize_ops, params_grads = self.net_conf()

        delete_ops(trainer.global_block(), optimize_ops)
        ops = [op.type for op in trainer.global_block().ops] + [
            "send_vars", "send_barrier", "recv", "recv", "fetch_barrier"
        ]
        ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars")
        return ops

    def _transpiler_instance(self):
        main = self.get_main_program()
        t = fluid.DistributeTranspiler()
        t.transpile(
            self.trainer_id,
            program=main,
            pservers=self.pserver_eps,
            trainers=self.trainers,
            slice_var_up=False)
        return t


if __name__ == "__main__":
    unittest.main()
```
python/paddle/fluid/tests/unittests/test_split_var.py → python/paddle/fluid/tests/unittests/test_slice_var.py

```diff
@@ -14,14 +14,14 @@
 
 import math
 import unittest
-from paddle.fluid.transpiler.distribute_transpiler import split_variable
+from paddle.fluid.transpiler.distribute_transpiler import slice_variable
 import paddle.fluid as fluid
 import paddle.fluid.core as core
 import random
 
 
-class TestSplitVar(unittest.TestCase):
-    def check_split_output(self, shapes, expected_sizes, min_size):
+class TestSliceVar(unittest.TestCase):
+    def check_slice_output(self, shapes, expected_sizes, min_size):
         var_list = []
         program = fluid.Program()
         for shape in shapes:
@@ -31,7 +31,7 @@ class TestSplitVar(unittest.TestCase):
                 # dtype=core.VarDesc.VarType.LOD_TENSOR,
                 shape=shape)
             var_list.append(var)
-        blocks = split_variable(var_list, 10, min_size)
+        blocks = slice_variable(var_list, 10, min_size)
         all_sizes = []
         for s in expected_sizes:
             for s2 in s:
@@ -49,7 +49,7 @@ class TestSplitVar(unittest.TestCase):
             [1150, 1150, 1150, 1150, 1150, 1150, 1100]
         ]
 
-        self.check_split_output(shapes, expected_sizes, 1024)
+        self.check_slice_output(shapes, expected_sizes, 1024)
 
     def test_check_output_8k(self):
         shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10],
@@ -57,7 +57,7 @@ class TestSplitVar(unittest.TestCase):
         expected_sizes = [[15], [1024], [10976, 10976], [8160], [8000],
                           [35937, 35937, 35937, 35937, 35937, 35937]]
 
-        self.check_split_output(shapes, expected_sizes, 8192)
+        self.check_slice_output(shapes, expected_sizes, 8192)
 
 
 if __name__ == '__main__':
```
python/paddle/fluid/tests/unittests/transpiler_test.py (new file, mode 100644)

```python
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import numpy as np

import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers


class TranspilerTest(unittest.TestCase):
    @classmethod
    def setUpClass(self):
        self.trainer_id = 0
        self.trainers = 2
        self.pservers = 2
        self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"

    def net_conf(self):
        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')

        y_predict = fluid.layers.fc(input=x,
                                    size=1000,
                                    act=None,
                                    param_attr=fluid.ParamAttr(name='fc_w'))

        y = fluid.layers.data(name='y', shape=[1], dtype='float32')

        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_cost = fluid.layers.mean(cost)
        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)

        optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)
        return optimize_ops, params_grads

    def get_main_program(self):
        main = fluid.Program()

        with fluid.program_guard(main):
            self.net_conf()

        return main

    def get_trainer(self):
        return self._transpiler_instance().get_trainer_program()

    def get_pserver(self, ep):
        t = self._transpiler_instance()
        pserver = t.get_pserver_program(ep)
        startup = t.get_startup_program(ep, pserver)
        return pserver, startup

    def _transpiler_instance(self):
        main = self.get_main_program()
        t = fluid.DistributeTranspiler()
        t.transpile(
            self.trainer_id,
            program=main,
            pservers=self.pserver_eps,
            trainers=self.trainers)
        return t
```
python/paddle/fluid/transpiler/__init__.py

```diff
@@ -15,10 +15,9 @@
 from distribute_transpiler import DistributeTranspiler
 from inference_transpiler import InferenceTranspiler
 from memory_optimization_transpiler import memory_optimize, release_memory
-from distribute_transpiler_simple import SimpleDistributeTranspiler
 from ps_dispatcher import HashName, RoundRobin
 
 __all__ = [
-    "DistributeTranspiler", "InferenceTranspiler", "SimpleDistributeTranspiler",
-    "memory_optimize", "release_memory", "HashName", "RoundRobin"
+    "DistributeTranspiler", "InferenceTranspiler", "memory_optimize",
+    "release_memory", "HashName", "RoundRobin"
 ]
```
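With SimpleDistributeTranspiler gone from the public surface, the dispatcher classes still exported here (HashName, RoundRobin) are selected through the transpile() split_method argument shown in the next file. A small sketch of picking HashName instead of the default RoundRobin; the tiny network and endpoint strings are only illustrative:

```python
import paddle.fluid as fluid
from paddle.fluid.transpiler import HashName

# A trivial program to distribute (values illustrative).
main = fluid.Program()
with fluid.program_guard(main):
    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    cost = fluid.layers.square_error_cost(
        input=fluid.layers.fc(input=x, size=1), label=y)
    fluid.optimizer.SGD(learning_rate=0.01).minimize(fluid.layers.mean(cost))

t = fluid.DistributeTranspiler()
t.transpile(
    0,
    program=main,
    pservers="127.0.0.1:6174,127.0.0.1:6175",
    trainers=2,
    split_method=HashName)  # dispatcher class; RoundRobin is the default
```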
python/paddle/fluid/transpiler/distribute_transpiler.py

```diff
@@ -39,6 +39,7 @@ Steps to transpile pserver:
 from __future__ import print_function
 
 import math
+import numpy as np
 
 from ps_dispatcher import RoundRobin, HashName, PSDispatcher
 from .. import core, framework
@@ -70,7 +71,7 @@ def same_or_split_var(p_name, var_name):
     return p_name == var_name or p_name.startswith(var_name + ".block")
 
 
-def split_variable(var_list, service_count, min_block_size=8192):
+def slice_variable(var_list, slice_count, min_block_size=8192):
     """
         We may need to split dense tensor to one or more blocks and put
         them equally onto parameter server. One block is a sub-tensor
@@ -82,8 +83,8 @@ def split_variable(var_list, service_count, min_block_size=8192):
     Args:
         var_list (list): List of variables.
-        service_count (int): Numel of pserver services. A pserver may have two
-            or more listening ports.
+        slice_count (int): Numel of count that variables will be sliced, which
+            could be the pserver services' count.
         min_block_size (int): Minimum splitted block size.
     Returns:
         blocks (list[(varname, block_id, current_block_size)]): A list
@@ -91,12 +92,12 @@ def split_variable(var_list, service_count, min_block_size=8192):
     """
     blocks = []
     for var in var_list:
-        split_count = service_count
+        split_count = slice_count
         var_numel = reduce(lambda x, y: x * y, var.shape)
         max_pserver_count = int(math.floor(var_numel / float(min_block_size)))
         if max_pserver_count == 0:
             max_pserver_count = 1
-        if max_pserver_count < service_count:
+        if max_pserver_count < slice_count:
             split_count = max_pserver_count
         block_size = int(math.ceil(var_numel / float(split_count)))
@@ -177,7 +178,7 @@ class DistributeTranspiler:
             for index in range(len(self.pserver_endpoints))
         ]
 
-    def _init_splited_vars(self, split_method):
+    def _init_splited_vars(self, slice_var_up):
         # update these mappings for further transpile:
         # 1. param_var_mapping: param var name -> [splited params vars]
         # 2. grad_var_mapping: grad var name -> [splited grads vars]
@@ -196,9 +197,19 @@ class DistributeTranspiler:
         self._update_dist_lookup_table_vars(param_list, grad_list,
                                             self.params_grads)
 
-        grad_blocks = split_variable(grad_list, len(self.pserver_endpoints))
-        param_blocks = split_variable(param_list, len(self.pserver_endpoints))
+        if slice_var_up:
+            # when we slice var up into blocks, we will slice the var according to
+            # pserver services' count. A pserver may have two or more listening ports.
+            grad_blocks = slice_variable(grad_list, len(self.pserver_endpoints))
+            param_blocks = slice_variable(param_list, len(self.pserver_endpoints))
+        else:
+            # when we do NOT slice var up into blocks, we will always slice params
+            # grads into one block.
+            grad_blocks = slice_variable(grad_list, 1)
+            param_blocks = slice_variable(param_list, 1)
+
         assert (len(grad_blocks) == len(param_blocks))
+
         # origin_varname -> [splited_var]
         self.param_var_mapping = self._create_vars_from_blocklist(
             self.origin_program, param_blocks)
@@ -229,6 +240,7 @@ class DistributeTranspiler:
                   program=None,
                   pservers="127.0.0.1:6174",
                   trainers=1,
+                  slice_var_up=True,
                   split_method=RoundRobin,
                   sync_mode=True):
         """
@@ -262,13 +274,27 @@ class DistributeTranspiler:
         self.has_distributed_lookup_table = self._has_distributed_lookup_table()
 
         # split and create vars, then put splited vars in dicts for later use.
-        self._init_splited_vars(split_method)
+        self._init_splited_vars(slice_var_up)
 
         # step 3.1: insert send op to send gradient vars to parameter servers
         ps_dispatcher.reset()
         send_vars = []
-        for orig_varname, splited_vars in self.grad_var_mapping.items():
+
+        # in general cases, the number of pservers is times of 2, and this
+        # will lead to uneven distribution among weights and bias:
+        #       fc_w@GRAD_trainer_0, fc_w@GRAD_trainer_1 --> pserver1
+        #       fc_b@GRAD_trainer_0, fc_b@GRAD_trainer_1 --> pserver2
+        # shuffle the map will avoid the uneven distribution above
+        grad_var_mapping_items = self.grad_var_mapping.items()
+        if not slice_var_up:
+            np.random.shuffle(grad_var_mapping_items)
+
+        for orig_varname, splited_vars in grad_var_mapping_items:
             eplist = ps_dispatcher.dispatch(splited_vars)
+
+            if not slice_var_up:
+                assert (len(splited_vars) == 1)
+
             if len(splited_vars) == 1:
                 orig_varname = splited_vars[0].name
                 index = find_op_by_output_arg(program.global_block(),
@@ -316,6 +342,7 @@ class DistributeTranspiler:
         for i, ep in enumerate(eplist):
             self.param_grad_ep_mapping[ep]["params"].append(recv_vars[i])
             self.param_grad_ep_mapping[ep]["grads"].append(send_vars[i])
+
         # step4: Concat the parameters splits together after recv.
         for varname, splited_var in self.param_var_mapping.iteritems():
             eps = []
```
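The renamed slice_variable() decides how many blocks each variable ends up in. Below is a standalone sketch of only the block-count/block-size arithmetic visible in the hunk above (the full function contains details not shown in this diff); the shapes and counts are illustrative and the helper name is made up for the sketch.

```python
import math
from functools import reduce  # the Paddle code above relies on Python 2's builtin reduce


def sketch_slice_counts(shapes, slice_count, min_block_size=8192):
    """Mirror the arithmetic of slice_variable() for a list of (name, shape) pairs."""
    blocks = []
    for name, shape in shapes:
        var_numel = reduce(lambda x, y: x * y, shape)
        # at most one block per min_block_size elements, but never zero blocks
        max_pserver_count = max(1, int(math.floor(var_numel / float(min_block_size))))
        split_count = min(slice_count, max_pserver_count)
        block_size = int(math.ceil(var_numel / float(split_count)))
        blocks.append((name, split_count, block_size))
    return blocks


# With two pserver endpoints (slice_var_up=True) a 1000x1000 fc_w is cut into
# two blocks; with slice_count=1 (slice_var_up=False) it stays whole, matching
# the (1000, 1000) shape asserted in test_simple_dist_transpiler.py above.
print(sketch_slice_counts([("fc_w", (1000, 1000))], slice_count=2))  # [('fc_w', 2, 500000)]
print(sketch_slice_counts([("fc_w", (1000, 1000))], slice_count=1))  # [('fc_w', 1, 1000000)]
```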
python/paddle/fluid/transpiler/distribute_transpiler_simple.py (deleted file, mode 100644)

```python
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ..framework import Program, default_main_program, Parameter, Variable
from ..layer_helper import LayerHelper


def hash_name_to_server(params_grads, pserver_endpoints):
    """
    :param param_grads:
    :return: a map of pserver endpoint ->
        params -> [param list]
        grads  -> [grad list]
    """

    def _hash_param(param_name, total):
        return hash(param_name) % total

    param_grad_map = dict()
    for param, grad in params_grads:
        if param.trainable is True and grad is not None:
            server_id = _hash_param(param.name, len(pserver_endpoints))
            server_for_param = pserver_endpoints[server_id]
            if not param_grad_map.has_key(server_for_param):
                param_grad_map[server_for_param] = {"params": [], "grads": []}
            param_grad_map[server_for_param]["params"].append(param)
            param_grad_map[server_for_param]["grads"].append(grad)

    return param_grad_map


def round_robin(params_grads, pserver_endpoints):
    assert (len(params_grads) > len(pserver_endpoints))

    param_grad_map = dict()
    pserver_idx = 0
    for param, grad in params_grads:
        if param.trainable is True:
            server_for_param = pserver_endpoints[pserver_idx]
            if not param_grad_map.has_key(server_for_param):
                param_grad_map[server_for_param] = {"params": [], "grads": []}

            param_grad_map[server_for_param]["params"].append(param)
            param_grad_map[server_for_param]["grads"].append(grad)

            pserver_idx += 1
            if pserver_idx >= len(pserver_endpoints):
                pserver_idx = 0
    return param_grad_map


class SimpleDistributeTranspiler:
    def transpile(self,
                  optimize_ops,
                  params_grads,
                  program=None,
                  pservers="127.0.0.1:6174",
                  trainers=1,
                  split_method=round_robin):
        """
            Transpile the program to a distributed data-parallelism programs.

            The main_program will be transform to use a remote parameter server
            to do parameter optimization. And the optimization graph will be put
            in to a parameter server program.

            Use different methods to split trainable varialbles to different
            parameter servers.

            Example to run:

            exe = fluid.Executor(place)
            t = fluid.DistributeTranspiler()
            t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)

            pserver_endpoint = os.getenv("PSERVER")
            if pserver_endpoint:
                pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
                exe.run(fluid.default_startup_program())
                exe.run(pserver_prog)
            else:
                feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
                exe.run(fluid.default_startup_program())

                for pass_id in range(PASS_NUM):
                    ...

            :param optimize_ops: op list of optimization, should be the
                                 return value of Optimizer.minimize
            :type optimize_ops: list
            :param program: program to optimize, default default_main_program
            :param pservers: parameter server endpoints like "m1:6174,m2:6174"
            :type pservers: string

            :return: return a list of programs
        """
        if program is None:
            program = default_main_program()
        self.program = program
        self.trainers = trainers
        self.optimize_ops = optimize_ops
        self._optimize_distributed(
            optimize_ops,
            program,
            params_grads,
            pservers=pservers,
            trainers=trainers,
            split_method=split_method)

    def _clone_param(self, block, v):
        assert isinstance(v, Parameter)
        new_p = Parameter(
            block=block,
            shape=v.shape,
            dtype=v.dtype,
            type=v.type,
            lod_level=v.lod_level,
            stop_gradient=v.stop_gradient,
            trainable=v.trainable,
            optimize_attr=v.optimize_attr,
            regularizer=v.regularizer,
            name=v.name)
        block.vars[new_p.name] = new_p

    def _clone_var(self, block, var):
        assert isinstance(var, Variable)
        return block.create_var(
            name=var.name,
            shape=var.shape,
            dtype=var.dtype,
            type=var.type,
            lod_level=var.lod_level,
            persistable=var.persistable)

    def _optimize_distributed(self, optimize_ops, program, params_and_grads,
                              **kwargs):
        if kwargs.has_key("split_method"):
            split_method = kwargs["split_method"]
        else:
            split_method = round_robin

        assert (callable(split_method))
        pserver_endpoints = kwargs["pservers"].split(",")
        self.param_grad_map = split_method(params_and_grads, pserver_endpoints)

        send_op_ordered_inputs = []
        send_op_ordered_outputs = []
        epmap = []
        for ep, v in self.param_grad_map.iteritems():
            send_op_ordered_inputs.extend(v["grads"])
            send_op_ordered_outputs.extend(v["params"])
            for i in v["grads"]:
                epmap.append(ep)

        send_op = program.global_block().append_op(
            type="send",
            inputs={"X": send_op_ordered_inputs
                    },  # inputs is a list of tensors to be send
            outputs={"Out": send_op_ordered_outputs},
            attrs={"endpoints": pserver_endpoints,
                   "epmap": epmap})

    def get_trainer_program(self):
        # remove optimize ops and add a send op to main_program
        self.program.global_block().delete_ops(self.optimize_ops)
        return self.program

    def _create_var_for_trainers(self, block, var, trainers):
        var_list = []
        for i in xrange(trainers):
            var_each = block.create_var(
                name="%s.trainer_%d" % (var.name, i),
                psersistable=var.persistable,
                dtype=var.dtype,
                shape=var.shape)
            var_list.append(var_each)
        return var_list

    def get_pserver_program(self, endpoint, optimize_ops):
        pserver_program = Program()
        for v in self.param_grad_map[endpoint]["params"]:
            self._clone_param(pserver_program.global_block(), v)

        optimize_sub_program = Program()
        grad_var_names = [
            var.name for var in self.param_grad_map[endpoint]["grads"]
        ]
        for opt_op in optimize_ops:
            for _, var in opt_op.inputs.iteritems():
                # NOTE: append operators to merge gradients from multiple
                # trainers. If trainers == 1, this is not needed.
                if self.trainers > 1 and var.name in grad_var_names:
                    vars2merge = self._create_var_for_trainers(
                        optimize_sub_program.global_block(), var,
                        self.trainers)
                    merged_var = optimize_sub_program.global_block().create_var(
                        name=var.name,
                        persistable=var.persistable,
                        dtype=var.dtype,
                        shape=var.shape)
                    optimize_sub_program.global_block().append_op(
                        type="sum",
                        inputs={"X": vars2merge},
                        outputs={"Out": merged_var})
                    optimize_sub_program.global_block().append_op(
                        type="scale",
                        inputs={"X": merged_var},
                        outputs={"Out": merged_var},
                        attrs={"scale": 1.0 / float(self.trainers)})
                else:
                    optimize_sub_program.global_block().create_var(
                        name=var.name,
                        persistable=var.persistable,
                        dtype=var.dtype,
                        shape=var.shape)

            if opt_op.inputs.has_key("Grad"):
                if opt_op.inputs["Grad"].name in grad_var_names:
                    optimize_sub_program.global_block().append_op(
                        type=opt_op.type,
                        inputs=opt_op.inputs,
                        outputs=opt_op.outputs,
                        attrs=opt_op.attrs)
            else:
                optimize_sub_program.global_block().append_op(
                    type=opt_op.type,
                    inputs=opt_op.inputs,
                    outputs=opt_op.outputs,
                    attrs=opt_op.attrs)
        pserver_program.global_block().append_op(
            type="recv",
            inputs={"RX": self.param_grad_map[endpoint]["grads"]
                    },  # grads to recv
            outputs={},
            attrs={
                "OptimizeBlock": optimize_sub_program.global_block(),
                "endpoint": endpoint,
                "ParamList": [
                    p.name for p in self.param_grad_map[endpoint]["params"]
                ],
                "GradList": [
                    p.name for p in self.param_grad_map[endpoint]["grads"]
                ],
                "Trainers": self.trainers
            })
        pserver_program.sync_with_cpp()
        return pserver_program
```
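For reference, the two placement policies this deleted module implemented live on as the HashName and RoundRobin dispatchers in ps_dispatcher. The standalone illustration below shows only the two assignment rules, with plain strings standing in for parameters; the helper names are invented for the sketch and are not the removed code.

```python
def hash_name_placement(names, endpoints):
    # like hash_name_to_server(): a name always lands on hash(name) % len(endpoints)
    placement = {ep: [] for ep in endpoints}
    for name in names:
        placement[endpoints[hash(name) % len(endpoints)]].append(name)
    return placement


def round_robin_placement(names, endpoints):
    # like round_robin(): names are dealt out to endpoints in order, wrapping around
    placement = {ep: [] for ep in endpoints}
    for i, name in enumerate(names):
        placement[endpoints[i % len(endpoints)]].append(name)
    return placement


endpoints = ["127.0.0.1:6174", "127.0.0.1:6175"]
print(hash_name_placement(["fc_w", "fc_b"], endpoints))
print(round_robin_placement(["fc_w", "fc_b"], endpoints))
```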