提交 139ae08f 编写于 作者: T typhoonzero

workable

上级 093e07d3
...@@ -91,6 +91,9 @@ class ListenAndServOp : public framework::OperatorBase { ...@@ -91,6 +91,9 @@ class ListenAndServOp : public framework::OperatorBase {
auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock); auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
auto *program = block->Program(); auto *program = block->Program();
int num_blocks = program->Size(); int num_blocks = program->Size();
PADDLE_ENFORCE_GE(num_blocks, 2,
"server program should have at least 2 blocks");
framework::Executor executor(dev_place); framework::Executor executor(dev_place);
// TODO(typhoonzero): change this to a while_op for every cluster-batch. // TODO(typhoonzero): change this to a while_op for every cluster-batch.
...@@ -139,39 +142,30 @@ class ListenAndServOp : public framework::OperatorBase { ...@@ -139,39 +142,30 @@ class ListenAndServOp : public framework::OperatorBase {
// should be global ops. // should be global ops.
// NOTE: if is_gpu_place, CUDA kernels are launched by multiple threads // NOTE: if is_gpu_place, CUDA kernels are launched by multiple threads
// and this will still work. // and this will still work.
double ts = detail::GetTimestamp();
std::vector<std::future<void>> fs; std::vector<std::future<void>> fs;
for (int blkid = 0; blkid < num_blocks - 1; ++blkid) { // block0 contains only listen_and_serv op, start run from block1.
fs.push_back(framework::Async([&]() { for (int blkid = 1; blkid < num_blocks - 1; ++blkid) {
fs.push_back(framework::Async([&executor, &program, &recv_scope,
blkid]() {
int run_block = blkid; // thread local
try { try {
VLOG(2) << "begin run in thread" << blkid; executor.Run(*program, &recv_scope, run_block,
executor.Run(*program, &recv_scope, blkid,
false /*create_local_scope*/, false /*create_vars*/); false /*create_local_scope*/, false /*create_vars*/);
VLOG(2) << "end run in thread";
} catch (std::exception &e) { } catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what(); LOG(ERROR) << "run sub program error " << e.what();
} }
})); }));
} }
VLOG(2) << "waiting opts..."; for (int i = 0; i < num_blocks - 2; ++i) fs[i].wait();
for (int blkid = 0; blkid < num_blocks - 1; ++blkid) fs[blkid].wait();
VLOG(2) << "waiting opts...OK";
// Run global block at final step // Run global block at final step
if (num_blocks > 2) { if (num_blocks > 2) {
try { try {
executor.Run(*program, &recv_scope, num_blocks - 1, executor.Run(*program, &recv_scope, num_blocks - 1,
false /*create_local_scope*/, false /*create_vars*/); false /*create_local_scope*/, false /*create_vars*/);
VLOG(2) << "run global OK , spent " << detail::GetTimestamp() - ts;
} catch (std::exception &e) { } catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what(); LOG(ERROR) << "run sub program error " << e.what();
} }
} }
for (auto &n : recv_scope.LocalVarNames()) {
VLOG(2) << "vars in scope: " << n;
}
for (auto &n : recv_scope.LocalVarNames()) {
VLOG(2) << "vars in parent scope: " << n;
}
// Reset the received sparse variables, the sum operator would not // Reset the received sparse variables, the sum operator would not
// sum the input sparse variables which rows is empty at the next // sum the input sparse variables which rows is empty at the next
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册