diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc
index 07e8466ff18729af30ef4e58ed1e01edf29269d1..dbcc993aee8272dbf01addf2cb235e35ee4c51d2 100644
--- a/paddle/fluid/framework/pipeline_trainer.cc
+++ b/paddle/fluid/framework/pipeline_trainer.cc
@@ -82,7 +82,10 @@ void PipelineTrainer::CopyParameters(int microbatch_id,
   for (auto& var : global_block.AllVars()) {
     bool is_param_grad = false;
     size_t pos = 0;
-    if ((pos = var->Name().find(kGradVarSuffix)) != std::string::npos) {
+    // A magic suffix to indicate the merged gradient.
+    std::string magicSuffix = "MERGED";
+    if ((pos = var->Name().find(kGradVarSuffix)) != std::string::npos &&
+        var->Name().find(magicSuffix) != std::string::npos) {
       auto prefix_name = var->Name().substr(0, pos);
       if (param_map.find(prefix_name) != param_map.end()) {
         is_param_grad = true;
diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
index e4ff8a0d5638300b5f7a094620088ad48f27b76d..86399de9b756219136bde7dc6faa63a51c46ca62 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
@@ -153,6 +153,9 @@ class ShardingOptimizer(MetaOptimizerBase):
 
         if self.use_pipeline:
             pp_optimizer._rename_gradient_var_name(main_block)
+            pp_optimizer._accumulate_gradients(main_block)
+            with open("main_%d" % self.role_maker._worker_index(), 'w') as f:
+                f.writelines(str(main_program))
 
         # step1: set_up
         self._set_up(params_grads)
@@ -210,23 +213,6 @@ class ShardingOptimizer(MetaOptimizerBase):
             #    if self._shard.has_param(param_name):
             #        param_list.append(param_name)
             #pp_optimizer._clear_gradients(main_block, param_list)
-            accumulated_grad_names = pp_optimizer._accumulate_gradients(
-                main_block,
-                pp_allreduce_in_optimize=self.pp_allreduce_in_optimize)
-            # accumulated_grad_names = sorted(accumulated_grad_names)
-            if self.pp_allreduce_in_optimize:
-                print("persistable FP32 grad: ")
-                print(accumulated_grad_names)
-                first_optimize_op_index = get_first_check_finite_and_unscale_op_idx(
-                    main_block)
-                insert_reduce_ops(
-                    main_block,
-                    first_optimize_op_index,
-                    self.sharding_ring_id,
-                    accumulated_grad_names,
-                    self._shard,
-                    core.op_proto_and_checker_maker.OpRole.Optimize,
-                    use_calc_stream=True)
             #if not self._shard.has_param(param_name): continue
             ##if not main_block.has_var(grad_name): continue
             #assert main_block.has_var(grad_name)
@@ -246,131 +232,130 @@ class ShardingOptimizer(MetaOptimizerBase):
 
             #            'op_role': core.op_proto_and_checker_maker.OpRole.LRSched,
             #        })
-            pass
-        #def _create_var(block, ref_var, name):
-        #    """
-        #    Create a new var for block, which has the same type,
-        #    shape and dtype as ref_var, then rename it with the
-        #    name `name`.
- # """ - # new_var = block.create_var( - # name=name, - # shape=ref_var.shape, - # dtype=ref_var.dtype, - # type=ref_var.type, - # lod_level=ref_var.lod_level, - # persistable=ref_var.persistable, - # is_data=ref_var.is_data, - # need_check_feed=ref_var.desc.need_check_feed()) - # new_var.stop_gradient = ref_var.stop_gradient - # return new_var - - #def _rename_arg(op, old_name, new_name): - # op_desc = op.desc - # if isinstance(op_desc, tuple): - # op_desc = op_desc[0] - # op_desc._rename_input(old_name, new_name) - # op_desc._rename_output(old_name, new_name) - - #print("params_grads:", params_grads) - #for param_name, grad_name in params_grads: - # if not self._shard.has_param(param_name): continue - # #if not main_block.has_var(grad_name): continue - # assert main_block.has_var(grad_name) - # use_fp16 = False - # fp16_grad_name = param_name + '.cast_fp16@GRAD' - # if main_block.has_var(grad_name): - # fp16_grad_var = main_block.vars[fp16_grad_name] - # use_fp16 = True - # grad_var = main_block.vars[grad_name] - # if use_fp16: - # cast_grad_var_name = paddle.fluid.unique_name.generate( - # grad_name) - # cast_var = _create_var(main_block, fp16_grad_var, - # cast_grad_var_name) - # cast_var.persistable = False - # main_block.append_op( - # #index=offset + 1, - # type='cast', - # inputs={'X': grad_var}, - # outputs={'Out': cast_var}, - # attrs={ - # 'in_dtype': grad_var.dtype, - # 'out_dtype': cast_var.dtype, - # 'op_role': - # core.op_proto_and_checker_maker.OpRole.Backward, - # }) - # #offset += 1 - # main_block.append_op( - # #index=offset + 1, - # type='sum', - # inputs={'X': [fp16_grad_var, cast_var]}, - # outputs={'Out': fp16_grad_var}, - # attrs={ - # 'op_role': - # core.op_proto_and_checker_maker.OpRole.Backward, - # 'op_role_var': op_role_var - # }) - - # for index, op in reversed(tuple(enumerate(list(main_block.ops)))): - # offset = index - # if is_backward_op(op) and ( - # 'op_role_var' in op.attr_names): - # op_role_var = op.all_attrs()['op_role_var'] - - # if len(op_role_var) == 0: - # continue - # assert len(op_role_var) % 2 == 0 - # offset = index - # for i in range(0, len(op_role_var), 2): - # grad_name = op_role_var[i + 1] - # if not main_block.has_var(grad_name): continue - # grad_var = main_block.vars[grad_name] - # if not 'cast_fp16' in grad_name: - # new_grad_var_name = paddle.fluid.unique_name.generate(grad_name) - # new_var = _create_var(main_block, grad_var, - # new_grad_var_name) - # new_var.persistable = False - # _rename_arg(op, grad_name, new_grad_var_name) - # main_block._insert_op( - # index=offset + 1, - # type='sum', - # inputs={'X': [grad_var, new_var]}, - # outputs={'Out': grad_var}, - # attrs={ - # 'op_role': core.op_proto_and_checker_maker.OpRole.Backward, - # 'op_role_var': op_role_var - # }) - # offset += 1 - # if 'cast_fp16' in grad_name: - # param_name = op_role_var[i] - # fp32_grad_var_name = param_name + "@GRAD" - # fp32_grad_var = main_block.vars[grad_name] - # cast_grad_var_name = paddle.fluid.unique_name.generate( - # fp32_grad_var_name) - # cast_var = _create_var(main_block, grad_var, - # cast_grad_var_name) - # cast_var.persistable = False - # main_block._insert_op( - # index=offset + 1, - # type='cast', - # inputs={'X': fp32_grad_var}, - # outputs={'Out': cast_var}, - # attrs={ - # 'in_dtype': fp32_grad_var.dtype, - # 'out_dtype': cast_var.dtype, - # 'op_role': core.op_proto_and_checker_maker.OpRole.Backward, - # # self._op_role_var_key: op_role_var - # }) - # offset += 1 - # main_block._insert_op( - # index=offset + 1, - # type='sum', - # 
-        #                     outputs={'Out': grad_var},
-        #                     attrs={
-        #                         'op_role': core.op_proto_and_checker_maker.OpRole.Backward,
-        #                         'op_role_var': op_role_var})
+        #def _create_var(block, ref_var, name):
+        #    """
+        #    Create a new var for block, which has the same type,
+        #    shape and dtype as ref_var, then rename it with the
+        #    name `name`.
+        #    """
+        #    new_var = block.create_var(
+        #        name=name,
+        #        shape=ref_var.shape,
+        #        dtype=ref_var.dtype,
+        #        type=ref_var.type,
+        #        lod_level=ref_var.lod_level,
+        #        persistable=ref_var.persistable,
+        #        is_data=ref_var.is_data,
+        #        need_check_feed=ref_var.desc.need_check_feed())
+        #    new_var.stop_gradient = ref_var.stop_gradient
+        #    return new_var
+
+        #def _rename_arg(op, old_name, new_name):
+        #    op_desc = op.desc
+        #    if isinstance(op_desc, tuple):
+        #        op_desc = op_desc[0]
+        #    op_desc._rename_input(old_name, new_name)
+        #    op_desc._rename_output(old_name, new_name)
+
+        #print("params_grads:", params_grads)
+        #for param_name, grad_name in params_grads:
+        #    if not self._shard.has_param(param_name): continue
+        #    #if not main_block.has_var(grad_name): continue
+        #    assert main_block.has_var(grad_name)
+        #    use_fp16 = False
+        #    fp16_grad_name = param_name + '.cast_fp16@GRAD'
+        #    if main_block.has_var(grad_name):
+        #        fp16_grad_var = main_block.vars[fp16_grad_name]
+        #        use_fp16 = True
+        #    grad_var = main_block.vars[grad_name]
+        #    if use_fp16:
+        #        cast_grad_var_name = paddle.fluid.unique_name.generate(
+        #            grad_name)
+        #        cast_var = _create_var(main_block, fp16_grad_var,
+        #                               cast_grad_var_name)
+        #        cast_var.persistable = False
+        #        main_block.append_op(
+        #            #index=offset + 1,
+        #            type='cast',
+        #            inputs={'X': grad_var},
+        #            outputs={'Out': cast_var},
+        #            attrs={
+        #                'in_dtype': grad_var.dtype,
+        #                'out_dtype': cast_var.dtype,
+        #                'op_role':
+        #                core.op_proto_and_checker_maker.OpRole.Backward,
+        #            })
+        #        #offset += 1
+        #        main_block.append_op(
+        #            #index=offset + 1,
+        #            type='sum',
+        #            inputs={'X': [fp16_grad_var, cast_var]},
+        #            outputs={'Out': fp16_grad_var},
+        #            attrs={
+        #                'op_role':
+        #                core.op_proto_and_checker_maker.OpRole.Backward,
+        #                'op_role_var': op_role_var
+        #            })
+
+        # for index, op in reversed(tuple(enumerate(list(main_block.ops)))):
+        #     offset = index
+        #     if is_backward_op(op) and (
+        #             'op_role_var' in op.attr_names):
+        #         op_role_var = op.all_attrs()['op_role_var']
+
+        #         if len(op_role_var) == 0:
+        #             continue
+        #         assert len(op_role_var) % 2 == 0
+        #         offset = index
+        #         for i in range(0, len(op_role_var), 2):
+        #             grad_name = op_role_var[i + 1]
+        #             if not main_block.has_var(grad_name): continue
+        #             grad_var = main_block.vars[grad_name]
+        #             if not 'cast_fp16' in grad_name:
+        #                 new_grad_var_name = paddle.fluid.unique_name.generate(grad_name)
+        #                 new_var = _create_var(main_block, grad_var,
+        #                                       new_grad_var_name)
+        #                 new_var.persistable = False
+        #                 _rename_arg(op, grad_name, new_grad_var_name)
+        #                 main_block._insert_op(
+        #                     index=offset + 1,
+        #                     type='sum',
+        #                     inputs={'X': [grad_var, new_var]},
+        #                     outputs={'Out': grad_var},
+        #                     attrs={
+        #                         'op_role': core.op_proto_and_checker_maker.OpRole.Backward,
+        #                         'op_role_var': op_role_var
+        #                     })
+        #                 offset += 1
+        #             if 'cast_fp16' in grad_name:
+        #                 param_name = op_role_var[i]
+        #                 fp32_grad_var_name = param_name + "@GRAD"
+        #                 fp32_grad_var = main_block.vars[grad_name]
+        #                 cast_grad_var_name = paddle.fluid.unique_name.generate(
+        #                     fp32_grad_var_name)
+        #                 cast_var = _create_var(main_block, grad_var,
+        #                                        cast_grad_var_name)
+        #                 cast_var.persistable = False
+        #                 main_block._insert_op(
+        #                     index=offset + 1,
+        #                     type='cast',
+        #                     inputs={'X': fp32_grad_var},
+        #                     outputs={'Out': cast_var},
+        #                     attrs={
+        #                         'in_dtype': fp32_grad_var.dtype,
+        #                         'out_dtype': cast_var.dtype,
+        #                         'op_role': core.op_proto_and_checker_maker.OpRole.Backward,
+        #                         # self._op_role_var_key: op_role_var
+        #                     })
+        #                 offset += 1
+        #                 main_block._insert_op(
+        #                     index=offset + 1,
+        #                     type='sum',
+        #                     inputs={'X': [grad_var, cast_var]},
+        #                     outputs={'Out': grad_var},
+        #                     attrs={
+        #                         'op_role': core.op_proto_and_checker_maker.OpRole.Backward,
+        #                         'op_role_var': op_role_var})
 
         main_block._sync_with_cpp()
 
         # TODO(wangxi): add optimize offload
diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py
index 6f43fae7013bd409bca0ca1595f7b0763987624b..bc0532a4ee0a9aabd00da30bc17f49100b131b17 100644
--- a/python/paddle/fluid/optimizer.py
+++ b/python/paddle/fluid/optimizer.py
@@ -4064,11 +4064,8 @@ class PipelineOptimizer(object):
         return None
 
     def _rename_arg(self, op, old_name, new_name):
-        op_desc = op.desc
-        if isinstance(op_desc, tuple):
-            op_desc = op_desc[0]
-        op_desc._rename_input(old_name, new_name)
-        op_desc._rename_output(old_name, new_name)
+        op._rename_input(old_name, new_name)
+        op._rename_output(old_name, new_name)
 
     def _create_var(self, block, ref_var, name):
         """
@@ -4823,48 +4820,33 @@ class PipelineOptimizer(object):
 
     def _rename_gradient_var_name(self, block):
         for index, op in enumerate(block.ops):
-            if self._is_backward_op(op) and (
-                    self._op_role_var_key in op.attr_names):
-                op_role_var = op.attr(self._op_role_var_key)
-
-                if len(op_role_var) == 0:
-                    continue
-                for i in range(0, len(op_role_var), 2):
-                    grad_name = op_role_var[i + 1]
-                    grad_var = block.vars[grad_name]
-                    new_grad_var_name = unique_name.generate(grad_name)
-                    new_var = self._create_var(block, grad_var,
-                                               new_grad_var_name)
-                    new_var.persistable = False
-                    self._rename_arg(op, grad_name, new_grad_var_name)
+            if not self._is_optimize_op(op): continue
+            input_names = op.input_arg_names
+            output_names = op.output_arg_names
+            in_out_names = input_names + output_names
+            # append "MERGED" to the names of parameter gradients,
+            # and modify the op_role_var attribute (by rename_arg func).
+            for name in in_out_names:
+                if not core.grad_var_suffix() in name: continue
+                param_name = name.strip(core.grad_var_suffix())
+                new_grad_name = name + "@MERGED"
+                self._rename_arg(op, name, new_grad_name)
 
     def _accumulate_gradients(self, block, pp_allreduce_in_optimize=False):
         """
-        Accumulate the gradients generated in microbatch to the one in mini-batch.
+        Create a new merged gradient for each parameter and accumulate the
+        corresponding gradient to it.
""" - # the name of real grad vars that should be allreduce - # accumulated_gradient_names = [] - - first_optimize_op_index = None - accumulated_grad_names = [] for index, op in reversed(tuple(enumerate(list(block.ops)))): # remove the cast op of fp16 grad to fp32 grad if self._is_optimize_op(op) and op.type == 'cast': in_name = op.input_arg_names[0] out_name = op.output_arg_names[0] - if out_name.strip('@GRAD') in self._param_device_map: + if out_name.strip('@GRAD@MERGED') in self._param_device_map: assert in_name.replace('.cast_fp16', '') == out_name block._remove_op(index) continue - if not self._is_optimize_op(op) and not first_optimize_op_index: - first_optimize_op_index = index + 1 - if block.ops[ - first_optimize_op_index].type == 'c_sync_comm_stream': - block.ops[first_optimize_op_index]._set_attr( - self._op_role_key, self._op_role.Backward) - first_optimize_op_index += 1 - if self._is_backward_op(op) and ( self._op_role_var_key in op.attr_names): op_role_var = op.attr(self._op_role_var_key) @@ -4872,143 +4854,80 @@ class PipelineOptimizer(object): if len(op_role_var) == 0: continue assert len(op_role_var) % 2 == 0 + op._remove_attr(self._op_role_var_key) for i in range(0, len(op_role_var), 2): - offset = 0 + offset = 1 param_name = op_role_var[i] - - if not pp_allreduce_in_optimize: - if not block.has_var(param_name): - continue - - if '@BroadCast' in param_name: - param_name = param_name[0:param_name.find('@BroadCast')] + assert block.has_var(param_name), ( + "parameter {} not in " + "current block.".format(param_name)) # clear gradient assert param_name in self.origin_main_block.vars, "[{}] not in original main block".format( param_name) param_grad_name = self._append_grad_suffix(param_name) - if not block.has_var(param_grad_name): - self._create_var( - block, self.origin_main_block.vars[param_name], - param_grad_name) - assert block.has_var(param_grad_name) + merged_param_grad_name = param_grad_name + '@MERGED' + if not block.has_var(merged_param_grad_name): + self._create_var(block, block.vars[param_name], + merged_param_grad_name) + assert block.has_var(merged_param_grad_name) param_grad_var = block.var(param_grad_name) - param_grad_var.persistable = True + merged_param_grad_var = block.var(merged_param_grad_name) + merged_param_grad_var.persistable = True block._insert_op( - index=first_optimize_op_index + offset, + index=index + offset, type='fill_constant', inputs={}, - outputs={'Out': [param_grad_var]}, + outputs={'Out': [merged_param_grad_var]}, attrs={ - 'shape': param_grad_var.shape, - 'dtype': param_grad_var.dtype, + 'shape': merged_param_grad_var.shape, + 'dtype': merged_param_grad_var.dtype, 'value': float(0), - # self._op_device_key: device, # a trick to run this op once per mini-batch self._op_role_key: self._op_role.Optimize.LRSched, }) - #offset += 1 - grad_name = op_role_var[i + 1] # with _0 suffix + offset += 1 + grad_name = op_role_var[i + 1] grad_var = block.vars[grad_name] - #real_grad_name = grad_name[0:grad_name.find( - # '@GRAD')] + '@GRAD' # without _0 suffix - #real_grad_var = block.vars[ - # real_grad_name] # without _0 suffix - # new_grad_var_name = unique_name.generate(grad_name) - # new_var = self._create_var(block, grad_var, - # new_grad_var_name) - # new_var.persistable = False - # self._rename_arg(op, grad_name, new_grad_var_name) if not 'cast_fp16' in grad_name: block._insert_op( - index=index + 1, + index=index + offset, type='sum', - inputs={'X': [grad_var, param_grad_var]}, - outputs={'Out': param_grad_var}, + inputs={'X': [grad_var, 
+                            outputs={'Out': merged_param_grad_var},
                             attrs={
-                                #self._op_device_key: device,
                                 self._op_role_key: self._op_role.Backward,
-                                #self._op_role_var_key: op_role_var
                             })
-                        #offset += 1
-                        accumulated_grad_names.append(param_grad_var.name)
+                        offset += 1
                     else:
-                        grad_name = op_role_var[i + 1]  # with _0 suffix
-                        grad_var = block.vars[grad_name]
-                        #fp32_grad_var_name = param_name + core.grad_var_suffix(
-                        #)  # without _0 suffix
-                        #fp32_grad_var = block.vars[fp32_grad_var_name]
-                        #fp32_grad_var.persistable = True
-                        cast_grad_var_name = unique_name.generate(
-                            param_grad_name)
+                        # cast gradient to fp32 to accumulate to merged gradient
+                        cast_grad_var_name = param_grad_name + '@TMP'
                         cast_grad_var = self._create_var(block, param_grad_var,
                                                          cast_grad_var_name)
                         cast_grad_var.persistable = False
                         block._insert_op(
-                            index=index + 1,
+                            index=index + offset,
                             type='cast',
                             inputs={'X': grad_var},
                             outputs={'Out': cast_grad_var},
                             attrs={
                                 'in_dtype': grad_var.dtype,
                                 'out_dtype': cast_grad_var.dtype,
-                                # self._op_device_key: device,
                                 self._op_role_key: self._op_role.Backward,
-                                # self._op_role_var_key: op_role_var
                             })
                         offset += 1
                         block._insert_op(
-                            index=index + 2,
+                            index=index + offset,
                             type='sum',
-                            inputs={'X': [param_grad_var, cast_grad_var]},
-                            outputs={'Out': param_grad_var},
+                            inputs={
+                                'X': [merged_param_grad_var, cast_grad_var]
+                            },
+                            outputs={'Out': merged_param_grad_var},
                             attrs={
                                 # self._op_device_key: device,
                                 self._op_role_key: self._op_role.Backward,
-                                # self._op_role_var_key: op_role_var
+                                self._op_role_var_key: op_role_var
                             })
                         offset += 1
-                        accumulated_grad_names.append(param_grad_var.name)
-                    #real_grad_name = grad_name[0:grad_name.find(
-                    #    '@GRAD')] + '@GRAD'
-                    #real_grad_var = block.vars[
-                    #    real_grad_name]  # without _0 suffix
-                    #block._insert_op(
-                    #    index=first_optimize_op_index + offset,
-                    #    type='cast',
-                    #    inputs={'X': fp32_grad_var},
-                    #    outputs={'Out': cast_var},
-                    #    attrs={
-                    #        'in_dtype': fp32_grad_var.dtype,
-                    #        'out_dtype': cast_var.dtype,
-                    #        # self._op_device_key: device,
-                    #        self._op_role_key: self._op_role.Backward,
-                    #        # self._op_role_var_key: op_role_var
-                    #    })
-                    #offset += 1
-                    #block._insert_op(
-                    #    index=first_optimize_op_index + offset,
-                    #    type='sum',
-                    #    inputs={'X': [grad_var, cast_var]},
-                    #    outputs={'Out': real_grad_var},
-                    #    attrs={
-                    #        # self._op_device_key: device,
-                    #        self._op_role_key: self._op_role.Backward,
-                    #        # self._op_role_var_key: op_role_var
-                    #    })
-                    #offset += 1
-                    #block._insert_op(
-                    #    index=first_optimize_op_index + offset,
-                    #    type='cast',
-                    #    inputs={'X': real_grad_var},
-                    #    outputs={'Out': fp32_grad_var},
-                    #    attrs={
-                    #        'in_dtype': real_grad_var.dtype,
-                    #        'out_dtype': fp32_grad_var.dtype,
-                    #        # self._op_device_key: device,
-                    #        self._op_role_key: self._op_role.Backward,
-                    #        # self._op_role_var_key: op_role_var
-                    #    })
-        return accumulated_grad_names
 
     def _add_sub_blocks(self, main_block, program_list):
         main_program = main_block.program
@@ -5351,7 +5270,9 @@ class PipelineOptimizer(object):
             if real_block.has_var(param): param_list.append(param)
         #self._clear_gradients(real_block, param_list)
         self._rename_gradient_var_name(real_block)
+        real_block._sync_with_cpp()
        self._accumulate_gradients(real_block)
+        real_block._sync_with_cpp()
 
         place_id = int(os.getenv("FLAGS_selected_gpus", "0"))
         main_program._pipeline_opt = {
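
The inserted fill_constant / cast / sum ops above implement per-parameter gradient merging across pipeline micro-batches. As a rough, standalone sketch of the intended runtime semantics only (plain Python with made-up names, not part of the patch and not the Paddle API):

# Illustrative sketch only: a hypothetical helper mirroring what the inserted ops do.
import numpy as np

def accumulate_merged_grads(micro_batches, params, backward_fn):
    # One persistable fp32 buffer per parameter ("X@GRAD@MERGED"), zeroed once
    # per mini-batch -- the role of the inserted fill_constant op with the
    # LRSched "run once per mini-batch" trick.
    merged = {name: np.zeros_like(p, dtype=np.float32) for name, p in params.items()}
    for batch in micro_batches:
        grads = backward_fn(params, batch)  # micro-batch gradients, fp16 or fp32
        for name, g in grads.items():
            # fp16 gradients are cast to fp32 first (the inserted cast op),
            # then summed into the merged buffer (the inserted sum op).
            merged[name] += g.astype(np.float32)
    # The optimize ops then read the *@GRAD@MERGED buffers instead of the raw grads.
    return merged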