diff --git a/python/paddle/v2/fluid/__init__.py b/python/paddle/v2/fluid/__init__.py index ec5159fca161ed1912bc4145e732b7927833cc0b..5afc663822cac62105f0e6191f927bb2c8c4a705 100644 --- a/python/paddle/v2/fluid/__init__.py +++ b/python/paddle/v2/fluid/__init__.py @@ -18,6 +18,7 @@ from param_attr import ParamAttr from data_feeder import DataFeeder from core import LoDTensor, CPUPlace, CUDAPlace from distribute_transpiler import DistributeTranspiler +from distribute_transpiler_simple import SimpleDistributeTranspiler import clip from memory_optimization_transpiler import memory_optimize @@ -37,6 +38,7 @@ __all__ = framework.__all__ + executor.__all__ + [ 'ParamAttr' 'DataFeeder', 'clip', + 'SimpleDistributeTranspiler', 'DistributeTranspiler', 'memory_optimize', ] diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 49ece7b725e318d7526d58fe54c97cbe20200a7d..d17f9815cca5e3f4142da1357d2e5da6914a76cf 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -1,51 +1,62 @@ +from __future__ import print_function import framework from framework import Program, default_main_program, Parameter, Variable import optimizer from layer_helper import LayerHelper +from distributed_spliter import * +import math -def hash_name_to_server(params_grads, pserver_endpoints): - """ - :param param_grads: - :return: a map of pserver endpoint -> - params -> [param list] - grads -> [grad list] - """ - - def _hash_param(param_name, total): - return hash(param_name) % total - - param_grad_map = dict() - for param, grad in params_grads: - if param.trainable is True and grad is not None: - server_id = _hash_param(param.name, len(pserver_endpoints)) - server_for_param = pserver_endpoints[server_id] - if not param_grad_map.has_key(server_for_param): - param_grad_map[server_for_param] = {"params": [], "grads": []} - param_grad_map[server_for_param]["params"].append(param) - 
class VarBlock:
    """Describe one block (contiguous slice) of a dense variable.

    ``str(block)`` gives ``"<varname>:<offset>:<size>"``, which is the form
    returned by split_dense_variable().
    """

    def __init__(self, varname, offset, size):
        self.varname = varname
        # NOTE: offset is the index of the block inside the variable; the
        # element offset is offset * (size of a full block).
        self.offset = offset
        # number of elements held by this block
        self.size = size

    def __str__(self):
        return "%s:%d:%d" % (self.varname, self.offset, self.size)


def split_dense_variable(var_list,
                         pserver_count,
                         min_block_size=1024,
                         max_block_size=1048576):
    """
    We may need to split dense tensor to one or several blocks and put
    them equally onto parameter server. One block is a sub-tensor
    aligned by dim[0] of the tensor.

    We need to have a minimal block size so that the calculations in
    the parameter server side can gain better performance. By default
    minimum block size is 1024.

    :param var_list: objects exposing ``name`` and ``shape``
    :param pserver_count: number of parameter servers to spread blocks over
    :param min_block_size: a variable is never cut into more pieces than
        the number of blocks of this many elements that fit into it
    :param max_block_size: accepted for interface compatibility; it is not
        enforced by this function
    :return: list of "varname:block_id:size" strings, grouped by variable
        in the order of var_list
    """
    blocks = []
    for var in var_list:
        # Product of all dims; an empty shape (scalar) counts as 1 element.
        var_numel = 1
        for dim in var.shape:
            var_numel *= dim
        if var_numel == 0:
            # Nothing to split, and the arithmetic below would divide by a
            # zero block size.
            blocks.append(str(VarBlock(var.name, 0, 0)))
            continue

        max_pserver_count = int(math.floor(var_numel / float(min_block_size)))
        split_count = min(pserver_count, max(max_pserver_count, 1))
        block_size = int(math.ceil(var_numel / float(split_count)))

        if len(var.shape) >= 2:
            # align by dim1(width) so that a block always holds whole rows
            dim1 = 1
            for dim in var.shape[1:]:
                dim1 *= dim
            remains = block_size % dim1
            if remains != 0:
                block_size += dim1 - remains
        # Recompute for every variable: block_size was rounded up, so fewer
        # blocks may be enough and the old count could leave an empty block.
        split_count = int(math.ceil(var_numel / float(block_size)))
        for block_id in range(split_count):
            curr_block_size = min(block_size,
                                  var_numel - block_id * block_size)
            blocks.append(str(VarBlock(var.name, block_id, curr_block_size)))
    return blocks
