diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py
index 4c90b4a8535aa5a10f1f8405abea327834c9fe1a..58d32bac1257ab25f406ab37948d532b84d4ded8 100644
--- a/python/paddle/v2/fluid/distribute_transpiler.py
+++ b/python/paddle/v2/fluid/distribute_transpiler.py
@@ -4,6 +4,7 @@ from framework import Program, default_main_program, Parameter, Variable
 import optimizer
 from layer_helper import LayerHelper
 from distributed_spliter import *
+import math


 class VarBlock:
@@ -17,6 +18,47 @@ class VarBlock:
         return "%s:%d:%d" % (self.varname, self.offset, self.size)


+def split_dense_variable(var_list,
+                         pserver_count,
+                         min_block_size=1024,
+                         max_block_size=1048576):
+    """
+        Split a list of dense tensors into one or several blocks and put
+        them equally onto parameter servers. One block is a sub-tensor
+        aligned by dim[0] of the tensor.
+
+        We need a minimum block size so that the calculations on the
+        parameter server side can gain better performance. By default the
+        minimum block size is 1024. The max block size is used to prevent
+        blocks that are too large to send.
+    """
+    blocks = []
+    for var in var_list:
+        split_count = pserver_count
+        var_numel = reduce(lambda x, y: x * y, var.shape)
+        max_pserver_count = int(math.floor(var_numel / float(min_block_size)))
+        if max_pserver_count == 0:
+            max_pserver_count = 1
+        if max_pserver_count < pserver_count:
+            split_count = max_pserver_count
+        block_size = int(math.ceil(var_numel / float(split_count)))
+
+        if len(var.shape) >= 2:
+            # align by dim1(width)
+            dim1 = reduce(lambda x, y: x * y, var.shape[1:])
+            remains = block_size % dim1
+            if remains != 0:
+                block_size += dim1 - remains
+        # update split_count after alignment
+        split_count = int(math.ceil(var_numel / float(block_size)))
+        for block_id in xrange(split_count):
+            curr_block_size = min(block_size, var_numel - (
+                (block_id) * block_size))
+            block = VarBlock(var.name, block_id, curr_block_size)
+            blocks.append(str(block))
+    return blocks
+
+
 class DistributeTranspiler:
     def transpile(self,
                   optimize_ops,
@@ -57,43 +99,49 @@ class DistributeTranspiler:
         # 5. create parameter server program by split_method generated endpoint->VarBlock
         # 6. run compile time infershape for parameter server program
-        if kwargs.has_key("split_method"):
-            split_method = kwargs["split_method"]
-        else:
-            split_method = round_robin
-        pserver_endpoints = kwargs["pservers"].split(",")
-
-        grad2param = dict()
-        for param, grad in params_and_grads:
-            grad2param[grad.name()] = param.name()
+        pserver_endpoints = pservers.split(",")

         # step1
-        param_list = [pg[0] for pg in params_and_grads]
-        grad_list = [pg[1] for pg in params_and_grads]
+        param_list = [pg[0] for pg in params_grads]
+        grad_list = [pg[1] for pg in params_grads]
         # TODO: add split selected rows support
-        grad_blocks = _split_dense_variable(grad_list, len(pserver_endpoints))
-        param_blocks = _split_dense_variable(param_list, len(pserver_endpoints))
-        ep2gradblock = split_method(grad_blocks, pserver_endpoints)
-        # self.param_grad_map
+        grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints))
+        param_blocks = split_dense_variable(param_list, len(pserver_endpoints))
         # step2
-        var2splited = self._split_trainer_vars(program, grad_blocks)
+        grad_var_mapping = self._append_split_op(program, grad_blocks)
         # step3
         send_inputs = []
-        for _, splited in var2splited.iteritems():
+        send_outputs = []
+        for _, splited in grad_var_mapping.iteritems():
             send_inputs.extend(splited)
-        send_outputs = self._create_vars_from_blocklist(program, param_blocks)
+        param_var_mapping = self._create_vars_from_blocklist(program,
+                                                             param_blocks)
+        for _, splited in param_var_mapping.iteritems():
+            send_outputs.extend(splited)
+        # let send_op know which endpoint each var is sent to; eplist is in
+        # the same order as send_inputs.
+        eplist = split_method(send_inputs, pserver_endpoints)
         send_op = program.global_block().append_op(
             type="send",
             inputs={"X": send_inputs},
             outputs={"Out": send_outputs},
             attrs={"endpoints": pserver_endpoints,
-                   "epmap": epmap})
+                   "epmap": eplist})
+
+        # step4
+        for varname, splited_var in param_var_mapping.iteritems():
+            orig_param = program.global_block().vars[varname]
+            concat = program.global_block().append_op(
+                type="concat",
+                inputs={"X": splited_var},
+                outputs={"Out": orig_param},
+                attrs={"axis": 0})

     def _create_vars_from_blocklist(self, program, block_list):
         block_map = dict()
