Commit 4d9b2d6d (unverified)
Authored Aug 20, 2021 by Yuang Liu; committed via GitHub on Aug 20, 2021
[hybrid performance] Grad fuse for gradient merge under pipeline mode (#35004)
Parent: f6015d0d

Showing 10 changed files with 534 additions and 11 deletions (+534 −11)
paddle/fluid/framework/distributed_strategy.proto                            +1    -0
paddle/fluid/operators/coalesce_tensor_op.cc                                 +59   -6
python/paddle/distributed/fleet/base/distributed_strategy.py                 +22   -0
python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py   +3    -0
python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py            +5    -1
python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py        +3    -1
python/paddle/fluid/optimizer.py                                             +254  -1
python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py       +2    -1
python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py               +2    -1
python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py    +183  -0
paddle/fluid/framework/distributed_strategy.proto

@@ -200,6 +200,7 @@ message DistributedStrategy {
   optional int32 fuse_grad_size_in_num = 31 [ default = 8 ];
   optional bool calc_comm_same_stream = 32 [ default = false ];
   optional bool asp = 33 [ default = false ];
+  optional bool fuse_grad_merge = 34 [ default = false ];

   optional RecomputeConfig recompute_configs = 101;
   optional AMPConfig amp_configs = 102;
paddle/fluid/operators/coalesce_tensor_op.cc

@@ -20,10 +20,49 @@
 #include "paddle/fluid/framework/var_type.h"
 #include "paddle/fluid/operators/math/math_function.h"
 #include "paddle/fluid/platform/device_memory_aligment.h"
+#ifdef PADDLE_WITH_ASCEND_CL
+#include "paddle/fluid/operators/npu_op_runner.h"
+#endif

 namespace paddle {
 namespace operators {

+template <typename DeviceContext>
+struct FillConstantVisitor {
+  FillConstantVisitor(const DeviceContext &dev_ctx,
+                      framework::LoDTensor *tensor, const float value)
+      : dev_ctx_(dev_ctx), tensor_(tensor), value_(value) {}
+
+  template <typename T>
+  void apply(typename std::enable_if<std::is_same<T, int8_t>::value ||
+                                     std::is_same<T, int16_t>::value>::type * =
+                 nullptr) const {
+    PADDLE_THROW(platform::errors::InvalidArgument(
+        "Not support data type for set_constant attr"));
+  }
+
+  template <typename T>
+  void apply(typename std::enable_if<!(std::is_same<T, int8_t>::value ||
+                                       std::is_same<T, int16_t>::value)>::type
+                 * = nullptr) const {
+#ifdef PADDLE_WITH_ASCEND_CL
+    if (platform::is_npu_place(dev_ctx_.GetPlace())) {
+      FillNpuTensorWithConstant<T>(tensor_, static_cast<T>(value_));
+    } else {
+      math::SetConstant<DeviceContext, T> set_constant;
+      set_constant(dev_ctx_, tensor_, static_cast<T>(value_));
+    }
+#else
+    math::SetConstant<DeviceContext, T> set_constant;
+    set_constant(dev_ctx_, tensor_, static_cast<T>(value_));
+#endif
+  }
+
+  const DeviceContext &dev_ctx_;
+  framework::LoDTensor *tensor_;
+  float value_;
+};
+
 template <typename DeviceContext, typename T>
 class CoalesceTensorOpKernel : public framework::OpKernel<T> {
  public:

@@ -70,6 +109,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
     auto in_tensors = context.MultiInput<framework::LoDTensor>("Input");
     bool use_align = context.Attr<bool>("use_align");
     auto align_size = context.Attr<int>("align_size");
+    auto size_of_dtype = context.Attr<int>("user_defined_size_of_dtype");

     if (context.Attr<bool>("check_name")) {
       for (size_t i = 0; i < in_var_names.size(); ++i) {

@@ -94,7 +134,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
     size_t numel = 0;
     auto dtype = static_cast<framework::proto::VarType::Type>(
         context.Attr<int>("dtype"));
-    size_t size_of_dtype = framework::SizeOfType(dtype);
+    if (size_of_dtype == -1) {
+      size_of_dtype = framework::SizeOfType(dtype);
+    }
     GetMemSizeAndDtype(in_tensors, in_var_names, &numel, size_of_dtype,
                        context.GetPlace(), use_align, align_size);

@@ -121,10 +163,9 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
                        : len;
       }
     } else if (context.Attr<bool>("set_constant")) {
-      math::SetConstant<DeviceContext, T> set_constant;
-      set_constant(dev_ctx, fused_tensor,
-                   static_cast<T>(context.Attr<float>("constant")));
+      // TODO(Liu yuang) ADD NPU SET_CONSTANT FUNCTION.
+      framework::VisitDataType(
+          dtype, FillConstantVisitor<DeviceContext>(
+                     dev_ctx, fused_tensor, context.Attr<float>("constant")));
     } else if (context.Attr<bool>("persist_output")) {
       for (size_t i = 0; i < out_var_names.size(); ++i) {
         size_t len = static_cast<size_t>(out_tensors[i]->numel());

@@ -227,10 +268,13 @@ class CoalesceTensorOp : public framework::OperatorWithKernel {
     }
     auto use_align = ctx->Attrs().Get<bool>("use_align");
     auto align_size = ctx->Attrs().Get<int>("align_size");
+    auto size_of_dtype = ctx->Attrs().Get<int>("user_defined_size_of_dtype");

     auto dtype = static_cast<framework::proto::VarType::Type>(
         ctx->Attrs().Get<int>("dtype"));
-    size_t size_of_dtype = framework::SizeOfType(dtype);
+    if (size_of_dtype == -1) {
+      size_of_dtype = framework::SizeOfType(dtype);
+    }

     auto alignment = [](size_t size, size_t align_size) {
       size_t remaining = size % align_size;

@@ -308,6 +352,15 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker {
         .SetDefault(true);
     AddAttr<int>("align_size", "The alignment size when use_align is True")
         .SetDefault(-1);
+    AddAttr<int>("user_defined_size_of_dtype",
+                 "The user defined size of dtype. This is used to coalesce "
+                 "grad vars and merged_grad vars at the same time. For some "
+                 "strategy, the dtype of fused_grad_vars and the dtype of "
+                 "fused_grad_merged_vars are not identical, which will cause "
+                 "the shape of these two coalesced vars are different. To "
+                 "make sure the shape of these two vars are identical with "
+                 "each other, this attr is added.")
+        .SetDefault(-1);
     AddComment(R"DOC(
 CoalesceTensor Operator.
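The user_defined_size_of_dtype attribute exists because the fused fp32 and fp16 vars are padded to 256-byte boundaries, so the same list of tensors can coalesce to different element counts per dtype. A standalone sketch of that arithmetic (illustrative only; aligned_numel is a hypothetical helper, not Paddle API, and the 256-byte figure follows the comment later in this diff):

    # Hypothetical helper, not Paddle API: element count after 256-byte alignment.
    ALIGN_BYTES = 256

    def aligned_numel(numel, size_of_dtype, align_bytes=ALIGN_BYTES):
        unit = align_bytes // size_of_dtype      # elements per aligned chunk
        return ((numel + unit - 1) // unit) * unit

    print(aligned_numel(64, 4))   # fp32: 64  -> 64 elements exactly fill 256 bytes
    print(aligned_numel(64, 2))   # fp16: 128 -> padded up to 128 elements
    # Forcing user_defined_size_of_dtype = 2 for both vars makes the shapes match:
    print(aligned_numel(64, 2) == aligned_numel(64, 2))  # True, both become 128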
python/paddle/distributed/fleet/base/distributed_strategy.py

@@ -967,6 +967,28 @@ class DistributedStrategy(object):
                 "WARNING: calc_comm_same_stream should have value of boolean type"
             )

+    @property
+    def fuse_grad_merge(self):
+        """
+        Set whether fuse the grad for gradient merge.
+        Note: this flag will only effect the gradient merge under pipeline mode
+        The default value for the fuse_grad_merge is False
+        Examples:
+          .. code-block:: python
+            import paddle.distributed.fleet as fleet
+            strategy = fleet.DistributedStrategy()
+            strategy.fuse_param_grad = True
+        """
+        return self.strategy.fuse_grad_merge
+
+    @fuse_grad_merge.setter
+    @is_strict_auto
+    def fuse_grad_merge(self, fuse_grad_merge):
+        if isinstance(fuse_grad_merge, bool):
+            self.strategy.fuse_grad_merge = fuse_grad_merge
+        else:
+            print("WARNING: fuse_grad_merge should have value of boolean type")
+
     @property
     def fuse_grad_size_in_num(self):
         """
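With the strategy field above in place, turning the optimization on from user code would look roughly like the sketch below; the pipeline settings mirror the unit test added at the end of this diff, and the snippet is an illustration rather than a complete training script:

    import paddle.distributed.fleet as fleet

    strategy = fleet.DistributedStrategy()
    # The flag only takes effect for gradient merge under pipeline mode.
    strategy.pipeline = True
    strategy.pipeline_configs = {
        "schedule_mode": "1F1B",
        "micro_batch_size": 2,
        "accumulate_steps": 4,
    }
    strategy.fuse_grad_merge = True  # field introduced by this commit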
python/paddle/distributed/fleet/meta_optimizers/sharding/offload_helper.py

@@ -122,6 +122,9 @@ class OffloadHelper(object):
         for idx, op in enumerate(block.ops):
             if is_optimizer_op(op):
                 break
+            # TODO (Yuang Liu): tmp solution for fuse_grad_merge + optimize_cast
+            if not offload and op.type == 'coalesce_tensor':
+                continue
             for input_name in op.desc.input_arg_names():
                 if input_name not in param_to_idx:
                     continue
python/paddle/distributed/fleet/meta_optimizers/sharding/utils.py

@@ -341,7 +341,11 @@ def insert_allreduce_ops(block,
     if len(allreduce_vars) == 0:
         return

-    if user_defined_strategy and user_defined_strategy.fuse_all_reduce_ops:
+    if user_defined_strategy and \
+            user_defined_strategy.fuse_all_reduce_ops and \
+            not user_defined_strategy.fuse_grad_merge:
+        # If fuse_grad_merge is enable, the grad vars have already been fused during
+        # gradient merge pass, therefore, those vars are not need to be fused here
         insert_fused_allreduce_ops(block, insert_idx, ring_id, allreduce_vars,
                                    op_role, use_calc_stream,
                                    user_defined_strategy.fuse_grad_size_in_MB)
python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py

@@ -319,7 +319,9 @@ class ShardingOptimizer(MetaOptimizerBase):
                 main_block._remove_op(idx)

         accumulated_grad_names = self._pp_optimizer._accumulate_gradients(
-            main_block, fp16_allreduce=fp16_allreduce)
+            main_block,
+            fp16_allreduce=fp16_allreduce,
+            user_defined_strategy=strategy)

         len_of_ops = len(main_block.ops)
         first_optimize_op_index = get_first_optimize_op_idx(main_block)
python/paddle/fluid/optimizer.py

@@ -5037,11 +5037,18 @@ class PipelineOptimizer(object):
     def _accumulate_gradients(self,
                               block,
                               pp_allreduce_in_optimize=False,
-                              fp16_allreduce=False):
+                              fp16_allreduce=False,
+                              user_defined_strategy=None):
         """
         Create a new merged gradient for each parameter and accumulate the
         corresponding gradient to it.
         """
+        if user_defined_strategy and user_defined_strategy.fuse_grad_merge:
+            fused_gradient_names = self._accumulate_gradients_with_fuse(
+                block, fp16_allreduce,
+                user_defined_strategy.fuse_grad_size_in_MB)
+            return fused_gradient_names
+
         merged_gradient_names = []
         first_opt_op_idx = None

@@ -5171,6 +5178,252 @@ class PipelineOptimizer(object):
         return merged_gradient_names

+    def _accumulate_gradients_with_fuse(self, main_block, fp16, fused_size):
+        first_opt_op_idx = None
+        grad_param_pairs = []
+        # obtain all param/grad pairs that needed to be fused
+        for index, op in reversed(tuple(enumerate(list(main_block.ops)))):
+            # remove the cast op of fp16 grad to fp32 grad
+            if self._is_optimize_op(op) and op.type == 'cast':
+                in_name = op.input_arg_names[0]
+                out_name = op.output_arg_names[0]
+                if out_name.strip('@GRAD') in self._param_device_map:
+                    assert in_name.replace('.cast_fp16', '') == out_name
+                    main_block._remove_op(index)
+                    continue
+
+            if self._is_backward_op(op) and first_opt_op_idx is None:
+                first_opt_op_idx = index + 1
+                # no optimize phase
+                if first_opt_op_idx == len(main_block.ops):
+                    return
+
+            if self._is_backward_op(op) and (
+                    self._op_role_var_key in op.attr_names):
+                op_role_var = op.attr(self._op_role_var_key)
+                if len(op_role_var) == 0:
+                    continue
+                assert len(op_role_var) % 2 == 0
+                for i in range(0, len(op_role_var), 2):
+                    param_name = op_role_var[i]
+                    if not main_block.has_var(param_name):
+                        continue
+                    if '@BroadCast' in param_name:
+                        continue
+                    grad_param_pairs.append(
+                        (op_role_var[i + 1], op_role_var[i]))
+
+        if len(grad_param_pairs) == 0:
+            return
+
+        grad_param_segments = []
+        merged_suffix = '@MERGED@FP16' if fp16 else '@MERGED'
+        dtype = paddle.float16 if fp16 else paddle.float32
+        cur_size = 0.
+        last_dtype = None
+        # split the grad based on dtype and fused size
+        for grad, param in grad_param_pairs:
+            real_grad = main_block.var(grad)
+            # create the gradient merged var for each grad
+            merged_grad_var = main_block.create_var(
+                name=param + core.grad_var_suffix() + merged_suffix,
+                dtype=dtype,
+                shape=real_grad.shape,
+                persistable=True,
+                stop_gradient=False)
+            real_param = main_block.var(param)
+            tmp_size = self._get_var_size(real_grad)
+            # two strategies for splitting the grad
+            # 1. the current segment's size reach the user defined grad_size_in_MB
+            # 2. the upcoming grad holds different dtype compared with grads in current segment
+            if len(grad_param_segments) == 0 \
+                    or cur_size + tmp_size > fused_size \
+                    or real_grad.dtype != last_dtype:
+                grad_param_segments.append(
+                    ([real_grad], [real_param], [merged_grad_var]))
+                last_dtype = real_grad.dtype
+                cur_size = 0.
+            else:
+                grad_param_segments[-1][0].append(real_grad)
+                grad_param_segments[-1][1].append(real_param)
+                grad_param_segments[-1][2].append(merged_grad_var)
+                cur_size += tmp_size
+
+        fused_gradients = []
+        fused_merged_gradients = []
+        # create fused vars for grad and param
+        for grad_param_segment in grad_param_segments:
+            grad_segment = grad_param_segment[0]
+            merged_grad_segment = grad_param_segment[2]
+            fused_grad = main_block.create_var(
+                name='FusedGrad_{}'.format(grad_segment[0].name),
+                dtype=grad_segment[0].dtype,
+                persistable=False,
+                stop_gradient=False)
+            # keep the '.cast_fp16' info in the fuse var name
+            fused_merged_grad_name_prefix = 'FusedMergedGrad.cast_fp16.' if \
+                merged_grad_segment[0].dtype == paddle.float16 else 'FusedMergedGrad'
+            fused_merged_grad_name = fused_merged_grad_name_prefix + '_{}'.format(
+                merged_grad_segment[0].name)
+            fused_merged_grad = main_block.create_var(
+                name=fused_merged_grad_name,
+                dtype=merged_grad_segment[0].dtype,
+                persistable=True,
+                stop_gradient=False)
+            fused_gradients.append(fused_grad)
+            fused_merged_gradients.append(fused_merged_grad)
+
+        assert len(fused_gradients) == len(grad_param_segments)
+        assert len(fused_merged_gradients) == len(grad_param_segments)
+
+        # insert coalesce op at the start of the backward pass
+        # use param as the coalesce input to make sure the two Fused vars are in same shape
+        first_back_op_idx = None
+        for index, op in enumerate(main_block.ops):
+            if self._is_backward_op(op) and first_back_op_idx is None:
+                first_back_op_idx = index
+                break
+        assert first_back_op_idx is not None
+        offset = 0
+        for i in range(len(grad_param_segments)):
+            fused_grad = fused_gradients[i]
+            fused_merged_grad = fused_merged_gradients[i]
+            grads = grad_param_segments[i][0]
+            params = grad_param_segments[i][1]
+            merged_grads = grad_param_segments[i][2]
+            main_block._insert_op_without_sync(
+                first_back_op_idx + offset,
+                type="coalesce_tensor",
+                inputs={"Input": params},
+                outputs={"Output": grads,
+                         "FusedOutput": fused_grad},
+                attrs={
+                    # Explanation of user_defined_size_of_dtype:
+                    # In coalesce op, the align size is 256 bytes
+                    # the float takes 4 bytes while fp16 takes 2 bytes.
+                    # To meet the requirement, 128 fp16 or 64 float will be aligned
+                    # Think the total shape of the input tensors if [64],
+                    # if the dtype is float, then the shape of the fuse var is [64]
+                    # however if the dytpe if fp16, the shape of the fuse var is [128],
+                    # which will cause the fused vars' shape vary between each other.
+                    # To make sure the shape of the fused vars are identical,
+                    # we set the dtype of float and fp16 both to 2.
+                    # Under this way, the fused vars' shape for float and fp16 are all [128]
+                    "user_defined_size_of_dtype": 2,
+                    "copy_data": False,
+                    "use_align": True,
+                    "dtype": grads[0].dtype,
+                    self._op_role_key: self._op_role.Backward
+                })
+            offset += 1
+            # For the gradient_merged_fused_var, given a init value during the coalesce op
+            # this will remove a problematic fill_constant op. This op role of this coalesce
+            # is set to be LRSched to make this coalesce (with init) only run once
+            main_block._insert_op_without_sync(
+                first_back_op_idx + offset,
+                type="coalesce_tensor",
+                inputs={"Input": params},
+                outputs={
+                    "Output": merged_grads,
+                    "FusedOutput": fused_merged_grad
+                },
+                attrs={
+                    "user_defined_size_of_dtype": 2,
+                    "set_constant": True,
+                    "constant": float(0.0),
+                    "copy_data": False,
+                    "use_align": True,
+                    "dtype": merged_grads[0].dtype,
+                    self._op_role_key: self._op_role.Optimize.LRSched
+                })
+            offset += 1
+
+        # insert gradient merge relating ops
+        first_opt_op_idx += offset
+        offset = 0
+        for i in range(len(fused_gradients)):
+            fused_grad = fused_gradients[i]
+            fused_merged_grad = fused_merged_gradients[i]
+            is_fp16_grad = 'cast_fp16' in fused_grad.name
+            need_cast = (is_fp16_grad is not fp16)
+            if need_cast:
+                # for fp16 allreduce, cast fp32 grad to fp16
+                # for fp32 allreduce, cast fp16 grad to fp32
+                cast_grad_var_name = fused_grad.name + '@TMP'
+                cast_grad_var = main_block.create_var(
+                    name=cast_grad_var_name,
+                    dtype=dtype,
+                    persistable=False,
+                    stop_gradient=False)
+                main_block._insert_op(
+                    index=first_opt_op_idx + offset,
+                    type='cast',
+                    inputs={'X': fused_grad},
+                    outputs={'Out': cast_grad_var},
+                    attrs={
+                        'in_dtype': fused_grad.dtype,
+                        'out_dtype': cast_grad_var.dtype,
+                        self._op_role_key: self._op_role.Backward,
+                    })
+                offset += 1
+                fused_grad = cast_grad_var
+            main_block._insert_op(
+                index=first_opt_op_idx + offset,
+                type='sum',
+                inputs={'X': [fused_merged_grad, fused_grad]},
+                outputs={'Out': fused_merged_grad},
+                attrs={self._op_role_key: self._op_role.Backward})
+            offset += 1
+
+        if fp16:
+            # if using fp16 allreduce, the optimizer needs fp32 grads, cast them back to fp32
+            for grad, param in grad_param_pairs:
+                real_grad = main_block.var(grad)
+                fp16_grad_name = param + core.grad_var_suffix() + '@MERGED@FP16'
+                assert main_block.has_var(fp16_grad_name)
+                fp16_grad = main_block.var(fp16_grad_name)
+                fp32_grad_name = param + core.grad_var_suffix() + '@MERGED'
+                fp32_grad = main_block.create_var(
+                    name=fp32_grad_name,
+                    dtype=paddle.float32,
+                    shape=real_grad.shape,
+                    persistable=False,
+                    stop_gradient=False)
+                main_block._insert_op(
+                    index=first_opt_op_idx + offset,
+                    type='cast',
+                    inputs={'X': fp16_grad},
+                    outputs={'Out': fp32_grad},
+                    attrs={
+                        'in_dtype': paddle.float16,
+                        'out_dtype': paddle.float32,
+                        self._op_role_key: self._op_role.Optimize,
+                    })
+                offset += 1
+
+        # replace the var with it's name, which will be used for inserting allreduce
+        for i in range(len(fused_merged_gradients)):
+            fused_merged_gradients[i] = fused_merged_gradients[i].name
+
+        main_block._sync_with_cpp()
+
+        return fused_merged_gradients
+
+    def _get_var_size(self, var):
+        dtype_to_size = {
+            core.VarDesc.VarType.FP16: 2,
+            core.VarDesc.VarType.FP32: 4,
+            core.VarDesc.VarType.FP64: 8,
+            core.VarDesc.VarType.INT16: 2,
+            core.VarDesc.VarType.INT32: 4,
+            core.VarDesc.VarType.INT64: 8,
+            core.VarDesc.VarType.BOOL: 1,
+            core.VarDesc.VarType.UINT8: 1,
+        }
+        assert -1 not in var.shape
+        return reduce(lambda x, y: x * y,
+                      var.shape) * dtype_to_size[var.dtype] / 1024.0 / 1024.0
+
     def _add_sub_blocks(self, main_block, program_list):
         main_program = main_block.program
         for prog in program_list:
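For intuition about how segments fill up: _get_var_size returns megabytes, so the split condition cur_size + tmp_size > fused_size is a straight MB budget. A standalone back-of-the-envelope sketch (the 32 MB budget is only an example value, not a default taken from this diff):

    from functools import reduce

    dtype_to_size = {"fp16": 2, "fp32": 4}  # bytes per element, as in _get_var_size

    def var_size_mb(shape, dtype):
        return reduce(lambda x, y: x * y, shape) * dtype_to_size[dtype] / 1024.0 / 1024.0

    print(var_size_mb([1024, 1024], "fp16"))  # 2.0 MB per gradient
    # With an example budget of fused_size = 32 MB, about 16 such fp16 grads
    # would be coalesced into one segment before a new segment is opened.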
python/paddle/fluid/tests/unittests/npu/test_coalesce_tensor_op_npu.py

@@ -90,7 +90,8 @@ class TestAllocContinuousSpace2(TestAllocContinuousSpace):
             "set_constant": False,
             "constant": 0.5,
             "use_align": True,
-            "dtype": self.fluid_dtype
+            "dtype": self.fluid_dtype,
+            "user_defined_size_of_dtype": 2
         }

     def test_check_output(self):
python/paddle/fluid/tests/unittests/test_coalesce_tensor_op.py

@@ -92,7 +92,8 @@ class TestAllocContinuousSpace2(TestAllocContinuousSpace):
             "copy_data": False,
             "set_constant": True,
             "constant": 0.5,
-            "dtype": self.fluid_dtype
+            "dtype": self.fluid_dtype,
+            "user_defined_size_of_dtype": 2
         }

     def test_check_output(self):
python/paddle/fluid/tests/unittests/test_fleet_sharding_meta_optimizer.py

@@ -1050,6 +1050,189 @@ class TestFleetShardingHybridOptimizer(TestFleetMetaOptimizer):
         self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002'])

+    def test_hybrid_with_pp_dp_amp_fp16allreduce_optimize_cast_with_gradient_fuse(
+            self):
+        train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program(
+        )
+        avg_cost, strategy = self.pp_net(train_prog, startup_prog)
+        strategy.amp = True
+        strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], }
+        strategy.sharding = True
+        strategy.sharding_configs = {
+            "sharding_degree": 1,
+            "mp_degree": 1,
+            "pp_degree": 2,
+            "dp_degree": 2,
+            "optimize_cast": True,
+        }
+        strategy.pipeline = True
+        strategy.pipeline_configs = {
+            "schedule_mode": "1F1B",
+            "micro_batch_size": 2,
+            "accumulate_steps": 4,
+        }
+        strategy.fp16_allreduce = True
+        strategy.fuse_grad_merge = True
+        self.optimizer(avg_cost, strategy, train_prog, startup_prog)
+        train_prog = train_prog._pipeline_opt['section_program']
+        startup_prog = startup_prog._pipeline_opt['startup_program']
+
+        startup_prog_ops = startup_prog.global_block().ops
+        main_prog_ops = train_prog.global_block().ops
+
+        # check program
+        startup_prog_op_types = [op.type for op in startup_prog_ops]
+        main_prog_op_types = [op.type for op in main_prog_ops]
+
+        # ring: mp, pp_group, pp_pair, pp_pair
+        self.assertEqual(startup_prog_op_types, [
+            'uniform_random', 'cast', 'fill_constant', 'cast',
+            'uniform_random', 'cast', 'fill_constant', 'cast',
+            'uniform_random', 'cast', 'fill_constant', 'cast',
+            'uniform_random', 'cast', 'fill_constant', 'fill_constant',
+            'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
+            'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
+            'fill_constant', 'fill_constant', 'fill_constant', 'c_gen_nccl_id',
+            'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id',
+            'c_comm_init', 'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream'
+        ])
+
+        self.assertEqual(main_prog_op_types, [
+            'recv_v2', 'mul', 'elementwise_add', 'tanh', 'mul',
+            'elementwise_add', 'tanh', 'mul', 'elementwise_add', 'tanh', 'mul',
+            'cast', 'elementwise_add', 'softmax', 'cross_entropy2', 'mean',
+            'elementwise_mul', 'coalesce_tensor', 'coalesce_tensor',
+            'coalesce_tensor', 'coalesce_tensor', 'fill_constant', 'scale',
+            'scale', 'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2',
+            'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad',
+            'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad',
+            'elementwise_add_grad', 'mul_grad', 'tanh_grad',
+            'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2',
+            'sum', 'cast', 'sum', 'c_allreduce_sum', 'c_allreduce_sum', 'cast',
+            'cast', 'cast', 'cast', 'cast', 'cast', 'cast', 'cast',
+            'c_sync_comm_stream', 'check_finite_and_unscale', 'cast',
+            'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum',
+            'cast', 'momentum', 'cast', 'momentum', 'cast', 'momentum', 'cast',
+            'momentum', 'cast', 'momentum', 'cast', 'momentum', 'momentum',
+            'cast'
+        ])
+
+        # amp check_finite_and_unscale, allreduce(pp)
+        self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1)
+
+        # should has ring id for pp
+        created_ring_ids = [
+            op.desc.attr("ring_id") for op in startup_prog_ops
+            if op.type == "c_comm_init"
+        ]
+        self.assertIn(self.pp_pair_ring_id, created_ring_ids)
+        self.assertIn(self.dp_ring_id, created_ring_ids)
+
+        # check correctness of pp group
+        for op in startup_prog_ops:
+            if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[
+                    0] == "comm_id_0":
+                pp_group_waiting_ports = op.desc.attr("other_endpoints")
+
+        self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003'])
+
+        # check correctness of dp group
+        for op in startup_prog_ops:
+            if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[
+                    0] == "comm_id_3":
+                dp_group_waiting_ports = op.desc.attr("other_endpoints")
+
+        self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002'])
+
+    def test_hybrid_with_pp_dp_amp_with_gradient_fuse(self):
+        train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program(
+        )
+        avg_cost, strategy = self.pp_net(train_prog, startup_prog)
+        strategy.amp = True
+        strategy.amp_configs = {'custom_black_varnames': ['fc_6.b_0'], }
+        strategy.sharding = True
+        strategy.sharding_configs = {
+            "sharding_degree": 1,
+            "mp_degree": 1,
+            "pp_degree": 2,
+            "dp_degree": 2,
+        }
+        strategy.pipeline = True
+        strategy.pipeline_configs = {
+            "schedule_mode": "1F1B",
+            "micro_batch_size": 2,
+            "accumulate_steps": 4,
+        }
+        strategy.fuse_grad_merge = True
+        self.optimizer(avg_cost, strategy, train_prog, startup_prog)
+        train_prog = train_prog._pipeline_opt['section_program']
+        startup_prog = startup_prog._pipeline_opt['startup_program']
+
+        startup_prog_ops = startup_prog.global_block().ops
+        main_prog_ops = train_prog.global_block().ops
+
+        # check program
+        startup_prog_op_types = [op.type for op in startup_prog_ops]
+        main_prog_op_types = [op.type for op in main_prog_ops]
+
+        # ring: mp, pp_group, pp_pair, pp_pair
+        self.assertEqual(startup_prog_op_types, [
+            'uniform_random', 'fill_constant', 'uniform_random',
+            'fill_constant', 'uniform_random', 'fill_constant',
+            'uniform_random', 'fill_constant', 'fill_constant', 'fill_constant',
+            'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
+            'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant',
+            'fill_constant', 'fill_constant', 'c_gen_nccl_id', 'c_comm_init',
+            'c_gen_nccl_id', 'c_comm_init', 'c_gen_nccl_id', 'c_comm_init',
+            'c_gen_nccl_id', 'c_comm_init', 'c_sync_comm_stream'
+        ])
+
+        self.assertEqual(main_prog_op_types, [
+            'recv_v2', 'cast', 'mul', 'cast', 'elementwise_add', 'tanh', 'cast',
+            'mul', 'cast', 'elementwise_add', 'tanh', 'cast', 'mul', 'cast',
+            'elementwise_add', 'tanh', 'cast', 'mul', 'cast', 'elementwise_add',
+            'softmax', 'cross_entropy2', 'mean', 'elementwise_mul',
+            'coalesce_tensor', 'coalesce_tensor', 'coalesce_tensor',
+            'coalesce_tensor', 'fill_constant', 'scale', 'scale',
+            'elementwise_mul_grad', 'mean_grad', 'cross_entropy_grad2',
+            'softmax_grad', 'elementwise_add_grad', 'cast', 'mul_grad',
+            'tanh_grad', 'elementwise_add_grad', 'mul_grad', 'tanh_grad',
+            'elementwise_add_grad', 'mul_grad', 'tanh_grad',
+            'elementwise_add_grad', 'mul_grad', 'c_sync_calc_stream', 'send_v2',
+            'cast', 'sum', 'sum', 'c_allreduce_sum', 'c_allreduce_sum',
+            'c_sync_comm_stream', 'check_finite_and_unscale', 'cast',
+            'c_allreduce_max', 'cast', 'update_loss_scaling', 'momentum',
+            'momentum', 'momentum', 'momentum', 'momentum', 'momentum',
+            'momentum', 'momentum'
+        ])
+
+        # amp check_finite_and_unscale, allreduce(pp)
+        self.assertEqual(main_prog_op_types.count('c_allreduce_max'), 1)
+
+        # should has ring id for pp
+        created_ring_ids = [
+            op.desc.attr("ring_id") for op in startup_prog_ops
+            if op.type == "c_comm_init"
+        ]
+        self.assertIn(self.pp_pair_ring_id, created_ring_ids)
+        self.assertIn(self.dp_ring_id, created_ring_ids)
+
+        # check correctness of pp group
+        for op in startup_prog_ops:
+            if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[
+                    0] == "comm_id_0":
+                pp_group_waiting_ports = op.desc.attr("other_endpoints")
+
+        self.assertEqual(pp_group_waiting_ports, ['127.0.0.1:36003'])
+
+        # check correctness of dp group
+        for op in startup_prog_ops:
+            if op.type == "c_gen_nccl_id" and op.desc.output_arg_names()[
+                    0] == "comm_id_3":
+                dp_group_waiting_ports = op.desc.attr("other_endpoints")
+
+        self.assertEqual(dp_group_waiting_ports, ['127.0.0.1:36002'])
+

 if __name__ == "__main__":
     unittest.main()