提交 a2c017da 编写于 作者: M minqiyang

1. merge simple_dist_transpiler to distribute_transpiler

2. add align_var_to_block argument to func transpile
3. remove concat and spilt if align_var_to_block is False
4. unittests for simple_dist_transpiler
上级 580340ee
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers
from paddle.fluid.transpiler.distribute_transpiler import delete_ops
import numpy as np
class TestSimpleDistTranspiler(unittest.TestCase):
def setUp(self):
self.trainer_id = 0
self.trainers = 2
self.pservers = 2
self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
self.current_pserver_ep = "127.0.0.1:6175"
def net_conf(self):
x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
y_predict = fluid.layers.fc(input=x,
size=1000,
act=None,
param_attr=fluid.ParamAttr(name='fc_w'))
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(cost)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)
return optimize_ops, params_grads
def test_simple_transpiler(self):
np.random.seed(1)
trainer = self.get_trainer()
pserver, startup = self.get_pserver(self.current_pserver_ep)
self.assertEqual([op.type for op in trainer.global_block().ops],
self.get_expect_trainer_ops())
self.assertEqual(len(pserver.blocks), 2)
# block0: listen_and_serv
self.assertEqual([op.type for op in pserver.blocks[0].ops],
["listen_and_serv"])
# block1: optimize pass
self.assertEqual([op.type for op in pserver.blocks[1].ops],
["sum", "scale", "sgd"])
print("xxx", [op.output_arg_names for op in startup.global_block().ops])
# confirm startup program
self.assertEqual([op.type for op in startup.global_block().ops],
["fill_constant", "uniform_random", "uniform_random"])
# the variable #fc_w will NOT be splited
fc_w_var = startup.global_block().var("fc_w@GRAD")
self.assertEqual(fc_w_var.shape, (1000, 1000))
fc_w_var = startup.global_block().var("fc_w@GRAD.trainer_0")
self.assertEqual(fc_w_var.shape, (1000, 1000))
def get_main_program(self):
main = fluid.Program()
with fluid.program_guard(main):
self.net_conf()
return main
def get_expect_trainer_ops(self):
trainer = fluid.Program()
with fluid.program_guard(trainer):
optimize_ops, params_grads = self.net_conf()
delete_ops(trainer.global_block(), optimize_ops)
ops = [op.type for op in trainer.global_block().ops] + [
"send_vars", "send_barrier", "recv", "recv", "fetch_barrier"
]
ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars")
return ops
def get_trainer(self):
return self._transpiler_instance().get_trainer_program()
def get_pserver(self, ep):
t = self._transpiler_instance()
pserver = t.get_pserver_program(ep)
startup = t.get_startup_program(ep, pserver)
return pserver, startup
def _transpiler_instance(self):
main = self.get_main_program()
t = fluid.DistributeTranspiler()
t.transpile(
self.trainer_id,
program=main,
pservers=self.pserver_eps,
trainers=self.trainers,
align_var_to_block=False)
return t
if __name__ == "__main__":
unittest.main()
......@@ -15,6 +15,7 @@
from __future__ import print_function
import math
import numpy as np
from ps_dispatcher import RoundRobin, HashName, PSDispatcher
from .. import core, framework
......@@ -103,7 +104,7 @@ def split_dense_variable(var_list, service_count, min_block_size=8192):
We need to have a minimal block size so that the calculations in
the parameter server side can gain better performance. By default
minimum block size 8K elements (maybe 16bit or 32bit or 64bit).
minimum block size 8K elements (maybe 16bit or 32bit or 64bit).
Args:
var_list (list): List of variables.
......@@ -111,7 +112,7 @@ def split_dense_variable(var_list, service_count, min_block_size=8192):
or more listening ports.
min_block_size (int): Minimum splitted block size.
Returns:
blocks (list[(varname, block_id, current_block_size)]): A list
blocks (list[(varname, block_id, current_block_size)]): A list
of VarBlocks. Each VarBlock specifies a shard of the var.
"""
blocks = []
......@@ -171,6 +172,7 @@ class DistributeTranspiler:
program=None,
pservers="127.0.0.1:6174",
trainers=1,
align_var_to_block=True,
split_method=RoundRobin,
sync_mode=True):
"""
......@@ -183,7 +185,8 @@ class DistributeTranspiler:
parameter servers.
Steps to transpile trainer:
1. split variable to multiple blocks, aligned by product(dim[1:]) (width).
1. split variable to multiple blocks, aligned by product(dim[1:]) (width)
if align_var_to_block is True
2. rename splited grad variables to add trainer_id suffix ".trainer_%d".
3. modify trainer program add split_op to each grad variable.
4. append send_op to send splited variables to server and fetch
......@@ -293,9 +296,18 @@ class DistributeTranspiler:
for index in range(len(self.pserver_endpoints))
]
grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints))
param_blocks = split_dense_variable(param_list, len(pserver_endpoints))
if align_var_to_block:
grad_blocks = split_dense_variable(grad_list,
len(pserver_endpoints))
param_blocks = split_dense_variable(param_list,
len(pserver_endpoints))
else:
# when we do NOT align var to block, we will always split params
# grads into one block.
grad_blocks = split_dense_variable(grad_list, 1)
param_blocks = split_dense_variable(param_list, 1)
assert (len(grad_blocks) == len(param_blocks))
# step2: Create new vars for the parameters and gradients blocks and
# add ops to do the split.
param_var_mapping = self._create_vars_from_blocklist(program,
......@@ -325,8 +337,22 @@ class DistributeTranspiler:
# step 3.1: insert send op to send gradient vars to parameter servers
ps_dispatcher.reset()
send_vars = []
for orig_varname, splited_vars in grad_var_mapping.items():
# in general cases, the number of pservers is times of 2, and this
# will lead to uneven distribution among weights and bias:
# fc_w@GRAD_trainer_0, fc_w@GRAD_trainer_1 --> pserver1
# fc_b@GRAD_trainer_0, fc_b@GRAD_trainer_1 --> pserver2
# shuffle the map will avoid the uneven distribution above
grad_var_mapping_items = grad_var_mapping.items()
if not align_var_to_block:
np.random.shuffle(grad_var_mapping_items)
for orig_varname, splited_vars in grad_var_mapping_items:
eplist = ps_dispatcher.dispatch(splited_vars)
if not align_var_to_block:
assert (len(splited_vars) == 1)
if len(splited_vars) == 1:
orig_varname = splited_vars[0].name
index = find_op_by_output_arg(program.global_block(),
......@@ -374,7 +400,7 @@ class DistributeTranspiler:
for i, ep in enumerate(eplist):
self.param_grad_ep_mapping[ep]["params"].append(recv_vars[i])
self.param_grad_ep_mapping[ep]["grads"].append(send_vars[i])
# step4: Concat the parameters splits together after recv.
for varname, splited_var in param_var_mapping.iteritems():
eps = []
for var in splited_var:
......@@ -399,6 +425,7 @@ class DistributeTranspiler:
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
})
# step4: Concat the parameters splits together after recv.
for varname, splited_var in param_var_mapping.iteritems():
if len(splited_var) <= 1:
continue
......@@ -849,8 +876,8 @@ class DistributeTranspiler:
program (ProgramDesc): ProgramDesc which gradients blong.
block_list (list[(varname, block_id, block_size)]): List of gradient blocks.
add_trainer_suffix (Bool): Add trainer suffix to new variable's name if set True.
Returns:
var_mapping (dict(varname->[new_varname_variable])):A dict mapping
Returns:
var_mapping (dict(varname->[new_varname_variable])):A dict mapping
from original var name to each var split.
"""
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..framework import Program, default_main_program, Parameter, Variable
from ..layer_helper import LayerHelper
def hash_name_to_server(params_grads, pserver_endpoints):
"""
:param param_grads:
:return: a map of pserver endpoint ->
params -> [param list]
grads -> [grad list]
"""
def _hash_param(param_name, total):
return hash(param_name) % total
param_grad_map = dict()
for param, grad in params_grads:
if param.trainable is True and grad is not None:
server_id = _hash_param(param.name, len(pserver_endpoints))
server_for_param = pserver_endpoints[server_id]
if not param_grad_map.has_key(server_for_param):
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
return param_grad_map
def round_robin(params_grads, pserver_endpoints):
assert (len(params_grads) > len(pserver_endpoints))
param_grad_map = dict()
pserver_idx = 0
for param, grad in params_grads:
if param.trainable is True:
server_for_param = pserver_endpoints[pserver_idx]
if not param_grad_map.has_key(server_for_param):
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
pserver_idx += 1
if pserver_idx >= len(pserver_endpoints):
pserver_idx = 0
return param_grad_map
class SimpleDistributeTranspiler:
def transpile(self,
optimize_ops,
params_grads,
program=None,
pservers="127.0.0.1:6174",
trainers=1,
split_method=round_robin):
"""
Transpile the program to a distributed data-parallelism programs.
The main_program will be transform to use a remote parameter server
to do parameter optimization. And the optimization graph will be put
in to a parameter server program.
Use different methods to split trainable varialbles to different
parameter servers.
Example to run:
exe = fluid.Executor(place)
t = fluid.DistributeTranspiler()
t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)
pserver_endpoint = os.getenv("PSERVER")
if pserver_endpoint:
pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
exe.run(fluid.default_startup_program())
exe.run(pserver_prog)
else:
feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
exe.run(fluid.default_startup_program())
for pass_id in range(PASS_NUM):
...
:param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize
:type optimize_ops: list
:param program: program to optimize, default default_main_program
:param pservers: parameter server endpoints like "m1:6174,m2:6174"
:type pservers: string
:return: return a list of programs
"""
if program is None:
program = default_main_program()
self.program = program
self.trainers = trainers
self.optimize_ops = optimize_ops
self._optimize_distributed(
optimize_ops,
program,
params_grads,
pservers=pservers,
trainers=trainers,
split_method=split_method)
def _clone_param(self, block, v):
assert isinstance(v, Parameter)
new_p = Parameter(
block=block,
shape=v.shape,
dtype=v.dtype,
type=v.type,
lod_level=v.lod_level,
stop_gradient=v.stop_gradient,
trainable=v.trainable,
optimize_attr=v.optimize_attr,
regularizer=v.regularizer,
name=v.name)
block.vars[new_p.name] = new_p
def _clone_var(self, block, var):
assert isinstance(var, Variable)
return block.create_var(
name=var.name,
shape=var.shape,
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
persistable=var.persistable)
def _optimize_distributed(self, optimize_ops, program, params_and_grads,
**kwargs):
if kwargs.has_key("split_method"):
split_method = kwargs["split_method"]
else:
split_method = round_robin
assert (callable(split_method))
pserver_endpoints = kwargs["pservers"].split(",")
self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
send_op_ordered_inputs = []
send_op_ordered_outputs = []
epmap = []
for ep, v in self.param_grad_map.iteritems():
send_op_ordered_inputs.extend(v["grads"])
send_op_ordered_outputs.extend(v["params"])
for i in v["grads"]:
epmap.append(ep)
send_op = program.global_block().append_op(
type="send",
inputs={"X": send_op_ordered_inputs
}, # inputs is a list of tensors to be send
outputs={"Out": send_op_ordered_outputs},
attrs={"endpoints": pserver_endpoints,
"epmap": epmap})
def get_trainer_program(self):
# remove optimize ops and add a send op to main_program
self.program.global_block().delete_ops(self.optimize_ops)
return self.program
def _create_var_for_trainers(self, block, var, trainers):
var_list = []
for i in xrange(trainers):
var_each = block.create_var(
name="%s.trainer_%d" % (var.name, i),
psersistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
var_list.append(var_each)
return var_list
def get_pserver_program(self, endpoint, optimize_ops):
pserver_program = Program()
for v in self.param_grad_map[endpoint]["params"]:
self._clone_param(pserver_program.global_block(), v)
optimize_sub_program = Program()
grad_var_names = [
var.name for var in self.param_grad_map[endpoint]["grads"]
]
for opt_op in optimize_ops:
for _, var in opt_op.inputs.iteritems():
# NOTE: append operators to merge gradients from multiple
# trainers. If trainers == 1, this is not needed.
if self.trainers > 1 and var.name in grad_var_names:
vars2merge = self._create_var_for_trainers(
optimize_sub_program.global_block(), var, self.trainers)
merged_var = optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
optimize_sub_program.global_block().append_op(
type="sum",
inputs={"X": vars2merge},
outputs={"Out": merged_var})
optimize_sub_program.global_block().append_op(
type="scale",
inputs={"X": merged_var},
outputs={"Out": merged_var},
attrs={"scale": 1.0 / float(self.trainers)})
else:
optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
if opt_op.inputs.has_key("Grad"):
if opt_op.inputs["Grad"].name in grad_var_names:
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
else:
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
pserver_program.global_block().append_op(
type="recv",
inputs={"RX":
self.param_grad_map[endpoint]["grads"]}, # grads to recv
outputs={},
attrs={
"OptimizeBlock": optimize_sub_program.global_block(),
"endpoint": endpoint,
"ParamList":
[p.name for p in self.param_grad_map[endpoint]["params"]],
"GradList":
[p.name for p in self.param_grad_map[endpoint]["grads"]],
"Trainers": self.trainers
})
pserver_program.sync_with_cpp()
return pserver_program
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册