diff --git a/paddle/operators/nccl_op.cu b/paddle/operators/nccl_op.cu index eb7d4387efe817580a90c5bde7ee46700482c85f..9b9e1df2589460c3c0732f18ea5440a19391891f 100644 --- a/paddle/operators/nccl_op.cu +++ b/paddle/operators/nccl_op.cu @@ -142,18 +142,26 @@ class NCCLBcastKernel : public framework::OpKernel { if (idx == root) { auto ins = ctx.MultiInput("X"); for (size_t i = 0; i < ins.size(); ++i) { + VLOG(1) << " invoke Bcast. send " << ins[i]->numel(); + PADDLE_ENFORCE(platform::dynload::ncclBcast( (void*)ins[i]->data(), ins[i]->numel(), NCCLTypeWrapper::type, root, comm->comms_[idx], stream)); PADDLE_ENFORCE(cudaStreamSynchronize(stream)); + + VLOG(1) << " finished Bcast."; } } else { auto outs = ctx.MultiOutput("Out"); for (size_t i = 0; i < outs.size(); ++i) { + VLOG(1) << " invoke Bcast. recv. "; + PADDLE_ENFORCE(platform::dynload::ncclBcast( outs[i]->mutable_data(ctx.GetPlace()), outs[i]->numel(), NCCLTypeWrapper::type, root, comm->comms_[idx], stream)); PADDLE_ENFORCE(cudaStreamSynchronize(stream)); + + VLOG(1) << " finished Bcast. recv " << outs[i]->numel(); } } } diff --git a/paddle/operators/nccl_op_test.cu b/paddle/operators/nccl_op_test.cu index 71491d47bbd87232e294a7c675a51ba4ff2ab620..d785b279d6e18457eb919b66066536085d4028f1 100644 --- a/paddle/operators/nccl_op_test.cu +++ b/paddle/operators/nccl_op_test.cu @@ -123,73 +123,71 @@ class NCCLTester : public ::testing::Test { }; // ncclInitOp with desc -// TEST(NCCL, ncclInitOp) { -// std::unique_ptr op_desc(new f::OpDescBind); +TEST(NCCL, ncclInitOp) { + std::unique_ptr op_desc(new f::OpDescBind); -// op_desc->SetType("ncclInit"); -// op_desc->SetOutput("Communicator", {"x1"}); -// op_desc->SetAttr("gpus", {gpu_list}); + op_desc->SetType("ncclInit"); + op_desc->SetOutput("Communicator", {"x1"}); + op_desc->SetAttr("gpus", {gpu_list}); -// f::Scope g_scope; -// std::unique_ptr ctx(new -// p::CPUDeviceContext(p::CPUPlace())); + f::Scope g_scope; + std::unique_ptr ctx(new p::CPUDeviceContext(p::CPUPlace())); -// auto *var = g_scope.Var("x1"); -// var->GetMutable(); + auto *var = g_scope.Var("x1"); + var->GetMutable(); -// auto op = f::OpRegistry::CreateOp(*op_desc); -// VLOG(1) << "invoke NCCLInitOp."; -// op->Run(g_scope, *ctx.get()); -// VLOG(1) << "NCCLInitOp finished."; -// } + auto op = f::OpRegistry::CreateOp(*op_desc); + VLOG(1) << "invoke NCCLInitOp."; + op->Run(g_scope, *ctx.get()); + VLOG(1) << "NCCLInitOp finished."; +} // ncclAllReduceOp with desc -// TEST_F(NCCLTester, ncclAllReduceOp) { -// std::unique_ptr op2(new f::OpDescBind); -// op2->SetType("ncclAllReduce"); -// op2->SetInput("X", {"st"}); -// op2->SetInput("Communicator", {"comm"}); -// op2->SetOutput("Out", {"rt"}); - -// std::vector dev_scopes; - -// std::vector ths; - -// for (size_t i = 0; i < gpu_list.size(); ++i) { -// dev_scopes.emplace_back(&g_scope.NewScope()); -// std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], -// *op2.get(), dev_scopes[i]); -// ths.emplace_back(std::move(th)); -// } - -// for (size_t i = 0; i < gpu_list.size(); ++i) { -// ths[i].join(); -// } - -// // check results -// float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0); - -// for (size_t i = 0; i < dev_scopes.size(); ++i) { -// p::CPUPlace cpu_place; -// p::GPUPlace gpu_place(gpu_list[i]); - -// auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get(); -// auto *rt = recv_tensor.data(); -// auto *result_tensor = -// dev_scopes[i]->Var("ct")->GetMutable(); -// result_tensor->Resize(kDims); -// auto *ct = result_tensor->mutable_data(cpu_place); - -// paddle::memory::Copy( -// cpu_place, ct, p::GPUPlace(gpu_list[i]), rt, -// recv_tensor.numel() * sizeof(float), -// static_cast(dev_ctxs[i])->stream()); - -// for (size_t j = 0; j < f::product(kDims); ++j) { -// ASSERT_NEAR(ct[j], result, 1e-5); -// } -// } -// } +TEST_F(NCCLTester, ncclAllReduceOp) { + std::unique_ptr op2(new f::OpDescBind); + op2->SetType("ncclAllReduce"); + op2->SetInput("X", {"st"}); + op2->SetInput("Communicator", {"comm"}); + op2->SetOutput("Out", {"rt"}); + + std::vector dev_scopes; + + std::vector ths; + + for (size_t i = 0; i < gpu_list.size(); ++i) { + dev_scopes.emplace_back(&g_scope.NewScope()); + std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], + *op2.get(), dev_scopes[i]); + ths.emplace_back(std::move(th)); + } + + for (size_t i = 0; i < gpu_list.size(); ++i) { + ths[i].join(); + } + + // check results + float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0); + + for (size_t i = 0; i < dev_scopes.size(); ++i) { + p::CPUPlace cpu_place; + p::GPUPlace gpu_place(gpu_list[i]); + + auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get(); + auto *rt = recv_tensor.data(); + auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable(); + result_tensor->Resize(kDims); + auto *ct = result_tensor->mutable_data(cpu_place); + + paddle::memory::Copy( + cpu_place, ct, p::GPUPlace(gpu_list[i]), rt, + recv_tensor.numel() * sizeof(float), + static_cast(dev_ctxs[i])->stream()); + + for (size_t j = 0; j < f::product(kDims); ++j) { + ASSERT_NEAR(ct[j], result, 1e-5); + } + } +} // ncclAReduceOp with desc TEST_F(NCCLTester, ncclReduceOp) { @@ -242,7 +240,7 @@ TEST_F(NCCLTester, ncclReduceOp) { // // ncclBcastOp with desc TEST_F(NCCLTester, ncclBcastOp) { std::unique_ptr op2(new f::OpDescBind); - const int kRoot = 0; + const int kRoot = 5; op2->SetType("ncclBcast"); op2->SetInput("X", {"st"}); op2->SetInput("Communicator", {"comm"}); @@ -266,7 +264,7 @@ TEST_F(NCCLTester, ncclBcastOp) { const int idx = 1; // check results on - float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0); + float result = kRoot; p::CPUPlace cpu_place; p::GPUPlace gpu_place(gpu_list[idx]); @@ -292,14 +290,14 @@ TEST_F(NCCLTester, MultipleOp) { const int kRoot = 0; std::unique_ptr op1(new f::OpDescBind); op1->SetType("ncclReduce"); - op1->SetInput("X", {"rt"}); + op1->SetInput("X", {"st"}); op1->SetInput("Communicator", {"comm"}); op1->SetOutput("Out", {"rt"}); - op2->SetAttr("root", {kRoot}); + op1->SetAttr("root", {kRoot}); std::unique_ptr op2(new f::OpDescBind); op2->SetType("ncclBcast"); - op2->SetInput("X", {"st"}); + op2->SetInput("X", {"rt"}); op2->SetInput("Communicator", {"comm"}); op2->SetOutput("Out", {"rt"}); op2->SetAttr("root", {kRoot});