Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
magicwindyyd
mindspore
提交
39f08eb7
M
mindspore
项目概览
magicwindyyd
/
mindspore
与 Fork 源项目一致
Fork自
MindSpore / mindspore
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
mindspore
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
39f08eb7
编写于
7月 14, 2020
作者:
Z
Ziyan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enable optimizer parallel
上级
7976d775
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
80 addition
and
92 deletion
+80
-92
mindspore/ccsrc/backend/optimizer/pass/communication_op_fusion.cc
...e/ccsrc/backend/optimizer/pass/communication_op_fusion.cc
+4
-1
mindspore/context.py
mindspore/context.py
+5
-1
mindspore/nn/optim/optimizer.py
mindspore/nn/optim/optimizer.py
+4
-4
mindspore/nn/wrap/grad_reducer.py
mindspore/nn/wrap/grad_reducer.py
+60
-75
mindspore/parallel/_auto_parallel_context.py
mindspore/parallel/_auto_parallel_context.py
+1
-1
tests/ut/python/parallel/test_parallel_optimizer.py
tests/ut/python/parallel/test_parallel_optimizer.py
+4
-8
tests/ut/python/parallel/test_set_auto_parallel_context.py
tests/ut/python/parallel/test_set_auto_parallel_context.py
+2
-2
未找到文件。
mindspore/ccsrc/backend/optimizer/pass/communication_op_fusion.cc
浏览文件 @
39f08eb7
...
...
@@ -100,7 +100,10 @@ bool CommunicationOpFusion::GetSplitSegments(const CommunicationOpInfo &communic
auto
parallel_context
=
parallel
::
ParallelContext
::
GetInstance
();
MS_EXCEPTION_IF_NULL
(
parallel_context
);
const
auto
&
split_indices
=
parallel_context
->
GetAllReduceFusionSplitIndices
(
group
);
std
::
vector
<
uint32_t
>
split_indices
;
if
(
!
parallel_context
->
enable_parallel_optimizer
())
{
split_indices
=
parallel_context
->
GetAllReduceFusionSplitIndices
(
group
);
}
size_t
segments
=
0
;
if
(
split_indices
.
size
()
!=
0
)
{
...
...
mindspore/context.py
浏览文件 @
39f08eb7
...
...
@@ -443,7 +443,7 @@ def _context():
@
args_type_check
(
device_num
=
int
,
global_rank
=
int
,
mirror_mean
=
bool
,
cast_before_mirror
=
bool
,
parallel_mode
=
str
,
auto_parallel_search_mode
=
str
,
parameter_broadcast
=
bool
,
strategy_ckpt_load_file
=
str
,
strategy_ckpt_save_file
=
str
,
full_batch
=
bool
)
strategy_ckpt_save_file
=
str
,
full_batch
=
bool
,
enable_parallel_optimizer
=
bool
)
def
set_auto_parallel_context
(
**
kwargs
):
"""
Set auto parallel context.
...
...
@@ -487,6 +487,9 @@ def set_auto_parallel_context(**kwargs):
strategy_ckpt_load_file (str): The path to load parallel strategy checkpoint. Default: ''
strategy_ckpt_save_file (str): The path to save parallel strategy checkpoint. Default: ''
full_batch (bool): Whether to load the whole batch on each device. Default: False.
enable_parallel_optimizer(bool): This is a developing feature, which shards the weight update computation in
data parallel training in the benefit of time and memory saving.
Raises:
ValueError: If input key is not attribute in auto parallel context.
...
...
@@ -532,6 +535,7 @@ def reset_auto_parallel_context():
- parameter_broadcast: False.
- strategy_ckpt_load_file: "".
- strategy_ckpt_save_file: "".
- enable_parallel_optimizer: False.
"""
_reset_auto_parallel_context
()
...
...
mindspore/nn/optim/optimizer.py
浏览文件 @
39f08eb7
...
...
@@ -28,8 +28,8 @@ from mindspore._checkparam import Validator as validator
from
mindspore._checkparam
import
Rel
from
mindspore
import
log
as
logger
from
mindspore.parallel._utils
import
_get_global_rank
,
_get_device_num
,
_get_parallel_mode
from
mindspore.parallel._auto_parallel_context
import
auto_parallel_context
from
mindspore.train.parallel_utils
import
ParallelMode
from
mindspore
import
context
__all__
=
[
'Optimizer'
]
...
...
@@ -157,13 +157,12 @@ class Optimizer(Cell):
self
.
param_length
=
len
(
self
.
parameters
)
self
.
map_
=
C
.
Map
()
use_parallel
=
auto_parallel_context
().
get_enable_parallel_optimizer
(
)
use_parallel
=
context
.
get_auto_parallel_context
(
"enable_parallel_optimizer"
)
self
.
use_parallel
=
use_parallel
if
use_parallel
:
if
self
.
cls_name
not
in
[
"Lamb"
,
"AdamWeightDecayDynamicLR"
,
"AdamWeightDecay"
]:
raise
RuntimeError
(
"Optimizer segmentation does not support optimizer {}"
.
format
(
self
.
cls_name
))
if
_get_parallel_mode
()
not
in
[
ParallelMode
.
HYBRID_PARALLEL
,
ParallelMode
.
DATA_PARALLEL
,
ParallelMode
.
AUTO_PARALLEL
]:
if
_get_parallel_mode
()
!=
ParallelMode
.
DATA_PARALLEL
:
raise
RuntimeError
(
"Optimizer segmentation does not support parallel mode {}"
.
format
(
_get_parallel_mode
()))
self
.
dev_num
=
_get_device_num
()
...
...
@@ -175,6 +174,7 @@ class Optimizer(Cell):
self
.
param_names
=
[]
for
param
in
self
.
parameters
:
self
.
param_names
.
append
(
param
.
name
)
else
:
self
.
optim_filter
=
(
True
,)
*
self
.
param_length
...
...
mindspore/nn/wrap/grad_reducer.py
浏览文件 @
39f08eb7
...
...
@@ -13,107 +13,95 @@
# limitations under the License.
# ============================================================================
"""grad reducer cell for distributed training"""
from
mindspore
import
context
from
mindspore.nn.cell
import
Cell
from
mindspore.communication.management
import
GlobalComm
,
get_group_size
from
mindspore.ops
import
functional
as
F
,
composite
as
C
,
operations
as
P
from
mindspore.ops.operations.comm_ops
import
AllReduce
,
ReduceOp
,
AllGather
from
mindspore.ops.operations.comm_ops
import
AllReduce
,
AllGather
from
mindspore.parallel._auto_parallel_context
import
auto_parallel_context
import
mindspore.common.dtype
as
mstype
reduce_opt
=
C
.
MultitypeFuncGraph
(
"reduce_opt"
)
_all_reduce
=
AllReduce
()
_all_gather
=
None
def
_init_optimizer_communication
():
global
_all_reduce
global
_all_gather
_all_reduce
=
AllReduce
(
ReduceOp
.
SUM
,
GlobalComm
.
WORLD_COMM_GROUP
)
_all_reduce
.
add_prim_attr
(
'fusion'
,
1
)
_all_gather
=
AllGather
(
GlobalComm
.
WORLD_COMM_GROUP
)
@
reduce_opt
.
register
(
"Function"
,
"Number"
,
"Bool"
,
"Tensor"
)
def
_tensors_allreduce_mean
(
mul
,
degree
,
allreduce_filter
,
grad
):
def
_init_allreduce_operators
(
length
):
""" initialize allreduce communication operators"""
is_parallel_optimizer
=
context
.
get_auto_parallel_context
(
"enable_parallel_optimizer"
)
split_indices
=
auto_parallel_context
().
get_all_reduce_fusion_split_indices
()
if
is_parallel_optimizer
and
split_indices
:
group
=
1
fusion
=
()
for
i
in
range
(
length
):
fusion
=
fusion
+
(
group
,)
if
split_indices
[
group
-
1
]
<=
i
+
1
:
if
group
>=
len
(
split_indices
):
continue
group
=
group
+
1
index
=
tuple
(
range
(
1
,
length
+
1
))
else
:
fusion
=
(
1
,)
*
length
index
=
(
0
,)
*
length
opt_list
=
()
for
i
in
range
(
length
):
opt
=
AllReduce
(
'sum'
,
GlobalComm
.
WORLD_COMM_GROUP
)
opt
.
add_prim_attr
(
'fusion'
,
fusion
[
i
])
opt
.
add_prim_attr
(
'index'
,
index
[
i
])
opt_list
=
opt_list
+
(
opt
,)
return
opt_list
@
reduce_opt
.
register
(
"Number"
,
"Bool"
,
"Function"
,
"Bool"
,
"Tensor"
,
"Function"
)
def
_tensors_allreduce
(
degree
,
mean
,
allgather
,
allreduce_filter
,
grad
,
allreduce
):
"""
Apply
mean and allreduce on gradient. Allreduce is a communication operation used for distributed deep learning
.
Apply
allreduce on gradient
.
Args:
mul (Primitive): Div operation.
degree (int): The mean coefficient.
mean (bool): When mean is true, the mean coefficient (degree) would apply on gradients.
allgather (Primitive): The communication operator for sparse gradients.
allreduce_filter (bool): When it is true, allreduce would apply.
grad (Tensor): The gradient tensor before operation.
allreduce (Primitive): The communication operator for gradients.
Returns:
Tensor, the gradient tensor after operation.
"""
if
allreduce_filter
:
degree
=
F
.
scalar_cast
(
degree
,
F
.
dtype
(
grad
))
grad
=
_all_reduce
(
grad
)
cast_op
=
P
.
Cast
()
return
mul
(
grad
,
cast_op
(
F
.
scalar_to_array
(
1.0
/
degree
),
F
.
dtype
(
grad
)))
grad
=
allreduce
(
grad
)
if
mean
:
degree
=
F
.
scalar_cast
(
degree
,
F
.
dtype
(
grad
))
cast_op
=
P
.
Cast
()
mul_op
=
P
.
Mul
()
grad
=
mul_op
(
grad
,
cast_op
(
F
.
scalar_to_array
(
1.0
/
degree
),
F
.
dtype
(
grad
)))
return
grad
return
grad
@
reduce_opt
.
register
(
"
Function"
,
"Number"
,
"Bool"
,
"Tuple
"
)
def
_tensors_allreduce_
mean_with_sparse
(
mul
,
degree
,
allreduce_filter
,
grad
):
@
reduce_opt
.
register
(
"
Number"
,
"Bool"
,
"Function"
,
"Bool"
,
"Tuple"
,
"Function
"
)
def
_tensors_allreduce_
with_sparse
(
degree
,
mean
,
allgather
,
allreduce_filter
,
grad
,
allreduce
):
"""
Apply
mean and
allgather on gradient instead of allreduce for sparse feature.
Apply allgather on gradient instead of allreduce for sparse feature.
Allgather is a communication operation used for distributed deep learning.
Args:
mul (Primitive): Div operation.
degree (int): The mean coefficient.
mean (bool): When mean is true, the mean coefficient (degree) would apply on gradients.
allgather (Primitive): The communication operator for sparse gradients.
allreduce_filter (bool): When it is true, allgather would apply.
grad (Tuple): The indices, gradient tensor and tensor_shape before operation.
grad (tuple): The indices, gradient tensor and tensor_shape before operation.
allreduce (Primitive): The communication operator for gradients.
Returns:
Tuple, include indices, the gradient tensor and tensor_shape after operation.
"""
if
allreduce_filter
:
indices
=
_all_gather
(
grad
[
0
])
degree
=
F
.
scalar_cast
(
degree
,
F
.
dtype
(
grad
[
1
]))
dout
=
_all_gather
(
grad
[
1
])
cast_op
=
P
.
Cast
()
dout
=
mul
(
dout
,
cast_op
(
F
.
scalar_to_array
(
1.0
/
degree
),
F
.
dtype
(
dout
)))
grad
=
(
indices
,
dout
,
grad
[
2
])
return
grad
@
reduce_opt
.
register
(
"Bool"
,
"Tensor"
)
def
_tensors_allreduce
(
allreduce_filter
,
grad
):
"""
Apply allreduce on gradient.
Args:
allreduce_filter (bool): When it is true, allreduce would apply.
grad (Tensor): The gradient tensor before operation.
Returns:
Tensor, the gradient tensor after operation.
"""
if
allreduce_filter
:
return
_all_reduce
(
grad
)
return
grad
@
reduce_opt
.
register
(
"Bool"
,
"Tuple"
)
def
_tensors_allreduce_with_sparse
(
allreduce_filter
,
grad
):
"""
Apply mean and allgather on gradient instead of allreduce for sparse feature.
Allgather is a communication operation used for distributed deep learning.
Args:
allreduce_filter (bool): When it is true, allgather would apply.
grad (Tuple): The indices, gradient tensor and tensor_shape before operation.
Returns:
Tuple, include indices, the gradient tensor and tensor_shape after operation.
"""
if
allreduce_filter
:
indices
=
_all_gather
(
grad
[
0
])
dout
=
_all_gather
(
grad
[
1
])
indices
=
allgather
(
grad
[
0
])
dout
=
allgather
(
grad
[
1
])
if
mean
:
degree
=
F
.
scalar_cast
(
degree
,
F
.
dtype
(
grad
[
1
]))
cast_op
=
P
.
Cast
()
mul_op
=
P
.
Mul
()
dout
=
mul_op
(
dout
,
cast_op
(
F
.
scalar_to_array
(
1.0
/
degree
),
F
.
dtype
(
dout
)))
grad
=
(
indices
,
dout
,
grad
[
2
])
return
grad
...
...
@@ -259,7 +247,6 @@ class DistributedGradReducer(Cell):
def
__init__
(
self
,
parameters
,
mean
=
True
,
degree
=
None
):
super
(
DistributedGradReducer
,
self
).
__init__
(
auto_prefix
=
False
)
self
.
map_
=
C
.
Map
()
self
.
mul
=
P
.
Mul
()
if
degree
is
None
:
self
.
degree
=
get_group_size
()
else
:
...
...
@@ -268,7 +255,8 @@ class DistributedGradReducer(Cell):
self
.
degree
=
degree
self
.
mean
=
mean
self
.
allreduce_filter
=
tuple
(
x
.
layerwise_parallel
is
False
for
x
in
parameters
)
_init_optimizer_communication
()
self
.
opt_list
=
_init_allreduce_operators
(
len
(
parameters
))
self
.
allgather
=
AllGather
(
GlobalComm
.
WORLD_COMM_GROUP
)
def
construct
(
self
,
grads
):
"""
...
...
@@ -284,11 +272,8 @@ class DistributedGradReducer(Cell):
"""
datatypes
=
self
.
map_
(
F
.
partial
(
_get_datatype
),
grads
)
grads
=
self
.
map_
(
F
.
partial
(
_cast_datatype
,
mstype
.
float32
),
grads
)
if
self
.
mean
:
new_grad
=
self
.
map_
(
F
.
partial
(
reduce_opt
,
self
.
mul
,
self
.
degree
),
self
.
allreduce_filter
,
grads
)
else
:
new_grad
=
self
.
map_
(
F
.
partial
(
reduce_opt
),
self
.
allreduce_filter
,
grads
)
new_grad
=
self
.
map_
(
F
.
partial
(
reduce_opt
,
self
.
degree
,
self
.
mean
,
self
.
allgather
),
self
.
allreduce_filter
,
grads
,
self
.
opt_list
)
new_grad
=
self
.
map_
(
F
.
partial
(
_cast_datatype
),
datatypes
,
new_grad
)
return
new_grad
mindspore/parallel/_auto_parallel_context.py
浏览文件 @
39f08eb7
...
...
@@ -513,7 +513,7 @@ def _set_auto_parallel_context(**kwargs):
strategy_ckpt_load_file (str): The path to load parallel strategy checkpoint. Default: ''
strategy_ckpt_save_file (str): The path to save parallel strategy checkpoint. Default: ''
full_batch (bool): Whether to load the whole batch on each device. Default: False.
enable_parallel_optimizer (bool): Enable using optimizer segmentation or no
e
. Default: False.
enable_parallel_optimizer (bool): Enable using optimizer segmentation or no
t
. Default: False.
Raises:
ValueError: If input key is not attribute in auto parallel context.
...
...
tests/ut/python/parallel/test_parallel_optimizer.py
浏览文件 @
39f08eb7
...
...
@@ -22,7 +22,6 @@ from mindspore.common.api import _executor
from
mindspore.nn
import
TrainOneStepCell
,
WithLossCell
from
mindspore.nn.optim
import
Adam
,
AdamWeightDecay
,
AdamWeightDecayDynamicLR
,
Lamb
from
mindspore.ops
import
operations
as
P
from
mindspore.parallel._auto_parallel_context
import
auto_parallel_context
from
mindspore
import
context
...
...
@@ -54,8 +53,7 @@ class Net(nn.Cell):
def
test_AdamWeightDecayDynamicLR
():
""" test_AdamWeightDecayDynamicLR """
auto_parallel_context
().
set_enable_parallel_optimizer
(
True
)
context
.
set_auto_parallel_context
(
parallel_mode
=
"data_parallel"
,
device_num
=
2
)
context
.
set_auto_parallel_context
(
parallel_mode
=
"data_parallel"
,
device_num
=
2
,
enable_parallel_optimizer
=
True
)
inputs
=
Tensor
(
np
.
ones
([
32
,
128
]).
astype
(
np
.
float32
))
label
=
Tensor
(
np
.
zeros
([
32
,
768
]).
astype
(
np
.
float32
))
net
=
Net
()
...
...
@@ -70,8 +68,7 @@ def test_AdamWeightDecayDynamicLR():
def
test_AdamWeightDecay
():
""" test_AdamWeightDecayDynamicLR """
auto_parallel_context
().
set_enable_parallel_optimizer
(
True
)
context
.
set_auto_parallel_context
(
parallel_mode
=
"data_parallel"
,
device_num
=
2
)
context
.
set_auto_parallel_context
(
parallel_mode
=
"data_parallel"
,
device_num
=
2
,
enable_parallel_optimizer
=
True
)
inputs
=
Tensor
(
np
.
ones
([
32
,
128
]).
astype
(
np
.
float32
))
label
=
Tensor
(
np
.
zeros
([
32
,
768
]).
astype
(
np
.
float32
))
net
=
Net
()
...
...
@@ -86,8 +83,7 @@ def test_AdamWeightDecay():
def
test_lamb_compile
():
""" test_Lamb_compile """
auto_parallel_context
().
set_enable_parallel_optimizer
(
True
)
context
.
set_auto_parallel_context
(
parallel_mode
=
"auto_parallel"
,
device_num
=
2
)
context
.
set_auto_parallel_context
(
parallel_mode
=
"data_parallel"
,
device_num
=
2
,
enable_parallel_optimizer
=
True
)
inputs
=
Tensor
(
np
.
ones
([
32
,
128
]).
astype
(
np
.
float32
))
label
=
Tensor
(
np
.
zeros
([
32
,
768
]).
astype
(
np
.
float32
))
net
=
Net
()
...
...
@@ -102,7 +98,7 @@ def test_lamb_compile():
def
test_edge_case
():
""" test_edge_case """
auto_parallel_context
().
set_enable_parallel_optimizer
(
True
)
context
.
set_auto_parallel_context
(
enable_parallel_optimizer
=
True
)
net
=
Net
()
with
pytest
.
raises
(
RuntimeError
):
context
.
set_auto_parallel_context
(
parallel_mode
=
"stand_alone"
)
...
...
tests/ut/python/parallel/test_set_auto_parallel_context.py
浏览文件 @
39f08eb7
...
...
@@ -81,8 +81,8 @@ def test_set_auto_parallel_context():
with
pytest
.
raises
(
ValueError
):
set_algo_parameters
(
tensor_slice_align_size
=
1025
)
auto_parallel_context
().
set_enable_parallel_optimizer
(
True
)
assert
auto_parallel_context
().
get_enable_parallel_optimizer
()
is
True
context
.
set_auto_parallel_context
(
enable_parallel_optimizer
=
True
)
assert
context
.
get_auto_parallel_context
(
"enable_parallel_optimizer"
)
assert
not
auto_parallel_context
().
get_all_reduce_fusion_split_indices
()
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录