From ed55f1b9d49ed3f6609246159594da85810a9a2e Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Fri, 5 Jan 2018 19:25:30 +0800
Subject: [PATCH] transpiler_split_tensor

---
 .../paddle/v2/fluid/distribute_transpiler.py | 241 +++++++++++-------
 1 file changed, 148 insertions(+), 93 deletions(-)

diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py
index 49ece7b725e..e5314cf2729 100644
--- a/python/paddle/v2/fluid/distribute_transpiler.py
+++ b/python/paddle/v2/fluid/distribute_transpiler.py
@@ -1,51 +1,20 @@
+from __future__ import print_function
 import framework
 from framework import Program, default_main_program, Parameter, Variable
 import optimizer
 from layer_helper import LayerHelper
+from distributed_spliter import *
 
 
-def hash_name_to_server(params_grads, pserver_endpoints):
-    """
-    :param param_grads:
-    :return: a map of pserver endpoint ->
-        params -> [param list]
-        grads -> [grad list]
-    """
+class VarBlock:
+    def __init__(self, varname, offset, size):
+        self.varname = varname
+        # NOTE: real offset is offset * size
+        self.offset = offset
+        self.size = size
 
-    def _hash_param(param_name, total):
-        return hash(param_name) % total
-
-    param_grad_map = dict()
-    for param, grad in params_grads:
-        if param.trainable is True and grad is not None:
-            server_id = _hash_param(param.name, len(pserver_endpoints))
-            server_for_param = pserver_endpoints[server_id]
-            if not param_grad_map.has_key(server_for_param):
-                param_grad_map[server_for_param] = {"params": [], "grads": []}
-            param_grad_map[server_for_param]["params"].append(param)
-            param_grad_map[server_for_param]["grads"].append(grad)
-
-    return param_grad_map
-
-
-def round_robin(params_grads, pserver_endpoints):
-    assert (len(params_grads) > len(pserver_endpoints))
-
-    param_grad_map = dict()
-    pserver_idx = 0
-    for param, grad in params_grads:
-        if param.trainable is True:
-            server_for_param = pserver_endpoints[pserver_idx]
-            if not param_grad_map.has_key(server_for_param):
-                param_grad_map[server_for_param] = {"params": [], "grads": []}
-
-            param_grad_map[server_for_param]["params"].append(param)
-            param_grad_map[server_for_param]["grads"].append(grad)
-
-            pserver_idx += 1
-            if pserver_idx >= len(pserver_endpoints):
-                pserver_idx = 0
-    return param_grad_map
+    def __str__(self):
+        return "%s:%d:%d" % (self.varname, self.offset, self.size)
 
 
 class DistributeTranspiler:
@@ -58,7 +27,6 @@ class DistributeTranspiler:
                   split_method=round_robin):
         """
         Transpile the program to distributed data-parallelism programs.
-
        The main_program will be transformed to use a remote parameter server
        to do parameter optimization, and the optimization graph will be put
        into a parameter server program.
@@ -66,45 +34,84 @@ class DistributeTranspiler:
        Use different methods to split trainable variables to different
        parameter servers.
 
-        Example to run:
-
-        exe = fluid.Executor(place)
-        t = fluid.DistributeTranspiler()
-        t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)
-
-        pserver_endpoint = os.getenv("PSERVER")
-        if pserver_endpoint:
-            pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
-            exe.run(fluid.default_startup_program())
-            exe.run(pserver_prog)
-        else:
-            feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
-            exe.run(fluid.default_startup_program())
-
-            for pass_id in range(PASS_NUM):
-                ...
-
        :param optimize_ops: op list of optimization, should be the return
            value of Optimizer.minimize
        :type optimize_ops: list
        :param program: program to optimize, default default_main_program
        :param pservers: parameter server endpoints like "m1:6174,m2:6174"
        :type pservers: string
-        :return: return a list of programs
         """
+        assert (callable(split_method))
         if program is None:
             program = default_main_program()
         self.program = program
         self.trainers = trainers
         self.optimize_ops = optimize_ops
-        self._optimize_distributed(
-            optimize_ops,
-            program,
-            params_grads,
-            pservers=pservers,
-            trainers=trainers,
-            split_method=split_method)
+        # steps to transpile:
+        # 1. split variables into multiple blocks, aligned by product(dim[1:]) (width).
+        # 2. modify the trainer program: add a split_op for each grad variable.
+        # 3. append a send_op to the trainer.
+        # 4. append a concat_op to the trainer to update local weights.
+        # 5. create a new program for the parameter server.
+        # 6. create the parameter server program from the split_method-generated
+        #    endpoint->VarBlock mapping.
+        # 7. run compile-time infershape for the parameter server program.
+
+        pserver_endpoints = pservers.split(",")
+
+        grad2param = dict()
+        for param, grad in params_and_grads:
+            grad2param[grad.name] = param.name
+
+        # step1
+        param_list = [pg[0] for pg in params_and_grads]
+        grad_list = [pg[1] for pg in params_and_grads]
+        # TODO: add split selected rows support
+        grad_blocks = self._split_dense_variable(grad_list,
+                                                 len(pserver_endpoints))
+        param_blocks = self._split_dense_variable(param_list,
+                                                  len(pserver_endpoints))
+        ep2gradblock = split_method(grad_blocks, pserver_endpoints)
+        # step2
+        var2splited = self._split_trainer_vars(program, grad_blocks)
+
+        # step3
+        send_inputs = []
+        send_outputs = []
+        for _, splited in var2splited.iteritems():
+            send_inputs.extend(splited)
+        send_outputs = self._create_vars_from_blocklist(program, param_blocks)
+        # map each send input (a grad block) to the endpoint chosen by split_method
+        block2ep = dict()
+        for ep, blocks in ep2gradblock.iteritems():
+            for block_str in blocks:
+                varname, block_id, _ = block_str.split(":")
+                block2ep["%s.block%s" % (varname, block_id)] = ep
+        epmap = [block2ep[var.name] for var in send_inputs]
+
+        send_op = program.global_block().append_op(
+            type="send",
+            inputs={"X": send_inputs},
+            outputs={"Out": send_outputs},
+            attrs={"endpoints": pserver_endpoints,
+                   "epmap": epmap})
+
+    def _create_vars_from_blocklist(self, program, block_list):
+        block_map = dict()
+        ret_vars = []
+        for block_str in block_list:
+            varname, offset, size = block_str.split(":")
+            if not block_map.has_key(varname):
+                block_map[varname] = []
+            block_map[varname].append((long(offset), long(size)))
+
+        for varname, splited in block_map.iteritems():
+            orig_var = program.global_block().vars[varname]
+            for i, block in enumerate(splited):
+                size = block[1]
+                var = program.global_block().create_var(
+                    name="%s.block%d" % (varname, i),
+                    persistable=False,
+                    dtype=orig_var.dtype,
+                    shape=[1, size])  # flattened splited var
+                ret_vars.append(var)
+        return ret_vars
 
     def _clone_param(self, block, v):
         assert isinstance(v, Parameter)
@@ -131,32 +138,80 @@ class DistributeTranspiler:
             lod_level=var.lod_level,
             persistable=var.persistable)
 
-    def _optimize_distributed(self, optimize_ops, program, params_and_grads,
-                              **kwargs):
-        if kwargs.has_key("split_method"):
-            split_method = kwargs["split_method"]
-        else:
-            split_method = round_robin
+    def _split_dense_variable(self,
+                              var_list,
+                              pserver_count,
+                              min_block_size=1024,
+                              max_block_size=1048576):
+        """
+        We may need to split a dense tensor into one or several blocks and put
+        them equally onto parameter servers. One block is a sub-tensor
+        aligned by dim[0] of the tensor.
+
+        We need a minimal block size so that the calculations on the
+        parameter server side can gain better performance. By default the
+        minimum block size is 1024. The max block size is used to prevent
+        blocks that are too large, which may cause send errors.
+        """
+        block_sizes = []
+        blocks = []
+        for grad in var_list:
+            dim1 = reduce(lambda x, y: x * y, grad.shape[1:])
+            grad_numel = reduce(lambda x, y: x * y, grad.shape)
+            if grad_numel < min_block_size:
+                # small variables are kept as a single block
+                block_size = grad_numel
+            else:
+                block_size = grad_numel / min_block_size
+                if block_size < min_block_size:
+                    block_size = min_block_size
+                # align by dim1 (width)
+                remains = block_size % dim1
+                if remains != 0:
+                    block_size += dim1 - remains
+            block_sizes.append(block_size)
+            num_blocks = grad_numel / block_size
+            print("grad numel: %d, blocksize: %d" % (grad_numel, block_size))
+            for block_id in xrange(num_blocks):
+                block = VarBlock(grad.name, block_id, block_size)
+                blocks.append(str(block))
+        return blocks
 
-        assert (callable(split_method))
-        pserver_endpoints = kwargs["pservers"].split(",")
-        self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
-
-        send_op_ordered_inputs = []
-        send_op_ordered_outputs = []
-        epmap = []
-        for ep, v in self.param_grad_map.iteritems():
-            send_op_ordered_inputs.extend(v["grads"])
-            send_op_ordered_outputs.extend(v["params"])
-            for i in v["grads"]:
-                epmap.append(ep)
-        send_op = program.global_block().append_op(
-            type="send",
-            inputs={"X": send_op_ordered_inputs
-                    },  # inputs is a list of tensors to be send
-            outputs={"Out": send_op_ordered_outputs},
-            attrs={"endpoints": pserver_endpoints,
-                   "epmap": epmap})
+    def _split_trainer_vars(self, program, gradblocks):
+        var2blocks = dict()
+        splited = dict()
+        for block_str in gradblocks:
+            varname, offset, size = block_str.split(":")
+            if not var2blocks.has_key(varname):
+                var2blocks[varname] = []
+            var2blocks[varname].append((long(offset), long(size)))
+        for varname, blocks in var2blocks.iteritems():
+            orig_var = program.global_block().vars[varname]
+            split_outs = []
+            for i in xrange(len(blocks)):
+                size = blocks[i][1]
+                var = program.global_block().create_var(
+                    name="%s.block%d" % (varname, i),
+                    persistable=False,
+                    dtype=orig_var.dtype,
+                    shape=[1, size])  # flattened splited var
+                split_outs.append(var)
+
+            splited[varname] = split_outs
+            program.global_block().append_op(
+                type="split",
+                inputs={"X": orig_var},
+                outputs={"Out": split_outs},
+                attrs={"num": len(blocks)}  # assume split evenly
+            )
+        return splited
+
+    def _concat_trainer_vars(self, program, splited):
+        for varname, to_merge_list in splited.iteritems():
+            orig_var = program.global_block().vars[varname]
+            program.global_block().append_op(
+                type="concat",
+                inputs={"X": to_merge_list},
+                outputs={"Out": orig_var},
+                attrs={})
 
     def get_trainer_program(self):
        # remove optimize ops and add a send op to main_program
--
GitLab
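
For reference, a minimal standalone sketch (plain Python, separate from the patch; the helper names split_dense and place_round_robin and the variable/endpoint strings are hypothetical) of the idea behind the block splitting and round-robin placement described in the transpile steps above: a dense gradient is cut into width-aligned "name:block_id:size" strings and the resulting blocks are dealt out to parameter-server endpoints in turn.

from __future__ import print_function


def split_dense(varname, shape, min_block_size=1024):
    """Cut a dense tensor into "name:block_id:size" strings (VarBlock format).

    The block size is rounded up to a multiple of dim1 = prod(shape[1:]),
    so every block holds whole rows ("align by product(dim[1:])").
    """
    dim1 = 1
    for d in shape[1:]:
        dim1 *= d
    numel = shape[0] * dim1
    block_size = min(numel, max(min_block_size, dim1))
    if block_size % dim1 != 0:  # round up to whole rows
        block_size += dim1 - block_size % dim1
    blocks = []
    offset = 0
    block_id = 0
    while offset < numel:
        size = min(block_size, numel - offset)
        blocks.append("%s:%d:%d" % (varname, block_id, size))
        offset += size
        block_id += 1
    return blocks


def place_round_robin(blocks, endpoints):
    """Assign VarBlock strings to endpoints in round-robin order."""
    placement = dict((ep, []) for ep in endpoints)
    for i, block in enumerate(blocks):
        placement[endpoints[i % len(endpoints)]].append(block)
    return placement


if __name__ == "__main__":
    grad_blocks = split_dense("fc_0.w_0@GRAD", [784, 10])
    print(grad_blocks[0], grad_blocks[-1])  # fc_0.w_0@GRAD:0:1030  fc_0.w_0@GRAD:7:630
    print(place_round_robin(grad_blocks, ["ps0:6174", "ps1:6174"]))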
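A second standalone sketch (numpy assumed available; the shape and block sizes are the same illustrative values as above) shows why the trainer-side rewiring in steps 2-4 is lossless: splitting a flattened gradient at row-aligned offsets and concatenating the pieces back recovers the original tensor, which is the property the split and concat ops rely on around the send op.

import numpy as np

grad = np.random.rand(784, 10).astype("float32")  # a fake dense gradient
flat = grad.reshape(1, -1)                        # the [1, size] layout of the block vars

sizes = [1030] * 7 + [630]                        # row-aligned block sizes
cut_points = np.cumsum(sizes)[:-1]

blocks = np.split(flat, cut_points, axis=1)       # what a "split" op would emit
restored = np.concatenate(blocks, axis=1)         # what a "concat" op rebuilds

assert [b.shape[1] for b in blocks] == sizes
assert np.array_equal(restored.reshape(784, 10), grad)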