Unverified commit c4de1277, authored by Ruibiao Chen, committed by GitHub

Add ExecutionConfig and fix last-live-op bug for standalone executor (#46405)

* Add ExecutionConfig and fix last-live-op bug for standalone executor

* Improve code design
Parent 1d04e021
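At a glance, the refactoring replaces three scattered `InterpreterCore` members (`skip_gc_vars_`, `create_local_scope_`, `used_for_jit_`) plus ad-hoc thread-pool queries with a single `interpreter::ExecutionConfig` object. A rough before/after sketch with simplified stand-in types (not code from this commit; field names mirror the diff, defaults are illustrative):

```cpp
#include <cstddef>
#include <set>
#include <string>

// Simplified stand-in for the new interpreter::ExecutionConfig
// (the real struct is in the new header added by this commit).
struct ExecutionConfig {
  bool used_for_jit{false};
  bool create_local_scope{true};
  size_t host_num_threads{1};
  size_t deivce_num_threads{1};  // spelling as in the source
  size_t prepare_num_threads{1};
  std::set<std::string> skip_gc_vars;
};

// Before: configuration scattered across individual members, with
// thread-pool sizes queried ad hoc via GetThreadPoolConfig().
struct InterpreterCoreMembersBefore {
  std::set<std::string> skip_gc_vars_;
  bool create_local_scope_{true};
  bool used_for_jit_{false};
};

// After: one config object owns all execution-related knobs.
struct InterpreterCoreMembersAfter {
  ExecutionConfig execution_config_;
};
```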
@@ -14,6 +14,7 @@ set(STANDALONE_EXECUTOR_SRCS
 set(STANDALONE_EXECUTOR_DEPS
   dependency_builder
   device_context
+  execution_config
   op_registry
   scope
   framework_proto
...
@@ -2,3 +2,8 @@ cc_library(
   dependency_builder
   SRCS dependency_builder.cc
   DEPS operator)
+
+cc_library(
+  execution_config
+  SRCS execution_config.cc
+  DEPS phi_backends)
@@ -16,16 +16,6 @@
 #include <queue>

-// The difference between "sequential_run" and "serial_run":
-// "sequential_run" dispatches OPs one by one according to the sequence in the
-// Program, while "serial_run" ensures that all Ops are scheduled in a singal
-// thread. In standalone executor, "sequential_run" is also "serial_run", while
-// "serial_run" is not necessarily "sequential_run".
-PADDLE_DEFINE_EXPORTED_bool(new_executor_sequential_run,
-                            false,
-                            "Enable sequential execution for standalone "
-                            "executor, only applied to GPU OPs.");
-
 namespace paddle {
 namespace framework {
 namespace interpreter {
@@ -67,7 +57,7 @@ const std::string StringizeDownstreamMap(
 }

 const std::map<int, std::set<int>>& DependencyBuilder::Build(
-    const std::vector<Instruction>& instructions) {
+    const std::vector<Instruction>& instructions, bool is_sequential_run) {
   PADDLE_ENFORCE_EQ(
       is_build_,
       false,
@@ -85,7 +75,7 @@ const std::map<int, std::set<int>>& DependencyBuilder::Build(
   AddDependencyForRandomOp();
   AddDependencyForReadOp();

-  if (FLAGS_new_executor_sequential_run) {
+  if (is_sequential_run) {
     AddDependencyForSequentialRun();
   }
@@ -505,7 +495,7 @@ void DependencyBuilder::BuildOpHappensBefore() {
                 next,
                 op_idx));
         op_happens_before_[op_idx][next] = true;
-        VLOG(8) << "happens before: " << op_idx << " " << next;
+        VLOG(10) << "happens before: " << op_idx << " " << next;
         q.push(next);
       }
     }
...
@@ -36,7 +36,7 @@ class DependencyBuilder {
   // build op dependencies and return the mapping from op to its downstream-op
   // set
   const std::map<int, std::set<int>>& Build(
-      const std::vector<Instruction>& instructions);
+      const std::vector<Instruction>& instructions, bool is_sequential_run);

   bool OpHappensBefore(int prior_op_idx, int posterior_op_idx);
...
@@ -12,12 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-#pragma once
+#include "paddle/fluid/framework/new_executor/interpreter/execution_config.h"

+#include <set>
 #include <thread>

 #include "paddle/fluid/platform/device/ipu/ipu_info.h"
 #include "paddle/fluid/platform/device/npu/npu_info.h"
+#include "paddle/fluid/platform/place.h"
 #include "paddle/phi/backends/device_manager.h"
 #include "paddle/phi/backends/gpu/gpu_info.h"
 #include "paddle/phi/backends/xpu/xpu_info.h"
@@ -48,7 +49,7 @@ static constexpr size_t kMinOpNumForAsyncPrepare = 1000;
 // Note that the purpose of the config is to limit the total 'possible'
 // threads introduced by interpretercore to avoid hurting performance.
-inline std::tuple<int, int, int> GetThreadPoolConfig(const phi::Place place,
+inline std::tuple<int, int, int> GetThreadPoolConfig(const phi::Place& place,
                                                      size_t op_num) {
   int num_device_threads = kDeviceNumThreads,
       num_host_threads = kHostNumThreads,
@@ -131,6 +132,24 @@ inline std::tuple<int, int, int> GetThreadPoolConfig(const phi::Place place,
       num_host_threads, num_device_threads, num_prepare_threads);
 }

+ExecutionConfig::ExecutionConfig(const phi::Place& place, size_t op_num) {
+  std::tie(host_num_threads, deivce_num_threads, prepare_num_threads) =
+      GetThreadPoolConfig(place, op_num);
+}
+
+void ExecutionConfig::Log(int log_level) {
+  VLOG(log_level) << "ExecutionConfig:";
+  VLOG(log_level) << "used_for_jit = " << used_for_jit;
+  VLOG(log_level) << "create_local_scope = " << create_local_scope;
+  VLOG(log_level) << "host_num_threads = " << host_num_threads;
+  VLOG(log_level) << "deivce_num_threads = " << deivce_num_threads;
+  VLOG(log_level) << "prepare_num_threads = " << prepare_num_threads;
+  VLOG(log_level) << "skip_gc_vars = ";
+  for (const std::string& var : skip_gc_vars) {
+    VLOG(log_level) << var;
+  }
+}
+
 }  // namespace interpreter
 }  // namespace framework
 }  // namespace paddle
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <set>
+#include <string>
+
+#include "paddle/fluid/platform/place.h"
+
+namespace paddle {
+namespace framework {
+namespace interpreter {
+
+struct ExecutionConfig {
+  bool used_for_jit{false};
+  bool create_local_scope{true};
+
+  size_t host_num_threads;
+  size_t deivce_num_threads;
+  size_t prepare_num_threads;
+
+  std::set<std::string> skip_gc_vars;
+
+  ExecutionConfig(const phi::Place& place, size_t op_num);
+  void Log(int log_level);
+};
+
+}  // namespace interpreter
+}  // namespace framework
+}  // namespace paddle
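For orientation, a minimal usage sketch of the new struct (not part of this commit): construct the config for a CPU place and a hypothetical op count, override fields the way `InterpreterCore`'s constructor does, then dump it. The variable name `"fetch_var"` and the op count are illustrative assumptions.

```cpp
#include "paddle/fluid/framework/new_executor/interpreter/execution_config.h"

using paddle::framework::interpreter::ExecutionConfig;

void Demo() {
  // Thread-pool sizes are derived from the place and op count in the ctor.
  ExecutionConfig config(phi::CPUPlace(), /*op_num=*/128);
  config.used_for_jit = false;
  config.create_local_scope = true;
  config.skip_gc_vars = {"fetch_var"};  // hypothetical variable name
  config.Log(/*log_level=*/8);          // visible with GLOG verbosity >= 8
}
```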
@@ -19,7 +19,6 @@
 #include "paddle/fluid/framework/details/nan_inf_utils.h"
 #include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
 #include "paddle/fluid/framework/new_executor/interpretercore_util.h"
-#include "paddle/fluid/framework/new_executor/threadpool_config.h"
 #include "paddle/fluid/framework/operator.h"
 #include "paddle/fluid/platform/device/gpu/gpu_info.h"
 #include "paddle/fluid/platform/os_info.h"
@@ -32,6 +31,15 @@
 #endif
 #include "paddle/phi/backends/device_manager.h"

+// The difference between "sequential_run" and "serial_run":
+// "sequential_run" dispatches OPs one by one according to the sequence in the
+// Program, while "serial_run" ensures that all Ops are scheduled in a singal
+// thread. In standalone executor, "sequential_run" is also "serial_run", while
+// "serial_run" is not necessarily "sequential_run".
+PADDLE_DEFINE_EXPORTED_bool(new_executor_sequential_run,
+                            false,
+                            "Enable sequential execution for standalone "
+                            "executor, only applied to GPU OPs.");
+
 PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace,
                             false,
                             "Use inplace in new executor");
@@ -49,6 +57,8 @@ constexpr const char* kTaskCompletion = "TaskCompletion";
 namespace paddle {
 namespace framework {

+// TODO(Ruibia): Pass skip_gc_vars, used_for_jit, and other config messages by
+// constructing an interpreter::ExecutionConfig
 InterpreterCore::InterpreterCore(const platform::Place& place,
                                  const BlockDesc& block,
                                  const std::set<std::string>& skip_gc_vars,
@@ -56,10 +66,9 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
                                  bool used_for_jit)
     : place_(place),
       block_(block),
-      skip_gc_vars_(skip_gc_vars),
+      execution_config_(place, block.OpSize()),
       var_scope_(scope),
-      stream_analyzer_(place),
-      used_for_jit_(used_for_jit) {
+      stream_analyzer_(place) {
   VLOG(4) << "InterpreterCore(): " << this << " on " << place_;
   is_build_ = false;
@@ -67,14 +76,13 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
   exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
   completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion);

-  create_local_scope_ = FLAGS_new_executor_use_local_scope;
-
-  if (used_for_jit_) {
-    create_local_scope_ = false;
-  }
-  VLOG(4) << "create_local_scope_ is " << create_local_scope_;
+  execution_config_.used_for_jit = used_for_jit;
+  execution_config_.create_local_scope =
+      !used_for_jit && FLAGS_new_executor_use_local_scope;
+  execution_config_.skip_gc_vars = skip_gc_vars;
+  execution_config_.Log(/*log_level=*/8);

-  if (create_local_scope_) {
+  if (execution_config_.create_local_scope) {
     auto local_scope = &var_scope_.GetMutableScope()->NewScope();
     local_scope_ = local_scope;
   }
@@ -127,7 +135,7 @@ interpreter::CostInfo InterpreterCore::DryRun(
     platform::DeviceContextPool::Instance().Get(place_)->Wait();
   }

-  if (create_local_scope_) {
+  if (execution_config_.create_local_scope) {
     ClearLoDTensorArrayInLocalScope();
   }
@@ -173,7 +181,7 @@ paddle::framework::FetchList InterpreterCore::Run(
     }
 #endif
   }

-  if (create_local_scope_) {
+  if (execution_config_.create_local_scope) {
     ClearLoDTensorArrayInLocalScope();
   }
@@ -201,16 +209,17 @@ paddle::framework::FetchList InterpreterCore::Run(
   if (!is_build_) {
     LOG_FIRST_N(INFO, 1) << "New Executor is Running.";
     paddle::framework::interpreter::build_variable_scope(
-        block_, &var_scope_, create_local_scope_);
+        block_, &var_scope_, execution_config_.create_local_scope);

     std::vector<paddle::framework::OpFuncNode> op_func_nodes;
-    paddle::framework::interpreter::build_op_func_list(place_,
-                                                       block_,
-                                                       skip_gc_vars_,
-                                                       &op_func_nodes,
-                                                       &var_scope_,
-                                                       create_local_scope_,
-                                                       used_for_jit_);
+    paddle::framework::interpreter::build_op_func_list(
+        place_,
+        block_,
+        execution_config_.skip_gc_vars,
+        &op_func_nodes,
+        &var_scope_,
+        execution_config_.create_local_scope,
+        execution_config_.used_for_jit);
     is_build_ = true;
     SetFeedVarsInplaceSkip(feed_names);
     // convert vec func_list to graph
@@ -239,12 +248,13 @@ paddle::framework::FetchList InterpreterCore::Run(
 #endif
   }

-  if (create_local_scope_) {
+  if (execution_config_.create_local_scope) {
     ClearLoDTensorArrayInLocalScope();
   }

   // return Fetch Tensors
-  Scope* inner_scope =
-      create_local_scope_ ? local_scope_ : var_scope_.GetMutableScope();
+  Scope* inner_scope = execution_config_.create_local_scope
+                           ? local_scope_
+                           : var_scope_.GetMutableScope();
   auto* fetch_var = inner_scope->FindVar(interpreter::kFetchVarName);
   if (fetch_var) {
     return std::move(*fetch_var->GetMutable<framework::FetchList>());
@@ -259,12 +269,13 @@ void InterpreterCore::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
 void InterpreterCore::SetSkipGcVars(const std::set<std::string>& skip_gc_vars) {
   PADDLE_ENFORCE_EQ(
-      skip_gc_vars_.empty(),
+      execution_config_.skip_gc_vars.empty(),
       true,
       platform::errors::PreconditionNotMet(
-          "Skip_gc_vars_ can only be initialized once, now skip_gc_vars_ is "
+          "execution_config_.skip_gc_vars can only be initialized once, now "
+          "execution_config_.skip_gc_vars is "
           "not empty, do not call SetSkipGcVars method repeatedly."));
-  skip_gc_vars_ = skip_gc_vars;
+  execution_config_.skip_gc_vars = skip_gc_vars;
 }

 const VariableScope* InterpreterCore::GetVariableScope() const {
@@ -288,12 +299,13 @@ void InterpreterCore::ShareWorkQueueFrom(std::shared_ptr<InterpreterCore> src) {
           << ") to InterpreterCore(" << this << ")";
 }

-bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
+bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(
+    const std::vector<std::vector<size_t>>& input_var2op, size_t var_index) {
   if (!var_scope_.VarDesc(var_index)) {
-    return input_var2op_info_.at(var_index).size() == 1;
+    return input_var2op.at(var_index).size() == 1;
   } else {
     int is_input_cnt = 0;
-    for (auto inst_id : input_var2op_info_.at(var_index)) {
+    for (auto inst_id : input_var2op.at(var_index)) {
       OpInOutInfo info;
       info.Build(vec_instruction_.at(inst_id).OpBase());
       if (info.IsInArgBufferNeeded(var_scope_.VarDesc(var_index)->Name())) {
@@ -306,21 +318,19 @@ bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
 std::shared_ptr<interpreter::AsyncWorkQueue> InterpreterCore::GetWorkQueue() {
   if (async_work_queue_ == nullptr) {
-    int host_num_threads = 1, deivce_num_threads = 1, prepare_num_threads = 1;
-    std::tie(host_num_threads, deivce_num_threads, prepare_num_threads) =
-        interpreter::GetThreadPoolConfig(place_, vec_instruction_.size());
-    async_work_queue_ =
-        std::make_shared<interpreter::AsyncWorkQueue>(host_num_threads,
-                                                      deivce_num_threads,
-                                                      prepare_num_threads,
-                                                      &main_thread_blocker_);
+    async_work_queue_ = std::make_shared<interpreter::AsyncWorkQueue>(
+        execution_config_.host_num_threads,
+        execution_config_.deivce_num_threads,
+        execution_config_.prepare_num_threads,
+        &main_thread_blocker_);
   }
   return async_work_queue_;
 }

 void InterpreterCore::BuildAndCacheInstructionCtx(Instruction* instr_node) {
-  Scope* inner_scope =
-      create_local_scope_ ? local_scope_ : var_scope_.GetMutableScope();
+  Scope* inner_scope = execution_config_.create_local_scope
+                           ? local_scope_
+                           : var_scope_.GetMutableScope();
   VariableValueMap ins_map;
   for (auto& var_name_item : instr_node->Inputs()) {
     std::vector<Variable*> input_vars;
@@ -346,8 +356,9 @@ void InterpreterCore::BuildAndCacheInstructionCtx(Instruction* instr_node) {
   // set runtime_ctx and infershape_ctx_
   if (instr_node->OpBase()->Type() == "cinn_launch") {  // OP use scope in
                                                         // kernel
-    Scope* local_scope = create_local_scope_ ? var_scope_.GetMutableLocalScope()
-                                             : var_scope_.GetMutableScope();
+    Scope* local_scope = execution_config_.create_local_scope
+                             ? var_scope_.GetMutableLocalScope()
+                             : var_scope_.GetMutableScope();
     instr_node->ResetContextWithScope(ins_map, outs_map, *local_scope);
   } else {
     instr_node->ResetContext(ins_map, outs_map);
@@ -377,8 +388,19 @@ void InterpreterCore::BuildInplace() {
     }
   }

-  Scope* local_scope = create_local_scope_ ? var_scope_.GetMutableLocalScope()
-                                           : var_scope_.GetMutableScope();
+  Scope* local_scope = execution_config_.create_local_scope
+                           ? var_scope_.GetMutableLocalScope()
+                           : var_scope_.GetMutableScope();
+  std::vector<std::vector<size_t>> input_var2op(var_scope_.VarSize());
+  for (Instruction& instr : vec_instruction_) {
+    for (auto& item : instr.Inputs()) {
+      for (int var_id : item.second) {
+        if (var_id != kEmptyVarIndex) {
+          input_var2op.at(var_id).push_back(instr.Id());
+        }
+      }
+    }
+  }

   for (size_t i = 0; i < vec_instruction_.size(); ++i) {
     auto& instr = vec_instruction_[i];
@@ -402,7 +424,7 @@ void InterpreterCore::BuildInplace() {
       if (var_scope_.GetVarSikpInplace(iter->second[0])) {
         continue;
       }
-      if (BuildInplaceCheckVarIsOnlyInput(iter->second[0])) {
+      if (BuildInplaceCheckVarIsOnlyInput(input_var2op, iter->second[0])) {
         auto iterout = outputs.find(pair.second);
         if (iterout != outputs.end() && !iterout->second.empty()) {
           const std::string& invar_name =
@@ -432,7 +454,9 @@ void InterpreterCore::BuildOperatorDependences() {
   // Schedule
   auto op_nums = vec_instruction_.size();
   dependecy_count_.resize(op_nums);
-  auto op2downstream = dependency_builder_.Build(vec_instruction_);
+  auto op2downstream = dependency_builder_.Build(
+      vec_instruction_,
+      /*is_sequential_run=*/FLAGS_new_executor_sequential_run);
   for (size_t op = 0; op < vec_instruction_.size(); ++op) {
     auto op_list = op2downstream[op];
     std::vector<size_t> downsteam_vector(op_list.begin(), op_list.end());
@@ -460,10 +484,7 @@ void InterpreterCore::ClearLoDTensorArrayInLocalScope() {
 void InterpreterCore::Convert(
     std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
   auto& vec_meta_info = var_scope_.MutableVecMetaInfo();
-  auto var_nums = var_scope_.VarSize();
-  input_var2op_info_.resize(var_nums);
-
   auto nodes = *op_func_nodes;
   auto op_nums = nodes.size();
   vec_instruction_.reserve(op_nums);
   for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
@@ -471,35 +492,42 @@ void InterpreterCore::Convert(
     auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node);
     vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_);
   }

   BuildOperatorDependences();

   // calculate last_live_ops_
   for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
-    auto& instr = vec_instruction_[op_idx];
+    Instruction& instr = vec_instruction_[op_idx];
     OpInOutInfo info;
-    std::set<size_t> gc_check_inputs;
-    for (auto& item : instr.Inputs()) {
+    info.Build(instr.OpBase());
+
+    std::set<size_t> gc_check_vars;
+
+    const std::map<std::string, std::vector<int>>& ins = instr.Inputs();
+    const std::map<std::string, std::vector<int>>& outs = instr.Outputs();
+    std::multimap<std::string, std::vector<int>> ins_and_outs{ins.begin(),
+                                                              ins.end()};
+    ins_and_outs.insert(outs.begin(), outs.end());
+
+    for (auto& item : ins_and_outs) {
       for (auto id : item.second) {
         if (id == kEmptyVarIndex) {
           continue;
         }
-        input_var2op_info_.at(id).push_back(op_idx);
-        // var can be gc-ed
-        if (!info.IsBuilt()) {
-          info.Build(instr.OpBase());
-        }
         auto* var_desc = var_scope_.VarDesc(id);
-        if (var_desc) {
-          if (info.IsInArgBufferNeeded(var_desc->Name())) {
-            gc_check_inputs.insert(id);
-          }
-        } else {
-          gc_check_inputs.insert(id);
+        // skip no_need_buffer input vars
+        if (var_desc && ins.count(item.first) &&
+            !info.IsInArgBufferNeeded(var_desc->Name())) {
+          continue;
         }
+        gc_check_vars.insert(id);
       }
     }
-    for (auto var_id : gc_check_inputs) {
-      Scope* inner_scope =
-          create_local_scope_ ? local_scope_ : var_scope_.GetMutableScope();
+
+    for (auto var_id : gc_check_vars) {
+      Scope* inner_scope = execution_config_.create_local_scope
+                               ? local_scope_
+                               : var_scope_.GetMutableScope();
       paddle::framework::Variable* var =
           inner_scope->FindVar(var_scope_.GetNameById(var_id));
       if (var->IsType<LoDTensor>() || var->IsType<phi::SelectedRows>() ||
@@ -512,18 +540,9 @@ void InterpreterCore::Convert(
       }
     }
   }
-
-  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
-    // checkout output
-    for (auto& item : vec_instruction_[i].Outputs()) {
-      for (auto var_id : item.second) {
-        if (input_var2op_info_.at(var_id).size() == 0) {
-          last_live_ops_[var_id].insert(i);
-        }
-      }
-    }
-  }

   // clear the last_live_ops list for all vars in skip_gc_vars
-  for (const std::string& skip_gc_var : skip_gc_vars_) {
+  for (const std::string& skip_gc_var : execution_config_.skip_gc_vars) {
     int var_id = var_scope_.GetIdByName(skip_gc_var);
     if (var_id != -1) {
       last_live_ops_[var_id].clear();
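This hunk carries the "last-live-op" fix named in the commit title: the old `Convert()` collected gc-check candidates from inputs only and backfilled outputs in a separate zero-consumer pass keyed on `input_var2op_info_`, while the new code walks inputs and outputs together, so the writing op itself also becomes a last-live candidate. A self-contained sketch of the fixed collection logic, with plain containers standing in for `Instruction` and the `no_need_buffer` check mocked as a set of slot names (both are simplifying assumptions, not Paddle's real types):

```cpp
#include <cstddef>
#include <map>
#include <set>
#include <string>
#include <vector>

constexpr int kEmptyVarIndex = 0;  // mirrors the sentinel used in the diff

std::set<size_t> CollectGcCheckVars(
    const std::map<std::string, std::vector<int>>& ins,
    const std::map<std::string, std::vector<int>>& outs,
    const std::set<std::string>& no_need_buffer_slots) {
  // Walk inputs and outputs together, as the fixed code does, so vars an op
  // only writes are also considered when computing last_live_ops_.
  std::multimap<std::string, std::vector<int>> ins_and_outs{ins.begin(),
                                                            ins.end()};
  ins_and_outs.insert(outs.begin(), outs.end());

  std::set<size_t> gc_check_vars;
  for (const auto& item : ins_and_outs) {
    for (int id : item.second) {
      if (id == kEmptyVarIndex) {
        continue;
      }
      // Skip no_need_buffer *input* slots only; output slots always count.
      if (ins.count(item.first) && no_need_buffer_slots.count(item.first)) {
        continue;
      }
      gc_check_vars.insert(static_cast<size_t>(id));
    }
  }
  return gc_check_vars;
}
```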
@@ -662,8 +681,9 @@ inline void SetDeviceId(const platform::Place& place) {
 void InterpreterCore::RunInstruction(const Instruction& instr_node) {
   auto* op = instr_node.OpBase();
   auto place = instr_node.DeviceContext().GetPlace();
-  Scope* local_scope = create_local_scope_ ? var_scope_.GetMutableLocalScope()
-                                           : var_scope_.GetMutableScope();
+  Scope* local_scope = execution_config_.create_local_scope
+                           ? var_scope_.GetMutableLocalScope()
+                           : var_scope_.GetMutableScope();
   VLOG(4) << "Start run " << place << " " << op->DebugStringEx(local_scope_);

   SetDeviceId(place);
@@ -947,6 +967,8 @@ void InterpreterCore::RunInstructionAsync(
       CheckGC(instr_node, atomic_var_ref);

+      interpreter::LogDeviceMemoryStats(place_);
+
       interpreter::RecordEvent(instr_node, place_);
     } catch (platform::EnforceNotMet& ex) {
       framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
@@ -1135,16 +1157,17 @@ void InterpreterCore::Prepare(
   if (!is_build_) {
     paddle::framework::interpreter::build_variable_scope(
-        block_, &var_scope_, create_local_scope_);
+        block_, &var_scope_, execution_config_.create_local_scope);
     FeedInput();
     std::vector<paddle::framework::OpFuncNode> op_func_nodes;
-    paddle::framework::interpreter::build_op_func_list(place_,
-                                                       block_,
-                                                       skip_gc_vars_,
-                                                       &op_func_nodes,
-                                                       &var_scope_,
-                                                       create_local_scope_,
-                                                       used_for_jit_);
+    paddle::framework::interpreter::build_op_func_list(
+        place_,
+        block_,
+        execution_config_.skip_gc_vars,
+        &op_func_nodes,
+        &var_scope_,
+        execution_config_.create_local_scope,
+        execution_config_.used_for_jit);
     is_build_ = true;
     SetFeedVarsInplaceSkip(feed_names);
     // convert vec func_list to graph
...
@@ -23,6 +23,7 @@
 #include "paddle/fluid/framework/new_executor/event_manager.h"
 #include "paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.h"
 #include "paddle/fluid/framework/new_executor/interpreter/dependency_builder.h"
+#include "paddle/fluid/framework/new_executor/interpreter/execution_config.h"
 #include "paddle/fluid/framework/new_executor/interpretercore_util.h"
 #include "paddle/fluid/framework/new_executor/new_executor_defs.h"
 #include "paddle/fluid/framework/new_executor/profiler.h"
@@ -67,7 +68,8 @@ class InterpreterCore {
   void reset_scope(Scope* new_scope);

  private:
-  bool BuildInplaceCheckVarIsOnlyInput(size_t var_index);
+  bool BuildInplaceCheckVarIsOnlyInput(
+      const std::vector<std::vector<size_t>>& input_var2op, size_t var_index);

   std::shared_ptr<interpreter::AsyncWorkQueue> GetWorkQueue();
@@ -112,9 +114,9 @@ class InterpreterCore {
   platform::Place place_;
   const BlockDesc& block_;  // not owned
-  std::set<std::string> skip_gc_vars_;

   interpreter::DependencyBuilder dependency_builder_;
+  interpreter::ExecutionConfig execution_config_;

   // NOTE(zhiqiu): when add fetch ops in GetInterpreterCore, we will
   // copy a new program and block, the copy_program_ here is used to
@@ -134,10 +136,7 @@ class InterpreterCore {
   std::vector<size_t> dependecy_count_;
   std::atomic<size_t> unfinished_op_numer_{0};

-  std::vector<std::vector<size_t>> input_var2op_info_;
-
   VariableScope var_scope_;
-  bool create_local_scope_{true};
   Scope* local_scope_{nullptr};  // not owned

   StreamAnalyzer stream_analyzer_;
@@ -151,8 +150,6 @@ class InterpreterCore {
   std::future<std::unique_ptr<AtomicVectorSizeT>> atomic_deps_;
   std::future<std::unique_ptr<AtomicVectorSizeT>> atomic_var_ref_;
-
-  bool used_for_jit_{false};
 };

 std::shared_ptr<InterpreterCore> CreateInterpreterCore(
...
@@ -11,6 +11,7 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+
 #include "paddle/fluid/framework/new_executor/interpretercore_util.h"

 #include <algorithm>
@@ -18,6 +19,7 @@
 #include "paddle/fluid/framework/details/nan_inf_utils.h"
 #include "paddle/fluid/framework/executor_gc_helper.h"
 #include "paddle/fluid/framework/new_executor/data_transfer.h"
+#include "paddle/fluid/memory/stats.h"
 #include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h"
 #include "paddle/fluid/operators/controlflow/recurrent_op_helper.h"
 #include "paddle/fluid/operators/controlflow/while_op_helper.h"
@@ -34,6 +36,11 @@ PADDLE_DEFINE_EXPORTED_bool(
     false,
     "Enable serial execution for standalone executor, used for debug.");

+PADDLE_DEFINE_EXPORTED_bool(
+    new_executor_log_memory_stats,
+    false,
+    "Log memory stats after each op runs, just used for debug.");
+
 DECLARE_bool(use_mkldnn);
 DECLARE_bool(check_nan_inf);
@@ -135,6 +142,21 @@ std::unique_ptr<AtomicVectorSizeT> PrepareAtomicVarRef(
   return var_ref;
 }

+void LogDeviceMemoryStats(const platform::Place& place) {
+  if (FLAGS_new_executor_log_memory_stats && platform::is_gpu_place(place)) {
+    VLOG(0) << "memory_allocated: "
+            << static_cast<double>(memory::DeviceMemoryStatCurrentValue(
+                   "Allocated", place.device)) /
+                   1024 / 1024
+            << " MB";
+    VLOG(0) << "max_memory_allocated: "
+            << static_cast<double>(memory::DeviceMemoryStatPeakValue(
+                   "Allocated", place.device)) /
+                   1024 / 1024
+            << " MB";
+  }
+}
+
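The helper above only logs for GPU places and only when the new flag is on. A minimal sketch of driving it directly (not part of this commit; `CUDAPlace(0)` assumes a CUDA build with at least one GPU, and the `DECLARE_bool` line is an assumed declaration for standalone use):

```cpp
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/platform/place.h"

DECLARE_bool(new_executor_log_memory_stats);

void DumpGpuMemoryOnce() {
  // The flag defaults to false (see definition above); flip it on here.
  FLAGS_new_executor_log_memory_stats = true;
  paddle::framework::interpreter::LogDeviceMemoryStats(
      paddle::platform::CUDAPlace(0));
}
```

In practice, since the flag is defined with PADDLE_DEFINE_EXPORTED_bool, it would typically be switched on from the environment, e.g. `FLAGS_new_executor_log_memory_stats=1`.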
 bool var_can_be_deleted(const std::string& name, const BlockDesc& block) {
   auto* var_desc = block.FindVar(name);
   if (var_desc == nullptr || var_desc->Persistable()) {
@@ -629,18 +651,14 @@ void build_op_func_list(const platform::Place& place,
       // step 5. run kernel
       if (run_phi_kernel) {
-        VLOG(1) << "start run phi kernel. ";
         phi::KernelContext phi_kernel_context;
         op_with_kernel->BuildPhiKernelContext(
             runtime_context, dev_ctx, &phi_kernel_context);
         (*op_func_node.phi_kernel_)(&phi_kernel_context);
-        VLOG(1) << "end run phi kernel. ";
       } else {
-        VLOG(4) << "start run kernel. ";
         // the place of exec_ctx maybe has changed.
         op_func_node.kernel_func_(ExecutionContext(
             *op_with_kernel, *runtime_scope, *dev_ctx, runtime_context));
-        VLOG(4) << "end run kernel. ";
       }

       // post-process grad_op.outputs if need cast complex grad into real
@@ -702,6 +720,7 @@ void build_op_func_list(const platform::Place& place,
     // gc---------------------------------------------------------------------------
     auto iter = unused_var_map.find(op);
     if (iter == unused_var_map.end()) {
+      interpreter::LogDeviceMemoryStats(place);
       continue;
     }
@@ -722,6 +741,8 @@ void build_op_func_list(const platform::Place& place,
       }
     }
     delete garbages;  // free mem
+
+    interpreter::LogDeviceMemoryStats(place);
   }
 }
...
@@ -44,7 +44,6 @@ constexpr size_t kPrepareWorkQueueIdx = 2;
 namespace paddle {
 namespace framework {
 namespace interpreter {
-
 class AsyncWorkQueue {
  public:
   AsyncWorkQueue(size_t host_num_threads,
@@ -77,6 +76,8 @@ std::unique_ptr<AtomicVectorSizeT> PrepareAtomicDeps(
 std::unique_ptr<AtomicVectorSizeT> PrepareAtomicVarRef(
     const std::vector<VariableMetaInfo>& vec_meta_info);

+void LogDeviceMemoryStats(const platform::Place& place);
+
 void build_variable_scope(const framework::BlockDesc& block,
                           VariableScope* var_scope,
                           bool use_local_scope = true);
...