-        ret_vars = []
+        var_mapping = dict()
         for block_str in block_list:
             varname, offset, size = block_str.split(":")
             if not block_map.has_key(varname):
@@ -102,15 +150,26 @@ class DistributeTranspiler:
         for varname, splited in block_map.iteritems():
             orig_var = program.global_block().vars[varname]
-            for block in splited:
+            orig_shape = orig_var.shape
+            orig_dim1_flatten = 1
+            if len(orig_shape) >= 2:
+                orig_dim1_flatten = reduce(lambda x, y: x * y, orig_shape[1:])
+            var_list = []
+            for i, block in enumerate(splited):
                 size = block[1]
+                rows = size / orig_dim1_flatten
+                splited_shape = [rows]
+                if len(orig_shape) >= 2:
+                    splited_shape.extend(orig_shape[1:])
+                print("block, splited shape:", block, splited_shape)
                 var = program.global_block().create_var(
                     name="%s.block%d" % (varname, i),
                     psersistable=False,
                     dtype=orig_var.dtype,
-                    shape=[1, size])  # flattend splited var
-                ret_vars.append(var)
-        return ret_vars
+                    shape=splited_shape)  # flattened split var
+                var_list.append(var)
+            var_mapping[varname] = var_list
+        return var_mapping

     def _clone_param(self, block, v):
         assert isinstance(v, Parameter)
@@ -137,80 +196,22 @@ class DistributeTranspiler:
             lod_level=var.lod_level,
             persistable=var.persistable)

-    def _split_dense_variable(self,
-                              var_list,
-                              pserver_count,
-                              min_block_size=1024,
-                              max_block_size=1048576):
-        """
-        We may need to split dense tensor to one or several blocks and put
-        them equally onto parameter server. One block is a sub-tensor
-        aligned by dim[0] of the tensor.
-
-        We need to have a minimal block size so that the calculations in
-        the parameter server side can gain better performance. By default
-        mininum block size is 1024. The max block size is used to prevent
-        too large block that may causing send error.
-        """
-        block_sizes = []
-        blocks = []
-        for grad in var_list:
-            dim1 = reduce(lambda x, y: x * y, grad.shape[1:])
-            grad_numel = reduce(lambda x, y: x * y, grad.shape)
-            if grad_numel < min_block_size:
-                block_sizes.append(grad_numel)
-            block_size = grad_numel / min_block_size
-            if block_size < min_block_size:
-                block_size = min_block_size
-            # align by dim1(width)
-            remains = block_size % dim1
-            if remains != 0:
-                block_size += dim1 - remains
-            block_sizes.append(block_size)
-            num_blocks = grad_numel / block_size
-            print("grad numel :%d, blocksize: %d" % grad_numel, block_size)
-            for block_id in xrange(num_blocks):
-                block = VarBlock(grad.name(), block_id, block_size)
-                blocks.append(str(block))
-        return blocks
-
-    def _split_trainer_vars(self, program, gradblocks, params_and_grads):
-        var2blocks = dict()
-        splited = dict()
-        for block_str in gradblocks:
-            varname, offset, size = block_str.split(":")
-            if not var2blocks.has_key(varname):
-                var2blocks[varname] = []
-            var2blocks[varname].append((long(offset), long(size)))
-        for varname, blocks in var2blocks.iteritems():
+    def _append_split_op(self, program, gradblocks):
+        var_mapping = self._create_vars_from_blocklist(program, gradblocks)
+        for varname, splited_vars in var_mapping.iteritems():
+            if len(splited_vars) == 1:
+                continue
             orig_var = program.global_block().vars[varname]
-            split_outs = []
-            for i in xrange(len(blocks)):
-                size = blocks[i][1]
-                var = program.global_block().create_var(
-                    name="%s.block%d" % (varname, i),
-                    psersistable=False,
-                    dtype=orig_var.dtype,
-                    shape=[1, size])  # flattend splited var
-                split_outs.append(var)
-
-            splited[varname] = split_outs
+            sections = []
+            for v in splited_vars:
+                sections.append(v.shape[0])
             program.global_block().append_op(
                 type="split",
                 inputs={"X": orig_var},
-                outputs={"Out": split_outs},
-                attrs={"num": len(blocks)}  # assume split evenly
+                outputs={"Out": splited_vars},
+                attrs={"sections": sections}  # split by the computed sections
             )
