diff --git a/paddle/fluid/operators/collective/c_allgather_op_npu.cc b/paddle/fluid/operators/collective/c_allgather_op_npu.cc
index 2ff1227f3078021a7eeb5a4777a91778e1ae9cfb..ea6caf954d14ce48af7b253add817a41bfb2ab0d 100644
--- a/paddle/fluid/operators/collective/c_allgather_op_npu.cc
+++ b/paddle/fluid/operators/collective/c_allgather_op_npu.cc
@@ -35,10 +35,10 @@ class CAllGatherOpASCENDKernel : public framework::OpKernel<T> {
     int ring_id = ctx.Attr<int>("ring_id");
     std::string group = std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
-    std::string tag = ctx.Attr<std::string>("tag");
     auto place = ctx.GetPlace();
     auto comm = platform::HCCLCommContext::Instance().Get(ring_id, place);
     int nranks = comm->nranks();
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
 
     framework::DDim out_dims = in->dims();
     out_dims[0] *= nranks;
diff --git a/paddle/fluid/operators/collective/c_allgather_op_npu_test.cc b/paddle/fluid/operators/collective/c_allgather_op_npu_test.cc
index c8f06eab2d880150da2b8a46787054587a8ee005..38f19170af9589bde81a8aa9786079ff63849fc8 100644
--- a/paddle/fluid/operators/collective/c_allgather_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/c_allgather_op_npu_test.cc
@@ -119,7 +119,9 @@ void TestHCCLAllGatherOp(f::Scope* scope, const p::DeviceContext& ctx) {
   auto op = f::OpRegistry::CreateOp("c_allgather", {{"X", {"X"}}},
                                     {{"Out", {"Out"}}}, attrs);
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   ctx.Wait();
 
   std::vector<float> out_vec;
diff --git a/paddle/fluid/operators/collective/c_allreduce_max_op_npu_test.cc b/paddle/fluid/operators/collective/c_allreduce_max_op_npu_test.cc
index 3631442a69ed461a0c426f404bff107d5e6982b3..3fdc859506754bb4107d642a12f7d6d1b6d44662 100644
--- a/paddle/fluid/operators/collective/c_allreduce_max_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/c_allreduce_max_op_npu_test.cc
@@ -118,7 +118,9 @@ void TestHCCLAllReduceOp(f::Scope* scope, const p::DeviceContext& ctx) {
   auto op = f::OpRegistry::CreateOp("c_allreduce_max", {{"X", {"X"}}},
                                     {{"Out", {"Out"}}}, attrs);
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   ctx.Wait();
 
   std::vector<float> out_vec;
diff --git a/paddle/fluid/operators/collective/c_allreduce_op.h b/paddle/fluid/operators/collective/c_allreduce_op.h
index b990e18f6ff32b42b681363bc2f581e70daecf4a..0011d9e9ad83a573ee5bc4674ea1908fae927865 100644
--- a/paddle/fluid/operators/collective/c_allreduce_op.h
+++ b/paddle/fluid/operators/collective/c_allreduce_op.h
@@ -118,7 +118,7 @@ class CAllReduceOpASCENDKernel : public framework::OpKernel<T> {
   void Compute(const framework::ExecutionContext& ctx) const override {
 #if defined(PADDLE_WITH_ASCEND_CL)
-    // we need to pre-allocate 512 Bytes before the data 
+    // we need to pre-allocate 512 Bytes before the data
     // and 512 Bytes after the data, so the hccl allreduce
     // can work. This is a must acooding to huawei peer.
 #define PRE_MALLOC_SIZE_BYTES 512
@@ -135,16 +135,16 @@ class CAllReduceOpASCENDKernel : public framework::OpKernel<T> {
     paddle::framework::LoDTensor tmp_in, tmp_out;
     tmp_in.Resize({tmp_numel});
     tmp_out.Resize({tmp_numel});
-    tmp_in.mutable_data<T>(place);  // allocate
-    tmp_out.mutable_data<T>(place); // allocate
+    auto p_tmp_in = tmp_in.mutable_data<T>(place);   // allocate
+    auto p_tmp_out = tmp_out.mutable_data<T>(place); // allocate
 
     void* sendbuff = reinterpret_cast<void*>(tmp_in.data<T>() + pre_tmp_size);
     void* recvbuff = reinterpret_cast<void*>(tmp_out.data<T>() + pre_tmp_size);
 
-    std::string tag = ctx.Attr<std::string>("tag");
     int ring_id = ctx.Attr<int>("ring_id");
     std::string group = std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
     auto comm = paddle::platform::HCCLCommContext::Instance().Get(ring_id, place);
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
 
     aclrtStream stream = nullptr;
     auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
@@ -154,9 +154,13 @@ class CAllReduceOpASCENDKernel : public framework::OpKernel<T> {
       stream = comm->stream();
     }
 
+    // we need to memset this memory firstly to avoid core by hccl
+    platform::NPUMemsetAsync(static_cast<void*>(p_tmp_in), 0, tmp_numel*sizeof(T), stream);
+    platform::NPUMemsetAsync(static_cast<void*>(p_tmp_out), 0, tmp_numel*sizeof(T), stream);
+
     auto npu_place = BOOST_GET_CONST(platform::NPUPlace, place);
 
-    memory::Copy(npu_place, sendbuff, 
+    memory::Copy(npu_place, sendbuff,
                  npu_place, reinterpret_cast<void*>(const_cast<T*>(in->data<T>())),
                  numel * sizeof(T), stream);
@@ -195,10 +199,10 @@ class CAllReduceOpASCENDKernel : public framework::OpKernel<T> {
         tag.c_str(), sendbuff, recvbuff, numel, dtype, hccl_red_type, group.c_str(), (void*)stream));
 
     memory::Copy(npu_place, reinterpret_cast<void*>(out->data<T>()),
-                 npu_place, recvbuff, 
+                 npu_place, recvbuff,
                  numel * sizeof(T), stream);
-    
+
     out->Resize(in->dims());
 #else
     PADDLE_THROW(platform::errors::PreconditionNotMet(
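Side note on the c_allreduce_op.h change above: the kernel now keeps 512 bytes of headroom before and after the payload and zeroes the whole temporary allocation before handing it to HCCL. The following is a minimal host-side sketch of that buffer layout, not Paddle code; pad_bytes and payload_bytes are stand-ins for PRE_MALLOC_SIZE_BYTES and numel * sizeof(T), and plain malloc/memset stand in for mutable_data<T>() and platform::NPUMemsetAsync.

#include <cstdio>
#include <cstdlib>
#include <cstring>

int main() {
  const size_t pad_bytes = 512;       // stand-in for PRE_MALLOC_SIZE_BYTES
  const size_t payload_bytes = 1024;  // stand-in for numel * sizeof(T)
  const size_t total_bytes = pad_bytes + payload_bytes + pad_bytes;

  // tmp plays the role of tmp_in/tmp_out: one oversized allocation...
  char* tmp = static_cast<char*>(std::malloc(total_bytes));
  // ...zeroed up front, as the kernel now does with NPUMemsetAsync,
  // so the collective never reads uninitialized padding.
  std::memset(tmp, 0, total_bytes);

  // HCCL only ever sees the payload region that starts after the front padding.
  void* sendbuff = tmp + pad_bytes;
  std::printf("payload at offset %zu of a %zu-byte buffer (%p)\n",
              pad_bytes, total_bytes, sendbuff);

  std::free(tmp);
  return 0;
}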
diff --git a/paddle/fluid/operators/collective/c_allreduce_sum_op_npu_test.cc b/paddle/fluid/operators/collective/c_allreduce_sum_op_npu_test.cc
index 6e7daf512ede58405f8251b19ef49a3277b91512..ed3a7f50b9972b5d8b7a962fc29b1fc5d7bce02d 100644
--- a/paddle/fluid/operators/collective/c_allreduce_sum_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/c_allreduce_sum_op_npu_test.cc
@@ -117,7 +117,9 @@ void TestHCCLAllReduceOp(f::Scope* scope, const p::DeviceContext& ctx) {
                                     {{"Out", {"Out"}}}, attrs);
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   ctx.Wait();
 
   std::vector<float> out_vec;
diff --git a/paddle/fluid/operators/collective/c_broadcast_op_npu.cc b/paddle/fluid/operators/collective/c_broadcast_op_npu.cc
index d826411ddc2a46514fc9bebbfddedb9343f7374d..67410a1c70426a84ecdcf1e78d6c2f85a4d8048f 100644
--- a/paddle/fluid/operators/collective/c_broadcast_op_npu.cc
+++ b/paddle/fluid/operators/collective/c_broadcast_op_npu.cc
@@ -48,7 +48,7 @@ class CBroadcastOpASCENDKernel : public framework::OpKernel<T> {
     int root = ctx.Attr<int>("root");
     std::string group = std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
-    std::string tag = ctx.Attr<std::string>("tag");
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
 
     VLOG(3) << "begin hccl broadcast, parameter is: "<< "root " << root
             << ", group is " << group
diff --git a/paddle/fluid/operators/collective/c_broadcast_op_npu_test.cc b/paddle/fluid/operators/collective/c_broadcast_op_npu_test.cc
index 65045bce75737a06da21d92a5185433b5211a612..66158e5ff28ae5e7ced4a9f7d7fafc8c19d57d02 100644
--- a/paddle/fluid/operators/collective/c_broadcast_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/c_broadcast_op_npu_test.cc
@@ -113,7 +113,9 @@ void TestHCCLBroadcastOp(f::Scope* scope, const p::DeviceContext& ctx) {
   auto op = f::OpRegistry::CreateOp("c_broadcast", {{"X", {"X"}}},
                                     {{"Out", {"Out"}}}, attrs);
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   ctx.Wait();
 
   std::vector<float> out_vec;
diff --git a/paddle/fluid/operators/collective/c_reducescatter_op_npu.cc b/paddle/fluid/operators/collective/c_reducescatter_op_npu.cc
index 4658647ac943f8bdd6857f5854bdc458a506bd13..e38bcdea27da7766a2e4d396c6769705d0acae48 100644
--- a/paddle/fluid/operators/collective/c_reducescatter_op_npu.cc
+++ b/paddle/fluid/operators/collective/c_reducescatter_op_npu.cc
@@ -32,10 +32,10 @@ class CReduceScatterOpAscendKernel : public framework::OpKernel<T> {
     int ring_id = ctx.Attr<int>("ring_id");
     std::string group = std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
-    std::string tag = ctx.Attr<std::string>("tag");
     auto place = ctx.GetPlace();
     auto comm = platform::HCCLCommContext::Instance().Get(ring_id, place);
     int nranks = comm->nranks();
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
 
     auto out_dims = in->dims();
     PADDLE_ENFORCE_EQ(out_dims[0] % nranks, 0,
@@ -43,7 +43,7 @@ class CReduceScatterOpAscendKernel : public framework::OpKernel<T> {
                           "The input tensor X's "
                           "dim[0] (%d) should be divisible by nranks(%d)",
                           out_dims[0], nranks));
-    
+
     out_dims[0] = out_dims[0] / nranks;
     out->mutable_data<T>(out_dims, place);
@@ -66,7 +66,7 @@ class CReduceScatterOpAscendKernel : public framework::OpKernel<T> {
             << "hccl_red_type: " << HCCL_REP_OP_SUM
             << ", group is: " << group
             << ", tag is " << tag;
-    
+
     PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::hcom_reduce_scatter(
         tag.c_str(), inputPtr, outputPtr, (u64)recv_numel, dtype, HCCL_REP_OP_SUM, group.c_str(), (void*)stream));
 #else
@@ -82,7 +82,7 @@ class CReduceScatterOpAscendKernel : public framework::OpKernel<T> {
 namespace ops = paddle::operators;
 namespace plat = paddle::platform;
 
-REGISTER_OP_NPU_KERNEL(c_reducescatter, 
+REGISTER_OP_NPU_KERNEL(c_reducescatter,
                        ops::CReduceScatterOpAscendKernel,
                        ops::CReduceScatterOpAscendKernel,
                        ops::CReduceScatterOpAscendKernel,
diff --git a/paddle/fluid/operators/collective/c_reducescatter_op_npu_test.cc b/paddle/fluid/operators/collective/c_reducescatter_op_npu_test.cc
index c04dee5b6924253f54446455b206db61ce4e37f8..5bf6539d489ebbe36921c706faaf70b26f694d25 100644
--- a/paddle/fluid/operators/collective/c_reducescatter_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/c_reducescatter_op_npu_test.cc
@@ -119,7 +119,9 @@ void TestHCCLReduceScatterOp(f::Scope* scope, const p::DeviceContext& ctx) {
   auto op = f::OpRegistry::CreateOp("c_reducescatter", {{"X", {"X"}}},
                                     {{"Out", {"Out"}}}, attrs);
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   ctx.Wait();
 
   std::vector<float> out_vec;
diff --git a/paddle/fluid/operators/collective/recv_v2_op_npu.cc b/paddle/fluid/operators/collective/recv_v2_op_npu.cc
index 8d49c6e278e6c1729aa9c1c68c7275b1b4ad6976..80dad82386d1fc1e1f350e825901d1720f48d70c 100644
--- a/paddle/fluid/operators/collective/recv_v2_op_npu.cc
+++ b/paddle/fluid/operators/collective/recv_v2_op_npu.cc
@@ -42,7 +42,7 @@ class CRecvOpASCENDKernel : public framework::OpKernel<T> {
     } else {
       stream = comm->stream();
     }
-    std::string tag = ctx.Attr<std::string>("tag");
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
     std::string group = std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
     int srcRank = ctx.Attr<int>("peer");
     int srTag = ctx.Attr<int>("srTag");
@@ -66,7 +66,7 @@ class CRecvOpASCENDKernel : public framework::OpKernel<T> {
 namespace ops = paddle::operators;
 namespace plat = paddle::platform;
 
-REGISTER_OP_NPU_KERNEL(recv_v2, 
+REGISTER_OP_NPU_KERNEL(recv_v2,
                        ops::CRecvOpASCENDKernel,
                        ops::CRecvOpASCENDKernel,
                        ops::CRecvOpASCENDKernel,
diff --git a/paddle/fluid/operators/collective/recv_v2_op_npu_test.cc b/paddle/fluid/operators/collective/recv_v2_op_npu_test.cc
index 22492445a33c2f763f56478726037b2a5ad1477f..727d8be5a8f9ae4f7190bd283c927cdb42408ef5 100644
--- a/paddle/fluid/operators/collective/recv_v2_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/recv_v2_op_npu_test.cc
@@ -99,7 +99,9 @@ void TestHcomRecvOp(f::Scope* scope, const p::DeviceContext& ctx){
   auto op = f::OpRegistry::CreateOp("recv_v2", {}, {{"Out", {"Out"}}}, attrs);
   VLOG(3) << "CreateOp recv_v2";
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   VLOG(3) << "Run op recv_v2";
   std::vector<float> out_vec;
   TensorToVector(*tensor_out, ctx, &out_vec);
diff --git a/paddle/fluid/operators/collective/send_v2_op_npu.cc b/paddle/fluid/operators/collective/send_v2_op_npu.cc
index d0663ea42cb41c14b8703f74d668ebdfb46cf80e..15fcec269c5699f6a42c59a98eec7536100eea73 100644
--- a/paddle/fluid/operators/collective/send_v2_op_npu.cc
+++ b/paddle/fluid/operators/collective/send_v2_op_npu.cc
@@ -42,7 +42,7 @@ class CSendOpASCENDKernel : public framework::OpKernel<T> {
     } else {
      stream = comm->stream();
     }
-    std::string tag = ctx.Attr<std::string>("tag");
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
     std::string group = std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
     int destRank = ctx.Attr<int>("peer");
     int srTag = ctx.Attr<int>("srTag");
@@ -50,7 +50,7 @@ class CSendOpASCENDKernel : public framework::OpKernel<T> {
     PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::hcom_send(
         tag.c_str(), reinterpret_cast<void*>(const_cast<T*>(x->data<T>())), (u64)numel, dtype, destRank, srTag, group.c_str(), stream));
-    
+
     VLOG(3) << "Dest rank:" << destRank << " Invoke hcom send. Sent " << x->numel();
@@ -67,7 +67,7 @@ class CSendOpASCENDKernel : public framework::OpKernel<T> {
 namespace ops = paddle::operators;
 namespace plat = paddle::platform;
 
-REGISTER_OP_NPU_KERNEL(send_v2, 
+REGISTER_OP_NPU_KERNEL(send_v2,
                        ops::CSendOpASCENDKernel,
                        ops::CSendOpASCENDKernel,
                        ops::CSendOpASCENDKernel,
Sent " << x->numel(); @@ -67,7 +67,7 @@ class CSendOpASCENDKernel : public framework::OpKernel { namespace ops = paddle::operators; namespace plat = paddle::platform; -REGISTER_OP_NPU_KERNEL(send_v2, +REGISTER_OP_NPU_KERNEL(send_v2, ops::CSendOpASCENDKernel, ops::CSendOpASCENDKernel, ops::CSendOpASCENDKernel, diff --git a/paddle/fluid/operators/collective/send_v2_op_npu_test.cc b/paddle/fluid/operators/collective/send_v2_op_npu_test.cc index 1759633d9e8b6c44913a9c05049afa35cc034472..7916d155ee7617aa513359070d04f0a00b262339 100644 --- a/paddle/fluid/operators/collective/send_v2_op_npu_test.cc +++ b/paddle/fluid/operators/collective/send_v2_op_npu_test.cc @@ -90,7 +90,9 @@ void TestHcomSendOp(f::Scope* scope, const p::DeviceContext& ctx){ auto op = f::OpRegistry::CreateOp("send_v2", {{"X", {"X"}}}, {}, attrs); - op->Run(*scope, place); + for (int i = 0; i < 10; i ++) { + op->Run(*scope, place); + } VLOG(3)<<"send run over"; ctx.Wait(); } diff --git a/paddle/fluid/platform/collective_helper.h b/paddle/fluid/platform/collective_helper.h index e21919a429b2b44463f5fef2c607861eeb1179f2..8a205fdf0b787b26a60119cb1d7c36abb03d7b23 100644 --- a/paddle/fluid/platform/collective_helper.h +++ b/paddle/fluid/platform/collective_helper.h @@ -18,6 +18,7 @@ #include #include #include +#include #include "boost/variant.hpp" #include "paddle/fluid/platform/enforce.h" @@ -148,8 +149,7 @@ class NCCLCommContext { class NPUDeviceContext; #define ENV_RANK_TABLE_FILE "RANK_TABLE_FILE" -#define ENV_RANK_ID "RANK_ID" -#define ENV_DEV_ID "DEV_ID" +#define ENV_RANK_ID "PADDLE_TRAINER_ID" class HCCLComm { public: @@ -160,6 +160,12 @@ class HCCLComm { virtual aclrtStream stream() const = 0; virtual NPUDeviceContext* dev_context() const = 0; virtual ~HCCLComm() = default; + + unsigned long NextTagId() { + return tag_counter_++; + } + private: + std::atomic tag_counter_; }; // A singleton HCCL communicator context reserves communication ring ids @@ -208,10 +214,12 @@ class HCCLCommContext { return Get(ring_id, BOOST_GET_CONST(NPUPlace, place).device); } + private: // Init global hcom HCCLCommContext() { InitHcomWorldGroup(); } + public: ~HCCLCommContext(){ PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::hcom_destroy());