diff --git a/paddle/fluid/operators/send_recv_op_test.cc b/paddle/fluid/operators/send_recv_op_test.cc index 2b3dc81676f8f2518b02b892d2da841a58ea76e4..e550552b195b768d68ec64e9c3b5889b56ca719f 100644 --- a/paddle/fluid/operators/send_recv_op_test.cc +++ b/paddle/fluid/operators/send_recv_op_test.cc @@ -156,6 +156,7 @@ TEST(SendRecvOp, CPUDense) { std::thread server_thread(StartServerNet, false, &initialized); while (!initialized) { } + static_cast(listen_and_serv_op.get()) ->WaitServerReady(); @@ -175,77 +176,77 @@ TEST(SendRecvOp, CPUDense) { std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port); attrs.insert({"endpoints", std::vector({endpoint})}); attrs.insert({"epmap", std::vector({endpoint})}); - auto send_op = f::OpRegistry::CreateOp( - "send", {{"X", {"x1"}}}, - {{"Out", {"Out"}}, attrs); - send_op->Run(scope, place); - - auto in_var = scope.Var("x1"); - auto tensor = in_var->GetMutable(); - float *expected = tensor->data(); - auto out_var = scope.Var("Out"); - auto target = out_var->GetMutable(); - // x1 * 2 == x0 - EXPECT_NE(target->memory_size(), size_t(0)); - float *actual = target->data(); - for (int64_t i = 0; i < target->numel(); ++i) { - EXPECT_EQ(expected[i] * 2, actual[i]); - } - listen_and_serv_op->Stop(); - server_thread.join(); - listen_and_serv_op.reset(nullptr); - paddle::operators::ListenAndServOp::ResetPort(); + const f::VariableNameMap &inputs = {{"X", {"x1"}}}; + const f::VariableNameMap &outputs = {{"Out", {"Out"}}}; + + auto send_op = f::OpRegistry::CreateOp("send", inputs, outputs, attrs); + send_op->Run(scope, place); + + auto in_var = scope.Var("x1"); + auto tensor = in_var->GetMutable(); + float *expected = tensor->data(); + auto out_var = scope.Var("Out"); + auto target = out_var->GetMutable(); + // x1 * 2 == x0 + EXPECT_NE(target->memory_size(), size_t(0)); + float *actual = target->data(); + for (int64_t i = 0; i < target->numel(); ++i) { + EXPECT_EQ(expected[i] * 2, actual[i]); + } + listen_and_serv_op->Stop(); + server_thread.join(); + listen_and_serv_op.reset(nullptr); + paddle::operators::ListenAndServOp::ResetPort(); } TEST(SendRecvOp, CPUSparse) { - std::atomic initialized; - initialized = false; - std::thread server_thread(StartServerNet, true, &initialized); - while (!initialized) { - } - auto *listen_and_serv_op_ptr = - static_cast( - listen_and_serv_op.get()); - ASSERT_TRUE(listen_and_serv_op_ptr != nullptr); - listen_and_serv_op_ptr->WaitServerReady(); - - // local net - f::Scope scope; - p::CPUPlace place; - p::CPUDeviceContext ctx(place); - InitSelectedRowsInScope(place, &scope); - scope.Var("RPC_CLIENT_VAR"); - f::AttributeMap attrs; - selected_port = listen_and_serv_op_ptr->GetSelectedPort(); - std::string endpoint = - paddle::string::Sprintf("127.0.0.1:%d", selected_port); - attrs.insert({"endpoints", std::vector({endpoint})}); - attrs.insert({"epmap", std::vector({endpoint})}); - auto send_op = f::OpRegistry::CreateOp("send", {{"X", {"x1"}}}, - {{"Out", {"Out"}}}, attrs); - send_op->Run(scope, place); - - auto x0 = scope.Var("x0")->GetMutable(); - auto x1 = scope.Var("x1")->GetMutable(); - auto out = scope.Var("Out")->GetMutable(); - auto actual = out->mutable_value(); - - std::unique_ptr expect{new f::SelectedRows()}; - auto expect_value = expect->mutable_value(); - expect_value->mutable_data(f::make_ddim({5, 10}), place); - - m::SelectedRowsAdd add_functor; - add_functor(ctx, *x0, *x1, expect.get()); - - EXPECT_EQ(actual->numel(), expect_value->numel()); - EXPECT_EQ(out->rows().size(), x0->rows().size() + x1->rows().size()); - - for (int64_t i = 0; i < expect_value->numel(); ++i) { - EXPECT_EQ(expect_value->mutable_data(place)[i], - actual->mutable_data(place)[i]); - } - listen_and_serv_op->Stop(); - server_thread.join(); - listen_and_serv_op.reset(); - paddle::operators::ListenAndServOp::ResetPort(); + std::atomic initialized; + initialized = false; + std::thread server_thread(StartServerNet, true, &initialized); + while (!initialized) { + } + auto *listen_and_serv_op_ptr = + static_cast( + listen_and_serv_op.get()); + ASSERT_TRUE(listen_and_serv_op_ptr != nullptr); + listen_and_serv_op_ptr->WaitServerReady(); + + // local net + f::Scope scope; + p::CPUPlace place; + p::CPUDeviceContext ctx(place); + InitSelectedRowsInScope(place, &scope); + scope.Var("RPC_CLIENT_VAR"); + f::AttributeMap attrs; + selected_port = listen_and_serv_op_ptr->GetSelectedPort(); + std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port); + attrs.insert({"endpoints", std::vector({endpoint})}); + attrs.insert({"epmap", std::vector({endpoint})}); + auto send_op = f::OpRegistry::CreateOp("send", {{"X", {"x1"}}}, + {{"Out", {"Out"}}}, attrs); + send_op->Run(scope, place); + + auto x0 = scope.Var("x0")->GetMutable(); + auto x1 = scope.Var("x1")->GetMutable(); + auto out = scope.Var("Out")->GetMutable(); + auto actual = out->mutable_value(); + + std::unique_ptr expect{new f::SelectedRows()}; + auto expect_value = expect->mutable_value(); + expect_value->mutable_data(f::make_ddim({5, 10}), place); + + m::SelectedRowsAdd add_functor; + add_functor(ctx, *x0, *x1, expect.get()); + + EXPECT_EQ(actual->numel(), expect_value->numel()); + EXPECT_EQ(out->rows().size(), x0->rows().size() + x1->rows().size()); + + for (int64_t i = 0; i < expect_value->numel(); ++i) { + EXPECT_EQ(expect_value->mutable_data(place)[i], + actual->mutable_data(place)[i]); + } + listen_and_serv_op->Stop(); + server_thread.join(); + listen_and_serv_op.reset(); + paddle::operators::ListenAndServOp::ResetPort(); }