-        return splited
-
-    def _concat_trainer_vars(self, program, splited):
-        for varname, to_merge_list in splited.iteritems():
-            orig_var = program.global_block().vars[varname]
-            program.global_block().append_op(
-                type="concat",
-                inputs={"X": to_merge_list},
-                outputs={"Out": orig_var},
-                attrs={})
+        return var_mapping

     def get_trainer_program(self):
         # remove optimize ops and add a send op to main_program
diff --git a/python/paddle/v2/fluid/distribute_transpiler_simple.py b/python/paddle/v2/fluid/distribute_transpiler_simple.py
new file mode 100644
index 0000000000000000000000000000000000000000..49ece7b725e318d7526d58fe54c97cbe20200a7d
--- /dev/null
+++ b/python/paddle/v2/fluid/distribute_transpiler_simple.py
@@ -0,0 +1,242 @@
+import framework
+from framework import Program, default_main_program, Parameter, Variable
+import optimizer
+from layer_helper import LayerHelper
+
+
+def hash_name_to_server(params_grads, pserver_endpoints):
+    """
+    :param params_grads: a list of (parameter, gradient) pairs
+    :return: a map of pserver endpoint ->
+        params -> [param list]
+        grads -> [grad list]
+    """
+
+    def _hash_param(param_name, total):
+        return hash(param_name) % total
+
+    param_grad_map = dict()
+    for param, grad in params_grads:
+        if param.trainable is True and grad is not None:
+            server_id = _hash_param(param.name, len(pserver_endpoints))
+            server_for_param = pserver_endpoints[server_id]
+            if not param_grad_map.has_key(server_for_param):
+                param_grad_map[server_for_param] = {"params": [], "grads": []}
+            param_grad_map[server_for_param]["params"].append(param)
+            param_grad_map[server_for_param]["grads"].append(grad)
+
+    return param_grad_map
+
+
+def round_robin(params_grads, pserver_endpoints):
+    assert (len(params_grads) > len(pserver_endpoints))
+
+    param_grad_map = dict()
+    pserver_idx = 0
+    for param, grad in params_grads:
+        if param.trainable is True:
+            server_for_param = pserver_endpoints[pserver_idx]
+            if not param_grad_map.has_key(server_for_param):
+                param_grad_map[server_for_param] = {"params": [], "grads": []}
+
+            param_grad_map[server_for_param]["params"].append(param)
+            param_grad_map[server_for_param]["grads"].append(grad)
+
+            pserver_idx += 1
+            if pserver_idx >= len(pserver_endpoints):
+                pserver_idx = 0
+    return param_grad_map
+
+
+class DistributeTranspiler:
+    def transpile(self,
+                  optimize_ops,
+                  params_grads,
+                  program=None,
+                  pservers="127.0.0.1:6174",
+                  trainers=1,
+                  split_method=round_robin):
+        """
+        Transpile the program into distributed data-parallelism programs.
+
+        The main_program will be transformed to use a remote parameter server
+        to do parameter optimization, and the optimization graph will be put
+        into a parameter server program.
+
+        Use different methods to split trainable variables across different
+        parameter servers.
+
+        Example to run:
+
+        exe = fluid.Executor(place)
+        t = fluid.DistributeTranspiler()
+        t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)
+
+        pserver_endpoint = os.getenv("PSERVER")
+        if pserver_endpoint:
+            pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
+            exe.run(fluid.default_startup_program())
+            exe.run(pserver_prog)
+        else:
+            feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
+            exe.run(fluid.default_startup_program())
+
+            for pass_id in range(PASS_NUM):
+                ...
+
+        :param optimize_ops: op list of optimization, should be the
+                             return value of Optimizer.minimize
+        :type optimize_ops: list
+        :param program: program to optimize, defaults to default_main_program
+        :param pservers: parameter server endpoints like "m1:6174,m2:6174"
+        :type pservers: string
+
+        :return: a list of programs
+        """
+        if program is None:
+            program = default_main_program()
+        self.program = program
+        self.trainers = trainers
+        self.optimize_ops = optimize_ops
+        self._optimize_distributed(
+            optimize_ops,
+            program,
+            params_grads,
+            pservers=pservers,
+            trainers=trainers,
+            split_method=split_method)
+
+    def _clone_param(self, block, v):
+        assert isinstance(v, Parameter)
+        new_p = Parameter(
+            block=block,
+            shape=v.shape,
+            dtype=v.dtype,
+            type=v.type,
+            lod_level=v.lod_level,
+            stop_gradient=v.stop_gradient,
+            trainable=v.trainable,
+            optimize_attr=v.optimize_attr,
+            regularizer=v.regularizer,
+            name=v.name)
+        block.vars[new_p.name] = new_p
+
+    def _clone_var(self, block, var):
+        assert isinstance(var, Variable)
+        return block.create_var(
+            name=var.name,
+            shape=var.shape,
+            dtype=var.dtype,
+            type=var.type,
+            lod_level=var.lod_level,
+            persistable=var.persistable)
+
+    def _optimize_distributed(self, optimize_ops, program, params_and_grads,
+                              **kwargs):
+        if kwargs.has_key("split_method"):
+            split_method = kwargs["split_method"]
+        else:
+            split_method = round_robin
+
+        assert (callable(split_method))
+        pserver_endpoints = kwargs["pservers"].split(",")
+        self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
+
+        send_op_ordered_inputs = []
+        send_op_ordered_outputs = []
+        epmap = []
+        for ep, v in self.param_grad_map.iteritems():
+            send_op_ordered_inputs.extend(v["grads"])
+            send_op_ordered_outputs.extend(v["params"])
+            for i in v["grads"]:
+                epmap.append(ep)
+        send_op = program.global_block().append_op(
+            type="send",
+            inputs={"X": send_op_ordered_inputs
+                    },  # inputs is a list of tensors to be sent
+            outputs={"Out": send_op_ordered_outputs},
+            attrs={"endpoints": pserver_endpoints,
+                   "epmap": epmap})
+
+    def get_trainer_program(self):
+        # remove optimize ops and add a send op to main_program
+        self.program.global_block().delete_ops(self.optimize_ops)
+        return self.program
+
+    def _create_var_for_trainers(self, block, var, trainers):
+        var_list = []
+        for i in xrange(trainers):
+            var_each = block.create_var(
+                name="%s.trainer_%d" % (var.name, i),
+                persistable=var.persistable,
+                dtype=var.dtype,
+                shape=var.shape)
+            var_list.append(var_each)
+        return var_list
+
+    def get_pserver_program(self, endpoint, optimize_ops):
+        pserver_program = Program()
+        for v in self.param_grad_map[endpoint]["params"]:
+            self._clone_param(pserver_program.global_block(), v)
+
+        optimize_sub_program = Program()
+        grad_var_names = [
+            var.name for var in self.param_grad_map[endpoint]["grads"]
+        ]
+        for opt_op in optimize_ops:
+            for _, var in opt_op.inputs.iteritems():
+                # NOTE: append operators to merge gradients from multiple
+                # trainers. If trainers == 1, this is not needed.
+                if self.trainers > 1 and var.name in grad_var_names:
+                    vars2merge = self._create_var_for_trainers(
+                        optimize_sub_program.global_block(), var, self.trainers)
+                    merged_var = optimize_sub_program.global_block().create_var(
+                        name=var.name,
+                        persistable=var.persistable,
+                        dtype=var.dtype,
+                        shape=var.shape)
+                    optimize_sub_program.global_block().append_op(
+                        type="sum",
+                        inputs={"X": vars2merge},
+                        outputs={"Out": merged_var})
+                    optimize_sub_program.global_block().append_op(
+                        type="scale",
+                        inputs={"X": merged_var},
+                        outputs={"Out": merged_var},
+                        attrs={"scale": 1.0 / float(self.trainers)})
+                else:
+                    optimize_sub_program.global_block().create_var(
+                        name=var.name,
+                        persistable=var.persistable,
+                        dtype=var.dtype,
+                        shape=var.shape)
+
+            if opt_op.inputs.has_key("Grad"):
+                if opt_op.inputs["Grad"].name in grad_var_names:
+                    optimize_sub_program.global_block().append_op(
+                        type=opt_op.type,
+                        inputs=opt_op.inputs,
+                        outputs=opt_op.outputs,
+                        attrs=opt_op.attrs)
+            else:
+                optimize_sub_program.global_block().append_op(
+                    type=opt_op.type,
+                    inputs=opt_op.inputs,
+                    outputs=opt_op.outputs,
+                    attrs=opt_op.attrs)
+        pserver_program.global_block().append_op(
+            type="recv",
+            inputs={"RX":
+                    self.param_grad_map[endpoint]["grads"]},  # grads to recv
+            outputs={},
+            attrs={
+                "OptimizeProgram": optimize_sub_program.desc,
+                "endpoint": endpoint,
+                "ParamList":
+                [p.name for p in self.param_grad_map[endpoint]["params"]],
+                "GradList":
+                [p.name for p in self.param_grad_map[endpoint]["grads"]],
+                "Trainers": self.trainers
+            })
+        pserver_program.sync_with_cpp()
+        return pserver_program
diff --git a/python/paddle/v2/fluid/distributed_spliter.py b/python/paddle/v2/fluid/distributed_spliter.py
index e7ba53390d48bb797a08e261e5615462cb9e6019..eff30f7bb66b5149bb24615593463e8715e78576 100644
--- a/python/paddle/v2/fluid/distributed_spliter.py
+++ b/python/paddle/v2/fluid/distributed_spliter.py
@@ -1,38 +1,35 @@
-def hash_name(varblocks, pserver_endpoints):
+def hash_name(varlist, pserver_endpoints):
     """
-    :param varblocks: a list of VarBlock string indicating
-        sub blocks of variables
-    :return: a map of pserver endpoint -> varblock_str
+    Hash variable names to several endpoints.
+
+    :param varlist: a list of Variables
+    :return: a map of pserver endpoint -> varname
     """

     def _hash_block(block_str, total):
         return hash(block_str) % total

