diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h index f891c75dbc81a5cdb5274bbae84e9e85f42464fb..10e6dd45a901d36de4a6577db4da05551645eb73 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/detail/grpc_server.h @@ -22,9 +22,10 @@ limitations under the License. */ #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/var_type.h" #include "paddle/fluid/operators/detail/grpc_service.h" -#include "paddle/fluid/operators/detail/grpc_service.h" #include "paddle/fluid/operators/detail/send_recv.grpc.pb.h" #include "paddle/fluid/operators/detail/send_recv.pb.h" +#include "paddle/fluid/operators/detail/sendrecvop_utils.h" +#include "paddle/fluid/operators/detail/simple_block_queue.h" namespace paddle { namespace operators { diff --git a/paddle/fluid/operators/detail/test_serde.cc b/paddle/fluid/operators/detail/test_serde.cc index 99c1577223c4fe8001b4ce651b10e9e1f0024296..e646c894d18d37f5343a10df2542a0e46ab13372 100644 --- a/paddle/fluid/operators/detail/test_serde.cc +++ b/paddle/fluid/operators/detail/test_serde.cc @@ -199,9 +199,9 @@ TEST(LodTensor, Run) { RunTestLodTensor(place); RunTestLodTensor(place, 1); #ifdef PADDLE_WITH_CUDA - platform::CUDAPlace place; - RunTestLodTensor(place); - RunTestLodTensor(place, 1); + platform::CUDAPlace gpu(0); + RunTestLodTensor(gpu); + RunTestLodTensor(gpu, 1); #endif } @@ -210,7 +210,7 @@ TEST(SelectedRows, Run) { RunSerdeTestSelectedRows(place); #ifdef PADDLE_WITH_CUDA - platform::CUDAPlace place; - RunSerdeTestSelectedRows(place); + platform::CUDAPlace gpu; + RunSerdeTestSelectedRows(gpu); #endif } diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index d8a3c45ac5bf64c0613fa5f3de1684954d3c056a..08b83375dd5462e67c3da2c6c7401dd5e54793f0 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -93,12 +93,6 @@ class ListenAndServOp : public framework::OperatorBase { "server program should have at least 2 blocks"); framework::Executor executor(dev_place); - std::vector blk_ctx_list; - blk_ctx_list.push_back(nullptr); // block0 is not used. - for (int blkid = 1; blkid < num_blocks; ++blkid) { - auto *exe_ctx = executor.Prepare(*program, blkid); - blk_ctx_list.push_back(exe_ctx); - } // TODO(typhoonzero): change this to a while_op for every cluster-batch. bool exit_flag = false; @@ -149,12 +143,11 @@ class ListenAndServOp : public framework::OperatorBase { std::vector> fs; // block0 contains only listen_and_serv op, start run from block1. for (int blkid = 1; blkid < num_blocks - 1; ++blkid) { - fs.push_back(framework::Async( - [&executor, &program, &recv_scope, &blk_ctx_list, blkid]() { + fs.push_back( + framework::Async([&executor, &program, &recv_scope, blkid]() { int run_block = blkid; // thread local try { - executor.RunPreparedContext(blk_ctx_list[run_block], - &recv_scope, false, false); + executor.Run(*program, &recv_scope, run_block, false, false); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } @@ -164,8 +157,7 @@ class ListenAndServOp : public framework::OperatorBase { // Run global block at final step, or block1 if there are only 2 blocks if (num_blocks >= 2) { try { - executor.RunPreparedContext(blk_ctx_list[num_blocks - 1], &recv_scope, - false, false); + executor.Run(*program, &recv_scope, num_blocks - 1, false, false); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } @@ -185,9 +177,9 @@ class ListenAndServOp : public framework::OperatorBase { sparse_vars.clear(); } // while(true) - for (int i = 0; i < num_blocks; ++i) { - delete blk_ctx_list[i]; - } + // for (int i = 0; i < num_blocks; ++i) { + // delete blk_ctx_list[i]; + // } } protected: