diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto old mode 100644 new mode 100755 index 300d4e49de72febdb69f8f81f9da2f448a792084..c965363d701c5245a825ac1c54326bee0a714745 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -38,6 +38,7 @@ message ShardingConfig { optional int32 acc_steps = 7 [ default = 1 ]; optional int32 schedule_mode = 8 [ default = 0 ]; optional int32 pp_bz = 9 [ default = 1 ]; + optional bool pp_allreduce_in_optimize = 10 [ default = true ]; } message AMPConfig { diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py index 6fc82db7462f710e109ace7fa77ebd2fd4b7b958..744a7b857780a86a6901e47326587c172f00590b 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py @@ -88,7 +88,7 @@ def check_allreduce_sum(block, shard, sharding_ring_id, dp_ring_id=-1): grad: - 0: op that generate Var - 1: sync_calc - - 2: allreduce_sum_sharding + - 2: reduce_sum_sharding (allreduce --> reduce) - 3: sync_comm - 4: allreuce_sum_dp (dp_grads) - 5: sync_comm (dp_grads) @@ -103,7 +103,7 @@ def check_allreduce_sum(block, shard, sharding_ring_id, dp_ring_id=-1): idx_gradient_clip_allreduce = -1 for idx, op in enumerate(block.ops): - if op.type == "c_allreduce_sum": + if op.type == "c_allreduce_sum" or op.type == "c_reduce_sum": if op.all_attrs()["use_calc_stream"] == False: ring_id = op.desc.attr("ring_id") var_name = op.desc.input_arg_names()[0] @@ -137,11 +137,12 @@ def check_allreduce_sum(block, shard, sharding_ring_id, dp_ring_id=-1): var_name] == 0: dp_grads_status[var_name] = 1 - elif op.type == "c_allreduce_sum": + elif op.type == "c_allreduce_sum" or op.type == "c_reduce_sum": if op.all_attrs()["use_calc_stream"] == False: var_name = op.desc.input_arg_names()[0] ring_id = op.desc.attr("ring_id") if ring_id == sharding_ring_id: + assert op.type == "c_reduce_sum", "Grad in the sharding group should be reduced rather than allreduced" if var_name in vars_status: _status = vars_status[var_name] else: @@ -191,6 +192,9 @@ def check_allreduce_sum(block, shard, sharding_ring_id, dp_ring_id=-1): raise ValueError("There should be a sync_comm op " "after allreduce the Var: {}".format( input_name)) + raise ValueError( + "The reduce output grad [{}] should NOT be used on non-root ranks.". 
+ format(input_name)) if input_name in dp_grads_status: if dp_ring_id == -1: if dp_grads_status[input_name] != 3: @@ -352,7 +356,9 @@ def get_grad_device(grad_name, shard): grad_name) base_name = None # mind the traversal order - possible_suffixes = ['.cast_fp16@GRAD', '@GRAD'] + possible_suffixes = [ + '.cast_fp16@GRAD_0', '.cast_fp16@GRAD', '@GRAD_0', '@GRAD' + ] for suffix in possible_suffixes: if suffix in grad_name: base_name = re.sub(suffix, '', grad_name) @@ -369,7 +375,7 @@ def insert_reduce_ops(block, ring_id, reduce_vars, shard, - op_role, + op_role=OpRole.Backward, use_calc_stream=False): """ _add_allreduce_ops @@ -389,10 +395,18 @@ def insert_reduce_ops(block, 'use_calc_stream': use_calc_stream, OP_ROLE_KEY: op_role }) - return +def get_first_check_finite_and_unscale_op_idx(block): + + for idx, op in enumerate(block.ops): + if op.type == "check_finite_and_unscale": + return idx + + raise ValueError("check_finite_and_unscale does not exist in block") + + def insert_broadcast_ops(block, insert_idx, ring_id, broadcast2root): """ _add_broadcast_ops diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index e44b34255bd7b6fdc59c40735da53505e9328ffd..dd637c8ef71595cfce9ac6b140ff91ea2b2b0431 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -100,6 +100,8 @@ class ShardingOptimizer(MetaOptimizerBase): self.schedule_mode = self.user_defined_strategy.sharding_configs[ "schedule_mode"] self.pp_bz = self.user_defined_strategy.sharding_configs["pp_bz"] + self.pp_allreduce_in_optimize = self.user_defined_strategy.sharding_configs[ + "pp_allreduce_in_optimize"] if self.inner_opt is None: raise ValueError( @@ -179,6 +181,7 @@ class ShardingOptimizer(MetaOptimizerBase): self._initialization_broadcast(startup_program) if self.use_pipeline: + # pp_optimizer._rename_gradient_var_name(main_block) # crop ops for idx, op in reversed(list(enumerate(main_block.ops))): # if op.type == 'fill_constant' and int(op.attr('op_role')) == 16: @@ -207,20 +210,22 @@ class ShardingOptimizer(MetaOptimizerBase): # param_list.append(param_name) #pp_optimizer._clear_gradients(main_block, param_list) accumulated_grad_names = pp_optimizer._accumulate_gradients( - main_block) - # accumulated_grad_names = sorted(accumulated_grad_names) - print("persistable FP32 grad: ") - print(accumulated_grad_names) - first_optimize_op_index = get_first_check_finite_and_unscale_op_idx( - main_block) - insert_reduce_ops( main_block, - first_optimize_op_index, - self.sharding_ring_id, - accumulated_grad_names, - self._shard, - core.op_proto_and_checker_maker.OpRole.Optimize, - use_calc_stream=True) + pp_allreduce_in_optimize=self.pp_allreduce_in_optimize) + # accumulated_grad_names = sorted(accumulated_grad_names) + if self.pp_allreduce_in_optimize: + print("persistable FP32 grad: ") + print(accumulated_grad_names) + first_optimize_op_index = get_first_check_finite_and_unscale_op_idx( + main_block) + insert_reduce_ops( + main_block, + first_optimize_op_index, + self.sharding_ring_id, + accumulated_grad_names, + self._shard, + core.op_proto_and_checker_maker.OpRole.Optimize, + use_calc_stream=True) #if not self._shard.has_param(param_name): continue ##if not main_block.has_var(grad_name): continue #assert main_block.has_var(grad_name) @@ -240,130 +245,130 @@ class ShardingOptimizer(MetaOptimizerBase): # 'op_role': 
core.op_proto_and_checker_maker.OpRole.LRSched, # }) - #def _create_var(block, ref_var, name): - # """ - # Create a new var for block, which has the same type, - # shape and dtype as ref_var, then rename it with the - # name `name`. - # """ - # new_var = block.create_var( - # name=name, - # shape=ref_var.shape, - # dtype=ref_var.dtype, - # type=ref_var.type, - # lod_level=ref_var.lod_level, - # persistable=ref_var.persistable, - # is_data=ref_var.is_data, - # need_check_feed=ref_var.desc.need_check_feed()) - # new_var.stop_gradient = ref_var.stop_gradient - # return new_var - - #def _rename_arg(op, old_name, new_name): - # op_desc = op.desc - # if isinstance(op_desc, tuple): - # op_desc = op_desc[0] - # op_desc._rename_input(old_name, new_name) - # op_desc._rename_output(old_name, new_name) - - #print("params_grads:", params_grads) - #for param_name, grad_name in params_grads: - # if not self._shard.has_param(param_name): continue - # #if not main_block.has_var(grad_name): continue - # assert main_block.has_var(grad_name) - # use_fp16 = False - # fp16_grad_name = param_name + '.cast_fp16@GRAD' - # if main_block.has_var(grad_name): - # fp16_grad_var = main_block.vars[fp16_grad_name] - # use_fp16 = True - # grad_var = main_block.vars[grad_name] - # if use_fp16: - # cast_grad_var_name = paddle.fluid.unique_name.generate( - # grad_name) - # cast_var = _create_var(main_block, fp16_grad_var, - # cast_grad_var_name) - # cast_var.persistable = False - # main_block.append_op( - # #index=offset + 1, - # type='cast', - # inputs={'X': grad_var}, - # outputs={'Out': cast_var}, - # attrs={ - # 'in_dtype': grad_var.dtype, - # 'out_dtype': cast_var.dtype, - # 'op_role': - # core.op_proto_and_checker_maker.OpRole.Backward, - # }) - # #offset += 1 - # main_block.append_op( - # #index=offset + 1, - # type='sum', - # inputs={'X': [fp16_grad_var, cast_var]}, - # outputs={'Out': fp16_grad_var}, - # attrs={ - # 'op_role': - # core.op_proto_and_checker_maker.OpRole.Backward, - # 'op_role_var': op_role_var - # }) - - # for index, op in reversed(tuple(enumerate(list(main_block.ops)))): - # offset = index - # if is_backward_op(op) and ( - # 'op_role_var' in op.attr_names): - # op_role_var = op.all_attrs()['op_role_var'] - - # if len(op_role_var) == 0: - # continue - # assert len(op_role_var) % 2 == 0 - # offset = index - # for i in range(0, len(op_role_var), 2): - # grad_name = op_role_var[i + 1] - # if not main_block.has_var(grad_name): continue - # grad_var = main_block.vars[grad_name] - # if not 'cast_fp16' in grad_name: - # new_grad_var_name = paddle.fluid.unique_name.generate(grad_name) - # new_var = _create_var(main_block, grad_var, - # new_grad_var_name) - # new_var.persistable = False - # _rename_arg(op, grad_name, new_grad_var_name) - # main_block._insert_op( - # index=offset + 1, - # type='sum', - # inputs={'X': [grad_var, new_var]}, - # outputs={'Out': grad_var}, - # attrs={ - # 'op_role': core.op_proto_and_checker_maker.OpRole.Backward, - # 'op_role_var': op_role_var - # }) - # offset += 1 - # if 'cast_fp16' in grad_name: - # param_name = op_role_var[i] - # fp32_grad_var_name = param_name + "@GRAD" - # fp32_grad_var = main_block.vars[grad_name] - # cast_grad_var_name = paddle.fluid.unique_name.generate( - # fp32_grad_var_name) - # cast_var = _create_var(main_block, grad_var, - # cast_grad_var_name) - # cast_var.persistable = False - # main_block._insert_op( - # index=offset + 1, - # type='cast', - # inputs={'X': fp32_grad_var}, - # outputs={'Out': cast_var}, - # attrs={ - # 'in_dtype': 
fp32_grad_var.dtype, - # 'out_dtype': cast_var.dtype, - # 'op_role': core.op_proto_and_checker_maker.OpRole.Backward, - # # self._op_role_var_key: op_role_var - # }) - # offset += 1 - # main_block._insert_op( - # index=offset + 1, - # type='sum', - # inputs={'X': [grad_var, cast_var]}, - # outputs={'Out': grad_var}, - # attrs={ - # 'op_role': core.op_proto_and_checker_maker.OpRole.Backward, - # 'op_role_var': op_role_var}) + #def _create_var(block, ref_var, name): + # """ + # Create a new var for block, which has the same type, + # shape and dtype as ref_var, then rename it with the + # name `name`. + # """ + # new_var = block.create_var( + # name=name, + # shape=ref_var.shape, + # dtype=ref_var.dtype, + # type=ref_var.type, + # lod_level=ref_var.lod_level, + # persistable=ref_var.persistable, + # is_data=ref_var.is_data, + # need_check_feed=ref_var.desc.need_check_feed()) + # new_var.stop_gradient = ref_var.stop_gradient + # return new_var + + #def _rename_arg(op, old_name, new_name): + # op_desc = op.desc + # if isinstance(op_desc, tuple): + # op_desc = op_desc[0] + # op_desc._rename_input(old_name, new_name) + # op_desc._rename_output(old_name, new_name) + + #print("params_grads:", params_grads) + #for param_name, grad_name in params_grads: + # if not self._shard.has_param(param_name): continue + # #if not main_block.has_var(grad_name): continue + # assert main_block.has_var(grad_name) + # use_fp16 = False + # fp16_grad_name = param_name + '.cast_fp16@GRAD' + # if main_block.has_var(grad_name): + # fp16_grad_var = main_block.vars[fp16_grad_name] + # use_fp16 = True + # grad_var = main_block.vars[grad_name] + # if use_fp16: + # cast_grad_var_name = paddle.fluid.unique_name.generate( + # grad_name) + # cast_var = _create_var(main_block, fp16_grad_var, + # cast_grad_var_name) + # cast_var.persistable = False + # main_block.append_op( + # #index=offset + 1, + # type='cast', + # inputs={'X': grad_var}, + # outputs={'Out': cast_var}, + # attrs={ + # 'in_dtype': grad_var.dtype, + # 'out_dtype': cast_var.dtype, + # 'op_role': + # core.op_proto_and_checker_maker.OpRole.Backward, + # }) + # #offset += 1 + # main_block.append_op( + # #index=offset + 1, + # type='sum', + # inputs={'X': [fp16_grad_var, cast_var]}, + # outputs={'Out': fp16_grad_var}, + # attrs={ + # 'op_role': + # core.op_proto_and_checker_maker.OpRole.Backward, + # 'op_role_var': op_role_var + # }) + + # for index, op in reversed(tuple(enumerate(list(main_block.ops)))): + # offset = index + # if is_backward_op(op) and ( + # 'op_role_var' in op.attr_names): + # op_role_var = op.all_attrs()['op_role_var'] + + # if len(op_role_var) == 0: + # continue + # assert len(op_role_var) % 2 == 0 + # offset = index + # for i in range(0, len(op_role_var), 2): + # grad_name = op_role_var[i + 1] + # if not main_block.has_var(grad_name): continue + # grad_var = main_block.vars[grad_name] + # if not 'cast_fp16' in grad_name: + # new_grad_var_name = paddle.fluid.unique_name.generate(grad_name) + # new_var = _create_var(main_block, grad_var, + # new_grad_var_name) + # new_var.persistable = False + # _rename_arg(op, grad_name, new_grad_var_name) + # main_block._insert_op( + # index=offset + 1, + # type='sum', + # inputs={'X': [grad_var, new_var]}, + # outputs={'Out': grad_var}, + # attrs={ + # 'op_role': core.op_proto_and_checker_maker.OpRole.Backward, + # 'op_role_var': op_role_var + # }) + # offset += 1 + # if 'cast_fp16' in grad_name: + # param_name = op_role_var[i] + # fp32_grad_var_name = param_name + "@GRAD" + # fp32_grad_var = 
main_block.vars[grad_name] + # cast_grad_var_name = paddle.fluid.unique_name.generate( + # fp32_grad_var_name) + # cast_var = _create_var(main_block, grad_var, + # cast_grad_var_name) + # cast_var.persistable = False + # main_block._insert_op( + # index=offset + 1, + # type='cast', + # inputs={'X': fp32_grad_var}, + # outputs={'Out': cast_var}, + # attrs={ + # 'in_dtype': fp32_grad_var.dtype, + # 'out_dtype': cast_var.dtype, + # 'op_role': core.op_proto_and_checker_maker.OpRole.Backward, + # # self._op_role_var_key: op_role_var + # }) + # offset += 1 + # main_block._insert_op( + # index=offset + 1, + # type='sum', + # inputs={'X': [grad_var, cast_var]}, + # outputs={'Out': grad_var}, + # attrs={ + # 'op_role': core.op_proto_and_checker_maker.OpRole.Backward, + # 'op_role_var': op_role_var}) main_block._sync_with_cpp() with open("start_sharding_%d" % self.role_maker._worker_index(), @@ -540,7 +545,10 @@ class ShardingOptimizer(MetaOptimizerBase): self._main_program.global_block().var(input_name)) # find reduce vars - if not self.use_pipeline: + if self.use_pipeline and self.pp_allreduce_in_optimize: + # place pipeline gradient allreduce in optimize + pass + else: if is_backward_op(op) and \ OP_ROLE_VAR_KEY in op.attr_names: op_role_var = op.all_attrs()[OP_ROLE_VAR_KEY] @@ -678,7 +686,7 @@ class ShardingOptimizer(MetaOptimizerBase): if len(self._segments) < 1: return # sharding - if self.use_pipeline: + if self.use_pipeline and self.pp_allreduce_in_optimize: for idx in range(len(self._segments)): assert len(self._segments[idx]._allreduce_vars) == 0 @@ -693,9 +701,15 @@ class ShardingOptimizer(MetaOptimizerBase): insert_sync_comm_ops(block, self._segments[-1]._end_idx, self.sharding_ring_id, self._segments[-1]._allreduce_vars) - insert_allreduce_ops(block, self._segments[-1]._end_idx, - self.sharding_ring_id, - self._segments[-1]._allreduce_vars) + # allreduce --> reduce + insert_reduce_ops( + block, + self._segments[-1]._end_idx, + self.sharding_ring_id, + self._segments[-1]._allreduce_vars, + self._shard, + op_role=OpRole.Backward, + use_calc_stream=False) for idx, segment in reversed(list(enumerate(self._segments))): allreduce_vars = self._segments[ @@ -775,8 +789,15 @@ class ShardingOptimizer(MetaOptimizerBase): insert_sync_comm_ops(block, segment._start_idx, self.sharding_ring_id, allreduce_vars) # sharding - insert_allreduce_ops(block, segment._start_idx, - self.sharding_ring_id, allreduce_vars) + # allreduce --> reduce + insert_reduce_ops( + block, + segment._start_idx, + self.sharding_ring_id, + allreduce_vars, + self._shard, + op_role=OpRole.Backward, + use_calc_stream=False) block._sync_with_cpp() @@ -829,12 +850,6 @@ class ShardingOptimizer(MetaOptimizerBase): def _init_comm(self): - # sharding alone mode - # self.sharding_ring_id = 0 - # self.sharding_rank = self.global_rank - # self.sharding_group_endpoints = self.endpoints[:] - # self.sharding_group_size = len(self.endpoints) - if self.hybrid_dp: assert self._as_outer_parallelism == False, "hybrid dp is conflict when using sharding as outer parallelism" self.sharding_group_size = self.user_defined_strategy.sharding_configs[ @@ -854,7 +869,6 @@ class ShardingOptimizer(MetaOptimizerBase): ep for idx, ep in enumerate(self.endpoints) if (idx % self.sharding_group_size) == self.sharding_rank ] - # self.global_group_endpoints = self.role_maker._get_trainer_endpoints()[:] assert self.global_word_size > self.sharding_group_size, \ "global_word_size: {} should be larger than sharding_group_size: {}".format(self.global_word_size, 
self.sharding_group_size) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 347c27e6fcb18efc615a3611881583833f576651..6f43fae7013bd409bca0ca1595f7b0763987624b 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -4838,7 +4838,7 @@ class PipelineOptimizer(object): new_var.persistable = False self._rename_arg(op, grad_name, new_grad_var_name) - def _accumulate_gradients(self, block): + def _accumulate_gradients(self, block, pp_allreduce_in_optimize=False): """ Accumulate the gradients generated in microbatch to the one in mini-batch. """ @@ -4875,7 +4875,11 @@ class PipelineOptimizer(object): for i in range(0, len(op_role_var), 2): offset = 0 param_name = op_role_var[i] - # if not block.has_var(param_name): continue + + if not pp_allreduce_in_optimize: + if not block.has_var(param_name): + continue + if '@BroadCast' in param_name: param_name = param_name[0:param_name.find('@BroadCast')] # clear gradient
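
Note for reviewers: below is a minimal, illustrative sketch of how the new `pp_allreduce_in_optimize` switch from the `ShardingConfig` hunk above would be turned on from user code. The `fleet.DistributedStrategy` / `sharding_configs` entry points are the existing fleet API; only config keys that actually appear in this diff are shown, and the optimizer choice, config values, and omitted model/launch code are placeholders rather than part of this change.

```python
# Hedged usage sketch (not part of the patch): enable sharding and the new
# pp_allreduce_in_optimize flag via fleet.DistributedStrategy. The non-flag
# values below are simply the proto defaults shown in the hunk above.
import paddle
import paddle.distributed.fleet as fleet

paddle.enable_static()
fleet.init(is_collective=True)

strategy = fleet.DistributedStrategy()
strategy.sharding = True
strategy.sharding_configs = {
    "acc_steps": 1,                    # gradient accumulation steps (proto default 1)
    "schedule_mode": 0,                # pipeline schedule mode (proto default 0)
    "pp_bz": 1,                        # pipeline batch-size setting (proto default 1)
    "pp_allreduce_in_optimize": True,  # new flag: reduce pipeline grads in the optimize stage
}

# Placeholder optimizer; a real job would build the model program, call
# optimizer.minimize(loss), and launch the trainers with fleetrun.
optimizer = paddle.optimizer.Momentum(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
```

Judging from the `sharding_optimizer.py` changes above, leaving the flag at its default (`true`) moves the pipeline gradient reduce into the optimize stage (the `c_reduce_sum` ops are inserted before the first `check_finite_and_unscale` op), while setting it to `False` keeps the gradient reduction in the backward segments.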