-    ep2block = dict()
-    for varblock_str in varblocks:
-        if param.trainable is True and grad is not None:
-            server_id = _hash_block(varblock_str, len(pserver_endpoints))
-            server_for_param = pserver_endpoints[server_id]
-            if not ep2block.has_key(server_for_param):
-                ep2block[server_for_param] = []
-            ep2block[server_for_param].append(varblock_str)
-
-    return ep2block
+    eplist = []
+    for var in varlist:
+        server_id = _hash_block(var.name(), len(pserver_endpoints))
+        server_for_param = pserver_endpoints[server_id]
+        eplist.append(server_for_param)
+    return eplist


-def round_robin(varblocks, pserver_endpoints):
-    assert (len(varblocks) > len(pserver_endpoints))
+def round_robin(varlist, pserver_endpoints):
+    """
+    Distribute variables to several endpoints in round-robin order.
+ """ + assert (len(varlist) > len(pserver_endpoints)) - ep2block = dict() + eplist = [] pserver_idx = 0 - for varblock_str in varblocks: - if param.trainable is True: - server_for_param = pserver_endpoints[pserver_idx] - if not ep2block.has_key(server_for_param): - ep2block[server_for_param] = [] - ep2block[server_for_param].append(varblock_str) + for var in varlist: + server_for_param = pserver_endpoints[pserver_idx] + eplist.append(server_for_param) - pserver_idx += 1 - if pserver_idx >= len(pserver_endpoints): - pserver_idx = 0 - return ep2block + pserver_idx += 1 + if pserver_idx >= len(pserver_endpoints): + pserver_idx = 0 + return eplist diff --git a/python/paddle/v2/fluid/tests/book_distribute/test_split_var.py b/python/paddle/v2/fluid/tests/book_distribute/test_split_var.py new file mode 100644 index 0000000000000000000000000000000000000000..1355e13e1c11ff9cdd130ed579408d8c5416c9d9 --- /dev/null +++ b/python/paddle/v2/fluid/tests/book_distribute/test_split_var.py @@ -0,0 +1,38 @@ +import math +import unittest +from paddle.v2.fluid.distribute_transpiler import split_dense_variable +import paddle.v2.fluid as fluid +import random + + +class TestSplitVar(unittest.TestCase): + def test_check_output(self): + # split below shapes to 10 servers + shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10]] + expected_sizes = [ + [15], [1024], + [2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 784], + [2040, 2040, 2040, 2040], + [1150, 1150, 1150, 1150, 1150, 1150, 1100] + ] + var_list = [] + program = fluid.Program() + for shape in shapes: + var = program.global_block().create_var( + name=str(random.randint(10000)), + persistable=True, + dtype=core.VarDesc.VarType.LOD_TENSOR, + shape=shape) + var_list.append(var) + blocks = split_dense_variable(var_list, 10) + all_sizes = [] + for s in expected_sizes: + for s2 in s: + all_sizes.append(s2) + for i, block_str in enumerate(blocks): + varname, block_id, size = block_str.split(":") + self.assertEqual(int(size), all_sizes[i]) + + +if __name__ == '__main__': + unittest.main()