Commit c6a084ef (unverified)

be compatible with the old version of alltoall (#42007)

Authored by lilong12 on Apr 20, 2022; committed via GitHub on Apr 20, 2022.
Parent: a3c50c42

Showing 2 changed files with 62 additions and 4 deletions (+62, -4):

- python/paddle/distributed/collective.py (+12, -4)
- python/paddle/fluid/tests/unittests/process_group_nccl.py (+50, -0)
python/paddle/distributed/collective.py

```diff
@@ -860,9 +860,12 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True):
     if in_dygraph_mode():
         group = _get_default_group() if group is None else group
-        tensor_shape = list(tensor.shape)
-        tensor_shape[0] *= group.nranks
-        out = paddle.empty(tensor_shape, tensor.dtype)
+        if len(tensor_list) == 0:
+            tensor_shape = list(tensor.shape)
+            tensor_shape[0] *= group.nranks
+            out = paddle.empty(tensor_shape, tensor.dtype)
+        else:
+            out = paddle.concat(tensor_list, axis=0)
         task = group.process_group.all_gather(tensor, out)
         task.wait()
         tensor_list.clear()
```
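With this hunk, the dygraph path of `all_gather` accepts both calling conventions: an empty `tensor_list`, in which case the output buffer is allocated from `tensor.shape` with dimension 0 scaled by `group.nranks`, or a pre-populated list, whose tensors are concatenated along axis 0 and used as the buffer. A minimal usage sketch, assuming a two-process dygraph run set up with `dist.init_parallel_env()` (shapes and values are purely illustrative):

```python
# Sketch of the two tensor_list conventions all_gather now handles
# (assumes two processes launched via paddle.distributed.launch).
import paddle
import paddle.distributed as dist

dist.init_parallel_env()
x = paddle.full([2, 2], float(dist.get_rank()))  # illustrative payload

# Empty list: the output buffer is allocated internally.
gathered = []
dist.all_gather(gathered, x)

# Pre-allocated list: the provided tensors back the output buffer.
prealloc = [paddle.empty_like(x) for _ in range(dist.get_world_size())]
dist.all_gather(prealloc, x)
```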
```diff
@@ -1783,7 +1786,12 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True):
     temp = paddle.concat(in_tensor_list, axis=0)
     nranks = len(in_tensor_list)
     if in_dygraph_mode():
-        out = paddle.concat(out_tensor_list, axis=0)
+        if len(out_tensor_list) == 0:
+            tensor_shape = list(in_tensor_list[0].shape)
+            tensor_shape[0] *= nranks
+            out = paddle.empty(tensor_shape, in_tensor_list[0].dtype)
+        else:
+            out = paddle.concat(out_tensor_list, axis=0)
         task = group.process_group.alltoall(temp, out)
         task.wait()
         out_tensor_list.clear()
```
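This is the hunk the commit title refers to: the old-version `alltoall` API let callers pass an empty `out_tensor_list`, and the eager-mode path would fail on the unconditional `paddle.concat` of an empty list. A minimal sketch of the restored calling style, again assuming two launched processes and illustrative values:

```python
# Sketch: old-style alltoall call with an empty output list, which
# this commit re-enables in dygraph mode (assumes two processes).
import paddle
import paddle.distributed as dist

dist.init_parallel_env()
nranks = dist.get_world_size()
# One input slice per peer rank; values are illustrative.
in_list = [paddle.full([2, 2], float(dist.get_rank())) for _ in range(nranks)]

out_list = []  # old style: empty, filled by the collective
dist.alltoall(in_list, out_list)
# Per the old-version semantics, out_list should now hold one slice
# per peer rank, in rank order.
```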
python/paddle/fluid/tests/unittests/process_group_nccl.py

```diff
@@ -185,6 +185,24 @@ class TestProcessGroupFp32(unittest.TestCase):
             assert np.array_equal(tensor_y, out_2)
             print("test allgather api ok\n")
 
+            if pg.rank() == 0:
+                task = pg.all_gather(tensor_x, tensor_out)
+                task.wait()
+                paddle.device.cuda.synchronize()
+            # rank 1
+            else:
+                tensor_out_list = []
+                task = dist.all_gather(
+                    tensor_out_list, tensor_y, use_calc_stream=False)
+                paddle.device.cuda.synchronize()
+                tensor_out = paddle.concat(tensor_out_list)
+            out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
+            out_2 = paddle.slice(tensor_out, [0], [out_shape[0] // 2],
+                                 [out_shape[0]])
+            assert np.array_equal(tensor_x, out_1)
+            assert np.array_equal(tensor_y, out_2)
+            print("test allgather api2 ok\n")
+
             # test alltoall
             # rank 0
             x = np.random.random(self.shape).astype(self.dtype)
```
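The added block checks the same gathered buffer through both entry points: rank 0 uses the low-level `pg.all_gather` with a pre-allocated `tensor_out`, while rank 1 goes through `dist.all_gather` with an empty list. The rank-ordered layout both sides must agree on, in a standalone numpy sketch (shapes are illustrative, not taken from the test):

```python
# Pure-numpy sketch of the layout the assertions above rely on.
import numpy as np

shape = (4, 3)                        # hypothetical per-rank shape
x = np.zeros(shape)                   # rank 0's tensor
y = np.ones(shape)                    # rank 1's tensor
gathered = np.concatenate([x, y])     # rank-ordered all_gather result
half = gathered.shape[0] // 2
assert np.array_equal(gathered[:half], x)   # corresponds to out_1
assert np.array_equal(gathered[half:], y)   # corresponds to out_2
```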
```diff
@@ -219,6 +237,38 @@ class TestProcessGroupFp32(unittest.TestCase):
             assert np.array_equal(out2_1, raw_tensor_x_2)
             print("test alltoall api ok\n")
 
+            x = np.random.random(self.shape).astype(self.dtype)
+            y = np.random.random(self.shape).astype(self.dtype)
+            out1 = np.random.random(self.shape).astype(self.dtype)
+            out2 = np.random.random(self.shape).astype(self.dtype)
+            tensor_x = paddle.to_tensor(x)
+            tensor_y = paddle.to_tensor(y)
+            tensor_out1 = paddle.to_tensor(out1)
+            tensor_out2 = paddle.to_tensor(out2)
+            raw_tensor_x_2 = paddle.slice(tensor_x, [0], [self.shape[0] // 2],
+                                          [self.shape[0]])
+            raw_tensor_y_1 = paddle.slice(tensor_y, [0], [0],
+                                          [self.shape[0] // 2])
+            if pg.rank() == 0:
+                task = pg.alltoall(tensor_x, tensor_out1)
+                task.wait()
+            # rank 1
+            else:
+                in_1, in_2 = paddle.split(tensor_y, 2)
+                out_1, out_2 = paddle.split(tensor_out2, 2)
+                out_tensor_list = []
+                task = dist.alltoall([in_1, in_2], out_tensor_list)
+                paddle.device.cuda.synchronize()
+                tensor_out2 = paddle.concat(out_tensor_list)
+            out1_2 = paddle.slice(tensor_out1, [0], [self.shape[0] // 2],
+                                  [self.shape[0]])
+            out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2])
+            if pg.rank() == 0:
+                assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy())
+            else:
+                assert np.array_equal(out2_1, raw_tensor_x_2)
+            print("test alltoall api2 ok\n")
+
             # test Reduce
             # rank 0
             x = np.random.random(self.shape).astype(self.dtype)
```
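The assertions encode the exchange pattern of a two-rank alltoall: each rank splits its input in two, keeps the half addressed to itself, and receives the matching half from the peer. The same bookkeeping in a standalone numpy sketch (values illustrative):

```python
# Pure-numpy model of the two-rank alltoall exchange checked above.
import numpy as np

x = np.arange(8.0).reshape(4, 2)        # rank 0's input (illustrative)
y = np.arange(8.0, 16.0).reshape(4, 2)  # rank 1's input
x_1, x_2 = np.split(x, 2)               # rank 0 sends x_1->rank0, x_2->rank1
y_1, y_2 = np.split(y, 2)               # rank 1 sends y_1->rank0, y_2->rank1
out_rank0 = np.concatenate([x_1, y_1])  # what rank 0 receives
out_rank1 = np.concatenate([x_2, y_2])  # what rank 1 receives
assert np.array_equal(out_rank0[2:], y_1)  # mirrors out1_2 == raw_tensor_y_1
assert np.array_equal(out_rank1[:2], x_2)  # mirrors out2_1 == raw_tensor_x_2
```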