提交
8089a1fb
编写于
9月 27, 2022
作者:
L
LiYuRio
提交者:
GitHub
9月 27, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
change use_calc_stream to sync_op (#46182) (#46493)
上级
536d9d8c
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
125 addition
and
135 deletion
+125
-135
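The commit renames the `use_calc_stream` keyword of the collective communication APIs in paddle.distributed to `sync_op`; the default behavior (synchronous, wait before returning) is unchanged, and `sync_op=False` returns a task handle. A minimal migration sketch, assuming the script runs under a multi-process launcher such as paddle.distributed.launch (the tensor here is illustrative, not taken from the patch):

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    data = paddle.randn(shape=[2, 3])

    # Before this commit the call was spelled with use_calc_stream:
    #     dist.all_reduce(data, use_calc_stream=True)
    # After this commit the same synchronous call is:
    dist.all_reduce(data, sync_op=True)

    # Asynchronous form: sync_op=False returns a task that must be waited on.
    task = dist.all_reduce(data, sync_op=False)
    task.wait()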
Changed files:

    python/paddle/distributed/auto_parallel/process_group.py (+1, -1)
    python/paddle/distributed/collective.py (+63, -60)
    python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/dygraph_sharding_optimizer.py (+1, -1)
    python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py (+2, -2)
    python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py (+4, -4)
    python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py (+2, -2)
    python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py (+1, -1)
    python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py (+2, -2)
    python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py (+3, -3)
    python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py (+4, -4)
    python/paddle/distributed/fleet/utils/hybrid_parallel_util.py (+3, -3)
    python/paddle/fluid/dygraph/parallel.py (+1, -1)
    python/paddle/fluid/tests/unittests/collective/collective_allreduce_new_group_api.py (+1, -3)
    python/paddle/fluid/tests/unittests/collective/collective_alltoall_single.py (+1, -1)
    python/paddle/fluid/tests/unittests/collective/collective_reduce_scatter.py (+3, -2)
    python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_communicate_group.py (+5, -8)
    python/paddle/fluid/tests/unittests/collective/fleet/new_group.py (+5, -8)
    python/paddle/fluid/tests/unittests/collective/process_group_nccl.py (+23, -29)
python/paddle/distributed/auto_parallel/process_group.py
@@ -151,7 +151,7 @@ class ProcessGroup:
             tmp = paddle.to_tensor(
                 [1], dtype="int32") if _non_static_mode() else fill_constant(
                     [0], dtype="int32", value="1")
-            paddle.distributed.all_reduce(tmp, use_calc_stream=True, group=self)
+            paddle.distributed.all_reduce(tmp, sync_op=True, group=self)
             paddle.distributed.wait(tmp, group=self)
             paddle.enable_static()
python/paddle/distributed/collective.py
@@ -414,7 +414,7 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout):
             paddle.distributed.init_parallel_env()
             tindata = paddle.randn(shape=[2, 3])
             gp = paddle.distributed.new_group([2,4,6])
-            paddle.distributed.all_reduce(tindata, group=gp, use_calc_stream=False)
+            paddle.distributed.all_reduce(tindata, group=gp, sync_op=False)
     """
     global _custom_gid
@@ -521,7 +521,7 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout):
         tmp = paddle.to_tensor(
             [1], dtype="int32") if _non_static_mode() else fill_constant(
                 [0], dtype="int32", value="1")
-        paddle.distributed.all_reduce(tmp, use_calc_stream=True)
+        paddle.distributed.all_reduce(tmp, sync_op=True)
         paddle.distributed.wait(tmp)
     return gp
@@ -617,7 +617,7 @@ def wait(tensor, group=None, use_calc_stream=True):
             paddle.distributed.init_parallel_env()
             tindata = paddle.randn(shape=[2, 3])
-            paddle.distributed.all_reduce(tindata, use_calc_stream=True)
+            paddle.distributed.all_reduce(tindata, sync_op=True)
             paddle.distributed.wait(tindata)
     """
@@ -665,7 +665,7 @@ def _sync_comm_stream(tensor, ring_id=0):
     )
-def broadcast(tensor, src, group=None, use_calc_stream=True):
+def broadcast(tensor, src, group=None, sync_op=True):
     """
     Broadcast a tensor from the source to all others.
@@ -681,9 +681,8 @@ def broadcast(tensor, src, group=None, use_calc_stream=True):
         tensor (Tensor): The Tensor to send if current rank is the source, or the Tensor to receive otherwise. Its data type
             should be float16, float32, float64, int32, int64, int8, uint8 or bool.
         src (int): The source rank.
-        group (Group): The group instance return by new_group or None for global default group.
-        use_calc_stream (bool): Wether to use calculation stream (True) or communication stream (False).
-            Default to True.
+        group (Group, optional): The group instance return by new_group or None for global default group.
+        sync_op (bool, optional): Whether this op is a sync op. The default value is True.
     Returns:
         None.
@@ -716,12 +715,13 @@ def broadcast(tensor, src, group=None, use_calc_stream=True):
         gsrc = group.get_group_rank(src)
         assert gsrc >= 0, ("src rank out of group, need global rank")
         task = group.process_group.broadcast(tensor, gsrc)
-        if use_calc_stream:
+        if sync_op:
             task.wait()
             return None
         else:
             return task
+    use_calc_stream = sync_op
     ring_id = ring_id = 0 if group is None else group.id
     gsrc = src if group is None else group.get_group_rank(src)
     assert gsrc >= 0, ("src rank out of group, need global rank")
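Each collective keeps two code paths: in dygraph mode the call goes through the process group and the new sync_op flag only decides whether to wait on the returned task, while the legacy path still drives the underlying operator through its old use_calc_stream attribute, which is why the patch bridges the names with `use_calc_stream = sync_op`. A condensed, self-contained sketch of that dispatch pattern (illustrative only; the two stubs stand in for the real eager-mode and legacy calls and are not Paddle APIs):

    class _FakeTask:
        def wait(self):
            pass

    def _eager_collective(tensor):
        return _FakeTask()   # stands in for group.process_group.<op>(...)

    def _legacy_collective(tensor, use_calc_stream):
        return None          # stands in for the legacy operator call

    def collective_sketch(tensor, sync_op=True, in_dygraph=True):
        if in_dygraph:
            task = _eager_collective(tensor)
            if sync_op:      # synchronous call: wait and return nothing
                task.wait()
                return None
            return task      # asynchronous call: hand the task back
        # Legacy path: the underlying op still takes use_calc_stream, so the
        # new keyword is forwarded under the old attribute name.
        use_calc_stream = sync_op
        return _legacy_collective(tensor, use_calc_stream=use_calc_stream)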
@@ -748,7 +748,7 @@ def broadcast(tensor, src, group=None, use_calc_stream=True):
     })
-def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True):
+def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True):
     """
     Reduce a tensor over all ranks so that all get the result.
@@ -764,10 +764,9 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True):
     Args:
         tensor (Tensor): The input Tensor. It also works as the output Tensor. Its data type
             should be float16, float32, float64, int32, int64, int8, uint8 or bool.
-        op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default value is ReduceOp.SUM.
-        group (Group): The group instance return by new_group or None for global default group.
-        use_calc_stream (bool): Wether to use calculation stream (True) or communication stream (False).
-            Default to True.
+        op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM.
+        group (Group, optional): The group instance return by new_group or None for global default group.
+        sync_op (bool, optional): Wether this op is a sync op. Default value is True.
     Returns:
         None.
@@ -795,12 +794,13 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True):
         op_type = _get_reduce_op(op, "all_reduce")
         group = _get_default_group() if group is None else group
         task = group.process_group.allreduce(tensor, op_type)
-        if use_calc_stream:
+        if sync_op:
             task.wait()
             return None
         else:
             return task
+    use_calc_stream = sync_op
     ring_id = 0 if group is None else group.id
     if _non_static_mode():
         if op == ReduceOp.SUM:
@@ -846,7 +846,7 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True):
     })
-def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True):
+def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True):
     """
     Reduce a tensor to the destination from all others. As shown below, one process is started with a GPU and the data of this process is represented
@@ -862,10 +862,9 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True):
         tensor (Tensor): The output Tensor for the destination and the input Tensor otherwise. Its data type
             should be float16, float32, float64, int32, int64, int8, uint8 or bool.
         dst (int): The destination rank id.
-        op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default value is ReduceOp.SUM.
-        group (Group): The group instance return by new_group or None for global default group.
-        use_calc_stream (bool): Wether to use calculation stream (True) or communication stream (False).
-            Default to True.
+        op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM.
+        group (Group, optional): The group instance return by new_group or None for global default group.
+        sync_op (bool, optional): Whether this op is a sync op. The default value is True.
     Returns:
         None.
@@ -896,12 +895,13 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True):
         gdst = group.get_group_rank(dst)
         assert gdst >= 0, ("dst rank out of group, need global rank")
         task = group.process_group.reduce(tensor, gdst, op_type)
-        if use_calc_stream:
+        if sync_op:
             task.wait()
             return None
         else:
             return task
+    use_calc_stream = sync_op
     ring_id = 0 if group is None else group.id
     gdst = dst if group is None else group.get_group_rank(dst)
     assert gdst >= 0, ("dst rank out of group, need global rank")
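Under the new keyword, reduce keeps the same call shape as before. A minimal sketch on two ranks, in the spirit of the tests updated later in this commit (values illustrative):

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    data = paddle.to_tensor([dist.get_rank() + 1.0])
    # Sum both ranks' tensors into rank 0; sync_op=True blocks until it is done.
    dist.reduce(data, dst=0, sync_op=True)
    if dist.get_rank() == 0:
        print(data)  # expected [3.] with two ranks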
@@ -953,7 +953,7 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True):
     })
-def all_gather(tensor_list, tensor, group=None, use_calc_stream=True):
+def all_gather(tensor_list, tensor, group=None, sync_op=True):
     """
     Gather tensors from all participators and all get the result. As shown
@@ -971,9 +971,8 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True):
             should be float16, float32, float64, int32, int64, int8, uint8, bool, complex64 or complex128.
         tensor (Tensor): The Tensor to send. Its data type
             should be float16, float32, float64, int32, int64, int8, uint8, bool, complex64 or complex128.
-        group (Group): The group instance return by new_group or None for global default group.
-        use_calc_stream (bool): Wether to use calculation stream (True) or communication stream (False).
-            Default to True.
+        group (Group, optional): The group instance return by new_group or None for global default group.
+        sync_op (bool, optional): Whether this op is a sync op. The default value is True.
     Returns:
         None.
@@ -1027,6 +1026,7 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True):
             tensor_list.extend(list_of_tensor)
         return
+    use_calc_stream = sync_op
     ring_id = 0 if group is None else group.id
     nranks = _get_global_group().nranks if group is None else group.nranks
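The asynchronous form of all_gather follows the same shape as in the NCCL test updated further down in this commit. A minimal sketch (shapes and values illustrative):

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    local = paddle.full(shape=[2, 2], fill_value=float(dist.get_rank()))
    gathered = []
    task = dist.all_gather(gathered, local, sync_op=False)
    task.wait()                       # wait on the returned task before reading
    result = paddle.concat(gathered)  # one [2, 2] block per rank, stacked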
@@ -1137,7 +1137,7 @@ def all_gather_object(object_list, obj, group=None):
             _convert_tensor_to_object(tensor, list_len_of_tensor[i]))
-def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True):
+def scatter(tensor, tensor_list=None, src=0, group=None, sync_op=True):
     """
     Scatter a tensor to all participators. As shown below, one process is started with a GPU and the source of the scatter
@@ -1154,9 +1154,8 @@ def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True):
         tensor_list (list|tuple): A list/tuple of Tensors to scatter. Every element in the list must be a Tensor whose data type
             should be float16, float32, float64, int32, int64, int8, uint8 or bool. Default value is None.
         src (int): The source rank id. Default value is 0.
-        group (Group): The group instance return by new_group or None for global default group.
-        use_calc_stream (bool): Wether to use calculation stream (True) or communication stream (False).
-            Default to True.
+        group (Group, optional): The group instance return by new_group or None for global default group.
+        sync_op (bool, optional): Whether this op is a sync op. The default value is True.
     Returns:
         None.
@@ -1206,12 +1205,13 @@ def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True):
         temp = paddle.concat(tensor_list, axis=0)
     if in_dygraph_mode():
         task = group.process_group.scatter(temp, tensor, gsrc)
-        if use_calc_stream:
+        if sync_op:
             task.wait()
             return None
         else:
             return task
+    use_calc_stream = sync_op
     if _non_static_mode():
         return _legacy_C_ops.c_scatter(temp, tensor, 'use_calc_stream',
                                        use_calc_stream, 'ring_id', ring_id,
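A minimal scatter sketch on two ranks under the new keyword, in the spirit of the updated tests (values illustrative):

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    out = paddle.empty(shape=[2], dtype='float32')
    if dist.get_rank() == 0:
        chunks = [paddle.to_tensor([1.0, 2.0]), paddle.to_tensor([3.0, 4.0])]
        dist.scatter(out, chunks, src=0, sync_op=True)   # rank 0 keeps chunks[0]
    else:
        dist.scatter(out, [], src=0, sync_op=True)       # rank 1 receives chunks[1]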
@@ -1233,7 +1233,7 @@ def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True):
     })
-def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True):
+def alltoall(in_tensor_list, out_tensor_list, group=None, sync_op=True):
     """
     Scatter tensors in in_tensor_list to all participators averagely and gather the result tensors in out_tensor_list.
     As shown below, the in_tensor_list in GPU0 includes 0_0 and 0_1, and GPU1 includes 1_0 and 1_1.
@@ -1251,8 +1251,8 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True):
         out_tensor_list (list): A list of output Tensors. The data type of its elements should be the same as the
             data type of the input Tensors.
         group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
-        use_calc_stream (bool, optional): Whether to use calculation stream (True) or communication stream. Default:
-            True.
+        sync_op (bool, optional): Whether this op is a sync op. The default value is
+            True.
     Returns:
         None.
@@ -1301,6 +1301,7 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True):
             out_tensor_list.extend(paddle.split(out, nranks, 0))
         return
+    use_calc_stream = sync_op
     if _non_static_mode():
         out = _legacy_C_ops.alltoall(temp, 'use_calc_stream', use_calc_stream,
                                      'ring_id', ring_id)
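For reference, a minimal alltoall sketch under the renamed keyword, following the docstring's two-GPU description (two ranks assumed; the tensor values are illustrative):

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    rank = dist.get_rank()
    # Each rank contributes one slice per peer; after the exchange, rank r
    # holds in_tensor_list[r] from every rank.
    in_tensor_list = [paddle.full([2, 3], float(rank * 2)),
                      paddle.full([2, 3], float(rank * 2 + 1))]
    out_tensor_list = []
    dist.alltoall(in_tensor_list, out_tensor_list, sync_op=True)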
@@ -1339,7 +1340,7 @@ def alltoall_single(in_tensor,
                     in_split_sizes=None,
                     out_split_sizes=None,
                     group=None,
-                    use_calc_stream=True):
+                    sync_op=True):
     """
     Scatter a single input tensor to all participators and gather the received tensors in out_tensor.
@@ -1354,11 +1355,11 @@ def alltoall_single(in_tensor,
         out_split_sizes (list[int], optional): Split sizes of ``out_tensor`` for dim[0]. If not given, dim[0] of ``out_tensor``
             must be divisible by group size and ``out_tensor`` will be gathered averagely from all participators. Default: None.
         group (Group, optional): The group instance return by ``new_group`` or None for global default group. Default: None.
-        use_calc_stream (bool, optional): Whether to use calculation stream (True) or communication stream. Default:
-            True.
+        sync_op (bool, optional): Whether this op is a sync op. The default value is
+            True.
     Returns:
-        None, if ``use_calc_stream`` is set to ``True``; ``Task`` of ``group``, if ``use_calc_stream`` is set to ``False``.
+        None, if ``sync_op`` is set to ``True``; ``Task`` of ``group``, if ``sync_op`` is set to ``False``.
     Examples:
         .. code-block:: python
@@ -1396,7 +1397,7 @@ def alltoall_single(in_tensor,
                                              output,
                                              in_split_sizes,
                                              out_split_sizes,
-                                             use_calc_stream=False,
+                                             sync_op=False,
                                              group=group)
             task.wait()
             print(output)
@@ -1419,7 +1420,7 @@ def alltoall_single(in_tensor,
     task = group.process_group.alltoall_single(in_tensor, out_tensor,
                                                in_split_sizes, out_split_sizes)
-    if use_calc_stream:
+    if sync_op:
         task.wait()
         return
     else:
@@ -1430,7 +1431,7 @@ def _get_group_rank(global_rank, group=None):
     return global_rank if group is None else group.get_group_rank(global_rank)
-def send(tensor, dst=0, group=None, use_calc_stream=True):
+def send(tensor, dst=0, group=None, sync_op=True):
     """
     Send a tensor to the receiver.
@@ -1439,8 +1440,8 @@ def send(tensor, dst=0, group=None, use_calc_stream=True):
             should be float16, float32, float64, int32, int64, int8, uint8 or bool.
         dst (int): The destination rank id.
         group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
-        use_calc_stream (bool, optional): Whether to use calculate stream or communication stream. Default:
-            True.
+        sync_op (bool, optional): Whether this op is a sync op. The default value is
+            True.
     Returns:
         None.
@@ -1469,12 +1470,13 @@ def send(tensor, dst=0, group=None, use_calc_stream=True):
         backend = _group_map_backend[group]
         assert backend != 'gloo', ("backend gloo is not supported yet")
         task = group.process_group.send(tensor, dst)
-        if use_calc_stream:
+        if sync_op:
             task.wait()
             return None
         else:
             return task
+    use_calc_stream = sync_op
     ring_id = 0 if group is None else group.id
     if _non_static_mode():
@@ -1495,7 +1497,7 @@ def send(tensor, dst=0, group=None, use_calc_stream=True):
     })
-def recv(tensor, src=0, group=None, use_calc_stream=True):
+def recv(tensor, src=0, group=None, sync_op=True):
     """
     Receive a tensor to the sender.
@@ -1504,8 +1506,8 @@ def recv(tensor, src=0, group=None, use_calc_stream=True):
             should be float16, float32, float64, int32, int64, int8, uint8 or bool.
         src (int): The source rank id.
         group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
-        use_calc_stream (bool, optional): Whether to use calculate stream or communication stream. Default:
-            True.
+        sync_op (bool, optional): Whether this op is a sync op. The default value is
+            True.
     Returns:
         None.
@@ -1535,12 +1537,13 @@ def recv(tensor, src=0, group=None, use_calc_stream=True):
         backend = _group_map_backend[group]
         assert backend != 'gloo', ("backend gloo is not supported yet")
         task = group.process_group.recv(tensor, src)
-        if use_calc_stream:
+        if sync_op:
             task.wait()
             return None
         else:
             return task
+    use_calc_stream = sync_op
     ring_id = 0 if group is None else group.id
     if _non_static_mode():
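A point-to-point sketch under the renamed keyword, mirroring the updated NCCL test further down in this commit (two ranks assumed; values illustrative):

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    if dist.get_rank() == 0:
        data = paddle.to_tensor([1.0, 2.0, 3.0])
        task = dist.send(data, dst=1, sync_op=False)   # async send returns a task
        task.wait()
    else:
        data = paddle.empty(shape=[3], dtype='float32')
        task = dist.recv(data, src=0, sync_op=False)
        task.wait()                                    # data now holds rank 0's tensor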
@@ -1811,7 +1814,7 @@ def reduce_scatter(tensor,
                    tensor_list,
                    op=ReduceOp.SUM,
                    group=None,
-                   use_calc_stream=True):
+                   sync_op=True):
     """
     Reduces, then scatters a list of tensors to all processes in a group
@@ -1822,13 +1825,13 @@ def reduce_scatter(tensor,
         op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default: ReduceOp.SUM.
         group (Group, optional): The group instance return by new_group or None for global
             default group. Default: None.
-        use_calc_stream (bool, optional): Whether this op should be an async op.
+        sync_op (bool, optional): Whether this op is a sync op. The default value is True.
     Returns:
-        Async task handle, if use_calc_stream is set to False.
-        None, if use_calc_stream or if not part of the group.
+        Async task handle, if sync_op is set to False.
+        None, if sync_op or if not part of the group.
     Warning:
         This API only supports the dygraph mode.
@@ -1866,7 +1869,7 @@ def reduce_scatter(tensor,
         temp = paddle.concat(tensor_list, axis=0)
         task = group.process_group._reduce_scatter_base(tensor, temp, op_type)
-        if use_calc_stream:
+        if sync_op:
             task.wait()
             return None
         else:
@@ -1879,7 +1882,7 @@ def _reduce_scatter_base(output,
                          input,
                          op=ReduceOp.SUM,
                          group=None,
-                         use_calc_stream=True):
+                         sync_op=True):
     """
     Reduces, then scatters a flattened tensor to all processes in a group.
@@ -1890,11 +1893,11 @@ def _reduce_scatter_base(output,
         op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default: ReduceOp.SUM.
         group (ProcessGroup, optional): The process group to work on. If None,
             the default process group will be used.
-        use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream (False).
-            Default to True.
+        sync_op (bool, optional): Whether this op is a sync op. The default value is True.
     Returns:
-        Async task handle, if use_calc_stream is set to False.
-        None, if use_calc_stream or if not part of the group.
+        Async task handle, if sync_op is set to False.
+        None, if sync_op or if not part of the group.
     Examples:
         .. code-block:: python
@@ -1925,7 +1928,7 @@ def _reduce_scatter_base(output,
     op_type = _get_reduce_op(op, "_reduce_scatter_base")
     group = _get_default_group() if group is None else group
     task = group.process_group._reduce_scatter_base(output, input, op_type)
-    if use_calc_stream:
+    if sync_op:
         task.wait()
         return None
     else:
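The asynchronous reduce-scatter pattern after the rename, matching the updated collective_reduce_scatter.py test below (two ranks assumed; values illustrative):

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    # Each rank contributes [0, 1, 2, 3]; after reduce-scatter with SUM each
    # rank keeps its own 2-element slice of the elementwise sum.
    data = paddle.to_tensor([0.0, 1.0, 2.0, 3.0])
    output = paddle.empty(shape=[2], dtype=data.dtype)
    task = paddle.distributed.collective._reduce_scatter_base(output, data,
                                                              sync_op=False)
    task.wait()   # rank 0 gets [0., 2.], rank 1 gets [4., 6.]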
python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/dygraph_sharding_optimizer.py
@@ -146,7 +146,7 @@ class DygraphShardingOptimizer(object):
                 # instead of the relative logic rank id within group
                 src=self._hcg.get_sharding_parallel_group().ranks[rank],
                 group=self._hcg.get_sharding_parallel_group(),
-                use_calc_stream=True)
+                sync_op=True)

     def _update_trainable(self):
         """
python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py
@@ -150,7 +150,7 @@ class ShardingOptimizerStage2(Optimizer):
             broadcast(p,
                       src=self._global_root_rank,
                       group=self.group,
-                      use_calc_stream=True)
+                      sync_op=True)
             # Multi stream operation will be supported later
             wait(tensor=p, group=self.group, use_calc_stream=True)
@@ -415,7 +415,7 @@ class ShardingOptimizerStage2(Optimizer):
                 broadcast(tensor=internal_storage.buffer,
                           src=self.group.ranks[dst_rank],
                           group=self.group,
-                          use_calc_stream=True)
+                          sync_op=True)
                 # Multi stream operation will be supported later
                 wait(tensor=internal_storage.buffer,
python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
@@ -377,18 +377,18 @@ class PipelineParallel(MetaParallelBase):
                     1) if loss.dtype == paddle.float32 else paddle.to_tensor(0)
                 paddle.distributed.broadcast(is_fp32,
                                              src=self.global_rank,
-                                             use_calc_stream=True,
+                                             sync_op=True,
                                              group=self.pp_group)
                 paddle.distributed.broadcast(loss,
                                              src=self.global_rank,
-                                             use_calc_stream=True,
+                                             sync_op=True,
                                              group=self.pp_group)
             else:
                 is_fp32 = paddle.to_tensor(1)
                 paddle.distributed.broadcast(
                     is_fp32,
                     src=self._hcg.get_rank_from_stage(self.num_stages - 1),
-                    use_calc_stream=True,
+                    sync_op=True,
                     group=self.pp_group)
                 loss = paddle.zeros(shape=[
                     1
@@ -397,7 +397,7 @@ class PipelineParallel(MetaParallelBase):
                 paddle.distributed.broadcast(
                     loss,
                     src=self._hcg.get_rank_from_stage(self.num_stages - 1),
-                    use_calc_stream=True,
+                    sync_op=True,
                     group=self.pp_group)
             return loss
python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py
@@ -155,7 +155,7 @@ class GroupShardedOptimizerStage2(Optimizer):
             broadcast(p,
                       src=self._global_root_rank,
                       group=self._group,
-                      use_calc_stream=True)
+                      sync_op=True)

     def _generate_master_params(self, trainable_params):
         if self.offload:
@@ -413,4 +413,4 @@ class GroupShardedOptimizerStage2(Optimizer):
                 broadcast(tensor=internal_storage.buffer,
                           src=self._group.ranks[dst_rank],
                           group=self._group,
-                          use_calc_stream=True)
+                          sync_op=True)
python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py
@@ -287,7 +287,7 @@ class GroupShardedStage2(nn.Layer):
             collective.broadcast(buffer,
                                  self._global_root_rank,
                                  self._group,
-                                 use_calc_stream=True)
+                                 sync_op=True)

     def __getattr__(self, name):
         """Forward missing attributes to wrapped layer."""
python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py
@@ -181,7 +181,7 @@ class GroupShardedStage3(nn.Layer):
             collective.broadcast(p,
                                  src=self._global_root_rank,
                                  group=self._group,
-                                 use_calc_stream=True)
+                                 sync_op=True)

     def _clear_gradients(self):
         assert len(self._trainable_params.keys()) > 0
@@ -446,7 +446,7 @@ class GroupShardedStage3(nn.Layer):
             collective.broadcast(buffer,
                                  self._global_root_rank,
                                  self._group,
-                                 use_calc_stream=True)
+                                 sync_op=True)

     def __getattr__(self, name):
         """Forward missing attributes to wrapped layer."""
python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py
@@ -285,7 +285,7 @@ class ShardingStage2(nn.Layer):
             dist.broadcast(buffer,
                            self._global_root_rank,
                            self._group,
-                           use_calc_stream=True)
+                           sync_op=True)
             # Multi stream operation will be supported later
             dist.wait(tensor=buffer, group=self._group, use_calc_stream=True)
@@ -340,7 +340,7 @@ class ShardingStage2(nn.Layer):
                         tensor=param.grad,
                         dst=self._group.ranks[dst_rank],
                         group=self._group,
-                        use_calc_stream=True),
+                        sync_op=True),
                     callback=cleanup))
                 # Multi stream operation will be supported later
@@ -396,7 +396,7 @@ class ShardingStage2(nn.Layer):
                             tensor=grad_storage.buffer,
                             dst=self._group.ranks[grad_storage.destination],
                             group=self._group,
-                            use_calc_stream=True),
+                            sync_op=True),
                         callback=cleanup))
                     # Multi stream operation will be supported later
python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py
@@ -170,7 +170,7 @@ class ShardingStage3(nn.Layer):
             dist.broadcast(p,
                            src=self._global_root_rank,
                            group=self._group,
-                           use_calc_stream=True)
+                           sync_op=True)
             # Multi stream operation will be supported later
             dist.wait(tensor=p, group=self._group, use_calc_stream=True)
@@ -435,7 +435,7 @@ class ShardingStage3(nn.Layer):
             dist.broadcast(buffer,
                            self._global_root_rank,
                            self._group,
-                           use_calc_stream=True)
+                           sync_op=True)
             # Multi stream operation will be supported later
             dist.wait(tensor=buffer, group=self._group, use_calc_stream=True)
@@ -478,7 +478,7 @@ class ShardingStage3(nn.Layer):
             grad_storage.buffer.scale_(scale=self._world_size_scaling)
             dist.all_reduce(tensor=grad_storage.buffer,
                             group=self._group,
-                            use_calc_stream=True)
+                            sync_op=True)
             dist.wait(tensor=grad_storage.buffer,
                       group=self._group,
                       use_calc_stream=True)
@@ -541,7 +541,7 @@ class ShardingStage3(nn.Layer):
                 # Only support sync allreduce current rank's layer now
                 dist.all_reduce(tensor=full_grad,
                                 group=self._group,
-                                use_calc_stream=True)
+                                sync_op=True)
                 dist.wait(tensor=full_grad,
                           group=self._group,
                           use_calc_stream=True)
python/paddle/distributed/fleet/utils/hybrid_parallel_util.py
@@ -94,7 +94,7 @@ def _broadcast_data_help(data, shape, dtype, hcg):
     paddle.distributed.broadcast(shape_gpu,
                                  src=src_rank,
                                  group=model_parallel_group,
-                                 use_calc_stream=True)
+                                 sync_op=True)

     if mp_rank != 0:
         input_data = paddle.zeros(shape_gpu, dtype=dtype)
@@ -104,7 +104,7 @@ def _broadcast_data_help(data, shape, dtype, hcg):
     paddle.distributed.broadcast(input_data,
                                  src=src_rank,
                                  group=model_parallel_group,
-                                 use_calc_stream=True)
+                                 sync_op=True)

     if mp_rank != 0:
         if in_dygraph_mode():
@@ -186,7 +186,7 @@ def sharding_reduce_gradients(parameter_list, hcg):
                 paddle.distributed.all_reduce(
                     param.grad,
                     group=hcg.get_sharding_parallel_group(),
-                    use_calc_stream=True)
+                    sync_op=True)

             elif _in_legacy_dygraph():
                 g_var = param._grad_ivar()
python/paddle/fluid/dygraph/parallel.py
@@ -420,7 +420,7 @@ def sync_params_buffers(model,
         paddle.distributed.broadcast(coalesced_var,
                                      src=src_rank,
                                      group=comm_group,
-                                     use_calc_stream=True)
+                                     sync_op=True)

     for coalesced_var, origin_vars, var_shapes in coalesced_vars:
         var_len = [np.prod(v_shape) for v_shape in var_shapes]
python/paddle/fluid/tests/unittests/collective/collective_allreduce_new_group_api.py
@@ -49,9 +49,7 @@ class TestCollectiveAllreduceNewGroupAPI(TestCollectiveAPIRunnerBase):
                                         shape=[10, 1000],
                                         dtype='float32')
             gp = paddle.distributed.new_group([0, 1])
-            paddle.distributed.all_reduce(tindata,
-                                          group=gp,
-                                          use_calc_stream=True)
+            paddle.distributed.all_reduce(tindata, group=gp, sync_op=True)
             return [tindata]
python/paddle/fluid/tests/unittests/collective/collective_alltoall_single.py
@@ -69,7 +69,7 @@ class TestCollectiveAllToAllSingle(unittest.TestCase):
                                               output,
                                               in_split_sizes,
                                               out_split_sizes,
-                                              use_calc_stream=False,
+                                              sync_op=False,
                                               group=group)
             task.wait()
python/paddle/fluid/tests/unittests/collective/collective_reduce_scatter.py
@@ -83,8 +83,9 @@ class TestCollectiveReduceScatter(unittest.TestCase):
             # [1, 2, 3, 4]  # Rank-1

             output = paddle.empty(shape=[2], dtype=input.dtype)
             task = paddle.distributed.collective._reduce_scatter_base(
                 output,
-                input, use_calc_stream=False)
+                input, sync_op=False)

             task.wait()
python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_communicate_group.py
@@ -53,24 +53,21 @@ class TestNewGroupAPI(object):
         paddle.distributed.scatter(result, [self.tensor2, self.tensor1],
                                    src=dp_src_rank,
                                    group=dp_gp,
-                                   use_calc_stream=True)
+                                   sync_op=True)
         if dp_rank == 0:
             assert np.array_equal(result, self.tensor2)
         elif dp_rank == 1:
             assert np.array_equal(result, self.tensor1)
         print("test scatter api ok")

-        paddle.distributed.broadcast(result,
-                                     src=1,
-                                     group=dp_gp,
-                                     use_calc_stream=True)
+        paddle.distributed.broadcast(result, src=1, group=dp_gp, sync_op=True)
         assert np.array_equal(result, self.tensor1)
         print("test broadcast api ok")

         paddle.distributed.reduce(result,
                                   dst=dp_src_rank,
                                   group=dp_gp,
-                                  use_calc_stream=True)
+                                  sync_op=True)
         if dp_rank == 0:
             assert np.array_equal(result, paddle.add(self.tensor1,
                                                      self.tensor1))
@@ -78,7 +75,7 @@ class TestNewGroupAPI(object):
         assert np.array_equal(result, self.tensor1)
         print("test reduce api ok")

-        paddle.distributed.all_reduce(result, use_calc_stream=True)
+        paddle.distributed.all_reduce(result, sync_op=True)
         assert np.array_equal(
             result,
             paddle.add(paddle.add(self.tensor1, self.tensor1), self.tensor1))
@@ -92,7 +89,7 @@ class TestNewGroupAPI(object):
         paddle.distributed.all_gather(result,
                                       self.tensor1,
                                       group=dp_gp,
-                                      use_calc_stream=True)
+                                      sync_op=True)
         assert np.array_equal(result[0], self.tensor1)
         assert np.array_equal(result[1], self.tensor1)
         print("test all_gather api ok")
python/paddle/fluid/tests/unittests/collective/fleet/new_group.py
@@ -36,21 +36,18 @@ class TestNewGroupAPI(object):
         paddle.distributed.scatter(result, [self.tensor2, self.tensor1],
                                    src=0,
                                    group=gp,
-                                   use_calc_stream=True)
+                                   sync_op=True)
         if gp.rank == 0:
             assert np.array_equal(result, self.tensor2)
         elif gp.rank == 1:
             assert np.array_equal(result, self.tensor1)
         print("test scatter api ok")

-        paddle.distributed.broadcast(result,
-                                     src=1,
-                                     group=gp,
-                                     use_calc_stream=True)
+        paddle.distributed.broadcast(result, src=1, group=gp, sync_op=True)
         assert np.array_equal(result, self.tensor1)
         print("test broadcast api ok")

-        paddle.distributed.reduce(result, dst=0, group=gp, use_calc_stream=True)
+        paddle.distributed.reduce(result, dst=0, group=gp, sync_op=True)
         if gp.rank == 0:
             assert np.array_equal(result, paddle.add(self.tensor1,
                                                      self.tensor1))
@@ -58,7 +55,7 @@ class TestNewGroupAPI(object):
         assert np.array_equal(result, self.tensor1)
         print("test reduce api ok")

-        paddle.distributed.all_reduce(result, use_calc_stream=True)
+        paddle.distributed.all_reduce(result, sync_op=True)
         assert np.array_equal(
             result,
             paddle.add(paddle.add(self.tensor1, self.tensor1), self.tensor1))
@@ -72,7 +69,7 @@ class TestNewGroupAPI(object):
         paddle.distributed.all_gather(result,
                                       self.tensor1,
                                       group=gp,
-                                      use_calc_stream=True)
+                                      sync_op=True)
         assert np.array_equal(result[0], self.tensor1)
         assert np.array_equal(result[1], self.tensor1)
         print("test all_gather api ok")
python/paddle/fluid/tests/unittests/collective/process_group_nccl.py
@@ -90,13 +90,13 @@ class TestProcessGroupFp32(unittest.TestCase):
         if pg.rank() == 0:
             task = dist.all_reduce(tensor_x,
                                    dist.ReduceOp.MAX,
-                                   use_calc_stream=False)
+                                   sync_op=False)
             task.wait()
             assert np.array_equal(tensor_x, max_result)
         else:
             task = dist.all_reduce(tensor_y,
                                    dist.ReduceOp.MAX,
-                                   use_calc_stream=False)
+                                   sync_op=False)
             task.wait()
             assert np.array_equal(tensor_y, max_result)
@@ -115,13 +115,13 @@ class TestProcessGroupFp32(unittest.TestCase):
         if pg.rank() == 0:
             task = dist.all_reduce(tensor_x,
                                    dist.ReduceOp.MIN,
-                                   use_calc_stream=False)
+                                   sync_op=False)
             task.wait()
             assert np.array_equal(tensor_x, min_result)
         else:
             task = dist.all_reduce(tensor_y,
                                    dist.ReduceOp.MIN,
-                                   use_calc_stream=False)
+                                   sync_op=False)
             task.wait()
             assert np.array_equal(tensor_y, min_result)
@@ -140,13 +140,13 @@ class TestProcessGroupFp32(unittest.TestCase):
         if pg.rank() == 0:
             task = dist.all_reduce(tensor_x,
                                    dist.ReduceOp.PROD,
-                                   use_calc_stream=False)
+                                   sync_op=False)
             task.wait()
             assert np.array_equal(tensor_x, prod_result)
         else:
             task = dist.all_reduce(tensor_y,
                                    dist.ReduceOp.PROD,
-                                   use_calc_stream=False)
+                                   sync_op=False)
             task.wait()
             assert np.array_equal(tensor_y, prod_result)
@@ -162,7 +162,7 @@ class TestProcessGroupFp32(unittest.TestCase):
         broadcast_result = paddle.assign(tensor_x)
         if pg.rank() == 0:
-            task = dist.broadcast(tensor_x, 0, use_calc_stream=False)
+            task = dist.broadcast(tensor_x, 0, sync_op=False)
             task.synchronize()
             paddle.device.cuda.synchronize()
             assert task.is_completed()
@@ -205,9 +205,7 @@ class TestProcessGroupFp32(unittest.TestCase):
                 paddle.empty_like(tensor_x),
                 paddle.empty_like(tensor_x)
             ]
-            task = dist.all_gather(tensor_out_list,
-                                   tensor_y,
-                                   use_calc_stream=False)
+            task = dist.all_gather(tensor_out_list, tensor_y, sync_op=False)
             paddle.device.cuda.synchronize()
             tensor_out = paddle.concat(tensor_out_list)
             out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
@@ -224,9 +222,7 @@ class TestProcessGroupFp32(unittest.TestCase):
         # rank 1
         else:
             tensor_out_list = []
-            task = dist.all_gather(tensor_out_list,
-                                   tensor_y,
-                                   use_calc_stream=False)
+            task = dist.all_gather(tensor_out_list, tensor_y, sync_op=False)
             paddle.device.cuda.synchronize()
             tensor_out = paddle.concat(tensor_out_list)
             out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
@@ -310,11 +306,11 @@ class TestProcessGroupFp32(unittest.TestCase):
         tensor_y = paddle.to_tensor(y)
         sum_result = tensor_x + tensor_y
         if pg.rank() == 0:
-            task = dist.reduce(tensor_x, 0, use_calc_stream=True)
+            task = dist.reduce(tensor_x, 0, sync_op=True)
             paddle.device.cuda.synchronize()
         # rank 1
         else:
-            task = dist.reduce(tensor_y, 0, use_calc_stream=False)
+            task = dist.reduce(tensor_y, 0, sync_op=False)
             task.wait()
             paddle.device.cuda.synchronize()
         if pg.rank() == 0:
@@ -335,14 +331,14 @@ class TestProcessGroupFp32(unittest.TestCase):
             task = dist.reduce(tensor_x,
                                0,
                                dist.ReduceOp.MAX,
-                               use_calc_stream=False)
+                               sync_op=False)
             task.wait()
             assert np.array_equal(tensor_x, max_result)
         else:
             task = dist.reduce(tensor_y,
                                0,
                                dist.ReduceOp.MAX,
-                               use_calc_stream=False)
+                               sync_op=False)
             task.wait()
         print("test reduce max api ok")
@@ -361,14 +357,14 @@ class TestProcessGroupFp32(unittest.TestCase):
             task = dist.reduce(tensor_x,
                                0,
                                dist.ReduceOp.MIN,
-                               use_calc_stream=False)
+                               sync_op=False)
             task.wait()
             assert np.array_equal(tensor_x, min_result)
         else:
             task = dist.reduce(tensor_y,
                                0,
                                dist.ReduceOp.MIN,
-                               use_calc_stream=False)
+                               sync_op=False)
             task.wait()
         print("test reduce min api ok")
@@ -387,14 +383,14 @@ class TestProcessGroupFp32(unittest.TestCase):
             task = dist.reduce(tensor_x,
                                0,
                                dist.ReduceOp.PROD,
-                               use_calc_stream=False)
+                               sync_op=False)
             task.wait()
             assert np.array_equal(tensor_x, prod_result)
         else:
             task = dist.reduce(tensor_y,
                                0,
                                dist.ReduceOp.PROD,
-                               use_calc_stream=False)
+                               sync_op=False)
             task.wait()
         print("test reduce prod api ok")
@@ -408,14 +404,12 @@ class TestProcessGroupFp32(unittest.TestCase):
         tensor_y = paddle.to_tensor(y)
         if pg.rank() == 0:
             in_1, in_2 = paddle.split(tensor_x, 2)
-            task = dist.scatter(tensor_y, [in_1, in_2],
-                                0,
-                                use_calc_stream=True)
+            task = dist.scatter(tensor_y, [in_1, in_2], 0, sync_op=True)
             #task.wait()
             paddle.device.cuda.synchronize()
         # rank 1
         else:
-            task = dist.scatter(tensor_y, [], 0, use_calc_stream=False)
+            task = dist.scatter(tensor_y, [], 0, sync_op=False)
             task.wait()
             paddle.device.cuda.synchronize()
         out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]])
@@ -436,10 +430,10 @@ class TestProcessGroupFp32(unittest.TestCase):
         tensor_y = paddle.to_tensor(y)
         if pg.rank() == 0:
-            task = dist.send(tensor_x, 1, use_calc_stream=False)
+            task = dist.send(tensor_x, 1, sync_op=False)
             task.wait()
         else:
-            task = dist.recv(tensor_y, 0, use_calc_stream=False)
+            task = dist.recv(tensor_y, 0, sync_op=False)
             task.wait()
             assert np.array_equal(tensor_y, tensor_x)
@@ -454,9 +448,9 @@ class TestProcessGroupFp32(unittest.TestCase):
         tensor_y = paddle.to_tensor(y)
         if pg.rank() == 0:
-            task = dist.send(tensor_x, 1, use_calc_stream=True)
+            task = dist.send(tensor_x, 1, sync_op=True)
         else:
-            task = dist.recv(tensor_y, 0, use_calc_stream=True)
+            task = dist.recv(tensor_y, 0, sync_op=True)
             assert np.array_equal(tensor_y, tensor_x)
         print("test send api ok")