- :param optimize_ops: op list of optimization, should be the return value of Optimizer.minimize :type optimize_ops: list :param program: program to optimize, default default_main_program :param pservers: parameter server endpoints like "m1:6174,m2:6174" :type pservers: string - :return: return a list of programs """ + assert (callable(split_method)) if program is None: program = default_main_program() self.program = program self.trainers = trainers self.optimize_ops = optimize_ops - self._optimize_distributed( - optimize_ops, - program, - params_grads, - pservers=pservers, - trainers=trainers, - split_method=split_method) - - def _clone_param(self, block, v): - assert isinstance(v, Parameter) - new_p = Parameter( - block=block, - shape=v.shape, - dtype=v.dtype, - type=v.type, - lod_level=v.lod_level, - stop_gradient=v.stop_gradient, - trainable=v.trainable, - optimize_attr=v.optimize_attr, - regularizer=v.regularizer, - name=v.name) - block.vars[new_p.name] = new_p + # steps to transpile: + # 1. split variable to multiple blocks, align by product(dim[1:]) (width). + # 2. modify trainer program add split_op to each Grad. + # 3. append send_op to trainer. + # 4. append concat_op to trainer to update local weights. + # 5. create new program as parameter server. + # 6. 
create parameter server program by split_method generated endpoint->VarBlock + + pserver_endpoints = pservers.split(",") + + # step1 + param_list = [pg[0] for pg in params_grads] + grad_list = [pg[1] for pg in params_grads] + # TODO: add split selected rows support + grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints)) + param_blocks = split_dense_variable(param_list, len(pserver_endpoints)) + # step2 + grad_var_mapping = self._append_split_op(program, grad_blocks) + + # step3 + send_inputs = [] + send_outputs = [] + for b in grad_blocks: # append by order + varname, block_id, _ = b.split(":") + send_inputs.append(grad_var_mapping[varname][int(block_id)]) + + param_var_mapping = self._create_vars_from_blocklist(program, + param_blocks) + for b in param_blocks: + varname, block_id, _ = b.split(":") + send_outputs.append(param_var_mapping[varname][int(block_id)]) + # let send_op know which endpoint to send which var, eplist is of the same + # order of send_inputs. + eplist = split_method(send_inputs, pserver_endpoints) + # create mapping of endpoint -> splited var to create pserver side program + self.param_grad_ep_mapping = dict() + for i, ep in enumerate(eplist): + param = send_outputs[i] + grad = send_inputs[i] + if not self.param_grad_ep_mapping.has_key(ep): + self.param_grad_ep_mapping[ep] = {"params": [], "grads": []} + self.param_grad_ep_mapping[ep]["params"].append(param) + self.param_grad_ep_mapping[ep]["grads"].append(grad) + + send_op = program.global_block().append_op( + type="send", + inputs={"X": send_inputs}, + outputs={"Out": send_outputs}, + attrs={"endpoints": pserver_endpoints, + "epmap": eplist}) + # step4 + for varname, splited_var in param_var_mapping.iteritems(): + if len(splited_var) <= 1: + continue + orig_param = program.global_block().vars[varname] + concat = program.global_block().append_op( + type="concat", + inputs={"X": splited_var}, + outputs={"Out": [orig_param]}, + attrs={"axis": 0}) + + def 
_create_vars_from_blocklist(self, program, block_list): + block_map = dict() + var_mapping = dict() + for block_str in block_list: + varname, offset, size = block_str.split(":") + if not block_map.has_key(varname): + block_map[varname] = [] + block_map[varname].append((long(offset), long(size))) + for varname, splited in block_map.iteritems(): + orig_var = program.global_block().vars[varname] + var_mapping[varname] = [] + if len(splited) == 1: + var_mapping[varname] = [orig_var] + continue + orig_shape = orig_var.shape + orig_dim1_flatten = 1 + if len(orig_shape) >= 2: + orig_dim1_flatten = reduce(lambda x, y: x * y, orig_shape[1:]) + + for i, block in enumerate(splited): + size = block[1] + rows = size / orig_dim1_flatten + splited_shape = [rows] + if len(orig_shape) >= 2: + splited_shape.extend(orig_shape[1:]) + var = program.global_block().create_var( + name="%s.block%d" % (varname, i), + psersistable=False, + dtype=orig_var.dtype, + shape=splited_shape) # flattend splited var + var_mapping[varname].append(var) + return var_mapping def _clone_var(self, block, var): assert isinstance(var, Variable) @@ -129,34 +192,27 @@ class DistributeTranspiler: dtype=var.dtype, type=var.type, lod_level=var.lod_level, - persistable=var.persistable) + # HACK: let all param in pserver persistable so child + # program in recv can get them + persistable=True) - def _optimize_distributed(self, optimize_ops, program, params_and_grads, - **kwargs): - if kwargs.has_key("split_method"): - split_method = kwargs["split_method"] - else: - split_method = round_robin - - assert (callable(split_method)) - pserver_endpoints = kwargs["pservers"].split(",") - self.param_grad_map = split_method(params_and_grads, pserver_endpoints) - - send_op_ordered_inputs = [] - send_op_ordered_outputs = [] - epmap = [] - for ep, v in self.param_grad_map.iteritems(): - send_op_ordered_inputs.extend(v["grads"]) - send_op_ordered_outputs.extend(v["params"]) - for i in v["grads"]: - epmap.append(ep) - send_op = 
program.global_block().append_op( - type="send", - inputs={"X": send_op_ordered_inputs - }, # inputs is a list of tensors to be send - outputs={"Out": send_op_ordered_outputs}, - attrs={"endpoints": pserver_endpoints, - "epmap": epmap}) + def _append_split_op(self, program, gradblocks): + var_mapping = self._create_vars_from_blocklist(program, gradblocks) + for varname, splited_vars in var_mapping.iteritems(): + # variable that don't need to split have empty splited_vars + if len(splited_vars) <= 1: + continue + orig_var = program.global_block().vars[varname] + sections = [] + for v in splited_vars: + sections.append(v.shape[0]) + program.global_block().append_op( + type="split", + inputs={"X": orig_var}, + outputs={"Out": splited_vars}, + attrs={"sections": sections} # assume split evenly + ) + return var_mapping def get_trainer_program(self): # remove optimize ops and add a send op to main_program @@ -174,69 +230,267 @@ class DistributeTranspiler: var_list.append(var_each) return var_list - def get_pserver_program(self, endpoint, optimize_ops): - pserver_program = Program() - for v in self.param_grad_map[endpoint]["params"]: - self._clone_param(pserver_program.global_block(), v) + def _get_optimizer_input_shape(self, op_type, varkey, orig_shape, + param_shape): + """ + Returns the shape for optimizer inputs that need to be reshaped when + Param and Grad is splited to multiple servers. + """ + # HACK(typhoonzero): Should use functions of corresponding optimizer in + # optimizer.py to get the shape, do not bind this in the transpiler. 
+ if op_type == "adam": + if varkey in ["Moment1", "Moment2"]: + return param_shape + elif op_type == "adagrad": + if varkey == "Moment": + return param_shape + elif op_type == "adamax": + if varkey in ["Moment", "InfNorm"]: + return param_shape + elif op_type == "momentum": + if varkey == "Velocity": + return param_shape + elif op_type == "": + if varkey == "Moment": + return param_shape + elif op_type == "sgd": + pass + return orig_shape - optimize_sub_program = Program() - grad_var_names = [ - var.name for var in self.param_grad_map[endpoint]["grads"] + def _is_op_on_pserver(self, endpoint, all_ops, idx): + """ + Recursively check if the op need to run on current server. + Assume that ops are in the execution order. + """ + param_names = [ + p.name for p in self.param_grad_ep_mapping[endpoint]["params"] ] - for opt_op in optimize_ops: - for _, var in opt_op.inputs.iteritems(): - # NOTE: append operators to merge gradients from multiple - # trainers. If trainers == 1, this is not needed. - if self.trainers > 1 and var.name in grad_var_names: + op = all_ops[idx] + if op.inputs.has_key("Param"): + if op.inputs["Param"].name in param_names: + return True + else: + for n in param_names: + if n.startswith(op.inputs["Param"].name+".block") and \ + n != op.inputs["Param"].name: + return True + return False + else: + j = idx - 1 + while j >= 0: + prev_op = all_ops[j] + prev_output_names = [o.name for o in prev_op.outputs.values()] + prev_input_names = [o.name for o in prev_op.inputs.values()] + found1 = False + found2 = False + for _, v in op.inputs.iteritems(): + if v.name in prev_output_names: + found1 = self._is_op_on_pserver(endpoint, all_ops, j) + # later ops may produce output for prev op's next batch use. 
+ for _, v in op.outputs.iteritems(): + if v.name in prev_input_names: + found2 = self._is_op_on_pserver(endpoint, all_ops, j) + if found1 or found2: + return True + j -= 1 + return False + + def _append_pserver_ops(self, program, pserver_program, opt_op, endpoint): + new_inputs = dict() + # update param/grad shape first, then other inputs like + # moment can use the updated shape + for key, var in opt_op.inputs.iteritems(): + if key == "Grad": + grad_block = None + for g in self.param_grad_ep_mapping[endpoint]["grads"]: + if g.name.startswith(var.name): + grad_block = g + break + if not grad_block: + # do not append this op if current endpoint + # is not dealing with this grad block + return + merged_var = program.global_block().create_var( + name=grad_block.name, + persistable=grad_block.persistable, + dtype=grad_block.dtype, + shape=grad_block.shape) + # append merging ops if trainers > 1 + if self.trainers > 1: vars2merge = self._create_var_for_trainers( - optimize_sub_program.global_block(), var, self.trainers) - merged_var = optimize_sub_program.global_block().create_var( - name=var.name, - persistable=var.persistable, - dtype=var.dtype, - shape=var.shape) - optimize_sub_program.global_block().append_op( + program.global_block(), grad_block, self.trainers) + program.global_block().append_op( type="sum", inputs={"X": vars2merge}, outputs={"Out": merged_var}) - optimize_sub_program.global_block().append_op( + program.global_block().append_op( type="scale", inputs={"X": merged_var}, outputs={"Out": merged_var}, attrs={"scale": 1.0 / float(self.trainers)}) - else: - optimize_sub_program.global_block().create_var( - name=var.name, - persistable=var.persistable, - dtype=var.dtype, - shape=var.shape) + new_inputs[key] = merged_var + elif key == "Param": + # param is already created on global program + param_block = None + for p in self.param_grad_ep_mapping[endpoint]["params"]: + if p.name.startswith(var.name): + param_block = p + break + if not param_block: + 
return + tmpvar = program.global_block().create_var( + name=param_block.name, + persistable=True, + dtype=param_block.dtype, + shape=param_block.shape) + + new_inputs[key] = tmpvar + for key, var in opt_op.inputs.iteritems(): + if key in ["Param", "Grad"]: + continue + # update accumulator variable shape + param_shape = new_inputs["Param"].shape + new_shape = self._get_optimizer_input_shape(opt_op.type, key, + var.shape, param_shape) + tmpvar = program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=new_shape) + new_inputs[key] = tmpvar + # create var in pserver program global block. + # TODO(typhoonzero): put blocks in one program to avoid create two + # variables. + pserver_program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=new_shape) + + # change outputs ParamOut variable + opt_op.outputs["ParamOut"] = new_inputs["Param"] + program.global_block().append_op( + type=opt_op.type, + inputs=new_inputs, + outputs=opt_op.outputs, + attrs=opt_op.attrs) + + def _append_pserver_non_opt_ops(self, program, pserver_program, opt_op): + for _, var in opt_op.inputs.iteritems(): + program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=var.shape) + pserver_program.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=var.shape) + program.global_block().append_op( + type=opt_op.type, + inputs=opt_op.inputs, + outputs=opt_op.outputs, + attrs=opt_op.attrs) + + def get_pserver_program(self, endpoint, optimize_ops): + """ + get pserver side program by endpoint + + NOTE: assume blocks of the same variable is not distributed + on the same pserver, only change param/grad varnames for + trainers to fetch. For each pserver endpoint, server side + program must be a sub-set of the original optimization program. 
def get_startup_program(self, endpoint, pserver_program):
    """
    Get startup program for current parameter server.

    Builds a new Program that declares every variable of
    ``pserver_program`` and copies, from the default startup program, only
    the ops that write a variable living on this server. Outputs that
    refer to a parameter which was split are redirected to the block of
    that parameter held by ``endpoint``.

    :param endpoint: key into ``self.param_grad_ep_mapping``
    :param pserver_program: program whose global block lists the variables
        that exist on this server
    :return: the newly built startup Program

    NOTE: for "gaussian_random", "fill_constant" and "uniform_random" ops
    the "shape" attribute is overwritten on the op object taken from the
    default startup program, i.e. that program is modified in place.
    """
    s_prog = Program()
    orig_s_prog = framework.default_startup_program()
    params = self.param_grad_ep_mapping[endpoint]["params"]

    def _get_splited_name_and_shape(varname):
        # Return (name, shape) of the first param on this endpoint whose
        # name starts with varname but is not varname itself, or ("", [])
        # when there is none. This is a plain prefix test.
        for idx, splited_param in enumerate(params):
            pname = splited_param.name
            if pname.startswith(varname) and varname != pname:
                return pname, splited_param.shape
        return "", []

    # 1. create vars in pserver program to startup program
    pserver_vars = pserver_program.global_block().vars
    created_var_map = dict()
    for _, var in pserver_vars.iteritems():
        tmpvar = s_prog.global_block().create_var(
            name=var.name,
            persistable=var.persistable,
            dtype=var.dtype,
            shape=var.shape)
        created_var_map[var.name] = tmpvar

    # 2. rename op outputs
    for op in orig_s_prog.global_block().ops:
        new_outputs = dict()
        # do not append startup op if var is not on this pserver
        op_on_pserver = False
        for key, var in op.outputs.iteritems():
            newname, _ = _get_splited_name_and_shape(var.name)
            if newname:
                # output is a split parameter: use the block created above
                op_on_pserver = True
                new_outputs[key] = created_var_map[newname]
            elif var.name in pserver_vars:
                # output is not split; this reuses the variable object of
                # pserver_program, not the copy created in s_prog
                op_on_pserver = True
                new_outputs[key] = pserver_vars[var.name]

        if op_on_pserver:
            if op.type in [
                    "gaussian_random", "fill_constant", "uniform_random"
            ]:
                # initializer must produce the (possibly smaller) block shape
                op.attrs["shape"] = new_outputs["Out"].shape
            s_prog.global_block().append_op(
                type=op.type,
                inputs=op.inputs,
                outputs=new_outputs,
                attrs=op.attrs)
    return s_prog
