diff --git a/paddle/fluid/distributed/collective/ProcessGroup.h b/paddle/fluid/distributed/collective/ProcessGroup.h index be3bfc0dc00290a8e20a94c5c254cd99e62bc58b..8a5465bf0515b475548faba9022fe175c789f916 100644 --- a/paddle/fluid/distributed/collective/ProcessGroup.h +++ b/paddle/fluid/distributed/collective/ProcessGroup.h @@ -137,6 +137,15 @@ class ProcessGroup { "ProcessGroup%s does not support AllGather", GetBackendName())); } + virtual std::shared_ptr AllGather_Partial( + std::vector& in_tensors, // NOLINT + std::vector& out_tensors, // NOLINT + int offset, + int length) { // NOLINT + PADDLE_THROW(platform::errors::InvalidArgument( + "ProcessGroup%s does not support AllGather_Partial", GetBackendName())); + } + virtual std::shared_ptr AllToAll( std::vector&, // NOLINT std::vector&) { // NOLINT diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc index 1beca8022e9f9833d9305841e4823116808bdd6d..81db9b94da93e8607045e3916d0fff14ce8ca5d6 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc @@ -85,18 +85,19 @@ bool ProcessGroupNCCL::NCCLTask::IsCompleted() { return true; } -void ProcessGroupNCCL::CheckSplitSizes(std::vector& split_sizes, +void ProcessGroupNCCL::CheckSplitSizes(std::vector* split_sizes, std::vector tensor_shape) { - int64_t len_size = split_sizes.size(); + int64_t len_size = (*split_sizes).size(); if (len_size == 0) { PADDLE_ENFORCE_EQ(tensor_shape[0] % size_ == 0, true, platform::errors::InvalidArgument( "Tensor's dim[0] must be divisible by group size " "when split_sizes not given.")); - split_sizes.insert(split_sizes.end(), - size_, - static_cast(tensor_shape[0] / size_)); + (*split_sizes) + .insert((*split_sizes).end(), + size_, + static_cast(tensor_shape[0] / size_)); } else { PADDLE_ENFORCE_EQ( len_size == size_, @@ -104,7 +105,7 @@ void ProcessGroupNCCL::CheckSplitSizes(std::vector& split_sizes, platform::errors::InvalidArgument( "The length of split_sizes must be equal to group size.")); auto sum_size = std::accumulate( - split_sizes.begin(), split_sizes.end(), static_cast(0)); + (*split_sizes).begin(), (*split_sizes).end(), static_cast(0)); PADDLE_ENFORCE_EQ( sum_size == tensor_shape[0], true, @@ -626,6 +627,37 @@ void* GetPointerByOffset(void* raw_pointer, return nullptr; } +std::shared_ptr ProcessGroupNCCL::AllGather_Partial( + std::vector& in_tensors, + std::vector& out_tensors, + int offset, + int length) { + PADDLE_ENFORCE_EQ( + CheckTensorsInCudaPlace(in_tensors), + true, + platform::errors::InvalidArgument("All inputs should be in CudaPlace.")); + PADDLE_ENFORCE_EQ( + CheckTensorsInCudaPlace(out_tensors), + true, + platform::errors::InvalidArgument("All outputs should be in CudaPlace.")); + return Collective( + in_tensors, + out_tensors, + [&](phi::DenseTensor& input, + phi::DenseTensor& output, + ncclComm_t comm, + const gpuStream_t& stream) { + return platform::dynload::ncclAllGather( + GetPointerByOffset(input.data(), offset, input.dtype()), + output.data(), + length, + platform::ToNCCLDataType(input.dtype()), + comm, + stream); + }, + CommType::ALLGATHER); +} + std::shared_ptr ProcessGroupNCCL::AllToAll( std::vector& in_tensors, std::vector& out_tensors) { @@ -695,8 +727,8 @@ std::shared_ptr ProcessGroupNCCL::AllToAll_Single( std::vector in_dims = phi::vectorize(input.dims()); std::vector out_dims = phi::vectorize(output.dims()); - CheckSplitSizes(in_sizes, in_dims); - CheckSplitSizes(out_sizes, out_dims); + CheckSplitSizes(&in_sizes, in_dims); + CheckSplitSizes(&out_sizes, out_dims); size_t in_offset = 0, out_offset = 0; size_t in_length = 0, out_length = 0; diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h index a8adffe64e70d14cd20a37367f45de278dc44041..d5845b5a383ce229e09cfcc7803c440be426fe1a 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h +++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h @@ -125,6 +125,12 @@ class ProcessGroupNCCL : public ProcessGroup { std::vector& in_tensors, std::vector& out_tensors) override; + std::shared_ptr AllGather_Partial( + std::vector& in_tensors, + std::vector& out_tensors, + int offset, + int length) override; + std::shared_ptr AllToAll( std::vector& in, std::vector& out) override; @@ -206,7 +212,7 @@ class ProcessGroupNCCL : public ProcessGroup { void CreateNCCLManagerCache(const std::string& places_key, const std::vector& places); - void CheckSplitSizes(std::vector& split_sizes, + void CheckSplitSizes(std::vector* split_sizes, std::vector tensor_shape); }; diff --git a/paddle/fluid/operators/collective/partial_allgather_op.cu.cc b/paddle/fluid/operators/collective/partial_allgather_op.cu.cc index 9907de95fa661dff5f15e9ca6cd2e2d69b5e0f24..7e25f6876adb0f42e92af837cb1f9cccec2bf67e 100644 --- a/paddle/fluid/operators/collective/partial_allgather_op.cu.cc +++ b/paddle/fluid/operators/collective/partial_allgather_op.cu.cc @@ -15,6 +15,7 @@ limitations under the License. */ #include "paddle/fluid/operators/collective/partial_allgather_op.h" #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) +#include "paddle/fluid/distributed/collective/ProcessGroup.h" #include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/device/gpu/nccl_helper.h" #endif @@ -61,24 +62,38 @@ class PartialAllGatherOpCUDAKernel : public framework::OpKernel { int64_t send_numel = numel / nranks; int offset = send_numel * rank; - const T* send_buff = in->data() + offset; - T* recv_buff = out->data(); - gpuStream_t stream = nullptr; - if (ctx.Attr("use_calc_stream")) { - auto dev_ctx = platform::DeviceContextPool::Instance().Get(place); - stream = static_cast(dev_ctx)->stream(); + auto map = distributed::ProcessGroupMapFromGid::getInstance(); + if (map->has(rid)) { + // Use ProcessGroup + distributed::ProcessGroup* pg = map->get(rid); + std::vector in_tensors; + std::vector out_tensors; + in_tensors.push_back(*in); + out_tensors.push_back(*out); + auto task = + pg->AllGather_Partial(in_tensors, out_tensors, offset, send_numel); + task->Wait(); } else { - stream = comm->stream(); + const T* send_buff = in->data() + offset; + T* recv_buff = out->data(); + + gpuStream_t stream = nullptr; + if (ctx.Attr("use_calc_stream")) { + auto dev_ctx = platform::DeviceContextPool::Instance().Get(place); + stream = static_cast(dev_ctx)->stream(); + } else { + stream = comm->stream(); + } + + PADDLE_ENFORCE_GPU_SUCCESS( + platform::dynload::ncclAllGather(send_buff, + recv_buff, + send_numel, + static_cast(dtype), + comm->comm(), + stream)); } - - PADDLE_ENFORCE_GPU_SUCCESS( - platform::dynload::ncclAllGather(send_buff, - recv_buff, - send_numel, - static_cast(dtype), - comm->comm(), - stream)); #else PADDLE_THROW(platform::errors::PreconditionNotMet( "PaddlePaddle should compile with GPU.")); diff --git a/paddle/fluid/operators/collective/partial_recv_op.cu.cc b/paddle/fluid/operators/collective/partial_recv_op.cu.cc index 0ddeb8c195f173ca83bf226cca75ab151642faaa..da6690a96a19a1a8836dad27350440357fff8c08 100644 --- a/paddle/fluid/operators/collective/partial_recv_op.cu.cc +++ b/paddle/fluid/operators/collective/partial_recv_op.cu.cc @@ -15,6 +15,7 @@ limitations under the License. */ #include "paddle/fluid/operators/collective/partial_recv_op.h" #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) +#include "paddle/fluid/distributed/collective/ProcessGroup.h" #include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/device/gpu/nccl_helper.h" #endif @@ -65,37 +66,44 @@ class PartialRecvOpCUDAKernel : public framework::OpKernel { platform::errors::InvalidArgument( "The input numel (%d) must be divisible by num(%d)", numel, num)); - gpuStream_t stream = nullptr; auto place = ctx.GetPlace(); - auto comm = platform::NCCLCommContext::Instance().Get(rid, place); - if (ctx.Attr("use_calc_stream")) { - auto dev_ctx = platform::DeviceContextPool::Instance().Get(place); - stream = static_cast(dev_ctx)->stream(); - } else { - stream = comm->stream(); - } - PADDLE_ENFORCE_LT( - peer, - comm->nranks(), - platform::errors::InvalidArgument("The value of peer (%d) you set must " - "be less than comm->nranks (%d).", - peer, - comm->nranks())); - out->mutable_data(out_dims, place); - ncclDataType_t dtype = platform::ToNCCLDataType(type); int recv_numel = numel / num; int offset = recv_numel * id; - PADDLE_ENFORCE_GPU_SUCCESS( - platform::dynload::ncclRecv(out->data() + offset, - recv_numel, - dtype, - peer, - comm->comm(), - stream)); - VLOG(3) << "rank " << comm->rank() << " recv " << recv_numel - << " from offset[" << offset << "] from " << peer; + auto map = distributed::ProcessGroupMapFromGid::getInstance(); + if (map->has(rid)) { + // Use ProcessGroup + distributed::ProcessGroup *pg = map->get(rid); + auto task = pg->Recv_Partial(*out, peer, offset, recv_numel); + task->Wait(); + } else { + gpuStream_t stream = nullptr; + auto comm = platform::NCCLCommContext::Instance().Get(rid, place); + if (ctx.Attr("use_calc_stream")) { + auto dev_ctx = platform::DeviceContextPool::Instance().Get(place); + stream = static_cast(dev_ctx)->stream(); + } else { + stream = comm->stream(); + } + PADDLE_ENFORCE_LT(peer, + comm->nranks(), + platform::errors::InvalidArgument( + "The value of peer (%d) you set must " + "be less than comm->nranks (%d).", + peer, + comm->nranks())); + ncclDataType_t dtype = platform::ToNCCLDataType(type); + PADDLE_ENFORCE_GPU_SUCCESS( + platform::dynload::ncclRecv(out->data() + offset, + recv_numel, + dtype, + peer, + comm->comm(), + stream)); + VLOG(3) << "rank " << comm->rank() << " recv " << recv_numel + << " from offset[" << offset << "] from " << peer; + } #else PADDLE_THROW(platform::errors::Unavailable( "PaddlePaddle should be compiled with NCCL and " diff --git a/paddle/fluid/operators/collective/partial_send_op.cu.cc b/paddle/fluid/operators/collective/partial_send_op.cu.cc index 480988586cb50dd95c79135d894e9646a2b40f87..874bd61d198971f7a9d8d44d4984037c247af166 100644 --- a/paddle/fluid/operators/collective/partial_send_op.cu.cc +++ b/paddle/fluid/operators/collective/partial_send_op.cu.cc @@ -15,6 +15,7 @@ limitations under the License. */ #include "paddle/fluid/operators/collective/partial_send_op.h" #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) +#include "paddle/fluid/distributed/collective/ProcessGroup.h" #include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/device/gpu/nccl_helper.h" #endif @@ -61,32 +62,47 @@ class PartialSendCUDAKernel : public framework::OpKernel { platform::errors::InvalidArgument( "The input numel (%d) must be divisible by num(%d)", numel, num)); - gpuStream_t stream = nullptr; - auto place = ctx.GetPlace(); - auto comm = platform::NCCLCommContext::Instance().Get(rid, place); - if (ctx.Attr("use_calc_stream")) { - auto dev_ctx = platform::DeviceContextPool::Instance().Get(place); - stream = static_cast(dev_ctx)->stream(); - } else { - stream = comm->stream(); - } - PADDLE_ENFORCE_LT( - peer, - comm->nranks(), - platform::errors::InvalidArgument("The value of peer (%d) you set must " - "be less than comm->nranks (%d).", - peer, - comm->nranks())); - - ncclDataType_t dtype = - platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype())); int send_numel = numel / num; int offset = send_numel * id; - PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend( - x->data() + offset, send_numel, dtype, peer, comm->comm(), stream)); - VLOG(3) << "rank " << comm->rank() << " send " << send_numel - << " from offset[" << offset << "] to " << peer; + auto map = distributed::ProcessGroupMapFromGid::getInstance(); + if (map->has(rid)) { + // Use ProcessGroup + distributed::ProcessGroup* pg = map->get(rid); + phi::DenseTensor tmp = *x; + auto task = pg->Send_Partial(tmp, peer, offset, send_numel); + task->Wait(); + } else { + gpuStream_t stream = nullptr; + auto place = ctx.GetPlace(); + auto comm = platform::NCCLCommContext::Instance().Get(rid, place); + if (ctx.Attr("use_calc_stream")) { + auto dev_ctx = platform::DeviceContextPool::Instance().Get(place); + stream = static_cast(dev_ctx)->stream(); + } else { + stream = comm->stream(); + } + PADDLE_ENFORCE_LT(peer, + comm->nranks(), + platform::errors::InvalidArgument( + "The value of peer (%d) you set must " + "be less than comm->nranks (%d).", + peer, + comm->nranks())); + + ncclDataType_t dtype = + platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype())); + + PADDLE_ENFORCE_GPU_SUCCESS( + platform::dynload::ncclSend(x->data() + offset, + send_numel, + dtype, + peer, + comm->comm(), + stream)); + VLOG(3) << "rank " << comm->rank() << " send " << send_numel + << " from offset[" << offset << "] to " << peer; + } #else PADDLE_THROW(platform::errors::Unavailable( "PaddlePaddle should be compiled with NCCL " diff --git a/paddle/fluid/pybind/distributed_py.cc b/paddle/fluid/pybind/distributed_py.cc index b8d5a0de820e7529b016b335e470ea80ca15ba08..1146f650d8d571fa38055ad2090928ca891d32f5 100644 --- a/paddle/fluid/pybind/distributed_py.cc +++ b/paddle/fluid/pybind/distributed_py.cc @@ -172,6 +172,27 @@ void BindDistributed(py::module *m) { py::arg("dst"), py::call_guard()) + .def( + "send_partial", + [](distributed::ProcessGroup &self, + py::handle py_tensor, + int dst_rank, + int nranks, + int rank_id) { + auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); + auto dense = + std::dynamic_pointer_cast(tensor.impl()); + int numel = (*dense).numel(); + int send_numel = numel / nranks; + int offset = send_numel * rank_id; + return self.Send_Partial(*dense, dst_rank, offset, send_numel); + }, + py::arg("tensor"), + py::arg("dst"), + py::arg("num"), + py::arg("id"), + py::call_guard()) + .def( "recv", [](distributed::ProcessGroup &self, @@ -187,6 +208,27 @@ void BindDistributed(py::module *m) { py::arg("src"), py::call_guard()) + .def( + "recv_partial", + [](distributed::ProcessGroup &self, + py::handle py_tensor, + int src_rank, + int nranks, + int rank_id) { + auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); + auto dense = + std::dynamic_pointer_cast(tensor.impl()); + int numel = (*dense).numel(); + int recv_numel = numel / nranks; + int offset = recv_numel * rank_id; + return self.Recv_Partial(*dense, src_rank, offset, recv_numel); + }, + py::arg("tensor"), + py::arg("src"), + py::arg("num"), + py::arg("id"), + py::call_guard()) + .def( "all_gather", [](distributed::ProcessGroup &self, @@ -206,6 +248,33 @@ void BindDistributed(py::module *m) { py::arg("out"), py::call_guard()) + .def( + "all_gather_partial", + [](distributed::ProcessGroup &self, + py::handle py_in_tensor, + py::handle py_out_tensor, + int nranks, + int rank_id) { + auto in_tensor = CastPyArg2Tensor(py_in_tensor.ptr(), 0); + auto out_tensor = CastPyArg2Tensor(py_out_tensor.ptr(), 0); + auto in_dense = std::dynamic_pointer_cast( + in_tensor.impl()); + auto out_dense = std::dynamic_pointer_cast( + out_tensor.impl()); + std::vector in_tensors = {*in_dense}; + std::vector out_tensors = {*out_dense}; + int numel = (*in_dense).numel(); + int send_numel = numel / nranks; + int offset = send_numel * rank_id; + return self.AllGather_Partial( + in_tensors, out_tensors, offset, send_numel); + }, + py::arg("in"), + py::arg("out"), + py::arg("num"), + py::arg("id"), + py::call_guard()) + .def( "alltoall", [](distributed::ProcessGroup &self, diff --git a/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py b/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py index 6f917d9f89d6a8e26fa49f802dd68749704ced9d..f42752c5e8f1b2ab3b1370e4ba20845c26c303e3 100644 --- a/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py +++ b/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py @@ -158,14 +158,26 @@ _send_recv_meta = SendRecvMeta() def _is_valid_send_recv_partial(tensor, mp_degree): + tensor_numel = np.prod(tensor.shape) + assert tensor_numel != 0, "can't send/recv zero element" + return mp_degree > 1 and tensor_numel % mp_degree == 0 + + +def _partial_send_op(tensor, group, use_calc_stream, ring_id, dst, nranks, + rank_id): if _in_legacy_dygraph(): - tensor_numel = np.prod(tensor.shape) - assert tensor_numel != 0, "can't send/recv zero element" - return mp_degree > 1 and tensor_numel % mp_degree == 0 + return _C_ops.partial_send(tensor.detach(), 'use_calc_stream', + use_calc_stream, 'ring_id', ring_id, 'peer', + dst, 'num', nranks, 'id', rank_id) elif in_dygraph_mode(): - # TODO(shenliang03) support mp+pp optimizer in future. - # (partial_send/partial_recv/partial_allgather_) - return False + group = paddle.distributed.collective._get_default_group( + ) if group is None else group + task = group.process_group.send_partial(tensor, dst, nranks, rank_id) + if use_calc_stream: + task.wait() + return None + else: + return task def send_partial(tensor, @@ -180,9 +192,8 @@ def send_partial(tensor, ring_id = 0 if group is None else group.id if _is_valid_send_recv_partial(tensor, nranks): - return _C_ops.partial_send(tensor.detach(), 'use_calc_stream', - use_calc_stream, 'ring_id', ring_id, 'peer', - dst, 'num', nranks, 'id', rank_id) + return _partial_send_op(tensor, group, use_calc_stream, ring_id, dst, + nranks, rank_id) else: return paddle.distributed.send(tensor.detach(), dst=group.ranks[dst], @@ -190,6 +201,24 @@ def send_partial(tensor, use_calc_stream=use_calc_stream) +def _partial_recv_op(tensor, group, use_calc_stream, ring_id, src, nranks, + rank_id): + if _in_legacy_dygraph(): + return _C_ops.partial_recv(tensor.detach(), 'use_calc_stream', + use_calc_stream, 'ring_id', ring_id, 'peer', + src, 'num', nranks, 'id', rank_id, 'dtype', + tensor.dtype, 'out_shape', tensor.shape) + elif in_dygraph_mode(): + group = paddle.distributed.collective._get_default_group( + ) if group is None else group + task = group.process_group.recv_partial(tensor, src, nranks, rank_id) + if use_calc_stream: + task.wait() + return None + else: + return task + + def recv_partial(tensor, src=0, nranks=1, @@ -202,15 +231,31 @@ def recv_partial(tensor, ring_id = 0 if group is None else group.id if _is_valid_send_recv_partial(tensor, nranks): - _C_ops.partial_recv(tensor.detach(), 'use_calc_stream', use_calc_stream, - 'ring_id', ring_id, 'peer', src, 'num', nranks, - 'id', rank_id, 'dtype', tensor.dtype, 'out_shape', - tensor.shape) + return _partial_recv_op(tensor, group, use_calc_stream, ring_id, src, + nranks, rank_id) else: - paddle.distributed.recv(tensor.detach(), - src=group.ranks[src], - group=group, - use_calc_stream=use_calc_stream) + return paddle.distributed.recv(tensor.detach(), + src=group.ranks[src], + group=group, + use_calc_stream=use_calc_stream) + + +def _partial_allgather_op(tensor, group, use_calc_stream, ring_id, nranks, + rank_id): + if _in_legacy_dygraph(): + return _C_ops.partial_allgather_(tensor.detach(), 'use_calc_stream', + use_calc_stream, 'ring_id', ring_id, + 'nranks', nranks, 'rank', rank_id) + elif in_dygraph_mode(): + group = paddle.distributed.collective._get_default_group( + ) if group is None else group + task = group.process_group.all_gather_partial(tensor, tensor, nranks, + rank_id) + if use_calc_stream: + task.wait() + return None + else: + return task def allgather_partial(tensor, @@ -224,9 +269,8 @@ def allgather_partial(tensor, return ring_id = 0 if group is None else group.id - return _C_ops.partial_allgather_(tensor.detach(), 'use_calc_stream', - use_calc_stream, 'ring_id', ring_id, - 'nranks', nranks, 'rank', rank_id) + return _partial_allgather_op(tensor, group, use_calc_stream, ring_id, + nranks, rank_id) def _p2p_helper(tensor_send_next, tensor_send_prev, recv_prev, recv_next):