Unverified commit 67163fb4, authored by Guoxia Wang, committed by GitHub

fix the bug of margin cross entropy loss for eager mode (#43161)

Parent 85baa3c0
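The fix follows one pattern, applied at every collective call site in the kernel: look up the distributed::ProcessGroup registered for the ring id (the eager-mode path) and fall back to the raw NCCL communicator when none exists. As a minimal sketch of that dispatch, here is a hypothetical AllReduceSumInPlace helper; it is not part of the commit and only rearranges calls that appear in the hunks below:

// Hypothetical helper illustrating the dispatch pattern this commit applies:
// prefer the ProcessGroup registered for the ring id (eager mode), otherwise
// fall back to issuing ncclAllReduce on the raw communicator.
static void AllReduceSumInPlace(int rid, const platform::Place& place,
                                gpuStream_t stream, phi::DenseTensor* tensor) {
  auto map = distributed::ProcessGroupMapFromGid::getInstance();
  if (map->has(rid)) {
    // Eager mode: the ProcessGroup owns the communicator and its streams.
    distributed::ProcessGroup* pg = map->get(rid);
    std::vector<phi::DenseTensor> in_tensor{*tensor};
    std::vector<phi::DenseTensor> out_tensor{*tensor};
    distributed::AllreduceOptions opts;
    opts.reduce_op = distributed::ReduceOp::SUM;
    pg->AllReduce(in_tensor, out_tensor, opts)->Wait();
  } else {
    // Legacy path: in-place ncclAllReduce on the global calculate stream.
    const auto& comm = platform::NCCLCommContext::Instance().Get(rid, place);
    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
        tensor->data(), tensor->data(), tensor->numel(),
        platform::ToNCCLDataType(
            framework::TransToProtoVarType(tensor->dtype())),
        ncclSum, comm->comm(), stream));
  }
}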
@@ -15,7 +15,7 @@
 #pragma once
 #include "paddle/fluid/eager/eager_tensor.h"
-#include "paddle/phi/api/all.h"
+#include "paddle/phi/api/include/tensor.h"
 namespace egr {
 namespace egr_utils_api {
@@ -26,10 +26,12 @@ namespace cub = hipcub;
 #include "paddle/fluid/operators/reduce_ops/reduce_op.cu.h"
 #include "paddle/fluid/operators/reduce_ops/reduce_op.h"
 #include "paddle/fluid/string/string_helper.h"
+#include "paddle/phi/api/include/tensor.h"
 #include "paddle/phi/kernels/funcs/axis_utils.h"
 #include "paddle/phi/kernels/funcs/math_function.h"
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#include "paddle/fluid/distributed/collective/ProcessGroup.h"
 #include "paddle/fluid/platform/collective_helper.h"
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
 #endif
@@ -63,19 +65,34 @@ void GetClassInterval(const gpuStream_t& stream, const platform::Place& place,
   framework::TensorFromVector(shard_dim_vec, ctx, &num_classes_per_device);
   int* num_classes_per_device_ptr = num_classes_per_device.data<int>();
-  const auto& comm = platform::NCCLCommContext::Instance().Get(rid, place);
-  // use global calculate stream
-  const auto calcu_stream =
-      static_cast<platform::CUDADeviceContext*>(
-          platform::DeviceContextPool::Instance().Get(place))
-          ->stream();
-  PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
-      num_classes_per_device_ptr, num_classes_per_device_ptr,
-      num_classes_per_device.numel(),
-      platform::ToNCCLDataType(
-          framework::TransToProtoVarType(num_classes_per_device.dtype())),
-      ncclSum, comm->comm(), calcu_stream));
+  auto map = distributed::ProcessGroupMapFromGid::getInstance();
+  if (map->has(rid)) {
+    // Use ProcessGroup
+    distributed::ProcessGroup* pg = map->get(rid);
+    std::vector<phi::DenseTensor> in_tensor;
+    std::vector<phi::DenseTensor> out_tensor;
+    in_tensor.push_back(num_classes_per_device);
+    out_tensor.push_back(num_classes_per_device);
+    distributed::AllreduceOptions opts;
+    opts.reduce_op = distributed::ReduceOp::SUM;
+    auto task = pg->AllReduce(in_tensor, out_tensor, opts);
+    task->Wait();
+  } else {
+    const auto& comm = platform::NCCLCommContext::Instance().Get(rid, place);
+    // use global calculate stream
+    const auto calcu_stream =
+        static_cast<platform::CUDADeviceContext*>(
+            platform::DeviceContextPool::Instance().Get(place))
+            ->stream();
+    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
+        num_classes_per_device_ptr, num_classes_per_device_ptr,
+        num_classes_per_device.numel(),
+        platform::ToNCCLDataType(
+            framework::TransToProtoVarType(num_classes_per_device.dtype())),
+        ncclSum, comm->comm(), calcu_stream));
+  }
   auto class_interval_ptr =
       class_interval->mutable_data<int>({nranks + 1}, place);
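Note that in_tensor and out_tensor both hold the same DenseTensor, so the reduction runs in place, mirroring the legacy ncclAllReduce call where the send and receive buffers are the same pointer. task->Wait() ensures the allreduce is properly synchronized before num_classes_per_device is consumed, whereas the legacy path relies purely on ordering on the global calculate stream.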
@@ -228,14 +245,21 @@ class MarginCrossEntropyOpCUDAKernel : public framework::OpKernel<T> {
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
     platform::NCCLComm* comm;
+    distributed::ProcessGroup* pg = nullptr;
     gpuStream_t stream;
     if (nranks > 1) {
-      comm = platform::NCCLCommContext::Instance().Get(rid, place);
-      // use global calculate stream
-      stream = static_cast<platform::CUDADeviceContext*>(
-                   platform::DeviceContextPool::Instance().Get(place))
-                   ->stream();
+      auto map = distributed::ProcessGroupMapFromGid::getInstance();
+      if (map->has(rid)) {
+        // Use ProcessGroup
+        pg = map->get(rid);
+      } else {
+        comm = platform::NCCLCommContext::Instance().Get(rid, place);
+        // use global calculate stream
+        stream = static_cast<platform::CUDADeviceContext*>(
+                     platform::DeviceContextPool::Instance().Get(place))
+                     ->stream();
+      }
     }
 #endif
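When a ProcessGroup is registered for rid, comm and stream are deliberately left unset. That is safe because every collective that follows in this kernel first checks if (pg) and only dereferences comm and stream on the else branch.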
@@ -306,11 +330,23 @@ class MarginCrossEntropyOpCUDAKernel : public framework::OpKernel<T> {
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
     if (nranks > 1) {
-      PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
-          logits_max_buff, logits_max_buff, logits_max.numel(),
-          platform::ToNCCLDataType(
-              framework::TransToProtoVarType(logits_max.dtype())),
-          ncclMax, comm->comm(), stream));
+      if (pg) {
+        std::vector<phi::DenseTensor> in_tensor;
+        std::vector<phi::DenseTensor> out_tensor;
+        in_tensor.push_back(logits_max);
+        out_tensor.push_back(logits_max);
+        distributed::AllreduceOptions opts;
+        opts.reduce_op = distributed::ReduceOp::MAX;
+        auto task = pg->AllReduce(in_tensor, out_tensor, opts);
+        task->Wait();
+      } else {
+        PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
+            logits_max_buff, logits_max_buff, logits_max.numel(),
+            platform::ToNCCLDataType(
+                framework::TransToProtoVarType(logits_max.dtype())),
+            ncclMax, comm->comm(), stream));
+      }
     }
 #endif
@@ -329,11 +365,23 @@ class MarginCrossEntropyOpCUDAKernel : public framework::OpKernel<T> {
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
     if (nranks > 1) {
-      PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
-          sum_exp_logits_buff, sum_exp_logits_buff, sum_exp_logits.numel(),
-          platform::ToNCCLDataType(
-              framework::TransToProtoVarType(sum_exp_logits.dtype())),
-          ncclSum, comm->comm(), stream));
+      if (pg) {
+        std::vector<phi::DenseTensor> in_tensor;
+        std::vector<phi::DenseTensor> out_tensor;
+        in_tensor.push_back(sum_exp_logits);
+        out_tensor.push_back(sum_exp_logits);
+        distributed::AllreduceOptions opts;
+        opts.reduce_op = distributed::ReduceOp::SUM;
+        auto task = pg->AllReduce(in_tensor, out_tensor, opts);
+        task->Wait();
+      } else {
+        PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
+            sum_exp_logits_buff, sum_exp_logits_buff, sum_exp_logits.numel(),
+            platform::ToNCCLDataType(
+                framework::TransToProtoVarType(sum_exp_logits.dtype())),
+            ncclSum, comm->comm(), stream));
+      }
     }
 #endif
@@ -363,11 +411,23 @@ class MarginCrossEntropyOpCUDAKernel : public framework::OpKernel<T> {
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
     if (nranks > 1) {
-      PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
-          loss_ptr, loss_ptr, loss->numel(),
-          platform::ToNCCLDataType(
-              framework::TransToProtoVarType(loss->dtype())),
-          ncclSum, comm->comm(), stream));
+      if (pg) {
+        std::vector<phi::DenseTensor> in_tensor;
+        std::vector<phi::DenseTensor> out_tensor;
+        in_tensor.push_back(*loss);
+        out_tensor.push_back(*loss);
+        distributed::AllreduceOptions opts;
+        opts.reduce_op = distributed::ReduceOp::SUM;
+        auto task = pg->AllReduce(in_tensor, out_tensor, opts);
+        task->Wait();
+      } else {
+        PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(
+            loss_ptr, loss_ptr, loss->numel(),
+            platform::ToNCCLDataType(
+                framework::TransToProtoVarType(loss->dtype())),
+            ncclSum, comm->comm(), stream));
+      }
     }
 #endif
   }
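The same two-way dispatch is repeated for all three reductions in the forward pass: the cross-rank MAX of logits_max and the SUMs of sum_exp_logits and loss, with distributed::ReduceOp::MAX and ::SUM taking the roles of ncclMax and ncclSum. For the loss tensor, *loss is pushed into the in/out vectors because loss is a pointer to the output tensor.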
@@ -14,6 +14,7 @@
 from __future__ import print_function
+import os
 import unittest
 import paddle.fluid as fluid
@@ -23,7 +24,10 @@ from test_parallel_dygraph_dataparallel import TestMultipleGpus
 class TestParallelMarginSoftmaxWithCrossEntropy(TestMultipleGpus):
     def test_parallel_margin_cross_entropy(self):
         self.run_mnist_2gpu('parallel_margin_cross_entropy.py')
+        self.run_mnist_2gpu(
+            'parallel_margin_cross_entropy.py', eager_mode=False)
 if __name__ == "__main__":
+    os.environ["FLAGS_enable_eager_mode"] = "1"
     unittest.main()
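The unit test now exercises both code paths: the first run_mnist_2gpu launch runs the script under eager mode (FLAGS_enable_eager_mode is set to "1" in __main__), and the second launch with eager_mode=False covers the legacy raw-NCCL path on the same 2-GPU setup.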