Unverified commit ae867a84, authored by Haohongxiang, committed by GitHub

[Dygraph] Fix bugs of supporting ProcessGroupNCCL on DCU (#43682)

* fix bugs

* update

* update

* update

* code style

* code style check
Parent 292b7254
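The change below widens every NCCL-only build guard (CMake's WITH_NCCL option and the PADDLE_WITH_NCCL macro) so the same ProcessGroupNCCL code is also compiled when Paddle is built with RCCL for DCU; the remaining hunks are clang-format reflow. This works because RCCL, ROCm's NCCL-compatible collectives library, keeps the nccl* symbols. A minimal standalone sketch of that idea, assuming an installed NCCL or RCCL and the matching -DPADDLE_WITH_NCCL / -DPADDLE_WITH_RCCL compile definition (illustrative only, not Paddle source):

// Illustrative sketch, not Paddle code: the same call builds against either
// library because RCCL mirrors the NCCL API.
#if defined(PADDLE_WITH_NCCL)
#include <nccl.h>
#elif defined(PADDLE_WITH_RCCL)
#include <rccl.h>  // ROCm's NCCL-compatible collectives library
#endif

#include <cstdio>

int main() {
  int version = 0;
  ncclGetVersion(&version);  // identical symbol on CUDA (NCCL) and DCU (RCCL)
  std::printf("collective library version: %d\n", version);
  return 0;
}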
@@ -129,7 +129,7 @@ endif()
 if(NOT ON_INFER)
   set(PYBIND_DEPS ${PYBIND_DEPS} processgroup eager_reducer)
-  if(WITH_NCCL)
+  if(WITH_NCCL OR WITH_RCCL)
     set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_nccl)
     if(WITH_PSCORE)
       set(PYBIND_DEPS ${PYBIND_DEPS} processgroup_heter)
...
@@ -31,7 +31,7 @@ limitations under the License. */
 #include "paddle/fluid/pybind/eager_utils.h"
 #include "paddle/phi/api/all.h"
-#if defined(PADDLE_WITH_NCCL)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
 #include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"
 #endif
@@ -61,11 +61,15 @@ std::shared_ptr<distributed::EagerReducer> CreateEagerReducer(
     const std::vector<std::vector<size_t>> &group_indices,
     const std::vector<bool> &is_sparse_gradient,
     std::shared_ptr<distributed::ProcessGroup> process_group,
-    const std::vector<size_t> &group_size_limits, bool find_unused_parameters) {
+    const std::vector<size_t> &group_size_limits,
+    bool find_unused_parameters) {
   auto params = CastPyArg2VectorOfTensor(py_tensors.ptr(), 0);
-  return std::make_shared<distributed::EagerReducer>(
-      params, group_indices, is_sparse_gradient, process_group,
-      group_size_limits, find_unused_parameters);
+  return std::make_shared<distributed::EagerReducer>(params,
+                                                     group_indices,
+                                                     is_sparse_gradient,
+                                                     process_group,
+                                                     group_size_limits,
+                                                     find_unused_parameters);
 }
 #if defined(PADDLE_WITH_GLOO)
@@ -111,7 +115,8 @@ void BindDistributed(py::module *m) {
       .def("name", &distributed::ProcessGroup::GetBackendName)
       .def(
           "allreduce",
-          [](distributed::ProcessGroup &self, py::handle py_tensor,
+          [](distributed::ProcessGroup &self,
+             py::handle py_tensor,
              distributed::ReduceOp op) {
             auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0);
             distributed::AllreduceOptions opts;
@@ -121,12 +126,14 @@ void BindDistributed(py::module *m) {
             std::vector<phi::DenseTensor> tensors = {*dense};
             return self.AllReduce(tensors, tensors, opts);
           },
-          py::arg("tensor"), py::arg("op") = distributed::ReduceOp::SUM,
+          py::arg("tensor"),
+          py::arg("op") = distributed::ReduceOp::SUM,
           py::call_guard<py::gil_scoped_release>())
       .def(
           "broadcast",
-          [](distributed::ProcessGroup &self, py::handle py_tensor,
+          [](distributed::ProcessGroup &self,
+             py::handle py_tensor,
              int source_rank) {
             auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0);
             distributed::BroadcastOptions opts;
@@ -136,7 +143,8 @@ void BindDistributed(py::module *m) {
             std::vector<phi::DenseTensor> tensors = {*dense};
             return self.Broadcast(tensors, tensors, opts);
           },
-          py::arg("tensor"), py::arg("source_rank"),
+          py::arg("tensor"),
+          py::arg("source_rank"),
           py::call_guard<py::gil_scoped_release>())
       .def(
@@ -151,7 +159,8 @@ void BindDistributed(py::module *m) {
       .def(
           "send",
-          [](distributed::ProcessGroup &self, py::handle py_tensor,
+          [](distributed::ProcessGroup &self,
+             py::handle py_tensor,
              int dst) {
             auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0);
             auto dense =
@@ -159,12 +168,14 @@ void BindDistributed(py::module *m) {
             std::vector<phi::DenseTensor> tensors = {*dense};
             return self.Send(tensors, dst);
           },
-          py::arg("tensor"), py::arg("dst"),
+          py::arg("tensor"),
+          py::arg("dst"),
           py::call_guard<py::gil_scoped_release>())
       .def(
           "recv",
-          [](distributed::ProcessGroup &self, py::handle py_tensor,
+          [](distributed::ProcessGroup &self,
+             py::handle py_tensor,
              int src) {
             auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0);
             auto dense =
@@ -172,12 +183,14 @@ void BindDistributed(py::module *m) {
             std::vector<phi::DenseTensor> tensors = {*dense};
             return self.Recv(tensors, src);
           },
-          py::arg("tensor"), py::arg("src"),
+          py::arg("tensor"),
+          py::arg("src"),
           py::call_guard<py::gil_scoped_release>())
       .def(
           "all_gather",
-          [](distributed::ProcessGroup &self, py::handle py_in_tensor,
+          [](distributed::ProcessGroup &self,
+             py::handle py_in_tensor,
              py::handle py_out_tensor) {
             auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0);
             auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0);
@@ -189,12 +202,14 @@ void BindDistributed(py::module *m) {
             std::vector<phi::DenseTensor> out_tensors = {*out_dense};
             return self.AllGather(in_tensors, out_tensors);
           },
-          py::arg("in"), py::arg("out"),
+          py::arg("in"),
+          py::arg("out"),
           py::call_guard<py::gil_scoped_release>())
       .def(
           "alltoall",
-          [](distributed::ProcessGroup &self, py::handle py_in_tensor,
+          [](distributed::ProcessGroup &self,
+             py::handle py_in_tensor,
              py::handle py_out_tensor) {
             auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0);
             auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0);
@@ -206,13 +221,16 @@ void BindDistributed(py::module *m) {
             std::vector<phi::DenseTensor> out_tensors = {*out_dense};
             return self.AllToAll(in_tensors, out_tensors);
           },
-          py::arg("in"), py::arg("out"),
+          py::arg("in"),
+          py::arg("out"),
           py::call_guard<py::gil_scoped_release>())
       .def(
           "reduce",
-          [](distributed::ProcessGroup &self, py::handle py_in_tensor,
-             int dst, distributed::ReduceOp op) {
+          [](distributed::ProcessGroup &self,
+             py::handle py_in_tensor,
+             int dst,
+             distributed::ReduceOp op) {
             auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0);
             distributed::ReduceOptions opts;
             opts.reduce_op = op;
@@ -222,14 +240,17 @@ void BindDistributed(py::module *m) {
             std::vector<phi::DenseTensor> tensors = {*dense};
             return self.Reduce(tensors, tensors, opts);
           },
-          py::arg("tensor"), py::arg("dst"),
+          py::arg("tensor"),
+          py::arg("dst"),
           py::arg("op") = distributed::ReduceOp::SUM,
           py::call_guard<py::gil_scoped_release>())
       .def(
           "scatter",
-          [](distributed::ProcessGroup &self, py::handle py_in_tensor,
-             py::handle py_out_tensor, int src) {
+          [](distributed::ProcessGroup &self,
+             py::handle py_in_tensor,
+             py::handle py_out_tensor,
+             int src) {
             auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0);
             auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0);
             distributed::ScatterOptions opts;
@@ -242,17 +263,25 @@ void BindDistributed(py::module *m) {
             std::vector<phi::DenseTensor> out_tensors = {*out_dense};
             return self.Scatter(in_tensors, out_tensors, opts);
           },
-          py::arg("in"), py::arg("out"), py::arg("src"),
+          py::arg("in"),
+          py::arg("out"),
+          py::arg("src"),
           py::call_guard<py::gil_scoped_release>());
-#if defined(PADDLE_WITH_NCCL)
+#if defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)
   py::class_<distributed::ProcessGroupNCCL,
              std::shared_ptr<distributed::ProcessGroupNCCL>>(
       *m, "ProcessGroupNCCL", ProcessGroup)
-      .def(py::init<const std::shared_ptr<distributed::Store> &, int, int,
-                    const platform::CUDAPlace &, int>(),
-           py::arg("store"), py::arg("rank"), py::arg("world_size"),
-           py::arg("place"), py::arg("group_id") = 0,
+      .def(py::init<const std::shared_ptr<distributed::Store> &,
+                    int,
+                    int,
+                    const platform::CUDAPlace &,
+                    int>(),
+           py::arg("store"),
+           py::arg("rank"),
+           py::arg("world_size"),
+           py::arg("place"),
+           py::arg("group_id") = 0,
           py::call_guard<py::gil_scoped_release>());
 #endif
@@ -261,29 +290,53 @@ void BindDistributed(py::module *m) {
   py::class_<distributed::ProcessGroupHeter,
              std::shared_ptr<distributed::ProcessGroupHeter>>(
       *m, "ProcessGroupHeter", ProcessGroup)
-      .def(py::init<const std::shared_ptr<distributed::Store> &, int, int,
+      .def(py::init<const std::shared_ptr<distributed::Store> &,
+                    int,
+                    int,
 #if defined(PADDLE_WITH_ASCEND_CL)
                     const platform::NPUPlace &,
 #else
                     const platform::CUDAPlace &,
 #endif
-                    int, int, int, int, int, bool, std::string, int, int>(),
-           py::arg("store"), py::arg("rank"), py::arg("world_size"),
-           py::arg("place"), py::arg("gid") = 0, py::arg("local_rank") = 0,
-           py::arg("local_size") = 1, py::arg("gloo_rank") = 0,
-           py::arg("gloo_size") = 1, py::arg("with_switch") = false,
-           py::arg("switch_endpoint") = "", py::arg("src_rank") = "",
-           py::arg("dst_rank") = "", py::call_guard<py::gil_scoped_release>());
+                    int,
+                    int,
+                    int,
+                    int,
+                    int,
+                    bool,
+                    std::string,
+                    int,
+                    int>(),
+           py::arg("store"),
+           py::arg("rank"),
+           py::arg("world_size"),
+           py::arg("place"),
+           py::arg("gid") = 0,
+           py::arg("local_rank") = 0,
+           py::arg("local_size") = 1,
+           py::arg("gloo_rank") = 0,
+           py::arg("gloo_size") = 1,
+           py::arg("with_switch") = false,
+           py::arg("switch_endpoint") = "",
+           py::arg("src_rank") = "",
+           py::arg("dst_rank") = "",
+           py::call_guard<py::gil_scoped_release>());
 #endif
 #if defined(PADDLE_WITH_ASCEND_CL)
   py::class_<distributed::ProcessGroupHCCL,
              std::shared_ptr<distributed::ProcessGroupHCCL>>(
       *m, "ProcessGroupHCCL", ProcessGroup)
-      .def(py::init<const std::shared_ptr<distributed::Store> &, int, int,
-                    const platform::NPUPlace &, int>(),
-           py::arg("store"), py::arg("rank"), py::arg("world_size"),
-           py::arg("place"), py::arg("group_id") = 0,
+      .def(py::init<const std::shared_ptr<distributed::Store> &,
+                    int,
+                    int,
+                    const platform::NPUPlace &,
+                    int>(),
+           py::arg("store"),
+           py::arg("rank"),
+           py::arg("world_size"),
+           py::arg("place"),
+           py::arg("group_id") = 0,
            py::call_guard<py::gil_scoped_release>());
 #endif
@@ -291,22 +344,29 @@ void BindDistributed(py::module *m) {
   py::class_<distributed::ProcessGroup::Task,
              std::shared_ptr<distributed::ProcessGroup::Task>>(*m, "task")
       .def("is_completed", &distributed::ProcessGroup::Task::IsCompleted)
-      .def("wait", &distributed::ProcessGroup::Task::Wait,
+      .def("wait",
+           &distributed::ProcessGroup::Task::Wait,
           py::arg("timeout") = kWaitTimeout,
           py::call_guard<py::gil_scoped_release>())
-      .def("synchronize", &distributed::ProcessGroup::Task::Synchronize,
+      .def("synchronize",
+           &distributed::ProcessGroup::Task::Synchronize,
           py::call_guard<py::gil_scoped_release>());
 #if defined(PADDLE_WITH_GLOO)
   py::class_<ProcessGroupGloo, std::shared_ptr<ProcessGroupGloo>>(
       *m, "ProcessGroupGloo", ProcessGroup)
-      .def(py::init<const std::shared_ptr<paddle::distributed::Store> &, int,
-                    int, const platform::CPUPlace &, int,
+      .def(py::init<const std::shared_ptr<paddle::distributed::Store> &,
+                    int,
+                    int,
+                    const platform::CPUPlace &,
+                    int,
                     std::shared_ptr<GlooOptions> &>(),
           py::call_guard<py::gil_scoped_release>())
       .def(py::init([](const std::shared_ptr<paddle::distributed::Store> &store,
-                       int rank, int world_size,
-                       const platform::CPUPlace &place, int gid) {
+                       int rank,
+                       int world_size,
+                       const platform::CPUPlace &place,
+                       int gid) {
         auto opts = GlooOptions::create();
         char *ifname = getenv(GLOO_SOCKET_IFNAME_ENV.c_str());
         if (ifname && strlen(ifname) > 1) {
@@ -315,11 +375,14 @@ void BindDistributed(py::module *m) {
         } else {
          opts->device = ProcessGroupGloo::createDefaultDevice();
        }
-        return std::make_shared<ProcessGroupGloo>(store, rank, world_size,
-                                                  place, gid, opts);
+        return std::make_shared<ProcessGroupGloo>(
+            store, rank, world_size, place, gid, opts);
      }),
-           py::arg("store"), py::arg("rank"), py::arg("world_size"),
-           py::arg("place"), py::arg("group_id") = 0,
+           py::arg("store"),
+           py::arg("rank"),
+           py::arg("world_size"),
+           py::arg("place"),
+           py::arg("group_id") = 0,
           py::call_guard<py::gil_scoped_release>())
       .def_static("create_default_device",
                   &ProcessGroupGloo::createDefaultDevice);
@@ -327,21 +390,23 @@ void BindDistributed(py::module *m) {
   m->def(
       "eager_assign_group_by_size",
-      [](py::handle py_tensors, std::vector<bool> is_sparse_gradient,
+      [](py::handle py_tensors,
+         std::vector<bool> is_sparse_gradient,
          std::vector<size_t> group_size_limits,
          std::vector<int64_t> tensor_indices) {
        auto tensors = CastPyArg2VectorOfTensor(py_tensors.ptr(), 0);
        return distributed::Eager_AssignGroupBySize(
            tensors, is_sparse_gradient, group_size_limits, tensor_indices);
      },
-      py::arg("tensors"), py::arg("is_sparse_gradient"),
+      py::arg("tensors"),
+      py::arg("is_sparse_gradient"),
       py::arg("group_size_limits") = std::vector<size_t>{25 * 1024 * 1024},
       py::arg("tensor_indices") = std::vector<int64_t>{},
       py::call_guard<py::gil_scoped_release>());
   py::class_<distributed::EagerReducer,
-             std::shared_ptr<distributed::EagerReducer>>(*m, "EagerReducer",
-                                                         R"DOC()DOC")
+             std::shared_ptr<distributed::EagerReducer>>(
+      *m, "EagerReducer", R"DOC()DOC")
       .def(py::init(&CreateEagerReducer))
       .def(
           "prepare_for_backward",
@@ -349,7 +414,8 @@ void BindDistributed(py::module *m) {
             auto params = CastPyArg2VectorOfTensor(py_tensors.ptr(), 0);
             self.PrepareForBackward(params);
           },
-          py::arg("tensors"), py::call_guard<py::gil_scoped_release>());
+          py::arg("tensors"),
+          py::call_guard<py::gil_scoped_release>());
 }
 }  // end namespace pybind
...
This diff is collapsed.
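Most of the churn above is clang-format splitting the pybind11 .def(...) calls to one argument per line; the binding pattern itself — named py::arg keywords with defaults, plus py::call_guard<py::gil_scoped_release> so the GIL is dropped around a blocking collective — is unchanged. A minimal self-contained pybind11 sketch of that pattern, using a hypothetical module name "demo" and a stand-in fake_allreduce function rather than Paddle's ProcessGroup:

// Minimal pybind11 sketch (hypothetical names, not Paddle source).
#include <pybind11/pybind11.h>

namespace py = pybind11;

// Stand-in for a blocking collective such as ProcessGroup::AllReduce.
int fake_allreduce(int tensor, int op) {
  (void)op;       // a real collective would dispatch on the reduce op
  return tensor;  // and block until the communication completes
}

PYBIND11_MODULE(demo, m) {
  m.def("allreduce",
        &fake_allreduce,
        py::arg("tensor"),
        py::arg("op") = 0,  // mirrors py::arg("op") = distributed::ReduceOp::SUM
        py::call_guard<py::gil_scoped_release>());  // release the GIL while blocking
}

From Python this is called as demo.allreduce(tensor=x), with op taking its default when omitted; the bound ProcessGroup methods in the diff follow the same calling convention.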