diff --git a/paddle/operators/nccl_op.cc b/paddle/operators/nccl_op.cc
index 85f589f4aae0e81b59c0665d86104ca4ec8a5b53..ec7a89d5ff4a66e0527d8365027bc2c5b4e93aaf 100644
--- a/paddle/operators/nccl_op.cc
+++ b/paddle/operators/nccl_op.cc
@@ -74,15 +74,8 @@ class NCCLAllReduceOp : public framework::OperatorWithKernel {
     // reduction == "ncclMin" || reduction == "ncclMax"),
     // "invalid reduction.");

-    // auto in_dim = x_dims[0];
     ctx->SetOutputsDim("Out", x_dims);
     ctx->ShareLoD("X", /*->*/ "Out");

-    size_t N = x_dims.size();
-    auto out_dims = ctx->GetOutputsDim("Out");
-    for (size_t i = 0; i < N; ++i) {
-      VLOG(1) << " inference (X) " << framework::product(x_dims[i]) << " (Out)"
-              << framework::product(out_dims[i]);
-    }
   }
 };
diff --git a/paddle/operators/nccl_op.cu b/paddle/operators/nccl_op.cu
index c507d325f2872f085c1fb88e23941fd7c73b1f98..68d0d5b7c9060724facee0ead4f4a7547506d6ee 100644
--- a/paddle/operators/nccl_op.cu
+++ b/paddle/operators/nccl_op.cu
@@ -58,12 +58,6 @@ class NCCLAllReduceKernel : public framework::OpKernel {
         boost::get(ctx.GetPlace()).GetDeviceId();
     int idx = comm->GetCommId(device_id);

-    size_t N = ins.size();
-    for (size_t i = 0; i < N; ++i) {
-      VLOG(1) << " inference (X) " << framework::product(ins[i]->dims())
-              << " (Out)" << framework::product(outs[i]->dims());
-    }
-
     for (size_t i = 0; i < ins.size(); ++i) {
       VLOG(1) << " invoke allreduce. send " << ins[i]->numel() << " recv "
               << outs[i]->numel();
@@ -87,8 +81,8 @@ class NCCLReduceKernel : public framework::OpKernel {
     PADDLE_ENFORCE(platform::is_gpu_place(ctx.GetPlace()),
                    "This kernel only runs on GPU device.");

-    auto ins = ctx.MultiInput("X");  // x0, x1, x2
-    auto outs = ctx.MultiOutput("Out");
+    auto ins = ctx.MultiInput("X");  // x0, x1, x2
+    auto outs = ctx.MultiOutput("Out");

     auto* comm = ctx.Input("Communicator");

@@ -108,10 +102,17 @@ class NCCLReduceKernel : public framework::OpKernel {
       if (root == device_id) {
         recvbuffer = outs[i]->mutable_data(ctx.GetPlace());
       }
+
+      VLOG(1) << " invoke reduce. send " << ins[i]->numel() << " recv "
+              << outs[i]->numel();
+
       PADDLE_ENFORCE(platform::dynload::ncclReduce(
           ins[i]->data(), recvbuffer, ins[i]->numel(),
           NCCLTypeWrapper::type, ncclSum, root, comm->comms_[idx], stream));
       PADDLE_ENFORCE(cudaStreamSynchronize(stream));
+
+      VLOG(1) << " finished reduce. send " << ins[i]->numel() << " recv "
send " << ins[i]->numel() << " recv " + << outs[i]->numel(); } } }; diff --git a/paddle/operators/nccl_op_test.cu b/paddle/operators/nccl_op_test.cu index 0509e6ddabf44301721b58b1e5c7a0972a0f9cda..0e64802f1793aa159faf53e1511ea29880c35143 100644 --- a/paddle/operators/nccl_op_test.cu +++ b/paddle/operators/nccl_op_test.cu @@ -16,7 +16,7 @@ #include #include -#include +#include #include #include #include @@ -24,6 +24,7 @@ #include "paddle/framework/block_desc.h" #include "paddle/framework/op_desc.h" +#include "paddle/framework/op_registry.h" #include "paddle/framework/program_desc.h" #include "paddle/framework/var_desc.h" #include "paddle/operators/nccl/nccl_gpu_common.h" @@ -32,8 +33,6 @@ #include "paddle/platform/gpu_info.h" #include "paddle/platform/place.h" -#include "paddle/framework/op_registry.h" - USE_NO_KERNEL_OP(ncclInit); USE_GPU_ONLY_OP(ncclAllReduce); USE_GPU_ONLY_OP(ncclReduce); @@ -44,51 +43,31 @@ namespace f = paddle::framework; namespace p = paddle::platform; static std::vector gpu_list; +static std::vector> dev_ctxs; +std::mutex mu; + +// test data amount +const f::DDim kDims = {100, 100}; // ncclInitOp with desc -// TEST(NCCL, ncclInitOp) { -// f::ProgramDescBind program; -// f::BlockDescBind *block = program.Block(0); -// f::OpDescBind *op_desc = block->AppendOp(); - -// op_desc->SetType("ncclInit"); -// op_desc->SetOutput("Communicator", {"x1"}); -// op_desc->SetAttr("gpus", {gpu_list}); -// f::Scope g_scope; -// p::DeviceContext *ctx = -// new p::CPUDeviceContext(p::CPUPlace()); - -// auto *var = g_scope.Var("x1"); -// var->GetMutable(); - -// auto op = f::OpRegistry::CreateOp(*op_desc); -// VLOG(1) << "invoke NCCLInitOp."; -// op->Run(g_scope, *ctx); -// VLOG(1) << "NCCLInitOp finished."; -// } +TEST(NCCL, ncclInitOp) { + std::unique_ptr op_desc(new f::OpDescBind); -// test data amount -static const f::DDim kDims = {100, 100}; -static std::vector dev_ctxs; + op_desc->SetType("ncclInit"); + op_desc->SetOutput("Communicator", {"x1"}); + op_desc->SetAttr("gpus", {gpu_list}); + f::Scope g_scope; + p::DeviceContext *ctx = new p::CPUDeviceContext(p::CPUPlace()); -void CreateContext() { - for (size_t i = 0; i < gpu_list.size(); ++i) { - p::GPUPlace place(i); - VLOG(1) << "create devicecontext : " << i; - dev_ctxs.emplace_back(new p::CUDADeviceContext(place)); - } -} + auto *var = g_scope.Var("x1"); + var->GetMutable(); -void DestroyContext() { - for (size_t i = 0; i < gpu_list.size(); ++i) { - delete dev_ctxs[i]; - } + auto op = f::OpRegistry::CreateOp(*op_desc); + VLOG(1) << "invoke NCCLInitOp."; + op->Run(g_scope, *ctx); + VLOG(1) << "NCCLInitOp finished."; } -// global scope -static f::Scope g_scope; -std::mutex mu; - template void DeviceProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) { std::unique_lock lk(mu); @@ -98,18 +77,12 @@ void DeviceProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) { *op1 = op_desc; p::GPUPlace place(gpu_id); - // p::DeviceContext *ctx = - // new p::CUDADeviceContext(place); - p::DeviceContext *ctx = dev_ctxs.at(gpu_id); - VLOG(1) << "device context : " << dev_ctxs.size() << " gpu_id " << gpu_id; - - // f::Scope &local_scope = g_scope.NewScope(); + auto ctx = dev_ctxs.at(gpu_id); auto *send_tensor = scope->Var("st")->GetMutable(); auto *recv_tensor = scope->Var("rt")->GetMutable(); send_tensor->Resize(kDims); send_tensor->mutable_data(kDims, place); - // recv_tensor->mutable_data(kDims, place); std::vector send_vector(f::product(kDims), gpu_id); send_tensor->CopyFromVector(send_vector, *ctx); @@ -118,7 +91,7 @@ 
                  "Tensor numel not match!");
   ctx->Wait();

-  VLOG(1) << send_tensor->numel() << " element in send tensor";
+  VLOG(1) << "Send Tensor filled with elements " << send_tensor->numel();

   auto op = f::OpRegistry::CreateOp(*op1);
   VLOG(1) << "Device : " << gpu_id << " invoke " << op_desc.Type();
@@ -128,14 +101,10 @@ void DeviceProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) {

 // ncclAllReduceOp with desc
 TEST(NCCL, ncclAllReduceOp) {
-  f::ProgramDescBind program;
-  f::BlockDescBind *block = program.Block(0);
-  f::OpDescBind *op1 = block->AppendOp();
-
-  p::DeviceContext *ctx = new p::CPUDeviceContext(p::CPUPlace());
-
-  CreateContext();
+  std::unique_ptr ctx(new p::CPUDeviceContext(p::CPUPlace()));
+  std::unique_ptr g_scope(new Scope);
+  std::unique_ptr op1(new f::OpDescBind);
   op1->SetType("ncclInit");
   op1->SetOutput("Communicator", {"comm"});
   op1->SetAttr("gpus", {gpu_list});

@@ -149,7 +118,7 @@ TEST(NCCL, ncclAllReduceOp) {
   VLOG(1) << "NCCLInitOp finished.";
   delete ctx;

-  f::OpDescBind *op2 = new f::OpDescBind;
+  std::unique_ptr op2(new f::OpDescBind);
   op2->SetType("ncclAllReduce");
   op2->SetInput("X", {"st"});
   op2->SetInput("Communicator", {"comm"});
@@ -159,61 +128,89 @@ TEST(NCCL, ncclAllReduceOp) {
   for (size_t i = 0; i < gpu_list.size(); ++i) {
     std::thread th(DeviceProgram, gpu_list[i], *op2, &g_scope.NewScope());
-    // std::thread th([=](){
-    //   VLOG(1) << "thread id created : " << i;
-    //   return 1;});
     ths.emplace_back(std::move(th));
   }

   for (size_t i = 0; i < gpu_list.size(); ++i) {
-    VLOG(1) << " thread joined! " << i;
     ths[i].join();
   }
-  VLOG(1) << " main thread joined!";
+  g_scope->reset(nullptr);
+}
+
+// ncclReduceOp with desc
+TEST(NCCL, ncclReduceOp) {
+  std::unique_ptr ctx(new p::CPUDeviceContext(p::CPUPlace()));
+  std::unique_ptr g_scope(new Scope);
+
+  std::unique_ptr op1(new f::OpDescBind);
+  op1->SetType("ncclInit");
+  op1->SetOutput("Communicator", {"comm"});
+  op1->SetAttr("gpus", {gpu_list});
+
+  auto *var = g_scope.Var("comm");
+  var->GetMutable();
+
+  auto op = f::OpRegistry::CreateOp(*op1);
+  VLOG(1) << "invoke NCCLInitOp.";
+  op->Run(g_scope, *ctx);
+  VLOG(1) << "NCCLInitOp finished.";
+  delete ctx;
+
+  std::unique_ptr op2(new f::OpDescBind);
+  op2->SetType("ncclReduce");
+  op2->SetInput("X", {"st"});
+  op2->SetInput("Communicator", {"comm"});
+  op2->SetOutput("Out", {"rt"});
-  delete op2;
-  g_scope.~Scope();
-  DestroyContext();
-  VLOG(1) << " destory contexts";
+  std::vector ths;
+  for (size_t i = 0; i < gpu_list.size(); ++i) {
+    std::thread th(DeviceProgram, gpu_list[i], *op2,
+                   &g_scope.NewScope());
+    ths.emplace_back(std::move(th));
+  }
+
+  for (size_t i = 0; i < gpu_list.size(); ++i) {
+    ths[i].join();
+  }
+  g_scope->reset(nullptr);
 }

 // ncclBcastOp with desc
-// TEST(NCCL, ncclBcastOp) {
-//   f::ProgramDescBind program;
-//   f::BlockDescBind *block = program.Block(0);
-//   f::OpDescBind *op1= block->AppendOp();
-
-//   p::DeviceContext *ctx =
-//       new p::CPUDeviceContext(p::CPUPlace());
-
-//   op1->SetType("ncclInit");
-//   op1->SetOutput("Communicator", {"comm"});
-//   op1->SetAttr("gpus", {gpu_list});
-
-//   auto *var = g_scope.Var("comm");
-//   var->GetMutable();
-
-//   auto op = f::OpRegistry::CreateOp(*op1);
-//   VLOG(1) << "invoke NCCLInitOp.";
-//   op->Run(g_scope, *ctx);
-//   VLOG(1) << "NCCLInitOp finished.";
-
-//   f::OpDescBind *op2 = new f::OpDescBind;
-//   op2->SetType("ncclBcastSend");
-//   op2->SetInput("X", {"st"});
-//   op2->SetInput("Communicator", {"comm"});
-//   op2->SetOutput("Out", {"rt"});
op2->SetOutput("Out", {"rt"}); - -// std::vector ths; -// for (size_t i=0; i < gpu_list.size(); ++i) { -// std::thread th(DeviceProgram, gpu_list[i], *op2); -// ths.emplace_back(std::move(th)); -// } - -// for (size_t i=0; i < gpu_list.size(); ++i) { -// ths[i].join(); -// } -// } +TEST(NCCL, ncclBcastOp) { + f::ProgramDescBind program; + f::BlockDescBind *block = program.Block(0); + f::OpDescBind *op1 = block->AppendOp(); + + p::DeviceContext *ctx = new p::CPUDeviceContext(p::CPUPlace()); + + op1->SetType("ncclInit"); + op1->SetOutput("Communicator", {"comm"}); + op1->SetAttr("gpus", {gpu_list}); + + auto *var = g_scope.Var("comm"); + var->GetMutable(); + + auto op = f::OpRegistry::CreateOp(*op1); + VLOG(1) << "invoke NCCLInitOp."; + op->Run(g_scope, *ctx); + VLOG(1) << "NCCLInitOp finished."; + + f::OpDescBind *op2 = new f::OpDescBind; + op2->SetType("ncclBcastSend"); + op2->SetInput("X", {"st"}); + op2->SetInput("Communicator", {"comm"}); + op2->SetOutput("Out", {"rt"}); + + std::vector ths; + for (size_t i = 0; i < gpu_list.size(); ++i) { + std::thread th(DeviceProgram, gpu_list[i], *op2); + ths.emplace_back(std::move(th)); + } + + for (size_t i = 0; i < gpu_list.size(); ++i) { + ths[i].join(); + } +} int main(int argc, char **argv) { const int dev_count = p::GetCUDADeviceCount(); @@ -228,5 +225,12 @@ int main(int argc, char **argv) { gpu_list.emplace_back(i); } testing::InitGoogleTest(&argc, argv); + + // device context should be release before scope. + // otherwise driver will down. + for (size_t i = 0; i < gpu_list.size(); ++i) { + p::GPUPlace place(i); + dev_ctxs.emplace_back(new p::CUDADeviceContext(place)); + } return RUN_ALL_TESTS(); }