diff --git a/paddle/operators/nccl_op.cc b/paddle/operators/nccl_op.cc
index 5b6c9bec70178f8bc7cb26011ee236a562b22ed0..67bcc419fa4c72fb5ef4b262b121890513a3ff7b 100644
--- a/paddle/operators/nccl_op.cc
+++ b/paddle/operators/nccl_op.cc
@@ -100,8 +100,8 @@ class NCCLReduceOp : public framework::OperatorWithKernel {
   }
 };
 
-// BcastSendOp
-class NCCLBcastSendOp : public framework::OperatorWithKernel {
+// BcastOp
+class NCCLBcastOp : public framework::OperatorWithKernel {
  public:
   using framework::OperatorWithKernel::OperatorWithKernel;
 
@@ -111,20 +111,12 @@ class NCCLBcastSendOp : public framework::OperatorWithKernel {
                    " Input(X) of Bcast op input should not be NULL");
     PADDLE_ENFORCE(ctx->HasInput("Communicator"),
                    " Input(Communicator) of Bcast op input should not be NULL");
-  }
-};
-
-// BcastRecvOp
-class NCCLBcastRecvOp : public framework::OperatorWithKernel {
- public:
-  using framework::OperatorWithKernel::OperatorWithKernel;
-
- protected:
-  void InferShape(framework::InferShapeContext *ctx) const override {
-    PADDLE_ENFORCE(ctx->HasInput("Communicator"),
-                   " Input(Communicator) of Bcast op input should not be NULL");
     PADDLE_ENFORCE(ctx->HasOutput("Out"),
                    " Output(Out) of Bcast op output should not be NULL");
+
+    auto x_dims = ctx->GetInputsDim("X");
+    ctx->SetOutputsDim("Out", x_dims);
+    ctx->ShareLoD("X", /*->*/ "Out");
   }
 };
 
@@ -146,52 +138,41 @@ class NCCLAllReduceOpMaker : public framework::OpProtoAndCheckerMaker {
   }
 };
 
-// BcastSend should be in the root
-// BcastSendOp
-class NCCLBcastSendOpMaker : public framework::OpProtoAndCheckerMaker {
+// ReduceOp
+class NCCLReduceOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
-  NCCLBcastSendOpMaker(framework::OpProto *proto,
-                       framework::OpAttrChecker *op_checker)
+  NCCLReduceOpMaker(framework::OpProto *proto,
+                    framework::OpAttrChecker *op_checker)
       : OpProtoAndCheckerMaker(proto, op_checker) {
-    AddInput("X", "The input of BcastSend op");
+    AddInput("X", "The input of Reduce op");
     AddInput("Communicator", "Communicator for communicating between gpus");
-    AddAttr<int>("root", "root gpu of Bcast");
+    AddOutput("Out", "The output of Reduce op");
+    AddAttr<int>("root",
+                 "root gpu of the parameter. If not set (-1), "
+                 "the root is chosen by hashing the parameter name.")
+        .SetDefault(-1);
     AddComment(R"DOC(
-            Bcast the tensors.
-        )DOC");
+            Reduce the tensors.)DOC");
   }
 };
 
 // BcastOp
-class NCCLBcastRecvOpMaker : public framework::OpProtoAndCheckerMaker {
+class NCCLBcastOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
-  NCCLBcastRecvOpMaker(framework::OpProto *proto,
-                       framework::OpAttrChecker *op_checker)
+  NCCLBcastOpMaker(framework::OpProto *proto,
+                   framework::OpAttrChecker *op_checker)
       : OpProtoAndCheckerMaker(proto, op_checker) {
+    AddInput("X", "The input of Bcast op");
     AddInput("Communicator", "Communicator for communicating between gpus");
-    AddAttr<int>("root", "root gpu of BcastRecv");
     AddOutput("Out", "The output of Bcast");
+    AddAttr<int>("root",
+                 "root gpu of the parameter. If not set (-1), "
+                 "the root is chosen by hashing the parameter name.")
+        .SetDefault(-1);
     AddComment(R"DOC(
             Bcast the tensors.
         )DOC");
   }
 };
 
-// BcastRecvOp
-class NCCLReduceOpMaker : public framework::OpProtoAndCheckerMaker {
- public:
-  NCCLReduceOpMaker(framework::OpProto *proto,
-                    framework::OpAttrChecker *op_checker)
-      : OpProtoAndCheckerMaker(proto, op_checker) {
-    AddInput("X", "The input of Reduce op");
-    AddInput("Communicator", "Communicator for communicating between gpus");
-    AddOutput("Out", "The output of Reduce op");
-    AddComment(R"DOC(
-            Reduce the tensors.
-        )DOC");
-  }
-};
-
 }  // namespace operators
 }  // namespace paddle
 
@@ -201,9 +182,7 @@ REGISTER_OPERATOR(ncclInit, ops::NCCLInitOp,
 
 REGISTER_OP_WITHOUT_GRADIENT(ncclAllReduce, ops::NCCLAllReduceOp,
                              ops::NCCLAllReduceOpMaker);
-REGISTER_OP_WITHOUT_GRADIENT(ncclBcastSend, ops::NCCLBcastSendOp,
-                             ops::NCCLBcastSendOpMaker);
-REGISTER_OP_WITHOUT_GRADIENT(ncclBcastRecv, ops::NCCLBcastRecvOp,
-                             ops::NCCLBcastRecvOpMaker);
+REGISTER_OP_WITHOUT_GRADIENT(ncclBcast, ops::NCCLBcastOp,
+                             ops::NCCLBcastOpMaker);
 REGISTER_OP_WITHOUT_GRADIENT(ncclReduce, ops::NCCLReduceOp,
                              ops::NCCLReduceOpMaker);
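
Note on the new "root" attribute added above: .SetDefault(-1) makes it optional, and -1 means "derive the root GPU from a hash of the parameter name", which is the scheme NCCLReduceKernel implements in nccl_op.cu below. A minimal standalone sketch of that convention follows; the helper name ResolveRoot is hypothetical and not part of this patch:

// Sketch only: an explicit root wins; -1 falls back to hashing the parameter
// name, so every device independently picks the same root for a parameter.
#include <functional>
#include <string>

static int ResolveRoot(int root_attr, const std::string &param_name,
                       size_t num_gpus) {
  if (root_attr == -1) {
    std::hash<std::string> hasher;
    return static_cast<int>(hasher(param_name) % num_gpus);
  }
  return root_attr;
}
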
- )DOC"); - } -}; - } // namespace operators } // namespace paddle @@ -201,9 +182,7 @@ REGISTER_OPERATOR(ncclInit, ops::NCCLInitOp, REGISTER_OP_WITHOUT_GRADIENT(ncclAllReduce, ops::NCCLAllReduceOp, ops::NCCLAllReduceOpMaker); -REGISTER_OP_WITHOUT_GRADIENT(ncclBcastSend, ops::NCCLBcastSendOp, - ops::NCCLBcastSendOpMaker); -REGISTER_OP_WITHOUT_GRADIENT(ncclBcastRecv, ops::NCCLBcastRecvOp, - ops::NCCLBcastRecvOpMaker); +REGISTER_OP_WITHOUT_GRADIENT(ncclBcast, ops::NCCLBcastOp, + ops::NCCLBcastOpMaker); REGISTER_OP_WITHOUT_GRADIENT(ncclReduce, ops::NCCLReduceOp, ops::NCCLReduceOpMaker); diff --git a/paddle/operators/nccl_op.cu b/paddle/operators/nccl_op.cu index 68d0d5b7c9060724facee0ead4f4a7547506d6ee..eb7d4387efe817580a90c5bde7ee46700482c85f 100644 --- a/paddle/operators/nccl_op.cu +++ b/paddle/operators/nccl_op.cu @@ -83,6 +83,7 @@ class NCCLReduceKernel : public framework::OpKernel { auto ins = ctx.MultiInput("X"); // x0, x1, x2 auto outs = ctx.MultiOutput("Out"); + int root = ctx.Attr("root"); auto* comm = ctx.Input("Communicator"); @@ -97,7 +98,9 @@ class NCCLReduceKernel : public framework::OpKernel { auto ins_names = ctx.Inputs("X"); std::hash hasher; for (size_t i = 0; i < ins.size(); ++i) { - int root = hasher(ins_names[i]) % comm->comms_.size(); + if (root == -1) { + root = hasher(ins_names[i]) % comm->comms_.size(); + } T* recvbuffer = nullptr; if (root == device_id) { recvbuffer = outs[i]->mutable_data(ctx.GetPlace()); @@ -135,8 +138,9 @@ class NCCLBcastKernel : public framework::OpKernel { int device_id = boost::get(ctx.GetPlace()).GetDeviceId(); int idx = comm->GetCommId(device_id); + if (idx == root) { - auto ins = ctx.MultiInput("X"); + auto ins = ctx.MultiInput("X"); for (size_t i = 0; i < ins.size(); ++i) { PADDLE_ENFORCE(platform::dynload::ncclBcast( (void*)ins[i]->data(), ins[i]->numel(), NCCLTypeWrapper::type, @@ -144,7 +148,7 @@ class NCCLBcastKernel : public framework::OpKernel { PADDLE_ENFORCE(cudaStreamSynchronize(stream)); } } else { - auto outs = ctx.MultiOutput("Out"); + auto outs = ctx.MultiOutput("Out"); for (size_t i = 0; i < outs.size(); ++i) { PADDLE_ENFORCE(platform::dynload::ncclBcast( outs[i]->mutable_data(ctx.GetPlace()), outs[i]->numel(), @@ -160,6 +164,5 @@ class NCCLBcastKernel : public framework::OpKernel { namespace ops = paddle::operators; REGISTER_OP_GPU_KERNEL(ncclAllReduce, ops::NCCLAllReduceKernel); -REGISTER_OP_GPU_KERNEL(ncclBcastSend, ops::NCCLBcastKernel); +REGISTER_OP_GPU_KERNEL(ncclBcast, ops::NCCLBcastKernel); REGISTER_OP_GPU_KERNEL(ncclReduce, ops::NCCLReduceKernel); -REGISTER_OP_GPU_KERNEL(ncclBcastRecv, ops::NCCLBcastKernel); diff --git a/paddle/operators/nccl_op_test.cu b/paddle/operators/nccl_op_test.cu index 0eda0c6b57d5091546a4aba33860fdeb94abca4f..71491d47bbd87232e294a7c675a51ba4ff2ab620 100644 --- a/paddle/operators/nccl_op_test.cu +++ b/paddle/operators/nccl_op_test.cu @@ -28,6 +28,7 @@ #include "paddle/framework/op_registry.h" #include "paddle/framework/program_desc.h" #include "paddle/framework/var_desc.h" +#include "paddle/operators/math/math_function.h" #include "paddle/operators/nccl/nccl_gpu_common.h" #include "paddle/platform/device_context.h" #include "paddle/platform/enforce.h" @@ -37,8 +38,7 @@ USE_NO_KERNEL_OP(ncclInit); USE_GPU_ONLY_OP(ncclAllReduce); USE_GPU_ONLY_OP(ncclReduce); -USE_GPU_ONLY_OP(ncclBcastSend); -USE_GPU_ONLY_OP(ncclBcastRecv); +USE_GPU_ONLY_OP(ncclBcast); namespace f = paddle::framework; namespace p = paddle::platform; @@ -144,12 +144,62 @@ class NCCLTester : public ::testing::Test { // } // 
diff --git a/paddle/operators/nccl_op_test.cu b/paddle/operators/nccl_op_test.cu
index 0eda0c6b57d5091546a4aba33860fdeb94abca4f..71491d47bbd87232e294a7c675a51ba4ff2ab620 100644
--- a/paddle/operators/nccl_op_test.cu
+++ b/paddle/operators/nccl_op_test.cu
@@ -28,6 +28,7 @@
 #include "paddle/framework/op_registry.h"
 #include "paddle/framework/program_desc.h"
 #include "paddle/framework/var_desc.h"
+#include "paddle/operators/math/math_function.h"
 #include "paddle/operators/nccl/nccl_gpu_common.h"
 #include "paddle/platform/device_context.h"
 #include "paddle/platform/enforce.h"
@@ -37,8 +38,7 @@
 USE_NO_KERNEL_OP(ncclInit);
 USE_GPU_ONLY_OP(ncclAllReduce);
 USE_GPU_ONLY_OP(ncclReduce);
-USE_GPU_ONLY_OP(ncclBcastSend);
-USE_GPU_ONLY_OP(ncclBcastRecv);
+USE_GPU_ONLY_OP(ncclBcast);
 
 namespace f = paddle::framework;
 namespace p = paddle::platform;
@@ -144,12 +144,62 @@ class NCCLTester : public ::testing::Test {
 // }
 
 // ncclAllReduceOp with desc
-TEST_F(NCCLTester, ncclAllReduceOp) {
+// TEST_F(NCCLTester, ncclAllReduceOp) {
+//   std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
+//   op2->SetType("ncclAllReduce");
+//   op2->SetInput("X", {"st"});
+//   op2->SetInput("Communicator", {"comm"});
+//   op2->SetOutput("Out", {"rt"});
+
+//   std::vector<f::Scope *> dev_scopes;
+
+//   std::vector<std::thread> ths;
+
+//   for (size_t i = 0; i < gpu_list.size(); ++i) {
+//     dev_scopes.emplace_back(&g_scope.NewScope());
+//     std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
+//                    *op2.get(), dev_scopes[i]);
+//     ths.emplace_back(std::move(th));
+//   }
+
+//   for (size_t i = 0; i < gpu_list.size(); ++i) {
+//     ths[i].join();
+//   }
+
+//   // check results
+//   float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
+
+//   for (size_t i = 0; i < dev_scopes.size(); ++i) {
+//     p::CPUPlace cpu_place;
+//     p::GPUPlace gpu_place(gpu_list[i]);
+
+//     auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
+//     auto *rt = recv_tensor.data<float>();
+//     auto *result_tensor =
+//         dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
+//     result_tensor->Resize(kDims);
+//     auto *ct = result_tensor->mutable_data<float>(cpu_place);
+
+//     paddle::memory::Copy(
+//         cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
+//         recv_tensor.numel() * sizeof(float),
+//         static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
+
+//     for (size_t j = 0; j < f::product(kDims); ++j) {
+//       ASSERT_NEAR(ct[j], result, 1e-5);
+//     }
+//   }
+// }
+
+// ncclReduceOp with desc
+TEST_F(NCCLTester, ncclReduceOp) {
   std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
-  op2->SetType("ncclAllReduce");
+  const int kRoot = 0;
+  op2->SetType("ncclReduce");
   op2->SetInput("X", {"st"});
   op2->SetInput("Communicator", {"comm"});
   op2->SetOutput("Out", {"rt"});
+  op2->SetAttr("root", {kRoot});
 
   std::vector<f::Scope *> dev_scopes;
 
@@ -166,39 +216,43 @@ TEST_F(NCCLTester, ncclAllReduceOp) {
     ths[i].join();
   }
 
-  // check results
-  float result = 0;
-  std::accumulate(gpu_list.begin(), gpu_list.end(), result);
-  for (size_t i = 0; i < dev_scopes.size(); ++i) {
-    auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
-    auto *rt = recv_tensor.data<float>();
+  // check results on the root gpu
+  float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
 
-    p::CPUPlace cpu_place;
-    auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
-    result_tensor->Resize(kDims);
-    auto *ct = result_tensor->mutable_data<float>(cpu_place);
+  p::CPUPlace cpu_place;
+  p::GPUPlace gpu_place(gpu_list[kRoot]);
 
-    paddle::memory::Copy(
-        cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
-        recv_tensor.numel() * sizeof(float),
-        static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
-    for (size_t j = 0; j < f::product(kDims); ++j) {
-      ASSERT_NEAR(ct[j], result, 1e-5);
-    }
+  auto &recv_tensor = dev_scopes[kRoot]->FindVar("rt")->Get<f::LoDTensor>();
+  auto *rt = recv_tensor.data<float>();
+  auto *result_tensor =
+      dev_scopes[kRoot]->Var("ct")->GetMutable<f::LoDTensor>();
+  result_tensor->Resize(kDims);
+  auto *ct = result_tensor->mutable_data<float>(cpu_place);
+
+  paddle::memory::Copy(
+      cpu_place, ct, p::GPUPlace(gpu_list[kRoot]), rt,
+      recv_tensor.numel() * sizeof(float),
+      static_cast<p::CUDADeviceContext *>(dev_ctxs[kRoot])->stream());
+
+  for (int j = 0; j < f::product(kDims); ++j) {
+    ASSERT_NEAR(ct[j], result, 1e-5);
   }
 }
 
-// ncclReduceOp with desc
-TEST(NCCL, ncclReduceOp) {
+// ncclBcastOp with desc
+TEST_F(NCCLTester, ncclBcastOp) {
   std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
-  op2->SetType("ncclReduce");
+  const int kRoot = 0;
+  op2->SetType("ncclBcast");
   op2->SetInput("X", {"st"});
   op2->SetInput("Communicator", {"comm"});
   op2->SetOutput("Out", {"rt"});
+  op2->SetAttr("root", {kRoot});
 
   std::vector<f::Scope *> dev_scopes;
 
   std::vector<std::thread> ths;
+
   for (size_t i = 0; i < gpu_list.size(); ++i) {
     dev_scopes.emplace_back(&g_scope.NewScope());
     std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
@@ -210,76 +264,99 @@ TEST(NCCL, ncclReduceOp) {
     ths[i].join();
   }
 
-  // check results
-  float result = 0;
-  std::accumulate(gpu_list.begin(), gpu_list.end(), result);
-  for (size_t i = 0; i < dev_scopes.size(); ++i) {
-    auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
-    auto *rt = recv_tensor.data<float>();
+  const int idx = 1;
+  // check results on a non-root gpu
+  float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
 
-    p::CPUPlace cpu_place;
-    auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
-    result_tensor->Resize(kDims);
-    auto *ct = result_tensor->mutable_data<float>(cpu_place);
+  p::CPUPlace cpu_place;
+  p::GPUPlace gpu_place(gpu_list[idx]);
 
-    paddle::memory::Copy(
-        cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
-        recv_tensor.numel() * sizeof(float),
-        static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
-    for (size_t j = 0; j < f::product(kDims); ++j) {
-      ASSERT_NEAR(ct[j], result, 1e-5);
-    }
+  auto &recv_tensor = dev_scopes[idx]->FindVar("rt")->Get<f::LoDTensor>();
+  auto *rt = recv_tensor.data<float>();
+  auto *result_tensor = dev_scopes[idx]->Var("ct")->GetMutable<f::LoDTensor>();
+  result_tensor->Resize(kDims);
+  auto *ct = result_tensor->mutable_data<float>(cpu_place);
+
+  paddle::memory::Copy(
+      cpu_place, ct, p::GPUPlace(gpu_list[idx]), rt,
+      recv_tensor.numel() * sizeof(float),
+      static_cast<p::CUDADeviceContext *>(dev_ctxs[idx])->stream());
+
+  for (size_t j = 0; j < f::product(kDims); ++j) {
+    ASSERT_NEAR(ct[j], result, 1e-5);
   }
 }
 
-// ncclBcastOp with desc
-TEST(NCCL, ncclBcastOp) {
+// joint ncclBcastOp and ncclReduceOp
+TEST_F(NCCLTester, MultipleOp) {
+  const int kRoot = 0;
   std::unique_ptr<f::OpDescBind> op1(new f::OpDescBind);
-  op1->SetType("ncclBcastSend");
-  op1->SetInput("X", {"st"});
+  op1->SetType("ncclReduce");
+  op1->SetInput("X", {"rt"});
   op1->SetInput("Communicator", {"comm"});
+  op1->SetOutput("Out", {"rt"});
+  op1->SetAttr("root", {kRoot});
 
   std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
-  op2->SetType("ncclBcastRecv");
+  op2->SetType("ncclBcast");
+  op2->SetInput("X", {"st"});
   op2->SetInput("Communicator", {"comm"});
   op2->SetOutput("Out", {"rt"});
+  op2->SetAttr("root", {kRoot});
+
+  std::vector<f::Scope *> dev_scopes;
 
   std::vector<std::thread> ths;
-  for (size_t i = 1; i < gpu_list.size(); ++i) {
+
+  // run Reduce
+  for (size_t i = 0; i < gpu_list.size(); ++i) {
+    dev_scopes.emplace_back(&g_scope.NewScope());
     std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
-                   *op2.get(), &g_scope.NewScope());
+                   *op1.get(), dev_scopes[i]);
     ths.emplace_back(std::move(th));
   }
 
   for (size_t i = 0; i < gpu_list.size(); ++i) {
     ths[i].join();
   }
-}
 
-// joint ncclBcastOp and ncclReduceOp
-// TEST(NCCL, MultipleOp) {
-//   std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
-//   op2->SetType("ncclBcastSend");
-//   op2->SetInput("X", {"st"});
-//   op2->SetInput("Communicator", {"comm"});
+  ths.clear();
 
-//   std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
-//   op2->SetType("ncclBcastRecv");
-//   op2->SetInput("Communicator", {"comm"});
-//   op2->SetOutput("Out", {"rt"});
+  // run Bcast
+  for (size_t i = 0; i < gpu_list.size(); ++i) {
+    std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
+                   *op2.get(), dev_scopes[i]);
+    ths.emplace_back(std::move(th));
+  }
 
-//   std::vector<std::thread> ths;
-//   for (size_t i = 0; i < gpu_list.size(); ++i) {
-//     std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
-//                    *op2.get(),
-//                    &g_scope.NewScope());
-//     ths.emplace_back(std::move(th));
-//   }
+  for (size_t i = 0; i < gpu_list.size(); ++i) {
+    ths[i].join();
+  }
 
-//   for (size_t i = 0; i < gpu_list.size(); ++i) {
-//     ths[i].join();
-//   }
-// }
+  // check results
+  float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
+
+  for (size_t i = 0; i < dev_scopes.size(); ++i) {
+    p::CPUPlace cpu_place;
+    p::GPUPlace gpu_place(gpu_list[i]);
+
+    auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
+    auto *rt = recv_tensor.data<float>();
+    auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
+    result_tensor->Resize(kDims);
+    auto *ct = result_tensor->mutable_data<float>(cpu_place);
+
+    paddle::memory::Copy(
+        cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
+        recv_tensor.numel() * sizeof(float),
+        static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
+
+    for (int j = 0; j < f::product(kDims); ++j) {
+      ASSERT_NEAR(ct[j], result, 1e-5);
+    }
+  }
+}
 
 int main(int argc, char **argv) {
   const int dev_count = p::GetCUDADeviceCount();