diff --git a/paddle/fluid/eager/api/utils/tensor_utils.h b/paddle/fluid/eager/api/utils/tensor_utils.h index ac6de72dbff39b58c58b842618ff7c4508a8373e..158aa5c8d7dd013a9a25693ac7f517de9ac58209 100644 --- a/paddle/fluid/eager/api/utils/tensor_utils.h +++ b/paddle/fluid/eager/api/utils/tensor_utils.h @@ -15,7 +15,7 @@ #pragma once #include "paddle/fluid/eager/eager_tensor.h" -#include "paddle/phi/api/all.h" +#include "paddle/phi/api/include/tensor.h" namespace egr { namespace egr_utils_api { diff --git a/paddle/fluid/operators/margin_cross_entropy_op.cu b/paddle/fluid/operators/margin_cross_entropy_op.cu index a2e34d98461e0107f27d51d3ce7a618c34ca7ea3..fd5ba1952caf9336e907dbe959f88e1292eaffa2 100644 --- a/paddle/fluid/operators/margin_cross_entropy_op.cu +++ b/paddle/fluid/operators/margin_cross_entropy_op.cu @@ -26,10 +26,12 @@ namespace cub = hipcub; #include "paddle/fluid/operators/reduce_ops/reduce_op.cu.h" #include "paddle/fluid/operators/reduce_ops/reduce_op.h" #include "paddle/fluid/string/string_helper.h" +#include "paddle/phi/api/include/tensor.h" #include "paddle/phi/kernels/funcs/axis_utils.h" #include "paddle/phi/kernels/funcs/math_function.h" #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) +#include "paddle/fluid/distributed/collective/ProcessGroup.h" #include "paddle/fluid/platform/collective_helper.h" #include "paddle/fluid/platform/device/gpu/nccl_helper.h" #endif @@ -63,19 +65,34 @@ void GetClassInterval(const gpuStream_t& stream, const platform::Place& place, framework::TensorFromVector(shard_dim_vec, ctx, &num_classes_per_device); int* num_classes_per_device_ptr = num_classes_per_device.data(); - const auto& comm = platform::NCCLCommContext::Instance().Get(rid, place); - // use global calculate stream - const auto calcu_stream = - static_cast( - platform::DeviceContextPool::Instance().Get(place)) - ->stream(); - - PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce( - num_classes_per_device_ptr, num_classes_per_device_ptr, - num_classes_per_device.numel(), - platform::ToNCCLDataType( - framework::TransToProtoVarType(num_classes_per_device.dtype())), - ncclSum, comm->comm(), calcu_stream)); + auto map = distributed::ProcessGroupMapFromGid::getInstance(); + if (map->has(rid)) { + // Use ProcessGroup + distributed::ProcessGroup* pg = map->get(rid); + std::vector in_tensor; + std::vector out_tensor; + in_tensor.push_back(num_classes_per_device); + out_tensor.push_back(num_classes_per_device); + + distributed::AllreduceOptions opts; + opts.reduce_op = distributed::ReduceOp::SUM; + auto task = pg->AllReduce(in_tensor, out_tensor, opts); + task->Wait(); + } else { + const auto& comm = platform::NCCLCommContext::Instance().Get(rid, place); + // use global calculate stream + const auto calcu_stream = + static_cast( + platform::DeviceContextPool::Instance().Get(place)) + ->stream(); + + PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce( + num_classes_per_device_ptr, num_classes_per_device_ptr, + num_classes_per_device.numel(), + platform::ToNCCLDataType( + framework::TransToProtoVarType(num_classes_per_device.dtype())), + ncclSum, comm->comm(), calcu_stream)); + } auto class_interval_ptr = class_interval->mutable_data({nranks + 1}, place); @@ -228,14 +245,21 @@ class MarginCrossEntropyOpCUDAKernel : public framework::OpKernel { #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) platform::NCCLComm* comm; + distributed::ProcessGroup* pg = nullptr; gpuStream_t stream; if (nranks > 1) { - comm = platform::NCCLCommContext::Instance().Get(rid, place); - - // use global calculate stream - stream = static_cast( - platform::DeviceContextPool::Instance().Get(place)) - ->stream(); + auto map = distributed::ProcessGroupMapFromGid::getInstance(); + if (map->has(rid)) { + // Use ProcessGroup + pg = map->get(rid); + } else { + comm = platform::NCCLCommContext::Instance().Get(rid, place); + + // use global calculate stream + stream = static_cast( + platform::DeviceContextPool::Instance().Get(place)) + ->stream(); + } } #endif @@ -306,11 +330,23 @@ class MarginCrossEntropyOpCUDAKernel : public framework::OpKernel { #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) if (nranks > 1) { - PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce( - logits_max_buff, logits_max_buff, logits_max.numel(), - platform::ToNCCLDataType( - framework::TransToProtoVarType(logits_max.dtype())), - ncclMax, comm->comm(), stream)); + if (pg) { + std::vector in_tensor; + std::vector out_tensor; + in_tensor.push_back(logits_max); + out_tensor.push_back(logits_max); + + distributed::AllreduceOptions opts; + opts.reduce_op = distributed::ReduceOp::MAX; + auto task = pg->AllReduce(in_tensor, out_tensor, opts); + task->Wait(); + } else { + PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce( + logits_max_buff, logits_max_buff, logits_max.numel(), + platform::ToNCCLDataType( + framework::TransToProtoVarType(logits_max.dtype())), + ncclMax, comm->comm(), stream)); + } } #endif @@ -329,11 +365,23 @@ class MarginCrossEntropyOpCUDAKernel : public framework::OpKernel { #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) if (nranks > 1) { - PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce( - sum_exp_logits_buff, sum_exp_logits_buff, sum_exp_logits.numel(), - platform::ToNCCLDataType( - framework::TransToProtoVarType(sum_exp_logits.dtype())), - ncclSum, comm->comm(), stream)); + if (pg) { + std::vector in_tensor; + std::vector out_tensor; + in_tensor.push_back(sum_exp_logits); + out_tensor.push_back(sum_exp_logits); + + distributed::AllreduceOptions opts; + opts.reduce_op = distributed::ReduceOp::SUM; + auto task = pg->AllReduce(in_tensor, out_tensor, opts); + task->Wait(); + } else { + PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce( + sum_exp_logits_buff, sum_exp_logits_buff, sum_exp_logits.numel(), + platform::ToNCCLDataType( + framework::TransToProtoVarType(sum_exp_logits.dtype())), + ncclSum, comm->comm(), stream)); + } } #endif @@ -363,11 +411,23 @@ class MarginCrossEntropyOpCUDAKernel : public framework::OpKernel { #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) if (nranks > 1) { - PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce( - loss_ptr, loss_ptr, loss->numel(), - platform::ToNCCLDataType( - framework::TransToProtoVarType(loss->dtype())), - ncclSum, comm->comm(), stream)); + if (pg) { + std::vector in_tensor; + std::vector out_tensor; + in_tensor.push_back(*loss); + out_tensor.push_back(*loss); + + distributed::AllreduceOptions opts; + opts.reduce_op = distributed::ReduceOp::SUM; + auto task = pg->AllReduce(in_tensor, out_tensor, opts); + task->Wait(); + } else { + PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce( + loss_ptr, loss_ptr, loss->numel(), + platform::ToNCCLDataType( + framework::TransToProtoVarType(loss->dtype())), + ncclSum, comm->comm(), stream)); + } } #endif } diff --git a/python/paddle/fluid/tests/unittests/test_parallel_margin_cross_entropy.py b/python/paddle/fluid/tests/unittests/test_parallel_margin_cross_entropy.py index 1b24889830ad873997fb803fb9baf574c657e2a9..e6402ee78f1d1e67fb447186ed7bfd91ec816939 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_margin_cross_entropy.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_margin_cross_entropy.py @@ -14,6 +14,7 @@ from __future__ import print_function +import os import unittest import paddle.fluid as fluid @@ -23,7 +24,10 @@ from test_parallel_dygraph_dataparallel import TestMultipleGpus class TestParallelMarginSoftmaxWithCrossEntropy(TestMultipleGpus): def test_parallel_margin_cross_entropy(self): self.run_mnist_2gpu('parallel_margin_cross_entropy.py') + self.run_mnist_2gpu( + 'parallel_margin_cross_entropy.py', eager_mode=False) if __name__ == "__main__": + os.environ["FLAGS_enable_eager_mode"] = "1" unittest.main()