def round_robin(params_grads, pserver_endpoints):
    """
    Assign trainable parameters to parameter servers in rotation.

    :param params_grads: list of (param, grad) pairs; ``param`` must
        expose ``trainable``
    :param pserver_endpoints: list of endpoint strings
    :return: a map of pserver endpoint ->
        params -> [param list]
        grads -> [grad list]
        Only endpoints that received at least one pair appear as keys.
    """
    # As many pairs as endpoints is enough for every endpoint to get one,
    # so equality is accepted.
    assert (len(params_grads) >= len(pserver_endpoints))

    param_grad_map = dict()
    pserver_idx = 0
    for param, grad in params_grads:
        if param.trainable is not True:
            continue
        server_for_param = pserver_endpoints[pserver_idx]
        # setdefault replaces dict.has_key, which Python 3 removed
        entry = param_grad_map.setdefault(server_for_param,
                                          {"params": [],
                                           "grads": []})
        entry["params"].append(param)
        entry["grads"].append(grad)

        # NOTE(review): the index advances only when a pair was placed;
        # confirm against the original layout.
        pserver_idx += 1
        if pserver_idx >= len(pserver_endpoints):
            pserver_idx = 0
    return param_grad_map
def _clone_var(self, block, var):
    """
    Declare in ``block`` a variable that mirrors ``var``.

    name, shape, dtype, type, lod_level and persistable are copied from
    ``var``; the value returned by ``block.create_var`` is passed back.
    """
    assert isinstance(var, Variable)
    copied_fields = ("name", "shape", "dtype", "type", "lod_level",
                     "persistable")
    spec = dict((field, getattr(var, field)) for field in copied_fields)
    return block.create_var(**spec)
