diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index ce91d7a82674364560b8065277b28b51ae1b303a..4e431561f81b2a84c06dff9fcb041317ebc84ae3 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -228,7 +228,8 @@ static bool has_fetch_operators( void Executor::Run(const ProgramDesc& program, Scope* scope, std::map* feed_targets, std::map* fetch_targets, - bool create_vars, const std::string& feed_holder_name, + bool create_local_scope, bool create_vars, + const std::string& feed_holder_name, const std::string& fetch_holder_name) { platform::RecordBlock b(kProgramId); bool has_feed_ops = @@ -290,8 +291,9 @@ void Executor::Run(const ProgramDesc& program, Scope* scope, } auto ctx = Prepare(*copy_program, 0); - RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets, create_vars, - feed_holder_name, fetch_holder_name); + RunPreparedContext(ctx.get(), scope, feed_targets, fetch_targets, + create_local_scope, create_vars, feed_holder_name, + fetch_holder_name); } std::unique_ptr Executor::Prepare( @@ -366,8 +368,9 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, void Executor::RunPreparedContext( ExecutorPrepareContext* ctx, Scope* scope, std::map* feed_targets, - std::map* fetch_targets, bool create_vars, - const std::string& feed_holder_name, const std::string& fetch_holder_name) { + std::map* fetch_targets, bool create_local_scope, + bool create_vars, const std::string& feed_holder_name, + const std::string& fetch_holder_name) { auto& global_block = ctx->prog_.Block(ctx->block_id_); PADDLE_ENFORCE( @@ -387,7 +390,7 @@ void Executor::RunPreparedContext( } } - RunPreparedContext(ctx, scope, create_vars, create_vars); + RunPreparedContext(ctx, scope, create_local_scope, create_vars); // obtain the data of fetch_targets from fetch_holder for (auto* op : global_block.AllOps()) { diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index 4a3d637e2d79f8cbd83412eea2d73e4b497ef1e7..0c3c23611d95e0da67cabfb8fb2755a4a52c991b 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -57,7 +57,7 @@ class Executor { void Run(const ProgramDesc& program, Scope* scope, std::map* feed_targets, std::map* fetch_targets, - bool create_vars = true, + bool create_local_scope = true, bool create_vars = true, const std::string& feed_holder_name = "feed", const std::string& fetch_holder_name = "fetch"); @@ -76,6 +76,7 @@ class Executor { void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, std::map* feed_targets, std::map* fetch_targets, + bool create_local_scope = true, bool create_vars = true, const std::string& feed_holder_name = "feed", const std::string& fetch_holder_name = "fetch"); diff --git a/paddle/fluid/inference/tests/test_helper.h b/paddle/fluid/inference/tests/test_helper.h index b02e5c99f00eaf03c3753e43575cbc67e834774e..e33eb7f170ea04b998283b24d9210a9bb7538675 100644 --- a/paddle/fluid/inference/tests/test_helper.h +++ b/paddle/fluid/inference/tests/test_helper.h @@ -208,10 +208,10 @@ void TestInference(const std::string& dirname, if (PrepareContext) { ctx = executor.Prepare(*inference_program, 0); executor.RunPreparedContext(ctx.get(), scope, &feed_targets, - &fetch_targets, CreateVars); + &fetch_targets, true, CreateVars); } else { executor.Run(*inference_program, scope, &feed_targets, &fetch_targets, - CreateVars); + true, CreateVars); } // Enable the profiler diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index d09f8479b765ad26cc202bfdb2692828213c7956..eb114a47d99541402f748bfffcf6b10fde3e78e2 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -184,7 +184,7 @@ class RequestPrefetch final : public RequestBase { framework::Scope* local_scope = &scope_->NewScope(); auto* var = local_scope->FindVar(var_name); InitializeVariable(var, var_desc->GetType()); - executor_->RunPreparedContext(prefetch_ctx_, scope_, false, false); + executor_->RunPreparedContext(prefetch_ctx_, scope_); SerializeToByteBuffer(var_name, var, *dev_ctx_, &reply); diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index abc88d3eb1514e159f4a880f44ecc0c0960a73d9..57eb5d9a0e73a51d9e2cef7ad7539c1b9da2c4ea 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -57,8 +57,7 @@ static void ParallelExecuteBlocks( framework::Async([&executor, &prepared, &program, &scope, idx]() { int run_block = idx; // thread local try { - executor->RunPreparedContext(prepared[run_block].get(), scope, - false, false); + executor->RunPreparedContext(prepared[run_block].get(), scope); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } @@ -211,8 +210,8 @@ static void AsyncUpdateThread( } auto fs = framework::Async([var_name, &executor, &v, prepared] { try { - executor->RunPreparedContext(prepared, v.second->GetMutableLocalScope(), - false, false); + executor->RunPreparedContext(prepared, + v.second->GetMutableLocalScope()); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } diff --git a/paddle/fluid/operators/send_recv_op_test.cc b/paddle/fluid/operators/send_recv_op_test.cc index eb51f301bfe2a97c65dd1fec23ff5a44f3843b05..d5303eaf50722234d205264e56892b1723104d53 100644 --- a/paddle/fluid/operators/send_recv_op_test.cc +++ b/paddle/fluid/operators/send_recv_op_test.cc @@ -92,12 +92,16 @@ void InitSelectedRowsInScope(const p::CPUPlace &place, f::Scope *scope) { void AddOp(const std::string &type, const f::VariableNameMap &inputs, const f::VariableNameMap &outputs, f::AttributeMap attrs, - f::BlockDesc *block) { + f::BlockDesc *block, bool is_sparse) { // insert output for (auto kv : outputs) { for (auto v : kv.second) { auto var = block->Var(v); var->SetDataType(f::proto::VarType::FP32); + var->SetPersistable(true); + if (is_sparse) { + var->SetType(f::proto::VarType::SELECTED_ROWS); + } } } @@ -128,7 +132,8 @@ void StartServerNet(bool is_sparse, std::atomic *initialized) { auto *optimize_block = program.AppendBlock(root_block); auto *prefetch_block = program.AppendBlock(root_block); // X for server side tensors, RX for received tensors, must be of same shape. - AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block); + AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block, + is_sparse); f::AttributeMap attrs; attrs.insert({"endpoint", std::string("127.0.0.1:0")}); attrs.insert({"Fanin", 1}); diff --git a/python/paddle/fluid/tests/unittests/test_dist_train.py b/python/paddle/fluid/tests/unittests/test_dist_train.py index 77e9a8f7e72a9e0790ce1d1f48356abcca8eaccf..c2393a288c6ebb5dd4a12f7b591d12cc94f4ea55 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_train.py +++ b/python/paddle/fluid/tests/unittests/test_dist_train.py @@ -52,15 +52,18 @@ class TestSendOp(unittest.TestCase): serv = layers.ListenAndServ( "127.0.0.1:0", ["X"], optimizer_mode=False) with serv.do(): + out_var = main.global_block().create_var( + name="scale_0.tmp_0", + psersistable=True, + dtype="float32", + shape=[32, 32]) x = layers.data( shape=[32, 32], dtype='float32', name="X", append_batch_size=False) fluid.initializer.Constant(value=1.0)(x, main.global_block()) - o = layers.scale(x=x, scale=10.0) - main.global_block().create_var( - name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) + layers.scale(x=x, scale=10.0, out=out_var) self.server_exe = fluid.Executor(place) self.server_exe.run(main)