From 139ae08fdf80779306f0a562824770258c3eb9ad Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 20 Mar 2018 14:17:02 +0800 Subject: [PATCH] workable --- paddle/fluid/operators/listen_and_serv_op.cc | 26 ++++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 8cae29d74d..2abb5fc101 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -91,6 +91,9 @@ class ListenAndServOp : public framework::OperatorBase { auto *block = Attr(kOptimizeBlock); auto *program = block->Program(); int num_blocks = program->Size(); + PADDLE_ENFORCE_GE(num_blocks, 2, + "server program should have at least 2 blocks"); + framework::Executor executor(dev_place); // TODO(typhoonzero): change this to a while_op for every cluster-batch. @@ -139,39 +142,30 @@ class ListenAndServOp : public framework::OperatorBase { // should be global ops. // NOTE: if is_gpu_place, CUDA kernels are laugched by multiple threads // and this will still work. - double ts = detail::GetTimestamp(); std::vector> fs; - for (int blkid = 0; blkid < num_blocks - 1; ++blkid) { - fs.push_back(framework::Async([&]() { + // block0 contains only listen_and_serv op, start run from block1. + for (int blkid = 1; blkid < num_blocks - 1; ++blkid) { + fs.push_back(framework::Async([&executor, &program, &recv_scope, + blkid]() { + int run_block = blkid; // thread local try { - VLOG(2) << "begin run in thread" << blkid; - executor.Run(*program, &recv_scope, blkid, + executor.Run(*program, &recv_scope, run_block, false /*create_local_scope*/, false /*create_vars*/); - VLOG(2) << "end run in thread"; } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } })); } - VLOG(2) << "waiting opts..."; - for (int blkid = 0; blkid < num_blocks - 1; ++blkid) fs[blkid].wait(); - VLOG(2) << "waiting opts...OK"; + for (int i = 0; i < num_blocks - 2; ++i) fs[i].wait(); // Run global block at final step if (num_blocks > 2) { try { executor.Run(*program, &recv_scope, num_blocks - 1, false /*create_local_scope*/, false /*create_vars*/); - VLOG(2) << "run global OK , spent " << detail::GetTimestamp() - ts; } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } } - for (auto &n : recv_scope.LocalVarNames()) { - VLOG(2) << "vars in scope: " << n; - } - for (auto &n : recv_scope.LocalVarNames()) { - VLOG(2) << "vars in parent scope: " << n; - } // Reset the received sparse variables, the sum operator would not // sum the input sparse variables which rows is empty at the next -- GitLab