diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py
index 7f3da674633884d5a275a952c07d5a4a8e0f138c..ac13a7cb60a306a4c542b192fe97d7009c2192ef 100644
--- a/python/paddle/v2/fluid/distribute_transpiler.py
+++ b/python/paddle/v2/fluid/distribute_transpiler.py
@@ -98,8 +98,7 @@ class DistributeTranspiler:
         # 3. append send_op to trainer.
         # 4. append concat_op to trainer to update local weights.
         # 5. create new program as parameter server.
-        # 5. create parameter server program by split_method generated endpoint->VarBlock
-        # 6. run compile time infershape for parameter server program
+        # 6. create parameter server program from the endpoint->VarBlock mapping generated by split_method

         pserver_endpoints = pservers.split(",")

@@ -124,6 +123,15 @@ class DistributeTranspiler:
         # let send_op know which endpoint to send which var, eplist is of the same
         # order of send_inputs.
         eplist = split_method(send_inputs, pserver_endpoints)
+        # create mapping of endpoint -> (params, grads) to build the pserver side program
+        self.param_grad_ep_mapping = dict()
+        for i, ep in enumerate(eplist):
+            param = send_outputs[i]
+            grad = send_inputs[i]
+            if not self.param_grad_ep_mapping.has_key(ep):
+                self.param_grad_ep_mapping[ep] = {"params": [], "grads": []}
+            self.param_grad_ep_mapping[ep]["params"].append(param)
+            self.param_grad_ep_mapping[ep]["grads"].append(grad)

         send_op = program.global_block().append_op(
             type="send",
@@ -235,27 +243,29 @@ class DistributeTranspiler:
             var_list.append(var_each)
         return var_list

-    def get_pserver_program(self, endpoint, optimize_ops):
-        pserver_program = Program()
-        for v in self.param_grad_map[endpoint]["params"]:
-            self._clone_param(pserver_program.global_block(), v)
-
-        optimize_sub_program = Program()
-        grad_var_names = [
-            var.name for var in self.param_grad_map[endpoint]["grads"]
-        ]
-        for opt_op in optimize_ops:
-            for _, var in opt_op.inputs.iteritems():
-                # NOTE: append operators to merge gradients from multiple
-                # trainers. If trainers == 1, this is not needed.
-                if self.trainers > 1 and var.name in grad_var_names:
+    def _append_pserver_ops(self, optimize_sub_program, opt_op, endpoint):
+        new_inputs = dict()
+        for key, var in opt_op.inputs.iteritems():
+            if key == "Grad":
+                grad_block = None
+                for g in self.param_grad_ep_mapping[endpoint]["grads"]:
+                    if g.name.startswith(var.name):
+                        grad_block = g
+                        break
+                if not grad_block:
+                    # do not append this op if current endpoint
+                    # is not dealing with this grad block
+                    return
+                merged_var = optimize_sub_program.global_block().create_var(
+                    name=grad_block.name,
+                    persistable=grad_block.persistable,
+                    dtype=grad_block.dtype,
+                    shape=grad_block.shape)
+                # append merging ops if trainers > 1
+                if self.trainers > 1:
                     vars2merge = self._create_var_for_trainers(
-                        optimize_sub_program.global_block(), var, self.trainers)
-                    merged_var = optimize_sub_program.global_block().create_var(
-                        name=var.name,
-                        persistable=var.persistable,
-                        dtype=var.dtype,
-                        shape=var.shape)
+                        optimize_sub_program.global_block(), grad_block,
+                        self.trainers)
                     optimize_sub_program.global_block().append_op(
                         type="sum",
                         inputs={"X": vars2merge},
@@ -265,38 +275,88 @@ class DistributeTranspiler:
                         inputs={"X": merged_var},
                         outputs={"Out": merged_var},
                         attrs={"scale": 1.0 / float(self.trainers)})
-                else:
-                    optimize_sub_program.global_block().create_var(
-                        name=var.name,
-                        persistable=var.persistable,
-                        dtype=var.dtype,
-                        shape=var.shape)
+                new_inputs[key] = merged_var
+            elif key == "Param":
+                # param is already created on global program
+                param_block = None
+                for p in self.param_grad_ep_mapping[endpoint]["params"]:
+                    if p.name.startswith(var.name):
+                        param_block = p
+                        break
+                if not param_block:
+                    return
+                tmpvar = optimize_sub_program.global_block().create_var(
+                    name=param_block.name,
+                    persistable=param_block.persistable,
+                    dtype=param_block.dtype,
+                    shape=param_block.shape)
+                new_inputs[key] = tmpvar
+            else:
+                tmpvar = optimize_sub_program.global_block().create_var(
+                    name=var.name,
+                    persistable=var.persistable,
+                    dtype=var.dtype,
+                    shape=var.shape)
+                new_inputs[key] = tmpvar
-            if opt_op.inputs.has_key("Grad"):
-                if opt_op.inputs["Grad"].name in grad_var_names:
-                    optimize_sub_program.global_block().append_op(
-                        type=opt_op.type,
-                        inputs=opt_op.inputs,
-                        outputs=opt_op.outputs,
-                        attrs=opt_op.attrs)
+        # FIXME: change outputs ParamOut
+        optimize_sub_program.global_block().append_op(
+            type=opt_op.type,
+            inputs=new_inputs,
+            outputs=opt_op.outputs,
+            attrs=opt_op.attrs)
+
+    def _append_pserver_non_opt_ops(self, optimize_sub_program, opt_op):
+        for _, var in opt_op.inputs.iteritems():
+            optimize_sub_program.global_block().create_var(
+                name=var.name,
+                persistable=var.persistable,
+                dtype=var.dtype,
+                shape=var.shape)
+        optimize_sub_program.global_block().append_op(
+            type=opt_op.type,
+            inputs=opt_op.inputs,
+            outputs=opt_op.outputs,
+            attrs=opt_op.attrs)
+
+    def get_pserver_program(self, endpoint, optimize_ops):
+        """
+        Get the pserver side program for the given endpoint.
+
+        NOTE: assume blocks of the same variable are not distributed
+        to the same pserver; only the param/grad varnames that trainers
+        fetch are changed. For each pserver endpoint, the server side
+        program must be a sub-set of the original optimization program.
+        """
+        # step 5: clone the param blocks assigned to this endpoint
+        pserver_program = Program()
+        for v in self.param_grad_ep_mapping[endpoint]["params"]:
+            self._clone_param(pserver_program.global_block(), v)
+        # step 6: build the optimize sub program run by the recv op
+        optimize_sub_program = Program()
+        for opt_op in optimize_ops:
+            if opt_op.inputs.has_key("Grad"):
+                # append optimize_op
+                self._append_pserver_ops(optimize_sub_program, opt_op, endpoint)
             else:
-                optimize_sub_program.global_block().append_op(
-                    type=opt_op.type,
-                    inputs=opt_op.inputs,
-                    outputs=opt_op.outputs,
-                    attrs=opt_op.attrs)
+                self._append_pserver_non_opt_ops(optimize_sub_program, opt_op)
+
         pserver_program.global_block().append_op(
             type="recv",
-            inputs={"RX":
-                    self.param_grad_map[endpoint]["grads"]},  # grads to recv
+            inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"]
+                    },  # grads to recv
             outputs={},
             attrs={
                 "OptimizeProgram": optimize_sub_program.desc,
                 "endpoint": endpoint,
-                "ParamList":
-                [p.name for p in self.param_grad_map[endpoint]["params"]],
-                "GradList":
-                [p.name for p in self.param_grad_map[endpoint]["grads"]],
+                "ParamList": [
+                    p.name
+                    for p in self.param_grad_ep_mapping[endpoint]["params"]
+                ],
+                "GradList": [
+                    p.name
+                    for p in self.param_grad_ep_mapping[endpoint]["grads"]
+                ],
                 "Trainers": self.trainers
             })
         pserver_program.sync_with_cpp()
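
For reference, the endpoint bookkeeping introduced by this patch can be exercised in isolation. The sketch below is a minimal, standalone approximation rather than the transpiler's actual API: plain strings stand in for fluid Variable objects, round_robin is a hypothetical stand-in for whatever split_method the caller passes in, and the .block0/.block1 names are only illustrative of split blocks keeping the original variable name as a prefix, which is what the g.name.startswith(var.name) lookups in _append_pserver_ops rely on.

# Hypothetical, simplified sketch of the endpoint -> {"params", "grads"}
# mapping built in transpile(); names and split_method are illustrative only.


def round_robin(var_list, endpoints):
    # assign the i-th variable to endpoints[i % len(endpoints)]
    return [endpoints[i % len(endpoints)] for i in range(len(var_list))]


def build_param_grad_ep_mapping(send_inputs, send_outputs, endpoints,
                                split_method=round_robin):
    # eplist[i] is the endpoint that receives grad send_inputs[i] and
    # serves param send_outputs[i]; the two lists share the same order.
    eplist = split_method(send_inputs, endpoints)
    mapping = dict()
    for i, ep in enumerate(eplist):
        if ep not in mapping:
            mapping[ep] = {"params": [], "grads": []}
        mapping[ep]["params"].append(send_outputs[i])
        mapping[ep]["grads"].append(send_inputs[i])
    return mapping


if __name__ == "__main__":
    grads = ["w1@GRAD.block0", "w1@GRAD.block1", "w2@GRAD.block0"]
    params = ["w1.block0", "w1.block1", "w2.block0"]
    pservers = ["127.0.0.1:6174", "127.0.0.1:6175"]
    mapping = build_param_grad_ep_mapping(grads, params, pservers)
    # get_pserver_program(endpoint, ...) only ever looks at mapping[endpoint]
    for ep in pservers:
        print("%s -> %s" % (ep, mapping[ep]))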