From bc8e600ce5f871a23ae0bcea9466eba922447954 Mon Sep 17 00:00:00 2001
From: Chengmo
Date: Tue, 5 Nov 2019 13:35:06 +0800
Subject: [PATCH] Fix rpc not wait in GEO communicator (#20967)

* test=develop,fix rpc not wait in geo
---
 paddle/fluid/operators/distributed/communicator.cc | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc
index 6db04b3ad1f..2027a874630 100644
--- a/paddle/fluid/operators/distributed/communicator.cc
+++ b/paddle/fluid/operators/distributed/communicator.cc
@@ -937,8 +937,9 @@ void GeoSgdCommunicator::RpcSend(const std::string &origin_var_name,
   auto &cpu_ctx_send = *pool.Get(platform::CPUPlace());
   distributed::RPCClient *rpc_client =
       distributed::RPCClient::GetInstance(trainer_id);
-  rpc_client->AsyncSendVar(endpoint, cpu_ctx_send, *delta_scope_.get(),
-                           splited_var_name);
+  auto handle = rpc_client->AsyncSendVar(endpoint, cpu_ctx_send,
+                                         *delta_scope_.get(), splited_var_name);
+  handle->Wait();
 }
 
 void GeoSgdCommunicator::RpcRecv(const std::string &var_name,
@@ -951,8 +952,10 @@ void GeoSgdCommunicator::RpcRecv(const std::string &var_name,
   distributed::RPCClient *rpc_client =
       distributed::RPCClient::GetInstance(train_id);
   pserver_scope_->Var(splited_var_name);
-  rpc_client->AsyncGetVar(endpoint, cpu_ctx_recv, *pserver_scope_.get(),
-                          splited_var_name, splited_var_name, splited_var_name);
+  auto handle = rpc_client->AsyncGetVar(endpoint, cpu_ctx_recv,
+                                        *pserver_scope_.get(), splited_var_name,
+                                        splited_var_name, splited_var_name);
+  handle->Wait();
 }
 
 void GeoSgdCommunicator::Recv() {}
-- 
GitLab