Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
9ce31e96
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
9ce31e96
编写于
9月 21, 2022
作者:
W
wuhuachaocoding
提交者:
GitHub
9月 21, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Mpi final dev simple (#46247)
上级
3d656b58
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
1440 addition
and
0 deletion
+1440
-0
CMakeLists.txt
CMakeLists.txt
+7
-0
cmake/mpi.cmake
cmake/mpi.cmake
+33
-0
paddle/fluid/distributed/collective/CMakeLists.txt
paddle/fluid/distributed/collective/CMakeLists.txt
+7
-0
paddle/fluid/distributed/collective/MPITools.cc
paddle/fluid/distributed/collective/MPITools.cc
+56
-0
paddle/fluid/distributed/collective/MPITools.h
paddle/fluid/distributed/collective/MPITools.h
+53
-0
paddle/fluid/distributed/collective/ProcessGroup.cc
paddle/fluid/distributed/collective/ProcessGroup.cc
+8
-0
paddle/fluid/distributed/collective/ProcessGroup.h
paddle/fluid/distributed/collective/ProcessGroup.h
+3
-0
paddle/fluid/distributed/collective/ProcessGroupMPI.cc
paddle/fluid/distributed/collective/ProcessGroupMPI.cc
+467
-0
paddle/fluid/distributed/collective/ProcessGroupMPI.h
paddle/fluid/distributed/collective/ProcessGroupMPI.h
+211
-0
paddle/fluid/pybind/CMakeLists.txt
paddle/fluid/pybind/CMakeLists.txt
+7
-0
paddle/fluid/pybind/distributed_py.cc
paddle/fluid/pybind/distributed_py.cc
+23
-0
paddle/fluid/pybind/pybind.cc
paddle/fluid/pybind/pybind.cc
+19
-0
python/paddle/fluid/tests/unittests/collective/CMakeLists.txt
...on/paddle/fluid/tests/unittests/collective/CMakeLists.txt
+12
-0
python/paddle/fluid/tests/unittests/collective/process_group_mpi.py
...dle/fluid/tests/unittests/collective/process_group_mpi.py
+506
-0
python/paddle/fluid/tests/unittests/collective/test_mpi_comm.sh
.../paddle/fluid/tests/unittests/collective/test_mpi_comm.sh
+27
-0
python/paddle/fluid/tests/unittests/collective/testslist.csv
python/paddle/fluid/tests/unittests/collective/testslist.csv
+1
-0
未找到文件。
CMakeLists.txt
浏览文件 @
9ce31e96
...
...
@@ -485,6 +485,9 @@ if(WITH_DISTRIBUTE)
ON
CACHE STRING
"Enable GLOO when compiling WITH_DISTRIBUTE=ON."
FORCE
)
endif
()
set
(
WITH_MPI
ON
CACHE STRING
"Enable MPI when compiling WITH_DISTRIBUTE=ON."
FORCE
)
if
(
WITH_ASCEND_CL AND NOT WITH_ARM_BRPC
)
# disable WITH_PSCORE for NPU before include third_party
message
(
...
...
@@ -509,6 +512,10 @@ if(WITH_DISTRIBUTE)
endif
()
endif
()
if
(
WITH_MPI
)
include
(
mpi
)
endif
()
include
(
third_party
)
# download, build, install third_party, Contains about 20+ dependencies
...
...
cmake/mpi.cmake
0 → 100644
浏览文件 @
9ce31e96
if
(
NOT WITH_DISTRIBUTE OR NOT WITH_MPI
)
return
()
endif
()
find_package
(
MPI
)
if
(
NOT MPI_CXX_FOUND
)
set
(
WITH_MPI
OFF
CACHE STRING
"Disable MPI"
FORCE
)
message
(
WARNING
"Not found MPI support in current system"
)
return
()
endif
()
message
(
STATUS
"MPI compile flags: "
${
MPI_CXX_COMPILE_FLAGS
}
)
message
(
STATUS
"MPI include path: "
${
MPI_CXX_INCLUDE_PATH
}
)
message
(
STATUS
"MPI LINK flags path: "
${
MPI_CXX_LINK_FLAGS
}
)
message
(
STATUS
"MPI libraries: "
${
MPI_CXX_LIBRARIES
}
)
include_directories
(
SYSTEM
${
MPI_CXX_INCLUDE_PATH
}
)
set
(
CMAKE_EXE_LINKER_FLAGS
"
${
CMAKE_EXE_LINKER_FLAGS
}
${
MPI_CXX_LINK_FLAGS
}
"
)
add_definitions
(
"-DPADDLE_WITH_MPI"
)
find_program
(
OMPI_INFO
NAMES ompi_info
HINTS
${
MPI_CXX_LIBRARIES
}
/../bin
)
if
(
OMPI_INFO
)
execute_process
(
COMMAND
${
OMPI_INFO
}
OUTPUT_VARIABLE output_
)
if
(
output_ MATCHES
"smcuda"
)
#NOTE some mpi lib support mpi cuda aware.
add_definitions
(
"-DPADDLE_WITH_MPI_AWARE"
)
endif
()
endif
()
paddle/fluid/distributed/collective/CMakeLists.txt
浏览文件 @
9ce31e96
...
...
@@ -43,6 +43,13 @@ if(WITH_NCCL OR WITH_RCCL)
endif
()
endif
()
if
(
WITH_MPI
)
cc_library
(
processgroup_mpi
SRCS ProcessGroupMPI.cc MPITools.cc Common.cc
DEPS collective_helper device_context
)
endif
()
if
(
WITH_ASCEND_CL
)
cc_library
(
processgroup_hccl
...
...
paddle/fluid/distributed/collective/MPITools.cc
0 → 100644
浏览文件 @
9ce31e96
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/MPITools.h"
#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/distributed/collective/Types.h"
namespace
paddle
{
namespace
distributed
{
namespace
mpi
{
MPI_Op
ToMPIType
(
ReduceOp
reduction
)
{
static
const
std
::
map
<
ReduceOp
,
MPI_Op
>
red_type
=
{
{
ReduceOp
::
MIN
,
MPI_MIN
},
{
ReduceOp
::
MAX
,
MPI_MAX
},
{
ReduceOp
::
SUM
,
MPI_SUM
},
{
ReduceOp
::
PRODUCT
,
MPI_PROD
},
};
auto
it
=
red_type
.
find
(
reduction
);
PADDLE_ENFORCE_EQ
(
it
!=
red_type
.
end
(),
true
,
platform
::
errors
::
InvalidArgument
(
"Invalid mpi reduction. Must be MPI_MIN | MPI_MAX | "
"MPI_PROD | MPI_SUM."
));
return
it
->
second
;
}
// NOTE: MPI dose not support CUDA aware now.
bool
CheckMpiCudaAware
()
{
return
false
;
}
void
CheckValidInputs
(
const
std
::
vector
<
phi
::
DenseTensor
>&
tensors
)
{
PADDLE_ENFORCE_EQ
(
tensors
.
size
()
==
1
,
true
,
platform
::
errors
::
InvalidArgument
(
"the inputs size of MPI must be 1!"
));
PADDLE_ENFORCE_EQ
(
CheckTensorsInCudaPlace
(
tensors
)
&&
!
CheckMpiCudaAware
(),
false
,
platform
::
errors
::
InvalidArgument
(
"Found CUDA Tensor. But CUDA-aware MPI not support!"
));
}
}
// namespace mpi
}
// namespace distributed
}
// namespace paddle
paddle/fluid/distributed/collective/MPITools.h
0 → 100644
浏览文件 @
9ce31e96
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <error.h>
#include <iostream>
#include <string>
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/distributed/collective/Types.h"
#ifdef HOST
#undef HOST
#endif
#include <mpi.h>
namespace
paddle
{
namespace
distributed
{
namespace
mpi
{
#define MPI_CHECK(cmd) \
do { \
int r = cmd; \
if (r != MPI_SUCCESS) { \
LOG(FATAL) << "Failed, MPI error in" << __FILE__ << ":" << __LINE__ \
<< "with error code: " << std::to_string(r) << std::endl; \
exit(EXIT_FAILURE); \
} \
} while (0)
MPI_Op
ToMPIType
(
ReduceOp
reduction
);
bool
CheckMpiCudaAware
();
void
CheckValidInputs
(
const
std
::
vector
<
phi
::
DenseTensor
>&
tensors
);
}
// namespace mpi
}
// namespace distributed
}
// namespace paddle
paddle/fluid/distributed/collective/ProcessGroup.cc
浏览文件 @
9ce31e96
...
...
@@ -52,5 +52,13 @@ ProcessGroup::ProcessGroup(int rank,
}
}
ProcessGroup
::
ProcessGroup
(
int
rank
,
int
size
,
int
gid
)
:
rank_
(
rank
),
size_
(
size
),
gid_
(
gid
)
{
if
(
gid
!=
IGNORE_ID
)
{
auto
map
=
ProcessGroupMapFromGid
::
getInstance
();
map
->
insert
(
gid_
,
this
);
}
}
}
// namespace distributed
}
// namespace paddle
paddle/fluid/distributed/collective/ProcessGroup.h
浏览文件 @
9ce31e96
...
...
@@ -82,6 +82,9 @@ class ProcessGroup {
int
size
,
const
platform
::
Place
&
place
,
int
gid
);
explicit
ProcessGroup
(
int
rank
,
int
size
,
int
gid
);
virtual
~
ProcessGroup
()
{}
int
GetRank
()
const
{
return
rank_
;
}
...
...
paddle/fluid/distributed/collective/ProcessGroupMPI.cc
0 → 100644
浏览文件 @
9ce31e96
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/collective/ProcessGroupMPI.h"
#include <chrono>
#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
constexpr
int64_t
kWaitBlockTImeout
=
10
;
namespace
paddle
{
namespace
distributed
{
std
::
map
<
phi
::
DataType
,
MPI_Datatype
>
mpiDatatype
=
{
{
phi
::
DataType
::
INT8
,
MPI_CHAR
},
{
phi
::
DataType
::
UINT8
,
MPI_UNSIGNED_CHAR
},
{
phi
::
DataType
::
FLOAT32
,
MPI_FLOAT
},
{
phi
::
DataType
::
FLOAT64
,
MPI_DOUBLE
},
{
phi
::
DataType
::
INT32
,
MPI_INT
},
{
phi
::
DataType
::
INT64
,
MPI_LONG
}};
void
ProcessGroupMPI
::
MPITask
::
FinishMPITaskError
(
std
::
exception_ptr
eptr
)
{
Finish
(
eptr
);
}
void
ProcessGroupMPI
::
MPITask
::
FinishMPITask
()
{
Finish
();
}
ProcessGroupMPI
::
MPIAsyncTask
::
MPIAsyncTask
(
MPI_Request
request
,
const
std
::
vector
<
phi
::
DenseTensor
>&
inputs
)
:
ProcessGroup
::
Task
(
-
1
,
inputs
,
CommType
::
UNKNOWN
),
request_
(
request
)
{
memset
(
&
status_
,
0
,
sizeof
(
status_
));
}
ProcessGroupMPI
::
MPIAsyncTask
::~
MPIAsyncTask
()
{
if
(
request_
!=
MPI_REQUEST_NULL
)
{
std
::
cerr
<<
" Task has not completed, try to destruct async mpi task, "
<<
"exit the program."
<<
std
::
endl
;
std
::
terminate
();
}
}
bool
ProcessGroupMPI
::
MPIAsyncTask
::
IsCompleted
()
{
if
(
request_
==
MPI_REQUEST_NULL
)
{
return
true
;
}
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
int
flag
=
0
;
MPI_CHECK
(
MPI_Test
(
&
request_
,
&
flag
,
&
status_
));
if
(
request_
!=
MPI_REQUEST_NULL
)
{
return
false
;
}
if
(
status_
.
MPI_ERROR
!=
MPI_SUCCESS
)
{
AppearException
();
}
return
true
;
}
bool
ProcessGroupMPI
::
MPIAsyncTask
::
Wait
(
std
::
chrono
::
milliseconds
timeout
)
{
if
(
request_
==
MPI_REQUEST_NULL
)
{
return
true
;
}
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
MPI_CHECK
(
MPI_Wait
(
&
request_
,
&
status_
));
if
(
status_
.
MPI_ERROR
!=
MPI_SUCCESS
)
{
AppearException
();
std
::
rethrow_exception
(
exception_
);
return
false
;
}
return
true
;
}
void
ProcessGroupMPI
::
MPIAsyncTask
::
AppearException
()
{
std
::
array
<
char
,
MPI_MAX_ERROR_STRING
>
buf
;
int
len
=
buf
.
size
();
MPI_CHECK
(
MPI_Error_string
(
status_
.
MPI_ERROR
,
buf
.
data
(),
&
len
));
exception_
=
std
::
make_exception_ptr
(
std
::
runtime_error
(
std
::
string
(
buf
.
data
(),
len
)));
}
void
ProcessGroupMPI
::
MPIAsyncTask
::
SetOutputs
(
std
::
vector
<
phi
::
DenseTensor
>&
outputs
)
{
outputs_
=
std
::
make_shared
<
std
::
vector
<
phi
::
DenseTensor
>>
(
outputs
);
}
int
ProcessGroupMPI
::
mpi_thread_support
=
0
;
std
::
mutex
ProcessGroupMPI
::
pg_global_mutex
;
std
::
once_flag
ProcessGroupMPI
::
onceFlag
;
void
ProcessGroupMPI
::
ExitMPI
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
MPI_CHECK
(
MPI_Finalize
());
}
void
ProcessGroupMPI
::
InitOneTimeMPI
()
{
std
::
call_once
(
onceFlag
,
[]()
{
MPI_CHECK
(
MPI_Init_thread
(
nullptr
,
nullptr
,
MPI_THREAD_SERIALIZED
,
&
mpi_thread_support
));
PADDLE_ENFORCE_EQ
(
mpi_thread_support
<
MPI_THREAD_SERIALIZED
,
false
,
platform
::
errors
::
InvalidArgument
(
"MPI supports the number of threads "
"less than MPI_THREAD_SERIALIZED. "
));
std
::
atexit
(
ProcessGroupMPI
::
ExitMPI
);
});
}
std
::
shared_ptr
<
ProcessGroupMPI
>
ProcessGroupMPI
::
CreateProcessGroupMPI
(
const
std
::
vector
<
int
>&
ranks
,
int
gid
)
{
InitOneTimeMPI
();
MPI_Comm
groupComm
=
MPI_COMM_WORLD
;
int
rank
=
-
1
;
int
size
=
-
1
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
pg_global_mutex
);
if
(
!
ranks
.
empty
())
{
MPI_Group
worldGroup
;
MPI_Group
ranksGroup
;
MPI_CHECK
(
MPI_Comm_group
(
MPI_COMM_WORLD
,
&
worldGroup
));
MPI_CHECK
(
MPI_Group_incl
(
worldGroup
,
ranks
.
size
(),
ranks
.
data
(),
&
ranksGroup
));
constexpr
int
maxRetries
=
3
;
bool
create_success
=
false
;
MPI_Barrier
(
MPI_COMM_WORLD
);
for
(
auto
i
=
0
;
i
<
maxRetries
;
i
++
)
{
if
(
MPI_Comm_create
(
MPI_COMM_WORLD
,
ranksGroup
,
&
groupComm
))
{
create_success
=
true
;
break
;
}
}
MPI_CHECK
(
create_success
);
MPI_CHECK
(
MPI_Group_free
(
&
worldGroup
));
MPI_CHECK
(
MPI_Group_free
(
&
ranksGroup
));
}
if
(
groupComm
!=
MPI_COMM_NULL
)
{
MPI_CHECK
(
MPI_Comm_rank
(
groupComm
,
&
rank
));
MPI_CHECK
(
MPI_Comm_size
(
groupComm
,
&
size
));
PADDLE_ENFORCE_EQ
(
rank
<
0
||
size
<
0
,
false
,
platform
::
errors
::
InvalidArgument
(
"get world_size or rank failed!"
));
}
}
if
(
groupComm
==
MPI_COMM_NULL
)
{
return
std
::
shared_ptr
<
ProcessGroupMPI
>
();
}
VLOG
(
3
)
<<
"MPI Group Create Success! rank = "
<<
rank
<<
" size = "
<<
size
<<
" group_id = "
<<
gid
;
return
std
::
make_shared
<
ProcessGroupMPI
>
(
rank
,
size
,
groupComm
,
gid
);
}
ProcessGroupMPI
::
ProcessGroupMPI
(
int
rank
,
int
size
,
MPI_Comm
pg_comm
,
int
gid
)
:
ProcessGroup
(
rank
,
size
,
gid
),
stop_
(
false
),
pg_comm
(
pg_comm
)
{
PADDLE_ENFORCE_EQ
(
pg_comm
==
MPI_COMM_NULL
,
false
,
platform
::
errors
::
InvalidArgument
(
"Error! mpi comm is MPI_COMM_NULL!"
));
worker_thread
=
std
::
thread
(
&
ProcessGroupMPI
::
workLoop
,
this
);
}
ProcessGroupMPI
::~
ProcessGroupMPI
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_mutex
);
queue_consume
.
wait
(
lock
,
[
&
]
{
return
queue_
.
empty
();
});
stop_
=
true
;
lock
.
unlock
();
queue_produce
.
notify_all
();
worker_thread
.
join
();
}
void
ProcessGroupMPI
::
workLoop
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_mutex
);
while
(
!
stop_
)
{
if
(
queue_
.
empty
())
{
queue_produce
.
wait
(
lock
);
continue
;
}
auto
taskTuple
=
std
::
move
(
queue_
.
front
());
queue_
.
pop_front
();
auto
&
taskEntry
=
std
::
get
<
0
>
(
taskTuple
);
auto
&
task
=
std
::
get
<
1
>
(
taskTuple
);
lock
.
unlock
();
queue_consume
.
notify_one
();
try
{
taskEntry
->
run_
(
taskEntry
);
task
->
FinishMPITask
();
}
catch
(...)
{
task
->
FinishMPITaskError
(
std
::
current_exception
());
}
lock
.
lock
();
}
}
std
::
shared_ptr
<
ProcessGroup
::
Task
>
ProcessGroupMPI
::
Enqueue
(
std
::
unique_ptr
<
TaskEntry
>
entry
,
const
std
::
vector
<
phi
::
DenseTensor
>&
inputs
)
{
auto
task
=
std
::
make_shared
<
MPITask
>
(
entry
->
dst_
,
inputs
);
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_mutex
);
queue_
.
push_back
(
std
::
make_tuple
(
std
::
move
(
entry
),
task
));
lock
.
unlock
();
queue_produce
.
notify_one
();
return
task
;
}
std
::
shared_ptr
<
ProcessGroup
::
Task
>
ProcessGroupMPI
::
Broadcast
(
std
::
vector
<
phi
::
DenseTensor
>&
in_tensors
,
std
::
vector
<
phi
::
DenseTensor
>&
out_tensors
,
const
BroadcastOptions
&
opts
)
{
mpi
::
CheckValidInputs
(
in_tensors
);
const
auto
places
=
GetPlaceList
(
in_tensors
);
std
::
function
<
void
(
std
::
unique_ptr
<
TaskEntry
>&
)
>
runFunc
=
[
opts
,
this
](
std
::
unique_ptr
<
TaskEntry
>&
entry
)
{
auto
data
=
(
entry
->
src_
)[
0
];
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
const
auto
root
=
opts
.
source_rank
+
opts
.
source_root
;
MPI_CHECK
(
MPI_Bcast
(
data
.
data
(),
data
.
numel
(),
mpiDatatype
.
at
(
data
.
dtype
()),
root
,
pg_comm
));
};
auto
entry
=
std
::
make_unique
<
TaskEntry
>
(
&
in_tensors
,
&
out_tensors
,
std
::
move
(
runFunc
));
return
Enqueue
(
std
::
move
(
entry
),
in_tensors
);
}
std
::
shared_ptr
<
ProcessGroup
::
Task
>
ProcessGroupMPI
::
AllReduce
(
std
::
vector
<
phi
::
DenseTensor
>&
in_tensors
,
std
::
vector
<
phi
::
DenseTensor
>&
out_tensors
,
const
AllreduceOptions
&
opts
)
{
mpi
::
CheckValidInputs
(
in_tensors
);
std
::
function
<
void
(
std
::
unique_ptr
<
TaskEntry
>&
)
>
runFunc
=
[
opts
,
this
](
std
::
unique_ptr
<
TaskEntry
>&
entry
)
{
auto
data
=
(
entry
->
src_
)[
0
];
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
MPI_CHECK
(
MPI_Allreduce
(
MPI_IN_PLACE
,
data
.
data
(),
data
.
numel
(),
mpiDatatype
.
at
(
data
.
dtype
()),
mpi
::
ToMPIType
(
opts
.
reduce_op
),
pg_comm
));
};
auto
entry
=
std
::
make_unique
<
TaskEntry
>
(
&
in_tensors
,
&
out_tensors
,
std
::
move
(
runFunc
));
return
Enqueue
(
std
::
move
(
entry
),
in_tensors
);
}
std
::
shared_ptr
<
ProcessGroup
::
Task
>
ProcessGroupMPI
::
Barrier
(
const
BarrierOptions
&
opts
)
{
std
::
function
<
void
(
std
::
unique_ptr
<
TaskEntry
>&
)
>
runFunc
=
[
this
](
std
::
unique_ptr
<
TaskEntry
>&
entry
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
MPI_CHECK
(
MPI_Barrier
(
pg_comm
));
};
auto
entry
=
std
::
make_unique
<
TaskEntry
>
(
nullptr
,
nullptr
,
std
::
move
(
runFunc
));
return
Enqueue
(
std
::
move
(
entry
),
std
::
vector
<
phi
::
DenseTensor
>
{});
}
// NOTE: MPI_send tag set gid_
std
::
shared_ptr
<
ProcessGroup
::
Task
>
ProcessGroupMPI
::
Send
(
std
::
vector
<
phi
::
DenseTensor
>&
tensors
,
int
dst_rank
)
{
mpi
::
CheckValidInputs
(
tensors
);
auto
&
tensor
=
tensors
[
0
];
MPI_Request
request
=
MPI_REQUEST_NULL
;
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
MPI_CHECK
(
MPI_Isend
(
tensor
.
data
(),
tensor
.
numel
(),
mpiDatatype
.
at
(
tensor
.
dtype
()),
dst_rank
,
this
->
gid_
,
pg_comm
,
&
request
));
}
return
std
::
make_shared
<
ProcessGroupMPI
::
MPIAsyncTask
>
(
request
,
tensors
);
}
std
::
shared_ptr
<
ProcessGroup
::
Task
>
ProcessGroupMPI
::
Recv
(
std
::
vector
<
phi
::
DenseTensor
>&
tensors
,
int
src_rank
)
{
mpi
::
CheckValidInputs
(
tensors
);
auto
&
tensor
=
tensors
[
0
];
MPI_Request
request
=
MPI_REQUEST_NULL
;
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
MPI_CHECK
(
MPI_Irecv
(
tensor
.
data
(),
tensor
.
numel
(),
mpiDatatype
.
at
(
tensor
.
dtype
()),
src_rank
,
this
->
gid_
,
pg_comm
,
&
request
));
}
return
std
::
make_shared
<
ProcessGroupMPI
::
MPIAsyncTask
>
(
request
,
tensors
);
}
std
::
shared_ptr
<
ProcessGroup
::
Task
>
ProcessGroupMPI
::
AllGather
(
std
::
vector
<
phi
::
DenseTensor
>&
in_tensors
,
std
::
vector
<
phi
::
DenseTensor
>&
out_tensors
)
{
mpi
::
CheckValidInputs
(
in_tensors
);
PADDLE_ENFORCE_EQ
(
out_tensors
.
size
()
==
1
,
true
,
platform
::
errors
::
InvalidArgument
(
"MPI only support a single tensor op."
));
std
::
function
<
void
(
std
::
unique_ptr
<
TaskEntry
>&
)
>
runFunc
=
[
this
](
std
::
unique_ptr
<
TaskEntry
>&
entry
)
{
auto
data
=
(
entry
->
src_
)[
0
];
std
::
vector
<
phi
::
DenseTensor
>
dst
=
entry
->
dst_
;
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
MPI_CHECK
(
MPI_Allgather
(
data
.
data
(),
data
.
numel
(),
mpiDatatype
.
at
(
data
.
dtype
()),
dst
[
0
].
data
(),
data
.
numel
(),
mpiDatatype
.
at
(
data
.
dtype
()),
pg_comm
));
};
auto
entry
=
std
::
make_unique
<
TaskEntry
>
(
&
in_tensors
,
&
out_tensors
,
std
::
move
(
runFunc
));
return
Enqueue
(
std
::
move
(
entry
),
in_tensors
);
}
std
::
shared_ptr
<
ProcessGroup
::
Task
>
ProcessGroupMPI
::
AllToAll
(
std
::
vector
<
phi
::
DenseTensor
>&
in_tensors
,
std
::
vector
<
phi
::
DenseTensor
>&
out_tensors
)
{
mpi
::
CheckValidInputs
(
in_tensors
);
mpi
::
CheckValidInputs
(
out_tensors
);
PADDLE_ENFORCE_EQ
(
in_tensors
[
0
].
numel
()
==
out_tensors
[
0
].
numel
()
&&
in_tensors
[
0
].
dtype
()
==
out_tensors
[
0
].
dtype
(),
true
,
platform
::
errors
::
InvalidArgument
(
"MPI AlltoAll: input and output are not equal in "
"size or data type."
));
std
::
function
<
void
(
std
::
unique_ptr
<
TaskEntry
>&
)
>
runFunc
=
[
this
](
std
::
unique_ptr
<
TaskEntry
>&
entry
)
{
auto
srcdata
=
(
entry
->
src_
)[
0
];
auto
dstdata
=
(
entry
->
dst_
)[
0
];
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
MPI_CHECK
(
MPI_Alltoall
(
srcdata
.
data
(),
srcdata
.
numel
()
/
size_
,
mpiDatatype
.
at
(
srcdata
.
dtype
()),
dstdata
.
data
(),
dstdata
.
numel
()
/
size_
,
mpiDatatype
.
at
(
dstdata
.
dtype
()),
pg_comm
));
};
auto
entry
=
std
::
make_unique
<
TaskEntry
>
(
&
in_tensors
,
&
out_tensors
,
std
::
move
(
runFunc
));
return
Enqueue
(
std
::
move
(
entry
),
in_tensors
);
}
std
::
shared_ptr
<
ProcessGroup
::
Task
>
ProcessGroupMPI
::
Reduce
(
std
::
vector
<
phi
::
DenseTensor
>&
tensors
,
std
::
vector
<
phi
::
DenseTensor
>&
out_tensors
,
const
ReduceOptions
&
opts
)
{
mpi
::
CheckValidInputs
(
tensors
);
std
::
function
<
void
(
std
::
unique_ptr
<
TaskEntry
>&
)
>
runFunc
=
[
opts
,
this
](
std
::
unique_ptr
<
TaskEntry
>&
entry
)
{
auto
data
=
(
entry
->
src_
)[
0
];
auto
dataPtr
=
(
entry
->
src_
)[
0
].
data
();
void
*
sendbuf
=
(
rank_
==
opts
.
root_rank
)
?
MPI_IN_PLACE
:
dataPtr
;
void
*
recvbuf
=
(
rank_
==
opts
.
root_rank
)
?
dataPtr
:
nullptr
;
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
MPI_CHECK
(
MPI_Reduce
(
sendbuf
,
recvbuf
,
data
.
numel
(),
mpiDatatype
.
at
(
data
.
dtype
()),
mpi
::
ToMPIType
(
opts
.
reduce_op
),
opts
.
root_rank
,
pg_comm
));
};
auto
entry
=
std
::
make_unique
<
TaskEntry
>
(
&
tensors
,
&
tensors
,
std
::
move
(
runFunc
));
return
Enqueue
(
std
::
move
(
entry
),
tensors
);
}
std
::
shared_ptr
<
ProcessGroup
::
Task
>
ProcessGroupMPI
::
Scatter
(
std
::
vector
<
phi
::
DenseTensor
>&
in_tensors
,
std
::
vector
<
phi
::
DenseTensor
>&
out_tensors
,
const
ScatterOptions
&
opts
)
{
mpi
::
CheckValidInputs
(
in_tensors
);
std
::
function
<
void
(
std
::
unique_ptr
<
TaskEntry
>&
)
>
runFunc
=
[
opts
,
this
](
std
::
unique_ptr
<
TaskEntry
>&
entry
)
{
auto
data
=
(
entry
->
dst_
)[
0
];
void
*
sendbuf
=
nullptr
;
if
(
rank_
==
opts
.
root_rank
)
{
std
::
vector
<
phi
::
DenseTensor
>&
inputData
=
entry
->
src_
;
sendbuf
=
inputData
[
0
].
data
();
}
std
::
unique_lock
<
std
::
mutex
>
lock
(
pg_global_mutex
);
MPI_CHECK
(
MPI_Scatter
(
sendbuf
,
data
.
numel
(),
mpiDatatype
.
at
(
data
.
dtype
()),
data
.
data
(),
data
.
numel
(),
mpiDatatype
.
at
(
data
.
dtype
()),
opts
.
root_rank
,
pg_comm
));
};
if
(
rank_
==
opts
.
root_rank
)
{
auto
entry
=
std
::
make_unique
<
TaskEntry
>
(
&
in_tensors
,
&
out_tensors
,
std
::
move
(
runFunc
));
return
Enqueue
(
std
::
move
(
entry
),
in_tensors
);
}
else
{
auto
entry
=
std
::
make_unique
<
TaskEntry
>
(
nullptr
,
&
out_tensors
,
std
::
move
(
runFunc
));
return
Enqueue
(
std
::
move
(
entry
),
in_tensors
);
}
}
}
// namespace distributed
}
// namespace paddle
paddle/fluid/distributed/collective/ProcessGroupMPI.h
0 → 100644
浏览文件 @
9ce31e96
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <chrono>
#include <map>
#include <string>
#include <unordered_map>
#include <vector>
#include <condition_variable>
#include <deque>
#include <exception>
#include <mutex>
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/distributed/collective/Types.h"
#include "paddle/fluid/platform/device_context.h"
#if defined(PADDLE_WITH_MPI)
#include "paddle/fluid/distributed/collective/MPITools.h"
#endif
constexpr
const
char
*
MPI_BACKEND_NAME
=
"MPI"
;
namespace
paddle
{
namespace
distributed
{
struct
TaskEntry
{
explicit
TaskEntry
(
std
::
vector
<
phi
::
DenseTensor
>*
src_ptr
,
std
::
vector
<
phi
::
DenseTensor
>*
dst_ptr
,
std
::
function
<
void
(
std
::
unique_ptr
<
TaskEntry
>&
)
>
run
)
:
dst_
(
dst_ptr
?
*
dst_ptr
:
std
::
vector
<
phi
::
DenseTensor
>
()),
run_
(
std
::
move
(
run
))
{
if
(
src_ptr
)
{
src_
=
*
src_ptr
;
}
}
TaskEntry
(
const
TaskEntry
&
)
=
delete
;
TaskEntry
&
operator
=
(
const
TaskEntry
&
)
=
delete
;
std
::
vector
<
phi
::
DenseTensor
>
src_
;
std
::
vector
<
phi
::
DenseTensor
>
dst_
;
int
*
srcRank_
=
nullptr
;
std
::
function
<
void
(
std
::
unique_ptr
<
TaskEntry
>&
)
>
run_
;
};
class
ProcessGroupMPI
:
public
ProcessGroup
{
public:
class
MPITask
:
public
ProcessGroup
::
Task
{
public:
explicit
MPITask
(
std
::
vector
<
phi
::
DenseTensor
>
outputTensors
,
const
std
::
vector
<
phi
::
DenseTensor
>&
inputTensors
)
:
ProcessGroup
::
Task
(
-
1
,
inputTensors
,
CommType
::
UNKNOWN
),
outputs_
(
std
::
move
(
outputTensors
))
{}
void
Synchronize
()
{
Wait
();
}
bool
Wait
(
std
::
chrono
::
milliseconds
timeout
=
kWaitTimeout
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex_
);
if
(
timeout
==
kWaitTimeout
)
{
// This waits without a timeout.
cv_
.
wait
(
lock
,
[
&
]
{
return
is_completed_
;
});
}
else
{
// Waits for the user-provided timeout.
cv_
.
wait_for
(
lock
,
timeout
,
[
&
]
{
return
is_completed_
;
});
PADDLE_ENFORCE_EQ
(
is_completed_
,
true
,
platform
::
errors
::
InvalidArgument
(
"MPI operation timeout! "
));
}
if
(
exception_
)
{
std
::
rethrow_exception
(
exception_
);
}
return
true
;
}
protected:
friend
class
ProcessGroupMPI
;
private:
// about mpi
void
Finish
(
std
::
exception_ptr
exception
=
nullptr
)
{
is_completed_
=
true
;
exception_
=
exception
;
cv_
.
notify_all
();
}
void
FinishMPITask
();
void
FinishMPITaskError
(
std
::
exception_ptr
eptr
);
std
::
vector
<
phi
::
DenseTensor
>
outputs_
;
std
::
condition_variable
cv_
;
std
::
exception_ptr
exception_
;
};
public:
class
MPIAsyncTask
:
public
ProcessGroup
::
Task
{
public:
MPIAsyncTask
(
MPI_Request
request
,
const
std
::
vector
<
phi
::
DenseTensor
>&
inputs
);
bool
IsCompleted
();
void
Synchronize
()
{}
bool
Wait
(
std
::
chrono
::
milliseconds
timeout
=
kWaitTimeout
);
void
SetOutputs
(
std
::
vector
<
phi
::
DenseTensor
>&
outputs
);
// NOLINT
virtual
~
MPIAsyncTask
();
protected:
void
AppearException
();
private:
std
::
shared_ptr
<
std
::
vector
<
phi
::
DenseTensor
>>
outputs_
;
MPI_Request
request_
;
MPI_Status
status_
;
std
::
exception_ptr
exception_
;
};
ProcessGroupMPI
(
int
rank
,
int
size
,
MPI_Comm
pgComm
,
int
gid
);
virtual
~
ProcessGroupMPI
();
const
std
::
string
GetBackendName
()
const
override
{
return
std
::
string
(
MPI_BACKEND_NAME
);
}
std
::
shared_ptr
<
ProcessGroup
::
Task
>
AllReduce
(
std
::
vector
<
phi
::
DenseTensor
>&
in_tensors
,
std
::
vector
<
phi
::
DenseTensor
>&
out_tensors
,
const
AllreduceOptions
&
=
AllreduceOptions
())
override
;
std
::
shared_ptr
<
ProcessGroup
::
Task
>
Broadcast
(
std
::
vector
<
phi
::
DenseTensor
>&
in_tensors
,
std
::
vector
<
phi
::
DenseTensor
>&
out_tensors
,
const
BroadcastOptions
&
=
BroadcastOptions
())
override
;
std
::
shared_ptr
<
ProcessGroup
::
Task
>
Barrier
(
const
BarrierOptions
&
=
BarrierOptions
())
override
;
std
::
shared_ptr
<
ProcessGroup
::
Task
>
Send
(
std
::
vector
<
phi
::
DenseTensor
>&
tensors
,
int
dst_rank
)
override
;
std
::
shared_ptr
<
ProcessGroup
::
Task
>
Recv
(
std
::
vector
<
phi
::
DenseTensor
>&
tensors
,
int
src_rank
)
override
;
std
::
shared_ptr
<
ProcessGroup
::
Task
>
AllGather
(
std
::
vector
<
phi
::
DenseTensor
>&
in_tensors
,
std
::
vector
<
phi
::
DenseTensor
>&
out_tensors
)
override
;
std
::
shared_ptr
<
ProcessGroup
::
Task
>
AllToAll
(
std
::
vector
<
phi
::
DenseTensor
>&
in
,
std
::
vector
<
phi
::
DenseTensor
>&
out
)
override
;
std
::
shared_ptr
<
ProcessGroup
::
Task
>
Reduce
(
std
::
vector
<
phi
::
DenseTensor
>&
tensors
,
std
::
vector
<
phi
::
DenseTensor
>&
out_tensors
,
const
ReduceOptions
&
opts
)
override
;
std
::
shared_ptr
<
ProcessGroup
::
Task
>
Scatter
(
std
::
vector
<
phi
::
DenseTensor
>&
in_tensors
,
std
::
vector
<
phi
::
DenseTensor
>&
out_tensors
,
const
ScatterOptions
&
)
override
;
static
std
::
shared_ptr
<
ProcessGroupMPI
>
CreateProcessGroupMPI
(
const
std
::
vector
<
int
>&
ranks
,
int
gid
);
protected:
void
workLoop
();
std
::
shared_ptr
<
ProcessGroup
::
Task
>
Enqueue
(
std
::
unique_ptr
<
TaskEntry
>
entry
,
const
std
::
vector
<
phi
::
DenseTensor
>&
inputs
);
private:
bool
stop_
{
false
};
std
::
mutex
pg_mutex
;
std
::
thread
worker_thread
;
std
::
deque
<
std
::
tuple
<
std
::
unique_ptr
<
TaskEntry
>
,
std
::
shared_ptr
<
MPITask
>>>
queue_
;
std
::
condition_variable
queue_produce
;
std
::
condition_variable
queue_consume
;
static
void
InitOneTimeMPI
();
static
void
ExitMPI
();
static
std
::
once_flag
onceFlag
;
static
std
::
mutex
pg_global_mutex
;
static
int
mpi_thread_support
;
MPI_Comm
pg_comm
;
};
}
// namespace distributed
}
// namespace paddle
paddle/fluid/pybind/CMakeLists.txt
浏览文件 @
9ce31e96
...
...
@@ -151,6 +151,9 @@ if(WITH_PYTHON)
if
(
WITH_GLOO
)
set
(
PYBIND_DEPS
${
PYBIND_DEPS
}
processgroup_gloo
)
endif
()
if
(
WITH_MPI
)
set
(
PYBIND_DEPS
${
PYBIND_DEPS
}
processgroup_mpi
)
endif
()
if
(
WITH_ASCEND_CL
)
set
(
PYBIND_DEPS
${
PYBIND_DEPS
}
processgroup_hccl
)
if
(
WITH_PSCORE
)
...
...
@@ -591,6 +594,10 @@ if(WITH_PYTHON)
target_link_libraries
(
libpaddle
${
ROCM_HIPRTC_LIB
}
)
endif
()
if
(
WITH_MPI
)
target_link_libraries
(
libpaddle
${
MPI_CXX_LIBRARIES
}
)
endif
()
get_property
(
os_dependency_modules GLOBAL PROPERTY OS_DEPENDENCY_MODULES
)
target_link_libraries
(
libpaddle
${
os_dependency_modules
}
)
add_dependencies
(
libpaddle op_function_generator_cmd
)
...
...
paddle/fluid/pybind/distributed_py.cc
浏览文件 @
9ce31e96
...
...
@@ -36,6 +36,10 @@ limitations under the License. */
#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"
#endif
#if defined(PADDLE_WITH_MPI)
#include "paddle/fluid/distributed/collective/ProcessGroupMPI.h"
#endif
#if defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h"
#endif
...
...
@@ -623,6 +627,25 @@ void BindDistributed(py::module *m) {
#endif
#if defined(PADDLE_WITH_MPI)
py
::
class_
<
distributed
::
ProcessGroupMPI
,
std
::
shared_ptr
<
distributed
::
ProcessGroupMPI
>>
(
*
m
,
"ProcessGroupMPI"
,
ProcessGroup
)
.
def_static
(
"create"
,
[](
const
std
::
vector
<
int
>
&
ranks
,
int
gid
)
->
std
::
shared_ptr
<
distributed
::
ProcessGroupMPI
>
{
return
paddle
::
distributed
::
ProcessGroupMPI
::
CreateProcessGroupMPI
(
ranks
,
gid
);
})
.
def
(
"get_rank"
,
&
distributed
::
ProcessGroup
::
GetRank
,
py
::
call_guard
<
py
::
gil_scoped_release
>
())
.
def
(
"get_world_size"
,
&
distributed
::
ProcessGroup
::
GetSize
,
py
::
call_guard
<
py
::
gil_scoped_release
>
());
#endif
#if defined(PADDLE_WITH_GLOO) && defined(PADDLE_WITH_PSCORE) && \
(defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL))
py
::
class_
<
distributed
::
ProcessGroupHeter
,
...
...
paddle/fluid/pybind/pybind.cc
浏览文件 @
9ce31e96
...
...
@@ -229,6 +229,23 @@ bool IsCompiledWithNCCL() {
#endif
}
bool
IsCompiledWithMPI
()
{
#ifdef PADDLE_WITH_MPI
return
true
;
#else
return
false
;
#endif
}
// NOTE some mpi lib can support cuda aware, support it in the future.
bool
IsCompiledWithMPIAWARE
()
{
#ifdef PADDLE_WITH_MPI_AWARE
return
true
;
#else
return
false
;
#endif
}
bool
IsCompiledWithROCM
()
{
#ifndef PADDLE_WITH_HIP
return
false
;
...
...
@@ -1718,6 +1735,8 @@ All parameter, weight, gradient are variables in Paddle.
m
.
def
(
"is_compiled_with_xpu"
,
IsCompiledWithXPU
);
m
.
def
(
"is_compiled_with_mkldnn"
,
IsCompiledWithMKLDNN
);
m
.
def
(
"is_compiled_with_nccl"
,
IsCompiledWithNCCL
);
m
.
def
(
"is_compiled_with_mpi"
,
IsCompiledWithMPI
);
m
.
def
(
"is_compiled_with_mpi_aware"
,
IsCompiledWithMPIAWARE
);
m
.
def
(
"is_compiled_with_cinn"
,
IsCompiledWithCINN
);
m
.
def
(
"is_compiled_with_mlu"
,
IsCompiledWithMLU
);
m
.
def
(
"_is_compiled_with_heterps"
,
IsCompiledWithHETERPS
);
...
...
python/paddle/fluid/tests/unittests/collective/CMakeLists.txt
浏览文件 @
9ce31e96
...
...
@@ -323,5 +323,17 @@ if((WITH_ROCM OR WITH_GPU) AND (LINUX))
"PADDLE_DIST_UT_PORT=21532;http_proxy=;https_proxy="
)
set_tests_properties
(
test_world_size_and_rank PROPERTIES TIMEOUT
"120"
)
endif
()
if
(
WITH_MPI
)
if
(
LOCAL_ALL_ARCH
AND
(
LINUX
))
bash_test_modules
(
test_mpi_comm
START_BASH
test_mpi_comm.sh
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21672;http_proxy=;https_proxy="
)
endif
()
endif
()
add_subdirectory
(
fleet
)
add_subdirectory
(
multinode
)
python/paddle/fluid/tests/unittests/collective/process_group_mpi.py
0 → 100644
浏览文件 @
9ce31e96
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
random
import
numpy
as
np
import
os
import
shutil
import
paddle
from
paddle.fluid
import
core
from
datetime
import
timedelta
import
paddle.fluid.core
as
core
from
paddle.fluid.framework
import
_test_eager_guard
from
paddle.fluid.dygraph.parallel
import
ParallelEnv
from
paddle.distributed.collective
import
Group
from
paddle.distributed.collective
import
_group_map_by_name
from
paddle.distributed.collective
import
_default_group_name
from
paddle.distributed.collective
import
_set_group_map
from
paddle.distributed.collective
import
_set_group_map_by_name
from
paddle.distributed.collective
import
_set_group_map_backend
from
paddle.fluid.framework
import
_set_expected_place
import
paddle.distributed
as
dist
import
ctypes
ctypes
.
CDLL
(
"libmpi.so"
,
mode
=
ctypes
.
RTLD_GLOBAL
)
def
init_process_group
(
strategy
=
None
):
gid
=
0
pg
=
core
.
ProcessGroupMPI
.
create
([],
gid
)
rank
=
pg
.
get_rank
()
world_size
=
pg
.
get_world_size
()
# support CPU
place
=
core
.
CPUPlace
()
_set_expected_place
(
place
)
group
=
Group
(
rank
,
world_size
,
id
=
0
,
ranks
=
list
(
range
(
world_size
)),
pg
=
pg
,
name
=
_default_group_name
)
_set_group_map_by_name
(
_default_group_name
,
group
)
_set_group_map
(
gid
,
group
)
_set_group_map_backend
(
group
,
"mpi"
)
return
group
def
test_allreduce_sum
(
pg
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
# rank 1
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_y
=
paddle
.
to_tensor
(
y
)
sum_result
=
tensor_x
+
tensor_y
if
pg
.
rank
()
==
0
:
task
=
dist
.
all_reduce
(
tensor_x
)
assert
np
.
array_equal
(
tensor_x
,
sum_result
)
else
:
task
=
dist
.
all_reduce
(
tensor_y
)
assert
np
.
array_equal
(
tensor_y
,
sum_result
)
print
(
"test allreduce sum api ok"
)
def
test_allreduce_max
(
pg
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
# rank 1
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_y
=
paddle
.
to_tensor
(
y
)
max_result
=
paddle
.
maximum
(
tensor_x
,
tensor_y
)
if
pg
.
rank
()
==
0
:
task
=
dist
.
all_reduce
(
tensor_x
,
dist
.
ReduceOp
.
MAX
,
use_calc_stream
=
False
)
task
.
wait
()
assert
np
.
array_equal
(
tensor_x
,
max_result
)
else
:
task
=
dist
.
all_reduce
(
tensor_y
,
dist
.
ReduceOp
.
MAX
,
use_calc_stream
=
False
)
task
.
wait
()
assert
np
.
array_equal
(
tensor_y
,
max_result
)
print
(
"test allreduce max api ok"
)
def
test_allreduce_min
(
pg
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
# rank 1
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_y
=
paddle
.
to_tensor
(
y
)
min_result
=
paddle
.
minimum
(
tensor_x
,
tensor_y
)
if
pg
.
rank
()
==
0
:
task
=
dist
.
all_reduce
(
tensor_x
,
dist
.
ReduceOp
.
MIN
,
use_calc_stream
=
False
)
task
.
wait
()
assert
np
.
array_equal
(
tensor_x
,
min_result
)
else
:
task
=
dist
.
all_reduce
(
tensor_y
,
dist
.
ReduceOp
.
MIN
,
use_calc_stream
=
False
)
task
.
wait
()
assert
np
.
array_equal
(
tensor_y
,
min_result
)
print
(
"test allreduce min api ok"
)
def
test_allreduce_prod
(
pg
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
# rank 1
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_y
=
paddle
.
to_tensor
(
y
)
prod_result
=
np
.
multiply
(
x
,
y
)
if
pg
.
rank
()
==
0
:
task
=
dist
.
all_reduce
(
tensor_x
,
dist
.
ReduceOp
.
PROD
,
use_calc_stream
=
False
)
task
.
wait
()
assert
np
.
array_equal
(
tensor_x
,
prod_result
)
else
:
task
=
dist
.
all_reduce
(
tensor_y
,
dist
.
ReduceOp
.
PROD
,
use_calc_stream
=
False
)
task
.
wait
()
assert
np
.
array_equal
(
tensor_y
,
prod_result
)
print
(
"test allreduce prod api ok"
)
def
test_broadcast
(
pg
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
# rank 1
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_y
=
paddle
.
to_tensor
(
y
)
broadcast_result
=
paddle
.
assign
(
tensor_x
)
if
pg
.
rank
()
==
0
:
task
=
dist
.
broadcast
(
tensor_x
,
0
,
use_calc_stream
=
False
)
task
.
synchronize
()
assert
task
.
is_completed
()
assert
np
.
array_equal
(
broadcast_result
,
tensor_x
)
else
:
task
=
dist
.
broadcast
(
tensor_y
,
0
)
assert
np
.
array_equal
(
broadcast_result
,
tensor_y
)
print
(
"test broadcast api ok"
)
def
test_barrair
(
pg
):
# rank 0
if
pg
.
rank
()
==
0
:
dist
.
barrier
()
# rank 1
else
:
task
=
pg
.
barrier
()
task
.
wait
()
print
(
"test barrier api ok
\n
"
)
def
test_allgather
(
pg
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
tensor_y
=
paddle
.
to_tensor
(
y
)
out_shape
=
list
(
shape
)
out_shape
[
0
]
*=
2
out
=
np
.
random
.
random
(
out_shape
).
astype
(
dtype
)
tensor_out
=
paddle
.
to_tensor
(
out
)
if
pg
.
rank
()
==
0
:
task
=
pg
.
all_gather
(
tensor_x
,
tensor_out
)
task
.
wait
()
# rank 1
else
:
tensor_out_list
=
[
paddle
.
empty_like
(
tensor_x
),
paddle
.
empty_like
(
tensor_x
)
]
task
=
dist
.
all_gather
(
tensor_out_list
,
tensor_y
,
use_calc_stream
=
False
)
tensor_out
=
paddle
.
concat
(
tensor_out_list
)
out_1
=
paddle
.
slice
(
tensor_out
,
[
0
],
[
0
],
[
out_shape
[
0
]
//
2
])
out_2
=
paddle
.
slice
(
tensor_out
,
[
0
],
[
out_shape
[
0
]
//
2
],
[
out_shape
[
0
]])
assert
np
.
array_equal
(
tensor_x
,
out_1
)
assert
np
.
array_equal
(
tensor_y
,
out_2
)
print
(
"test allgather api ok
\n
"
)
if
pg
.
rank
()
==
0
:
task
=
pg
.
all_gather
(
tensor_x
,
tensor_out
)
task
.
wait
()
# rank 1
else
:
tensor_out_list
=
[]
task
=
dist
.
all_gather
(
tensor_out_list
,
tensor_y
,
use_calc_stream
=
False
)
tensor_out
=
paddle
.
concat
(
tensor_out_list
)
out_1
=
paddle
.
slice
(
tensor_out
,
[
0
],
[
0
],
[
out_shape
[
0
]
//
2
])
out_2
=
paddle
.
slice
(
tensor_out
,
[
0
],
[
out_shape
[
0
]
//
2
],
[
out_shape
[
0
]])
assert
np
.
array_equal
(
tensor_x
,
out_1
)
assert
np
.
array_equal
(
tensor_y
,
out_2
)
print
(
"test allgather api2 ok
\n
"
)
def
test_all2all
(
pg
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
out1
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
out2
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
tensor_y
=
paddle
.
to_tensor
(
y
)
tensor_out1
=
paddle
.
to_tensor
(
out1
)
tensor_out2
=
paddle
.
to_tensor
(
out2
)
raw_tensor_x_2
=
paddle
.
slice
(
tensor_x
,
[
0
],
[
shape
[
0
]
//
2
],
[
shape
[
0
]])
raw_tensor_y_1
=
paddle
.
slice
(
tensor_y
,
[
0
],
[
0
],
[
shape
[
0
]
//
2
])
if
pg
.
rank
()
==
0
:
task
=
pg
.
alltoall
(
tensor_x
,
tensor_out1
)
task
.
wait
()
# rank 1
else
:
in_1
,
in_2
=
paddle
.
split
(
tensor_y
,
2
)
out_1
,
out_2
=
paddle
.
split
(
tensor_out2
,
2
)
out_tensor_list
=
[
out_1
,
out_2
]
task
=
dist
.
alltoall
([
in_1
,
in_2
],
out_tensor_list
)
tensor_out2
=
paddle
.
concat
(
out_tensor_list
)
out1_2
=
paddle
.
slice
(
tensor_out1
,
[
0
],
[
shape
[
0
]
//
2
],
[
shape
[
0
]])
out2_1
=
paddle
.
slice
(
tensor_out2
,
[
0
],
[
0
],
[
shape
[
0
]
//
2
])
if
pg
.
rank
()
==
0
:
assert
np
.
array_equal
(
out1_2
.
numpy
(),
raw_tensor_y_1
.
numpy
())
else
:
assert
np
.
array_equal
(
out2_1
,
raw_tensor_x_2
)
print
(
"test alltoall api ok
\n
"
)
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
out1
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
out2
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
tensor_y
=
paddle
.
to_tensor
(
y
)
tensor_out1
=
paddle
.
to_tensor
(
out1
)
tensor_out2
=
paddle
.
to_tensor
(
out2
)
raw_tensor_x_2
=
paddle
.
slice
(
tensor_x
,
[
0
],
[
shape
[
0
]
//
2
],
[
shape
[
0
]])
raw_tensor_y_1
=
paddle
.
slice
(
tensor_y
,
[
0
],
[
0
],
[
shape
[
0
]
//
2
])
if
pg
.
rank
()
==
0
:
task
=
pg
.
alltoall
(
tensor_x
,
tensor_out1
)
task
.
wait
()
# rank 1
else
:
in_1
,
in_2
=
paddle
.
split
(
tensor_y
,
2
)
out_1
,
out_2
=
paddle
.
split
(
tensor_out2
,
2
)
out_tensor_list
=
[]
task
=
dist
.
alltoall
([
in_1
,
in_2
],
out_tensor_list
)
tensor_out2
=
paddle
.
concat
(
out_tensor_list
)
out1_2
=
paddle
.
slice
(
tensor_out1
,
[
0
],
[
shape
[
0
]
//
2
],
[
shape
[
0
]])
out2_1
=
paddle
.
slice
(
tensor_out2
,
[
0
],
[
0
],
[
shape
[
0
]
//
2
])
if
pg
.
rank
()
==
0
:
assert
np
.
array_equal
(
out1_2
.
numpy
(),
raw_tensor_y_1
.
numpy
())
else
:
assert
np
.
array_equal
(
out2_1
,
raw_tensor_x_2
)
print
(
"test alltoall api2 ok
\n
"
)
def
test_reduce_sum
(
pg
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
tensor_y
=
paddle
.
to_tensor
(
y
)
sum_result
=
tensor_x
+
tensor_y
if
pg
.
rank
()
==
0
:
task
=
dist
.
reduce
(
tensor_x
,
0
,
use_calc_stream
=
True
)
# rank 1
else
:
task
=
dist
.
reduce
(
tensor_y
,
0
,
use_calc_stream
=
False
)
task
.
wait
()
if
pg
.
rank
()
==
0
:
assert
np
.
array_equal
(
tensor_x
,
sum_result
)
print
(
"test reduce sum api ok
\n
"
)
def
test_reduce_max
(
pg
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
# rank 1
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_y
=
paddle
.
to_tensor
(
y
)
max_result
=
paddle
.
maximum
(
tensor_x
,
tensor_y
)
if
pg
.
rank
()
==
0
:
task
=
dist
.
reduce
(
tensor_x
,
0
,
dist
.
ReduceOp
.
MAX
,
use_calc_stream
=
False
)
task
.
wait
()
assert
np
.
array_equal
(
tensor_x
,
max_result
)
else
:
task
=
dist
.
reduce
(
tensor_y
,
0
,
dist
.
ReduceOp
.
MAX
,
use_calc_stream
=
False
)
task
.
wait
()
print
(
"test reduce max api ok"
)
def
test_reduce_min
(
pg
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
# rank 1
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_y
=
paddle
.
to_tensor
(
y
)
min_result
=
paddle
.
minimum
(
tensor_x
,
tensor_y
)
if
pg
.
rank
()
==
0
:
task
=
dist
.
reduce
(
tensor_x
,
0
,
dist
.
ReduceOp
.
MIN
,
use_calc_stream
=
False
)
task
.
wait
()
assert
np
.
array_equal
(
tensor_x
,
min_result
)
else
:
task
=
dist
.
reduce
(
tensor_y
,
0
,
dist
.
ReduceOp
.
MIN
,
use_calc_stream
=
False
)
task
.
wait
()
print
(
"test reduce min api ok"
)
def
test_reduce_prod
(
pg
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
# rank 1
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_y
=
paddle
.
to_tensor
(
y
)
prod_result
=
np
.
multiply
(
x
,
y
)
if
pg
.
rank
()
==
0
:
task
=
dist
.
reduce
(
tensor_x
,
0
,
dist
.
ReduceOp
.
PROD
,
use_calc_stream
=
False
)
task
.
wait
()
assert
np
.
array_equal
(
tensor_x
,
prod_result
)
else
:
task
=
dist
.
reduce
(
tensor_y
,
0
,
dist
.
ReduceOp
.
PROD
,
use_calc_stream
=
False
)
task
.
wait
()
print
(
"test reduce prod api ok"
)
def
test_scatter
(
pg
,
shape
,
dtype
):
# rank 0
in_shape
=
list
(
shape
)
in_shape
[
0
]
*=
2
x
=
np
.
random
.
random
(
in_shape
).
astype
(
dtype
)
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
tensor_y
=
paddle
.
to_tensor
(
y
)
if
pg
.
rank
()
==
0
:
in_1
,
in_2
=
paddle
.
split
(
tensor_x
,
2
)
task
=
dist
.
scatter
(
tensor_y
,
[
in_1
,
in_2
],
0
,
use_calc_stream
=
True
)
# rank 1
else
:
task
=
dist
.
scatter
(
tensor_y
,
[],
0
,
use_calc_stream
=
False
)
task
.
wait
()
out1
=
paddle
.
slice
(
tensor_x
,
[
0
],
[
0
],
[
shape
[
0
]])
out2
=
paddle
.
slice
(
tensor_x
,
[
0
],
[
shape
[
0
]],
[
shape
[
0
]
*
2
])
if
pg
.
rank
()
==
0
:
assert
np
.
array_equal
(
tensor_y
,
out1
)
else
:
assert
np
.
array_equal
(
tensor_y
,
out2
)
print
(
"test scatter api ok
\n
"
)
def
test_send_recv
(
pg
,
sub_group
,
shape
,
dtype
):
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
# rank 1
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_y
=
paddle
.
to_tensor
(
y
)
if
pg
.
rank
()
==
0
:
task
=
dist
.
send
(
tensor_x
,
1
,
group
=
sub_group
,
use_calc_stream
=
False
)
task
.
wait
()
elif
pg
.
rank
()
==
1
:
task
=
dist
.
recv
(
tensor_y
,
0
,
group
=
sub_group
,
use_calc_stream
=
False
)
task
.
wait
()
assert
np
.
array_equal
(
tensor_y
,
tensor_x
)
print
(
"test send api ok"
)
# test send min
# rank 0
x
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_x
=
paddle
.
to_tensor
(
x
)
# rank 1
y
=
np
.
random
.
random
(
shape
).
astype
(
dtype
)
tensor_y
=
paddle
.
to_tensor
(
y
)
if
pg
.
rank
()
==
0
:
task
=
dist
.
send
(
tensor_x
,
1
,
group
=
sub_group
,
use_calc_stream
=
True
)
elif
pg
.
rank
()
==
1
:
task
=
dist
.
recv
(
tensor_y
,
0
,
group
=
sub_group
,
use_calc_stream
=
True
)
assert
np
.
array_equal
(
tensor_y
,
tensor_x
)
print
(
"test send api ok"
)
class
TestProcessGroup
(
unittest
.
TestCase
):
def
setUp
(
self
):
paddle
.
seed
(
2022
)
random
.
seed
(
2022
)
np
.
random
.
seed
(
2022
)
self
.
config
()
def
config
(
self
):
self
.
dtype
=
"float32"
self
.
shape
=
(
2
,
10
,
5
)
def
test_create_process_group_mpi
(
self
):
with
_test_eager_guard
():
group
=
init_process_group
()
pg
=
group
.
process_group
# test allreduce sum
test_allreduce_sum
(
pg
,
self
.
shape
,
self
.
dtype
)
# test allreduce max
test_allreduce_max
(
pg
,
self
.
shape
,
self
.
dtype
)
# test allreduce min
test_allreduce_min
(
pg
,
self
.
shape
,
self
.
dtype
)
# test allreduce prod
test_allreduce_prod
(
pg
,
self
.
shape
,
self
.
dtype
)
# test broadcast
test_broadcast
(
pg
,
self
.
shape
,
self
.
dtype
)
# test barrier
test_barrair
(
pg
)
# test allgather
test_allgather
(
pg
,
self
.
shape
,
self
.
dtype
)
# test alltoall
test_all2all
(
pg
,
self
.
shape
,
self
.
dtype
)
# test Reduce
test_reduce_sum
(
pg
,
self
.
shape
,
self
.
dtype
)
# test reduce max
test_reduce_max
(
pg
,
self
.
shape
,
self
.
dtype
)
# test reduce min
test_reduce_min
(
pg
,
self
.
shape
,
self
.
dtype
)
# test reduce product
test_reduce_prod
(
pg
,
self
.
shape
,
self
.
dtype
)
# test Scatter
test_scatter
(
pg
,
self
.
shape
,
self
.
dtype
)
# test send recv.
test_send_recv
(
pg
,
group
,
self
.
shape
,
self
.
dtype
)
if
__name__
==
"__main__"
:
unittest
.
main
()
python/paddle/fluid/tests/unittests/collective/test_mpi_comm.sh
0 → 100644
浏览文件 @
9ce31e96
#!/bin/bash
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set
-e
# use default values
export
PADDLE_DISTRI_BACKEND
=
"mpi"
cmd
=
`
which mpirun
`
if
[
${#
cmd
}
-eq
0
]
then
echo
"Warning! mpirun command not found!"
else
${
cmd
}
-x
PADDLE_DISTRI_BACKEND
-np
2
--allow-run-as-root
python3.8 process_group_mpi.py
fi
python/paddle/fluid/tests/unittests/collective/testslist.csv
浏览文件 @
9ce31e96
...
...
@@ -38,3 +38,4 @@ test_eager_dist_api,linux,gpu;rocm,120,DIST,test_runner.py,2,,http_proxy=;https_
test_gen_nccl_id_op,,gpu;rocm;ASCEND;ASCEND_CL,,DIST,../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=..,
test_new_group_api,linux,gpu;rocm,120,DIST,test_runner.py,2,,http_proxy=;https_proxy=;PYTHONPATH=..,
test_world_size_and_rank,linux,rocm;gpu,120,DIST,test_world_size_and_rank.sh,2,,http_proxy=;https_proxy=,
test_mpi_comm,linux,,,DIST,test_mpi_comm.sh,2,,http_proxy=;https_proxy=,WITH_MPI
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录