提交 1ab4fcb5 编写于 作者: T typhoonzero

prepare pserver executor

上级 12856c5f
...@@ -279,6 +279,21 @@ std::unique_ptr<ExecutorPrepareContext> Executor::Prepare( ...@@ -279,6 +279,21 @@ std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
return std::unique_ptr<ExecutorPrepareContext>(ctx); return std::unique_ptr<ExecutorPrepareContext>(ctx);
} }
std::vector<std::shared_ptr<ExecutorPrepareContext>> Prepare(
const ProgramDesc& program, const std::vector<int>& block_ids) {
std::vector<std::shared_ptr<ExecutorPrepareContext>> result;
for (auto& bid : block_ids) {
auto* ctx = new ExecutorPrepareContext(program, bid);
PADDLE_ENFORCE_LT(static_cast<size_t>(bid), program.Size());
auto& block = program.Block(bid);
for (auto& op_desc : block.AllOps()) {
ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc));
}
result.push_back(std::shared_ptr<ExecutorPrepareContext>(ctx));
}
return result;
}
void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope, bool create_vars) { bool create_local_scope, bool create_vars) {
auto& block = ctx->prog_.Block(ctx->block_id_); auto& block = ctx->prog_.Block(ctx->block_id_);
......
...@@ -60,6 +60,9 @@ class Executor { ...@@ -60,6 +60,9 @@ class Executor {
static std::unique_ptr<ExecutorPrepareContext> Prepare( static std::unique_ptr<ExecutorPrepareContext> Prepare(
const ProgramDesc& program, int block_id); const ProgramDesc& program, int block_id);
static std::vector<std::shared_ptr<ExecutorPrepareContext>> Prepare(
const ProgramDesc& program, const std::vector<int>& block_ids);
void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope = true, bool create_local_scope = true,
bool create_vars = true); bool create_vars = true);
......
...@@ -93,6 +93,10 @@ class ListenAndServOp : public framework::OperatorBase { ...@@ -93,6 +93,10 @@ class ListenAndServOp : public framework::OperatorBase {
"server program should have at least 2 blocks"); "server program should have at least 2 blocks");
framework::Executor executor(dev_place); framework::Executor executor(dev_place);
std::vector<int> block_list;
for (int blkid = 1; blkid < num_blocks; ++blkid)
block_list.push_back(blkid);
auto prepared = executor.Prepare(*program, block_list);
// TODO(typhoonzero): change this to a while_op for every cluster-batch. // TODO(typhoonzero): change this to a while_op for every cluster-batch.
bool exit_flag = false; bool exit_flag = false;
...@@ -143,11 +147,12 @@ class ListenAndServOp : public framework::OperatorBase { ...@@ -143,11 +147,12 @@ class ListenAndServOp : public framework::OperatorBase {
std::vector<std::future<void>> fs; std::vector<std::future<void>> fs;
// block0 contains only listen_and_serv op, start run from block1. // block0 contains only listen_and_serv op, start run from block1.
for (int blkid = 1; blkid < num_blocks - 1; ++blkid) { for (int blkid = 1; blkid < num_blocks - 1; ++blkid) {
fs.push_back( fs.push_back(framework::Async(
framework::Async([&executor, &program, &recv_scope, blkid]() { [&executor, &program, &recv_scope, &prepared, blkid]() {
int run_block = blkid; // thread local int run_block = blkid; // thread local
try { try {
executor.Run(*program, &recv_scope, run_block, false, false); executor.RunPreparedContext(prepared[run_block].get(),
&recv_scope, false, false);
} catch (std::exception &e) { } catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what(); LOG(ERROR) << "run sub program error " << e.what();
} }
...@@ -157,7 +162,9 @@ class ListenAndServOp : public framework::OperatorBase { ...@@ -157,7 +162,9 @@ class ListenAndServOp : public framework::OperatorBase {
// Run global block at final step, or block1 if there are only 2 blocks // Run global block at final step, or block1 if there are only 2 blocks
if (num_blocks >= 2) { if (num_blocks >= 2) {
try { try {
executor.Run(*program, &recv_scope, num_blocks - 1, false, false); // executor.Run(program, &recv_scope, num_blocks - 1, false, false);
executor.RunPreparedContext(prepared[num_blocks - 1].get(),
&recv_scope, false, false);
} catch (std::exception &e) { } catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what(); LOG(ERROR) << "run sub program error " << e.what();
} }
...@@ -172,14 +179,11 @@ class ListenAndServOp : public framework::OperatorBase { ...@@ -172,14 +179,11 @@ class ListenAndServOp : public framework::OperatorBase {
var->GetMutable<framework::SelectedRows>()->mutable_rows()->clear(); var->GetMutable<framework::SelectedRows>()->mutable_rows()->clear();
} }
rpc_service_->SetCond(1); rpc_service_->SetCond(1);
// FIXME(typhoonzero): use another condition to sync wait clients get. // NOTE: does not consider barrier request retry in here, we may use
// global barrier id to resolve this.
rpc_service_->WaitClientGet(fan_in); rpc_service_->WaitClientGet(fan_in);
sparse_vars.clear(); sparse_vars.clear();
} // while(true) } // while(true)
// for (int i = 0; i < num_blocks; ++i) {
// delete blk_ctx_list[i];
// }
} }
protected: protected:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册