From 7b450e78897c96230a3d55d9cb0c2e2322e9c66d Mon Sep 17 00:00:00 2001
From: Void Main
Date: Thu, 18 Mar 2021 21:36:16 +0800
Subject: [PATCH] Add auto-increasing tag id for Hcom OPs (#31702)

---
 .../operators/collective/c_allgather_op_npu.cc     |  2 +-
 .../collective/c_allgather_op_npu_test.cc          |  4 +++-
 .../collective/c_allreduce_max_op_npu_test.cc      |  4 +++-
 .../operators/collective/c_allreduce_op.h          | 18 +++++++++++-------
 .../collective/c_allreduce_sum_op_npu_test.cc      |  4 +++-
 .../operators/collective/c_broadcast_op_npu.cc     |  2 +-
 .../collective/c_broadcast_op_npu_test.cc          |  4 +++-
 .../collective/c_reducescatter_op_npu.cc           |  8 ++++----
 .../collective/c_reducescatter_op_npu_test.cc      |  4 +++-
 .../operators/collective/recv_v2_op_npu.cc         |  4 ++--
 .../collective/recv_v2_op_npu_test.cc              |  4 +++-
 .../operators/collective/send_v2_op_npu.cc         |  6 +++---
 .../collective/send_v2_op_npu_test.cc              |  4 +++-
 paddle/fluid/platform/collective_helper.h          | 12 ++++++++++--
 14 files changed, 53 insertions(+), 27 deletions(-)

diff --git a/paddle/fluid/operators/collective/c_allgather_op_npu.cc b/paddle/fluid/operators/collective/c_allgather_op_npu.cc
index 2ff1227f307..ea6caf954d1 100644
--- a/paddle/fluid/operators/collective/c_allgather_op_npu.cc
+++ b/paddle/fluid/operators/collective/c_allgather_op_npu.cc
@@ -35,10 +35,10 @@ class CAllGatherOpASCENDKernel : public framework::OpKernel<T> {
     int ring_id = ctx.Attr<int>("ring_id");
     std::string group =
         std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
-    std::string tag = ctx.Attr<std::string>("tag");
     auto place = ctx.GetPlace();
     auto comm = platform::HCCLCommContext::Instance().Get(ring_id, place);
     int nranks = comm->nranks();
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
 
     framework::DDim out_dims = in->dims();
     out_dims[0] *= nranks;
diff --git a/paddle/fluid/operators/collective/c_allgather_op_npu_test.cc b/paddle/fluid/operators/collective/c_allgather_op_npu_test.cc
index c8f06eab2d8..38f19170af9 100644
--- a/paddle/fluid/operators/collective/c_allgather_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/c_allgather_op_npu_test.cc
@@ -119,7 +119,9 @@ void TestHCCLAllGatherOp(f::Scope* scope, const p::DeviceContext& ctx) {
   auto op = f::OpRegistry::CreateOp("c_allgather", {{"X", {"X"}}},
                                     {{"Out", {"Out"}}}, attrs);
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   ctx.Wait();
 
   std::vector<float> out_vec;
diff --git a/paddle/fluid/operators/collective/c_allreduce_max_op_npu_test.cc b/paddle/fluid/operators/collective/c_allreduce_max_op_npu_test.cc
index 3631442a69e..3fdc8595067 100644
--- a/paddle/fluid/operators/collective/c_allreduce_max_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/c_allreduce_max_op_npu_test.cc
@@ -118,7 +118,9 @@ void TestHCCLAllReduceOp(f::Scope* scope, const p::DeviceContext& ctx) {
   auto op = f::OpRegistry::CreateOp("c_allreduce_max", {{"X", {"X"}}},
                                     {{"Out", {"Out"}}}, attrs);
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   ctx.Wait();
 
   std::vector<float> out_vec;
diff --git a/paddle/fluid/operators/collective/c_allreduce_op.h b/paddle/fluid/operators/collective/c_allreduce_op.h
index b990e18f6ff..0011d9e9ad8 100644
--- a/paddle/fluid/operators/collective/c_allreduce_op.h
+++ b/paddle/fluid/operators/collective/c_allreduce_op.h
@@ -118,7 +118,7 @@ class CAllReduceOpASCENDKernel : public framework::OpKernel<T> {
   void Compute(const framework::ExecutionContext& ctx) const override {
 #if defined(PADDLE_WITH_ASCEND_CL)
-    // we need to pre-allocate 512 Bytes before the data 
+    // we need to pre-allocate 512 Bytes before the data
     // and 512 Bytes after the data, so the hccl allreduce
     // can work. This is a must according to huawei peer.
 #define PRE_MALLOC_SIZE_BYTES 512
@@ -135,16 +135,16 @@ class CAllReduceOpASCENDKernel : public framework::OpKernel<T> {
     paddle::framework::LoDTensor tmp_in, tmp_out;
     tmp_in.Resize({tmp_numel});
     tmp_out.Resize({tmp_numel});
-    tmp_in.mutable_data<T>(place);   // allocate
-    tmp_out.mutable_data<T>(place);  // allocate
+    auto p_tmp_in = tmp_in.mutable_data<T>(place);    // allocate
+    auto p_tmp_out = tmp_out.mutable_data<T>(place);  // allocate
     void* sendbuff = reinterpret_cast<void*>(tmp_in.data<T>() + pre_tmp_size);
     void* recvbuff = reinterpret_cast<void*>(tmp_out.data<T>() + pre_tmp_size);
 
-    std::string tag = ctx.Attr<std::string>("tag");
     int ring_id = ctx.Attr<int>("ring_id");
     std::string group =
         std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
     auto comm = paddle::platform::HCCLCommContext::Instance().Get(ring_id, place);
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
 
     aclrtStream stream = nullptr;
     auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
@@ -154,9 +154,13 @@ class CAllReduceOpASCENDKernel : public framework::OpKernel<T> {
       stream = comm->stream();
     }
 
+    // we need to memset this memory first to avoid a core dump in hccl
+    platform::NPUMemsetAsync(static_cast<void*>(p_tmp_in), 0, tmp_numel*sizeof(T), stream);
+    platform::NPUMemsetAsync(static_cast<void*>(p_tmp_out), 0, tmp_numel*sizeof(T), stream);
+
     auto npu_place = BOOST_GET_CONST(platform::NPUPlace, place);
 
-    memory::Copy(npu_place, sendbuff, 
+    memory::Copy(npu_place, sendbuff,
                  npu_place, reinterpret_cast<void*>(const_cast<T*>(in->data<T>())),
                  numel * sizeof(T),
                  stream);
@@ -195,10 +199,10 @@ class CAllReduceOpASCENDKernel : public framework::OpKernel<T> {
                    tag.c_str(), sendbuff, recvbuff, numel, dtype, hccl_red_type, group.c_str(), (void*)stream));
 
     memory::Copy(npu_place, reinterpret_cast<void*>(out->data<T>()),
-                 npu_place, recvbuff, 
+                 npu_place, recvbuff,
                  numel * sizeof(T),
                  stream);
-    
+
     out->Resize(in->dims());
 #else
     PADDLE_THROW(platform::errors::PreconditionNotMet(
diff --git a/paddle/fluid/operators/collective/c_allreduce_sum_op_npu_test.cc b/paddle/fluid/operators/collective/c_allreduce_sum_op_npu_test.cc
index 6e7daf512ed..ed3a7f50b99 100644
--- a/paddle/fluid/operators/collective/c_allreduce_sum_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/c_allreduce_sum_op_npu_test.cc
@@ -117,7 +117,9 @@ void TestHCCLAllReduceOp(f::Scope* scope, const p::DeviceContext& ctx) {
                                     {{"Out", {"Out"}}}, attrs);
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   ctx.Wait();
 
   std::vector<float> out_vec;
diff --git a/paddle/fluid/operators/collective/c_broadcast_op_npu.cc b/paddle/fluid/operators/collective/c_broadcast_op_npu.cc
index d826411ddc2..67410a1c704 100644
--- a/paddle/fluid/operators/collective/c_broadcast_op_npu.cc
+++ b/paddle/fluid/operators/collective/c_broadcast_op_npu.cc
@@ -48,7 +48,7 @@ class CBroadcastOpASCENDKernel : public framework::OpKernel<T> {
     int root = ctx.Attr<int>("root");
     std::string group =
         std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
-    std::string tag = ctx.Attr<std::string>("tag");
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
 
     VLOG(3) << "begin hccl broadcast, parameter is: "<< "root " << root
             << ", group is " << group
diff --git a/paddle/fluid/operators/collective/c_broadcast_op_npu_test.cc b/paddle/fluid/operators/collective/c_broadcast_op_npu_test.cc
index 65045bce757..66158e5ff28 100644
--- a/paddle/fluid/operators/collective/c_broadcast_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/c_broadcast_op_npu_test.cc
@@ -113,7 +113,9 @@ void TestHCCLBroadcastOp(f::Scope* scope, const p::DeviceContext& ctx) {
   auto op = f::OpRegistry::CreateOp("c_broadcast", {{"X", {"X"}}},
                                     {{"Out", {"Out"}}}, attrs);
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   ctx.Wait();
 
   std::vector<float> out_vec;
diff --git a/paddle/fluid/operators/collective/c_reducescatter_op_npu.cc b/paddle/fluid/operators/collective/c_reducescatter_op_npu.cc
index 4658647ac94..e38bcdea27d 100644
--- a/paddle/fluid/operators/collective/c_reducescatter_op_npu.cc
+++ b/paddle/fluid/operators/collective/c_reducescatter_op_npu.cc
@@ -32,10 +32,10 @@ class CReduceScatterOpAscendKernel : public framework::OpKernel<T> {
     int ring_id = ctx.Attr<int>("ring_id");
     std::string group =
         std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
-    std::string tag = ctx.Attr<std::string>("tag");
     auto place = ctx.GetPlace();
     auto comm = platform::HCCLCommContext::Instance().Get(ring_id, place);
     int nranks = comm->nranks();
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
 
     auto out_dims = in->dims();
     PADDLE_ENFORCE_EQ(out_dims[0] % nranks, 0,
@@ -43,7 +43,7 @@ class CReduceScatterOpAscendKernel : public framework::OpKernel<T> {
                           "The input tensor X's "
                           "dim[0] (%d) should be divisible by nranks(%d)",
                           out_dims[0], nranks));
-    
+
     out_dims[0] = out_dims[0] / nranks;
     out->mutable_data<T>(out_dims, place);
@@ -66,7 +66,7 @@ class CReduceScatterOpAscendKernel : public framework::OpKernel<T> {
             << "hccl_red_type: " << HCCL_REP_OP_SUM
             << ", group is: " << group
             << ", tag is " << tag;
-    
+
     PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::hcom_reduce_scatter(
         tag.c_str(), inputPtr, outputPtr, (u64)recv_numel, dtype, HCCL_REP_OP_SUM, group.c_str(), (void*)stream));
 #else
@@ -82,7 +82,7 @@ class CReduceScatterOpAscendKernel : public framework::OpKernel<T> {
 namespace ops = paddle::operators;
 namespace plat = paddle::platform;
 
-REGISTER_OP_NPU_KERNEL(c_reducescatter, 
+REGISTER_OP_NPU_KERNEL(c_reducescatter,
                        ops::CReduceScatterOpAscendKernel<int8_t>,
                        ops::CReduceScatterOpAscendKernel<int>,
                        ops::CReduceScatterOpAscendKernel<float>,
diff --git a/paddle/fluid/operators/collective/c_reducescatter_op_npu_test.cc b/paddle/fluid/operators/collective/c_reducescatter_op_npu_test.cc
index c04dee5b692..5bf6539d489 100644
--- a/paddle/fluid/operators/collective/c_reducescatter_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/c_reducescatter_op_npu_test.cc
@@ -119,7 +119,9 @@ void TestHCCLReduceScatterOp(f::Scope* scope, const p::DeviceContext& ctx) {
   auto op = f::OpRegistry::CreateOp("c_reducescatter", {{"X", {"X"}}},
                                     {{"Out", {"Out"}}}, attrs);
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   ctx.Wait();
 
   std::vector<float> out_vec;
diff --git a/paddle/fluid/operators/collective/recv_v2_op_npu.cc b/paddle/fluid/operators/collective/recv_v2_op_npu.cc
index 8d49c6e278e..80dad82386d 100644
--- a/paddle/fluid/operators/collective/recv_v2_op_npu.cc
+++ b/paddle/fluid/operators/collective/recv_v2_op_npu.cc
@@ -42,7 +42,7 @@ class CRecvOpASCENDKernel : public framework::OpKernel<T> {
     } else {
       stream = comm->stream();
     }
-    std::string tag = ctx.Attr<std::string>("tag");
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
     std::string group = std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
     int srcRank = ctx.Attr<int>("peer");
     int srTag = ctx.Attr<int>("srTag");
@@ -66,7 +66,7 @@ class CRecvOpASCENDKernel : public framework::OpKernel<T> {
 namespace ops = paddle::operators;
 namespace plat = paddle::platform;
 
-REGISTER_OP_NPU_KERNEL(recv_v2, 
+REGISTER_OP_NPU_KERNEL(recv_v2,
                        ops::CRecvOpASCENDKernel<int>,
                        ops::CRecvOpASCENDKernel<int8_t>,
                        ops::CRecvOpASCENDKernel<float>,
diff --git a/paddle/fluid/operators/collective/recv_v2_op_npu_test.cc b/paddle/fluid/operators/collective/recv_v2_op_npu_test.cc
index 22492445a33..727d8be5a8f 100644
--- a/paddle/fluid/operators/collective/recv_v2_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/recv_v2_op_npu_test.cc
@@ -99,7 +99,9 @@ void TestHcomRecvOp(f::Scope* scope, const p::DeviceContext& ctx){
   auto op = f::OpRegistry::CreateOp("recv_v2", {}, {{"Out", {"Out"}}}, attrs);
   VLOG(3) << "CreateOp recv_v2";
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   VLOG(3) << "Run op recv_v2";
   std::vector<float> out_vec;
   TensorToVector(*tensor_out, ctx, &out_vec);
diff --git a/paddle/fluid/operators/collective/send_v2_op_npu.cc b/paddle/fluid/operators/collective/send_v2_op_npu.cc
index d0663ea42cb..15fcec269c5 100644
--- a/paddle/fluid/operators/collective/send_v2_op_npu.cc
+++ b/paddle/fluid/operators/collective/send_v2_op_npu.cc
@@ -42,7 +42,7 @@ class CSendOpASCENDKernel : public framework::OpKernel<T> {
     } else {
      stream = comm->stream();
     }
-    std::string tag = ctx.Attr<std::string>("tag");
+    std::string tag = std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
     std::string group = std::string(HCOM_GROUP_PREFIX) + std::to_string(ring_id);
     int destRank = ctx.Attr<int>("peer");
     int srTag = ctx.Attr<int>("srTag");
@@ -50,7 +50,7 @@ class CSendOpASCENDKernel : public framework::OpKernel<T> {
     PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::hcom_send(
         tag.c_str(), reinterpret_cast<void*>(const_cast<T*>(x->data<T>())),
         (u64)numel, dtype, destRank, srTag, group.c_str(), stream));
-    
+
     VLOG(3) << "Dest rank:" << destRank << " Invoke hcom send. Sent "
             << x->numel();
@@ -67,7 +67,7 @@ class CSendOpASCENDKernel : public framework::OpKernel<T> {
 namespace ops = paddle::operators;
 namespace plat = paddle::platform;
 
-REGISTER_OP_NPU_KERNEL(send_v2, 
+REGISTER_OP_NPU_KERNEL(send_v2,
                        ops::CSendOpASCENDKernel<int>,
                        ops::CSendOpASCENDKernel<int8_t>,
                        ops::CSendOpASCENDKernel<float>,
diff --git a/paddle/fluid/operators/collective/send_v2_op_npu_test.cc b/paddle/fluid/operators/collective/send_v2_op_npu_test.cc
index 1759633d9e8..7916d155ee7 100644
--- a/paddle/fluid/operators/collective/send_v2_op_npu_test.cc
+++ b/paddle/fluid/operators/collective/send_v2_op_npu_test.cc
@@ -90,7 +90,9 @@ void TestHcomSendOp(f::Scope* scope, const p::DeviceContext& ctx){
 
   auto op = f::OpRegistry::CreateOp("send_v2", {{"X", {"X"}}}, {}, attrs);
 
-  op->Run(*scope, place);
+  for (int i = 0; i < 10; i ++) {
+    op->Run(*scope, place);
+  }
   VLOG(3)<<"send run over";
   ctx.Wait();
 }
diff --git a/paddle/fluid/platform/collective_helper.h b/paddle/fluid/platform/collective_helper.h
index e21919a429b..8a205fdf0b7 100644
--- a/paddle/fluid/platform/collective_helper.h
+++ b/paddle/fluid/platform/collective_helper.h
@@ -18,6 +18,7 @@
 #include <memory>
 #include <string>
 #include <vector>
+#include <atomic>
 
 #include "boost/variant.hpp"
 #include "paddle/fluid/platform/enforce.h"
@@ -148,8 +149,7 @@ class NCCLCommContext {
 class NPUDeviceContext;
 
 #define ENV_RANK_TABLE_FILE "RANK_TABLE_FILE"
-#define ENV_RANK_ID "RANK_ID"
-#define ENV_DEV_ID "DEV_ID"
+#define ENV_RANK_ID "PADDLE_TRAINER_ID"
 
 class HCCLComm {
  public:
@@ -160,6 +160,12 @@ class HCCLComm {
   virtual aclrtStream stream() const = 0;
   virtual NPUDeviceContext* dev_context() const = 0;
   virtual ~HCCLComm() = default;
+
+  unsigned long NextTagId() {
+    return tag_counter_++;
+  }
+ private:
+  std::atomic<unsigned long> tag_counter_;
 };
 
 // A singleton HCCL communicator context reserves communication ring ids
@@ -208,10 +214,12 @@ class HCCLCommContext {
     return Get(ring_id, BOOST_GET_CONST(NPUPlace, place).device);
   }
 
+ private:
   // Init global hcom
   HCCLCommContext() { InitHcomWorldGroup(); }
 
+ public:
   ~HCCLCommContext(){
     PADDLE_ENFORCE_NPU_SUCCESS(platform::dynload::hcom_destroy());
-- 
GitLab
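
Below is a minimal, self-contained sketch of the tagging scheme this patch introduces: each HCCL op now builds its tag from the ring id plus a per-communicator, atomically increasing counter (HCCLComm::NextTagId), instead of reusing a fixed "tag" attribute, so the same op can run repeatedly (as the 10-iteration loops in the updated tests do) without tag collisions. The sketch is illustrative only: DummyComm and BuildHcclTag are hypothetical names, not part of the patch, and the sketch value-initializes its counter to zero, whereas the patch leaves tag_counter_ without an explicit initializer.

// Illustrative sketch of the "<ring_id>_<counter>" tag scheme (not Paddle code).
#include <atomic>
#include <iostream>
#include <string>

class DummyComm {  // hypothetical stand-in for platform::HCCLComm
 public:
  // Same idea as HCCLComm::NextTagId() above: atomic fetch-and-increment,
  // so concurrent kernels on the same communicator never reuse a tag id.
  unsigned long NextTagId() { return tag_counter_++; }

 private:
  std::atomic<unsigned long> tag_counter_{0};  // explicitly zero-initialized here
};

// Hypothetical helper mirroring the tag construction used by the kernels:
//   std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId())
std::string BuildHcclTag(int ring_id, DummyComm* comm) {
  return std::to_string(ring_id) + "_" + std::to_string(comm->NextTagId());
}

int main() {
  DummyComm comm;
  // Ten runs of the same op on ring 0 produce "0_0", "0_1", ..., "0_9".
  for (int i = 0; i < 10; ++i) {
    std::cout << BuildHcclTag(/*ring_id=*/0, &comm) << "\n";
  }
  return 0;
}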