diff --git a/paddle/operators/nccl_op_test.cu b/paddle/operators/nccl_op_test.cu index 0e64802f1793aa159faf53e1511ea29880c35143..8c54a3dcba2d3a15815a0e528ac1c7b44809c5f3 100644 --- a/paddle/operators/nccl_op_test.cu +++ b/paddle/operators/nccl_op_test.cu @@ -43,81 +43,107 @@ namespace f = paddle::framework; namespace p = paddle::platform; static std::vector gpu_list; -static std::vector> dev_ctxs; -std::mutex mu; // test data amount const f::DDim kDims = {100, 100}; -// ncclInitOp with desc -TEST(NCCL, ncclInitOp) { - std::unique_ptr op_desc(new f::OpDescBind); +// nccl op common tester, init communicator. +class NCCLTester : public ::testing::Test { + public: + virtual void SetUp() override { + cpu_ctx = new p::CPUDeviceContext(p::CPUPlace()); + for (size_t i = 0; i < gpu_list.size(); ++i) { + p::GPUPlace place(i); + dev_ctxs.emplace_back(new p::CUDADeviceContext(place)); + } + + NCCLInitOp(); + } - op_desc->SetType("ncclInit"); - op_desc->SetOutput("Communicator", {"x1"}); - op_desc->SetAttr("gpus", {gpu_list}); - f::Scope g_scope; - p::DeviceContext *ctx = new p::CPUDeviceContext(p::CPUPlace()); + virtual void TearDown() override { + for (auto &device_context : dev_ctxs) { + delete device_context; + } + } - auto *var = g_scope.Var("x1"); - var->GetMutable(); + void NCCLInitOp() { + std::unique_ptr op1(new f::OpDescBind); - auto op = f::OpRegistry::CreateOp(*op_desc); - VLOG(1) << "invoke NCCLInitOp."; - op->Run(g_scope, *ctx); - VLOG(1) << "NCCLInitOp finished."; -} + op1->SetType("ncclInit"); + op1->SetOutput("Communicator", {"comm"}); + op1->SetAttr("gpus", {gpu_list}); -template -void DeviceProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) { - std::unique_lock lk(mu); - f::ProgramDescBind program; - f::BlockDescBind *block = program.Block(0); - f::OpDescBind *op1 = block->AppendOp(); - *op1 = op_desc; - - p::GPUPlace place(gpu_id); - auto ctx = dev_ctxs.at(gpu_id); - - auto *send_tensor = scope->Var("st")->GetMutable(); - auto *recv_tensor = scope->Var("rt")->GetMutable(); - send_tensor->Resize(kDims); - send_tensor->mutable_data(kDims, place); - - std::vector send_vector(f::product(kDims), gpu_id); - send_tensor->CopyFromVector(send_vector, *ctx); - lk.unlock(); - PADDLE_ENFORCE(send_tensor->numel() == f::product(kDims), - "Tensor numel not match!"); - ctx->Wait(); - - VLOG(1) << "Send Tensor filled with elements " << send_tensor->numel(); - - auto op = f::OpRegistry::CreateOp(*op1); - VLOG(1) << "Device : " << gpu_id << " invoke " << op_desc.Type(); - op->Run(*scope, *ctx); - VLOG(1) << "Device : " << gpu_id << " finished " << op_desc.Type(); -} + auto *var = g_scope.Var("comm"); + var->GetMutable(); -// ncclAllReduceOp with desc -TEST(NCCL, ncclAllReduceOp) { - std::unique_ptr ctx(new p::CPUDeviceContext(p::CPUPlace())); - std::unique_ptr g_scope(new Scope); + auto op = f::OpRegistry::CreateOp(*op1); + VLOG(1) << "invoke NCCLInitOp."; + op->Run(g_scope, *cpu_ctx); + VLOG(1) << "NCCLInitOp finished."; + } + + template + void PerThreadProgram(int gpu_id, const f::OpDescBind &op_desc, + f::Scope *scope) { + std::unique_lock lk(mu); + f::ProgramDescBind program; + f::BlockDescBind *block = program.Block(0); + f::OpDescBind *op1 = block->AppendOp(); + *op1 = op_desc; + + p::GPUPlace place(gpu_id); + auto &ctx = dev_ctxs.at(gpu_id); + + auto *send_tensor = scope->Var("st")->GetMutable(); + auto *recv_tensor = scope->Var("rt")->GetMutable(); + send_tensor->Resize(kDims); + send_tensor->mutable_data(kDims, place); + + std::vector send_vector(f::product(kDims), gpu_id); + send_tensor->CopyFromVector(send_vector, *ctx); + lk.unlock(); + PADDLE_ENFORCE(send_tensor->numel() == f::product(kDims), + "Tensor numel not match!"); + ctx->Wait(); + + VLOG(1) << "Send Tensor filled with elements " << send_tensor->numel(); + + auto op = f::OpRegistry::CreateOp(*op1); + VLOG(1) << "Device : " << gpu_id << " invoke " << op_desc.Type(); + op->Run(*scope, *ctx); + VLOG(1) << "Device : " << gpu_id << " finished " << op_desc.Type(); + } - std::unique_ptr op1(new f::OpDescBind); - op1->SetType("ncclInit"); - op1->SetOutput("Communicator", {"comm"}); - op1->SetAttr("gpus", {gpu_list}); + public: + std::vector dev_ctxs; + p::DeviceContext *cpu_ctx; + f::Scope g_scope; + std::mutex mu; +}; + +// ncclInitOp with desc +// TEST(NCCL, ncclInitOp) { +// std::unique_ptr op_desc(new f::OpDescBind); + +// op_desc->SetType("ncclInit"); +// op_desc->SetOutput("Communicator", {"x1"}); +// op_desc->SetAttr("gpus", {gpu_list}); + +// f::Scope g_scope; +// std::unique_ptr ctx(new +// p::CPUDeviceContext(p::CPUPlace())); - auto *var = g_scope.Var("comm"); - var->GetMutable(); +// auto *var = g_scope.Var("x1"); +// var->GetMutable(); - auto op = f::OpRegistry::CreateOp(*op1); - VLOG(1) << "invoke NCCLInitOp."; - op->Run(g_scope, *ctx); - VLOG(1) << "NCCLInitOp finished."; - delete ctx; +// auto op = f::OpRegistry::CreateOp(*op_desc); +// VLOG(1) << "invoke NCCLInitOp."; +// op->Run(g_scope, *ctx.get()); +// VLOG(1) << "NCCLInitOp finished."; +// } +// ncclAllReduceOp with desc +TEST_F(NCCLTester, ncclAllReduceOp) { std::unique_ptr op2(new f::OpDescBind); op2->SetType("ncclAllReduce"); op2->SetInput("X", {"st"}); @@ -126,36 +152,18 @@ TEST(NCCL, ncclAllReduceOp) { std::vector ths; for (size_t i = 0; i < gpu_list.size(); ++i) { - std::thread th(DeviceProgram, gpu_list[i], *op2, - &g_scope.NewScope()); + std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], + *op2.get(), &g_scope.NewScope()); ths.emplace_back(std::move(th)); } for (size_t i = 0; i < gpu_list.size(); ++i) { ths[i].join(); } - g_scope->reset(nullptr); } // ncclReduceOp with desc TEST(NCCL, ncclReduceOp) { - std::unique_ptr ctx(new p::CPUDeviceContext(p::CPUPlace())); - std::unique_ptr g_scope(new Scope); - - std::unique_ptr op1(new f::OpDescBind); - op1->SetType("ncclInit"); - op1->SetOutput("Communicator", {"comm"}); - op1->SetAttr("gpus", {gpu_list}); - - auto *var = g_scope.Var("comm"); - var->GetMutable(); - - auto op = f::OpRegistry::CreateOp(*op1); - VLOG(1) << "invoke NCCLInitOp."; - op->Run(g_scope, *ctx); - VLOG(1) << "NCCLInitOp finished."; - delete ctx; - std::unique_ptr op2(new f::OpDescBind); op2->SetType("ncclReduce"); op2->SetInput("X", {"st"}); @@ -164,53 +172,36 @@ TEST(NCCL, ncclReduceOp) { std::vector ths; for (size_t i = 0; i < gpu_list.size(); ++i) { - std::thread th(DeviceProgram, gpu_list[i], *op2, - &g_scope.NewScope()); + std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], + *op2.get(), &g_scope.NewScope()); ths.emplace_back(std::move(th)); } for (size_t i = 0; i < gpu_list.size(); ++i) { ths[i].join(); } - g_scope->reset(nullptr); } // ncclBcastOp with desc -TEST(NCCL, ncclBcastOp) { - f::ProgramDescBind program; - f::BlockDescBind *block = program.Block(0); - f::OpDescBind *op1 = block->AppendOp(); - - p::DeviceContext *ctx = new p::CPUDeviceContext(p::CPUPlace()); - - op1->SetType("ncclInit"); - op1->SetOutput("Communicator", {"comm"}); - op1->SetAttr("gpus", {gpu_list}); - - auto *var = g_scope.Var("comm"); - var->GetMutable(); - - auto op = f::OpRegistry::CreateOp(*op1); - VLOG(1) << "invoke NCCLInitOp."; - op->Run(g_scope, *ctx); - VLOG(1) << "NCCLInitOp finished."; - - f::OpDescBind *op2 = new f::OpDescBind; - op2->SetType("ncclBcastSend"); - op2->SetInput("X", {"st"}); - op2->SetInput("Communicator", {"comm"}); - op2->SetOutput("Out", {"rt"}); - - std::vector ths; - for (size_t i = 0; i < gpu_list.size(); ++i) { - std::thread th(DeviceProgram, gpu_list[i], *op2); - ths.emplace_back(std::move(th)); - } - - for (size_t i = 0; i < gpu_list.size(); ++i) { - ths[i].join(); - } -} +// TEST(NCCL, ncclBcastOp) { +// std::unique_ptr op2(new f::OpDescBind); +// op2->SetType("ncclBcastSend"); +// op2->SetInput("X", {"st"}); +// op2->SetInput("Communicator", {"comm"}); +// op2->SetOutput("Out", {"rt"}); + +// std::vector ths; +// for (size_t i = 0; i < gpu_list.size(); ++i) { +// std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], +// *op2.get(), +// &g_scope.NewScope()); +// ths.emplace_back(std::move(th)); +// } + +// for (size_t i = 0; i < gpu_list.size(); ++i) { +// ths[i].join(); +// } +// } int main(int argc, char **argv) { const int dev_count = p::GetCUDADeviceCount(); @@ -228,9 +219,5 @@ int main(int argc, char **argv) { // device context should be release before scope. // otherwise driver will down. - for (size_t i = 0; i < gpu_list.size(); ++i) { - p::GPUPlace place(i); - dev_ctxs.emplace_back(new p::CUDADeviceContext(place)); - } return RUN_ALL_TESTS(); }