Commit 6d1493a4 authored by Dong Zhihong

"add bcast c++ test case"

Parent 5200c657
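For orientation, here is a minimal sketch of the broadcast call that the new ncclBcastSend test is meant to exercise, written in the same style as the ncclReduce call in this diff. It assumes Paddle's dynload wrapper exposes the standard ncclBcast entry point; the tensor, root, communicator index, and stream names are illustrative, not taken from this commit.

// on the root rank the buffer is sent as-is; on every other rank it is
// overwritten in place with the root's data.
PADDLE_ENFORCE(platform::dynload::ncclBcast(
send_tensor->mutable_data<T>(ctx.GetPlace()), send_tensor->numel(),
NCCLTypeWrapper<T>::type, root, comm->comms_[idx], stream));
PADDLE_ENFORCE(cudaStreamSynchronize(stream));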
@@ -74,15 +74,8 @@ class NCCLAllReduceOp : public framework::OperatorWithKernel {
// reduction == "ncclMin" || reduction == "ncclMax"),
// "invalid reduction.");
// auto in_dim = x_dims[0];
ctx->SetOutputsDim("Out", x_dims);
ctx->ShareLoD("X", /*->*/ "Out");
size_t N = x_dims.size();
auto out_dims = ctx->GetOutputsDim("Out");
for (size_t i = 0; i < N; ++i) {
VLOG(1) << " inference (X) " << framework::product(x_dims[i]) << " (Out)"
<< framework::product(out_dims[i]);
}
}
};
@@ -58,12 +58,6 @@ class NCCLAllReduceKernel : public framework::OpKernel<T> {
boost::get<platform::GPUPlace>(ctx.GetPlace()).GetDeviceId();
int idx = comm->GetCommId(device_id);
size_t N = ins.size();
for (size_t i = 0; i < N; ++i) {
VLOG(1) << " inference (X) " << framework::product(ins[i]->dims())
<< " (Out)" << framework::product(outs[i]->dims());
}
for (size_t i = 0; i < ins.size(); ++i) {
VLOG(1) << " invoke allreduce. send " << ins[i]->numel() << " recv "
<< outs[i]->numel();
@@ -87,8 +81,8 @@ class NCCLReduceKernel : public framework::OpKernel<T> {
PADDLE_ENFORCE(platform::is_gpu_place(ctx.GetPlace()),
"This kernel only runs on GPU device.");
auto ins = ctx.MultiInput<Tensor>("X"); // x0, x1, x2
auto outs = ctx.MultiOutput<Tensor>("Out");
auto ins = ctx.MultiInput<LoDTensor>("X"); // x0, x1, x2
auto outs = ctx.MultiOutput<LoDTensor>("Out");
auto* comm = ctx.Input<Communicator>("Communicator");
@@ -108,10 +102,17 @@ class NCCLReduceKernel : public framework::OpKernel<T> {
if (root == device_id) {
recvbuffer = outs[i]->mutable_data<T>(ctx.GetPlace());
}
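// only the root rank needs a valid receive buffer here; ncclReduce ignores
// recvbuffer on the non-root ranks, so leaving it nullptr is safe.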
VLOG(1) << " invoke reduce. send " << ins[i]->numel() << " recv "
<< outs[i]->numel();
PADDLE_ENFORCE(platform::dynload::ncclReduce(
ins[i]->data<T>(), recvbuffer, ins[i]->numel(),
NCCLTypeWrapper<T>::type, ncclSum, root, comm->comms_[idx], stream));
PADDLE_ENFORCE(cudaStreamSynchronize(stream));
VLOG(1) << " finished reduce. send " << ins[i]->numel() << " recv "
<< outs[i]->numel();
}
}
};
@@ -16,7 +16,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <thrust/device_vector.h>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
@@ -24,6 +24,7 @@
#include "paddle/framework/block_desc.h"
#include "paddle/framework/op_desc.h"
#include "paddle/framework/op_registry.h"
#include "paddle/framework/program_desc.h"
#include "paddle/framework/var_desc.h"
#include "paddle/operators/nccl/nccl_gpu_common.h"
@@ -32,8 +33,6 @@
#include "paddle/platform/gpu_info.h"
#include "paddle/platform/place.h"
#include "paddle/framework/op_registry.h"
USE_NO_KERNEL_OP(ncclInit);
USE_GPU_ONLY_OP(ncclAllReduce);
USE_GPU_ONLY_OP(ncclReduce);
@@ -44,51 +43,31 @@ namespace f = paddle::framework;
namespace p = paddle::platform;
static std::vector<int> gpu_list;
static std::vector<std::unique_ptr<p::DeviceContext>> dev_ctxs;
std::mutex mu;
// test data amount
const f::DDim kDims = {100, 100};
// ncclInitOp with desc
// TEST(NCCL, ncclInitOp) {
// f::ProgramDescBind program;
// f::BlockDescBind *block = program.Block(0);
// f::OpDescBind *op_desc = block->AppendOp();
// op_desc->SetType("ncclInit");
// op_desc->SetOutput("Communicator", {"x1"});
// op_desc->SetAttr("gpus", {gpu_list});
// f::Scope g_scope;
// p::DeviceContext *ctx =
// new p::CPUDeviceContext(p::CPUPlace());
// auto *var = g_scope.Var("x1");
// var->GetMutable<p::Communicator>();
// auto op = f::OpRegistry::CreateOp(*op_desc);
// VLOG(1) << "invoke NCCLInitOp.";
// op->Run(g_scope, *ctx);
// VLOG(1) << "NCCLInitOp finished.";
// }
TEST(NCCL, ncclInitOp) {
std::unique_ptr<f::OpDescBind> op_desc(new f::OpDescBind);
// test data amount
static const f::DDim kDims = {100, 100};
static std::vector<p::DeviceContext *> dev_ctxs;
op_desc->SetType("ncclInit");
op_desc->SetOutput("Communicator", {"x1"});
op_desc->SetAttr("gpus", {gpu_list});
f::Scope g_scope;
p::DeviceContext *ctx = new p::CPUDeviceContext(p::CPUPlace());
void CreateContext() {
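// one CUDADeviceContext per entry in gpu_list (0..n-1 here), so DeviceProgram
// can later look its context up with dev_ctxs.at(gpu_id).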
for (size_t i = 0; i < gpu_list.size(); ++i) {
p::GPUPlace place(i);
VLOG(1) << "create devicecontext : " << i;
dev_ctxs.emplace_back(new p::CUDADeviceContext(place));
}
}
auto *var = g_scope.Var("x1");
var->GetMutable<p::Communicator>();
void DestroyContext() {
for (size_t i = 0; i < gpu_list.size(); ++i) {
delete dev_ctxs[i];
}
auto op = f::OpRegistry::CreateOp(*op_desc);
VLOG(1) << "invoke NCCLInitOp.";
op->Run(g_scope, *ctx);
VLOG(1) << "NCCLInitOp finished.";
}
// global scope
static f::Scope g_scope;
std::mutex mu;
template <class T>
void DeviceProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) {
std::unique_lock<std::mutex> lk(mu);
@@ -98,18 +77,12 @@ void DeviceProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) {
*op1 = op_desc;
p::GPUPlace place(gpu_id);
// p::DeviceContext *ctx =
// new p::CUDADeviceContext(place);
p::DeviceContext *ctx = dev_ctxs.at(gpu_id);
VLOG(1) << "device context : " << dev_ctxs.size() << " gpu_id " << gpu_id;
// f::Scope &local_scope = g_scope.NewScope();
auto ctx = dev_ctxs.at(gpu_id);
auto *send_tensor = scope->Var("st")->GetMutable<f::LoDTensor>();
auto *recv_tensor = scope->Var("rt")->GetMutable<f::LoDTensor>();
send_tensor->Resize(kDims);
send_tensor->mutable_data<T>(kDims, place);
// recv_tensor->mutable_data<T>(kDims, place);
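// every rank fills its send tensor with its own gpu_id, so e.g. a sum
// all-reduce should leave the sum of all participating gpu ids in each element.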
std::vector<T> send_vector(f::product(kDims), gpu_id);
send_tensor->CopyFromVector<T>(send_vector, *ctx);
@@ -118,7 +91,7 @@ void DeviceProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) {
"Tensor numel not match!");
ctx->Wait();
VLOG(1) << send_tensor->numel() << " element in send tensor";
VLOG(1) << "Send Tensor filled with elements " << send_tensor->numel();
auto op = f::OpRegistry::CreateOp(*op1);
VLOG(1) << "Device : " << gpu_id << " invoke " << op_desc.Type();
@@ -128,14 +101,10 @@ void DeviceProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) {
// ncclAllReduceOp with desc
TEST(NCCL, ncclAllReduceOp) {
f::ProgramDescBind program;
f::BlockDescBind *block = program.Block(0);
f::OpDescBind *op1 = block->AppendOp();
p::DeviceContext *ctx = new p::CPUDeviceContext(p::CPUPlace());
CreateContext();
std::unique_ptr<p::DeviceContext> ctx(new p::CPUDeviceContext(p::CPUPlace()));
std::unique_ptr<f::Scope> g_scope(new f::Scope);
std::unique_ptr<f::OpDescBind> op1(new f::OpDescBind);
op1->SetType("ncclInit");
op1->SetOutput("Communicator", {"comm"});
op1->SetAttr("gpus", {gpu_list});
@@ -149,7 +118,7 @@ TEST(NCCL, ncclAllReduceOp) {
VLOG(1) << "NCCLInitOp finished.";
delete ctx;
f::OpDescBind *op2 = new f::OpDescBind;
std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
op2->SetType("ncclAllReduce");
op2->SetInput("X", {"st"});
op2->SetInput("Communicator", {"comm"});
@@ -159,61 +128,89 @@ TEST(NCCL, ncclAllReduceOp) {
for (size_t i = 0; i < gpu_list.size(); ++i) {
std::thread th(DeviceProgram<float>, gpu_list[i], *op2,
&g_scope.NewScope());
// std::thread th([=](){
// VLOG(1) << "thread id created : " << i;
// return 1;});
ths.emplace_back(std::move(th));
}
for (size_t i = 0; i < gpu_list.size(); ++i) {
VLOG(1) << " thread joined! " << i;
ths[i].join();
}
VLOG(1) << " main thread joined!";
g_scope.reset(nullptr);
}
// ncclReduceOp with desc
TEST(NCCL, ncclReduceOp) {
std::unique_ptr<p::DeviceContext> ctx(new p::CPUDeviceContext(p::CPUPlace()));
std::unique_ptr<f::Scope> g_scope(new f::Scope);
std::unique_ptr<f::OpDescBind> op1(new f::OpDescBind);
op1->SetType("ncclInit");
op1->SetOutput("Communicator", {"comm"});
op1->SetAttr("gpus", {gpu_list});
auto *var = g_scope.Var("comm");
var->GetMutable<p::Communicator>();
auto op = f::OpRegistry::CreateOp(*op1);
VLOG(1) << "invoke NCCLInitOp.";
op->Run(g_scope, *ctx);
VLOG(1) << "NCCLInitOp finished.";
delete ctx;
std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
op2->SetType("ncclReduce");
op2->SetInput("X", {"st"});
op2->SetInput("Communicator", {"comm"});
op2->SetOutput("Out", {"rt"});
delete op2;
g_scope.~Scope();
DestroyContext();
VLOG(1) << " destory contexts";
std::vector<std::thread> ths;
for (size_t i = 0; i < gpu_list.size(); ++i) {
std::thread th(DeviceProgram<float>, gpu_list[i], *op2,
&g_scope.NewScope());
ths.emplace_back(std::move(th));
}
for (size_t i = 0; i < gpu_list.size(); ++i) {
ths[i].join();
}
g_scope.reset(nullptr);
}
// ncclBcastOp with desc
// TEST(NCCL, ncclBcastOp) {
// f::ProgramDescBind program;
// f::BlockDescBind *block = program.Block(0);
// f::OpDescBind *op1= block->AppendOp();
// p::DeviceContext *ctx =
// new p::CPUDeviceContext(p::CPUPlace());
// op1->SetType("ncclInit");
// op1->SetOutput("Communicator", {"comm"});
// op1->SetAttr("gpus", {gpu_list});
// auto *var = g_scope.Var("comm");
// var->GetMutable<p::Communicator>();
// auto op = f::OpRegistry::CreateOp(*op1);
// VLOG(1) << "invoke NCCLInitOp.";
// op->Run(g_scope, *ctx);
// VLOG(1) << "NCCLInitOp finished.";
// f::OpDescBind *op2 = new f::OpDescBind;
// op2->SetType("ncclBcastSend");
// op2->SetInput("X", {"st"});
// op2->SetInput("Communicator", {"comm"});
// op2->SetOutput("Out", {"rt"});
// std::vector<std::thread> ths;
// for (size_t i=0; i < gpu_list.size(); ++i) {
// std::thread th(DeviceProgram<float>, gpu_list[i], *op2);
// ths.emplace_back(std::move(th));
// }
// for (size_t i=0; i < gpu_list.size(); ++i) {
// ths[i].join();
// }
// }
TEST(NCCL, ncclBcastOp) {
f::ProgramDescBind program;
f::BlockDescBind *block = program.Block(0);
f::OpDescBind *op1 = block->AppendOp();
p::DeviceContext *ctx = new p::CPUDeviceContext(p::CPUPlace());
op1->SetType("ncclInit");
op1->SetOutput("Communicator", {"comm"});
op1->SetAttr("gpus", {gpu_list});
auto *var = g_scope.Var("comm");
var->GetMutable<p::Communicator>();
auto op = f::OpRegistry::CreateOp(*op1);
VLOG(1) << "invoke NCCLInitOp.";
op->Run(g_scope, *ctx);
VLOG(1) << "NCCLInitOp finished.";
f::OpDescBind *op2 = new f::OpDescBind;
op2->SetType("ncclBcastSend");
op2->SetInput("X", {"st"});
op2->SetInput("Communicator", {"comm"});
op2->SetOutput("Out", {"rt"});
std::vector<std::thread> ths;
for (size_t i = 0; i < gpu_list.size(); ++i) {
std::thread th(DeviceProgram<float>, gpu_list[i], *op2);
ths.emplace_back(std::move(th));
}
for (size_t i = 0; i < gpu_list.size(); ++i) {
ths[i].join();
}
}
int main(int argc, char **argv) {
const int dev_count = p::GetCUDADeviceCount();
@@ -228,5 +225,12 @@ int main(int argc, char **argv) {
gpu_list.emplace_back(i);
}
testing::InitGoogleTest(&argc, argv);
// device contexts should be released before the scope is destroyed;
// otherwise the driver will crash.
for (size_t i = 0; i < gpu_list.size(); ++i) {
p::GPUPlace place(i);
dev_ctxs.emplace_back(new p::CUDADeviceContext(place));
}
return RUN_ALL_TESTS();
}