diff --git a/paddle/operators/send_op.cc b/paddle/operators/send_op.cc index bb719dc2a8a577bc042a2a70f7169b7d70f83684..0be3b37859508b21ed4f42ff784b2e714453734c 100644 --- a/paddle/operators/send_op.cc +++ b/paddle/operators/send_op.cc @@ -19,6 +19,7 @@ limitations under the License. */ #include "paddle/framework/lod_tensor.h" #include "paddle/framework/op_registry.h" +#include #include #include "paddle/operators/detail/grpc_client.h" @@ -42,28 +43,35 @@ class SendOp : public framework::OperatorBase { platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); auto& ctx = *pool.Get(place); + + auto client_var_name = Output("RPCClient"); + PADDLE_ENFORCE_NOT_NULL(scope.FindVar(client_var_name), + "Can not find variable '%s' in the scope.", + client_var_name); + auto* client_var = scope.FindVar(client_var_name); + detail::RPCClient* rpc_client = client_var->GetMutable(); + for (size_t i = 0; i < ins.size(); i++) { VLOG(3) << "sending " << ins[i] << " to " << epmap[i]; - client_.AsyncSendVariable(epmap[i], ctx, scope, ins[i]); + rpc_client->AsyncSendVariable(epmap[i], ctx, scope, ins[i]); } - PADDLE_ENFORCE(client_.Wait()); + PADDLE_ENFORCE(rpc_client->Wait()); for (auto& ep : endpoints) { VLOG(3) << "batch barrier, ep: " << ep; - client_.AsyncSendBatchBarrier(ep); + rpc_client->AsyncSendBatchBarrier(ep); } - PADDLE_ENFORCE(client_.Wait()); + PADDLE_ENFORCE(rpc_client->Wait()); for (size_t i = 0; i < outs.size(); i++) { VLOG(3) << "getting " << outs[i] << " from " << epmap[i]; - client_.AsyncGetVariable(epmap[i], ctx, scope, outs[i]); + rpc_client->AsyncGetVariable(epmap[i], ctx, scope, outs[i]); } - - PADDLE_ENFORCE(client_.Wait()); + PADDLE_ENFORCE(rpc_client->Wait()); } private: - mutable detail::RPCClient client_; + // mutable detail::RPCClient client_; }; class SendOpMaker : public framework::OpProtoAndCheckerMaker { @@ -73,6 +81,9 @@ class SendOpMaker : public framework::OpProtoAndCheckerMaker { AddInput("X", "(Tensor) Input tensor to be 
sent").AsDuplicable(); AddOutput("Out", "(Tensor) Output tensor to be received from server") .AsDuplicable(); + AddOutput("RPCClient", + "(RPCClient) The RPC client object which is " + "initialized at most once."); AddComment(R"DOC( Send operator diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 77f80442e06cb18402bb1b8b97aa9119c7473f54..a4464a281aae714d79a531ec8a2cf793d6330a12 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -153,11 +153,18 @@ class DistributeTranspiler: self.param_grad_ep_mapping[ep]["params"].append(param) self.param_grad_ep_mapping[ep]["grads"].append(grad) + rpc_client_var = program.global_block().create_var( + name="RPC_CLIENT_VAR", + persistable=True, + dtype='float32', # dtype and shape are not actually used + shape=[0]) + # create send_op send_op = program.global_block().append_op( type="send", inputs={"X": send_inputs}, - outputs={"Out": send_outputs}, + outputs={"Out": send_outputs, + "RPCClient": rpc_client_var}, attrs={"endpoints": pserver_endpoints, "epmap": eplist}) # step4