Unverified commit 03d8304f, authored by Yuang Liu, committed by GitHub

[hybrid enhance] add flag to control the avg position for grad merge under pipeline mode (#36384)

Parent: b857d755
......@@ -133,6 +133,10 @@ message GradientScaleConfig {
// Else if sum, the gradient will be accumulated among multiple
// devices.
optional string scale_strategy = 1 [ default = 'avg' ];
// The scale_gradient flag determines where the average for gradient merge is applied.
// If scale_gradient is false, the loss@GRAD is averaged before gradient merge.
// Otherwise, gradients are merged (summed) first and the merged gradient is averaged afterwards.
optional bool scale_gradient = 2 [ default = false ];
}
message AsyncConfig {
......
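As a usage sketch (not part of this diff), the new scale_gradient field is set from user code through fleet.DistributedStrategy, mirroring the tests added further below; it assumes a Paddle build that already includes this commit.

```python
# Minimal sketch: enable average-after-merge for gradient merge under pipeline mode.
# The field names follow the GradientScaleConfig message above.
import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
strategy.gradient_scale_configs = {
    'scale_strategy': 'avg',   # pipeline mode requires the 'avg' strategy
    'scale_gradient': True     # sum (merge) the gradients first, average afterwards
}
```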
......@@ -18,7 +18,7 @@ import paddle.fluid as fluid
from paddle.static import default_startup_program, device_guard
from paddle.fluid import layers
from .common import OpRole, OP_ROLE_VAR_KEY, CollectiveHelper
from .common import OpRole, OP_ROLE_VAR_KEY, CollectiveHelper, OP_ROLE_KEY
from .common import is_backward_op, is_optimizer_op, is_update_op
from .meta_optimizer_base import MetaOptimizerBase
from .sharding.shard import Shard, ProgramSegment
......@@ -193,6 +193,14 @@ class ShardingOptimizer(MetaOptimizerBase):
else:
gm_mode = "pp_gm"
gm_acc_step = strategy.pipeline_configs['accumulate_steps']
gradient_scale_configs = strategy.gradient_scale_configs
assert gradient_scale_configs['scale_strategy'] == 'avg', \
'For pipeline mode, the gradient scale strategy should be "avg", ' \
'but got {}'.format(gradient_scale_configs['scale_strategy'])
# Note (Yuang Liu): the scale_gradient flag determines where the average op for grad merge happens.
# If True, gradients are summed first for gradient merge, then scaled by gm_acc_step.
# If False, the loss is scaled by gm_acc_step first, then gradients are summed for gradient merge.
self.scale_gradient = gradient_scale_configs['scale_gradient']
if gm_acc_step > 1:
logger.info("Gradient merge in [{}], acc step = [{}]".format(
gm_mode, gm_acc_step))
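The two orderings described in the note above are equivalent in exact arithmetic, since scaling by 1 / gm_acc_step distributes over the sum; a small illustration in plain NumPy (not Paddle code):

```python
# Illustration only: averaging before vs. after gradient merge yields the same
# result; the new path applies a single scale per merged gradient instead of
# scaling the loss gradient in every micro-batch.
import numpy as np

acc_step = 4
micro_grads = [np.random.rand(8).astype('float32') for _ in range(acc_step)]

scale_then_sum = sum(g / acc_step for g in micro_grads)   # scale_gradient=False
sum_then_scale = sum(micro_grads) / acc_step              # scale_gradient=True

assert np.allclose(scale_then_sum, sum_then_scale)
```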
......@@ -241,6 +249,7 @@ class ShardingOptimizer(MetaOptimizerBase):
'global_ring_id': 3,
'mp_degree': self.mp_degree,
'mp_rank': global_rank % self.mp_degree,
'scale_gradient': self.scale_gradient
}
main_program = loss.block.program
main_program._pipeline_opt = pipeline_opt
......@@ -362,6 +371,8 @@ class ShardingOptimizer(MetaOptimizerBase):
main_block, strategy=strategy, shard=shard)
len_of_ops = len(main_block.ops)
if self.scale_gradient:
self._avg_grad_merge_after_sum(main_block, accumulated_grad_names)
first_optimize_op_index = get_first_optimize_op_idx(main_block)
if self.pp_allreduce_in_optimize:
......@@ -429,6 +440,55 @@ class ShardingOptimizer(MetaOptimizerBase):
# FIXME(wangxi): if fp16_allreduce, put cast fp16->fp32 to there?
def _avg_grad_merge_after_sum(self, main_block, accumulated_grad_names):
if self.user_defined_strategy.amp and \
self.user_defined_strategy.amp_configs['use_dynamic_loss_scaling']:
# For AMP with dynamic loss scaling, the avg operation can simply be done
# by scaling the loss scaling value fed to check_finite_and_unscale.
for idx, op in enumerate(main_block.ops):
if op.type == 'check_finite_and_unscale':
loss_scale_name = op.input('Scale')[0]
loss_scaling_var = main_block.var(loss_scale_name)
loss_scale_tmp_var_name = loss_scale_name + '@TMP'
loss_scale_tmp_var = main_block.create_var(
name=loss_scale_tmp_var_name,
shape=loss_scaling_var.shape,
dtype=loss_scaling_var.dtype)
main_block._insert_op_without_sync(
idx,
type='scale',
inputs={'X': loss_scaling_var},
outputs={'Out': loss_scale_tmp_var},
attrs={
'scale': self._gradient_merge_acc_step,
'bias': 0.0,
'bias_after_scale': False,
OP_ROLE_KEY: OpRole.Optimize
})
op._rename_input(loss_scale_name, loss_scale_tmp_var_name)
break
else:
# For pp, do the avg operation for gradient merge after the gradients
# are merged, to match the gradient merge logic under pure dp.
tmp_first_opt_idx = None
for idx, op in enumerate(main_block.ops):
if is_optimizer_op(op) and op.type != 'c_sync_comm_stream':
tmp_first_opt_idx = idx
break
assert tmp_first_opt_idx is not None, 'no optimize op is found in the main block'
for grad in accumulated_grad_names:
main_block._insert_op_without_sync(
tmp_first_opt_idx,
type='scale',
inputs={'X': grad},
outputs={'Out': grad},
attrs={
'scale': 1.0 / self._gradient_merge_acc_step,
'bias': 0.0,
'bias_after_scale': False,
OP_ROLE_KEY: OpRole.Optimize
})
def _adapt_amp_clip_without_sharding(self):
# if not use sharding, adapt amp/clip, for remain parallelism.
# cast --> amp --> clip --> opt
......
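In the AMP branch of _avg_grad_merge_after_sum above, the average is folded into the unscale step: check_finite_and_unscale divides the merged gradients by its Scale input, so handing it loss_scaling * acc_step divides by both factors at once, while the non-AMP branch inserts an explicit scale op of 1 / acc_step per merged gradient. A rough sketch of the arithmetic with made-up values:

```python
# Why scaling the loss-scaling value by acc_step averages the merged gradient:
#   check_finite_and_unscale normally computes   grad / loss_scaling
#   with the tmp variable it computes            grad / (loss_scaling * acc_step)
loss_scaling, acc_step = 1024.0, 4
merged_grad = 8192.0

explicit = merged_grad / loss_scaling / acc_step      # separate scale op
fused = merged_grad / (loss_scaling * acc_step)       # tmp loss-scale variable
assert abs(explicit - fused) < 1e-12
```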
......@@ -5820,6 +5820,7 @@ class PipelineOptimizer(object):
self.global_ring_id = pipeline_opt['global_ring_id']
self.mp_degree = pipeline_opt['mp_degree']
self.mp_rank = pipeline_opt['mp_rank']
self.scale_gradient = pipeline_opt.get('scale_gradient', False)
assert self.mp_degree >= 1
assert 0 <= self.mp_rank < self.mp_degree
......@@ -5886,7 +5887,8 @@ class PipelineOptimizer(object):
"startup_program": new_startup_program,
}
real_block = program_list[self.local_rank].global_block()
self._insert_loss_scale(real_block)
if not self.scale_gradient:
self._insert_loss_scale(real_block)
if not self.use_sharding:
# Step7: clear gradients before each mini-batch and
# accumulate gradients during backward
......
......@@ -1272,6 +1272,201 @@ class TestFleetShardingHybridOptimizer(TestFleetMetaOptimizer):
self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002'])
def test_hybrid_with_pp_dp_amp_with_gradient_fuse_and_avg_after_sum(self):
train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program(
)
avg_cost, strategy = self.pp_net(train_prog, startup_prog)
strategy.amp = True
strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], }
strategy.sharding = True
strategy.sharding_configs = {
"sharding_degree": 1,
"mp_degree": 1,
"pp_degree": 2,
"dp_degree": 2,
}
strategy.pipeline = True
strategy.pipeline_configs = {
"schedule_mode": "1F1B",
"micro_batch_size": 2,
"accumulate_steps": 4
}
strategy.gradient_scale_configs = {
'scale_strategy': 'avg',
'scale_gradient': True
}
strategy.fuse_grad_merge = True
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
train_prog = train_prog._pipeline_opt['section_program']
startup_prog = startup_prog._pipeline_opt['startup_program']
startup_prog_ops = startup_prog.global_block().ops
main_prog_ops = train_prog.global_block().ops
# check program
startup_prog_op_types = [op.type for op in startup_prog_ops]
main_prog_op_types = [op.type for op in main_prog_ops]
self.assertEqual(startup_prog_op_types, [
'uniform_random', 'fill_constant', 'uniform_random',
'fill_constant', 'uniform_random', 'fill_constant',
'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'c_gen_nccl_id', 'c_comm_init',
'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init',
'c_gen_nccl_id', 'c_comm_init', 'c_broadcast', 'c_broadcast',
'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast',
'c_broadcast', 'c_broadcast'
])
self.assertEqual(main_prog_op_types, [
'recv_v2', 'cast', 'mul', 'cast', 'elementwise_add', 'tanh', 'cast',
'mul', 'cast', 'elementwise_add', 'tanh', 'cast', 'mul', 'cast',
'elementwise_add', 'tanh', 'cast', 'mul', 'cast', 'elementwise_add',
'softmax', 'cross_entropy2', 'mean', 'elementwise_mul',
'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor',
'coalesce_tensor', 'fill_constant', 'elementwise_mul_grad',
'mean_grad', 'cross_entropy_grad2', 'softmax_grad',
'elementwise_add_grad', 'cast', 'mul_grad', 'tanh_grad',
'elementwise_add_grad', 'mul_grad', 'tanh_grad',
'elementwise_add_grad', 'mul_grad', 'tanh_grad',
'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2',
'cast', 'sum', 'sum', 'c_allreduce_sum', 'c_allreduce_sum',
'c_sync_comm_stream', 'scale', 'check_finite_and_unscale', 'cast',
'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum',
'momentum', 'momentum', 'momentum', 'momentum', 'momentum',
'momentum', 'momentum'
])
def test_hybrid_with_pp_dp_with_gradient_fuse_and_avg_after_sum(self):
train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program(
)
avg_cost, strategy = self.pp_net(train_prog, startup_prog)
strategy.sharding = True
strategy.sharding_configs = {
"sharding_degree": 1,
"mp_degree": 1,
"pp_degree": 2,
"dp_degree": 2,
}
strategy.pipeline = True
strategy.pipeline_configs = {
"schedule_mode": "1F1B",
"micro_batch_size": 2,
"accumulate_steps": 4
}
strategy.gradient_scale_configs = {
'scale_strategy': 'avg',
'scale_gradient': True
}
strategy.fuse_grad_merge = True
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
train_prog = train_prog._pipeline_opt['section_program']
startup_prog = startup_prog._pipeline_opt['startup_program']
startup_prog_ops = startup_prog.global_block().ops
main_prog_ops = train_prog.global_block().ops
# check program
startup_prog_op_types = [op.type for op in startup_prog_ops]
main_prog_op_types = [op.type for op in main_prog_ops]
self.assertEqual(startup_prog_op_types, [
'uniform_random', 'fill_constant', 'uniform_random',
'fill_constant', 'uniform_random', 'fill_constant',
'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'c_gen_nccl_id',
'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id',
'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_broadcast',
'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast',
'c_broadcast', 'c_broadcast', 'c_broadcast'
])
self.assertEqual(main_prog_op_types, [
'recv_v2', 'mul', 'elementwise_add', 'tanh', 'mul',
'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul',
'elementwise_add', 'softmax', 'cross_entropy2', 'mean',
'coalesce_tensor', 'coalesce_tensor', 'fill_constant', 'mean_grad',
'cross_entropy_grad2', 'softmax_grad', 'elementwise_add_grad',
'mul_grad', 'tanh_grad', 'elementwise_add_grad', 'mul_grad',
'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad',
'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2',
'sum', 'c_allreduce_sum', 'c_sync_comm_stream', 'scale', 'momentum',
'momentum', 'momentum', 'momentum', 'momentum', 'momentum',
'momentum', 'momentum'
])
def test_hybrid_with_pp_dp_with_amp_no_dynamic_gradient_fuse_and_avg_after_sum(
self):
train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program(
)
avg_cost, strategy = self.pp_net(train_prog, startup_prog)
strategy.sharding = True
strategy.sharding_configs = {
"sharding_degree": 1,
"mp_degree": 1,
"pp_degree": 2,
"dp_degree": 2,
}
strategy.amp = True
strategy.amp_configs = {
'custom_black_varnames': ['fc_6.b_0'],
'use_dynamic_loss_scaling': False
}
strategy.pipeline = True
strategy.pipeline_configs = {
"schedule_mode": "1F1B",
"micro_batch_size": 2,
"accumulate_steps": 4
}
strategy.gradient_scale_configs = {
'scale_strategy': 'avg',
'scale_gradient': True
}
strategy.fuse_grad_merge = True
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
train_prog = train_prog._pipeline_opt['section_program']
startup_prog = startup_prog._pipeline_opt['startup_program']
startup_prog_ops = startup_prog.global_block().ops
main_prog_ops = train_prog.global_block().ops
# check program
startup_prog_op_types = [op.type for op in startup_prog_ops]
main_prog_op_types = [op.type for op in main_prog_ops]
self.assertEqual(startup_prog_op_types, [
'uniform_random', 'fill_constant', 'uniform_random',
'fill_constant', 'uniform_random', 'fill_constant',
'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init',
'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init',
'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast',
'c_broadcast', 'c_broadcast', 'c_broadcast', 'c_broadcast'
])
self.assertEqual(main_prog_op_types, [
'recv_v2', 'cast', 'mul', 'cast', 'elementwise_add', 'tanh', 'cast',
'mul', 'cast', 'elementwise_add', 'tanh', 'cast', 'mul', 'cast',
'elementwise_add', 'tanh', 'cast', 'mul', 'cast', 'elementwise_add',
'softmax', 'cross_entropy2', 'mean', 'elementwise_mul',
'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor',
'coalesce_tensor', 'fill_constant', 'elementwise_mul_grad',
'mean_grad', 'cross_entropy_grad2', 'softmax_grad',
'elementwise_add_grad', 'cast', 'mul_grad', 'tanh_grad',
'elementwise_add_grad', 'mul_grad', 'tanh_grad',
'elementwise_add_grad', 'mul_grad', 'tanh_grad',
'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2',
'cast', 'sum', 'sum', 'c_allreduce_sum', 'c_allreduce_sum',
'c_sync_comm_stream', 'scale', 'scale', 'check_finite_and_unscale',
'momentum', 'momentum', 'momentum', 'momentum', 'momentum',
'momentum', 'momentum', 'momentum'
])
if __name__ == "__main__":
unittest.main()