/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #include #ifdef _POSIX_C_SOURCE #undef _POSIX_C_SOURCE #endif #ifdef _XOPEN_SOURCE #undef _XOPEN_SOURCE #endif #include "paddle/fluid/distributed/collective/ProcessGroup.h" #include "paddle/fluid/distributed/collective/ProcessGroupStream.h" #include "paddle/fluid/distributed/collective/Types.h" #include "paddle/fluid/distributed/collective/Utils.h" #include "paddle/fluid/distributed/collective/reducer.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/imperative/layer.h" #include "paddle/fluid/pybind/distributed_py.h" #include "paddle/fluid/pybind/eager_utils.h" #include "paddle/phi/api/all.h" #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) #include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h" #endif #if defined(PADDLE_WITH_MPI) #include "paddle/fluid/distributed/collective/ProcessGroupMPI.h" #endif #if defined(PADDLE_WITH_ASCEND_CL) #include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h" #endif #if defined(PADDLE_WITH_CUSTOM_DEVICE) #include "paddle/fluid/distributed/collective/ProcessGroupCustom.h" #endif #if defined(PADDLE_WITH_GLOO) && defined(PADDLE_WITH_PSCORE) && \ (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL)) #include "paddle/fluid/distributed/collective/ProcessGroupHeter.h" #endif #if defined(PADDLE_WITH_GLOO) #include "paddle/fluid/distributed/collective/ProcessGroupGloo.h" #include "paddle/fluid/distributed/store/tcp_store.h" #endif #include "paddle/phi/kernels/sync_batch_norm_kernel.h" namespace py = pybind11; namespace paddle { namespace pybind { using Tensor = paddle::experimental::Tensor; std::shared_ptr CreateEagerReducer( py::handle py_tensors, const std::vector> &group_indices, const std::vector &is_sparse_gradient, std::shared_ptr process_group, const std::vector &group_size_limits, bool find_unused_parameters) { auto params = CastPyArg2VectorOfTensor(py_tensors.ptr(), 0); return std::make_shared(params, group_indices, is_sparse_gradient, process_group, group_size_limits, find_unused_parameters); } #if defined(PADDLE_WITH_GLOO) using ProcessGroupGloo = paddle::distributed::ProcessGroupGloo; using GlooStore = paddle::distributed::ProcessGroupGloo::GlooStore; using GlooOptions = paddle::distributed::ProcessGroupGloo::GlooOptions; #endif static std::string GLOO_SOCKET_IFNAME_ENV = "GLOO_SOCKET_IFNAME"; // NOLINT static UNUSED void *use_ccl_comm_func = phi::detail::GetCCLComm(phi::CPUPlace()); void BindDistributed(py::module *m) { py::enum_(*m, "ReduceOp") .value("SUM", distributed::ReduceOp::SUM) .value("AVG", distributed::ReduceOp::AVG) .value("MAX", distributed::ReduceOp::MAX) .value("MIN", distributed::ReduceOp::MIN) .value("PRODUCT", distributed::ReduceOp::PRODUCT); py::class_(*m, "AllreduceOptions") .def(py::init<>()) .def_readwrite("reduce_op", &distributed::AllreduceOptions::reduce_op); py::class_(*m, "BroadcastOptions") .def(py::init<>()) .def_readwrite("source_rank", &distributed::BroadcastOptions::source_rank) .def_readwrite("source_root", &distributed::BroadcastOptions::source_root); py::class_(*m, "BarrierOptions") .def(py::init<>()) .def_readwrite("place_ids", &distributed::BarrierOptions::place_ids); py::class_(*m, "ReduceOptions") .def(py::init<>()) .def_readwrite("reduce_op", &distributed::ReduceOptions::reduce_op) .def_readwrite("source_root", &distributed::ReduceOptions::root_rank); auto ProcessGroup = py::class_>(*m, "ProcessGroup") .def("rank", &distributed::ProcessGroup::GetRank) .def("size", &distributed::ProcessGroup::GetSize) .def("name", &distributed::ProcessGroup::GetBackendName) .def( "all_reduce", [](distributed::ProcessGroup &self, py::handle py_tensor, distributed::ReduceOp op, bool sync_op) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto p_dense = std::dynamic_pointer_cast(tensor.impl()); auto *out_dense = p_dense.get(); auto in_dense = *p_dense; distributed::AllreduceOptions opts{op}; return self.AllReduce(out_dense, in_dense, opts, sync_op); }, py::arg("tensor"), py::arg("op"), py::arg("sync_op"), py::call_guard()) .def( "broadcast", [](distributed::ProcessGroup &self, py::handle py_tensor, int src, bool sync_op) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto p_dense = std::dynamic_pointer_cast(tensor.impl()); auto *out_dense = p_dense.get(); auto in_dense = *p_dense; distributed::BroadcastOptions opts{src}; return self.Broadcast(out_dense, in_dense, opts, sync_op); }, py::arg("tensor"), py::arg("src"), py::arg("sync_op"), py::call_guard()) .def( "send", [](distributed::ProcessGroup &self, py::handle py_tensor, int dst, bool sync_op) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); std::vector tensors = {*dense}; return self.Send(tensors, dst, sync_op); }, py::arg("tensor"), py::arg("dst"), py::arg("sync_op"), py::call_guard()) .def( "send_partial", [](distributed::ProcessGroup &self, py::handle py_tensor, int dst_rank, int nranks, int rank_id, bool sync_op) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); int64_t numel = (*dense).numel(); int64_t send_numel = numel / nranks; int64_t offset = send_numel * rank_id; return self.Send_Partial( *dense, dst_rank, offset, send_numel, sync_op); }, py::arg("tensor"), py::arg("dst"), py::arg("num"), py::arg("id"), py::arg("sync_op"), py::call_guard()) .def( "recv", [](distributed::ProcessGroup &self, py::handle py_tensor, int src, bool sync_op) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); std::vector tensors = {*dense}; return self.Recv(tensors, src, sync_op); }, py::arg("tensor"), py::arg("src"), py::arg("sync_op"), py::call_guard()) .def( "recv_partial", [](distributed::ProcessGroup &self, py::handle py_tensor, int src_rank, int nranks, int rank_id, bool sync_op) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); int64_t numel = (*dense).numel(); int64_t recv_numel = numel / nranks; int64_t offset = recv_numel * rank_id; return self.Recv_Partial( *dense, src_rank, offset, recv_numel, sync_op); }, py::arg("tensor"), py::arg("src"), py::arg("num"), py::arg("id"), py::arg("sync_op"), py::call_guard()) .def( "all_gather", [](distributed::ProcessGroup &self, py::handle py_out_tensor_list, py::handle py_in_tensor, bool sync_op) { auto out_tensor_list = CastPyArg2VectorOfTensor(py_out_tensor_list.ptr(), 0); Tensor concat_out_tensor = paddle::concat(out_tensor_list, 0); auto p_out_tensor = std::dynamic_pointer_cast( concat_out_tensor.impl()); auto *out_dense = p_out_tensor.get(); auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto p_in_tensor = std::dynamic_pointer_cast( in_tensor.impl()); auto in_dense = *p_in_tensor; const auto &dev_ctx = self.GetDeviceContext(in_tensor.place()); auto task = self.AllGather(out_dense, in_dense, sync_op); distributed::SplitTensor(dev_ctx, *out_dense, &out_tensor_list); task->UpdateWaitChain(dev_ctx); return task; }, py::arg("out"), py::arg("in"), py::arg("sync_op"), py::call_guard()) .def( "all_gather_into_tensor", [](distributed::ProcessGroup &self, py::handle py_out_tensor, py::handle py_in_tensor, bool sync_op) { auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto p_out_tensor = std::dynamic_pointer_cast( out_tensor.impl()); auto *out_dense = p_out_tensor.get(); auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto p_in_tensor = std::dynamic_pointer_cast( in_tensor.impl()); auto in_dense = *p_in_tensor; return self.AllGather(out_dense, in_dense, sync_op); }, py::arg("out"), py::arg("in"), py::arg("sync_op"), py::call_guard()) .def( "all_to_all", [](distributed::ProcessGroup &self, py::handle py_in_tensor_list, py::handle py_out_tensor_list, bool sync_op) { auto in_tensor_list = CastPyArg2VectorOfTensor(py_in_tensor_list.ptr(), 0); Tensor concat_in_tensor = paddle::concat(in_tensor_list, 0); auto in_dense = std::dynamic_pointer_cast( concat_in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor_list = CastPyArg2VectorOfTensor(py_out_tensor_list.ptr(), 0); Tensor concat_out_tensor = paddle::concat(out_tensor_list, 0); auto out_dense = std::dynamic_pointer_cast( concat_out_tensor.impl()); std::vector out_wrapper = {*out_dense}; // in_tensor_list should not be empty const auto &dev_ctx = self.GetDeviceContext(in_tensor_list.back().place()); auto task = self.AllToAll(in_wrapper, out_wrapper, sync_op); distributed::SplitTensor(dev_ctx, *out_dense, &out_tensor_list); task->UpdateWaitChain(dev_ctx); return task; }, py::arg("in"), py::arg("out"), py::arg("sync_op"), py::call_guard()) .def( "all_to_all_tensor", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor, bool sync_op) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; return self.AllToAll(in_wrapper, out_wrapper, sync_op); }, py::arg("in"), py::arg("out"), py::arg("sync_op"), py::call_guard()) .def( "all_to_all_single", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor, std::vector &in_sizes, std::vector &out_sizes, bool sync_op) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; return self.AllToAllSingle( in_wrapper, out_wrapper, in_sizes, out_sizes, sync_op); }, py::arg("in"), py::arg("out"), py::arg("in_sizes"), py::arg("out_sizes"), py::arg("sync_op"), py::call_guard()) .def( "reduce", [](distributed::ProcessGroup &self, py::handle py_in_tensor, int dst, distributed::ReduceOp op, bool sync_op) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); distributed::ReduceOptions opts{op, dst}; auto dense = std::dynamic_pointer_cast( in_tensor.impl()); std::vector tensors = {*dense}; return self.Reduce(tensors, tensors, opts, sync_op); }, py::arg("tensor"), py::arg("dst"), py::arg("op"), py::arg("sync_op"), py::call_guard()) .def( "reduce_scatter", [](distributed::ProcessGroup &self, py::handle py_in_tensor_list, py::handle py_out_tensor, distributed::ReduceOp op, bool sync_op) { auto in_tensor_list = CastPyArg2VectorOfTensor(py_in_tensor_list.ptr(), 0); Tensor concat_in_tensor = paddle::concat(in_tensor_list, 0); auto in_dense = std::dynamic_pointer_cast( concat_in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; distributed::ReduceScatterOptions opts{op}; return self.ReduceScatter( in_wrapper, out_wrapper, opts, sync_op); }, py::arg("in"), py::arg("out"), py::arg("op"), py::arg("sync_op"), py::call_guard()) .def( "reduce_scatter_tensor", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor, distributed::ReduceOp op, bool sync_op) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; distributed::ReduceScatterOptions opts{op}; return self.ReduceScatter( in_wrapper, out_wrapper, opts, sync_op); }, py::arg("in"), py::arg("out"), py::arg("op"), py::arg("sync_op"), py::call_guard()) .def( "scatter", [](distributed::ProcessGroup &self, py::handle py_in_tensor_list, py::handle py_out_tensor, int src, bool sync_op) { auto in_tensor_list = CastPyArg2VectorOfTensor(py_in_tensor_list.ptr(), 0); Tensor concat_in_tensor = paddle::concat(in_tensor_list, 0); auto in_dense = std::dynamic_pointer_cast( concat_in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; distributed::ScatterOptions opts{src}; return self.Scatter(in_wrapper, out_wrapper, opts, sync_op); }, py::arg("in"), py::arg("out"), py::arg("src"), py::arg("sync_op"), py::call_guard()) .def( "scatter_tensor", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor, int src, bool sync_op) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; distributed::ScatterOptions opts{src}; return self.Scatter(in_wrapper, out_wrapper, opts, sync_op); }, py::arg("in"), py::arg("out"), py::arg("src"), py::arg("sync_op"), py::call_guard()) .def( "barrier", [](distributed::ProcessGroup &self, std::vector place_ids) { distributed::BarrierOptions opts; opts.place_ids = place_ids; return self.Barrier(opts); }, py::arg("place_ids") = std::vector{}, py::call_guard()) // TODO(liyurui): Interface below will be removed in the future. .def( "allreduce", [](distributed::ProcessGroup &self, py::handle py_tensor, distributed::ReduceOp op) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); distributed::AllreduceOptions opts; opts.reduce_op = op; auto dense = std::dynamic_pointer_cast(tensor.impl()); std::vector tensors = {*dense}; return self.AllReduce(tensors, tensors, opts); }, py::arg("tensor"), py::arg("op") = distributed::ReduceOp::SUM, py::call_guard()) .def( "broadcast", [](distributed::ProcessGroup &self, py::handle py_tensor, int source_rank) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); distributed::BroadcastOptions opts; opts.source_rank = source_rank; auto dense = std::dynamic_pointer_cast(tensor.impl()); std::vector tensors = {*dense}; return self.Broadcast(tensors, tensors, opts); }, py::arg("tensor"), py::arg("source_rank"), py::call_guard()) .def( "send", [](distributed::ProcessGroup &self, py::handle py_tensor, int dst) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); std::vector tensors = {*dense}; return self.Send(tensors, dst); }, py::arg("tensor"), py::arg("dst"), py::call_guard()) .def( "send_partial", [](distributed::ProcessGroup &self, py::handle py_tensor, int dst_rank, int nranks, int rank_id) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); int64_t numel = (*dense).numel(); int64_t send_numel = numel / nranks; int64_t offset = send_numel * rank_id; return self.Send_Partial(*dense, dst_rank, offset, send_numel); }, py::arg("tensor"), py::arg("dst"), py::arg("num"), py::arg("id"), py::call_guard()) .def( "recv", [](distributed::ProcessGroup &self, py::handle py_tensor, int src) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); std::vector tensors = {*dense}; return self.Recv(tensors, src); }, py::arg("tensor"), py::arg("src"), py::call_guard()) .def( "recv_partial", [](distributed::ProcessGroup &self, py::handle py_tensor, int src_rank, int nranks, int rank_id) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); int64_t numel = (*dense).numel(); int64_t recv_numel = numel / nranks; int64_t offset = recv_numel * rank_id; return self.Recv_Partial(*dense, src_rank, offset, recv_numel); }, py::arg("tensor"), py::arg("src"), py::arg("num"), py::arg("id"), py::call_guard()) .def( "all_gather", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector in_tensors = {*in_dense}; std::vector out_tensors = {*out_dense}; return self.AllGather(in_tensors, out_tensors); }, py::arg("in"), py::arg("out"), py::call_guard()) .def( "all_gather_partial", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor, int nranks, int rank_id) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector in_tensors = {*in_dense}; std::vector out_tensors = {*out_dense}; int64_t numel = (*in_dense).numel(); int64_t send_numel = numel / nranks; int64_t offset = send_numel * rank_id; return self.AllGather_Partial( in_tensors, out_tensors, offset, send_numel); }, py::arg("in"), py::arg("out"), py::arg("num"), py::arg("id"), py::call_guard()) .def( "alltoall", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector in_tensors = {*in_dense}; std::vector out_tensors = {*out_dense}; return self.AllToAll(in_tensors, out_tensors); }, py::arg("in"), py::arg("out"), py::call_guard()) .def( "alltoall_single", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor, std::vector in_sizes, std::vector out_sizes) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector in_tensors = {*in_dense}; std::vector out_tensors = {*out_dense}; return self.AllToAll_Single( in_tensors, out_tensors, in_sizes, out_sizes); }, py::arg("in"), py::arg("out"), py::arg("in_sizes"), py::arg("out_sizes"), py::call_guard()) .def( "reduce", [](distributed::ProcessGroup &self, py::handle py_in_tensor, int dst, distributed::ReduceOp op) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); distributed::ReduceOptions opts; opts.reduce_op = op; opts.root_rank = dst; auto dense = std::dynamic_pointer_cast( in_tensor.impl()); std::vector tensors = {*dense}; return self.Reduce(tensors, tensors, opts); }, py::arg("tensor"), py::arg("dst"), py::arg("op") = distributed::ReduceOp::SUM, py::call_guard()) .def( "scatter", [](distributed::ProcessGroup &self, py::handle py_in_tensor, py::handle py_out_tensor, int src) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); distributed::ScatterOptions opts; opts.root_rank = src; auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector in_tensors = {*in_dense}; std::vector out_tensors = {*out_dense}; return self.Scatter(in_tensors, out_tensors, opts); }, py::arg("in"), py::arg("out"), py::arg("src"), py::call_guard()); auto ProcessGroupStream = py::class_>( *m, "ProcessGroupStream", ProcessGroup) .def( "all_gather_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_out_tensor_list, py::handle py_in_tensor) { auto out_tensor_list = CastPyArg2VectorOfTensor(py_out_tensor_list.ptr(), 0); Tensor concat_out_tensor = paddle::concat(out_tensor_list, 0); auto p_out_tensor = std::dynamic_pointer_cast( concat_out_tensor.impl()); auto *out_dense = p_out_tensor.get(); auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto p_in_tensor = std::dynamic_pointer_cast( in_tensor.impl()); auto in_dense = *p_in_tensor; const auto &dev_ctx = self.GetDeviceContext(in_tensor.place(), true); auto task = self.AllGather(out_dense, in_dense, /*sync_op*/ true, /*use_calc_stream*/ true); distributed::SplitTensor(dev_ctx, *out_dense, &out_tensor_list); return task; }, py::arg("out"), py::arg("in"), py::call_guard()) .def( "all_gather_into_tensor_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_out_tensor, py::handle py_in_tensor) { auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto p_out_tensor = std::dynamic_pointer_cast( out_tensor.impl()); auto *out_dense = p_out_tensor.get(); auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto p_in_tensor = std::dynamic_pointer_cast( in_tensor.impl()); auto in_dense = *p_in_tensor; return self.AllGather(out_dense, in_dense, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("out"), py::arg("in"), py::call_guard()) .def( "all_gather_partial_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_in_tensor, py::handle py_out_tensor, int nranks, int rank_id) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector in_tensors = {*in_dense}; std::vector out_tensors = {*out_dense}; int64_t numel = (*in_dense).numel(); int64_t send_numel = numel / nranks; int64_t offset = send_numel * rank_id; return self.AllGather_Partial(in_tensors, out_tensors, offset, send_numel, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("in"), py::arg("out"), py::arg("num"), py::arg("id"), py::call_guard()) .def( "all_reduce_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_tensor, distributed::ReduceOp op) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto p_dense = std::dynamic_pointer_cast(tensor.impl()); auto in_dense = *p_dense; auto *out_dense = p_dense.get(); distributed::AllreduceOptions opts{op}; return self.AllReduce(out_dense, in_dense, opts, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("tensor"), py::arg("op") = distributed::ReduceOp::SUM, py::call_guard()) .def( "all_to_all_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_in_tensor_list, py::handle py_out_tensor_list) { auto in_tensor_list = CastPyArg2VectorOfTensor(py_in_tensor_list.ptr(), 0); Tensor concat_in_tensor = paddle::concat(in_tensor_list, 0); auto in_dense = std::dynamic_pointer_cast( concat_in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor_list = CastPyArg2VectorOfTensor(py_out_tensor_list.ptr(), 0); Tensor concat_out_tensor = paddle::concat(out_tensor_list, 0); auto out_dense = std::dynamic_pointer_cast( concat_out_tensor.impl()); std::vector out_wrapper = {*out_dense}; // in_tensor_list must not be empty const auto &dev_ctx = self.GetDeviceContext( in_tensor_list.back().place(), /*use_calc_stream*/ true); auto task = self.AllToAll(in_wrapper, out_wrapper, /*sync_op*/ true, /*use_calc_stream*/ true); distributed::SplitTensor(dev_ctx, *out_dense, &out_tensor_list); return task; }, py::arg("in"), py::arg("out"), py::call_guard()) .def( "all_to_all_tensor_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_in_tensor, py::handle py_out_tensor) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; return self.AllToAll(in_wrapper, out_wrapper, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("in"), py::arg("out"), py::call_guard()) .def( "all_to_all_single_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_in_tensor, py::handle py_out_tensor, std::vector &in_sizes, std::vector &out_sizes) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; return self.AllToAllSingle(in_wrapper, out_wrapper, in_sizes, out_sizes, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("in"), py::arg("out"), py::arg("in_sizes"), py::arg("out_sizes"), py::call_guard()) .def( "broadcast_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_tensor, int src) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto p_dense = std::dynamic_pointer_cast(tensor.impl()); auto *out_dense = p_dense.get(); auto in_dense = *p_dense; distributed::BroadcastOptions opts{src}; return self.Broadcast(out_dense, in_dense, opts, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("tensor"), py::arg("src"), py::call_guard()) .def( "reduce_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_in_tensor, int dst, distributed::ReduceOp op) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); distributed::ReduceOptions opts{op, dst}; auto dense = std::dynamic_pointer_cast( in_tensor.impl()); std::vector tensors = {*dense}; return self.Reduce(tensors, tensors, opts, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("tensor"), py::arg("dst"), py::arg("op"), py::call_guard()) .def( "reduce_scatter_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_in_tensor_list, py::handle py_out_tensor, distributed::ReduceOp op) { auto in_tensor_list = CastPyArg2VectorOfTensor(py_in_tensor_list.ptr(), 0); Tensor concat_in_tensor = paddle::concat(in_tensor_list, 0); auto in_dense = std::dynamic_pointer_cast( concat_in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; distributed::ReduceScatterOptions opts{op}; return self.ReduceScatter(in_wrapper, out_wrapper, opts, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("in"), py::arg("out"), py::arg("op"), py::call_guard()) .def( "reduce_scatter_tensor_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_in_tensor, py::handle py_out_tensor, distributed::ReduceOp op) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; distributed::ReduceScatterOptions opts{op}; return self.ReduceScatter(in_wrapper, out_wrapper, opts, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("in"), py::arg("out"), py::arg("op"), py::call_guard()) .def( "scatter_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_in_tensor_list, py::handle py_out_tensor, int src) { auto in_tensor_list = CastPyArg2VectorOfTensor(py_in_tensor_list.ptr(), 0); Tensor concat_in_tensor = paddle::concat(in_tensor_list, 0); auto in_dense = std::dynamic_pointer_cast( concat_in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; distributed::ScatterOptions opts{src}; return self.Scatter(in_wrapper, out_wrapper, opts, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("in"), py::arg("out"), py::arg("src"), py::call_guard()) .def( "scatter_tensor_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_in_tensor, py::handle py_out_tensor, int src) { auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); auto in_dense = std::dynamic_pointer_cast( in_tensor.impl()); std::vector in_wrapper = {*in_dense}; auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); auto out_dense = std::dynamic_pointer_cast( out_tensor.impl()); std::vector out_wrapper = {*out_dense}; distributed::ScatterOptions opts{src}; return self.Scatter(in_wrapper, out_wrapper, opts, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("in"), py::arg("out"), py::arg("src"), py::call_guard()) .def( "send_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_tensor, int dst) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); std::vector tensors = {*dense}; return self.Send(tensors, dst, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("tensor"), py::arg("dst"), py::call_guard()) .def( "send_partial_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_tensor, int dst_rank, int nranks, int rank_id) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); int64_t numel = (*dense).numel(); int64_t send_numel = numel / nranks; int64_t offset = send_numel * rank_id; return self.Send_Partial(*dense, dst_rank, offset, send_numel, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("tensor"), py::arg("dst"), py::arg("num"), py::arg("id"), py::call_guard()) .def( "recv_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_tensor, int src) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); std::vector tensors = {*dense}; return self.Recv(tensors, src, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("tensor"), py::arg("src"), py::call_guard()) .def( "recv_partial_on_calc_stream", [](distributed::ProcessGroupStream &self, py::handle py_tensor, int src_rank, int nranks, int rank_id) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dense = std::dynamic_pointer_cast(tensor.impl()); int64_t numel = (*dense).numel(); int64_t recv_numel = numel / nranks; int64_t offset = recv_numel * rank_id; return self.Recv_Partial(*dense, src_rank, offset, recv_numel, /*sync_op*/ true, /*use_calc_stream*/ true); }, py::arg("tensor"), py::arg("src"), py::arg("num"), py::arg("id"), py::call_guard()); #if defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL) auto processGroupNCCL = py::class_>( *m, "ProcessGroupNCCL", ProcessGroupStream) .def(py::init &, int, int, const platform::CUDAPlace &, int>(), py::arg("store"), py::arg("rank"), py::arg("world_size"), py::arg("place"), py::arg("group_id") = 0, py::call_guard()); processGroupNCCL.def_static( "group_start", []() { distributed::ProcessGroupNCCL::GroupStart(); }); processGroupNCCL.def_static( "group_end", []() { distributed::ProcessGroupNCCL::GroupEnd(); }); #endif #if defined(PADDLE_WITH_MPI) py::class_>( *m, "ProcessGroupMPI", ProcessGroup) .def_static( "create", [](const std::vector &ranks, int gid) -> std::shared_ptr { return paddle::distributed::ProcessGroupMPI::CreateProcessGroupMPI( ranks, gid); }) .def("get_rank", &distributed::ProcessGroup::GetRank, py::call_guard()) .def("get_world_size", &distributed::ProcessGroup::GetSize, py::call_guard()); #endif #if defined(PADDLE_WITH_GLOO) && defined(PADDLE_WITH_PSCORE) && \ (defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_ASCEND_CL)) py::class_>( *m, "ProcessGroupHeter", ProcessGroup) .def(py::init &, int, int, #if defined(PADDLE_WITH_ASCEND_CL) const platform::NPUPlace &, #else const platform::CUDAPlace &, #endif int, int, int, int, int, bool, std::string, int, int>(), py::arg("store"), py::arg("rank"), py::arg("world_size"), py::arg("place"), py::arg("gid") = 0, py::arg("local_rank") = 0, py::arg("local_size") = 1, py::arg("gloo_rank") = 0, py::arg("gloo_size") = 1, py::arg("with_switch") = false, py::arg("switch_endpoint") = "", py::arg("src_rank") = "", py::arg("dst_rank") = "", py::call_guard()); #endif #if defined(PADDLE_WITH_ASCEND_CL) py::class_>( *m, "ProcessGroupHCCL", ProcessGroup) .def(py::init &, int, int, const platform::NPUPlace &, int>(), py::arg("store"), py::arg("rank"), py::arg("world_size"), py::arg("place"), py::arg("group_id") = 0, py::call_guard()); #endif #if defined(PADDLE_WITH_CUSTOM_DEVICE) py::class_>( *m, "ProcessGroupCustom", ProcessGroup) .def(py::init &, int, int, const platform::CustomPlace &, int>(), py::arg("store"), py::arg("rank"), py::arg("world_size"), py::arg("place"), py::arg("group_id") = 0, py::call_guard()); #endif py::class_>(*m, "task") .def("is_completed", &distributed::ProcessGroup::Task::IsCompleted) .def("is_sync", &distributed::ProcessGroup::Task::IsSync) .def("wait", &distributed::ProcessGroup::Task::Wait, py::arg("timeout") = kWaitTimeout, py::call_guard()) .def("synchronize", &distributed::ProcessGroup::Task::Synchronize, py::call_guard()); #if defined(PADDLE_WITH_GLOO) py::class_>( *m, "ProcessGroupGloo", ProcessGroup) .def(py::init &, int, int, const platform::CPUPlace &, int, std::shared_ptr &>(), py::call_guard()) .def(py::init([](const std::shared_ptr &store, int rank, int world_size, const platform::CPUPlace &place, int gid) { auto opts = GlooOptions::create(); char *ifname = getenv(GLOO_SOCKET_IFNAME_ENV.c_str()); if (ifname && strlen(ifname) > 1) { opts->device = ProcessGroupGloo::createDeviceForInterface( std::string(ifname)); } else { opts->device = ProcessGroupGloo::createDefaultDevice(); } return std::make_shared( store, rank, world_size, place, gid, opts); }), py::arg("store"), py::arg("rank"), py::arg("world_size"), py::arg("place"), py::arg("group_id") = 0, py::call_guard()) .def_static("create_default_device", &ProcessGroupGloo::createDefaultDevice); #endif m->def( "eager_assign_group_by_size", [](py::handle py_tensors, std::vector is_sparse_gradient, std::vector group_size_limits, std::vector tensor_indices) { auto tensors = CastPyArg2VectorOfTensor(py_tensors.ptr(), 0); return distributed::Eager_AssignGroupBySize( tensors, is_sparse_gradient, group_size_limits, tensor_indices); }, py::arg("tensors"), py::arg("is_sparse_gradient"), py::arg("group_size_limits") = std::vector{25 * 1024 * 1024}, py::arg("tensor_indices") = std::vector{}, py::call_guard()); py::class_>( *m, "EagerReducer", R"DOC()DOC") .def(py::init(&CreateEagerReducer)) .def( "prepare_for_backward", [](distributed::EagerReducer &self, py::handle py_tensors) { auto params = CastPyArg2VectorOfTensor(py_tensors.ptr(), 0); self.PrepareForBackward(params); }, py::arg("tensors"), py::call_guard()); } } // end namespace pybind } // namespace paddle