diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 573774a2324791c1786e39700aeb27e64e2e8f9a..abcad899bfac9ba3eff20cde825e136d867a4485 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -38,14 +38,14 @@ def split_dense_variable(var_list, min_block_size=1024, max_block_size=1048576): """ - We may need to split dense tensor to one or several blocks and put + We may need to split dense tensor to one or more blocks and put them equally onto parameter server. One block is a sub-tensor aligned by dim[0] of the tensor. - + We need to have a minimal block size so that the calculations in the parameter server side can gain better performance. By default - mininum block size is 1024. The max block size is used to prevent - too large block that may causing send error. + minimum block size is 1024. The max block size is used to prevent + very large blocks that may cause send error. """ blocks = [] for var in var_list: @@ -64,7 +64,7 @@ def split_dense_variable(var_list, remains = block_size % dim1 if remains != 0: block_size += dim1 - remains - # update split_count after align + # update split_count after aligning split_count = int(math.ceil(var_numel / float(block_size))) for block_id in xrange(split_count): curr_block_size = min(block_size, var_numel - ( @@ -83,18 +83,18 @@ class DistributeTranspiler: trainers=1, split_method=round_robin): """ - Transpile the program to a distributed data-parallelism programs. - The main_program will be transform to use a remote parameter server + Transpile the program to distributed data-parallelism programs. + The main_program will be transformed to use a remote parameter server to do parameter optimization. And the optimization graph will be put - in to a parameter server program. + into a parameter server program. - Use different methods to split trainable varialbles to different + Use different methods to split trainable variables to different parameter servers. :param optimize_ops: op list of optimization, should be the return value of Optimizer.minimize :type optimize_ops: list - :param program: program to optimize, default default_main_program + :param program: program to optimize, default is default_main_program :param pservers: parameter server endpoints like "m1:6174,m2:6174" :type pservers: string :return: return a list of programs @@ -106,11 +106,11 @@ class DistributeTranspiler: self.trainers = trainers self.optimize_ops = optimize_ops # steps to transpile: - # 1. split variable to multiple blocks, align by product(dim[1:]) (width). + # 1. split variable to multiple blocks, aligned by product(dim[1:]) (width). # 2. modify trainer program add split_op to each Grad. # 3. append send_op to trainer. # 4. append concat_op to trainer to update local weights. - # 5. create new program as parameter server. + # 5. create new program for parameter server. # 6. create parameter server program by split_method generated endpoint->VarBlock pserver_endpoints = pservers.split(",") @@ -136,10 +136,10 @@ class DistributeTranspiler: for b in param_blocks: varname, block_id, _ = b.split(":") send_outputs.append(param_var_mapping[varname][int(block_id)]) - # let send_op know which endpoint to send which var, eplist is of the same - # order of send_inputs. + # let send_op know which endpoint to send which var to, eplist has the same + # order as send_inputs. eplist = split_method(send_inputs, pserver_endpoints) - # create mapping of endpoint -> splited var to create pserver side program + # create mapping of endpoint -> split var to create pserver side program self.param_grad_ep_mapping = dict() for i, ep in enumerate(eplist): param = send_outputs[i] @@ -149,6 +149,7 @@ class DistributeTranspiler: self.param_grad_ep_mapping[ep]["params"].append(param) self.param_grad_ep_mapping[ep]["grads"].append(grad) + # create send_op send_op = program.global_block().append_op( type="send", inputs={"X": send_inputs}, @@ -167,6 +168,7 @@ class DistributeTranspiler: attrs={"axis": 0}) def _create_vars_from_blocklist(self, program, block_list): + # Create respective variables using the block_list block_map = dict() var_mapping = dict() for block_str in block_list: @@ -207,11 +209,12 @@ class DistributeTranspiler: dtype=var.dtype, type=var.type, lod_level=var.lod_level, - # HACK: let all param in pserver persistable so child + # HACK: let all param in pserver be persistable so the child # program in recv can get them persistable=True) def _append_split_op(self, program, gradblocks): + # Split variables that need to be split and append respective ops var_mapping = self._create_vars_from_blocklist(program, gradblocks) for varname, splited_vars in var_mapping.iteritems(): # variable that don't need to split have empty splited_vars @@ -248,6 +251,7 @@ class DistributeTranspiler: return self.program def _create_var_for_trainers(self, block, var, trainers): + # For each trainer, create the necessary variables var_list = [] for i in xrange(trainers): var_each = block.create_var( @@ -262,7 +266,7 @@ class DistributeTranspiler: param_shape): """ Returns the shape for optimizer inputs that need to be reshaped when - Param and Grad is splited to multiple servers. + Param and Grad is split to multiple servers. """ # HACK(typhoonzero): Should use functions of corresponding optimizer in # optimizer.py to get the shape, do not bind this in the transpiler. @@ -300,7 +304,7 @@ class DistributeTranspiler: else: for n in param_names: if n.startswith(op.inputs["Param"].name+".block") and \ - n != op.inputs["Param"].name: + n != op.inputs["Param"].name: return True return False else: @@ -396,7 +400,7 @@ class DistributeTranspiler: dtype=var.dtype, shape=new_shape) - # change outputs ParamOut variable + # change output's ParamOut variable opt_op.outputs["ParamOut"] = new_inputs["Param"] program.global_block().append_op( type=opt_op.type, @@ -405,6 +409,7 @@ class DistributeTranspiler: attrs=opt_op.attrs) def _append_pserver_non_opt_ops(self, program, pserver_program, opt_op): + # Append the ops for parameters that do not need to be optimized/updated for _, var in opt_op.inputs.iteritems(): program.global_block().create_var( name=var.name, @@ -424,7 +429,7 @@ class DistributeTranspiler: def get_pserver_program(self, endpoint): """ - get pserver side program by endpoint + Get pserver side program using the endpoint NOTE: assume blocks of the same variable is not distributed on the same pserver, only change param/grad varnames for @@ -450,6 +455,7 @@ class DistributeTranspiler: shape=v.shape) # step6 optimize_sub_program = Program() + # Iterate through the ops and append ops as needed for idx, opt_op in enumerate(self.optimize_ops): is_op_on_pserver = self._is_op_on_pserver(endpoint, self.optimize_ops, idx) @@ -461,6 +467,7 @@ class DistributeTranspiler: else: self._append_pserver_non_opt_ops(optimize_sub_program, pserver_program, opt_op) + # Append the recv op pserver_program.global_block().append_op( type="recv", inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"] @@ -486,7 +493,7 @@ class DistributeTranspiler: """ Get startup program for current parameter server. Modify operator input variables if there are variables that - was splited to several blocks. + were split to several blocks. """ s_prog = Program() orig_s_prog = framework.default_startup_program()