diff --git a/paddle/fluid/operators/concat_op.h b/paddle/fluid/operators/concat_op.h
index 878e53058567500aeb9fe854a1a65ed5380572a8..c8a4292932dfaddb4ea73a0d1c8ff6bda02ce1c0 100644
--- a/paddle/fluid/operators/concat_op.h
+++ b/paddle/fluid/operators/concat_op.h
@@ -38,7 +38,7 @@ class ConcatKernel : public framework::OpKernel<T> {
       auto in_stride = framework::stride_numel(in->dims());
       StridedNumelCopyWithAxis<T>(ctx.device_context(), axis,
                                   out->data<T>() + output_offset, out_stride,
-                                  in->data<T>(), in_stride);
+                                  in->data<T>(), in_stride, in_stride[axis]);
       output_offset += in_stride[axis];
     }
   }
@@ -59,7 +59,7 @@ class ConcatGradKernel : public framework::OpKernel<T> {
       auto out_stride = framework::stride_numel(out->dims());
       StridedNumelCopyWithAxis<T>(ctx.device_context(), axis, out->data<T>(),
                                   out_stride, in->data<T>() + input_offset,
-                                  in_stride);
+                                  in_stride, out_stride[axis]);
       input_offset += out_stride[axis];
     }
   }
diff --git a/paddle/fluid/operators/split_op.h b/paddle/fluid/operators/split_op.h
index 06bcf82620bec57346c30b029d23ad8417252248..54420e1bf6ec982545715dc847b0b3e138cf2045 100644
--- a/paddle/fluid/operators/split_op.h
+++ b/paddle/fluid/operators/split_op.h
@@ -38,7 +38,7 @@ class SplitOpKernel : public framework::OpKernel<T> {
       auto out_stride = framework::stride_numel(out->dims());
       StridedNumelCopyWithAxis<T>(ctx.device_context(), axis, out->data<T>(),
                                   out_stride, in->data<T>() + input_offset,
-                                  in_stride);
+                                  in_stride, out_stride[axis]);
       input_offset += out_stride[axis];
     }
   }
diff --git a/paddle/fluid/operators/strided_memcpy.h b/paddle/fluid/operators/strided_memcpy.h
index 385124305e2d9afd62313ca46178b4916cd6405d..4c7b90693a2f9ba62d9c30bb601ea4aaebeaf4b5 100644
--- a/paddle/fluid/operators/strided_memcpy.h
+++ b/paddle/fluid/operators/strided_memcpy.h
@@ -54,7 +54,8 @@ inline void StridedNumelCopyWithAxis(const platform::DeviceContext& ctx,
                                      int64_t axis, T* dst,
                                      const framework::DDim& dst_stride_numel,
                                      const T* src,
-                                     const framework::DDim& src_stride_numel) {
+                                     const framework::DDim& src_stride_numel,
+                                     int64_t size) {
   int64_t before = dst_stride_numel[0] / dst_stride_numel[axis];
   int64_t src_after = src_stride_numel[axis];
   int64_t dst_after = dst_stride_numel[axis];
@@ -82,15 +83,14 @@ inline void StridedNumelCopyWithAxis(const platform::DeviceContext& ctx,
     if (platform::is_cpu_place(place)) {
       auto& cpu_place = boost::get<platform::CPUPlace>(place);
       memory::Copy(cpu_place, dst + i * dst_after, cpu_place,
-                   src + i * src_after, sizeof(T) * src_after);
+                   src + i * src_after, sizeof(T) * size);
     } else {
 #ifdef PADDLE_WITH_CUDA
       auto& gpu_place = boost::get<platform::CUDAPlace>(place);
       auto& cuda_ctx =
           reinterpret_cast<const platform::CUDADeviceContext&>(ctx);
       memory::Copy(gpu_place, dst + i * dst_after, gpu_place,
-                   src + i * src_after, sizeof(T) * src_after,
-                   cuda_ctx.stream());
+                   src + i * src_after, sizeof(T) * size, cuda_ctx.stream());
 #else
       PADDLE_THROW("Paddle is not compiled with GPU");
 #endif
diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py
index e4675e24b178b2f1745c2b38270ac381ebfe6550..689920af0c4fb85d11c3492d83da2d22d9c4fa6e 100644
--- a/python/paddle/v2/fluid/distribute_transpiler.py
+++ b/python/paddle/v2/fluid/distribute_transpiler.py
@@ -121,6 +121,7 @@ def split_dense_variable(var_list,
                 block_size += dim1 - remains
         # update split_count after aligning
         split_count = int(math.ceil(var_numel / float(block_size)))
+        print("###split var ", var.name, var.shape, block_size, split_count)
         for block_id in xrange(split_count):
             curr_block_size = min(block_size, var_numel - (
                 (block_id) * block_size))
@@ -191,7 +192,6 @@ class DistributeTranspiler:
         for b in param_blocks:
             varname, block_id, _ = b.split(":")
             send_outputs.append(param_var_mapping[varname][int(block_id)])
-
         # let send_op know which endpoint to send which var to, eplist has the same
         # order as send_inputs.
         eplist = split_method(send_inputs, pserver_endpoints)
@@ -230,21 +230,6 @@ class DistributeTranspiler:
                 outputs={"Out": [orig_param]},
                 attrs={"axis": 0})

-        self.lr_param_mapping = self._create_lr_param_mapping()
-
-    def _create_lr_param_mapping(self):
-        lr_mapping = dict()
-        for _, opt_op in enumerate(self.optimize_ops):
-            if not opt_op.inputs or not opt_op.inputs.has_key("LearningRate") \
-                    or not opt_op.inputs.has_key("Param"):
-                continue
-            lr = opt_op.inputs["LearningRate"].name
-            param = opt_op.inputs["Param"].name
-            if not lr_mapping.has_key(lr):
-                lr_mapping.update({lr: list()})
-            lr_mapping[lr].append(param)
-        return lr_mapping
-
     def _create_vars_from_blocklist(self, program, block_list):
         # Create respective variables using the block_list
         block_map = dict()
@@ -271,6 +256,7 @@ class DistributeTranspiler:
                 splited_shape = [rows]
                 if len(orig_shape) >= 2:
                     splited_shape.extend(orig_shape[1:])
+                print("###splited: ", size, rows, splited_shape)
                 var = program.global_block().create_var(
                     name="%s.block%d" % (varname, i),
                     psersistable=False,
@@ -278,6 +264,7 @@ class DistributeTranspiler:
                     type=orig_var.type,
                     shape=splited_shape)  # flattend splited var
                 var_mapping[varname].append(var)
+                print("###created split var ", var)
         return var_mapping

     def _clone_var(self, block, var):
@@ -369,18 +356,9 @@ class DistributeTranspiler:
             pass
         return orig_shape

-    def _fetch_var_names(self, param_dict):
-        res = []
-        if not param_dict:
-            return res
-        for _, values in param_dict.iteritems():
-            if not isinstance(values, list):
-                values = [values]
-            res += [v.name for v in values]
-        return res
-
     def _append_pserver_ops(self, optimize_block, opt_op, endpoint):
         program = optimize_block.program
+        pserver_block = program.global_block()
         new_inputs = dict()
         # update param/grad shape first, then other inputs like
         # moment can use the updated shape
@@ -395,11 +373,11 @@ class DistributeTranspiler:
                     # do not append this op if current endpoint
                     # is not dealing with this grad block
                     return
-                merged_var = program.global_block().vars[grad_block.name]
+                merged_var = pserver_block.vars[grad_block.name]
                 # append merging ops if trainers > 1
                 if self.trainers > 1:
                     vars2merge = self._create_var_for_trainers(
-                        program.global_block(), grad_block, self.trainers)
+                        pserver_block, grad_block, self.trainers)
                     optimize_block.append_op(
                         type="sum",
                         inputs={"X": vars2merge},
@@ -419,29 +397,27 @@ class DistributeTranspiler:
                         break
                 if not param_block:
                     return
-                tmpvar = program.global_block().create_var(
+                tmpvar = pserver_block.create_var(
                     name=param_block.name,
                     persistable=True,
                     dtype=param_block.dtype,
                     shape=param_block.shape)
-
                 new_inputs[key] = tmpvar
             elif key == "LearningRate":
                 # leraning rate variable has already be created by non-optimize op,
                 # don't create it once again.
-                new_inputs[key] = program.global_block().vars[opt_op.input(key)[
-                    0]]
+                new_inputs[key] = pserver_block.vars[opt_op.input(key)[0]]

         for key in opt_op.input_names:
             new_shape = None
             if key in ["Param", "Grad", "LearningRate"]:
                 continue
-            var = program.global_block().vars[opt_op.input(key)[0]]
+            var = self.program.global_block().vars[opt_op.input(key)[0]]
             # update accumulator variable shape
             param_shape = new_inputs["Param"].shape
             new_shape = self._get_optimizer_input_shape(opt_op.type, key,
                                                         var.shape, param_shape)
-            tmpvar = program.global_block().create_var(
+            tmpvar = pserver_block.create_var(
                 name=var.name,
                 persistable=var.persistable,
                 dtype=var.dtype,
@@ -449,11 +425,14 @@ class DistributeTranspiler:
             new_inputs[key] = tmpvar

         # change output's ParamOut variable
-        opt_op.outputs["ParamOut"] = new_inputs["Param"]
+        outputs = self._get_output_map_from_op(self.program.global_block().vars,
+                                               opt_op)
+        outputs["ParamOut"] = new_inputs["Param"]
+
         optimize_block.append_op(
             type=opt_op.type,
             inputs=new_inputs,
-            outputs=opt_op.outputs,
+            outputs=outputs,
             attrs=opt_op.attrs)

     def _append_pserver_non_opt_ops(self, optimize_block, opt_op):
@@ -497,11 +476,12 @@ class DistributeTranspiler:
         # If one op's input is another op's output or
         # one op's output is another op's input, we say
         # the two operator is connected.
-        op1_input_names = self._fetch_var_names(op1.inputs)
-        op1_output_names = self._fetch_var_names(op1.outputs)
+        op1_input_names = op1.desc.input_arg_names()
+        op1_output_names = op1.desc.output_arg_names()
+
+        op2_input_names = op2.desc.input_arg_names()
+        op2_output_names = op2.desc.output_arg_names()

-        op2_input_names = self._fetch_var_names(op2.inputs)
-        op2_output_names = self._fetch_var_names(op2.outputs)
         if set(op1_output_names) & set(op2_input_names) or \
            set(op1_input_names) & set(op2_output_names):
             return True
@@ -521,8 +501,8 @@ class DistributeTranspiler:
     def _is_opt_op(self, op):
         # NOTE: It's a HACK implement.
         # optimize op: SGDOptimize, MomentumOptimizer, AdamOptimizer and etc...
-        if op.inputs and op.inputs.has_key("Param") \
-                and op.inputs.has_key("LearningRate"):
+        if "Param" in op.input_names and \
+                "LearningRate" in op.input_names:
             return True
         return False

@@ -530,12 +510,12 @@ class DistributeTranspiler:
         param_names = [
             p.name for p in self.param_grad_ep_mapping[endpoint]["params"]
         ]
-        if op.inputs["Param"].name in param_names:
+        if op.input("Param") in param_names:
             return True
         else:
             for n in param_names:
-                param = op.inputs["Param"].name
-                if same_or_split_var(n, param) and n != op.inputs["Param"].name:
+                param = op.input("Param")[0]
+                if same_or_split_var(n, param) and n != param:
                     return True
             return False
         return False
@@ -551,6 +531,8 @@ class DistributeTranspiler:
         """
         # step5
         pserver_program = Program()
+        print("param mapping on pserver: #### ",
+              self.param_grad_ep_mapping[endpoint]["params"])
         for v in self.param_grad_ep_mapping[endpoint]["params"]:
             self._clone_var(pserver_program.global_block(), v)
         for v in self.param_grad_ep_mapping[endpoint]["grads"]:
@@ -564,7 +546,6 @@ class DistributeTranspiler:
             persistable=True,
             dtype=v.dtype,
             shape=v.shape)
-
         # step6
         optimize_block = pserver_program.create_block(0)
         # step 6.1
diff --git a/python/paddle/v2/fluid/framework.py b/python/paddle/v2/fluid/framework.py
index a517db68c5886fbcbe19e6981aee5bf3971352e4..35d3df785ba4f74ce1681e471e7a83dfdaf71987 100644
--- a/python/paddle/v2/fluid/framework.py
+++ b/python/paddle/v2/fluid/framework.py
@@ -400,9 +400,6 @@ class Operator(object):
         """
         self.block = block
         self.desc = desc
-        # for clone a new operator
-        self.inputs = inputs
-        self.outputs = outputs
         self.attrs = attrs
         if len(self.desc.type()) != 0:
             return
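The core change in strided_memcpy.h is that StridedNumelCopyWithAxis now takes an explicit size, so the caller states how many elements to copy along the concat/split axis instead of the copy size being taken from the source stride. Below is a minimal, framework-free C++ sketch of that copy pattern; the helper name, its signature, and the toy 2x2 concat are illustrative assumptions for review purposes only, not Paddle's actual API.

#include <cstdint>
#include <cstring>
#include <vector>

// Copy `size` elements per outer row. Rows advance by dst_after elements in
// dst and src_after elements in src, which generally differ when the tensors
// have different widths along the axis being concatenated or split.
template <typename T>
void StridedCopyWithAxisSketch(T* dst, int64_t dst_after, const T* src,
                               int64_t src_after, int64_t before,
                               int64_t size) {
  for (int64_t i = 0; i < before; ++i) {
    std::memcpy(dst + i * dst_after, src + i * src_after, sizeof(T) * size);
  }
}

int main() {
  // Concatenate two 2x2 matrices along axis 1 into a 2x4 output.
  std::vector<float> a = {1, 2, 3, 4};  // row width 2
  std::vector<float> b = {5, 6, 7, 8};  // row width 2
  std::vector<float> out(8, 0);         // row width 4
  StridedCopyWithAxisSketch(out.data(), 4, a.data(), 2, /*before=*/2,
                            /*size=*/2);
  StridedCopyWithAxisSketch(out.data() + 2, 4, b.data(), 2, /*before=*/2,
                            /*size=*/2);
  // out == {1, 2, 5, 6, 3, 4, 7, 8}
  return 0;
}

In the diff above, the concat kernel passes in_stride[axis] and the split and concat-grad kernels pass out_stride[axis], i.e. in each case the per-row extent of the smaller tensor along the axis.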