From ebc7303990a74ca8de91af4d68543614825d25f2 Mon Sep 17 00:00:00 2001 From: Wu Yi Date: Fri, 18 May 2018 10:21:30 +0800 Subject: [PATCH] listen_and_serv use local scope (#10663) * listen_and_serv use localscope * fix ut --- paddle/fluid/framework/executor.cc | 15 +++++++++------ paddle/fluid/framework/executor.h | 3 ++- paddle/fluid/inference/tests/test_helper.h | 4 ++-- paddle/fluid/operators/detail/grpc_server.cc | 2 +- paddle/fluid/operators/listen_and_serv_op.cc | 7 +++---- paddle/fluid/operators/send_recv_op_test.cc | 9 +++++++-- .../fluid/tests/unittests/test_dist_train.py | 9 ++++++--- 7 files changed, 30 insertions(+), 19 deletions(-) diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index ce91d7a826..4e431561f8 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -228,7 +228,8 @@ static bool has_fetch_operators( void Executor::Run(const ProgramDesc& program, Scope* scope, std::map* feed_targets, std::map* fetch_targets, - bool create_vars, const std::string& feed_holder_name, + bool create_local_scope, bool create_vars, + const std::string& feed_holder_name, const std::string& fetch_holder_name) { platform::RecordBlock b(kProgramId); bool has_feed_ops = @@ -290,8 +291,9 @@ void Executor::Run(const ProgramDesc& program, Scope* scope, } auto ctx = Prepare(*copy_program, 0); - RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets, create_vars, - feed_holder_name, fetch_holder_name); + RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets, + create_local_scope, create_vars, feed_holder_name, + fetch_holder_name); } std::unique_ptr Executor::Prepare( @@ -366,8 +368,9 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, void Executor::RunPreparedContext( ExecutorPrepareContext* ctx, Scope* scope, std::map* feed_targets, - std::map* fetch_targets, bool create_vars, - const std::string& feed_holder_name, const std::string& fetch_holder_name) { + std::map* fetch_targets, bool create_local_scope, + bool create_vars, const std::string& feed_holder_name, + const std::string& fetch_holder_name) { auto& global_block = ctx->prog_.Block(ctx->block_id_); PADDLE_ENFORCE( @@ -387,7 +390,7 @@ void Executor::RunPreparedContext( } } - RunPreparedContext(ctx, scope, create_vars, create_vars); + RunPreparedContext(ctx, scope, create_local_scope, create_vars); // obtain the data of fetch_targets from fetch_holder for (auto* op : global_block.AllOps()) { diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index 4a3d637e2d..0c3c23611d 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -57,7 +57,7 @@ class Executor { void Run(const ProgramDesc& program, Scope* scope, std::map* feed_targets, std::map* fetch_targets, - bool create_vars = true, + bool create_local_scope = true, bool create_vars = true, const std::string& feed_holder_name = "feed", const std::string& fetch_holder_name = "fetch"); @@ -76,6 +76,7 @@ class Executor { void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, std::map* feed_targets, std::map* fetch_targets, + bool create_local_scope = true, bool create_vars = true, const std::string& feed_holder_name = "feed", const std::string& fetch_holder_name = "fetch"); diff --git a/paddle/fluid/inference/tests/test_helper.h b/paddle/fluid/inference/tests/test_helper.h index b02e5c99f0..e33eb7f170 100644 --- a/paddle/fluid/inference/tests/test_helper.h +++ b/paddle/fluid/inference/tests/test_helper.h @@ -208,10 +208,10 @@ void TestInference(const std::string& dirname, if (PrepareContext) { ctx = executor.Prepare(*inference_program, 0); executor.RunPreparedContext(ctx.get(), scope, &feed_targets, - &fetch_targets, CreateVars); + &fetch_targets, true, CreateVars); } else { executor.Run(*inference_program, scope, &feed_targets, &fetch_targets, - CreateVars); + true, CreateVars); } // Enable the profiler diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index d09f8479b7..eb114a47d9 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -184,7 +184,7 @@ class RequestPrefetch final : public RequestBase { framework::Scope* local_scope = &scope_->NewScope(); auto* var = local_scope->FindVar(var_name); InitializeVariable(var, var_desc->GetType()); - executor_->RunPreparedContext(prefetch_ctx_, scope_, false, false); + executor_->RunPreparedContext(prefetch_ctx_, scope_); SerializeToByteBuffer(var_name, var, *dev_ctx_, &reply); diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index abc88d3eb1..57eb5d9a0e 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -57,8 +57,7 @@ static void ParallelExecuteBlocks( framework::Async([&executor, &prepared, &program, &scope, idx]() { int run_block = idx; // thread local try { - executor->RunPreparedContext(prepared[run_block].get(), scope, - false, false); + executor->RunPreparedContext(prepared[run_block].get(), scope); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } @@ -211,8 +210,8 @@ static void AsyncUpdateThread( } auto fs = framework::Async([var_name, &executor, &v, prepared] { try { - executor->RunPreparedContext(prepared, v.second->GetMutableLocalScope(), - false, false); + executor->RunPreparedContext(prepared, + v.second->GetMutableLocalScope()); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } diff --git a/paddle/fluid/operators/send_recv_op_test.cc b/paddle/fluid/operators/send_recv_op_test.cc index eb51f301bf..d5303eaf50 100644 --- a/paddle/fluid/operators/send_recv_op_test.cc +++ b/paddle/fluid/operators/send_recv_op_test.cc @@ -92,12 +92,16 @@ void InitSelectedRowsInScope(const p::CPUPlace &place, f::Scope *scope) { void AddOp(const std::string &type, const f::VariableNameMap &inputs, const f::VariableNameMap &outputs, f::AttributeMap attrs, - f::BlockDesc *block) { + f::BlockDesc *block, bool is_sparse) { // insert output for (auto kv : outputs) { for (auto v : kv.second) { auto var = block->Var(v); var->SetDataType(f::proto::VarType::FP32); + var->SetPersistable(true); + if (is_sparse) { + var->SetType(f::proto::VarType::SELECTED_ROWS); + } } } @@ -128,7 +132,8 @@ void StartServerNet(bool is_sparse, std::atomic *initialized) { auto *optimize_block = program.AppendBlock(root_block); auto *prefetch_block = program.AppendBlock(root_block); // X for server side tensors, RX for received tensors, must be of same shape. - AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block); + AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block, + is_sparse); f::AttributeMap attrs; attrs.insert({"endpoint", std::string("127.0.0.1:0")}); attrs.insert({"Fanin", 1}); diff --git a/python/paddle/fluid/tests/unittests/test_dist_train.py b/python/paddle/fluid/tests/unittests/test_dist_train.py index 77e9a8f7e7..c2393a288c 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_train.py +++ b/python/paddle/fluid/tests/unittests/test_dist_train.py @@ -52,15 +52,18 @@ class TestSendOp(unittest.TestCase): serv = layers.ListenAndServ( "127.0.0.1:0", ["X"], optimizer_mode=False) with serv.do(): + out_var = main.global_block().create_var( + name="scale_0.tmp_0", + psersistable=True, + dtype="float32", + shape=[32, 32]) x = layers.data( shape=[32, 32], dtype='float32', name="X", append_batch_size=False) fluid.initializer.Constant(value=1.0)(x, main.global_block()) - o = layers.scale(x=x, scale=10.0) - main.global_block().create_var( - name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) + layers.scale(x=x, scale=10.0, out=out_var) self.server_exe = fluid.Executor(place) self.server_exe.run(main) -- GitLab