提交 2573ac14 编写于 作者: D Dong Zhihong

"remove python side test case to another PR."

上级 4e165f4e
......@@ -126,201 +126,42 @@ class NCCLTester : public ::testing::Test {
std::mutex mu;
};
// // ncclInitOp with desc
// TEST(NCCL, ncclInitOp) {
// std::unique_ptr<f::OpDescBind> op_desc(new f::OpDescBind);
// op_desc->SetType("ncclInit");
// op_desc->SetOutput("Communicator", {"x1"});
// op_desc->SetAttr("gpus", {gpu_list});
// f::Scope g_scope;
// std::unique_ptr<p::DeviceContext> ctx(new
// p::CPUDeviceContext(p::CPUPlace()));
// auto *var = g_scope.Var("x1");
// var->GetMutable<p::Communicator>();
// auto op = f::OpRegistry::CreateOp(*op_desc);
// VLOG(1) << "invoke NCCLInitOp.";
// op->Run(g_scope, *ctx.get());
// VLOG(1) << "NCCLInitOp finished.";
// }
// // ncclAllReduceOp with desc
// TEST_F(NCCLTester, ncclAllReduceOp) {
// std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
// op2->SetType("ncclAllReduce");
// op2->SetInput("X", {"st"});
// op2->SetInput("Communicator", {"comm"});
// op2->SetOutput("Out", {"rt"});
// std::vector<f::Scope *> dev_scopes;
// std::vector<std::thread> ths;
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// dev_scopes.emplace_back(&g_scope.NewScope());
// std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
// *op2.get(), dev_scopes[i]);
// ths.emplace_back(std::move(th));
// }
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// ths[i].join();
// }
// // check results
// float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
// for (size_t i = 0; i < dev_scopes.size(); ++i) {
// p::CPUPlace cpu_place;
// p::GPUPlace gpu_place(gpu_list[i]);
// auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
// auto *rt = recv_tensor.data<float>();
// auto *result_tensor =
// dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
// result_tensor->Resize(kDims);
// auto *ct = result_tensor->mutable_data<float>(cpu_place);
// paddle::memory::Copy(
// cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
// recv_tensor.numel() * sizeof(float),
// static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
// for (size_t j = 0; j < f::product(kDims); ++j) {
// ASSERT_NEAR(ct[j], result, 1e-5);
// }
// }
// }
// // ncclAReduceOp with desc
// TEST_F(NCCLTester, ncclReduceOp) {
// std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
// const int kRoot = 0;
// op2->SetType("ncclReduce");
// op2->SetInput("X", {"st"});
// op2->SetInput("Communicator", {"comm"});
// op2->SetOutput("Out", {"rt"});
// op2->SetAttr("root", {kRoot});
// std::vector<f::Scope *> dev_scopes;
// std::vector<std::thread> ths;
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// dev_scopes.emplace_back(&g_scope.NewScope());
// std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
// *op2.get(), dev_scopes[i]);
// ths.emplace_back(std::move(th));
// }
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// ths[i].join();
// }
// // check results on
// float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
// p::CPUPlace cpu_place;
// p::GPUPlace gpu_place(gpu_list[kRoot]);
// auto &recv_tensor = dev_scopes[kRoot]->FindVar("rt")->Get<f::LoDTensor>();
// auto *rt = recv_tensor.data<float>();
// auto *result_tensor =
// dev_scopes[kRoot]->Var("ct")->GetMutable<f::LoDTensor>();
// result_tensor->Resize(kDims);
// auto *ct = result_tensor->mutable_data<float>(cpu_place);
// paddle::memory::Copy(
// cpu_place, ct, p::GPUPlace(gpu_list[kRoot]), rt,
// recv_tensor.numel() * sizeof(float),
// static_cast<p::CUDADeviceContext *>(dev_ctxs[kRoot])->stream());
// for (int j = 0; j < f::product(kDims); ++j) {
// ASSERT_NEAR(ct[j], result, 1e-5);
// }
// }
// // // ncclBcastOp with desc
// TEST_F(NCCLTester, ncclBcastOp) {
// std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
// const int kRoot = 5;
// op2->SetType("ncclBcast");
// op2->SetInput("X", {"st"});
// op2->SetInput("Communicator", {"comm"});
// op2->SetOutput("Out", {"rt"});
// op2->SetAttr("root", {kRoot});
// std::vector<f::Scope *> dev_scopes;
// std::vector<std::thread> ths;
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// dev_scopes.emplace_back(&g_scope.NewScope());
// std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
// *op2.get(), dev_scopes[i]);
// ths.emplace_back(std::move(th));
// }
// for (size_t i = 0; i < gpu_list.size(); ++i) {
// ths[i].join();
// }
// const int idx = 1;
// // check results on
// float result = kRoot;
// p::CPUPlace cpu_place;
// p::GPUPlace gpu_place(gpu_list[idx]);
// auto &recv_tensor = dev_scopes[idx]->FindVar("rt")->Get<f::LoDTensor>();
// auto *rt = recv_tensor.data<float>();
// auto *result_tensor =
// dev_scopes[idx]->Var("ct")->GetMutable<f::LoDTensor>();
// result_tensor->Resize(kDims);
// auto *ct = result_tensor->mutable_data<float>(cpu_place);
// paddle::memory::Copy(
// cpu_place, ct, p::GPUPlace(gpu_list[idx]), rt,
// recv_tensor.numel() * sizeof(float),
// static_cast<p::CUDADeviceContext *>(dev_ctxs[idx])->stream());
// for (size_t j = 0; j < f::product(kDims); ++j) {
// ASSERT_NEAR(ct[j], result, 1e-5);
// }
// }
// joint ncclBcastOp and ncclReduceOp
TEST_F(NCCLTester, MultipleOp) {
const int kRoot = 0;
std::unique_ptr<f::OpDescBind> op1(new f::OpDescBind);
op1->SetType("ncclReduce");
op1->SetInput("X", {"st"});
op1->SetInput("Communicator", {"comm"});
op1->SetOutput("Out", {"rt"});
op1->SetAttr("root", {kRoot});
// ncclInitOp with desc
TEST(NCCL, ncclInitOp) {
std::unique_ptr<f::OpDescBind> op_desc(new f::OpDescBind);
op_desc->SetType("ncclInit");
op_desc->SetOutput("Communicator", {"x1"});
op_desc->SetAttr("gpus", {gpu_list});
f::Scope g_scope;
std::unique_ptr<p::DeviceContext> ctx(new p::CPUDeviceContext(p::CPUPlace()));
auto *var = g_scope.Var("x1");
var->GetMutable<p::Communicator>();
auto op = f::OpRegistry::CreateOp(*op_desc);
VLOG(1) << "invoke NCCLInitOp.";
op->Run(g_scope, *ctx.get());
VLOG(1) << "NCCLInitOp finished.";
}
// ncclAllReduceOp with desc
TEST_F(NCCLTester, ncclAllReduceOp) {
std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
op2->SetType("ncclBcast");
op2->SetInput("X", {"rt"});
op2->SetType("ncclAllReduce");
op2->SetInput("X", {"st"});
op2->SetInput("Communicator", {"comm"});
op2->SetOutput("Out", {"out"});
op2->SetAttr("root", {kRoot});
op2->SetOutput("Out", {"rt"});
std::vector<f::Scope *> dev_scopes;
// for (size_t i = 0; i < dev_scopes.size(); ++i) {
// dev_scopes[i]->Var("out")->GetMutable<f::LoDTensor>();
// }
std::vector<std::thread> ths;
// run Reduce
for (size_t i = 0; i < gpu_list.size(); ++i) {
dev_scopes.emplace_back(&g_scope.NewScope());
std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
*op1.get(), dev_scopes[i]);
*op2.get(), dev_scopes[i]);
ths.emplace_back(std::move(th));
}
......@@ -328,11 +169,46 @@ TEST_F(NCCLTester, MultipleOp) {
ths[i].join();
}
ths.clear();
// check results
float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
for (size_t i = 0; i < dev_scopes.size(); ++i) {
p::CPUPlace cpu_place;
p::GPUPlace gpu_place(gpu_list[i]);
auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
auto *rt = recv_tensor.data<float>();
auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
result_tensor->Resize(kDims);
auto *ct = result_tensor->mutable_data<float>(cpu_place);
paddle::memory::Copy(
cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
recv_tensor.numel() * sizeof(float),
static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
for (size_t j = 0; j < f::product(kDims); ++j) {
ASSERT_NEAR(ct[j], result, 1e-5);
}
}
}
// ncclAReduceOp with desc
TEST_F(NCCLTester, ncclReduceOp) {
std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
const int kRoot = 0;
op2->SetType("ncclReduce");
op2->SetInput("X", {"st"});
op2->SetInput("Communicator", {"comm"});
op2->SetOutput("Out", {"rt"});
op2->SetAttr("root", {kRoot});
std::vector<f::Scope *> dev_scopes;
std::vector<std::thread> ths;
// run Bcast
for (size_t i = 0; i < gpu_list.size(); ++i) {
dev_scopes[i]->Var("out")->GetMutable<f::LoDTensor>();
dev_scopes.emplace_back(&g_scope.NewScope());
std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
*op2.get(), dev_scopes[i]);
ths.emplace_back(std::move(th));
......@@ -342,27 +218,74 @@ TEST_F(NCCLTester, MultipleOp) {
ths[i].join();
}
// check results
// check results on
float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0);
for (size_t i = 0; i < dev_scopes.size(); ++i) {
p::CPUPlace cpu_place;
p::GPUPlace gpu_place(gpu_list[i]);
p::GPUPlace gpu_place(gpu_list[kRoot]);
auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get<f::LoDTensor>();
auto &recv_tensor = dev_scopes[kRoot]->FindVar("rt")->Get<f::LoDTensor>();
auto *rt = recv_tensor.data<float>();
auto *result_tensor = dev_scopes[i]->Var("ct")->GetMutable<f::LoDTensor>();
auto *result_tensor =
dev_scopes[kRoot]->Var("ct")->GetMutable<f::LoDTensor>();
result_tensor->Resize(kDims);
auto *ct = result_tensor->mutable_data<float>(cpu_place);
paddle::memory::Copy(
cpu_place, ct, p::GPUPlace(gpu_list[i]), rt,
cpu_place, ct, p::GPUPlace(gpu_list[kRoot]), rt,
recv_tensor.numel() * sizeof(float),
static_cast<p::CUDADeviceContext *>(dev_ctxs[i])->stream());
static_cast<p::CUDADeviceContext *>(dev_ctxs[kRoot])->stream());
for (int j = 0; j < f::product(kDims); ++j) {
ASSERT_NEAR(ct[j], result, 1e-5);
}
}
// // ncclBcastOp with desc
TEST_F(NCCLTester, ncclBcastOp) {
std::unique_ptr<f::OpDescBind> op2(new f::OpDescBind);
const int kRoot = 5;
op2->SetType("ncclBcast");
op2->SetInput("X", {"st"});
op2->SetInput("Communicator", {"comm"});
op2->SetOutput("Out", {"rt"});
op2->SetAttr("root", {kRoot});
std::vector<f::Scope *> dev_scopes;
std::vector<std::thread> ths;
for (size_t i = 0; i < gpu_list.size(); ++i) {
dev_scopes.emplace_back(&g_scope.NewScope());
std::thread th(&NCCLTester::PerThreadProgram<float>, this, gpu_list[i],
*op2.get(), dev_scopes[i]);
ths.emplace_back(std::move(th));
}
for (size_t i = 0; i < gpu_list.size(); ++i) {
ths[i].join();
}
const int idx = 1;
// check results on
float result = kRoot;
p::CPUPlace cpu_place;
p::GPUPlace gpu_place(gpu_list[idx]);
auto &recv_tensor = dev_scopes[idx]->FindVar("rt")->Get<f::LoDTensor>();
auto *rt = recv_tensor.data<float>();
auto *result_tensor = dev_scopes[idx]->Var("ct")->GetMutable<f::LoDTensor>();
result_tensor->Resize(kDims);
auto *ct = result_tensor->mutable_data<float>(cpu_place);
paddle::memory::Copy(
cpu_place, ct, p::GPUPlace(gpu_list[idx]), rt,
recv_tensor.numel() * sizeof(float),
static_cast<p::CUDADeviceContext *>(dev_ctxs[idx])->stream());
for (size_t j = 0; j < f::product(kDims); ++j) {
ASSERT_NEAR(ct[j], result, 1e-5);
}
}
......
import unittest, os
from threading import Thread
import numpy as np
import paddle.v2 as paddle
from paddle.v2.framework.op import Operator
import paddle.v2.framework.core as core
from op_test import OpTest, create_op, set_input
# gpu_list = os.environ["NV_LIST"]
gpu_list = "0,1,2,3"
if not core.is_compile_gpu() or not gpu_list:
exit(0)
g_scope = core.Scope()
g_ctx = core.DeviceContext.create(core.CPUPlace())
gpus = [int(g) for g in gpu_list.split(",")]
# ground truth
def allreduce(tensors, gpus):
num_device = len(gpus)
assert (len(tensors) == num_device), "not match of tensor and device"
Out = tensors
for i in range(1, len(tensors)):
Out[0] += Out[i]
for i in range(1, len(tensors)):
Out[i] = Out[0]
return Out
input_data = [
np.random.random((32, 32)).astype("float32") for i in range(len(gpus))
]
output_data = allreduce(input_data, gpus)
def thread_allreduce_op(thread_id, gpu_id):
i = gpu_id
scope = g_scope.new_scope()
place = core.GPUPlace(gpus[i])
inputs = {
"X": input_data[i],
"Communicator": scope.find_var("Communicator")
}
outputs = {"Out": output_data[i]}
op = create_op(scope, "ncclAllReduce", inputs, outputs, attrs={})
place = core.GPUPlace(gpus[i])
set_input(scope, op, inputs, place)
ctx = core.DeviceContext.create(place)
print "thread_id : ", thread_id, "gpu_id : ", gpu_id, " invoke allreduce"
op.run(scope, ctx)
print "thread_id : ", thread_id, "gpu_id : ", gpu_id, " allreduce Done."
class TestNCCLAllReduce(unittest.TestCase):
def setUp(self):
self.op_type = "ncclAllReduce"
nccl_init = create_op(
g_scope,
op_type="ncclInit",
inputs={},
outputs={
"Communicator": g_scope.var("Communicator").get_communicator()
},
attrs={"gpus": gpus})
nccl_init.run(g_scope, g_ctx)
def test_output(self):
ops = []
for i in range(len(gpus)):
th = Thread(
target=thread_allreduce_op, args=(
i,
gpus[i], ))
th.start()
ops.append(th)
for t in ops:
t.join()
idx = 0
for out_name, out_dup in Operator.get_op_outputs(self.op_type):
actual = np.array(g_scope.find_var(out_name).get_tensor())
expect = output_data[idx]
idx += 1
self.assertTrue(actual, expect), "has diff"
if __name__ == "__main__":
unittest.main()
import unittest, os
import numpy as np
import paddle.v2 as paddle
from paddle.v2.framework.op import Operator
import paddle.v2.framework.core as core
from op_test import OpTest, create_op, set_input
gpu_list = "0,1,2,3"
g_scope = core.Scope()
g_ctx = core.DeviceContext.create(core.CPUPlace())
if not core.is_compile_gpu() or not gpu_list:
exit(0)
class TestNCCLReduce(OpTest):
def setUp(self):
self.op_type = "ncclReduce"
self.gpus = [int(g) for g in gpu_list.split(",")]
self.scope = g_scope.var("Communicator").get_communicator()
self.outputs = {"Communicator": self.scope.var("Communicator")}
def test_check_output(self):
self.check_output()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册