From ebf0027391a657279365be5ead628c6bc8daed37 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 2 May 2018 11:22:27 +0800 Subject: [PATCH] use IOThreadPool to dispatch async update task --- paddle/fluid/operators/listen_and_serv_op.cc | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index e0b828548fe..f22f8b26103 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -201,14 +201,16 @@ static void AsyncUpdateThread( LOG(ERROR) << "Can not find server side var: " << recv_var_name; PADDLE_THROW("Can not find server side var"); } - try { - executor->RunPreparedContext(prepared, v.second->GetMutableLocalScope(), - false, false); - } catch (std::exception &e) { - LOG(ERROR) << "run sub program error " << e.what(); - } + auto fs = framework::Async([var_name, &executor, &v, prepared] { + try { + executor->RunPreparedContext(prepared, v.second->GetMutableLocalScope(), + false, false); + } catch (std::exception &e) { + LOG(ERROR) << "run sub program error " << e.what(); + } + }); + fs.wait(); } - VLOG(3) << "update thread for " << var_name << " ended"; } void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, @@ -256,8 +258,8 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, for (auto iter = grad_to_queue.begin(); iter != grad_to_queue.end(); iter++) { std::string grad_name = iter->first; VLOG(3) << "create async update thread for " << grad_name; - fs.push_back(framework::Async([grad_name, &exit_flag, &executor, - &grad_to_queue, &grad_to_prepared_ctx]() { + fs.push_back(framework::AsyncIO([grad_name, &exit_flag, &executor, + &grad_to_queue, &grad_to_prepared_ctx]() { AsyncUpdateThread(grad_name, exit_flag, grad_to_queue[grad_name], executor, grad_to_prepared_ctx[grad_name].get()); })); -- GitLab