diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc
index c792aa393b5196d3f802b077c4e92f1a606eaba6..070230af4d7867dc7b0b8a0c63e41219050b4b09 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore.cc
@@ -638,9 +638,20 @@ void InterpreterCore::Convert(
         if (var_desc && ins.count(item.first) &&
             !info.IsInArgBufferNeeded(var_desc->Name())) {
           continue;
-        } else if (!block_.HasVar(var_scope_.GetNameById(id))) {
-          VLOG(10) << "[gc_check_inputs] skip gc: "
-                   << var_scope_.GetNameById(id);
+        }
+        // skip when this var is not in block and not a data_transferred var,
+        // which means this var is managed by other block
+        const auto& var_name = var_scope_.GetNameById(id);
+        bool not_owned = !block_.HasVar(var_name);
+        const auto& transferred_vars = var_scope_.DataTransferAddedVars();
+        bool not_transferred =
+            std::all_of(transferred_vars.begin(),
+                        transferred_vars.end(),
+                        [&](const std::pair<std::string, int>& elem) {
+                          return elem.first != var_name;
+                        });
+        if (not_owned && not_transferred) {
+          VLOG(10) << "[gc_check_inputs] skip gc: " << var_name;
           continue;
         }
         gc_check_vars.insert(id);
@@ -759,7 +770,7 @@ void InterpreterCore::RunOperator(const Instruction& instr_node) {
   auto place = instr_node.DeviceContext().GetPlace();
   Scope* local_scope = HasLocalScope() ? var_scope_.GetMutableLocalScope()
                                        : var_scope_.GetMutableScope();
-  VLOG(4) << "Start run " << place << " " << op->DebugStringEx(local_scope_);
+  VLOG(4) << "Start run " << place << " " << op->DebugStringEx(local_scope);
 
   SetDeviceId(place);
 
@@ -873,7 +884,7 @@ void InterpreterCore::RunOperator(const Instruction& instr_node) {
       VLOG(4) << "Check nan/inf";
       framework::details::CheckOpHasNanOrInf(
           *op,
-          *local_scope_,
+          *local_scope,
           place);  // TODO(xiongkun03) change it to inner scope.
     }
   }
diff --git a/paddle/fluid/operators/controlflow/CMakeLists.txt b/paddle/fluid/operators/controlflow/CMakeLists.txt
index 07d72297b2b704ed0ba4c278f5d1a63dc70989a7..7f953f031b0164ccd6df8d43aaad2c398f9434ed 100644
--- a/paddle/fluid/operators/controlflow/CMakeLists.txt
+++ b/paddle/fluid/operators/controlflow/CMakeLists.txt
@@ -3,7 +3,8 @@ if(WITH_UNITY_BUILD)
   # Load Unity Build rules for operators in paddle/fluid/operators/controlflow.
   include(unity_build_rule.cmake)
 endif()
-register_operators(EXCLUDES conditional_block_op DEPS naive_executor)
+register_operators(EXCLUDES conditional_block_op DEPS naive_executor
+                   standalone_executor)
 
 cc_library(
   conditional_block_op
diff --git a/paddle/fluid/operators/controlflow/conditional_block_op.cc b/paddle/fluid/operators/controlflow/conditional_block_op.cc
index 6425c3519e94c42cff88d13d82e26d38e0a1fca0..1efc5085165776a276c145d23e8f4c22841cb5e0 100644
--- a/paddle/fluid/operators/controlflow/conditional_block_op.cc
+++ b/paddle/fluid/operators/controlflow/conditional_block_op.cc
@@ -16,6 +16,7 @@ limitations under the License. */
 
 #include "paddle/fluid/framework/new_executor/standalone_executor.h"
 #include "paddle/fluid/operators/assign_op.h"
+#include "paddle/fluid/operators/controlflow/control_flow_op_helper.h"
 #include "paddle/fluid/platform/flags.h"
 #include "paddle/phi/kernels/funcs/math_function.h"
@@ -39,43 +40,6 @@ using ExecutorPrepareContext = framework::ExecutorPrepareContext;
 
 using InterpreterCore = framework::InterpreterCore;
 
-namespace details {
-static void BuildScopeForConditionalBlockOp(
-    const paddle::framework::InterpreterCore &interpreter_core,
-    const paddle::framework::BlockDesc &block,
-    paddle::framework::Scope *scope) {
-  for (auto &var_desc : block.AllVars()) {
-    auto var_name = var_desc->Name();
-    if (var_name == framework::kEmptyVarName) {
-      continue;
-    }
-    VLOG(5) << "[BuildScopeForConditionalBlockOp]"
-            << "start:" << var_name;
-    if (var_desc->Persistable()) {
-      VLOG(5) << "[BuildScopeForConditionalBlockOp]"
-              << "Don't process persistent: " << var_name;
-    } else {
-      auto *ptr = scope->Var(var_name);
-      InitializeVariable(ptr, var_desc->GetType());
-      VLOG(5) << "[BuildScopeForConditionalBlockOp]"
-              << "Not Found locally and created: " << var_name;
-    }
-  }
-
-  auto &data_transfer_added_vars =
-      interpreter_core.GetVariableScope()->DataTransferAddedVars();
-  for (size_t i = 0; i < data_transfer_added_vars.size(); i++) {
-    auto *ptr = scope->Var(data_transfer_added_vars[i].first);
-    InitializeVariable(ptr,
-                       static_cast<paddle::framework::proto::VarType::Type>(
-                           data_transfer_added_vars[i].second));
-    VLOG(10) << "[BuildScopeForConditionalBlockOp]"
-             << "Initialize Transfer Added Variable "
-             << data_transfer_added_vars[i].first;
-  }
-}
-}  // namespace details
-
 class ConditionalBlockOp : public ConditionalOp {
  public:
   ConditionalBlockOp(const std::string &type,
@@ -141,51 +105,53 @@ class ConditionalBlockOp : public ConditionalOp {
         Attr<std::vector<std::string>>(ConditionalOp::kSkipEagerDeletionVars);
 
     if (FLAGS_control_flow_use_new_executor) {
-      std::set<std::string> skip_gc_vars(skip_vars.begin(), skip_vars.end());
-
-      if (!core || !platform::is_same_place(core->GetPlace(), dev_place)) {
-        VLOG(10) << "[interpreterCore cache]" << core.get();
-        VLOG_IF(10, core)
-            << platform::is_same_place(core->GetPlace(), dev_place);
-        core.reset(new InterpreterCore(dev_place,
-                                       *block,
-                                       skip_gc_vars,
-                                       &cur_scope,
-                                       /* used_for_jit */ false,
-                                       /* used_for_control_flow_op */ true));
+      LOG_FIRST_N(INFO, 1)
+          << "[ControlFlow][ConditionalBlock] New Executor is Running.";
+      if (!core_ || !platform::is_same_place(core_->GetPlace(), dev_place)) {
+        std::set<std::string> skip_gc_vars(skip_vars.begin(),
+                                           skip_vars.end());
+        VLOG(10) << "[interpreterCore cache]" << core_.get();
+        VLOG_IF(10, core_)
+            << platform::is_same_place(core_->GetPlace(), dev_place);
+        core_.reset(new InterpreterCore(dev_place,
+                                        *block,
+                                        skip_gc_vars,
+                                        &cur_scope,
+                                        /* used_for_jit */ false,
+                                        /* used_for_control_flow_op */ true));
         VLOG(10) << "[interpreterCore cache]"
-                 << "new created:" << core;
+                 << "new created:" << core_;
       } else {
-        details::BuildScopeForConditionalBlockOp(*core, *block, &cur_scope);
-        core->reset_scope(&cur_scope);
+        BuildScopeForControlFlowOp(*core_, *block, &cur_scope);
+        core_->reset_scope(&cur_scope);
      }
 
-      core->Run({}, false);
+      core_->Run({}, false);
    } else {
-      if (!exec || !platform::is_same_place(exec->GetPlace(), dev_place)) {
+      if (!exec_ || !platform::is_same_place(exec_->GetPlace(), dev_place)) {
         auto &pdesc = *block->Program();
-        exec.reset(new Executor(dev_place));
-        if (FLAGS_use_mkldnn) exec->EnableMKLDNN(pdesc);
-        ctx = exec->Prepare(pdesc, block->ID(), skip_vars, false);
+        exec_.reset(new Executor(dev_place));
+        if (FLAGS_use_mkldnn) exec_->EnableMKLDNN(pdesc);
+        ctx_ = exec_->Prepare(pdesc, block->ID(), skip_vars, false);
 #ifdef PADDLE_WITH_MKLDNN
-        platform::AttachPointerHashToMKLDNNKey(exec.get(), dev_place);
-        platform::RegisterModelLayout(ctx->ops_, dev_place);
+        platform::AttachPointerHashToMKLDNNKey(exec_.get(), dev_place);
+        platform::RegisterModelLayout(ctx_->ops_, dev_place);
 #endif
       }
-      exec->RunPreparedContext(ctx.get(),
-                               &cur_scope,
-                               /* create_local_scope */ false,
-                               /* create_vars */ true,
-                               /* keep_kids */ true);
+      exec_->RunPreparedContext(ctx_.get(),
+                                &cur_scope,
+                                /* create_local_scope */ false,
+                                /* create_vars */ true,
+                                /* keep_kids */ true);
     }
   }
 
  private:
-  mutable std::shared_ptr<Executor> exec{nullptr};
-  mutable std::unique_ptr<ExecutorPrepareContext> ctx{nullptr};
-  mutable std::shared_ptr<InterpreterCore> core{nullptr};
+  mutable std::shared_ptr<Executor> exec_{nullptr};
+  mutable std::unique_ptr<ExecutorPrepareContext> ctx_{nullptr};
+  mutable std::shared_ptr<InterpreterCore> core_{nullptr};
 };
 
 class ConditionalBlockInferShape : public framework::InferShapeBase {
@@ -251,43 +217,44 @@ class ConditionalBlockGradOp : public ConditionalOp {
             << ", scope = " << &cur_scope;
 
     if (FLAGS_control_flow_use_new_executor) {
-      std::set<std::string> skip_gc_vars(inside_grads.begin(),
-                                         inside_grads.end());
-
-      if (!core || !platform::is_same_place(core->GetPlace(), dev_place)) {
-        VLOG(10) << "[interpreterCore cache]" << core.get();
-        VLOG_IF(10, core)
-            << platform::is_same_place(core->GetPlace(), dev_place);
-        core.reset(new InterpreterCore(dev_place,
-                                       *block,
-                                       skip_gc_vars,
-                                       &cur_scope,
-                                       /* used_for_jit */ false,
-                                       /* used_for_control_flow_op */ true));
+      LOG_FIRST_N(INFO, 1)
+          << "[ControlFlow][ConditionalGradBlock] New Executor is Running.";
+      if (!core_ || !platform::is_same_place(core_->GetPlace(), dev_place)) {
+        VLOG(10) << "[interpreterCore cache]" << core_.get();
+        VLOG_IF(10, core_)
+            << platform::is_same_place(core_->GetPlace(), dev_place);
+        std::set<std::string> skip_gc_vars(inside_grads.begin(),
+                                           inside_grads.end());
+        core_.reset(new InterpreterCore(dev_place,
+                                        *block,
+                                        skip_gc_vars,
+                                        &cur_scope,
+                                        /* used_for_jit */ false,
+                                        /* used_for_control_flow_op */ true));
         VLOG(10) << "[interpreterCore cache]"
-                 << "new created:" << core;
+                 << "new created:" << core_;
       } else {
-        details::BuildScopeForConditionalBlockOp(*core, *block, &cur_scope);
-        core->reset_scope(&cur_scope);
+        BuildScopeForControlFlowOp(*core_, *block, &cur_scope);
+        core_->reset_scope(&cur_scope);
       }
 
-      core->Run({}, false);
+      core_->Run({}, false);
     } else {
-      if (!exec || !platform::is_same_place(exec->GetPlace(), dev_place)) {
+      if (!exec_ || !platform::is_same_place(exec_->GetPlace(), dev_place)) {
         auto &pdesc = *block->Program();
-        exec.reset(new Executor(dev_place));
-        if (FLAGS_use_mkldnn) exec->EnableMKLDNN(pdesc);
-        ctx = exec->Prepare(pdesc, block->ID(), inside_grads, false);
+        exec_.reset(new Executor(dev_place));
+        if (FLAGS_use_mkldnn) exec_->EnableMKLDNN(pdesc);
+        ctx_ = exec_->Prepare(pdesc, block->ID(), inside_grads, false);
 #ifdef PADDLE_WITH_MKLDNN
-        platform::AttachPointerHashToMKLDNNKey(exec.get(), dev_place);
-        platform::RegisterModelLayout(ctx->ops_, dev_place);
+        platform::AttachPointerHashToMKLDNNKey(exec_.get(), dev_place);
+        platform::RegisterModelLayout(ctx_->ops_, dev_place);
 #endif
       }
-      exec->RunPreparedContext(ctx.get(),
-                               &cur_scope,
-                               /* create_local_scope */ false,
-                               /* create_vars */ true,
-                               /* keep_kids */ true);
+      exec_->RunPreparedContext(ctx_.get(),
+                                &cur_scope,
+                                /* create_local_scope */ false,
+                                /* create_vars */ true,
+                                /* keep_kids */ true);
     }
 
     AssignLocalGradientToParentScope(
@@ -299,9 +266,9 @@ class ConditionalBlockGradOp : public ConditionalOp {
   }
 
  private:
-  mutable std::shared_ptr<Executor> exec{nullptr};
-  mutable std::unique_ptr<ExecutorPrepareContext> ctx{nullptr};
-  mutable std::shared_ptr<InterpreterCore> core{nullptr};
+  mutable std::shared_ptr<Executor> exec_{nullptr};
+  mutable std::unique_ptr<ExecutorPrepareContext> ctx_{nullptr};
+  mutable std::shared_ptr<InterpreterCore> core_{nullptr};
 
  private:
   void AssignLocalGradientToParentScope(
diff --git a/paddle/fluid/operators/controlflow/control_flow_op_helper.h b/paddle/fluid/operators/controlflow/control_flow_op_helper.h
new file mode 100644
index 0000000000000000000000000000000000000000..82b57831f935618305fdfe5a2e0b52ae7725c7bf
--- /dev/null
+++ b/paddle/fluid/operators/controlflow/control_flow_op_helper.h
@@ -0,0 +1,58 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include "paddle/fluid/framework/new_executor/standalone_executor.h"
+
+namespace paddle {
+namespace operators {
+
+static void BuildScopeForControlFlowOp(
+    const framework::InterpreterCore &interpreter_core,
+    const framework::BlockDesc &block,
+    framework::Scope *scope) {
+  for (auto &var_desc : block.AllVars()) {
+    auto var_name = var_desc->Name();
+    if (var_name == framework::kEmptyVarName) {
+      continue;
+    }
+    VLOG(5) << "[BuildScopeForControlFlowOp]"
+            << "start:" << var_name;
+    if (var_desc->Persistable()) {
+      VLOG(5) << "[BuildScopeForControlFlowOp]"
+              << "Don't process persistent: " << var_name;
+    } else {
+      auto *ptr = scope->Var(var_name);
+      InitializeVariable(ptr, var_desc->GetType());
+      VLOG(5) << "[BuildScopeForControlFlowOp]"
+              << "Not Found locally and created: " << var_name;
+    }
+  }
+
+  auto &data_transfer_added_vars =
+      interpreter_core.GetVariableScope()->DataTransferAddedVars();
+  for (size_t i = 0; i < data_transfer_added_vars.size(); i++) {
+    auto *ptr = scope->Var(data_transfer_added_vars[i].first);
+    InitializeVariable(ptr,
+                       static_cast<framework::proto::VarType::Type>(
+                           data_transfer_added_vars[i].second));
+    VLOG(5) << "[BuildScopeForControlFlowOp]"
+            << "Initialize Transfer Added Variable "
+            << data_transfer_added_vars[i].first;
+  }
+}
+
+}  // namespace operators
+}  // namespace paddle
diff --git a/paddle/fluid/operators/controlflow/while_op.cc b/paddle/fluid/operators/controlflow/while_op.cc
index cc6bb72324e57ca94bb5bd9a4eecfcfacf683c84..5fe51425dc44e761bce9a11433b5842ec5a25aa6 100644
--- a/paddle/fluid/operators/controlflow/while_op.cc
+++ b/paddle/fluid/operators/controlflow/while_op.cc
@@ -13,8 +13,10 @@
 // limitations under the License.
 
#include "paddle/fluid/framework/executor.h" +#include "paddle/fluid/framework/new_executor/standalone_executor.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/operators/controlflow/control_flow_op_helper.h" #include "paddle/fluid/operators/controlflow/while_op_helper.h" #ifdef PADDLE_WITH_MKLDNN @@ -44,6 +46,41 @@ static std::string GetSkipEagerDeletionVarsDebugString( } return str; } + +static void TransferVariablePlace(const framework::Scope *scope, + const std::string &var_name, + const phi::Place &dst_place, + const platform::DeviceContext &dev_ctx) { + framework::Variable *var = scope->FindVar(var_name); + if (var == nullptr) { + VLOG(4) << "[TransferVariablePlace]" + << "lost in_var: " << var_name; + return; + } + if (var->Type() != framework::proto::VarType::LOD_TENSOR) { + VLOG(10) << "[TransferVariablePlace]" << var_name << " type changed:" + << framework::TransToPhiDataType( + framework::ToVarType(var->Type())); + return; + } + phi::DenseTensor *t = var->GetMutable(); + if (t->place() == dst_place) { + VLOG(10) << "[TransferVariablePlace]" + << "no need transfer: " << var_name; + return; + } + + phi::DenseTensor *new_t = new phi::DenseTensor; + framework::TensorCopy(*t, dst_place, new_t); + dev_ctx.Wait(); + + t->set_meta(new_t->meta()); + t->ResetHolder(new_t->Holder()); + + VLOG(4) << "[TransferVariablePlace]" << var_name + << " place: " << new_t->place(); +} + } // namespace class WhileOp : public framework::OperatorBase { @@ -77,9 +114,12 @@ class WhileOp : public framework::OperatorBase { // Executors (executors declared inside control ops) platform::DontClearMKLDNNCache(dev_place); #endif - framework::Executor executor(dev_place); auto *block = Attr(kStepBlock); + // get device context from pool + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto &dev_ctx = *pool.Get(dev_place); + auto *program = block->Program(); bool is_test = Attr("is_test"); @@ -134,7 +174,53 @@ class WhileOp : public framework::OperatorBase { auto &skip_vars = Attr>(kSkipEagerDeletionVars); VLOG(2) << GetSkipEagerDeletionVarsDebugString(skip_vars); - auto ctx = executor.Prepare(*program, block->ID(), skip_vars); + // note(lvyongkang): The assign op in while loop may change the place of + // variable. However, InterpreterCore fix the kernel of every ops during its + // first run. A cpu tensor may become gpu tensor after first run. This will + // lead to segmetation fault when it's used in a cpu kernel. Here we record + // the place of every inputs and restore their place after + // InterpreterCore.run(). 
+    std::map<std::string, phi::Place> input_var_original_places;
+    for (const auto &in_name : Inputs(kX)) {
+      framework::Variable *var = scope.FindVar(in_name);
+      if (var == nullptr) {
+        VLOG(4) << "[while op]"
+                << "input not found:" << in_name;
+      }
+
+      if (var->Type() == framework::proto::VarType::LOD_TENSOR) {
+        input_var_original_places[in_name] =
+            (var->Get<phi::DenseTensor>()).place();
+      } else {
+        VLOG(10) << "[while op]"
+                 << "skip backup input " << in_name << " type:"
+                 << framework::TransToPhiDataType(
+                        framework::ToVarType(var->Type()));
+      }
+    }
+
+    if (FLAGS_control_flow_use_new_executor) {
+      LOG_FIRST_N(INFO, 1) << "[ControlFlow][WhileOp] New Executor is Running.";
+      if (!core_ || !platform::is_same_place(core_->GetPlace(), dev_place)) {
+        std::set<std::string> skip_gc_vars(skip_vars.begin(), skip_vars.end());
+        framework::Scope placeholder;  // Don't care if it's valid, just for
+                                       // initialize InterpreterCore
+        core_.reset(new framework::InterpreterCore(
+            dev_place,
+            *block,
+            skip_gc_vars,
+            &placeholder,
+            /* used_for_jit */ false,
+            /* used_for_control_flow_op */ true));
+      }
+    } else {
+      if (!executor_ ||
+          !platform::is_same_place(executor_->GetPlace(), dev_place)) {
+        executor_.reset(new framework::Executor(dev_place));
+        ctx_ = executor_->Prepare(*program, block->ID(), skip_vars);
+      }
+    }
+
     if (!is_test) {
       while (cond_data) {
         auto &current_scope = scope.NewScope();
@@ -158,8 +244,23 @@
             }
           }
         }
-        executor.RunPreparedContext(
-            ctx.get(), &current_scope, false, true, true);
+        if (FLAGS_control_flow_use_new_executor) {
+          BuildScopeForControlFlowOp(*core_, *block, &current_scope);
+          core_->reset_scope(&current_scope);
+          core_->Run({}, false);
+
+          // restore inputs place
+          for (const auto &n : input_var_original_places) {
+            const std::string &in_name = n.first;
+            const phi::Place &original_place = n.second;
+            // input vars exist in `scope` not `current_scope`
+            TransferVariablePlace(&scope, in_name, original_place, dev_ctx);
+          }
+
+        } else {
+          executor_->RunPreparedContext(
+              ctx_.get(), &current_scope, false, true, true);
+        }
 
         for (auto &var_rename : rename_vars) {
           std::string input_var_name =
@@ -171,7 +272,14 @@
       }
     } else {
       auto &current_scope = scope.NewScope();
-      executor.CreateVariables(*program, &current_scope, block->ID());
+
+      if (FLAGS_control_flow_use_new_executor) {
+        BuildScopeForControlFlowOp(*core_, *block, &current_scope);
+        core_->reset_scope(&current_scope);
+      } else {
+        executor_->CreateVariables(*program, &current_scope, block->ID());
+      }
+
       while (cond_data) {
         for (auto &name : current_scope.LocalVarNames()) {
           auto *var = current_scope.Var(name);
@@ -186,14 +294,25 @@
             t->clear();
           }
         }
-        executor.RunPreparedContext(
-            ctx.get(), &current_scope, false, false, false);
+
+        if (FLAGS_control_flow_use_new_executor) {
+          core_->Run({}, false);
+        } else {
+          executor_->RunPreparedContext(
+              ctx_.get(), &current_scope, false, false, false);
+        }
+
         cond_data = GetCondData(
             scope.FindVar(Input(kCondition))->Get<phi::DenseTensor>());
       }
       scope.DeleteScope(&current_scope);
     }
   }
+
+ private:
+  mutable std::shared_ptr<framework::Executor> executor_{nullptr};
+  mutable std::unique_ptr<framework::ExecutorPrepareContext> ctx_{nullptr};
+  mutable std::shared_ptr<framework::InterpreterCore> core_{nullptr};
 };
 
 class WhileOpMaker : public framework::OpProtoAndCheckerMaker {
@@ -245,13 +364,12 @@ class WhileGradOp : public framework::OperatorBase {
     // get device context from pool
     platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
     auto &dev_ctx = *pool.Get(dev_place);
-    framework::Executor executor(dev_place);
+
     auto *block = Attr<framework::BlockDesc *>(kStepBlock);
     auto *program = block->Program();
 
     auto &skip_vars = Attr<std::vector<std::string>>(kSkipEagerDeletionVars);
     VLOG(2) << GetSkipEagerDeletionVarsDebugString(skip_vars);
-    auto ctx = executor.Prepare(*program, block->ID(), skip_vars);
 
     auto *step_scopes =
         scope.FindVar(Input(kStepScopes))->GetMutable<StepScopeVar>();
@@ -271,6 +389,29 @@
                           outside_og_names.size(),
                           inside_og_names.size()));
 
+    if (FLAGS_control_flow_use_new_executor) {
+      LOG_FIRST_N(INFO, 1)
+          << "[ControlFlow][WhileGradOp] New Executor is Running.";
+      if (!core_ || !platform::is_same_place(core_->GetPlace(), dev_place)) {
+        std::set<std::string> skip_gc_vars(skip_vars.begin(), skip_vars.end());
+        framework::Scope placeholder;  // Don't care if it's valid, just for
+                                       // initialize InterpreterCore
+        core_.reset(new framework::InterpreterCore(
+            dev_place,
+            *block,
+            skip_gc_vars,
+            &placeholder,
+            /* used_for_jit */ false,
+            /* used_for_control_flow_op */ true));
+      }
+    } else {
+      if (!executor_ ||
+          !platform::is_same_place(executor_->GetPlace(), dev_place)) {
+        executor_.reset(new framework::Executor(dev_place));
+        ctx_ = executor_->Prepare(*program, block->ID(), skip_vars);
+      }
+    }
+
     for (auto cur_scope_iter = step_scopes->rbegin();
          cur_scope_iter != step_scopes->rend();
          ++cur_scope_iter) {
@@ -330,8 +471,15 @@
                 "WhileGradOp."));
           }
         }
-        executor.RunPreparedContext(
-            ctx.get(), *cur_scope_iter, false, true, true);
+
+        if (FLAGS_control_flow_use_new_executor) {
+          BuildScopeForControlFlowOp(*core_, *block, *cur_scope_iter);
+          core_->reset_scope(*cur_scope_iter);
+          core_->Run({}, false);
+        } else {
+          executor_->RunPreparedContext(
+              ctx_.get(), *cur_scope_iter, false, true, true);
+        }
 
         // The Outputs(kXGRAD) contains the names of the gradient of parameters
         // and inputs.
@@ -446,6 +594,11 @@
       }
       step_scopes->clear();
     }
+
+ private:
+  mutable std::shared_ptr<framework::Executor> executor_{nullptr};
+  mutable std::unique_ptr<framework::ExecutorPrepareContext> ctx_{nullptr};
+  mutable std::shared_ptr<framework::InterpreterCore> core_{nullptr};
 };
 
 template <typename T>