Unverified  Commit e931cd12  authored by Yuang Liu, committed by GitHub

[cherry-pick][hybrid performance] Grad fuse for gradient merge under pipeline mode (#35004) (#35299)

Parent d4948bc1
...@@ -200,6 +200,7 @@ message DistributedStrategy {
optional int32 fuse_grad_size_in_num = 31 [ default = 8 ];
optional bool calc_comm_same_stream = 32 [ default = false ];
optional bool asp = 33 [ default = false ];
optional bool fuse_grad_merge = 34 [ default = false ];
optional RecomputeConfig recompute_configs = 101;
optional AMPConfig amp_configs = 102;
......
...@@ -20,10 +20,49 @@
#include "paddle/fluid/framework/var_type.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/device_memory_aligment.h"
#ifdef PADDLE_WITH_ASCEND_CL
#include "paddle/fluid/operators/npu_op_runner.h"
#endif
namespace paddle {
namespace operators {
template <typename DeviceContext>
struct FillConstantVisitor {
FillConstantVisitor(const DeviceContext &dev_ctx,
framework::LoDTensor *tensor, const float value)
: dev_ctx_(dev_ctx), tensor_(tensor), value_(value) {}
template <typename T>
void apply(typename std::enable_if<std::is_same<T, int8_t>::value ||
std::is_same<T, int16_t>::value>::type * =
nullptr) const {
PADDLE_THROW(platform::errors::InvalidArgument(
"Not support data type for set_constant attr"));
}
template <typename T>
void apply(typename std::enable_if<!(std::is_same<T, int8_t>::value ||
std::is_same<T, int16_t>::value)>::type
* = nullptr) const {
#ifdef PADDLE_WITH_ASCEND_CL
if (platform::is_npu_place(dev_ctx_.GetPlace())) {
FillNpuTensorWithConstant<T>(tensor_, static_cast<T>(value_));
} else {
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx_, tensor_, static_cast<T>(value_));
}
#else
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx_, tensor_, static_cast<T>(value_));
#endif
}
const DeviceContext &dev_ctx_;
framework::LoDTensor *tensor_;
float value_;
};
template <typename DeviceContext, typename T>
class CoalesceTensorOpKernel : public framework::OpKernel<T> {
 public:
...@@ -70,6 +109,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
auto in_tensors = context.MultiInput<framework::LoDTensor>("Input");
bool use_align = context.Attr<bool>("use_align");
auto align_size = context.Attr<int>("align_size");
auto size_of_dtype = context.Attr<int>("user_defined_size_of_dtype");
if (context.Attr<bool>("check_name")) { if (context.Attr<bool>("check_name")) {
for (size_t i = 0; i < in_var_names.size(); ++i) { for (size_t i = 0; i < in_var_names.size(); ++i) {
...@@ -94,7 +134,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
size_t numel = 0;
auto dtype = static_cast<framework::proto::VarType::Type>(
    context.Attr<int>("dtype"));
-    size_t size_of_dtype = framework::SizeOfType(dtype);
+    if (size_of_dtype == -1) {
+      size_of_dtype = framework::SizeOfType(dtype);
+    }
GetMemSizeAndDtype(in_tensors, in_var_names, &numel, size_of_dtype,
                   context.GetPlace(), use_align, align_size);
...@@ -121,10 +163,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
: len;
}
} else if (context.Attr<bool>("set_constant")) {
-      // TODO(Liu yuang) ADD NPU SET_CONSTANT FUNCTION.
-      math::SetConstant<DeviceContext, T> set_constant;
-      set_constant(dev_ctx, fused_tensor,
-                   static_cast<T>(context.Attr<float>("constant")));
+      framework::VisitDataType(
+          dtype, FillConstantVisitor<DeviceContext>(
+                     dev_ctx, fused_tensor, context.Attr<float>("constant")));
} else if (context.Attr<bool>("persist_output")) { } else if (context.Attr<bool>("persist_output")) {
for (size_t i = 0; i < out_var_names.size(); ++i) { for (size_t i = 0; i < out_var_names.size(); ++i) {
size_t len = static_cast<size_t>(out_tensors[i]->numel()); size_t len = static_cast<size_t>(out_tensors[i]->numel());
...@@ -227,10 +268,13 @@ class CoalesceTensorOp : public framework::OperatorWithKernel {
}
auto use_align = ctx->Attrs().Get<bool>("use_align");
auto align_size = ctx->Attrs().Get<int>("align_size");
auto size_of_dtype = ctx->Attrs().Get<int>("user_defined_size_of_dtype");
auto dtype = static_cast<framework::proto::VarType::Type>(
    ctx->Attrs().Get<int>("dtype"));
-    size_t size_of_dtype = framework::SizeOfType(dtype);
+    if (size_of_dtype == -1) {
+      size_of_dtype = framework::SizeOfType(dtype);
+    }
auto alignment = [](size_t size, size_t align_size) {
size_t remaining = size % align_size;
...@@ -308,6 +352,15 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker {
.SetDefault(true);
AddAttr<int>("align_size", "The alignment size when use_align is True")
.SetDefault(-1);
AddAttr<int>("user_defined_size_of_dtype",
"The user defined size of dtype. This is used to coalesce "
"grad vars and merged_grad vars at the same time. For some "
"strategy, the dtype of fused_grad_vars and the dtype of "
"fused_grad_merged_vars are not identical, which will cause "
"the shape of these two coalesced vars are different. To "
"make sure the shape of these two vars are identical with "
"each other, this attr is added.")
.SetDefault(-1);
AddComment(R"DOC( AddComment(R"DOC(
CoalesceTensor Operator. CoalesceTensor Operator.
......
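A minimal sketch (not part of the diff) of the alignment arithmetic behind the new user_defined_size_of_dtype attribute: with a 256-byte alignment, an fp32 fused var and an fp16 fused var built from the same number of elements are padded to different element counts, so the gradient merge pass forces a 2-byte element size for both. The helper below is illustrative only and is not a Paddle API.

# Illustrative sketch, assuming the 256-byte alignment used by coalesce_tensor.
ALIGN_BYTES = 256

def aligned_numel(numel, size_of_dtype, align_bytes=ALIGN_BYTES):
    """Round an element count up to the next alignment boundary."""
    nbytes = numel * size_of_dtype
    remaining = nbytes % align_bytes
    if remaining != 0:
        nbytes += align_bytes - remaining
    return nbytes // size_of_dtype

print(aligned_numel(64, 4))  # 64:  an fp32 segment of 64 elements already fills 256 bytes
print(aligned_numel(64, 2))  # 128: the same 64 elements in fp16 are padded to 128
# Passing user_defined_size_of_dtype=2 applies the 2-byte rule to both dtypes,
# so the two fused vars come out with identical shapes ([128] in this example).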
...@@ -967,6 +967,28 @@ class DistributedStrategy(object):
"WARNING: calc_comm_same_stream should have value of boolean type"
)
@property
def fuse_grad_merge(self):
"""
Set whether fuse the grad for gradient merge.
Note: this flag will only effect the gradient merge under pipeline mode
The default value for the fuse_grad_merge is False
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.fuse_param_grad = True
"""
return self.strategy.fuse_grad_merge
@fuse_grad_merge.setter
@is_strict_auto
def fuse_grad_merge(self, fuse_grad_merge):
if isinstance(fuse_grad_merge, bool):
self.strategy.fuse_grad_merge = fuse_grad_merge
else:
print("WARNING: fuse_grad_merge should have value of boolean type")
@property
def fuse_grad_size_in_num(self):
"""
......
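As a usage note, here is a hedged sketch of how the new flag is meant to be combined with pipeline parallelism, mirroring the strategy configuration used in the unit tests added at the end of this diff (fleet initialization and the model definition are assumed and omitted):

import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
strategy.pipeline = True
strategy.pipeline_configs = {
    "schedule_mode": "1F1B",
    "micro_batch_size": 2,
    "accumulate_steps": 4,
}
# Fuse both the grad vars and the merged-grad vars produced by gradient merge.
# The flag only takes effect for gradient merge under pipeline mode.
strategy.fuse_grad_merge = True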
...@@ -122,6 +122,9 @@ class OffloadHelper(object):
for idx, op in enumerate(block.ops):
if is_optimizer_op(op):
break
# TODO (Yuang Liu): tmp solution for fuse_grad_merge + optimize_cast
if not offload and op.type == 'coalesce_tensor':
continue
for input_name in op.desc.input_arg_names():
if input_name not in param_to_idx:
continue
......
...@@ -341,7 +341,11 @@ def insert_allreduce_ops(block,
if len(allreduce_vars) == 0:
return
-    if user_defined_strategy and user_defined_strategy.fuse_all_reduce_ops:
+    if user_defined_strategy and \
+            user_defined_strategy.fuse_all_reduce_ops and \
+            not user_defined_strategy.fuse_grad_merge:
+        # If fuse_grad_merge is enabled, the grad vars have already been fused during
+        # the gradient merge pass, so they do not need to be fused again here
insert_fused_allreduce_ops(block, insert_idx, ring_id, allreduce_vars,
                           op_role, use_calc_stream,
                           user_defined_strategy.fuse_grad_size_in_MB)
......
...@@ -319,7 +319,9 @@ class ShardingOptimizer(MetaOptimizerBase):
main_block._remove_op(idx)
accumulated_grad_names = self._pp_optimizer._accumulate_gradients(
-    main_block, fp16_allreduce=fp16_allreduce)
+    main_block,
fp16_allreduce=fp16_allreduce,
user_defined_strategy=strategy)
len_of_ops = len(main_block.ops)
first_optimize_op_index = get_first_optimize_op_idx(main_block)
......
...@@ -5044,11 +5044,18 @@ class PipelineOptimizer(object):
def _accumulate_gradients(self,
                          block,
                          pp_allreduce_in_optimize=False,
-                          fp16_allreduce=False):
+                          fp16_allreduce=False,
user_defined_strategy=None):
""" """
Create a new merged gradient for each parameter and accumulate the Create a new merged gradient for each parameter and accumulate the
corresponding gradient to it. corresponding gradient to it.
""" """
if user_defined_strategy and user_defined_strategy.fuse_grad_merge:
fused_gradient_names = self._accumulate_gradients_with_fuse(
block, fp16_allreduce,
user_defined_strategy.fuse_grad_size_in_MB)
return fused_gradient_names
merged_gradient_names = []
first_opt_op_idx = None
...@@ -5178,6 +5185,252 @@ class PipelineOptimizer(object):
return merged_gradient_names
def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size):
first_opt_op_idx = None
grad_param_pairs = []
# obtain all param/grad pairs that need to be fused
for index, op in reversed(tuple(enumerate(list(main_block.ops)))):
# remove the cast op of fp16 grad to fp32 grad
if self._is_optimize_op(op) and op.type == 'cast':
in_name = op.input_arg_names[0]
out_name = op.output_arg_names[0]
if out_name.strip('@GRAD') in self._param_device_map:
assert in_name.replace('.cast_fp16', '') == out_name
main_block._remove_op(index)
continue
if self._is_backward_op(op) and first_opt_op_idx is None:
first_opt_op_idx = index + 1
# no optimize phase
if first_opt_op_idx == len(main_block.ops):
return
if self._is_backward_op(op) and (
self._op_role_var_key in op.attr_names):
op_role_var = op.attr(self._op_role_var_key)
if len(op_role_var) == 0:
continue
assert len(op_role_var) % 2 == 0
for i in range(0, len(op_role_var), 2):
param_name = op_role_var[i]
if not main_block.has_var(param_name):
continue
if '@BroadCast' in param_name:
continue
grad_param_pairs.append(
(op_role_var[i + 1], op_role_var[i]))
if len(grad_param_pairs) == 0:
return
grad_param_segments = []
merged_suffix = '@MERGED@FP16' if fp16 else '@MERGED'
dtype = paddle.float16 if fp16 else paddle.float32
cur_size = 0.
last_dtype = None
# split the grad based on dtype and fused size
for grad, param in grad_param_pairs:
real_grad = main_block.var(grad)
# create the gradient merged var for each grad
merged_grad_var = main_block.create_var(
name=param + core.grad_var_suffix() + merged_suffix,
dtype=dtype,
shape=real_grad.shape,
persistable=True,
stop_gradient=False)
real_param = main_block.var(param)
tmp_size = self._get_var_size(real_grad)
# two conditions that start a new segment:
# 1. the current segment's size reaches the user-defined fuse_grad_size_in_MB
# 2. the upcoming grad has a different dtype from the grads in the current segment
if len(grad_param_segments) == 0 \
or cur_size + tmp_size > fused_size \
or real_grad.dtype != last_dtype:
grad_param_segments.append(
([real_grad], [real_param], [merged_grad_var]))
last_dtype = real_grad.dtype
cur_size = 0.
else:
grad_param_segments[-1][0].append(real_grad)
grad_param_segments[-1][1].append(real_param)
grad_param_segments[-1][2].append(merged_grad_var)
cur_size += tmp_size
fused_gradients = []
fused_merged_gradients = []
# create the fused var for each grad segment and each merged-grad segment
for grad_param_segment in grad_param_segments:
grad_segment = grad_param_segment[0]
merged_grad_segment = grad_param_segment[2]
fused_grad = main_block.create_var(
name='FusedGrad_{}'.format(grad_segment[0].name),
dtype=grad_segment[0].dtype,
persistable=False,
stop_gradient=False)
# keep the '.cast_fp16' info in the fuse var name
fused_merged_grad_name_prefix = 'FusedMergedGrad.cast_fp16.' if \
merged_grad_segment[0].dtype == paddle.float16 else 'FusedMergedGrad'
fused_merged_grad_name = fused_merged_grad_name_prefix + '_{}'.format(
merged_grad_segment[0].name)
fused_merged_grad = main_block.create_var(
name=fused_merged_grad_name,
dtype=merged_grad_segment[0].dtype,
persistable=True,
stop_gradient=False)
fused_gradients.append(fused_grad)
fused_merged_gradients.append(fused_merged_grad)
assert len(fused_gradients) == len(grad_param_segments)
assert len(fused_merged_gradients) == len(grad_param_segments)
# insert coalesce op at the start of the backward pass
# use the params as the coalesce input to make sure the two fused vars have the same shape
first_back_op_idx = None
for index, op in enumerate(main_block.ops):
if self._is_backward_op(op) and first_back_op_idx is None:
first_back_op_idx = index
break
assert first_back_op_idx is not None
offset = 0
for i in range(len(grad_param_segments)):
fused_grad = fused_gradients[i]
fused_merged_grad = fused_merged_gradients[i]
grads = grad_param_segments[i][0]
params = grad_param_segments[i][1]
merged_grads = grad_param_segments[i][2]
main_block._insert_op_without_sync(
first_back_op_idx + offset,
type="coalesce_tensor",
inputs={"Input": params},
outputs={"Output": grads,
"FusedOutput": fused_grad},
attrs={
# Explanation of user_defined_size_of_dtype:
# In the coalesce op, the align size is 256 bytes;
# a float takes 4 bytes while an fp16 takes 2 bytes,
# so 64 floats or 128 fp16s are needed to fill an aligned block.
# Suppose the total shape of the input tensors is [64]:
# if the dtype is float, the shape of the fused var is [64],
# but if the dtype is fp16, the shape of the fused var is [128],
# which would make the fused vars' shapes differ from each other.
# To make sure the shapes of the fused vars are identical,
# we set the size of dtype to 2 for both float and fp16,
# so the fused vars' shapes for float and fp16 are both [128].
"user_defined_size_of_dtype": 2,
"copy_data": False,
"use_align": True,
"dtype": grads[0].dtype,
self._op_role_key: self._op_role.Backward
})
offset += 1
# For the gradient_merged_fused_var, give it an init value during the coalesce op;
# this removes a problematic fill_constant op. The op role of this coalesce
# is set to LRSched so that this coalesce (with init) only runs once
main_block._insert_op_without_sync(
first_back_op_idx + offset,
type="coalesce_tensor",
inputs={"Input": params},
outputs={
"Output": merged_grads,
"FusedOutput": fused_merged_grad
},
attrs={
"user_defined_size_of_dtype": 2,
"set_constant": True,
"constant": float(0.0),
"copy_data": False,
"use_align": True,
"dtype": merged_grads[0].dtype,
self._op_role_key: self._op_role.Optimize.LRSched
})
offset += 1
# insert the ops related to gradient merge
first_opt_op_idx += offset
offset = 0
for i in range(len(fused_gradients)):
fused_grad = fused_gradients[i]
fused_merged_grad = fused_merged_gradients[i]
is_fp16_grad = 'cast_fp16' in fused_grad.name
need_cast = (is_fp16_grad is not fp16)
if need_cast:
# for fp16 allreduce, cast fp32 grad to fp16
# for fp32 allreduce, cast fp16 grad to fp32
cast_grad_var_name = fused_grad.name + '@TMP'
cast_grad_var = main_block.create_var(
name=cast_grad_var_name,
dtype=dtype,
persistable=False,
stop_gradient=False)
main_block._insert_op(
index=first_opt_op_idx + offset,
type='cast',
inputs={'X': fused_grad},
outputs={'Out': cast_grad_var},
attrs={
'in_dtype': fused_grad.dtype,
'out_dtype': cast_grad_var.dtype,
self._op_role_key: self._op_role.Backward,
})
offset += 1
fused_grad = cast_grad_var
main_block._insert_op(
index=first_opt_op_idx + offset,
type='sum',
inputs={'X': [fused_merged_grad, fused_grad]},
outputs={'Out': fused_merged_grad},
attrs={self._op_role_key: self._op_role.Backward})
offset += 1
if fp16:
# if using fp16 allreduce, the optimizer needs fp32 grads, cast them back to fp32
for grad, param in grad_param_pairs:
real_grad = main_block.var(grad)
fp16_grad_name = param + core.grad_var_suffix() + '@MERGED@FP16'
assert main_block.has_var(fp16_grad_name)
fp16_grad = main_block.var(fp16_grad_name)
fp32_grad_name = param + core.grad_var_suffix() + '@MERGED'
fp32_grad = main_block.create_var(
name=fp32_grad_name,
dtype=paddle.float32,
shape=real_grad.shape,
persistable=False,
stop_gradient=False)
main_block._insert_op(
index=first_opt_op_idx + offset,
type='cast',
inputs={'X': fp16_grad},
outputs={'Out': fp32_grad},
attrs={
'in_dtype': paddle.float16,
'out_dtype': paddle.float32,
self._op_role_key: self._op_role.Optimize,
})
offset += 1
# replace each var with its name, which will be used for inserting allreduce
for i in range(len(fused_merged_gradients)):
fused_merged_gradients[i] = fused_merged_gradients[i].name
main_block._sync_with_cpp()
return fused_merged_gradients
def _get_var_size(self, var):
dtype_to_size = {
core.VarDesc.VarType.FP16: 2,
core.VarDesc.VarType.FP32: 4,
core.VarDesc.VarType.FP64: 8,
core.VarDesc.VarType.INT16: 2,
core.VarDesc.VarType.INT32: 4,
core.VarDesc.VarType.INT64: 8,
core.VarDesc.VarType.BOOL: 1,
core.VarDesc.VarType.UINT8: 1,
}
assert -1 not in var.shape
return reduce(lambda x, y: x * y,
var.shape) * dtype_to_size[var.dtype] / 1024.0 / 1024.0
def _add_sub_blocks(self, main_block, program_list):
main_program = main_block.program
for prog in program_list:
......
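A simplified, framework-free sketch (names are illustrative, not Paddle APIs) of the two pieces of arithmetic used above: the per-var size in MB computed by _get_var_size, and the rule in _accumulate_gradients_with_fuse that starts a new fuse segment once the accumulated size exceeds fuse_grad_size_in_MB or the dtype changes:

from functools import reduce

DTYPE_TO_SIZE = {"float16": 2, "float32": 4}  # bytes per element

def var_size_mb(shape, dtype):
    # mirrors _get_var_size: prod(shape) * sizeof(dtype) / 1024 / 1024
    return reduce(lambda x, y: x * y, shape) * DTYPE_TO_SIZE[dtype] / 1024.0 / 1024.0

def split_into_segments(grads, fused_size_mb):
    """grads: list of (name, shape, dtype) tuples, kept in their original order."""
    segments, cur_size, last_dtype = [], 0.0, None
    for name, shape, dtype in grads:
        size = var_size_mb(shape, dtype)
        if not segments or cur_size + size > fused_size_mb or dtype != last_dtype:
            segments.append([name])  # start a new segment and reset the counters
            cur_size, last_dtype = 0.0, dtype
        else:
            segments[-1].append(name)
        cur_size += size
    return segments

grads = [("g0", [1024, 1024], "float16"),   # 2 MB
         ("g1", [1024, 1024], "float16"),   # 2 MB
         ("g2", [1024, 1024], "float32")]   # 4 MB, dtype change -> new segment
print(split_into_segments(grads, fused_size_mb=32))  # [['g0', 'g1'], ['g2']]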
...@@ -90,7 +90,8 @@ class TestAllocContinuousSpace2(TestAllocContinuousSpace):
"set_constant": False,
"constant": 0.5,
"use_align": True,
-"dtype": self.fluid_dtype
+"dtype": self.fluid_dtype,
"user_defined_size_of_dtype": 2
}
def test_check_output(self):
......
...@@ -92,7 +92,8 @@ class TestAllocContinuousSpace2(TestAllocContinuousSpace):
"copy_data": False,
"set_constant": True,
"constant": 0.5,
-"dtype": self.fluid_dtype
+"dtype": self.fluid_dtype,
"user_defined_size_of_dtype": 2
}
def test_check_output(self):
......
...@@ -1050,6 +1050,189 @@ class TestFleetShardingHybridOptimizer(TestFleetMetaOptimizer):
self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002'])
def test_hybrid_with_pp_dp_amp_fp16allreduce_optimize_cast_with_gradient_fuse(
self):
train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program(
)
avg_cost, strategy = self.pp_net(train_prog, startup_prog)
strategy.amp = True
strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], }
strategy.sharding = True
strategy.sharding_configs = {
"sharding_degree": 1,
"mp_degree": 1,
"pp_degree": 2,
"dp_degree": 2,
"optimize_cast": True,
}
strategy.pipeline = True
strategy.pipeline_configs = {
"schedule_mode": "1F1B",
"micro_batch_size": 2,
"accumulate_steps": 4,
}
strategy.fp16_allreduce = True
strategy.fuse_grad_merge = True
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
train_prog = train_prog._pipeline_opt['section_program']
startup_prog = startup_prog._pipeline_opt['startup_program']
startup_prog_ops = startup_prog.global_block().ops
main_prog_ops = train_prog.global_block().ops
# check program
startup_prog_op_types = [op.type for op in startup_prog_ops]
main_prog_op_types = [op.type for op in main_prog_ops]
# ring: mp, pp_group, pp_pair, pp_pair
self.assertEqual(startup_prog_op_types, [
'uniform_random', 'cast', 'fill_constant', 'cast', 'uniform_random',
'cast', 'fill_constant', 'cast', 'uniform_random', 'cast',
'fill_constant', 'cast', 'uniform_random', 'cast', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init',
'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init',
'c_sync_comm_stream'
])
self.assertEqual(main_prog_op_types, [
'recv_v2', 'mul', 'elementwise_add', 'tanh', 'mul',
'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul',
'cast', 'elementwise_add', 'softmax', 'cross_entropy2', 'mean',
'elementwise_mul', 'coalesce_tensor', 'coalesce_tensor',
'coalesce_tensor', 'coalesce_tensor', 'fill_constant', 'scale',
'scale', 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2',
'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad',
'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad',
'elementwise_add_grad', 'mul_grad', 'tanh_grad',
'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2',
'sum', 'cast', 'sum', 'c_allreduce_sum', 'c_allreduce_sum', 'cast',
'cast', 'cast', 'cast', 'cast', 'cast', 'cast', 'cast',
'c_sync_comm_stream', 'check_finite_and_unscale', 'cast',
'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum',
'cast', 'momentum', 'cast', 'momentum', 'cast', 'momentum', 'cast',
'momentum', 'cast', 'momentum', 'cast', 'momentum', 'momentum',
'cast'
])
# amp check_finite_and_unscale, allreduce(pp)
self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1)
# should have ring id for pp
created_ring_ids = [
op.desc.attr("ring_id") for op in startup_prog_ops
if op.type == "c_comm_init"
]
self.assertIn(self.pp_pair_ring_id, created_ring_ids)
self.assertIn(self.dp_ring_id, created_ring_ids)
# check correctness of pp group
for op in startup_prog_ops:
if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[
0] == "comm_id_0":
pp_group_waiting_ports = op.desc.attr("other_endpoints")
self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003'])
# check correctness of dp group
for op in startup_prog_ops:
if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[
0] == "comm_id_3":
dp_group_waiting_ports = op.desc.attr("other_endpoints")
self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002'])
def test_hybrid_with_pp_dp_amp_with_gradient_fuse(self):
train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program(
)
avg_cost, strategy = self.pp_net(train_prog, startup_prog)
strategy.amp = True
strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], }
strategy.sharding = True
strategy.sharding_configs = {
"sharding_degree": 1,
"mp_degree": 1,
"pp_degree": 2,
"dp_degree": 2,
}
strategy.pipeline = True
strategy.pipeline_configs = {
"schedule_mode": "1F1B",
"micro_batch_size": 2,
"accumulate_steps": 4,
}
strategy.fuse_grad_merge = True
self.optimizer(avg_cost, strategy, train_prog, startup_prog)
train_prog = train_prog._pipeline_opt['section_program']
startup_prog = startup_prog._pipeline_opt['startup_program']
startup_prog_ops = startup_prog.global_block().ops
main_prog_ops = train_prog.global_block().ops
# check program
startup_prog_op_types = [op.type for op in startup_prog_ops]
main_prog_op_types = [op.type for op in main_prog_ops]
# ring: mp, pp_group, pp_pair, pp_pair
self.assertEqual(startup_prog_op_types, [
'uniform_random', 'fill_constant', 'uniform_random',
'fill_constant', 'uniform_random', 'fill_constant',
'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
'fill_constant', 'fill_constant', 'c_gen_nccl_id', 'c_comm_init',
'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init',
'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream'
])
self.assertEqual(main_prog_op_types, [
'recv_v2', 'cast', 'mul', 'cast', 'elementwise_add', 'tanh', 'cast',
'mul', 'cast', 'elementwise_add', 'tanh', 'cast', 'mul', 'cast',
'elementwise_add', 'tanh', 'cast', 'mul', 'cast', 'elementwise_add',
'softmax', 'cross_entropy2', 'mean', 'elementwise_mul',
'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor',
'coalesce_tensor', 'fill_constant', 'scale', 'scale',
'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2',
'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad',
'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad',
'elementwise_add_grad', 'mul_grad', 'tanh_grad',
'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2',
'cast', 'sum', 'sum', 'c_allreduce_sum', 'c_allreduce_sum',
'c_sync_comm_stream', 'check_finite_and_unscale', 'cast',
'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum',
'momentum', 'momentum', 'momentum', 'momentum', 'momentum',
'momentum', 'momentum'
])
# amp check_finite_and_unscale, allreduce(pp)
self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1)
# should have ring id for pp
created_ring_ids = [
op.desc.attr("ring_id") for op in startup_prog_ops
if op.type == "c_comm_init"
]
self.assertIn(self.pp_pair_ring_id, created_ring_ids)
self.assertIn(self.dp_ring_id, created_ring_ids)
# check correctness of pp group
for op in startup_prog_ops:
if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[
0] == "comm_id_0":
pp_group_waiting_ports = op.desc.attr("other_endpoints")
self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003'])
# check correctness of dp group
for op in startup_prog_ops:
if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[
0] == "comm_id_3":
dp_group_waiting_ports = op.desc.attr("other_endpoints")
self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002'])
if __name__ == "__main__":
unittest.main()