提交 11cf3e3a 编写于 作者: D Dong Zhihong

"refactorization of nccl test case"

上级 6d1493a4
......@@ -43,33 +43,48 @@ namespace f = paddle::framework;
namespace p = paddle::platform;
static std::vector<int> gpu_list;
static std::vector<std::unique_ptr<p::DeviceContext>> dev_ctxs;
std::mutex mu;
// test data amount
const f::DDim kDims = {100, 100};
// ncclInitOp with desc
TEST(NCCL, ncclInitOp) {
std::unique_ptr<f::OpDescBind> op_desc(new f::OpDescBind);
// nccl op common tester, init communicator.
class NCCLTester : public ::testing::Test {
public:
virtual void SetUp() override {
cpu_ctx = new p::CPUDeviceContext(p::CPUPlace());
for (size_t i = 0; i < gpu_list.size(); ++i) {
p::GPUPlace place(i);
dev_ctxs.emplace_back(new p::CUDADeviceContext(place));
}
op_desc->SetType("ncclInit");
op_desc->SetOutput("Communicator", {"x1"});
op_desc->SetAttr("gpus", {gpu_list});
f::Scope g_scope;
p::DeviceContext *ctx = new p::CPUDeviceContext(p::CPUPlace());
NCCLInitOp();
}
auto *var = g_scope.Var("x1");
virtual void TearDown() override {
for (auto &device_context : dev_ctxs) {
delete device_context;
}
}
void NCCLInitOp() {
std::unique_ptr<f::OpDescBind> op1(new f::OpDescBind);
op1->SetType("ncclInit");
op1->SetOutput("Communicator", {"comm"});
op1->SetAttr("gpus", {gpu_list});
auto *var = g_scope.Var("comm");
var->GetMutable<p::Communicator>();
auto op = f::OpRegistry::CreateOp(*op_desc);
auto op = f::OpRegistry::CreateOp(*op1);
VLOG(1) << "invoke NCCLInitOp.";
op->Run(g_scope, *ctx);
op->Run(g_scope, *cpu_ctx);
VLOG(1) << "NCCLInitOp finished.";
}
}
template <class T>
void DeviceProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) {
template <class T>
void PerThreadProgram(int gpu_id, const f::OpDescBind &op_desc,
f::Scope *scope) {
std::unique_lock<std::mutex> lk(mu);
f::ProgramDescBind program;
f::BlockDescBind *block = program.Block(0);
......@@ -77,7 +92,7 @@ void DeviceProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) {
*op1 = op_desc;
p::GPUPlace place(gpu_id);
auto ctx = dev_ctxs.at(gpu_id);
auto &ctx = dev_ctxs.at(gpu_id);
auto *send_tensor = scope->Var("st")->GetMutable<f::LoDTensor>();
auto *recv_tensor = scope->Var("rt")->GetMutable<f::LoDTensor>();
......@@ -97,27 +112,38 @@ void DeviceProgram(int gpu_id, const f::OpDescBind &op_desc, f::Scope *scope) {
VLOG(1) << "Device : " << gpu_id << " invoke " << op_desc.Type();
op->Run(*scope, *ctx);
VLOG(1) << "Device : " << gpu_id << " finished " << op_desc.Type();
}
}
// ncclAllReduceOp with desc
TEST(NCCL, ncclAllReduceOp) {
std::unique_ptr<p::DeviceContext> ctx(new p::CPUDeviceContext(p::CPUPlace()));
std::unique_ptr<f::Scope> g_scope(new Scope);
public:
std::vector<p::DeviceContext *> dev_ctxs;
p::DeviceContext *cpu_ctx;
f::Scope g_scope;
std::mutex mu;
};
std::unique_ptr<f::OpDescBind> op1(new f::OpDescBind);
op1->SetType("ncclInit");
op1->SetOutput("Communicator", {"comm"});
op1->SetAttr("gpus", {gpu_list});
// ncclInitOp with desc
// TEST(NCCL, ncclInitOp) {
// std::unique_ptr<f::OpDescBind> op_desc(new f::OpDescBind);
auto *var = g_scope.Var("comm");
var->GetMutable<p::Communicator>();
// op_desc->SetType("ncclInit");
// op_desc->SetOutput("Communicator", {"x1"});
// op_desc->SetAttr("gpus", {gpu_list});
auto op = f::OpRegistry::CreateOp(*op1);
VLOG(1) << "invoke NCCLInitOp.";
op->Run(g_scope, *ctx);
VLOG(1) << "NCCLInitOp finished.";
delete ctx;
// f::Scope g_scope;
// std::unique_ptr<p::DeviceContext> ctx(new
// p::CPUDeviceContext(p::CPUPlace()));
// auto *var = g_scope.Var("x1");
// var->GetMutable<p::Communicator>();
// auto op = f::OpRegistry::CreateOp(*op_desc);
// VLOG(1) << "invoke NCCLInitOp.";
// op->Run(g_scope, *ctx.get());
// VLOG(1) << "NCCLInitOp finished.";
// }
// ncclAllReduceOp with desc
TEST_F(NCCLTester, ncclAllReduceOp) {
std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
op2->SetType("ncclAllReduce");
op2->SetInput("X", {"st"});
......@@ -126,36 +152,18 @@ TEST(NCCL, ncclAllReduceOp) {
std::vector<std::thread> ths;
for (size_t i = 0; i < gpu_list.size(); ++i) {
std::thread th(DeviceProgram<float>, gpu_list[i], *op2,
&g_scope.NewScope());
std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
*op2.get(), &g_scope.NewScope());
ths.emplace_back(std::move(th));
}
for (size_t i = 0; i < gpu_list.size(); ++i) {
ths[i].join();
}
g_scope->reset(nullptr);
}
// ncclReduceOp with desc
TEST(NCCL, ncclReduceOp) {
std::unique_ptr<p::DeviceContext> ctx(new p::CPUDeviceContext(p::CPUPlace()));
std::unique_ptr<f::Scope> g_scope(new Scope);
std::unique_ptr<f::OpDescBind> op1(new f::OpDescBind);
op1->SetType("ncclInit");
op1->SetOutput("Communicator", {"comm"});
op1->SetAttr("gpus", {gpu_list});
auto *var = g_scope.Var("comm");
var->GetMutable<p::Communicator>();
auto op = f::OpRegistry::CreateOp(*op1);
VLOG(1) << "invoke NCCLInitOp.";
op->Run(g_scope, *ctx);
VLOG(1) << "NCCLInitOp finished.";
delete ctx;
std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
op2->SetType("ncclReduce");
op2->SetInput("X", {"st"});
......@@ -164,53 +172,36 @@ TEST(NCCL, ncclReduceOp) {
std::vector<std::thread> ths;
for (size_t i = 0; i < gpu_list.size(); ++i) {
std::thread th(DeviceProgram<float>, gpu_list[i], *op2,
&g_scope.NewScope());
std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
*op2.get(), &g_scope.NewScope());
ths.emplace_back(std::move(th));
}
for (size_t i = 0; i < gpu_list.size(); ++i) {
ths[i].join();
}
g_scope->reset(nullptr);
}
// ncclBcastOp with desc
TEST(NCCL, ncclBcastOp) {
f::ProgramDescBind program;
f::BlockDescBind *block = program.Block(0);
f::OpDescBind *op1 = block->AppendOp();
p::DeviceContext *ctx = new p::CPUDeviceContext(p::CPUPlace());
op1->SetType("ncclInit");
op1->SetOutput("Communicator", {"comm"});
op1->SetAttr("gpus", {gpu_list});
auto *var = g_scope.Var("comm");
var->GetMutable<p::Communicator>();
auto op = f::OpRegistry::CreateOp(*op1);
VLOG(1) << "invoke NCCLInitOp.";
op->Run(g_scope, *ctx);
VLOG(1) << "NCCLInitOp finished.";
f::OpDescBind *op2 = new f::OpDescBind;
op2->SetType("ncclBcastSend");
op2->SetInput("X", {"st"});
op2->SetInput("Communicator", {"comm"});
op2->SetOutput("Out", {"rt"});
std::vector<std::thread> ths;
for (size_t i = 0; i < gpu_list.size(); ++i) {
std::thread th(DeviceProgram<float>, gpu_list[i], *op2);
ths.emplace_back(std::move(th));
}
for (size_t i = 0; i < gpu_list.size(); ++i) {
ths[i].join();
}
}
// TEST(NCCL, ncclBcastOp) {
// std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
// op2->SetType("ncclBcastSend");
// op2->SetInput("X", {"st"});
// op2->SetInput("Communicator", {"comm"});
// op2->SetOutput("Out", {"rt"});
// std::vector<std::thread> ths;
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
// *op2.get(),
// &g_scope.NewScope());
// ths.emplace_back(std::move(th));
// }
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// ths[i].join();
// }
// }
int main(int argc, char **argv) {
const int dev_count = p::GetCUDADeviceCount();
......@@ -228,9 +219,5 @@ int main(int argc, char **argv) {
// device context should be release before scope.
// otherwise driver will down.
for (size_t i = 0; i < gpu_list.size(); ++i) {
p::GPUPlace place(i);
dev_ctxs.emplace_back(new p::CUDADeviceContext(place));
}
return RUN_ALL_TESTS();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册