提交 a7522361 编写于 作者: L Luo Tao

Merge branch 'develop' into demo

......@@ -46,7 +46,7 @@ from param_attr import ParamAttr, WeightNormParamAttr
from data_feeder import DataFeeder
from core import LoDTensor, LoDTensorArray, CPUPlace, CUDAPlace, CUDAPinnedPlace, Scope
from transpiler import DistributeTranspiler, InferenceTranspiler, \
memory_optimize, release_memory
memory_optimize, release_memory, DistributeTranspilerConfig
from concurrency import (Go, make_channel, channel_send, channel_recv,
channel_close, Select)
from lod_tensor import create_lod_tensor, create_random_int_lodtensor
......
......@@ -27,7 +27,6 @@ class TranspilerTest(unittest.TestCase):
self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
self.pserver1_ep = "127.0.0.1:6174"
self.pserver2_ep = "127.0.0.1:6175"
self.slice_var_up = True
self.sync_mode = True
self.transpiler = None
......@@ -52,27 +51,26 @@ class TranspilerTest(unittest.TestCase):
self.origin_prog = main.clone()
return main
def get_trainer(self):
t = self._transpiler_instance()
def get_trainer(self, config=None):
t = self._transpiler_instance(config)
return t.get_trainer_program()
def get_pserver(self, ep):
t = self._transpiler_instance()
def get_pserver(self, ep, config=None):
t = self._transpiler_instance(config)
pserver = t.get_pserver_program(ep)
startup = t.get_startup_program(ep, pserver)
return pserver, startup
def _transpiler_instance(self):
def _transpiler_instance(self, config=None):
if not self.transpiler:
main = self.get_main_program()
self.transpiler = fluid.DistributeTranspiler()
self.transpiler = fluid.DistributeTranspiler(config=config)
self.transpiler.transpile(
self.trainer_id,
program=main,
pservers=self.pserver_eps,
trainers=self.trainers,
slice_var_up=self.slice_var_up,
sync_mode=self.sync_mode)
trainers=self.trainers)
return self.transpiler
......@@ -124,14 +122,67 @@ class TestBasicModel(TranspilerTest):
self.assertEqual(set(pserver_params), set(trainer_params))
class TestBasicModelWithLargeBlockSize(TranspilerTest):
def test_transpiler(self):
config = fluid.DistributeTranspilerConfig()
config.min_block_size = 1048576
pserver, startup = self.get_pserver(self.pserver1_ep, config)
pserver2, startup2 = self.get_pserver(self.pserver2_ep, config)
trainer = self.get_trainer(config)
self.assertEqual([op.type for op in trainer.global_block().ops], [
'mul', 'elementwise_add', 'elementwise_sub', 'square', 'mean',
'fill_constant', 'mean_grad', 'square_grad', 'elementwise_sub_grad',
'elementwise_add_grad', 'send', 'mul_grad', 'send', 'send_barrier',
'recv', 'recv', 'fetch_barrier'
])
self.assertEqual(len(pserver.blocks), 2)
# block0: listen_and_serv
self.assertEqual([op.type for op in pserver.blocks[0].ops],
["listen_and_serv"])
# block1~2: optimize pass
self.assertEqual([op.type for op in pserver.blocks[1].ops],
["sum", "scale", "sgd"])
# confirm startup program
self.assertEqual([op.type for op in startup.global_block().ops],
["fill_constant", "fill_constant", "fill_constant"])
# the variable #fc_w will be split into two blocks
fc_w_var = startup2.global_block().var("fc_w")
self.assertEqual(fc_w_var.shape, (1000L, 1000L))
# all parameters should be optimized on pserver
pserver_params = []
for prog in [pserver, pserver2]:
for blk in prog.blocks:
for op in blk.ops:
if "Param" in op.input_names:
param_name = op.input("Param")[0]
is_block_idx = param_name.find(".block")
if is_block_idx != -1:
origin_param_name = param_name[:is_block_idx]
else:
origin_param_name = param_name
pserver_params.append(origin_param_name)
trainer_params = []
for op in self.origin_prog.global_block().ops:
if "Param" in op.input_names:
trainer_params.append(op.input("Param")[0])
self.assertEqual(set(pserver_params), set(trainer_params))
class TestNoSliceVar(TranspilerTest):
def setUp(self):
super(TestNoSliceVar, self).setUp()
self.slice_var_up = False
def test_transpiler(self):
_, startup = self.get_pserver(self.pserver1_ep)
_, startup2 = self.get_pserver(self.pserver2_ep)
config = fluid.DistributeTranspilerConfig()
config.slice_var_up = False
_, startup = self.get_pserver(self.pserver1_ep, config)
_, startup2 = self.get_pserver(self.pserver2_ep, config)
if startup.global_block().vars.has_key("fc_w"):
fc_w_var = startup.global_block().vars["fc_w"]
......
......@@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from distribute_transpiler import DistributeTranspiler
from distribute_transpiler import DistributeTranspiler, DistributeTranspilerConfig
from inference_transpiler import InferenceTranspiler
from memory_optimization_transpiler import memory_optimize, release_memory
from ps_dispatcher import HashName, RoundRobin
__all__ = [
"DistributeTranspiler", "InferenceTranspiler", "memory_optimize",
"release_memory", "HashName", "RoundRobin"
"release_memory", "HashName", "RoundRobin", "DistributeTranspilerConfig"
]
......@@ -64,7 +64,7 @@ def same_or_split_var(p_name, var_name):
return p_name == var_name or p_name.startswith(var_name + ".block")
def slice_variable(var_list, slice_count, min_block_size=8192):
def slice_variable(var_list, slice_count, min_block_size):
"""
We may need to split dense tensor to one or more blocks and put
them equally onto parameter server. One block is a sub-tensor
......@@ -110,6 +110,22 @@ def slice_variable(var_list, slice_count, min_block_size=8192):
return blocks
class DistributeTranspilerConfig(object):
"""
slice_var_up (bool): Do Tensor slice for pservers, default is True.
split_method (PSDispatcher): RoundRobin or HashName can be used
try to choose the best method to balance loads for pservers.
min_block_size (int): Minimum splitted element number in block.
According:https://github.com/PaddlePaddle/Paddle/issues/8638#issuecomment-369912156
We can use bandwidth effiently when data size is larger than 2MB.If you
want to change it, please be sure you see the slice_variable function.
"""
slice_var_up = True
split_method = None
min_block_size = 8192
class DistributeTranspiler(object):
"""
**DistributeTranspiler**
......@@ -146,13 +162,23 @@ class DistributeTranspiler(object):
trainer_program = t.get_trainer_program()
"""
def __init__(self, config=None):
if config is not None:
self.config = config
else:
self.config = DistributeTranspilerConfig()
if self.config.split_method is None:
self.config.split_method = RoundRobin
assert (self.config.min_block_size >= 8192)
assert (self.config.split_method.__bases__[0] == PSDispatcher)
def transpile(self,
trainer_id,
program=None,
pservers="127.0.0.1:6174",
trainers=1,
slice_var_up=True,
split_method=RoundRobin,
sync_mode=True):
"""
Run the transpiler.
......@@ -165,12 +191,8 @@ class DistributeTranspiler(object):
pservers (str): comma separated ip:port string for the pserver
list.
trainers (int): number of trainers in the distributed job.
slice_var_up (bool): Do Tensor slice for pservers, default is True.
split_method (PSDispatcher): RoundRobin or HashName can be used
try to choose the best method to balance loads for pservers.
sync_mode (bool): Do sync training or not, default is True.
"""
assert (split_method.__bases__[0] == PSDispatcher)
if program is None:
program = default_main_program()
self.origin_program = program
......@@ -181,11 +203,11 @@ class DistributeTranspiler(object):
self.pserver_endpoints = pserver_endpoints
self.optimize_ops, self.params_grads = self._get_optimize_pass()
ps_dispatcher = split_method(self.pserver_endpoints)
ps_dispatcher = self.config.split_method(self.pserver_endpoints)
self.has_distributed_lookup_table = self._has_distributed_lookup_table()
# split and create vars, then put splited vars in dicts for later use.
self._init_splited_vars(slice_var_up)
self._init_splited_vars()
# step 3.1: insert send op to send gradient vars to parameter servers
ps_dispatcher.reset()
......@@ -197,14 +219,14 @@ class DistributeTranspiler(object):
# fc_b@GRAD_trainer_0, fc_b@GRAD_trainer_1 --> pserver2
# shuffle the map will avoid the uneven distribution above
grad_var_mapping_items = self.grad_var_mapping.items()
if not slice_var_up:
if not self.config.slice_var_up:
random.seed(self.trainer_num)
random.shuffle(grad_var_mapping_items)
for orig_varname, splited_vars in grad_var_mapping_items:
eplist = ps_dispatcher.dispatch(splited_vars)
if not slice_var_up:
if not self.config.slice_var_up:
assert (len(splited_vars) == 1)
if len(splited_vars) == 1:
......@@ -627,7 +649,7 @@ class DistributeTranspiler(object):
]
return param_list, grad_list
def _init_splited_vars(self, slice_var_up):
def _init_splited_vars(self):
# update these mappings for further transpile:
# 1. param_var_mapping: param var name -> [splited params vars]
# 2. grad_var_mapping: grad var name -> [splited grads vars]
......@@ -651,17 +673,22 @@ class DistributeTranspiler(object):
param_list, grad_list = self._update_dist_lookup_table_vars(
param_list, grad_list, self.params_grads)
if slice_var_up:
if self.config.slice_var_up:
# when we slice var up into blocks, we will slice the var according to
# pserver services' count. A pserver may have two or more listening ports.
grad_blocks = slice_variable(grad_list, len(self.pserver_endpoints))
grad_blocks = slice_variable(grad_list,
len(self.pserver_endpoints),
self.config.min_block_size)
param_blocks = slice_variable(param_list,
len(self.pserver_endpoints))
len(self.pserver_endpoints),
self.config.min_block_size)
else:
# when we do NOT slice var up into blocks, we will always slice params
# grads into one block.
grad_blocks = slice_variable(grad_list, 1)
param_blocks = slice_variable(param_list, 1)
grad_blocks = slice_variable(grad_list, 1,
self.config.min_block_size)
param_blocks = slice_variable(param_list, 1,
self.config.min_block_size)
assert (len(grad_blocks) == len(param_blocks))
# origin_varname -> [splited_var]
......@@ -1001,6 +1028,7 @@ class DistributeTranspiler(object):
shape=splited_shape) # flattend splited var
var_mapping[varname].append(var)
program.global_block().sync_with_cpp()
return var_mapping
def create_splited_vars(self, source_var, block, tag):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册