diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index e7a25de96a94711dd3c58c2c955a15f0b58e7917..28eebeb4d9bdc29787a0a9480a27a4fdf29d3d74 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -133,6 +133,10 @@ message GradientScaleConfig { // Else if sum, the gradient will accumulated among multiple // devices. optional string scale_strategy = 1 [ default = 'avg' ]; + // The scale_gradient flag determines where the averaging is applied. + // If scale_gradient is False, the loss@Grad is averaged before grad merge. + // Otherwise, the grads are merged first and then averaged after merging. + optional bool scale_gradient = 2 [ default = false ]; } message AsyncConfig { diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 18211459a4e0833ddafcbc56e1dd50e7543cbce3..8b75c57fab4074304e309b47f6e369d04f368f07 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -18,7 +18,7 @@ import paddle.fluid as fluid from paddle.static import default_startup_program, device_guard from paddle.fluid import layers -from .common import OpRole, OP_ROLE_VAR_KEY, CollectiveHelper +from .common import OpRole, OP_ROLE_VAR_KEY, CollectiveHelper, OP_ROLE_KEY from .common import is_backward_op, is_optimizer_op, is_update_op from .meta_optimizer_base import MetaOptimizerBase from .sharding.shard import Shard, ProgramSegment @@ -193,6 +193,14 @@ class ShardingOptimizer(MetaOptimizerBase): else: gm_mode = "pp_gm" gm_acc_step = strategy.pipeline_configs['accumulate_steps'] + gradient_scale_configs = strategy.gradient_scale_configs + assert gradient_scale_configs['scale_strategy'] == 'avg', \ + 'For pipeline mode, the ' 'gradient scale mode should ' \ + 'be 
"avg", but got {}'.format(gradient_scale_configs['scale_strategy']) + # Note (Yuang Liu): this scale_gradient flag determines where to do the average op for grad merge. + # If True, will do sum firstly for gradient merge, then average the merged gradient over gm_acc_step. + # If False, will scale loss by gm_acc_step first, then do sum for gradient merge. + self.scale_gradient = gradient_scale_configs['scale_gradient'] if gm_acc_step > 1: logger.info("Gradient merge in [{}], acc step = [{}]".format( gm_mode, gm_acc_step)) @@ -241,6 +249,7 @@ class ShardingOptimizer(MetaOptimizerBase): 'global_ring_id': 3, 'mp_degree': self.mp_degree, 'mp_rank': global_rank % self.mp_degree, + 'scale_gradient': self.scale_gradient } main_program = loss.block.program main_program._pipeline_opt = pipeline_opt @@ -362,6 +371,8 @@ class ShardingOptimizer(MetaOptimizerBase): main_block, strategy=strategy, shard=shard) len_of_ops = len(main_block.ops) + if self.scale_gradient: + self._avg_grad_merge_after_sum(main_block, accumulated_grad_names) first_optimize_op_index = get_first_optimize_op_idx(main_block) if self.pp_allreduce_in_optimize: @@ -429,6 +440,55 @@ class ShardingOptimizer(MetaOptimizerBase): # FIXME(wangxi): if fp16_allreduce, put cast fp16->fp32 to there? + def _avg_grad_merge_after_sum(self, main_block, accumulated_grad_names): + if self.user_defined_strategy.amp and \ + self.user_defined_strategy.amp_configs['use_dynamic_loss_scaling']: + # For AMP, if using dynamic loss scaling the avg + # operation can simply be done by modifying the LossScaling op. 
+ for idx, op in enumerate(main_block.ops): + if op.type == 'check_finite_and_unscale': + loss_scale_name = op.input('Scale')[0] + loss_scaling_var = main_block.var(loss_scale_name) + loss_scale_tmp_var_name = loss_scale_name + '@TMP' + loss_scale_tmp_var = main_block.create_var( + name=loss_scale_tmp_var_name, + shape=loss_scaling_var.shape, + dtype=loss_scaling_var.dtype) + main_block._insert_op_without_sync( + idx, + type='scale', + inputs={'X': loss_scaling_var}, + outputs={'Out': loss_scale_tmp_var}, + attrs={ + 'scale': self._gradient_merge_acc_step, + 'bias': 0.0, + 'bias_after_scale': False, + OP_ROLE_KEY: OpRole.Optimize + }) + op._rename_input(loss_scale_name, loss_scale_tmp_var_name) + break + else: + # For pp, do the avg operation for gradient merge after merging + # the gradient to meet the logic for gradient merge under pure dp. + tmp_first_opt_idx = None + for idx, op in enumerate(main_block.ops): + if is_optimizer_op(op) and op.type != 'c_sync_comm_stream': + tmp_first_opt_idx = idx + break + assert tmp_first_opt_idx is not None, 'Occurs some errors, no optimize ops' + for grad in accumulated_grad_names: + main_block._insert_op_without_sync( + tmp_first_opt_idx, + type='scale', + inputs={'X': grad}, + outputs={'Out': grad}, + attrs={ + 'scale': 1.0 / self._gradient_merge_acc_step, + 'bias': 0.0, + 'bias_after_scale': False, + OP_ROLE_KEY: OpRole.Optimize + }) + def _adapt_amp_clip_without_sharding(self): # if not use sharding, adapt amp/clip, for remain parallelism. 
# cast --> amp --> clip --> opt diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index b81862adf5e65698b8d9c34fa9ea306aba8f9f11..efdd55d856f3988de801acc951f6dba75c752063 100755 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -5820,6 +5820,7 @@ class PipelineOptimizer(object): self.global_ring_id = pipeline_opt['global_ring_id'] self.mp_degree = pipeline_opt['mp_degree'] self.mp_rank = pipeline_opt['mp_rank'] + self.scale_gradient = pipeline_opt.get('scale_gradient', False) assert self.mp_degree >= 1 assert 0 <= self.mp_rank < self.mp_degree @@ -5886,7 +5887,8 @@ class PipelineOptimizer(object): "startup_program": new_startup_program, } real_block = program_list[self.local_rank].global_block() - self._insert_loss_scale(real_block) + if not self.scale_gradient: + self._insert_loss_scale(real_block) if not self.use_sharding: # Step7: clear gradients before each mini-batch and # accumulate gradients during backward diff --git a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py index 7cb033b748874c08b7ae9defcfcf81ae4659a4e7..c7eaf4e0ff33dbf730a7e52f2cf1b3a71189472e 100755 --- a/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py @@ -1272,6 +1272,201 @@ class TestFleetShardingHybridOptimizer(TestFleetMetaOptimizer): self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002']) + def test_hybrid_with_pp_dp_amp_with_gradient_fuse_and_avg_after_sum(self): + train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( + ) + avg_cost, strategy = self.pp_net(train_prog, startup_prog) + strategy.amp = True + strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], } + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "mp_degree": 1, + "pp_degree": 2, + "dp_degree": 
2, + } + strategy.pipeline = True + strategy.pipeline_configs = { + "schedule_mode": "1F1B", + "micro_batch_size": 2, + "accumulate_steps": 4 + } + strategy.gradient_scale_configs = { + 'scale_strategy': 'avg', + 'scale_gradient': True + } + strategy.fuse_grad_merge = True + self.optimizer(avg_cost, strategy, train_prog, startup_prog) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in main_prog_ops] + + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'fill_constant', 'uniform_random', + 'fill_constant', 'uniform_random', 'fill_constant', + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_broadcast', 'c_broadcast', + 'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast', + 'c_broadcast', 'c_broadcast' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'cast', 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', + 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', + 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', 'elementwise_add', + 'softmax', 'cross_entropy2', 'mean', 'elementwise_mul', + 'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor', + 'coalesce_tensor', 'fill_constant', 'elementwise_mul_grad', + 'mean_grad', 'cross_entropy_grad2', 'softmax_grad', + 'elementwise_add_grad', 'cast', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 
'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', + 'cast', 'sum', 'sum', 'c_allreduce_sum', 'c_allreduce_sum', + 'c_sync_comm_stream', 'scale', 'check_finite_and_unscale', 'cast', + 'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum', + 'momentum', 'momentum', 'momentum', 'momentum', 'momentum', + 'momentum', 'momentum' + ]) + + def test_hybrid_with_pp_dp_with_gradient_fuse_and_avg_after_sum(self): + train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( + ) + avg_cost, strategy = self.pp_net(train_prog, startup_prog) + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "mp_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + } + strategy.pipeline = True + strategy.pipeline_configs = { + "schedule_mode": "1F1B", + "micro_batch_size": 2, + "accumulate_steps": 4 + } + strategy.gradient_scale_configs = { + 'scale_strategy': 'avg', + 'scale_gradient': True + } + strategy.fuse_grad_merge = True + self.optimizer(avg_cost, strategy, train_prog, startup_prog) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in main_prog_ops] + + self.assertEqual(startup_prog_op_types, [ + 'uniform_random', 'fill_constant', 'uniform_random', + 'fill_constant', 'uniform_random', 'fill_constant', + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'c_gen_nccl_id', + 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', + 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_broadcast', + 'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast', + 'c_broadcast', 
'c_broadcast', 'c_broadcast' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'mul', 'elementwise_add', 'tanh', 'mul', + 'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul', + 'elementwise_add', 'softmax', 'cross_entropy2', 'mean', + 'coalesce_tensor', 'coalesce_tensor', 'fill_constant', 'mean_grad', + 'cross_entropy_grad2', 'softmax_grad', 'elementwise_add_grad', + 'mul_grad', 'tanh_grad', 'elementwise_add_grad', 'mul_grad', + 'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', + 'sum', 'c_allreduce_sum', 'c_sync_comm_stream', 'scale', 'momentum', + 'momentum', 'momentum', 'momentum', 'momentum', 'momentum', + 'momentum', 'momentum' + ]) + + def test_hybrid_with_pp_dp_with_amp_no_dynamic_gradient_fuse_and_avg_after_sum( + self): + train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program( + ) + avg_cost, strategy = self.pp_net(train_prog, startup_prog) + strategy.sharding = True + strategy.sharding_configs = { + "sharding_degree": 1, + "mp_degree": 1, + "pp_degree": 2, + "dp_degree": 2, + } + strategy.amp = True + strategy.amp_configs = { + 'custom_black_varnames': ['fc_6.b_0'], + 'use_dynamic_loss_scaling': False + } + strategy.pipeline = True + strategy.pipeline_configs = { + "schedule_mode": "1F1B", + "micro_batch_size": 2, + "accumulate_steps": 4 + } + strategy.gradient_scale_configs = { + 'scale_strategy': 'avg', + 'scale_gradient': True + } + strategy.fuse_grad_merge = True + self.optimizer(avg_cost, strategy, train_prog, startup_prog) + train_prog = train_prog._pipeline_opt['section_program'] + startup_prog = startup_prog._pipeline_opt['startup_program'] + + startup_prog_ops = startup_prog.global_block().ops + main_prog_ops = train_prog.global_block().ops + + # check program + startup_prog_op_types = [op.type for op in startup_prog_ops] + main_prog_op_types = [op.type for op in main_prog_ops] + + self.assertEqual(startup_prog_op_types, [ 
+ 'uniform_random', 'fill_constant', 'uniform_random', + 'fill_constant', 'uniform_random', 'fill_constant', + 'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', + 'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast', + 'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast' + ]) + + self.assertEqual(main_prog_op_types, [ + 'recv_v2', 'cast', 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', + 'mul', 'cast', 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', + 'elementwise_add', 'tanh', 'cast', 'mul', 'cast', 'elementwise_add', + 'softmax', 'cross_entropy2', 'mean', 'elementwise_mul', + 'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor', + 'coalesce_tensor', 'fill_constant', 'elementwise_mul_grad', + 'mean_grad', 'cross_entropy_grad2', 'softmax_grad', + 'elementwise_add_grad', 'cast', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'tanh_grad', + 'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2', + 'cast', 'sum', 'sum', 'c_allreduce_sum', 'c_allreduce_sum', + 'c_sync_comm_stream', 'scale', 'scale', 'check_finite_and_unscale', + 'momentum', 'momentum', 'momentum', 'momentum', 'momentum', + 'momentum', 'momentum', 'momentum' + ]) + if __name__ == "__main__": unittest.main()