From f6ff2221a4337b213e914179c6060e4501982ad2 Mon Sep 17 00:00:00 2001
From: Yuang Liu
Date: Tue, 12 Jul 2022 20:22:17 +0800
Subject: [PATCH] fix fused attention, ffn, fm under new process group (#44259)

---
 .../operators/fused/fused_attention_op.cu    | 36 +++++++++++++------
 .../operators/fused/fused_feedforward_op.cu  | 36 +++++++++++++------
 .../fused/fused_multi_transformer_op.cu      | 36 +++++++++++++------
 3 files changed, 78 insertions(+), 30 deletions(-)

diff --git a/paddle/fluid/operators/fused/fused_attention_op.cu b/paddle/fluid/operators/fused/fused_attention_op.cu
index 0c33f7c9d4..2c3fd75d8e 100644
--- a/paddle/fluid/operators/fused/fused_attention_op.cu
+++ b/paddle/fluid/operators/fused/fused_attention_op.cu
@@ -24,11 +24,13 @@ limitations under the License. */
 #include "paddle/fluid/operators/fused/fused_dropout_helper.h"
 #include "paddle/fluid/platform/device/gpu/gpu_device_function.h"
 #include "paddle/fluid/platform/device/gpu/gpu_dnn.h"
+#include "paddle/phi/api/include/tensor.h"
 #include "paddle/phi/kernels/funcs/broadcast_function.h"
 #include "paddle/phi/kernels/funcs/elementwise_functor.h"
 #include "paddle/phi/kernels/funcs/math_function.h"
 
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#include "paddle/fluid/distributed/collective/ProcessGroup.h"
 #include "paddle/fluid/platform/collective_helper.h"
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
 #endif
@@ -44,16 +46,30 @@ static void AllReduce(framework::Tensor &tensor,  // NOLINT
                       const platform::CUDADeviceContext &ctx) {
   if (ring_id == -1) return;
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
-  auto dtype =
-      platform::ToNCCLDataType(framework::TransToProtoVarType(tensor.dtype()));
-  int64_t numel = tensor.numel();
-  const void *sendbuff = tensor.data<T>();
-  auto place = ctx.GetPlace();
-  void *recvbuff = tensor.mutable_data<T>(place);
-  auto comm = platform::NCCLCommContext::Instance().Get(ring_id, place);
-  auto stream = ctx.stream();
-  PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
-      sendbuff, recvbuff, numel, dtype, ncclSum, comm->comm(), stream));
+  auto map = paddle::distributed::ProcessGroupMapFromGid::getInstance();
+
+  if (map->has(ring_id)) {
+    paddle::distributed::ProcessGroup *pg = map->get(ring_id);
+    std::vector<phi::DenseTensor> in_tensor;
+    std::vector<phi::DenseTensor> out_tensor;
+    in_tensor.push_back(tensor);
+    out_tensor.push_back(tensor);
+    paddle::distributed::AllreduceOptions opts;
+    opts.reduce_op = distributed::ReduceOp::SUM;
+    auto task = pg->AllReduce(in_tensor, out_tensor, opts);
+    task->Wait();
+  } else {
+    auto dtype = platform::ToNCCLDataType(
+        framework::TransToProtoVarType(tensor.dtype()));
+    int64_t numel = tensor.numel();
+    const void *sendbuff = tensor.data<T>();
+    auto place = ctx.GetPlace();
+    void *recvbuff = tensor.mutable_data<T>(place);
+    auto comm = platform::NCCLCommContext::Instance().Get(ring_id, place);
+    auto stream = ctx.stream();
+    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
+        sendbuff, recvbuff, numel, dtype, ncclSum, comm->comm(), stream));
+  }
 #else
   PADDLE_THROW(platform::errors::Unimplemented(
       "PaddlePaddle should compile with NCCL or RCCL when used tensor model "
diff --git a/paddle/fluid/operators/fused/fused_feedforward_op.cu b/paddle/fluid/operators/fused/fused_feedforward_op.cu
index fe388aa405..4126f5ad72 100644
--- a/paddle/fluid/operators/fused/fused_feedforward_op.cu
+++ b/paddle/fluid/operators/fused/fused_feedforward_op.cu
@@ -17,11 +17,13 @@ limitations under the License. */
 #include "paddle/fluid/operators/fused/fused_dropout_helper.h"
 #include "paddle/fluid/operators/layer_norm_kernel.cu.h"
 #include "paddle/fluid/operators/matmul_v2_op.h"
+#include "paddle/phi/api/include/tensor.h"
 #include "paddle/phi/kernels/funcs/blas/blas.h"
 #include "paddle/phi/kernels/funcs/broadcast_function.h"
 #include "paddle/phi/kernels/funcs/elementwise_functor.h"
 
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#include "paddle/fluid/distributed/collective/ProcessGroup.h"
 #include "paddle/fluid/platform/collective_helper.h"
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
 #endif
@@ -37,16 +39,30 @@ static void AllReduce(framework::Tensor& tensor,  // NOLINT
                       const platform::CUDADeviceContext& ctx) {
   if (ring_id == -1) return;
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
-  auto dtype =
-      platform::ToNCCLDataType(framework::TransToProtoVarType(tensor.dtype()));
-  int64_t numel = tensor.numel();
-  const void* sendbuff = tensor.data<T>();
-  auto place = ctx.GetPlace();
-  void* recvbuff = tensor.mutable_data<T>(place);
-  auto comm = platform::NCCLCommContext::Instance().Get(ring_id, place);
-  auto stream = ctx.stream();
-  PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
-      sendbuff, recvbuff, numel, dtype, ncclSum, comm->comm(), stream));
+  auto map = paddle::distributed::ProcessGroupMapFromGid::getInstance();
+
+  if (map->has(ring_id)) {
+    paddle::distributed::ProcessGroup* pg = map->get(ring_id);
+    std::vector<phi::DenseTensor> in_tensor;
+    std::vector<phi::DenseTensor> out_tensor;
+    in_tensor.push_back(tensor);
+    out_tensor.push_back(tensor);
+    paddle::distributed::AllreduceOptions opts;
+    opts.reduce_op = distributed::ReduceOp::SUM;
+    auto task = pg->AllReduce(in_tensor, out_tensor, opts);
+    task->Wait();
+  } else {
+    auto dtype = platform::ToNCCLDataType(
+        framework::TransToProtoVarType(tensor.dtype()));
+    int64_t numel = tensor.numel();
+    const void* sendbuff = tensor.data<T>();
+    auto place = ctx.GetPlace();
+    void* recvbuff = tensor.mutable_data<T>(place);
+    auto comm = platform::NCCLCommContext::Instance().Get(ring_id, place);
+    auto stream = ctx.stream();
+    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
+        sendbuff, recvbuff, numel, dtype, ncclSum, comm->comm(), stream));
+  }
 #else
   PADDLE_THROW(platform::errors::Unimplemented(
       "PaddlePaddle should compile with NCCL or RCCL when used tensor model "
diff --git a/paddle/fluid/operators/fused/fused_multi_transformer_op.cu b/paddle/fluid/operators/fused/fused_multi_transformer_op.cu
index fafbcf724d..a8bebd5012 100644
--- a/paddle/fluid/operators/fused/fused_multi_transformer_op.cu
+++ b/paddle/fluid/operators/fused/fused_multi_transformer_op.cu
@@ -29,9 +29,11 @@ limitations under the License. */
 #include "paddle/fluid/operators/fused/fused_dropout_helper.h"
 #include "paddle/fluid/platform/device/gpu/gpu_device_function.h"
 #include "paddle/fluid/platform/device/gpu/gpu_dnn.h"
+#include "paddle/phi/api/include/tensor.h"
 #include "paddle/phi/kernels/funcs/math_function.h"
 
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#include "paddle/fluid/distributed/collective/ProcessGroup.h"
 #include "paddle/fluid/platform/collective_helper.h"
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
 #endif
@@ -50,16 +52,30 @@ static void AllReduce(framework::Tensor &tensor,  // NOLINT
                       const platform::CUDADeviceContext &ctx) {
   if (ring_id == -1) return;
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
-  auto dtype =
-      platform::ToNCCLDataType(framework::TransToProtoVarType(tensor.dtype()));
-  int64_t numel = tensor.numel();
-  const void *sendbuff = tensor.data<T>();
-  auto place = ctx.GetPlace();
-  void *recvbuff = tensor.mutable_data<T>(place);
-  auto comm = platform::NCCLCommContext::Instance().Get(ring_id, place);
-  auto stream = ctx.stream();
-  PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
-      sendbuff, recvbuff, numel, dtype, ncclSum, comm->comm(), stream));
+  auto map = paddle::distributed::ProcessGroupMapFromGid::getInstance();
+
+  if (map->has(ring_id)) {
+    paddle::distributed::ProcessGroup *pg = map->get(ring_id);
+    std::vector<phi::DenseTensor> in_tensor;
+    std::vector<phi::DenseTensor> out_tensor;
+    in_tensor.push_back(tensor);
+    out_tensor.push_back(tensor);
+    paddle::distributed::AllreduceOptions opts;
+    opts.reduce_op = distributed::ReduceOp::SUM;
+    auto task = pg->AllReduce(in_tensor, out_tensor, opts);
+    task->Wait();
+  } else {
+    auto dtype = platform::ToNCCLDataType(
+        framework::TransToProtoVarType(tensor.dtype()));
+    int64_t numel = tensor.numel();
+    const void *sendbuff = tensor.data<T>();
+    auto place = ctx.GetPlace();
+    void *recvbuff = tensor.mutable_data<T>(place);
+    auto comm = platform::NCCLCommContext::Instance().Get(ring_id, place);
+    auto stream = ctx.stream();
+    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
+        sendbuff, recvbuff, numel, dtype, ncclSum, comm->comm(), stream));
+  }
 #else
   PADDLE_THROW(platform::errors::Unimplemented(
       "PaddlePaddle should compile with NCCL or RCCL when used tensor model "
--
GitLab
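
The `AllReduce` helper patched above is duplicated verbatim across the three fused operators, so the same dual-path dispatch now appears three times. Below is a minimal consolidated sketch of that logic. The header name, the `template <typename T>` wrapper, the exact include set, and the tail of the error string are assumptions for illustration (the hunks truncate before showing them); the body is taken from the patch itself.

// Hypothetical shared header, e.g. fused_allreduce_helper.h -- not part of
// this patch; a sketch of the logic the three files now share.
#pragma once

#include <vector>

#include "paddle/fluid/framework/convert_utils.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/phi/api/include/tensor.h"

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/distributed/collective/ProcessGroup.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#endif

namespace paddle {
namespace operators {

template <typename T>  // assumed: the fused ops instantiate this per dtype
static void AllReduce(framework::Tensor &tensor,  // NOLINT
                      const int ring_id,
                      const platform::CUDADeviceContext &ctx) {
  if (ring_id == -1) return;
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
  auto map = paddle::distributed::ProcessGroupMapFromGid::getInstance();

  if (map->has(ring_id)) {
    // New-process-group path: the ring id was registered through the eager
    // ProcessGroup API, so the collective runs on that group's communicator
    // and is waited on before the fused kernel continues.
    paddle::distributed::ProcessGroup *pg = map->get(ring_id);
    std::vector<phi::DenseTensor> in_tensor;
    std::vector<phi::DenseTensor> out_tensor;
    in_tensor.push_back(tensor);
    out_tensor.push_back(tensor);
    paddle::distributed::AllreduceOptions opts;
    opts.reduce_op = distributed::ReduceOp::SUM;
    auto task = pg->AllReduce(in_tensor, out_tensor, opts);
    task->Wait();
  } else {
    // Legacy path: in-place ncclAllReduce on the communicator held by the
    // global NCCLCommContext for this ring, on the kernel's own stream.
    auto dtype = platform::ToNCCLDataType(
        framework::TransToProtoVarType(tensor.dtype()));
    int64_t numel = tensor.numel();
    const void *sendbuff = tensor.data<T>();
    auto place = ctx.GetPlace();
    void *recvbuff = tensor.mutable_data<T>(place);
    auto comm = platform::NCCLCommContext::Instance().Get(ring_id, place);
    auto stream = ctx.stream();
    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
        sendbuff, recvbuff, numel, dtype, ncclSum, comm->comm(), stream));
  }
#else
  PADDLE_THROW(platform::errors::Unimplemented(
      "PaddlePaddle should compile with NCCL or RCCL when used tensor model "
      "parallel op."));  // assumed continuation of the truncated message
#endif
}

}  // namespace operators
}  // namespace paddle

Without the ProcessGroup branch, a model run under the new dynamic-graph process group would fall through to NCCLCommContext and all-reduce on a ring the new API never initialized, which is presumably the failure the commit title refers to.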