From 0bddb951c2017fd9cc9d370e718eb01902bf00f2 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 3 Dec 2019 13:13:23 +0800 Subject: [PATCH] fix async mode, test=develop (#21367) --- .../distributed_ops/listen_and_serv_op.cc | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/paddle/fluid/operators/distributed_ops/listen_and_serv_op.cc b/paddle/fluid/operators/distributed_ops/listen_and_serv_op.cc index a7476b9b3a..05b95dbd4e 100644 --- a/paddle/fluid/operators/distributed_ops/listen_and_serv_op.cc +++ b/paddle/fluid/operators/distributed_ops/listen_and_serv_op.cc @@ -464,23 +464,21 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, f(request_get_no_barrier_handler_.get()); f(request_notify_handler_.get()); - // start the server listening after all member initialized. - server_thread_.reset(new std::thread(RunServer, rpc_service_)); - VLOG(3) << "wait server thread to become ready..."; - rpc_service_->WaitServerReady(); - // register SIGINT(from ctrl+C) and SIGTERM(from kill) signal handlers signal(SIGINT, SignalHandler::StopAndExit); signal(SIGTERM, SignalHandler::StopAndExit); - // Cache the type of the received vars as `sparse_vars_` and `dense_vars_` - // so that we can reset them at the end of each iteration. - // NOTE: only used in sync update - CacheVarsType(inputs, recv_scope); - - // Write to a file of server selected port for python use. - SavePort(); if (sync_mode) { + // start the server listening after all member initialized. + server_thread_.reset(new std::thread(RunServer, rpc_service_)); + VLOG(3) << "wait server thread to become ready..."; + rpc_service_->WaitServerReady(); + + CacheVarsType(inputs, recv_scope); + + // Write to a file of server selected port for python use. + SavePort(); + RunSyncLoop(&executor, program, &recv_scope, &dev_ctx, prefetch_block_id_list, checkpoint_block_id); } else { @@ -498,6 +496,15 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, split(grad_to_block_id_str[0], ':', &pieces); distributed::HeartBeatMonitor::Init(fan_in, pserver_id == 0, pieces[0]); } + + // start the server listening after all member initialized. + server_thread_.reset(new std::thread(RunServer, rpc_service_)); + VLOG(3) << "wait server thread to become ready..."; + rpc_service_->WaitServerReady(); + + // Write to a file of server selected port for python use. + SavePort(); + RunAsyncLoop(&executor, program, &recv_scope); } } -- GitLab