kwargs["pservers"].split(",") + self.param_grad_map = split_method(params_and_grads, pserver_endpoints) + + send_op_ordered_inputs = [] + send_op_ordered_outputs = [] + epmap = [] + for ep, v in self.param_grad_map.iteritems(): + send_op_ordered_inputs.extend(v["grads"]) + send_op_ordered_outputs.extend(v["params"]) + for i in v["grads"]: + epmap.append(ep) + send_op = program.global_block().append_op( + type="send", + inputs={"X": send_op_ordered_inputs + }, # inputs is a list of tensors to be send + outputs={"Out": send_op_ordered_outputs}, + attrs={"endpoints": pserver_endpoints, + "epmap": epmap}) + + def get_trainer_program(self): + # remove optimize ops and add a send op to main_program + self.program.global_block().delete_ops(self.optimize_ops) + return self.program + + def _create_var_for_trainers(self, block, var, trainers): + var_list = [] + for i in xrange(trainers): + var_each = block.create_var( + name="%s.trainer_%d" % (var.name, i), + psersistable=var.persistable, + dtype=var.dtype, + shape=var.shape) + var_list.append(var_each) + return var_list + + def get_pserver_program(self, endpoint, optimize_ops): + pserver_program = Program() + for v in self.param_grad_map[endpoint]["params"]: + self._clone_param(pserver_program.global_block(), v) + + optimize_sub_program = Program() + grad_var_names = [ + var.name for var in self.param_grad_map[endpoint]["grads"] + ] + for opt_op in optimize_ops: + for _, var in opt_op.inputs.iteritems(): + # NOTE: append operators to merge gradients from multiple + # trainers. If trainers == 1, this is not needed. 
def hash_name(varlist, pserver_endpoints):
    """
    hash variable names to several endpoints.

    :param varlist: a list of Variables (objects exposing ``name``)
    :param pserver_endpoints: list of endpoint strings
    :return: a list of endpoints, one per variable, in the order of varlist
    """

    def _hash_block(block_str, total):
        # NOTE(review): on Python 3, str hashes differ between processes
        # unless PYTHONHASHSEED is fixed; confirm every process that calls
        # this gets the same mapping.
        return hash(block_str) % total

    eplist = []
    for var in varlist:
        # ``name`` is an attribute, not a method
        server_id = _hash_block(var.name, len(pserver_endpoints))
        server_for_param = pserver_endpoints[server_id]
        eplist.append(server_for_param)
    return eplist


def round_robin(varlist, pserver_endpoints):
    """
    distribute variables to several endpoints.

    :param varlist: a list of Variables
    :param pserver_endpoints: list of endpoint strings
    :return: a list of endpoints, one per variable, cycling through
        pserver_endpoints in order
    """
    # As many variables as endpoints is enough to give every endpoint one,
    # so equality is accepted.
    assert (len(varlist) >= len(pserver_endpoints))

    endpoint_count = len(pserver_endpoints)
    return [
        pserver_endpoints[idx % endpoint_count]
        for idx in range(len(varlist))
    ]
class TestSplitVar(unittest.TestCase):
    """Check the block sizes produced by split_dense_variable."""

    def test_check_output(self):
        # split below shapes to 10 servers
        shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10]]
        expected_sizes = [
            [15], [1024],
            [2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 784],
            [2040, 2040, 2040, 2040],
            [1150, 1150, 1150, 1150, 1150, 1150, 1100]
        ]
        program = fluid.Program()
        # Fixed, distinct names: random names could collide and would make
        # a failure hard to reproduce.
        var_list = [
            program.global_block().create_var(
                name="split_var_%d" % idx, persistable=True, shape=shape)
            for idx, shape in enumerate(shapes)
        ]
        blocks = split_dense_variable(var_list, 10)
        all_sizes = [size for sizes in expected_sizes for size in sizes]
        # Without this check a result with missing blocks would pass.
        self.assertEqual(len(blocks), len(all_sizes))
        for block_str, expected_size in zip(blocks, all_sizes):
            varname, block_id, size = block_str.split(":")
            self.assertEqual(int(size), expected_size)