diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index a807911147939e7d13a26be27e7f7c3ab86ed52a..54f3511e15729c1db6fff1215317ceafeff39e08 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -193,7 +193,7 @@ else() cc_test(test_naive_executor SRCS naive_executor_test.cc DEPS naive_executor elementwise_add_op) endif() -target_link_libraries(executor while_op_helper executor_gc_helper recurrent_op_helper) +target_link_libraries(executor while_op_helper executor_gc_helper recurrent_op_helper conditional_block_op_helper) cc_library(parallel_executor SRCS parallel_executor.cc DEPS threaded_ssa_graph_executor scope_buffered_ssa_graph_executor parallel_ssa_graph_executor async_ssa_graph_executor diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index cfab2f5f4cc99a41b40e5ab31f30f009a346c5c5..defe97cd6f2d4e5a9ca3fd1880d8bbfc0989e482 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -30,6 +30,7 @@ limitations under the License. */ #include "paddle/fluid/framework/trainer_factory.h" #include "paddle/fluid/framework/transfer_scope_cache.h" #include "paddle/fluid/framework/variable_helper.h" +#include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h" #include "paddle/fluid/operators/controlflow/recurrent_op_helper.h" #include "paddle/fluid/operators/controlflow/while_op_helper.h" #include "paddle/fluid/operators/distributed/distributed.h" @@ -58,10 +59,30 @@ ExecutorPrepareContext::ExecutorPrepareContext( void ExecutorPrepareContext::PrepareUnusedVars( const std::vector& keep_vars, bool force_disable_gc) { +#ifdef PADDLE_WITH_NGRAPH + if (FLAGS_use_ngraph) { + // FIXME(zjl): There is difference when ngraph and gc are both enabled + // in unittests. I do not know why it happens. Maybe ngraph engine + // would cache some variables? + LOG_FIRST_N(WARNING, 1) + << "FLAGS_use_ngraph=True, garbage collection strategy is " + "disabled in Executor"; + force_disable_gc = true; + } +#endif force_disable_gc_ = force_disable_gc; if (GetEagerDeletionThreshold() < 0 || force_disable_gc_) { return; } + + // If gc is enabled and block size > 1 + if (prog_.Size() > 1) { + operators::PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( + block_id_, ops_); + operators::PrepareSafeEagerDeletionOnWhileOpAndWhileGradOp(block_id_, ops_); + operators::PrepareSafeEagerDeletionOnRecurrentOpAndRecurrentGradOp( + block_id_, ops_); + } unused_vars_ = GetUnusedVars(prog_.Block(block_id_), ops_, keep_vars); } @@ -407,13 +428,6 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, #ifdef PADDLE_WITH_CUDA } #endif - // If gc is enabled and block size > 1 - if (gc && ctx->prog_.Size() > 1) { - operators::PrepareSafeEagerDeletionOnWhileOpAndWhileGradOp(ctx->block_id_, - ctx->ops_); - operators::PrepareSafeEagerDeletionOnRecurrentOpAndRecurrentGradOp( - ctx->block_id_, ctx->ops_); - } } for (auto& op : ctx->ops_) { diff --git a/paddle/fluid/framework/garbage_collector.cc b/paddle/fluid/framework/garbage_collector.cc index 789b2ef80ec09a69ca227a27c61dd58e58a2fc04..abab2fdb8773e27c725890ce8ca7fcb321019c6c 100644 --- a/paddle/fluid/framework/garbage_collector.cc +++ b/paddle/fluid/framework/garbage_collector.cc @@ -28,8 +28,15 @@ namespace paddle { namespace framework { +// Disable gc by default when inference library is built +#ifdef PADDLE_ON_INFERENCE +static const double kDefaultEagerDeleteTensorGB = -1; +#else +static const double kDefaultEagerDeleteTensorGB = 0; +#endif + DEFINE_double( - eager_delete_tensor_gb, -1.0, + eager_delete_tensor_gb, kDefaultEagerDeleteTensorGB, "Memory size threshold (GB) when the garbage collector clear tensors." "Disabled when this value is less than 0"); @@ -48,6 +55,9 @@ GarbageCollector::GarbageCollector(const platform::Place &place, : max_memory_size_((std::max)(max_memory_size, static_cast(1))) { garbages_.reset(new GarbageQueue()); dev_ctx_ = platform::DeviceContextPool::Instance().Get(place); + if (max_memory_size_ > 1) { + mutex_.reset(new std::mutex()); + } } CPUGarbageCollector::CPUGarbageCollector(const platform::CPUPlace &place, diff --git a/paddle/fluid/framework/garbage_collector.h b/paddle/fluid/framework/garbage_collector.h index 6ce797bd962a10fffb42ae120153ec9bf6e5871e..610339520db540f5b6ca6caf9d37634b0a236e5f 100644 --- a/paddle/fluid/framework/garbage_collector.h +++ b/paddle/fluid/framework/garbage_collector.h @@ -46,7 +46,7 @@ class GarbageCollector { platform::DeviceContext *dev_ctx_; std::unique_ptr garbages_; - mutable std::mutex mutex_; + mutable std::unique_ptr mutex_; const size_t max_memory_size_; size_t cur_memory_size_{0}; }; @@ -118,7 +118,7 @@ void GarbageCollector::Add(Container &&objs, Callback &&callback) { GarbageQueue *garbage_queue = nullptr; { - std::lock_guard guard(mutex_); + std::lock_guard guard(*mutex_); for (auto &obj : objs) { if (!obj) continue; cur_memory_size_ += obj->size(); diff --git a/paddle/fluid/framework/ir/memory_optimize_pass/CMakeLists.txt b/paddle/fluid/framework/ir/memory_optimize_pass/CMakeLists.txt index 32388f239c2dc9b9dc7407975de8f8a2d4ebd06b..37993d3f0d96170c3926c91654cf321cabb2539f 100644 --- a/paddle/fluid/framework/ir/memory_optimize_pass/CMakeLists.txt +++ b/paddle/fluid/framework/ir/memory_optimize_pass/CMakeLists.txt @@ -1,11 +1,12 @@ cc_library(op_graph_view SRCS op_graph_view.cc DEPS op_handle_base) +cc_library(conditional_block_op_eager_deletion_pass SRCS conditional_block_op_eager_deletion_pass.cc DEPS conditional_block_op_helper graph_helper pass computation_op_handle) cc_library(while_op_eager_deletion_pass SRCS while_op_eager_deletion_pass.cc DEPS while_op_helper graph_helper pass computation_op_handle) cc_library(recurrent_op_eager_deletion_pass SRCS recurrent_op_eager_deletion_pass.cc DEPS recurrent_op_helper graph_helper pass computation_op_handle) cc_library(reference_count_pass_helper SRCS reference_count_pass_helper.cc DEPS garbage_collector computation_op_handle var_handle) cc_library(reference_count_pass SRCS reference_count_pass.cc DEPS computation_op_handle graph graph_helper pass op_graph_view reference_count_pass_helper) cc_library(eager_deletion_pass SRCS eager_deletion_pass.cc DEPS computation_op_handle - eager_deletion_op_handle graph graph_helper pass while_op_eager_deletion_pass recurrent_op_eager_deletion_pass reference_count_pass_helper) + eager_deletion_op_handle graph graph_helper pass conditional_block_op_eager_deletion_pass while_op_eager_deletion_pass recurrent_op_eager_deletion_pass reference_count_pass_helper) cc_library(memory_reuse_pass SRCS memory_reuse_pass.cc DEPS computation_op_handle reference_count_pass_helper share_tensor_buffer_op_handle multi_devices_helper graph pass) diff --git a/paddle/fluid/framework/ir/memory_optimize_pass/conditional_block_op_eager_deletion_pass.cc b/paddle/fluid/framework/ir/memory_optimize_pass/conditional_block_op_eager_deletion_pass.cc new file mode 100644 index 0000000000000000000000000000000000000000..5bceb4e8346ae04945da72ce248a187adb1288b3 --- /dev/null +++ b/paddle/fluid/framework/ir/memory_optimize_pass/conditional_block_op_eager_deletion_pass.cc @@ -0,0 +1,61 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/details/computation_op_handle.h" +#include "paddle/fluid/framework/ir/graph_helper.h" +#include "paddle/fluid/framework/ir/pass.h" +#include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h" +#include "paddle/fluid/operators/controlflow/op_variant.h" + +namespace paddle { +namespace framework { +namespace ir { + +class ConditionalOpEagerDeletionPass : public Pass { + protected: + void ApplyImpl(Graph *graph) const override { + auto all_ops = ir::FilterByNodeWrapper(*graph); + + // Find all conditional_op and conditional_grad_op + std::unordered_map, + std::vector>> + target_ops; + for (auto *op : all_ops) { + auto compute_op = dynamic_cast(op); + if (compute_op == nullptr) continue; + + if (compute_op->Name() == "conditional_block") { + target_ops[compute_op->GetScopeIdx()].first.emplace_back( + compute_op->GetOp()); + } else if (compute_op->Name() == "conditional_block_grad") { + target_ops[compute_op->GetScopeIdx()].second.emplace_back( + compute_op->GetOp()); + } + } + + for (auto &ops_pair : target_ops) { + auto &ifelse_ops = ops_pair.second.first; + auto &ifelse_grad_ops = ops_pair.second.second; + operators::PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( + ifelse_ops, ifelse_grad_ops); + } + } +}; + +} // namespace ir +} // namespace framework +} // namespace paddle + +REGISTER_PASS(conditional_block_op_eager_deletion_pass, + paddle::framework::ir::ConditionalOpEagerDeletionPass); diff --git a/paddle/fluid/framework/ir/memory_optimize_pass/eager_deletion_pass.cc b/paddle/fluid/framework/ir/memory_optimize_pass/eager_deletion_pass.cc index dc32dd6cda9374deb2550d881466a5a29eadf055..962401a672d44939f4aa908ccbda4a42d1ef040a 100644 --- a/paddle/fluid/framework/ir/memory_optimize_pass/eager_deletion_pass.cc +++ b/paddle/fluid/framework/ir/memory_optimize_pass/eager_deletion_pass.cc @@ -269,6 +269,11 @@ void EagerDeletionPass::ApplyImpl(ir::Graph *graph) const { } } + auto conditional_block_op_eager_deletion_pass = + ir::PassRegistry::Instance().Get( + "conditional_block_op_eager_deletion_pass"); + conditional_block_op_eager_deletion_pass->Apply(graph); + auto while_op_eager_deletion_pass = ir::PassRegistry::Instance().Get("while_op_eager_deletion_pass"); while_op_eager_deletion_pass->Apply(graph); @@ -288,5 +293,6 @@ REGISTER_PASS(eager_deletion_pass, paddle::framework::ir::EagerDeletionPass) .RequirePassAttr(paddle::framework::ir::kAllPlaces) .RequirePassAttr(paddle::framework::ir::kGarbageCollector); +USE_PASS(conditional_block_op_eager_deletion_pass); USE_PASS(while_op_eager_deletion_pass); USE_PASS(recurrent_op_eager_deletion_pass); diff --git a/paddle/fluid/framework/ir/memory_optimize_pass/reference_count_pass.cc b/paddle/fluid/framework/ir/memory_optimize_pass/reference_count_pass.cc index c0e3a9290bf4c1bb324631e4249633adaa869530..f34c112dbe9d442e27997e14a12787fa922b8c70 100644 --- a/paddle/fluid/framework/ir/memory_optimize_pass/reference_count_pass.cc +++ b/paddle/fluid/framework/ir/memory_optimize_pass/reference_count_pass.cc @@ -337,6 +337,10 @@ void ReferenceCountPass::ApplyImpl(ir::Graph *graph) const { for (auto iter = var_handles.rbegin(); iter != var_handles.rend(); ++iter) { + if ((*iter)->Node()->IsCtrlVar()) { + break; + } + VLOG(10) << "Try to find last living ops of " << var_name << " " << (iter - var_handles.rbegin()) << " time"; LastLiveOpSearchStatus status = LastLiveOpSearchStatus::kFailure; diff --git a/paddle/fluid/operators/controlflow/CMakeLists.txt b/paddle/fluid/operators/controlflow/CMakeLists.txt index f7281a2d1a00a6b85aaa353e5137919edf96f288..758f0a65d13c1d8ec88212ca82199293678f99cb 100644 --- a/paddle/fluid/operators/controlflow/CMakeLists.txt +++ b/paddle/fluid/operators/controlflow/CMakeLists.txt @@ -1,7 +1,10 @@ include(operators) register_operators(DEPS naive_executor) cc_library(op_variant SRCS op_variant.cc DEPS operator proto_desc) +cc_library(conditional_block_op_helper SRCS conditional_block_op_helper.cc DEPS operator op_variant conditional_block_op) cc_library(recurrent_op_helper SRCS recurrent_op_helper.cc DEPS operator op_variant recurrent_op) cc_library(while_op_helper SRCS while_op_helper.cc DEPS operator op_variant) +target_link_libraries(conditional_block_infer_op conditional_block_op) + file(APPEND ${pybind_file} "USE_OP(less_than);\nUSE_OP(logical_and);\nUSE_NO_KERNEL_OP(read_from_array);\n") diff --git a/paddle/fluid/operators/controlflow/conditional_block_op.cc b/paddle/fluid/operators/controlflow/conditional_block_op.cc index 8358ef755b90e914e839ae72c50024fc132cd3de..260b5672b4f06ab37b9ac0d7fe40e5fb69beb96f 100644 --- a/paddle/fluid/operators/controlflow/conditional_block_op.cc +++ b/paddle/fluid/operators/controlflow/conditional_block_op.cc @@ -17,6 +17,12 @@ limitations under the License. */ namespace paddle { namespace operators { +const char ConditionalOp::kInputs[] = "Input"; +const char ConditionalOp::kOutputs[] = "Out"; +const char ConditionalOp::kCondition[] = "Cond"; +const char ConditionalOp::kScope[] = "Scope"; +const char ConditionalOp::kSkipEagerDeletionVars[] = "skip_eager_deletion_vars"; + class ConditionalBlockOp : public ConditionalOp { public: ConditionalBlockOp(const std::string &type, @@ -33,20 +39,20 @@ class ConditionalBlockOp : public ConditionalOp { // When is_scalar_condition is True, the conditional variable is a scalar, // whether need to execute the operators in sub-block depends on the // conditional variable (Cond). - auto xs = InputTensors(scope, "Cond"); + auto xs = InputTensors(scope, ConditionalOp::kCondition); need_run = ScalarCondition(xs); } else { // When is_scalar_condition is False, the conditional variable maybe a // vector or tensor, whether need to execute the operators in sub-block // depends on the input variables (Input). - auto xs = InputTensors(scope, "Input"); + auto xs = InputTensors(scope, ConditionalOp::kInputs); need_run = std::all_of( xs.begin(), xs.end(), [](const framework::LoDTensor *t) { return t->numel() != 0; }); } if (need_run) { - auto *scope_var = scope.FindVar(Output("Scope")); + auto *scope_var = scope.FindVar(Output(ConditionalOp::kScope)); PADDLE_ENFORCE(scope_var != nullptr, "Must set scope"); auto *scopes = scope_var->GetMutable>(); scopes->resize(1); @@ -55,7 +61,10 @@ class ConditionalBlockOp : public ConditionalOp { framework::Executor exec(dev_place); auto *block = Attr("sub_block"); - exec.Run(*block->Program(), &cur_scope, block->ID(), false); + auto &skip_vars = + Attr>(ConditionalOp::kSkipEagerDeletionVars); + exec.Run(*block->Program(), &cur_scope, block->ID(), false, true, + skip_vars); } } }; @@ -73,17 +82,17 @@ class ConditionalBlockGradOp : public ConditionalOp { const platform::Place &dev_place) const override { bool need_run; if (Attr("is_scalar_condition")) { - auto xs = this->InputTensors(scope, "Cond"); + auto xs = this->InputTensors(scope, ConditionalOp::kCondition); need_run = ScalarCondition(xs); } else { - auto xs = this->InputTensors(scope, "Input"); + auto xs = this->InputTensors(scope, ConditionalOp::kInputs); need_run = std::all_of( xs.begin(), xs.end(), [](const framework::LoDTensor *t) { return t->numel() != 0; }); } if (need_run) { - auto *scope_var = scope.FindVar(Input("Scope")); + auto *scope_var = scope.FindVar(Input(ConditionalOp::kScope)); PADDLE_ENFORCE(scope_var != nullptr, "Must set scope"); auto &scopes = scope_var->Get>(); framework::Scope &cur_scope = *scopes[0]; @@ -91,10 +100,12 @@ class ConditionalBlockGradOp : public ConditionalOp { framework::Executor exec(dev_place); auto *block = Attr("sub_block"); - const auto &ins = Inputs("Input"); - const auto &d_ins = Outputs(framework::GradVarName("Input")); - const auto &conds = Inputs("Cond"); - const auto &d_conds = Outputs(framework::GradVarName("Cond")); + const auto &ins = Inputs(ConditionalOp::kInputs); + const auto &d_ins = + Outputs(framework::GradVarName(ConditionalOp::kInputs)); + const auto &conds = Inputs(ConditionalOp::kCondition); + const auto &d_conds = + Outputs(framework::GradVarName(ConditionalOp::kCondition)); std::vector ins_conds_grads; ins_conds_grads.reserve(ins.size() + conds.size()); @@ -142,15 +153,17 @@ class ConditionalBlockGradOp : public ConditionalOp { class ConditionalBlockGradInferShape : public framework::InferShapeBase { public: void operator()(framework::InferShapeContext *context) const override { - PADDLE_ENFORCE(context->HasInputs("Cond")); - if (context->HasInputs("Input")) { - PADDLE_ENFORCE(context->HasOutputs(framework::GradVarName("Input"))); - context->SetOutputsDim(framework::GradVarName("Input"), - context->GetInputsDim("Input")); + PADDLE_ENFORCE(context->HasInputs(ConditionalOp::kCondition)); + if (context->HasInputs(ConditionalOp::kInputs)) { + PADDLE_ENFORCE( + context->HasOutputs(framework::GradVarName(ConditionalOp::kInputs))); + context->SetOutputsDim(framework::GradVarName(ConditionalOp::kInputs), + context->GetInputsDim(ConditionalOp::kInputs)); } - if (context->HasOutputs(framework::GradVarName("Cond"))) { - context->SetOutputsDim(framework::GradVarName("Cond"), - context->GetInputsDim("Cond")); + if (context->HasOutputs( + framework::GradVarName(ConditionalOp::kCondition))) { + context->SetOutputsDim(framework::GradVarName(ConditionalOp::kCondition), + context->GetInputsDim(ConditionalOp::kCondition)); } } }; @@ -163,15 +176,17 @@ class ConditionalBlockGradMaker : public framework::SingleGradOpDescMaker { std::unique_ptr Apply() const override { auto grad_op = new framework::OpDesc(); grad_op->SetType("conditional_block_grad"); - grad_op->SetInput("Cond", Input("Cond")); - grad_op->SetInput("Input", Input("Input")); - grad_op->SetInput("Out", Output("Out")); - grad_op->SetInput(framework::GradVarName("Out"), OutputGrad("Out")); - grad_op->SetInput("Scope", Output("Scope")); - grad_op->SetOutput(framework::GradVarName("Cond"), - InputGrad("Cond", false)); - grad_op->SetOutput(framework::GradVarName("Input"), - InputGrad("Input", false)); + grad_op->SetInput(ConditionalOp::kCondition, + Input(ConditionalOp::kCondition)); + grad_op->SetInput(ConditionalOp::kInputs, Input(ConditionalOp::kInputs)); + grad_op->SetInput(ConditionalOp::kOutputs, Output(ConditionalOp::kOutputs)); + grad_op->SetInput(framework::GradVarName(ConditionalOp::kOutputs), + OutputGrad(ConditionalOp::kOutputs)); + grad_op->SetInput(ConditionalOp::kScope, Output(ConditionalOp::kScope)); + grad_op->SetOutput(framework::GradVarName(ConditionalOp::kCondition), + InputGrad(ConditionalOp::kCondition, false)); + grad_op->SetOutput(framework::GradVarName(ConditionalOp::kInputs), + InputGrad(ConditionalOp::kInputs, false)); grad_op->SetBlockAttr("sub_block", this->grad_block_[0]); grad_op->SetAttr("is_scalar_condition", GetAttr("is_scalar_condition")); return std::unique_ptr(grad_op); diff --git a/paddle/fluid/operators/controlflow/conditional_block_op.h b/paddle/fluid/operators/controlflow/conditional_block_op.h index 9a079c8453eafc8e3cd6f382fa8122d382d1c595..9d65c33c51c1226b2518225c3e8efdc5b349238b 100644 --- a/paddle/fluid/operators/controlflow/conditional_block_op.h +++ b/paddle/fluid/operators/controlflow/conditional_block_op.h @@ -33,6 +33,12 @@ class ConditionalOp : public framework::OperatorBase { const framework::AttributeMap &attrs) : OperatorBase(type, inputs, outputs, attrs) {} + static const char kInputs[]; + static const char kOutputs[]; + static const char kCondition[]; + static const char kScope[]; + static const char kSkipEagerDeletionVars[]; + protected: std::vector InputTensors( const framework::Scope &scope, const std::string &in_name) const { @@ -78,13 +84,15 @@ class ConditionalOp : public framework::OperatorBase { class ConditionalBlockOpProtoMaker : public framework::OpProtoAndCheckerMaker { public: void Make() override { - AddInput("Cond", + AddInput(ConditionalOp::kCondition, "The conditional variable of this operator. If Cond is empty, the " "whole sub-block will not be executed.") .AsDuplicable(); - AddInput("Input", "The input variables of the sub-block.").AsDuplicable(); - AddOutput("Out", "The output variables of the sub-block.").AsDuplicable(); - AddOutput("Scope", + AddInput(ConditionalOp::kInputs, "The input variables of the sub-block.") + .AsDuplicable(); + AddOutput(ConditionalOp::kOutputs, "The output variables of the sub-block.") + .AsDuplicable(); + AddOutput(ConditionalOp::kScope, "(std::vector) The step scope of conditional block. To " "unify the conditional block, rnn and while op, the type of " "scope is std::vector"); @@ -94,6 +102,10 @@ class ConditionalBlockOpProtoMaker : public framework::OpProtoAndCheckerMaker { "The conditional variable (Cond) is used as scalar " "condition.") .SetDefault(false); + AddAttr>(ConditionalOp::kSkipEagerDeletionVars, + "Vars that would not be deleted when " + "garbage collection strategy enables") + .SetDefault(std::vector()); AddComment(R"DOC(Conditional block operator If `is_scalar_condition` is True, the conditional variable (Cond) is a scalar, diff --git a/paddle/fluid/operators/controlflow/conditional_block_op_helper.cc b/paddle/fluid/operators/controlflow/conditional_block_op_helper.cc new file mode 100644 index 0000000000000000000000000000000000000000..357a9d93b69a4758359e9a68cdec7c286482cc1b --- /dev/null +++ b/paddle/fluid/operators/controlflow/conditional_block_op_helper.cc @@ -0,0 +1,169 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h" +#include +#include +#include +#include +#include "paddle/fluid/operators/controlflow/op_variant.h" + +namespace paddle { +namespace operators { + +static bool IsMatchedConditionalBlockOpAndConditionalBlockGradOp( + const OpVariant &fwd_op, const OpVariant &bwd_op) { + return fwd_op.Outputs().at(ConditionalOp::kScope) == + bwd_op.Inputs().at(ConditionalOp::kScope); +} + +static void FindAllConditionalBlockAndConditionalBlockGradOp( + std::vector *fwd_ops, std::vector *bwd_ops) { + PADDLE_ENFORCE_GE(fwd_ops->size(), bwd_ops->size()); + + if (fwd_ops->empty()) return; + + const auto *program = + fwd_ops->front().Attr("sub_block")->Program(); + + for (size_t i = 1; i < program->Size(); ++i) { + auto &block = program->Block(i); + for (size_t j = 0; j < block.OpSize(); ++j) { + auto *op = block.Op(j); + if (op->Type() == "conditional_block") { + fwd_ops->emplace_back(op); + } else if (op->Type() == "conditional_block_grad") { + bwd_ops->emplace_back(op); + } + } + } + + PADDLE_ENFORCE_GE( + fwd_ops->size(), bwd_ops->size(), + "There are extra conditional_block_grad ops in the graph or program"); +} + +static void SetSkipVarsForConditionalBlockOp(OpVariant *fwd_op, + OpVariant *bwd_op) { + auto *grad_block = bwd_op->Attr("sub_block"); + auto is_skippable_in_fwd = [grad_block](const std::string &var_name) { + return var_name != framework::kEmptyVarName && + !grad_block->HasVar(var_name); + }; + + std::unordered_set forward_skip_vars; + for (auto *op_desc : grad_block->AllOps()) { + for (auto &in_arg_name : op_desc->InputArgumentNames()) { + if (is_skippable_in_fwd(in_arg_name)) { + forward_skip_vars.insert(in_arg_name); + } + } + + for (auto &out_arg_name : op_desc->OutputArgumentNames()) { + if (is_skippable_in_fwd(out_arg_name)) { + forward_skip_vars.insert(out_arg_name); + } + } + } + + auto &fwd_attrs = const_cast(fwd_op->Attrs()); + std::vector skip_vars_vec(forward_skip_vars.begin(), + forward_skip_vars.end()); + VLOG(2) << "Prepare to skip " << skip_vars_vec.size() + << " var(s): " << string::join_strings(skip_vars_vec, ' '); + fwd_attrs[ConditionalOp::kSkipEagerDeletionVars] = std::move(skip_vars_vec); +} + +static void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOpImpl( + std::vector *ifelse_ops, + std::vector *ifelse_grad_ops) { + FindAllConditionalBlockAndConditionalBlockGradOp(ifelse_ops, ifelse_grad_ops); + + VLOG(2) << "Found conditional_block op num: " << ifelse_ops->size() + << ", conditional_block_grad op num: " << ifelse_grad_ops->size(); + + if (ifelse_grad_ops->empty()) { + return; + } + + std::unordered_set ifelse_op_set( + ifelse_ops->begin(), ifelse_ops->end()); + + for (auto &bwd_op : *ifelse_grad_ops) { + const OpVariant *matched_fwd_op = nullptr; + for (auto &fwd_op : ifelse_op_set) { + if (IsMatchedConditionalBlockOpAndConditionalBlockGradOp(fwd_op, + bwd_op)) { + PADDLE_ENFORCE(matched_fwd_op == nullptr, + "Found multiple matched conditional_block ops"); + matched_fwd_op = &fwd_op; + } + } + + PADDLE_ENFORCE_NOT_NULL(matched_fwd_op, + "Cannot find matched forward conditional_block op"); + + SetSkipVarsForConditionalBlockOp(const_cast(matched_fwd_op), + &bwd_op); + ifelse_op_set.erase(*matched_fwd_op); + } +} + +void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( + int block_id, + const std::vector> &all_ops) { + // If block_id is not 0, returns + // This is because all conditional_block_ops and conditional_block_grad_ops + // in the whole program would be processed when block_id is 0 (i.e. + // when Executor::Run() or ParallelExecutor constructs). + + // What's more, all conditional_block_ops and conditional_block_grad_ops + // must be processed when block_id is zero. If not, conditional_block_op + // may run first and erase variables used in conditional_block_grad_op, + // and in this moment, conditional_block_grad_ops may be not constructed yet. + if (block_id != 0) return; + + std::vector fwd_ops, bwd_ops; + for (auto &op : all_ops) { + if (op->Type() == "conditional_block") { + fwd_ops.emplace_back(op.get()); + } else if (op->Type() == "conditional_block_grad") { + bwd_ops.emplace_back(op.get()); + } + } + + PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOpImpl(&fwd_ops, + &bwd_ops); +} + +void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( + const std::vector &ifelse_ops, + const std::vector &ifelse_grad_ops) { + std::vector fwd_ops, bwd_ops; + fwd_ops.reserve(ifelse_ops.size()); + for (auto *op : ifelse_ops) { + fwd_ops.emplace_back(op); + } + + bwd_ops.reserve(ifelse_grad_ops.size()); + for (auto *op : ifelse_grad_ops) { + bwd_ops.emplace_back(op); + } + + PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOpImpl(&fwd_ops, + &bwd_ops); +} + +} // namespace operators +} // namespace paddle diff --git a/paddle/fluid/operators/controlflow/conditional_block_op_helper.h b/paddle/fluid/operators/controlflow/conditional_block_op_helper.h new file mode 100644 index 0000000000000000000000000000000000000000..572b6ac4e466fd070f3955b0c2379bd1c67d0825 --- /dev/null +++ b/paddle/fluid/operators/controlflow/conditional_block_op_helper.h @@ -0,0 +1,34 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/operators/controlflow/conditional_block_op.h" + +namespace paddle { +namespace operators { + +void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( + int block_id, + const std::vector> &all_ops); + +void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( + const std::vector &ifelse_ops, + const std::vector &ifelse_grad_ops); + +} // namespace operators +} // namespace paddle diff --git a/paddle/fluid/operators/reader/create_custom_reader_op.cc b/paddle/fluid/operators/reader/create_custom_reader_op.cc index fdc7b0f6a0e8de232865adb70677af80eb08a174..975f7b991f80ee292aa7eb02109ab5e518331726 100644 --- a/paddle/fluid/operators/reader/create_custom_reader_op.cc +++ b/paddle/fluid/operators/reader/create_custom_reader_op.cc @@ -167,7 +167,7 @@ void CustomReader::ReadNextImpl(std::vector* out) { tensor->set_lod(underlying_outs[i].lod()); } // 2. Run the sub-block. - exe_.Run(program_, exe_scope, sub_block_id_, false, true); + exe_.Run(program_, exe_scope, sub_block_id_, false, true, {}, true); // 3. Copy LoDTensors from sink variables to out. out->resize(sink_var_names_.size()); for (size_t i = 0; i < sink_var_names_.size(); ++i) { diff --git a/paddle/fluid/train/test_train_recognize_digits.cc b/paddle/fluid/train/test_train_recognize_digits.cc index a7846da8c191ac96e9ad7fb5b3184518e32120b2..bd2a439f6ab5273b29010cf3599460ea8bdd68d4 100644 --- a/paddle/fluid/train/test_train_recognize_digits.cc +++ b/paddle/fluid/train/test_train_recognize_digits.cc @@ -74,7 +74,8 @@ void Train() { float first_loss = 0.0; float last_loss = 0.0; for (int i = 0; i < 100; ++i) { - executor.Run(*train_program, &scope, 0, false, true); + executor.Run(*train_program, &scope, 0, false, true, + {loss_name, "img", "label"}); if (i == 0) { first_loss = loss_var->Get().data()[0]; } else if (i == 99) { diff --git a/python/paddle/fluid/tests/unittests/test_dist_train.py b/python/paddle/fluid/tests/unittests/test_dist_train.py index d0875d9ea442d0e88dfd958e5948b26225416df2..6c49ee757e6414282cbd656f8185c855a4eb46b1 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_train.py +++ b/python/paddle/fluid/tests/unittests/test_dist_train.py @@ -112,6 +112,7 @@ class TestSendOp(unittest.TestCase): dtype='float32', name='X', append_batch_size=False) + x.persistable = True fluid.initializer.Constant(value=2.3)(x, main.global_block()) get_var = main.global_block().create_var( @@ -121,6 +122,13 @@ class TestSendOp(unittest.TestCase): shape=[32, 32]) fluid.initializer.Constant(value=2.3)(get_var, main.global_block()) + # NOTE(zjl): `Send` is async send, which means that the sent + # variable would be needed even though `Send` op runs. + # Is it a right design? If I do not set `x.persistable = True`, + # this unittest would hang in rpc client after x is deleted. + # + # BTW, `Send` is not a public API to users. So I set + # `x.persistable = True` to be a hot fix of this unittest. Send("127.0.0.1:%d" % port, [x]) o = Recv("127.0.0.1:%d" % port, [get_var])