diff --git a/paddle/fluid/operators/send_vars_op.cc b/paddle/fluid/operators/send_vars_op.cc index af791bc8e2ffe5f04acd2b53294c8df0594ae3b6..523e9e27808e428acb7900fe90a29de80f316bfb 100644 --- a/paddle/fluid/operators/send_vars_op.cc +++ b/paddle/fluid/operators/send_vars_op.cc @@ -53,7 +53,7 @@ class SendVarsOp : public framework::OperatorBase { auto ins = Inputs("X"); std::vector epmap = Attr>("epmap"); - int flag_wait = Attr("wait"); + int sync_send = Attr("sync_sent"); platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); auto& ctx = *pool.Get(place); @@ -68,12 +68,14 @@ class SendVarsOp : public framework::OperatorBase { for (size_t i = 0; i < ins.size(); i++) { if (NeedSend(scope, ins[i])) { VLOG(3) << "sending " << ins[i] << " to " << epmap[i]; + // TODO(Yancey1989): we need to use an IO threadpool which has + // a larger number of threads than the computing threadpool. rpc_client->AsyncSendVariable(epmap[i], ctx, scope, ins[i]); } else { VLOG(3) << "don't send no-initialied variable: " << ins[i]; } } - if (flag_wait) { + if (sync_send) { rpc_client->Wait(); } } @@ -86,16 +88,16 @@ class SendVarsOpMaker : public framework::OpProtoAndCheckerMaker { AddInput("X", "(Tensor, SelectedRows) Input variables to be sent") .AsDuplicable(); AddOutput("RPCClient", - "(RPCClient) The RPC client object which is" + "(RPCClient) The RPC client object which will be" "initialized at most once."); AddComment(R"DOC( Send operator This operator will send variables to listen_and_serve op at the parameter server. )DOC"); - AddAttr("wait", + AddAttr("ync_send", "(int, default 0)" - "whether watting for all send request have been sent.") + "sync send or async send.") .SetDefault(0); AddAttr>("epmap", "(string vector, default 127.0.0.1:6164)"