diff --git a/paddle/operators/nccl_op.cc b/paddle/operators/nccl_op.cc index 67bcc419fa4c72fb5ef4b262b121890513a3ff7b..6a0589cb20823a23934fa2fcb9ad5c27945c24f4 100644 --- a/paddle/operators/nccl_op.cc +++ b/paddle/operators/nccl_op.cc @@ -114,6 +114,9 @@ class NCCLBcastOp : public framework::OperatorWithKernel { PADDLE_ENFORCE(ctx->HasOutput("Out"), " Output(Out) of Bcast op output should not be NULL"); + int root = ctx->Attrs().Get("root"); + PADDLE_ENFORCE(root != -1, "Bcast root must be set."); + auto x_dims = ctx->GetInputsDim("X"); ctx->SetOutputsDim("Out", x_dims); ctx->ShareLoD("X", /*->*/ "Out"); diff --git a/paddle/operators/nccl_op.cu b/paddle/operators/nccl_op.cu index 9b9e1df2589460c3c0732f18ea5440a19391891f..1eef2f218f403fbecc367fd171dae4fc6ad86ba4 100644 --- a/paddle/operators/nccl_op.cu +++ b/paddle/operators/nccl_op.cu @@ -54,12 +54,12 @@ class NCCLAllReduceKernel : public framework::OpKernel { ctx.device_context()) .stream(); // device id - int device_id = - boost::get(ctx.GetPlace()).GetDeviceId(); - int idx = comm->GetCommId(device_id); + int gpu_id = boost::get(ctx.GetPlace()).GetDeviceId(); + int idx = comm->GetCommId(gpu_id); for (size_t i = 0; i < ins.size(); ++i) { - VLOG(1) << " invoke allreduce. send " << ins[i]->numel() << " recv " + VLOG(1) << "gpu : " + << " invoke allreduce. send " << ins[i]->numel() << " recv " << outs[i]->numel(); PADDLE_ENFORCE(platform::dynload::ncclAllReduce( @@ -68,7 +68,8 @@ class NCCLAllReduceKernel : public framework::OpKernel { comm->comms_[idx], stream)); PADDLE_ENFORCE(cudaStreamSynchronize(stream)); - VLOG(1) << " finished allreduce. send " << ins[i]->numel() << " recv " + VLOG(1) << "gpu : " + << " finished allreduce. send " << ins[i]->numel() << " recv " << outs[i]->numel(); } } @@ -91,9 +92,8 @@ class NCCLReduceKernel : public framework::OpKernel { ctx.device_context()) .stream(); // device id - int device_id = - boost::get(ctx.GetPlace()).GetDeviceId(); - int idx = comm->GetCommId(device_id); + int gpu_id = boost::get(ctx.GetPlace()).GetDeviceId(); + int idx = comm->GetCommId(gpu_id); auto ins_names = ctx.Inputs("X"); std::hash hasher; @@ -102,20 +102,20 @@ class NCCLReduceKernel : public framework::OpKernel { root = hasher(ins_names[i]) % comm->comms_.size(); } T* recvbuffer = nullptr; - if (root == device_id) { + if (root == gpu_id) { recvbuffer = outs[i]->mutable_data(ctx.GetPlace()); } - VLOG(1) << " invoke reduce. send " << ins[i]->numel() << " recv " - << outs[i]->numel(); + VLOG(1) << "gpu : " << gpu_id << " invoke reduce. send " + << ins[i]->numel() << " recv " << outs[i]->numel(); PADDLE_ENFORCE(platform::dynload::ncclReduce( ins[i]->data(), recvbuffer, ins[i]->numel(), NCCLTypeWrapper::type, ncclSum, root, comm->comms_[idx], stream)); PADDLE_ENFORCE(cudaStreamSynchronize(stream)); - VLOG(1) << " finished reduce. send " << ins[i]->numel() << " recv " - << outs[i]->numel(); + VLOG(1) << "gpu : " << gpu_id << " finished reduce. send " + << ins[i]->numel() << " recv " << outs[i]->numel(); } } }; @@ -135,33 +135,37 @@ class NCCLBcastKernel : public framework::OpKernel { ctx.device_context()) .stream(); // device id - int device_id = - boost::get(ctx.GetPlace()).GetDeviceId(); - int idx = comm->GetCommId(device_id); + int gpu_id = boost::get(ctx.GetPlace()).GetDeviceId(); + int idx = comm->GetCommId(gpu_id); if (idx == root) { auto ins = ctx.MultiInput("X"); for (size_t i = 0; i < ins.size(); ++i) { - VLOG(1) << " invoke Bcast. send " << ins[i]->numel(); + VLOG(1) << "gpu : " << gpu_id << " invoke Bcast. send " + << ins[i]->numel(); + VLOG(1) << " before ncclBcast"; PADDLE_ENFORCE(platform::dynload::ncclBcast( (void*)ins[i]->data(), ins[i]->numel(), NCCLTypeWrapper::type, root, comm->comms_[idx], stream)); + VLOG(1) << " after ncclBcast"; PADDLE_ENFORCE(cudaStreamSynchronize(stream)); - VLOG(1) << " finished Bcast."; + VLOG(1) << "gpu : " << gpu_id << " finished Bcast."; } } else { auto outs = ctx.MultiOutput("Out"); for (size_t i = 0; i < outs.size(); ++i) { - VLOG(1) << " invoke Bcast. recv. "; + VLOG(1) << "gpu : " << gpu_id << " invoke Bcast. recv buffer " + << framework::product(outs[i]->dims()); PADDLE_ENFORCE(platform::dynload::ncclBcast( outs[i]->mutable_data(ctx.GetPlace()), outs[i]->numel(), NCCLTypeWrapper::type, root, comm->comms_[idx], stream)); PADDLE_ENFORCE(cudaStreamSynchronize(stream)); - VLOG(1) << " finished Bcast. recv " << outs[i]->numel(); + VLOG(1) << "gpu : " << gpu_id << " finished Bcast. recv " + << outs[i]->numel(); } } } diff --git a/paddle/operators/nccl_op_test.cu b/paddle/operators/nccl_op_test.cu index d785b279d6e18457eb919b66066536085d4028f1..1132c3d43d97a89bfc7adc0ab02059d9aab60f8d 100644 --- a/paddle/operators/nccl_op_test.cu +++ b/paddle/operators/nccl_op_test.cu @@ -87,30 +87,34 @@ class NCCLTester : public ::testing::Test { void PerThreadProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) { std::unique_lock lk(mu); - f::ProgramDescBind program; - f::BlockDescBind *block = program.Block(0); - f::OpDescBind *op1 = block->AppendOp(); - *op1 = op_desc; + const f::OpDescBind *op1 = &op_desc; p::GPUPlace place(gpu_id); auto &ctx = dev_ctxs.at(gpu_id); auto *send_tensor = scope->Var("st")->GetMutable(); auto *recv_tensor = scope->Var("rt")->GetMutable(); - send_tensor->Resize(kDims); - send_tensor->mutable_data(kDims, place); - std::vector send_vector(f::product(kDims), gpu_id); - send_tensor->CopyFromVector(send_vector, *ctx); + if (!send_tensor->numel()) { + send_tensor->Resize(kDims); + send_tensor->mutable_data(kDims, place); + + std::vector send_vector(f::product(kDims), gpu_id); + send_tensor->CopyFromVector(send_vector, *ctx); + ctx->Wait(); + VLOG(1) << "Send Tensor filled with elements " << send_tensor->numel(); + } + lk.unlock(); + PADDLE_ENFORCE(send_tensor->numel() == f::product(kDims), "Tensor numel not match!"); - ctx->Wait(); - - VLOG(1) << "Send Tensor filled with elements " << send_tensor->numel(); auto op = f::OpRegistry::CreateOp(*op1); + VLOG(1) << "Device : " << gpu_id << " invoke " << op_desc.Type(); + VLOG(1) << " send_tensor : " << send_tensor->numel() + << " recv_tensor : " << recv_tensor->numel(); op->Run(*scope, *ctx); VLOG(1) << "Device : " << gpu_id << " finished " << op_desc.Type(); } @@ -122,168 +126,171 @@ class NCCLTester : public ::testing::Test { std::mutex mu; }; -// ncclInitOp with desc -TEST(NCCL, ncclInitOp) { - std::unique_ptr op_desc(new f::OpDescBind); - - op_desc->SetType("ncclInit"); - op_desc->SetOutput("Communicator", {"x1"}); - op_desc->SetAttr("gpus", {gpu_list}); - - f::Scope g_scope; - std::unique_ptr ctx(new p::CPUDeviceContext(p::CPUPlace())); - - auto *var = g_scope.Var("x1"); - var->GetMutable(); - - auto op = f::OpRegistry::CreateOp(*op_desc); - VLOG(1) << "invoke NCCLInitOp."; - op->Run(g_scope, *ctx.get()); - VLOG(1) << "NCCLInitOp finished."; -} - -// ncclAllReduceOp with desc -TEST_F(NCCLTester, ncclAllReduceOp) { - std::unique_ptr op2(new f::OpDescBind); - op2->SetType("ncclAllReduce"); - op2->SetInput("X", {"st"}); - op2->SetInput("Communicator", {"comm"}); - op2->SetOutput("Out", {"rt"}); - - std::vector dev_scopes; - - std::vector ths; - - for (size_t i = 0; i < gpu_list.size(); ++i) { - dev_scopes.emplace_back(&g_scope.NewScope()); - std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], - *op2.get(), dev_scopes[i]); - ths.emplace_back(std::move(th)); - } - - for (size_t i = 0; i < gpu_list.size(); ++i) { - ths[i].join(); - } - - // check results - float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0); - - for (size_t i = 0; i < dev_scopes.size(); ++i) { - p::CPUPlace cpu_place; - p::GPUPlace gpu_place(gpu_list[i]); - - auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get(); - auto *rt = recv_tensor.data(); - auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable(); - result_tensor->Resize(kDims); - auto *ct = result_tensor->mutable_data(cpu_place); - - paddle::memory::Copy( - cpu_place, ct, p::GPUPlace(gpu_list[i]), rt, - recv_tensor.numel() * sizeof(float), - static_cast(dev_ctxs[i])->stream()); - - for (size_t j = 0; j < f::product(kDims); ++j) { - ASSERT_NEAR(ct[j], result, 1e-5); - } - } -} - -// ncclAReduceOp with desc -TEST_F(NCCLTester, ncclReduceOp) { - std::unique_ptr op2(new f::OpDescBind); - const int kRoot = 0; - op2->SetType("ncclReduce"); - op2->SetInput("X", {"st"}); - op2->SetInput("Communicator", {"comm"}); - op2->SetOutput("Out", {"rt"}); - op2->SetAttr("root", {kRoot}); - - std::vector dev_scopes; - - std::vector ths; - - for (size_t i = 0; i < gpu_list.size(); ++i) { - dev_scopes.emplace_back(&g_scope.NewScope()); - std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], - *op2.get(), dev_scopes[i]); - ths.emplace_back(std::move(th)); - } - - for (size_t i = 0; i < gpu_list.size(); ++i) { - ths[i].join(); - } - - // check results on - float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0); - - p::CPUPlace cpu_place; - p::GPUPlace gpu_place(gpu_list[kRoot]); - - auto &recv_tensor = dev_scopes[kRoot]->FindVar("rt")->Get(); - auto *rt = recv_tensor.data(); - auto *result_tensor = - dev_scopes[kRoot]->Var("ct")->GetMutable(); - result_tensor->Resize(kDims); - auto *ct = result_tensor->mutable_data(cpu_place); - - paddle::memory::Copy( - cpu_place, ct, p::GPUPlace(gpu_list[kRoot]), rt, - recv_tensor.numel() * sizeof(float), - static_cast(dev_ctxs[kRoot])->stream()); - - for (int j = 0; j < f::product(kDims); ++j) { - ASSERT_NEAR(ct[j], result, 1e-5); - } -} - -// // ncclBcastOp with desc -TEST_F(NCCLTester, ncclBcastOp) { - std::unique_ptr op2(new f::OpDescBind); - const int kRoot = 5; - op2->SetType("ncclBcast"); - op2->SetInput("X", {"st"}); - op2->SetInput("Communicator", {"comm"}); - op2->SetOutput("Out", {"rt"}); - op2->SetAttr("root", {kRoot}); - - std::vector dev_scopes; - - std::vector ths; - - for (size_t i = 0; i < gpu_list.size(); ++i) { - dev_scopes.emplace_back(&g_scope.NewScope()); - std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], - *op2.get(), dev_scopes[i]); - ths.emplace_back(std::move(th)); - } - - for (size_t i = 0; i < gpu_list.size(); ++i) { - ths[i].join(); - } - - const int idx = 1; - // check results on - float result = kRoot; - - p::CPUPlace cpu_place; - p::GPUPlace gpu_place(gpu_list[idx]); - - auto &recv_tensor = dev_scopes[idx]->FindVar("rt")->Get(); - auto *rt = recv_tensor.data(); - auto *result_tensor = dev_scopes[idx]->Var("ct")->GetMutable(); - result_tensor->Resize(kDims); - auto *ct = result_tensor->mutable_data(cpu_place); - - paddle::memory::Copy( - cpu_place, ct, p::GPUPlace(gpu_list[idx]), rt, - recv_tensor.numel() * sizeof(float), - static_cast(dev_ctxs[idx])->stream()); - - for (size_t j = 0; j < f::product(kDims); ++j) { - ASSERT_NEAR(ct[j], result, 1e-5); - } -} +// // ncclInitOp with desc +// TEST(NCCL, ncclInitOp) { +// std::unique_ptr op_desc(new f::OpDescBind); + +// op_desc->SetType("ncclInit"); +// op_desc->SetOutput("Communicator", {"x1"}); +// op_desc->SetAttr("gpus", {gpu_list}); + +// f::Scope g_scope; +// std::unique_ptr ctx(new +// p::CPUDeviceContext(p::CPUPlace())); + +// auto *var = g_scope.Var("x1"); +// var->GetMutable(); + +// auto op = f::OpRegistry::CreateOp(*op_desc); +// VLOG(1) << "invoke NCCLInitOp."; +// op->Run(g_scope, *ctx.get()); +// VLOG(1) << "NCCLInitOp finished."; +// } + +// // ncclAllReduceOp with desc +// TEST_F(NCCLTester, ncclAllReduceOp) { +// std::unique_ptr op2(new f::OpDescBind); +// op2->SetType("ncclAllReduce"); +// op2->SetInput("X", {"st"}); +// op2->SetInput("Communicator", {"comm"}); +// op2->SetOutput("Out", {"rt"}); + +// std::vector dev_scopes; + +// std::vector ths; + +// for (size_t i = 0; i < gpu_list.size(); ++i) { +// dev_scopes.emplace_back(&g_scope.NewScope()); +// std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], +// *op2.get(), dev_scopes[i]); +// ths.emplace_back(std::move(th)); +// } + +// for (size_t i = 0; i < gpu_list.size(); ++i) { +// ths[i].join(); +// } + +// // check results +// float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0); + +// for (size_t i = 0; i < dev_scopes.size(); ++i) { +// p::CPUPlace cpu_place; +// p::GPUPlace gpu_place(gpu_list[i]); + +// auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get(); +// auto *rt = recv_tensor.data(); +// auto *result_tensor = +// dev_scopes[i]->Var("ct")->GetMutable(); +// result_tensor->Resize(kDims); +// auto *ct = result_tensor->mutable_data(cpu_place); + +// paddle::memory::Copy( +// cpu_place, ct, p::GPUPlace(gpu_list[i]), rt, +// recv_tensor.numel() * sizeof(float), +// static_cast(dev_ctxs[i])->stream()); + +// for (size_t j = 0; j < f::product(kDims); ++j) { +// ASSERT_NEAR(ct[j], result, 1e-5); +// } +// } +// } + +// // ncclAReduceOp with desc +// TEST_F(NCCLTester, ncclReduceOp) { +// std::unique_ptr op2(new f::OpDescBind); +// const int kRoot = 0; +// op2->SetType("ncclReduce"); +// op2->SetInput("X", {"st"}); +// op2->SetInput("Communicator", {"comm"}); +// op2->SetOutput("Out", {"rt"}); +// op2->SetAttr("root", {kRoot}); + +// std::vector dev_scopes; + +// std::vector ths; + +// for (size_t i = 0; i < gpu_list.size(); ++i) { +// dev_scopes.emplace_back(&g_scope.NewScope()); +// std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], +// *op2.get(), dev_scopes[i]); +// ths.emplace_back(std::move(th)); +// } + +// for (size_t i = 0; i < gpu_list.size(); ++i) { +// ths[i].join(); +// } + +// // check results on +// float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0); + +// p::CPUPlace cpu_place; +// p::GPUPlace gpu_place(gpu_list[kRoot]); + +// auto &recv_tensor = dev_scopes[kRoot]->FindVar("rt")->Get(); +// auto *rt = recv_tensor.data(); +// auto *result_tensor = +// dev_scopes[kRoot]->Var("ct")->GetMutable(); +// result_tensor->Resize(kDims); +// auto *ct = result_tensor->mutable_data(cpu_place); + +// paddle::memory::Copy( +// cpu_place, ct, p::GPUPlace(gpu_list[kRoot]), rt, +// recv_tensor.numel() * sizeof(float), +// static_cast(dev_ctxs[kRoot])->stream()); + +// for (int j = 0; j < f::product(kDims); ++j) { +// ASSERT_NEAR(ct[j], result, 1e-5); +// } +// } + +// // // ncclBcastOp with desc +// TEST_F(NCCLTester, ncclBcastOp) { +// std::unique_ptr op2(new f::OpDescBind); +// const int kRoot = 5; +// op2->SetType("ncclBcast"); +// op2->SetInput("X", {"st"}); +// op2->SetInput("Communicator", {"comm"}); +// op2->SetOutput("Out", {"rt"}); +// op2->SetAttr("root", {kRoot}); + +// std::vector dev_scopes; + +// std::vector ths; + +// for (size_t i = 0; i < gpu_list.size(); ++i) { +// dev_scopes.emplace_back(&g_scope.NewScope()); +// std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], +// *op2.get(), dev_scopes[i]); +// ths.emplace_back(std::move(th)); +// } + +// for (size_t i = 0; i < gpu_list.size(); ++i) { +// ths[i].join(); +// } + +// const int idx = 1; +// // check results on +// float result = kRoot; + +// p::CPUPlace cpu_place; +// p::GPUPlace gpu_place(gpu_list[idx]); + +// auto &recv_tensor = dev_scopes[idx]->FindVar("rt")->Get(); +// auto *rt = recv_tensor.data(); +// auto *result_tensor = +// dev_scopes[idx]->Var("ct")->GetMutable(); +// result_tensor->Resize(kDims); +// auto *ct = result_tensor->mutable_data(cpu_place); + +// paddle::memory::Copy( +// cpu_place, ct, p::GPUPlace(gpu_list[idx]), rt, +// recv_tensor.numel() * sizeof(float), +// static_cast(dev_ctxs[idx])->stream()); + +// for (size_t j = 0; j < f::product(kDims); ++j) { +// ASSERT_NEAR(ct[j], result, 1e-5); +// } +// } // joint ncclBcastOp and ncclReduceOp TEST_F(NCCLTester, MultipleOp) { @@ -299,14 +306,17 @@ TEST_F(NCCLTester, MultipleOp) { op2->SetType("ncclBcast"); op2->SetInput("X", {"rt"}); op2->SetInput("Communicator", {"comm"}); - op2->SetOutput("Out", {"rt"}); + op2->SetOutput("Out", {"out"}); op2->SetAttr("root", {kRoot}); std::vector dev_scopes; + // for (size_t i = 0; i < dev_scopes.size(); ++i) { + // dev_scopes[i]->Var("out")->GetMutable(); + // } std::vector ths; - // run Bcast + // run Reduce for (size_t i = 0; i < gpu_list.size(); ++i) { dev_scopes.emplace_back(&g_scope.NewScope()); std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], @@ -320,9 +330,9 @@ TEST_F(NCCLTester, MultipleOp) { ths.clear(); - // run Reduce + // run Bcast for (size_t i = 0; i < gpu_list.size(); ++i) { - dev_scopes.emplace_back(&g_scope.NewScope()); + dev_scopes[i]->Var("out")->GetMutable(); std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], *op2.get(), dev_scopes[i]); ths.emplace_back(std::move(th));