Commit 38d3adfe authored by Dong Zhihong

"add multioperator testcase"

Parent 94992a99
......@@ -100,8 +100,8 @@ class NCCLReduceOp : public framework::OperatorWithKernel {
}
};
// BcastSendOp
class NCCLBcastSendOp : public framework::OperatorWithKernel {
// BcastOp
class NCCLBcastOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
......@@ -111,20 +111,12 @@ class NCCLBcastSendOp : public framework::OperatorWithKernel {
" Input(X) of Bcast op input should not be NULL");
PADDLE_ENFORCE(ctx->HasInput("Communicator"),
" Input(Communicator) of Bcast op input should not be NULL");
}
};
// BcastRecvOp
class NCCLBcastRecvOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
protected:
void InferShape(framework::InferShapeContext *ctx) const override {
PADDLE_ENFORCE(ctx->HasInput("Communicator"),
" Input(Communicator) of Bcast op input should not be NULL");
PADDLE_ENFORCE(ctx->HasOutput("Out"),
" Output(Out) of Bcast op output should not be NULL");
auto x_dims = ctx->GetInputsDim("X");
ctx->SetOutputsDim("Out", x_dims);
ctx->ShareLoD("X", /*->*/ "Out");
}
};
......@@ -146,52 +138,41 @@ class NCCLAllReduceOpMaker : public framework::OpProtoAndCheckerMaker {
}
};
// BcastSend should be in the root
// BcastSendOp
class NCCLBcastSendOpMaker : public framework::OpProtoAndCheckerMaker {
// ReduceOp
class NCCLReduceOpMaker : public framework::OpProtoAndCheckerMaker {
public:
NCCLBcastSendOpMaker(framework::OpProto *proto,
NCCLReduceOpMaker(framework::OpProto *proto,
framework::OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "The input of BcastSend op");
AddInput("X", "The input of Reduce op");
AddInput("Communicator", "Communicator for communicating between gpus");
AddAttr<int>("root", "root gpu of Bcast");
AddOutput("Out", "The output of Reduce op");
AddAttr<int>("root",
"root gpu of the parameter. if not set(-1). hashed by name.")
.SetDefault(-1);
AddComment(R"DOC(
Bcast the tensors.
)DOC");
Reduce the tensors)DOC");
}
};
// BcastOp
class NCCLBcastRecvOpMaker : public framework::OpProtoAndCheckerMaker {
class NCCLBcastOpMaker : public framework::OpProtoAndCheckerMaker {
public:
NCCLBcastRecvOpMaker(framework::OpProto *proto,
NCCLBcastOpMaker(framework::OpProto *proto,
framework::OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "The input of BcastSend op");
AddInput("Communicator", "Communicator for communicating between gpus");
AddAttr<int>("root", "root gpu of BcastRecv");
AddOutput("Out", "The output of Bcast");
AddAttr<int>("root",
"root gpu of the parameter. if not set(-1). hashed by name.")
.SetDefault(-1);
AddComment(R"DOC(
Bcast the tensors.
)DOC");
}
};
// BcastRecvOp
class NCCLReduceOpMaker : public framework::OpProtoAndCheckerMaker {
public:
NCCLReduceOpMaker(framework::OpProto *proto,
framework::OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "The input of Reduce op");
AddInput("Communicator", "Communicator for communicating between gpus");
AddOutput("Out", "The output of Reduce op");
AddComment(R"DOC(
Reduce the tensors.
)DOC");
}
};
} // namespace operators
} // namespace paddle
......@@ -201,9 +182,7 @@ REGISTER_OPERATOR(ncclInit, ops::NCCLInitOp,
REGISTER_OP_WITHOUT_GRADIENT(ncclAllReduce, ops::NCCLAllReduceOp,
ops::NCCLAllReduceOpMaker);
REGISTER_OP_WITHOUT_GRADIENT(ncclBcastSend, ops::NCCLBcastSendOp,
ops::NCCLBcastSendOpMaker);
REGISTER_OP_WITHOUT_GRADIENT(ncclBcastRecv, ops::NCCLBcastRecvOp,
ops::NCCLBcastRecvOpMaker);
REGISTER_OP_WITHOUT_GRADIENT(ncclBcast, ops::NCCLBcastOp,
ops::NCCLBcastOpMaker);
REGISTER_OP_WITHOUT_GRADIENT(ncclReduce, ops::NCCLReduceOp,
ops::NCCLReduceOpMaker);
......@@ -83,6 +83,7 @@ class NCCLReduceKernel : public framework::OpKernel<T> {
auto ins = ctx.MultiInput<LoDTensor>("X"); // x0, x1, x2
auto outs = ctx.MultiOutput<LoDTensor>("Out");
int root = ctx.Attr<int>("root");
auto* comm = ctx.Input<Communicator>("Communicator");
......@@ -97,7 +98,9 @@ class NCCLReduceKernel : public framework::OpKernel<T> {
auto ins_names = ctx.Inputs("X");
std::hash<std::string> hasher;
for (size_t i = 0; i < ins.size(); ++i) {
int root = hasher(ins_names[i]) % comm->comms_.size();
if (root == -1) {
root = hasher(ins_names[i]) % comm->comms_.size();
}
T* recvbuffer = nullptr;
if (root == device_id) {
recvbuffer = outs[i]->mutable_data<T>(ctx.GetPlace());
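For reference, a minimal standalone sketch (not part of this diff; the helper name is illustrative) of the root-selection rule that the ncclReduce kernel above applies when the "root" attribute keeps its default of -1:

#include <functional>
#include <string>

// Hypothetical helper: keep an explicitly chosen root, otherwise fall back to
// hashing the input variable name over the number of communicators, matching
// the `root == -1` branch in NCCLReduceKernel above.
int ResolveRoot(int root_attr, const std::string &var_name, size_t num_comms) {
  if (root_attr != -1) return root_attr;
  std::hash<std::string> hasher;
  return static_cast<int>(hasher(var_name) % num_comms);
}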
......@@ -135,8 +138,9 @@ class NCCLBcastKernel : public framework::OpKernel<T> {
int device_id =
boost::get<platform::GPUPlace>(ctx.GetPlace()).GetDeviceId();
int idx = comm->GetCommId(device_id);
if (idx == root) {
auto ins = ctx.MultiInput<Tensor>("X");
auto ins = ctx.MultiInput<LoDTensor>("X");
for (size_t i = 0; i < ins.size(); ++i) {
PADDLE_ENFORCE(platform::dynload::ncclBcast(
(void*)ins[i]->data<T>(), ins[i]->numel(), NCCLTypeWrapper<T>::type,
......@@ -144,7 +148,7 @@ class NCCLBcastKernel : public framework::OpKernel<T> {
PADDLE_ENFORCE(cudaStreamSynchronize(stream));
}
} else {
auto outs = ctx.MultiOutput<Tensor>("Out");
auto outs = ctx.MultiOutput<LoDTensor>("Out");
for (size_t i = 0; i < outs.size(); ++i) {
PADDLE_ENFORCE(platform::dynload::ncclBcast(
outs[i]->mutable_data<T>(ctx.GetPlace()), outs[i]->numel(),
......@@ -160,6 +164,5 @@ class NCCLBcastKernel : public framework::OpKernel<T> {
namespace ops = paddle::operators;
REGISTER_OP_GPU_KERNEL(ncclAllReduce, ops::NCCLAllReduceKernel<float>);
REGISTER_OP_GPU_KERNEL(ncclBcastSend, ops::NCCLBcastKernel<float>);
REGISTER_OP_GPU_KERNEL(ncclBcast, ops::NCCLBcastKernel<float>);
REGISTER_OP_GPU_KERNEL(ncclReduce, ops::NCCLReduceKernel<float>);
REGISTER_OP_GPU_KERNEL(ncclBcastRecv, ops::NCCLBcastKernel<float>);
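As background for the ncclBcast kernel registered above, here is a hedged, self-contained sketch of the same broadcast expressed with the raw NCCL API (assuming NCCL 2.x and CUDA are available; buffer size and error handling are illustrative only). Every rank issues the identical ncclBcast call, and only the choice of buffer distinguishes the root from the receivers, which is why NCCLBcastKernel branches only to pick the data pointer:

#include <cuda_runtime.h>
#include <nccl.h>
#include <vector>

int main() {
  int ngpus = 0;
  cudaGetDeviceCount(&ngpus);

  // One communicator, stream, and device buffer per visible GPU.
  std::vector<int> devs(ngpus);
  for (int i = 0; i < ngpus; ++i) devs[i] = i;
  std::vector<ncclComm_t> comms(ngpus);
  ncclCommInitAll(comms.data(), ngpus, devs.data());

  const size_t count = 1024;  // illustrative element count
  const int root = 0;
  std::vector<void *> bufs(ngpus);
  std::vector<cudaStream_t> streams(ngpus);
  for (int i = 0; i < ngpus; ++i) {
    cudaSetDevice(i);
    cudaMalloc(&bufs[i], count * sizeof(float));
    cudaStreamCreate(&streams[i]);
  }

  // Every rank calls ncclBcast with the same root; the root's buffer is read,
  // every other rank's buffer is overwritten with the root's data.
  ncclGroupStart();
  for (int i = 0; i < ngpus; ++i) {
    ncclBcast(bufs[i], count, ncclFloat, root, comms[i], streams[i]);
  }
  ncclGroupEnd();

  for (int i = 0; i < ngpus; ++i) {
    cudaSetDevice(i);
    cudaStreamSynchronize(streams[i]);
    cudaFree(bufs[i]);
    cudaStreamDestroy(streams[i]);
    ncclCommDestroy(comms[i]);
  }
  return 0;
}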
......@@ -28,6 +28,7 @@
#include "paddle/framework/op_registry.h"
#include "paddle/framework/program_desc.h"
#include "paddle/framework/var_desc.h"
#include "paddle/operators/math/math_function.h"
#include "paddle/operators/nccl/nccl_gpu_common.h"
#include "paddle/platform/device_context.h"
#include "paddle/platform/enforce.h"
......@@ -37,8 +38,7 @@
USE_NO_KERNEL_OP(ncclInit);
USE_GPU_ONLY_OP(ncclAllReduce);
USE_GPU_ONLY_OP(ncclReduce);
USE_GPU_ONLY_OP(ncclBcastSend);
USE_GPU_ONLY_OP(ncclBcastRecv);
USE_GPU_ONLY_OP(ncclBcast);
namespace f = paddle::framework;
namespace p = paddle::platform;
......@@ -144,12 +144,62 @@ class NCCLTester : public ::testing::Test {
// }
// ncclAllReduceOp with desc
TEST_F(NCCLTester, ncclAllReduceOp) {
// TEST_F(NCCLTester, ncclAllReduceOp) {
// std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
// op2->SetType("ncclAllReduce");
// op2->SetInput("X", {"st"});
// op2->SetInput("Communicator", {"comm"});
// op2->SetOutput("Out", {"rt"});
// std::vector<f::Scope *> dev_scopes;
// std::vector<std::thread> ths;
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// dev_scopes.emplace_back(&g_scope.NewScope());
// std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
// *op2.get(), dev_scopes[i]);
// ths.emplace_back(std::move(th));
// }
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// ths[i].join();
// }
// // check results
// float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
// for (size_t i = 0; i < dev_scopes.size(); ++i) {
// p::CPUPlace cpu_place;
// p::GPUPlace gpu_place(gpu_list[i]);
// auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
// auto *rt = recv_tensor.data<float>();
// auto *result_tensor =
// dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
// result_tensor->Resize(kDims);
// auto *ct = result_tensor->mutable_data<float>(cpu_place);
// paddle::memory::Copy(
// cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
// recv_tensor.numel() * sizeof(float),
// static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
// for (size_t j = 0; j < f::product(kDims); ++j) {
// ASSERT_NEAR(ct[j], result, 1e-5);
// }
// }
// }
// ncclReduceOp with desc
TEST_F(NCCLTester, ncclReduceOp) {
std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
op2->SetType("ncclAllReduce");
const int kRoot = 0;
op2->SetType("ncclReduce");
op2->SetInput("X", {"st"});
op2->SetInput("Communicator", {"comm"});
op2->SetOutput("Out", {"rt"});
op2->SetAttr("root", {kRoot});
std::vector<f::Scope *> dev_scopes;
......@@ -166,39 +216,43 @@ TEST_F(NCCLTester, ncclAllReduceOp) {
ths[i].join();
}
// check results
float result = 0;
std::accumulate(gpu_list.begin(), gpu_list.end(), result);
for (size_t i = 0; i < dev_scopes.size(); ++i) {
auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
auto *rt = recv_tensor.data<float>();
// check results on the root gpu
float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
p::CPUPlace cpu_place;
auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
p::GPUPlace gpu_place(gpu_list[kRoot]);
auto &recv_tensor = dev_scopes[kRoot]->FindVar("rt")->Get<f::LoDTensor>();
auto *rt = recv_tensor.data<float>();
auto *result_tensor =
dev_scopes[kRoot]->Var("ct")->GetMutable<f::LoDTensor>();
result_tensor->Resize(kDims);
auto *ct = result_tensor->mutable_data<float>(cpu_place);
paddle::memory::Copy(
cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
cpu_place, ct, p::GPUPlace(gpu_list[kRoot]), rt,
recv_tensor.numel() * sizeof(float),
static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
for (size_t j = 0; j < f::product(kDims); ++j) {
static_cast<p::CUDADeviceContext *>(dev_ctxs[kRoot])->stream());
for (int j = 0; j < f::product(kDims); ++j) {
ASSERT_NEAR(ct[j], result, 1e-5);
}
}
}
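A side note on the std::accumulate change visible in the check above (the removed lines passed result as the initial value and ignored the return): std::accumulate takes its init argument by value and reports the sum only through its return value. A tiny illustration, independent of this diff:

#include <cassert>
#include <numeric>
#include <vector>

int main() {
  std::vector<int> gpu_ids = {0, 1, 2};
  float sum = 0;
  std::accumulate(gpu_ids.begin(), gpu_ids.end(), sum);  // return value dropped; sum stays 0
  assert(sum == 0.0f);
  sum = std::accumulate(gpu_ids.begin(), gpu_ids.end(), 0.0f);  // sum == 3
  assert(sum == 3.0f);
  return 0;
}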
// ncclReduceOp with desc
TEST(NCCL, ncclReduceOp) {
// ncclBcastOp with desc
TEST_F(NCCLTester, ncclBcastOp) {
std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
op2->SetType("ncclReduce");
const int kRoot = 0;
op2->SetType("ncclBcast");
op2->SetInput("X", {"st"});
op2->SetInput("Communicator", {"comm"});
op2->SetOutput("Out", {"rt"});
op2->SetAttr("root", {kRoot});
std::vector<f::Scope *> dev_scopes;
std::vector<std::thread> ths;
for (size_t i = 0; i < gpu_list.size(); ++i) {
dev_scopes.emplace_back(&g_scope.NewScope());
std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
......@@ -210,76 +264,99 @@ TEST(NCCL, ncclReduceOp) {
ths[i].join();
}
// check results
float result = 0;
std::accumulate(gpu_list.begin(), gpu_list.end(), result);
for (size_t i = 0; i < dev_scopes.size(); ++i) {
auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
auto *rt = recv_tensor.data<float>();
const int idx = 1;
// check results on gpu_list[idx]
float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
p::CPUPlace cpu_place;
auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
p::GPUPlace gpu_place(gpu_list[idx]);
auto &recv_tensor = dev_scopes[idx]->FindVar("rt")->Get<f::LoDTensor>();
auto *rt = recv_tensor.data<float>();
auto *result_tensor = dev_scopes[idx]->Var("ct")->GetMutable<f::LoDTensor>();
result_tensor->Resize(kDims);
auto *ct = result_tensor->mutable_data<float>(cpu_place);
paddle::memory::Copy(
cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
cpu_place, ct, p::GPUPlace(gpu_list[idx]), rt,
recv_tensor.numel() * sizeof(float),
static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
static_cast<p::CUDADeviceContext *>(dev_ctxs[idx])->stream());
for (size_t j = 0; j < f::product(kDims); ++j) {
ASSERT_NEAR(ct[j], result, 1e-5);
}
}
}
// ncclBcastOp with desc
TEST(NCCL, ncclBcastOp) {
// joint ncclBcastOp and ncclReduceOp
TEST_F(NCCLTester, MultipleOp) {
const int kRoot = 0;
std::unique_ptr<f::OpDescBind> op1(new f::OpDescBind);
op1->SetType("ncclBcastSend");
op1->SetInput("X", {"st"});
op1->SetType("ncclReduce");
op1->SetInput("X", {"rt"});
op1->SetInput("Communicator", {"comm"});
op1->SetOutput("Out", {"rt"});
op2->SetAttr("root", {kRoot});
std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
op2->SetType("ncclBcastRecv");
op2->SetType("ncclBcast");
op2->SetInput("X", {"st"});
op2->SetInput("Communicator", {"comm"});
op2->SetOutput("Out", {"rt"});
op2->SetAttr("root", {kRoot});
std::vector<f::Scope *> dev_scopes;
std::vector<std::thread> ths;
for (size_t i = 1; i < gpu_list.size(); ++i) {
// run op1 (ncclReduce) on every gpu
for (size_t i = 0; i < gpu_list.size(); ++i) {
dev_scopes.emplace_back(&g_scope.NewScope());
std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
*op2.get(), &g_scope.NewScope());
*op1.get(), dev_scopes[i]);
ths.emplace_back(std::move(th));
}
for (size_t i = 0; i < gpu_list.size(); ++i) {
ths[i].join();
}
}
// joint ncclBcastOp and ncclReduceOp
// TEST(NCCL, MultipleOp) {
// std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
// op2->SetType("ncclBcastSend");
// op2->SetInput("X", {"st"});
// op2->SetInput("Communicator", {"comm"});
ths.clear();
// std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
// op2->SetType("ncclBcastRecv");
// op2->SetInput("Communicator", {"comm"});
// op2->SetOutput("Out", {"rt"});
// run op2 (ncclBcast) on every gpu
for (size_t i = 0; i < gpu_list.size(); ++i) {
dev_scopes.emplace_back(&g_scope.NewScope());
std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
*op2.get(), dev_scopes[i]);
ths.emplace_back(std::move(th));
}
// std::vector<std::thread> ths;
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
// *op2.get(),
// &g_scope.NewScope());
// ths.emplace_back(std::move(th));
// }
for (size_t i = 0; i < gpu_list.size(); ++i) {
ths[i].join();
}
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// ths[i].join();
// }
// }
// check results
float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
for (size_t i = 0; i < dev_scopes.size(); ++i) {
p::CPUPlace cpu_place;
p::GPUPlace gpu_place(gpu_list[i]);
auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
auto *rt = recv_tensor.data<float>();
auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
result_tensor->Resize(kDims);
auto *ct = result_tensor->mutable_data<float>(cpu_place);
paddle::memory::Copy(
cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
recv_tensor.numel() * sizeof(float),
static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
for (int j = 0; j < f::product(kDims); ++j) {
ASSERT_NEAR(ct[j], result, 1e-5);
}
}
}
int main(int argc, char **argv) {
const int dev_count = p::GetCUDADeviceCount();
......