diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py
index a06e041c1e8aaa8897ac77f2ec1275824849e7ef..7fc4edf0265af0043f8a734ceec330791e7cadad 100644
--- a/python/paddle/fluid/__init__.py
+++ b/python/paddle/fluid/__init__.py
@@ -46,7 +46,7 @@ from param_attr import ParamAttr, WeightNormParamAttr
 from data_feeder import DataFeeder
 from core import LoDTensor, LoDTensorArray, CPUPlace, CUDAPlace, CUDAPinnedPlace, Scope
 from transpiler import DistributeTranspiler, InferenceTranspiler, \
-    memory_optimize, release_memory
+    memory_optimize, release_memory, DistributeTranspilerConfig
 from concurrency import (Go, make_channel, channel_send, channel_recv,
                          channel_close, Select)
 from lod_tensor import create_lod_tensor, create_random_int_lodtensor
diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py
index 75b4b4e50da04521021dcb1e97cfe495f2619433..f307e737e2eed4546e22a37211d570b623982429 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py
@@ -27,7 +27,6 @@ class TranspilerTest(unittest.TestCase):
         self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
         self.pserver1_ep = "127.0.0.1:6174"
         self.pserver2_ep = "127.0.0.1:6175"
-        self.slice_var_up = True
         self.sync_mode = True
         self.transpiler = None
 
@@ -52,27 +51,26 @@ class TranspilerTest(unittest.TestCase):
         self.origin_prog = main.clone()
         return main
 
-    def get_trainer(self):
-        t = self._transpiler_instance()
+    def get_trainer(self, config=None):
+        t = self._transpiler_instance(config)
         return t.get_trainer_program()
 
-    def get_pserver(self, ep):
-        t = self._transpiler_instance()
+    def get_pserver(self, ep, config=None):
+        t = self._transpiler_instance(config)
         pserver = t.get_pserver_program(ep)
         startup = t.get_startup_program(ep, pserver)
         return pserver, startup
 
-    def _transpiler_instance(self):
+    def _transpiler_instance(self, config=None):
         if not self.transpiler:
             main = self.get_main_program()
-            self.transpiler = fluid.DistributeTranspiler()
+            self.transpiler = fluid.DistributeTranspiler(config=config)
             self.transpiler.transpile(
                 self.trainer_id,
                 program=main,
                 pservers=self.pserver_eps,
-                trainers=self.trainers,
-                slice_var_up=self.slice_var_up,
-                sync_mode=self.sync_mode)
+                trainers=self.trainers)
+
         return self.transpiler
 
 
@@ -124,14 +122,67 @@ class TestBasicModel(TranspilerTest):
         self.assertEqual(set(pserver_params), set(trainer_params))
 
 
+class TestBasicModelWithLargeBlockSize(TranspilerTest):
+    def test_transpiler(self):
+        config = fluid.DistributeTranspilerConfig()
+        config.min_block_size = 1048576
+
+        pserver, startup = self.get_pserver(self.pserver1_ep, config)
+        pserver2, startup2 = self.get_pserver(self.pserver2_ep, config)
+
+        trainer = self.get_trainer(config)
+
+        self.assertEqual([op.type for op in trainer.global_block().ops], [
+            'mul', 'elementwise_add', 'elementwise_sub', 'square', 'mean',
+            'fill_constant', 'mean_grad', 'square_grad', 'elementwise_sub_grad',
+            'elementwise_add_grad', 'send', 'mul_grad', 'send', 'send_barrier',
+            'recv', 'recv', 'fetch_barrier'
+        ])
+
+        self.assertEqual(len(pserver.blocks), 2)
+        # block0: listen_and_serv
+        self.assertEqual([op.type for op in pserver.blocks[0].ops],
+                         ["listen_and_serv"])
+        # block1: optimize pass
+        self.assertEqual([op.type for op in pserver.blocks[1].ops],
+                         ["sum", "scale", "sgd"])
+        # confirm startup program
+        self.assertEqual([op.type for op in startup.global_block().ops],
"fill_constant", "fill_constant"]) + # the variable #fc_w will be split into two blocks + fc_w_var = startup2.global_block().var("fc_w") + self.assertEqual(fc_w_var.shape, (1000L, 1000L)) + # all parameters should be optimized on pserver + + pserver_params = [] + for prog in [pserver, pserver2]: + for blk in prog.blocks: + for op in blk.ops: + if "Param" in op.input_names: + param_name = op.input("Param")[0] + is_block_idx = param_name.find(".block") + if is_block_idx != -1: + origin_param_name = param_name[:is_block_idx] + else: + origin_param_name = param_name + pserver_params.append(origin_param_name) + trainer_params = [] + for op in self.origin_prog.global_block().ops: + if "Param" in op.input_names: + trainer_params.append(op.input("Param")[0]) + self.assertEqual(set(pserver_params), set(trainer_params)) + + class TestNoSliceVar(TranspilerTest): def setUp(self): super(TestNoSliceVar, self).setUp() - self.slice_var_up = False def test_transpiler(self): - _, startup = self.get_pserver(self.pserver1_ep) - _, startup2 = self.get_pserver(self.pserver2_ep) + config = fluid.DistributeTranspilerConfig() + config.slice_var_up = False + + _, startup = self.get_pserver(self.pserver1_ep, config) + _, startup2 = self.get_pserver(self.pserver2_ep, config) if startup.global_block().vars.has_key("fc_w"): fc_w_var = startup.global_block().vars["fc_w"] diff --git a/python/paddle/fluid/transpiler/__init__.py b/python/paddle/fluid/transpiler/__init__.py index cf18090f71f34be5105498f5846dbcdf15ab2e3f..eae13b50398f791d4a203b72a0e96f3e87cc2a88 100644 --- a/python/paddle/fluid/transpiler/__init__.py +++ b/python/paddle/fluid/transpiler/__init__.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from distribute_transpiler import DistributeTranspiler +from distribute_transpiler import DistributeTranspiler, DistributeTranspilerConfig from inference_transpiler import InferenceTranspiler from memory_optimization_transpiler import memory_optimize, release_memory from ps_dispatcher import HashName, RoundRobin __all__ = [ "DistributeTranspiler", "InferenceTranspiler", "memory_optimize", - "release_memory", "HashName", "RoundRobin" + "release_memory", "HashName", "RoundRobin", "DistributeTranspilerConfig" ] diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index b0e9a6599730cde98338edc3e12eb81fc8f793c3..c4995fa09c7790897aa659c544a91c4c076fcfc4 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -64,7 +64,7 @@ def same_or_split_var(p_name, var_name): return p_name == var_name or p_name.startswith(var_name + ".block") -def slice_variable(var_list, slice_count, min_block_size=8192): +def slice_variable(var_list, slice_count, min_block_size): """ We may need to split dense tensor to one or more blocks and put them equally onto parameter server. One block is a sub-tensor @@ -110,6 +110,22 @@ def slice_variable(var_list, slice_count, min_block_size=8192): return blocks +class DistributeTranspilerConfig(object): + """ + slice_var_up (bool): Do Tensor slice for pservers, default is True. + split_method (PSDispatcher): RoundRobin or HashName can be used + try to choose the best method to balance loads for pservers. + min_block_size (int): Minimum splitted element number in block. 
+        According to https://github.com/PaddlePaddle/Paddle/issues/8638#issuecomment-369912156,
+        we can use bandwidth efficiently when the data size is larger than 2MB. If you
+        want to change it, please make sure you have read the slice_variable function.
+    """
+
+    slice_var_up = True
+    split_method = None
+    min_block_size = 8192
+
+
 class DistributeTranspiler(object):
     """
     **DistributeTranspiler**
@@ -146,13 +162,23 @@ class DistributeTranspiler(object):
             trainer_program = t.get_trainer_program()
     """
 
+    def __init__(self, config=None):
+        if config is not None:
+            self.config = config
+        else:
+            self.config = DistributeTranspilerConfig()
+
+        if self.config.split_method is None:
+            self.config.split_method = RoundRobin
+
+        assert (self.config.min_block_size >= 8192)
+        assert (self.config.split_method.__bases__[0] == PSDispatcher)
+
     def transpile(self,
                   trainer_id,
                   program=None,
                   pservers="127.0.0.1:6174",
                   trainers=1,
-                  slice_var_up=True,
-                  split_method=RoundRobin,
                   sync_mode=True):
         """
         Run the transpiler.
@@ -165,12 +191,8 @@ class DistributeTranspiler(object):
             pservers (str): comma separated ip:port string for the pserver list.
             trainers (int): number of trainers in the distributed job.
-            slice_var_up (bool): Do Tensor slice for pservers, default is True.
-            split_method (PSDispatcher): RoundRobin or HashName can be used
-                try to choose the best method to balance loads for pservers.
             sync_mode (bool): Do sync training or not, default is True.
         """
 
-        assert (split_method.__bases__[0] == PSDispatcher)
         if program is None:
             program = default_main_program()
         self.origin_program = program
@@ -181,11 +203,11 @@ class DistributeTranspiler(object):
         self.pserver_endpoints = pserver_endpoints
         self.optimize_ops, self.params_grads = self._get_optimize_pass()
 
-        ps_dispatcher = split_method(self.pserver_endpoints)
+        ps_dispatcher = self.config.split_method(self.pserver_endpoints)
         self.has_distributed_lookup_table = self._has_distributed_lookup_table()
 
         # split and create vars, then put splited vars in dicts for later use.
-        self._init_splited_vars(slice_var_up)
+        self._init_splited_vars()
 
         # step 3.1: insert send op to send gradient vars to parameter servers
         ps_dispatcher.reset()
@@ -197,14 +219,14 @@ class DistributeTranspiler(object):
         # fc_b@GRAD_trainer_0, fc_b@GRAD_trainer_1 --> pserver2
         # shuffle the map will avoid the uneven distribution above
         grad_var_mapping_items = self.grad_var_mapping.items()
-        if not slice_var_up:
+        if not self.config.slice_var_up:
             random.seed(self.trainer_num)
             random.shuffle(grad_var_mapping_items)
 
         for orig_varname, splited_vars in grad_var_mapping_items:
             eplist = ps_dispatcher.dispatch(splited_vars)
 
-            if not slice_var_up:
+            if not self.config.slice_var_up:
                 assert (len(splited_vars) == 1)
 
             if len(splited_vars) == 1:
@@ -627,7 +649,7 @@ class DistributeTranspiler(object):
         ]
         return param_list, grad_list
 
-    def _init_splited_vars(self, slice_var_up):
+    def _init_splited_vars(self):
         # update these mappings for further transpile:
         # 1. param_var_mapping: param var name -> [splited params vars]
         # 2. grad_var_mapping: grad var name -> [splited grads vars]
@@ -651,17 +673,22 @@ class DistributeTranspiler(object):
         param_list, grad_list = self._update_dist_lookup_table_vars(
             param_list, grad_list, self.params_grads)
 
-        if slice_var_up:
+        if self.config.slice_var_up:
             # when we slice var up into blocks, we will slice the var according to
             # pserver services' count. A pserver may have two or more listening ports.
-            grad_blocks = slice_variable(grad_list, len(self.pserver_endpoints))
+            grad_blocks = slice_variable(grad_list,
+                                         len(self.pserver_endpoints),
+                                         self.config.min_block_size)
             param_blocks = slice_variable(param_list,
-                                          len(self.pserver_endpoints))
+                                          len(self.pserver_endpoints),
+                                          self.config.min_block_size)
         else:
             # when we do NOT slice var up into blocks, we will always slice params
             # grads into one block.
-            grad_blocks = slice_variable(grad_list, 1)
-            param_blocks = slice_variable(param_list, 1)
+            grad_blocks = slice_variable(grad_list, 1,
+                                         self.config.min_block_size)
+            param_blocks = slice_variable(param_list, 1,
+                                          self.config.min_block_size)
         assert (len(grad_blocks) == len(param_blocks))
 
         # origin_varname -> [splited_var]
@@ -1001,6 +1028,7 @@ class DistributeTranspiler(object):
                     shape=splited_shape)  # flattend splited var
                 var_mapping[varname].append(var)
             program.global_block().sync_with_cpp()
+
         return var_mapping
 
     def create_splited_vars(self, source_var, block, tag):
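Summary of the API change for reviewers: the `slice_var_up` and `split_method` arguments move off `transpile()` and onto a `DistributeTranspilerConfig` object passed to the `DistributeTranspiler` constructor, which also exposes `min_block_size` (previously a fixed default of `slice_variable`). Below is a minimal usage sketch based only on the interfaces shown in this patch; the endpoints, trainer count, and option values are illustrative placeholders, and a real job would build its network in the default main program before transpiling.

```python
import paddle.fluid as fluid

# Illustrative cluster settings; in practice these come from the job's environment.
pserver_endpoints = "127.0.0.1:6174,127.0.0.1:6175"
current_endpoint = "127.0.0.1:6174"
trainer_id = 0
trainers = 2

# Options that were previously arguments of transpile() now live on the config.
config = fluid.DistributeTranspilerConfig()
config.min_block_size = 1048576     # must stay >= 8192 (asserted in __init__)
# config.slice_var_up = False       # alternatively, keep each variable in one block
# config.split_method defaults to RoundRobin when left as None.

t = fluid.DistributeTranspiler(config=config)
t.transpile(
    trainer_id,
    program=fluid.default_main_program(),
    pservers=pserver_endpoints,
    trainers=trainers,
    sync_mode=True)

# Trainer side:
trainer_prog = t.get_trainer_program()

# Parameter-server side:
pserver_prog = t.get_pserver_program(current_endpoint)
startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
```

Grouping these knobs on a config object keeps `transpile()` focused on per-job inputs (trainer id, endpoints, program), so further tuning options can be added later without changing its signature again.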