diff --git a/paddle/operators/nccl_op_test.cu b/paddle/operators/nccl_op_test.cu index 1132c3d43d97a89bfc7adc0ab02059d9aab60f8d..63a286f602024e87a2589286d30102290abb3b30 100644 --- a/paddle/operators/nccl_op_test.cu +++ b/paddle/operators/nccl_op_test.cu @@ -126,213 +126,40 @@ class NCCLTester : public ::testing::Test { std::mutex mu; }; -// // ncclInitOp with desc -// TEST(NCCL, ncclInitOp) { -// std::unique_ptr op_desc(new f::OpDescBind); - -// op_desc->SetType("ncclInit"); -// op_desc->SetOutput("Communicator", {"x1"}); -// op_desc->SetAttr("gpus", {gpu_list}); - -// f::Scope g_scope; -// std::unique_ptr ctx(new -// p::CPUDeviceContext(p::CPUPlace())); - -// auto *var = g_scope.Var("x1"); -// var->GetMutable(); - -// auto op = f::OpRegistry::CreateOp(*op_desc); -// VLOG(1) << "invoke NCCLInitOp."; -// op->Run(g_scope, *ctx.get()); -// VLOG(1) << "NCCLInitOp finished."; -// } - -// // ncclAllReduceOp with desc -// TEST_F(NCCLTester, ncclAllReduceOp) { -// std::unique_ptr op2(new f::OpDescBind); -// op2->SetType("ncclAllReduce"); -// op2->SetInput("X", {"st"}); -// op2->SetInput("Communicator", {"comm"}); -// op2->SetOutput("Out", {"rt"}); - -// std::vector dev_scopes; - -// std::vector ths; - -// for (size_t i = 0; i < gpu_list.size(); ++i) { -// dev_scopes.emplace_back(&g_scope.NewScope()); -// std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], -// *op2.get(), dev_scopes[i]); -// ths.emplace_back(std::move(th)); -// } - -// for (size_t i = 0; i < gpu_list.size(); ++i) { -// ths[i].join(); -// } - -// // check results -// float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0); - -// for (size_t i = 0; i < dev_scopes.size(); ++i) { -// p::CPUPlace cpu_place; -// p::GPUPlace gpu_place(gpu_list[i]); - -// auto &recv_tensor = dev_scopes[i]->FindVar("rt")->Get(); -// auto *rt = recv_tensor.data(); -// auto *result_tensor = -// dev_scopes[i]->Var("ct")->GetMutable(); -// result_tensor->Resize(kDims); -// auto *ct = result_tensor->mutable_data(cpu_place); - -// paddle::memory::Copy( -// cpu_place, ct, p::GPUPlace(gpu_list[i]), rt, -// recv_tensor.numel() * sizeof(float), -// static_cast(dev_ctxs[i])->stream()); - -// for (size_t j = 0; j < f::product(kDims); ++j) { -// ASSERT_NEAR(ct[j], result, 1e-5); -// } -// } -// } - -// // ncclAReduceOp with desc -// TEST_F(NCCLTester, ncclReduceOp) { -// std::unique_ptr op2(new f::OpDescBind); -// const int kRoot = 0; -// op2->SetType("ncclReduce"); -// op2->SetInput("X", {"st"}); -// op2->SetInput("Communicator", {"comm"}); -// op2->SetOutput("Out", {"rt"}); -// op2->SetAttr("root", {kRoot}); - -// std::vector dev_scopes; - -// std::vector ths; - -// for (size_t i = 0; i < gpu_list.size(); ++i) { -// dev_scopes.emplace_back(&g_scope.NewScope()); -// std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], -// *op2.get(), dev_scopes[i]); -// ths.emplace_back(std::move(th)); -// } - -// for (size_t i = 0; i < gpu_list.size(); ++i) { -// ths[i].join(); -// } - -// // check results on -// float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0); - -// p::CPUPlace cpu_place; -// p::GPUPlace gpu_place(gpu_list[kRoot]); - -// auto &recv_tensor = dev_scopes[kRoot]->FindVar("rt")->Get(); -// auto *rt = recv_tensor.data(); -// auto *result_tensor = -// dev_scopes[kRoot]->Var("ct")->GetMutable(); -// result_tensor->Resize(kDims); -// auto *ct = result_tensor->mutable_data(cpu_place); - -// paddle::memory::Copy( -// cpu_place, ct, p::GPUPlace(gpu_list[kRoot]), rt, -// recv_tensor.numel() * sizeof(float), -// static_cast(dev_ctxs[kRoot])->stream()); - -// for (int j = 0; j < f::product(kDims); ++j) { -// ASSERT_NEAR(ct[j], result, 1e-5); -// } -// } - -// // // ncclBcastOp with desc -// TEST_F(NCCLTester, ncclBcastOp) { -// std::unique_ptr op2(new f::OpDescBind); -// const int kRoot = 5; -// op2->SetType("ncclBcast"); -// op2->SetInput("X", {"st"}); -// op2->SetInput("Communicator", {"comm"}); -// op2->SetOutput("Out", {"rt"}); -// op2->SetAttr("root", {kRoot}); - -// std::vector dev_scopes; - -// std::vector ths; - -// for (size_t i = 0; i < gpu_list.size(); ++i) { -// dev_scopes.emplace_back(&g_scope.NewScope()); -// std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], -// *op2.get(), dev_scopes[i]); -// ths.emplace_back(std::move(th)); -// } - -// for (size_t i = 0; i < gpu_list.size(); ++i) { -// ths[i].join(); -// } - -// const int idx = 1; -// // check results on -// float result = kRoot; - -// p::CPUPlace cpu_place; -// p::GPUPlace gpu_place(gpu_list[idx]); - -// auto &recv_tensor = dev_scopes[idx]->FindVar("rt")->Get(); -// auto *rt = recv_tensor.data(); -// auto *result_tensor = -// dev_scopes[idx]->Var("ct")->GetMutable(); -// result_tensor->Resize(kDims); -// auto *ct = result_tensor->mutable_data(cpu_place); - -// paddle::memory::Copy( -// cpu_place, ct, p::GPUPlace(gpu_list[idx]), rt, -// recv_tensor.numel() * sizeof(float), -// static_cast(dev_ctxs[idx])->stream()); - -// for (size_t j = 0; j < f::product(kDims); ++j) { -// ASSERT_NEAR(ct[j], result, 1e-5); -// } -// } - -// joint ncclBcastOp and ncclReduceOp -TEST_F(NCCLTester, MultipleOp) { - const int kRoot = 0; - std::unique_ptr op1(new f::OpDescBind); - op1->SetType("ncclReduce"); - op1->SetInput("X", {"st"}); - op1->SetInput("Communicator", {"comm"}); - op1->SetOutput("Out", {"rt"}); - op1->SetAttr("root", {kRoot}); +// ncclInitOp with desc +TEST(NCCL, ncclInitOp) { + std::unique_ptr op_desc(new f::OpDescBind); + + op_desc->SetType("ncclInit"); + op_desc->SetOutput("Communicator", {"x1"}); + op_desc->SetAttr("gpus", {gpu_list}); + + f::Scope g_scope; + std::unique_ptr ctx(new p::CPUDeviceContext(p::CPUPlace())); + + auto *var = g_scope.Var("x1"); + var->GetMutable(); + auto op = f::OpRegistry::CreateOp(*op_desc); + VLOG(1) << "invoke NCCLInitOp."; + op->Run(g_scope, *ctx.get()); + VLOG(1) << "NCCLInitOp finished."; +} + +// ncclAllReduceOp with desc +TEST_F(NCCLTester, ncclAllReduceOp) { std::unique_ptr op2(new f::OpDescBind); - op2->SetType("ncclBcast"); - op2->SetInput("X", {"rt"}); + op2->SetType("ncclAllReduce"); + op2->SetInput("X", {"st"}); op2->SetInput("Communicator", {"comm"}); - op2->SetOutput("Out", {"out"}); - op2->SetAttr("root", {kRoot}); + op2->SetOutput("Out", {"rt"}); std::vector dev_scopes; - // for (size_t i = 0; i < dev_scopes.size(); ++i) { - // dev_scopes[i]->Var("out")->GetMutable(); - // } std::vector ths; - // run Reduce for (size_t i = 0; i < gpu_list.size(); ++i) { dev_scopes.emplace_back(&g_scope.NewScope()); - std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], - *op1.get(), dev_scopes[i]); - ths.emplace_back(std::move(th)); - } - - for (size_t i = 0; i < gpu_list.size(); ++i) { - ths[i].join(); - } - - ths.clear(); - - // run Bcast - for (size_t i = 0; i < gpu_list.size(); ++i) { - dev_scopes[i]->Var("out")->GetMutable(); std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], *op2.get(), dev_scopes[i]); ths.emplace_back(std::move(th)); @@ -360,12 +187,108 @@ TEST_F(NCCLTester, MultipleOp) { recv_tensor.numel() * sizeof(float), static_cast(dev_ctxs[i])->stream()); - for (int j = 0; j < f::product(kDims); ++j) { + for (size_t j = 0; j < f::product(kDims); ++j) { ASSERT_NEAR(ct[j], result, 1e-5); } } } +// ncclAReduceOp with desc +TEST_F(NCCLTester, ncclReduceOp) { + std::unique_ptr op2(new f::OpDescBind); + const int kRoot = 0; + op2->SetType("ncclReduce"); + op2->SetInput("X", {"st"}); + op2->SetInput("Communicator", {"comm"}); + op2->SetOutput("Out", {"rt"}); + op2->SetAttr("root", {kRoot}); + + std::vector dev_scopes; + + std::vector ths; + + for (size_t i = 0; i < gpu_list.size(); ++i) { + dev_scopes.emplace_back(&g_scope.NewScope()); + std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], + *op2.get(), dev_scopes[i]); + ths.emplace_back(std::move(th)); + } + + for (size_t i = 0; i < gpu_list.size(); ++i) { + ths[i].join(); + } + + // check results on + float result = std::accumulate(gpu_list.begin(), gpu_list.end(), 0); + + p::CPUPlace cpu_place; + p::GPUPlace gpu_place(gpu_list[kRoot]); + + auto &recv_tensor = dev_scopes[kRoot]->FindVar("rt")->Get(); + auto *rt = recv_tensor.data(); + auto *result_tensor = + dev_scopes[kRoot]->Var("ct")->GetMutable(); + result_tensor->Resize(kDims); + auto *ct = result_tensor->mutable_data(cpu_place); + + paddle::memory::Copy( + cpu_place, ct, p::GPUPlace(gpu_list[kRoot]), rt, + recv_tensor.numel() * sizeof(float), + static_cast(dev_ctxs[kRoot])->stream()); + + for (int j = 0; j < f::product(kDims); ++j) { + ASSERT_NEAR(ct[j], result, 1e-5); + } +} + +// // ncclBcastOp with desc +TEST_F(NCCLTester, ncclBcastOp) { + std::unique_ptr op2(new f::OpDescBind); + const int kRoot = 5; + op2->SetType("ncclBcast"); + op2->SetInput("X", {"st"}); + op2->SetInput("Communicator", {"comm"}); + op2->SetOutput("Out", {"rt"}); + op2->SetAttr("root", {kRoot}); + + std::vector dev_scopes; + + std::vector ths; + + for (size_t i = 0; i < gpu_list.size(); ++i) { + dev_scopes.emplace_back(&g_scope.NewScope()); + std::thread th(&NCCLTester::PerThreadProgram, this, gpu_list[i], + *op2.get(), dev_scopes[i]); + ths.emplace_back(std::move(th)); + } + + for (size_t i = 0; i < gpu_list.size(); ++i) { + ths[i].join(); + } + + const int idx = 1; + // check results on + float result = kRoot; + + p::CPUPlace cpu_place; + p::GPUPlace gpu_place(gpu_list[idx]); + + auto &recv_tensor = dev_scopes[idx]->FindVar("rt")->Get(); + auto *rt = recv_tensor.data(); + auto *result_tensor = dev_scopes[idx]->Var("ct")->GetMutable(); + result_tensor->Resize(kDims); + auto *ct = result_tensor->mutable_data(cpu_place); + + paddle::memory::Copy( + cpu_place, ct, p::GPUPlace(gpu_list[idx]), rt, + recv_tensor.numel() * sizeof(float), + static_cast(dev_ctxs[idx])->stream()); + + for (size_t j = 0; j < f::product(kDims); ++j) { + ASSERT_NEAR(ct[j], result, 1e-5); + } +} + int main(int argc, char **argv) { const int dev_count = p::GetCUDADeviceCount(); if (dev_count <= 1) { diff --git a/python/paddle/v2/framework/tests/test_nccl_allreduce_op.py b/python/paddle/v2/framework/tests/test_nccl_allreduce_op.py deleted file mode 100644 index 0a9163dd55c587ad8c56b4f664c17267ba27325b..0000000000000000000000000000000000000000 --- a/python/paddle/v2/framework/tests/test_nccl_allreduce_op.py +++ /dev/null @@ -1,97 +0,0 @@ -import unittest, os -from threading import Thread -import numpy as np -import paddle.v2 as paddle -from paddle.v2.framework.op import Operator -import paddle.v2.framework.core as core -from op_test import OpTest, create_op, set_input - -# gpu_list = os.environ["NV_LIST"] -gpu_list = "0,1,2,3" - -if not core.is_compile_gpu() or not gpu_list: - exit(0) - -g_scope = core.Scope() -g_ctx = core.DeviceContext.create(core.CPUPlace()) -gpus = [int(g) for g in gpu_list.split(",")] - - -# ground truth -def allreduce(tensors, gpus): - num_device = len(gpus) - assert (len(tensors) == num_device), "not match of tensor and device" - Out = tensors - for i in range(1, len(tensors)): - Out[0] += Out[i] - - for i in range(1, len(tensors)): - Out[i] = Out[0] - - return Out - - -input_data = [ - np.random.random((32, 32)).astype("float32") for i in range(len(gpus)) -] -output_data = allreduce(input_data, gpus) - - -def thread_allreduce_op(thread_id, gpu_id): - i = gpu_id - scope = g_scope.new_scope() - place = core.GPUPlace(gpus[i]) - inputs = { - "X": input_data[i], - "Communicator": scope.find_var("Communicator") - } - outputs = {"Out": output_data[i]} - - op = create_op(scope, "ncclAllReduce", inputs, outputs, attrs={}) - place = core.GPUPlace(gpus[i]) - set_input(scope, op, inputs, place) - - ctx = core.DeviceContext.create(place) - - print "thread_id : ", thread_id, "gpu_id : ", gpu_id, " invoke allreduce" - op.run(scope, ctx) - print "thread_id : ", thread_id, "gpu_id : ", gpu_id, " allreduce Done." - - -class TestNCCLAllReduce(unittest.TestCase): - def setUp(self): - self.op_type = "ncclAllReduce" - - nccl_init = create_op( - g_scope, - op_type="ncclInit", - inputs={}, - outputs={ - "Communicator": g_scope.var("Communicator").get_communicator() - }, - attrs={"gpus": gpus}) - nccl_init.run(g_scope, g_ctx) - - def test_output(self): - ops = [] - for i in range(len(gpus)): - th = Thread( - target=thread_allreduce_op, args=( - i, - gpus[i], )) - th.start() - ops.append(th) - for t in ops: - t.join() - - idx = 0 - for out_name, out_dup in Operator.get_op_outputs(self.op_type): - actual = np.array(g_scope.find_var(out_name).get_tensor()) - expect = output_data[idx] - - idx += 1 - self.assertTrue(actual, expect), "has diff" - - -if __name__ == "__main__": - unittest.main() diff --git a/python/paddle/v2/framework/tests/test_nccl_reduce_op.py b/python/paddle/v2/framework/tests/test_nccl_reduce_op.py deleted file mode 100644 index 0cee1923a69077c1a3304caaaba114a1a5f81756..0000000000000000000000000000000000000000 --- a/python/paddle/v2/framework/tests/test_nccl_reduce_op.py +++ /dev/null @@ -1,25 +0,0 @@ -import unittest, os -import numpy as np -import paddle.v2 as paddle -from paddle.v2.framework.op import Operator -import paddle.v2.framework.core as core -from op_test import OpTest, create_op, set_input - -gpu_list = "0,1,2,3" -g_scope = core.Scope() -g_ctx = core.DeviceContext.create(core.CPUPlace()) - -if not core.is_compile_gpu() or not gpu_list: - exit(0) - - -class TestNCCLReduce(OpTest): - def setUp(self): - self.op_type = "ncclReduce" - self.gpus = [int(g) for g in gpu_list.split(",")] - - self.scope = g_scope.var("Communicator").get_communicator() - self.outputs = {"Communicator": self.scope.var("Communicator")} - - def test_check_output(self): - self.check_output()