From 78d98c45d54013829380c18cde1109d0232bdeb1 Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Tue, 4 Sep 2018 22:54:22 +0800 Subject: [PATCH] fix async mode handle COMPLETE_MESSAGE (#13212) --- .../distributed/request_handler_impl.cc | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/paddle/fluid/operators/distributed/request_handler_impl.cc b/paddle/fluid/operators/distributed/request_handler_impl.cc index de1a503154d..7229e2d2630 100644 --- a/paddle/fluid/operators/distributed/request_handler_impl.cc +++ b/paddle/fluid/operators/distributed/request_handler_impl.cc @@ -39,19 +39,6 @@ bool RequestSendHandler::Handle(const std::string& varname, const std::string& out_var_name) { VLOG(4) << "RequestSendHandler:" << varname; - // Async - if (!sync_mode_) { - rpc_server_->Profiler().OneStep(); - try { - executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(), - scope); - } catch (std::exception& e) { - LOG(ERROR) << "async: run sub program error " << e.what(); - return false; - } - return true; - } - // Sync if (varname == BATCH_BARRIER_MESSAGE) { VLOG(3) << "sync: recv BATCH_BARRIER_MESSAGE"; @@ -60,17 +47,31 @@ bool RequestSendHandler::Handle(const std::string& varname, VLOG(3) << "sync: recv complete message"; rpc_server_->Complete(); } else { - VLOG(3) << "sync: received var_name: " << varname; - rpc_server_->WaitCond(kRequestSend); - VLOG(3) << "sync: processing received var: " << varname; - - if (invar == nullptr) { - LOG(FATAL) << "sync: Can not find server side var: " << varname; - return false; - } - if (invar->IsType()) { - std::unique_lock lock(mutex_sparse_vars_); - sparse_vars_.push_back(invar); + // Async + if (!sync_mode_) { + VLOG(3) << "async process var: " << varname; + rpc_server_->Profiler().OneStep(); + try { + executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(), + scope); + } catch (std::exception& e) { + LOG(ERROR) << "async: run sub program error " << e.what(); + return false; + } + return true; + } else { // sync + rpc_server_->WaitCond(kRequestSend); + VLOG(3) << "sync: processing received var: " << varname; + + if (invar == nullptr) { + LOG(FATAL) << "sync: Can not find server side var: " << varname; + return false; + } + + if (invar->IsType()) { + std::unique_lock lock(mutex_sparse_vars_); + sparse_vars_.push_back(invar); + } } } return true; -- GitLab