未验证 提交 2386db87 编写于 作者: Z zhangbo9674 提交者: GitHub

[IR] Support multi-thread run && delete unused code of new_ir interpreter (#56148)

* add code

* fix bug

* fix bug

* delete unused code

* refine code

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug
上级 982100ab
......@@ -664,6 +664,12 @@ void NewIrDependencyBuilder::BuildDownstreamMap() {
}
}
void NewIrDependencyBuilder::ShareDependencyFrom(
const NewIrDependencyBuilder& src) {
std::tie(op_downstream_map_, op_happens_before_) = src.GetDependency();
is_build_ = true;
}
} // namespace interpreter
} // namespace framework
} // namespace paddle
......@@ -114,6 +114,8 @@ class NewIrDependencyBuilder : public DependencyBuilder {
void BuildDownstreamMap();
void ShareDependencyFrom(const NewIrDependencyBuilder& src);
private:
std::vector<paddle::framework::InstructionBase*> instructions_; // not_owned
};
......
......@@ -684,7 +684,7 @@ platform::DeviceType NewIrStreamAnalyzer::GetWaiterType(
}
}
void NewIrStreamAnalyzer::ShareEventInfoFrom(const StreamAnalyzer& src) {
void NewIrStreamAnalyzer::ShareEventInfoFrom(const NewIrStreamAnalyzer& src) {
event_info_ = src.GetEventInfo();
is_event_info_build_ = true;
}
......
......@@ -138,7 +138,7 @@ class NewIrStreamAnalyzer {
platform::DeviceType GetWaiterType(
const paddle::framework::InstructionBase* instr) const;
void ShareEventInfoFrom(const StreamAnalyzer& src);
void ShareEventInfoFrom(const NewIrStreamAnalyzer& src);
std::shared_ptr<
std::map<const DeviceContext*, std::map<size_t, std::set<size_t>>>>
......
......@@ -72,12 +72,6 @@ class InterpreterBaseImpl {
virtual paddle::framework::FetchList Run(
const std::vector<std::string>& feed_names, bool need_fetch = true) = 0;
// NOTE(zhangbo): This interface is only used for temporary testing and only
// for testing during the iteration process of the new IR access actuator
// version. It will be deleted in the future.
virtual paddle::framework::FetchList BetaRun(
const std::vector<std::string>& feed_names, bool need_fetch = true) = 0;
virtual void ShareWorkQueueFrom(InterpreterBaseImpl* src) = 0;
virtual void ShareBuildResultsFrom(const InterpreterBaseImpl& src) = 0;
......@@ -107,6 +101,12 @@ class InterpreterBaseImpl {
virtual const interpreter::StreamAnalyzer& GetStreamAnalyzer() const = 0;
virtual const interpreter::NewIrDependencyBuilder& GetNewIrDependencyBuilder()
const = 0;
virtual const interpreter::NewIrStreamAnalyzer& GetNewIrStreamAnalyzer()
const = 0;
virtual bool IsSharedResultsBuild() const = 0;
};
......
......@@ -74,11 +74,6 @@ FetchList InterpreterCore::Run(const std::vector<std::string>& feed_names,
return impl_->Run(feed_names, need_fetch);
}
FetchList InterpreterCore::BetaRun(const std::vector<std::string>& feed_names,
bool need_fetch) {
return impl_->BetaRun(feed_names, need_fetch);
}
void InterpreterCore::ShareWorkQueueFrom(std::shared_ptr<InterpreterCore> src) {
impl_->ShareWorkQueueFrom(const_cast<InterpreterBaseImpl*>(src->Impl()));
}
......
......@@ -52,9 +52,6 @@ class InterpreterCore {
paddle::framework::FetchList Run(const std::vector<std::string>& feed_names,
bool need_fetch = true);
paddle::framework::FetchList BetaRun(
const std::vector<std::string>& feed_names, bool need_fetch = true);
void ShareWorkQueueFrom(std::shared_ptr<InterpreterCore> src);
void ShareBuildResultsFrom(std::shared_ptr<InterpreterCore> src);
......
......@@ -43,9 +43,7 @@
PHI_DECLARE_bool(enable_new_ir_in_executor);
PHI_DECLARE_bool(enable_new_ir_in_executor_beta_run);
PHI_DECLARE_bool(enable_new_ir_in_executor_loop_run);
PHI_DECLARE_bool(enable_new_ir_in_executor_trace_run);
namespace paddle {
namespace framework {
......@@ -57,7 +55,6 @@ NewIRInterpreter::NewIRInterpreter(
framework::Scope* scope,
const ExecutionConfig& execution_config)
: place_(place),
stream_analyzer_(place),
execution_config_(execution_config),
var_scope_(scope),
scope_(scope),
......@@ -74,6 +71,8 @@ NewIRInterpreter::NewIRInterpreter(
exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion);
dependecy_count_ = std::make_shared<std::vector<size_t>>();
if (!FLAGS_new_executor_use_local_scope) {
execution_config_.create_local_scope = false;
}
......@@ -90,17 +89,6 @@ NewIRInterpreter::NewIRInterpreter(
ir_program_->block()->size());
execution_config_.Log(/*log_level=*/8);
instruction_scheduling_priority_less = [this](size_t lhs, size_t rhs) {
SchedulingPriority lhs_scheduling_priority =
vec_instruction_[lhs].GetSchedulingPriority();
SchedulingPriority rhs_scheduling_priority =
vec_instruction_[rhs].GetSchedulingPriority();
if (lhs_scheduling_priority == rhs_scheduling_priority) {
return lhs < rhs;
}
return lhs_scheduling_priority > rhs_scheduling_priority;
};
ir_instruction_scheduling_priority_less = [this](size_t lhs, size_t rhs) {
SchedulingPriority lhs_scheduling_priority =
vec_instruction_base_[lhs]->GetSchedulingPriority();
......@@ -128,163 +116,6 @@ NewIRInterpreter::~NewIRInterpreter() {
#endif
}
void NewIRInterpreter::RunImpl() {
// lazy initialization of gc, do not create gc is the program only run once
if (!gc_) {
gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_);
}
interpreter::ResetAtomicGuard guard(&deps_, &refs_);
// if ((execution_config_.used_for_jit || execution_config_.used_for_cinn)
// &&
// (sync_op_num_ == 0)) {
VLOG(4) << "Tracing Instruction List";
TraceInstructionList(vec_instruction_);
// } else {
// VLOG(4) << "Non-tracing";
// // For the program that only run once, it is no need to
// // create work_queue, so the async_work_queue_ is created
// // until the second step run.
// async_work_queue_ = GetWorkQueue();
// ExecuteInstructionList(vec_instruction_);
// }
// #ifdef PADDLE_WITH_CUSTOM_DEVICE
// if (platform::is_custom_place(place_)) {
// platform::DeviceContextPool::Instance().Get(place_)->Wait();
// }
// #endif
}
FetchList NewIRInterpreter::Run(
const std::vector<std::string>& feed_names,
const std::vector<phi::DenseTensor>& feed_tensors) {
SetDeviceId(place_);
CheckCUDAGraphBeforeRun(feed_names);
#ifdef PADDLE_WITH_DNNL
platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif
bool is_build = is_build_;
Prepare(feed_names, feed_tensors, is_build);
if (is_build) {
RunImpl();
}
if (HasLocalScope()) {
ClearLoDTensorArrayInLocalScope();
}
// return Fetch Tensors
auto* fetch_var = local_scope_->FindVar(interpreter::kFetchVarName);
if (fetch_var) {
auto fetch_list = std::move(*fetch_var->GetMutable<framework::FetchList>());
#ifdef PADDLE_WITH_CUDA
if (platform::IsCUDAGraphCapturing()) {
PADDLE_ENFORCE_EQ(fetch_list.empty(),
true,
platform::errors::InvalidArgument(
"Cannot fetch data when using CUDA Graph."));
}
#endif
return fetch_list;
} else {
return {};
}
}
FetchList NewIRInterpreter::Run(const std::vector<std::string>& feed_names,
bool need_fetch) {
if (FLAGS_enable_new_ir_in_executor_beta_run) {
LOG_FIRST_N(INFO, 1) << "New ir interpreter is running in BetaRun mode.";
return BetaRun(feed_names, need_fetch);
}
SetDeviceId(place_);
CheckCUDAGraphBeforeRun(feed_names);
#ifdef PADDLE_WITH_DNNL
platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif
if (!is_build_) {
LOG_FIRST_N(INFO, 1) << "New ir interpreter is running in OldRun mode.";
std::stringstream ss;
ss << this;
::ir::BuildScope(*ir_program_->block(),
InnerScope(),
ss.str(),
&value_2_var_name_,
&variable_2_var_name_,
&var_name_2_id_,
&variable_list_);
VLOG(4) << DebugValueInfo();
SolvePersisableVarNames();
std::vector<paddle::framework::OpFuncNode> op_func_nodes;
interpreter::BuildOpFuncList(place_,
ir_program_->block(),
&op_func_nodes,
scope_,
local_scope_,
value_2_var_name_,
execution_config_);
// SetFeedVarsInplaceSkip(feed_names);
// convert vec func_list to graph
Convert(&op_func_nodes);
UpdateSyncOpNum();
if (static_build_) {
VLOG(4) << "RUN impl";
RunImpl();
}
is_build_ = true;
} else {
RunImpl();
}
if (HasLocalScope()) {
ClearLoDTensorArrayInLocalScope();
}
// return Fetch Tensors
Scope* inner_scope = InnerScope();
if (FLAGS_enable_new_ir_in_executor) {
framework::FetchList fetch_res;
if (need_fetch) {
for (auto& var_name : fetch_var_names_) {
auto* var = inner_scope->FindVar(var_name);
VLOG(0) << "fetch " << var_name << "[" << var << "]";
fetch_res.push_back(var->Get<phi::DenseTensor>());
}
}
VLOG(4) << "get fetch list size: " << fetch_res.size();
return fetch_res;
} else {
auto* fetch_var = inner_scope->FindVar(interpreter::kFetchVarName);
if (fetch_var && need_fetch) {
auto fetch_list =
std::move(*fetch_var->GetMutable<framework::FetchList>());
#ifdef PADDLE_WITH_CUDA
if (platform::IsCUDAGraphCapturing()) {
PADDLE_ENFORCE_EQ(fetch_list.empty(),
true,
platform::errors::InvalidArgument(
"Cannot fetch data when using CUDA Graph."));
}
#endif
return fetch_list;
} else {
return {};
}
}
}
int NewIRInterpreter::GetIdByName(const std::string& name) const {
auto it = var_name_2_id_.find(name);
if (it != var_name_2_id_.end()) {
......@@ -294,7 +125,8 @@ int NewIRInterpreter::GetIdByName(const std::string& name) const {
}
void NewIRInterpreter::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
copy_program_ = prog;
PADDLE_THROW(platform::errors::Unimplemented(
"SetCopyProgram is not implemented in NewIRInterpreter."));
}
void NewIRInterpreter::SetSkipGcVars(
......@@ -331,21 +163,17 @@ const VariableScope* NewIRInterpreter::GetVariableScope() const {
void NewIRInterpreter::reset_scope(Scope* new_scope) {
var_scope_.SetScope(new_scope);
auto& var_list = var_scope_.MutableVarList();
for (size_t i = 0; i < var_list.size(); i++) {
const auto& var_name = var_scope_.GetNameById(i);
var_list[i] = new_scope->FindVar(var_name);
scope_ = new_scope;
for (size_t i = 0; i < variable_list_.size(); i++) {
const auto& var_name = GetNameById(i);
variable_list_[i] = new_scope->FindVar(var_name);
}
// The index should be assured valid, cause the InterpreterCore may not be
// fully built, but was still cached and used. For example, see unit test
// `test_assert.py`, it may exit before `NewIRInterpreter::Convert`,
// but still was cached and used by later tests.
for (size_t i = 0; i < std::min(refs_.size(), var_list.size()); i++) {
refs_[i]->ResetVariable(var_list[i]);
}
for (auto& ins : vec_instruction_) {
BuildAndCacheInstructionCtx(&ins);
for (size_t i = 0; i < std::min(refs_.size(), variable_list_.size()); i++) {
refs_[i]->ResetVariable(variable_list_[i]);
}
}
......@@ -373,8 +201,17 @@ void NewIRInterpreter::ShareWorkQueueFrom(InterpreterBaseImpl* src) {
}
void NewIRInterpreter::ShareBuildResultsFrom(const InterpreterBaseImpl& src) {
PADDLE_THROW(platform::errors::Unimplemented(
"ShareBuildResultsFrom is not implemented in NewIRInterpreter."));
if (is_shared_results_build_ || !src.IsSharedResultsBuild()) {
return;
}
// share op dependency
ir_dependency_builder_.ShareDependencyFrom(src.GetNewIrDependencyBuilder());
dependecy_count_ = src.GetDependencyCount();
// share event analysis
ir_stream_analyzer_.ShareEventInfoFrom(src.GetNewIrStreamAnalyzer());
is_shared_results_build_ = true;
VLOG(8) << "Share Build Results from InterpreterCore(" << &src
<< ") to InterpreterCore(" << this << ")";
}
// op dependences
......@@ -384,10 +221,14 @@ const interpreter::DependencyBuilder& NewIRInterpreter::GetDependencyBuilder()
"GetDependencyBuilder is not implemented in NewIRInterpreter."));
}
const interpreter::NewIrDependencyBuilder&
NewIRInterpreter::GetNewIrDependencyBuilder() const {
return ir_dependency_builder_;
}
std::shared_ptr<std::vector<size_t>> NewIRInterpreter::GetDependencyCount()
const {
PADDLE_THROW(platform::errors::Unimplemented(
"GetDependencyCount is not implemented in NewIRInterpreter."));
return dependecy_count_;
}
const interpreter::StreamAnalyzer& NewIRInterpreter::GetStreamAnalyzer() const {
......@@ -395,1069 +236,114 @@ const interpreter::StreamAnalyzer& NewIRInterpreter::GetStreamAnalyzer() const {
"GetStreamAnalyzer is not implemented in NewIRInterpreter."));
}
bool NewIRInterpreter::IsSharedResultsBuild() const {
PADDLE_THROW(platform::errors::Unimplemented(
"IsSharedResultsBuild is not implemented in NewIRInterpreter."));
}
bool NewIRInterpreter::BuildInplaceCheckVarIsOnlyInput(
const std::vector<std::vector<size_t>>& input_var2op, size_t var_index) {
if (!var_scope_.VarDesc(var_index)) {
return input_var2op.at(var_index).size() == 1;
} else {
int is_input_cnt = 0;
for (auto inst_id : input_var2op.at(var_index)) {
OpInOutInfo info;
info.Build(vec_instruction_.at(inst_id).OpBase());
if (info.IsInArgBufferNeeded(var_scope_.VarDesc(var_index)->Name())) {
is_input_cnt++;
}
}
return is_input_cnt == 1;
}
}
std::shared_ptr<interpreter::AsyncWorkQueue> NewIRInterpreter::GetWorkQueue() {
if (async_work_queue_ == nullptr) {
async_work_queue_ = std::make_shared<interpreter::AsyncWorkQueue>(
execution_config_.host_num_threads,
execution_config_.device_num_threads,
nullptr);
}
return async_work_queue_;
}
void NewIRInterpreter::BuildAndCacheInstructionCtx(Instruction* instr_node) {
Scope* inner_scope = InnerScope();
VariableValueMap ins_map;
for (auto& var_name_item : instr_node->Inputs()) {
std::vector<Variable*> input_vars;
input_vars.reserve(var_name_item.second.size());
for (auto& id : var_name_item.second) {
input_vars.emplace_back(inner_scope->FindVar(var_scope_.GetNameById(id)));
}
ins_map.emplace(var_name_item.first, std::move(input_vars));
}
VariableValueMap outs_map;
for (auto& var_name_item : instr_node->Outputs()) {
std::vector<Variable*> out_vars;
out_vars.reserve(var_name_item.second.size());
for (auto& id : var_name_item.second) {
out_vars.emplace_back(inner_scope->FindVar(var_scope_.GetNameById(id)));
}
outs_map.emplace(var_name_item.first, std::move(out_vars));
}
// set runtime_ctx and infershape_ctx_
if (instr_node->OpBase()->Type() == "cinn_launch" ||
instr_node->OpBase()->Type() == "cinn_instruction_run") { // OP use scope
// in kernel
Scope* inner_scope = InnerScope();
instr_node->ResetContextWithScope(ins_map, outs_map, *inner_scope);
} else {
instr_node->ResetContext(ins_map, outs_map);
}
}
void NewIRInterpreter::BuildInplace() {
// NOTE(Ruibiao): coalesce_tensor_op outputs a FusedOutput phi::DenseTensor
// and a list of Output Tensors which are sliced from the FusedOutput. These
// outputs sholud not be the outvar of the in-place var-pair since memory
// reuse between FusedOutput and Output Tensors is assumed. For the following
// example:
// fused_var, var1, var2, var3 = coalesce_tensor(var1, var2, var3)
// var1 = sum(var4, var5)
// ...
//
// After running coalesce_tensor_op, var1 is assumed to share the buffer
// slices from fused_var. However, if sum_op is in-place, then var1 would
// re-share the buffer with var4 instead of fused_var.
std::set<std::string> skip_inplace_outvars;
for (Instruction& instr : vec_instruction_) {
OperatorBase* op = instr.OpBase();
if (op->Type() == kCoalesceTensor) {
const std::vector<std::string>& outputs =
op->OutputVars(/*has_intermediate=*/false);
skip_inplace_outvars.insert(outputs.begin(), outputs.end());
}
}
Scope* local_scope = InnerScope();
std::vector<std::vector<size_t>> input_var2op(var_scope_.VarSize());
for (Instruction& instr : vec_instruction_) {
for (auto& item : instr.Inputs()) {
for (int var_id : item.second) {
if (var_id != kEmptyVarIndex) {
input_var2op.at(var_id).push_back(instr.Id());
}
}
}
}
for (auto& instr : vec_instruction_) {
auto* op_base = instr.OpBase();
if (!op_base->Info().infer_inplace_) {
continue;
}
auto in_to_outs = op_base->Info().infer_inplace_(
platform::is_gpu_place(instr.DeviceContext().GetPlace()));
auto& inputs = instr.Inputs();
auto& outputs = instr.Outputs();
for (auto& pair : in_to_outs) {
auto iter = inputs.find(pair.first);
if (iter != inputs.end() && !iter->second.empty()) {
auto in_var_desc = var_scope_.VarDesc(iter->second[0]);
if (in_var_desc && in_var_desc->Persistable()) {
continue;
}
if (var_scope_.GetVarSikpInplace(iter->second[0])) {
continue;
}
if (BuildInplaceCheckVarIsOnlyInput(input_var2op, iter->second[0])) {
auto iterout = outputs.find(pair.second);
if (iterout != outputs.end() && !iterout->second.empty()) {
const std::string& invar_name =
var_scope_.GetNameById(iter->second[0]);
const std::string& outvar_name =
var_scope_.GetNameById(iterout->second[0]);
auto invar = local_scope->FindVar(invar_name);
auto outvar = local_scope->FindVar(outvar_name);
if (invar && outvar && invar->IsType<phi::DenseTensor>() &&
outvar->IsType<phi::DenseTensor>() &&
skip_inplace_outvars.find(outvar_name) ==
skip_inplace_outvars.end()) {
instr.AddInplace(invar, outvar);
VLOG(3) << "inplace " << op_base->Type() << " " << invar_name
<< " -> " << outvar_name;
}
}
}
}
}
}
}
void NewIRInterpreter::PrepareForCUDAGraphCapture() {
if (!FLAGS_new_executor_use_cuda_graph) return;
#ifdef PADDLE_WITH_CUDA
PADDLE_ENFORCE_EQ(
platform::IsCUDAGraphCapturing(),
false,
platform::errors::PermissionDenied("CUDA Graph is not allowed to capture "
"before prepare."));
PADDLE_ENFORCE_EQ(platform::is_gpu_place(place_),
true,
platform::errors::InvalidArgument(
"CUDA Graph is only supported on NVIDIA GPU device."));
// If set true, will call `cudaStreamSynchronize(nccl_stream)`after allreduce.
// which may cause error in cuda graph. This behavior is consistent with PE.
PADDLE_ENFORCE_EQ(FLAGS_sync_nccl_allreduce,
false,
platform::errors::InvalidArgument(
"FLAGS_sync_nccl_allreduce must be False to support "
"CUDA Graph capturing."));
// All output vars of coalesce_tensor op should be persistable.
// If fused output var of coalesce_tensor is gc, it will cause accuracy
// problem. The specific reasons need to be analyzed.
// for (auto& op_desc : block_.AllOps()) {
// if (op_desc->Type() == kCoalesceTensor) {
// for (auto& out_var_name : op_desc->OutputArgumentNames()) {
// // The fused var needs to be set to persistable, not just added to
// // skip_gc_vars.
// // In the case where the feed fetch var is changed,
// StandaloneExecutor
// // will be newly constructed. If the fused var is not persistable,
// // these vars will be recreated and initialized, resulting in
// // precision problems.
// auto* out_var = op_desc->Block()->FindVarRecursive(out_var_name);
// if (out_var) {
// out_var->SetPersistable(true);
// VLOG(4) << "Mark Var(" << out_var_name << ") as Persistable.";
// }
// }
// }
// }
#else
PADDLE_THROW(platform::errors::Unimplemented(
"CUDA Graph is only supported on NVIDIA GPU device."));
#endif
}
void NewIRInterpreter::CheckCUDAGraphBeforeRun(
const std::vector<std::string>& feed_names) {
#ifdef PADDLE_WITH_CUDA
if (platform::IsCUDAGraphCapturing()) {
PADDLE_ENFORCE_EQ(
feed_names.empty(),
true,
platform::errors::InvalidArgument(
"Feeding data is not permitted when capturing CUDA Graph."));
PADDLE_ENFORCE_EQ(
FLAGS_new_executor_use_cuda_graph,
true,
platform::errors::InvalidArgument(
"You must turn on FLAGS_new_executor_use_cuda_graph to True "
"to enable CUDA Graph capturing."));
PADDLE_ENFORCE_EQ(
place_,
platform::CUDAGraphCapturingPlace(),
platform::errors::InvalidArgument("The place to capture CUDAGraph is "
"not the same as the place to run."));
}
#endif
}
void NewIRInterpreter::BuildOperatorDependences() {
// analysis the dependences between ops, add next_instr_list to each instr,
// and set the dependecy_count_
size_t instr_num = vec_instruction_.size();
dependecy_count_ = std::vector<size_t>(instr_num, 0);
auto downstream_map = dependency_builder_.Build(vec_instruction_);
for (size_t instr_id = 0; instr_id < instr_num; ++instr_id) {
Instruction& cur_instr = vec_instruction_[instr_id];
const std::set<size_t>& next_instr_ids = downstream_map[instr_id];
if (FLAGS_new_executor_serial_run) {
for (size_t next_instr_id : next_instr_ids) {
cur_instr.AddNextInstrInSameThread(next_instr_id);
}
} else {
if (cur_instr.KernelType() == OpFuncType::kGpuAsync) {
for (size_t next_instr_id : next_instr_ids) {
if (vec_instruction_[next_instr_id].KernelType() ==
OpFuncType::kGpuAsync) {
cur_instr.AddNextInstrInSameThread(next_instr_id);
} else {
cur_instr.AddNextInstrInDifferentThread(next_instr_id);
}
}
} else {
bool has_instr_in_same_thread = false;
for (size_t next_instr_id : next_instr_ids) {
if (!has_instr_in_same_thread &&
vec_instruction_[next_instr_id].KernelType() !=
OpFuncType::kGpuAsync) {
cur_instr.AddNextInstrInSameThread(next_instr_id);
has_instr_in_same_thread = true;
} else {
cur_instr.AddNextInstrInDifferentThread(next_instr_id);
}
}
}
}
for (size_t next_instr_id : next_instr_ids) {
++dependecy_count_[next_instr_id];
}
}
}
// At the end of each step, the holder of phi::DenseTensor in LoDTensorArray is
// null. Clear these Tensors and leave LoDTensorArray empty, otherwise an
// exception will occur in the next step
void NewIRInterpreter::ClearLoDTensorArrayInLocalScope() {
auto vars = local_scope_->LocalVars();
for (auto var : vars) {
if (var->IsType<LoDTensorArray>()) {
auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
lod_tensor_arr->clear();
}
}
}
void NewIRInterpreter::Convert(
std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
auto& vec_meta_info = var_scope_.MutableVecMetaInfo();
auto nodes = *op_func_nodes;
auto op_nums = nodes.size();
vec_instruction_.clear();
vec_instruction_.reserve(op_nums);
for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
auto& op_func_node = nodes[op_idx];
auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node);
vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_);
#ifdef PADDLE_WITH_CUDA
if (FLAGS_new_executor_use_cuda_graph) {
auto& op = op_func_node.operator_base_;
auto& op_type = op->Type();
if (op_type == interpreter::kMemcpyD2H ||
op_type == interpreter::kMemcpyH2D) {
PADDLE_THROW(paddle::platform::errors::Fatal(
"Cuda memory copy d2h/h2d is not allowed while using cuda graph."));
}
PADDLE_ENFORCE_EQ(typeid(*dev_ctx_) == typeid(phi::GPUContext),
true,
platform::errors::InvalidArgument(
"Device context of op %s must be [%s] while using "
"cuda graph, but got [%s].",
op_type,
typeid(phi::GPUContext).name(),
typeid(*dev_ctx_).name()));
// cuda graph needs to record all stream
phi::backends::gpu::CUDAGraphContextManager::Instance()
.RecordCapturingDeviceContext(dev_ctx_);
}
#endif
}
BuildOperatorDependences();
// NOTE(Ruibiao): For cross-step stream synchronization, an event may be
// recorded in the first step and waited in the second step. So, in the first
// step, the WaitEvent may be called without RecordEvent. Considering that
// before the first call to RecordEvent, an Event represents an empty set of
// work and WaitEvent always return succeed immediately, we omit the
// prelude-record for the first step here.
stream_analyzer_.ConstructEvents(&vec_instruction_);
// add event for the input var of jit program, since there are async copied
// from gpu_pinned place to gpu place on compute stream.
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
auto& inst = vec_instruction_[i];
if (inst.OpBaseValid() &&
inst.OpBase()->Type() == interpreter::kMemcpyD2H &&
platform::is_gpu_place(place_)) {
for (auto& item : inst.Inputs()) {
for (auto var_id : item.second) {
auto name = var_scope_.GetNameById(var_id);
if (JitInputVars().count(name)) {
auto device_event = std::make_shared<platform::DeviceEvent>(
place_, platform::GenerateDeviceEventFlag());
VLOG(4) << "Add input event for input: " << name << " of "
<< inst.OpBase()->Type();
inst.AddEventToWait(
i, device_event, stream_analyzer_.GetWaiterType(inst));
}
}
}
}
}
}
// clear the last_live_ops list for all vars in skip_gc_vars
for (const std::string& skip_gc_var : execution_config_.skip_gc_vars) {
int var_id = var_scope_.GetIdByName(skip_gc_var);
if (var_id != -1) {
last_live_ops_[var_id].clear();
VLOG(8) << "Skip gc for var: " << skip_gc_var;
}
}
// shrink, find the downstream op that has no other op in the
// downstream list happens before it
// For example,
// b = op1(a)
// c = op2(a, b)
// in this case, a is the input of op1 and op2, we only need to check
// a after op2, because op2 always uses a after op1.
for (size_t i = 0; i < last_live_ops_.size(); ++i) {
std::set<size_t> minumum_last_live_ops;
for (size_t item : last_live_ops_[i]) {
bool not_before_any = true;
// find the op that is not executed before any
for (size_t other_item : last_live_ops_[i]) {
if (dependency_builder_.OpHappensBefore(item, other_item)) {
VLOG(8) << "happens_before: " << item << "->" << other_item
<< ", so skip " << item;
not_before_any = false;
break;
}
}
if (not_before_any) {
VLOG(8) << "last live op of var " << i << " "
<< var_scope_.GetNameById(i) << " : " << item << " "
<< vec_instruction_[item].OpBase()->Type();
minumum_last_live_ops.insert(item);
vec_instruction_[item].AddGCCheckVar(i);
}
}
last_live_ops_[i] = minumum_last_live_ops;
vec_meta_info[i].var_ref_count_ = last_live_ops_[i].size();
}
for (auto& dep : dependecy_count_) {
deps_.emplace_back(std::make_shared<interpreter::OpDepInfo>(dep));
}
for (size_t i = 0; i < vec_meta_info.size(); ++i) {
refs_.emplace_back(std::make_shared<interpreter::VarRefInfo>(
vec_meta_info[i].var_ref_count_, var_scope_.VarRef(i)));
}
AnalyseExecuteOrderForTrace(dependency_builder_.OpDownstreamMap(),
instruction_scheduling_priority_less);
}
void NewIRInterpreter::BuildSkipShareLoDInfo() {
for (size_t i = 0; i < vec_instruction_.size(); ++i) {
bool can_skip_lod = true;
for (auto& input : vec_instruction_[i].InnerRuntimeContext()->inputs) {
for (auto& var : input.second) {
if (var->IsType<phi::DenseTensor>()) {
if (!var->Get<phi::DenseTensor>().lod().empty()) {
can_skip_lod = false;
break;
}
} else {
can_skip_lod = false;
break;
}
}
}
if (can_skip_lod) {
VLOG(8) << "skip share lod for: " << vec_instruction_[i].OpBase()->Type()
<< " (" << i << ")";
}
vec_instruction_[i].InnerInferShapeContext()->SetSkipLoD(can_skip_lod);
}
}
void NewIRInterpreter::RunOperator(const Instruction& instr_node) {
auto* op = instr_node.OpBase();
auto place = instr_node.DeviceContext().GetPlace();
Scope* local_scope = InnerScope();
VLOG(4) << "Start run " << place << " " << op->DebugStringEx(local_scope);
auto op_with_kernel = dynamic_cast<const framework::OperatorWithKernel*>(op);
{
// If it is OperatorBase, InferShape do nothing.
if (op_with_kernel != nullptr) {
platform::RecordEvent infershape_event(
"infer_shape",
platform::TracerEventType::OperatorInner,
1,
platform::EventRole::kInnerOp);
// see OperatorWithKernel::RunImpl in operator.cc for why
if (!(op_with_kernel->HasAttr(kAllKernelsMustComputeRuntimeShape) &&
op_with_kernel->Attr<bool>(kAllKernelsMustComputeRuntimeShape))) {
op_with_kernel->Info().infer_shape_(
instr_node.InnerInferShapeContext().get());
}
infershape_event.End();
platform::RecordOpInfoSupplement(op->Type(),
op->Attrs(),
*(instr_node.InnerInferShapeContext()),
*(instr_node.InnerRuntimeContext()),
op->Id());
}
}
if (op_with_kernel != nullptr && FLAGS_new_executor_use_inplace) {
// TODO(xiongkun03) Does operator base support inplace ?
for (auto& pair : instr_node.InplaceInfo()) {
const auto& in = paddle::framework::details::GetTensorFromVar(pair.first);
auto* out =
paddle::framework::details::GetMutableTensorFromVar(pair.second);
if (in.dims() == out->dims()) {
out->ShareBufferWith(in);
}
}
}
{
platform::RecordEvent compute_event(
"compute",
platform::TracerEventType::OperatorInner,
1,
platform::EventRole::kInnerOp);
if (op_with_kernel == nullptr) { // operator base
instr_node.OpBase()->Run(*local_scope, place_);
} else {
phi::Kernel* kernel = instr_node.PhiKernel();
if (kernel && kernel->IsValid()) { // phi kernel
if (kernel->GetKernelRegisteredType() ==
phi::KernelRegisteredType::FUNCTION) {
VLOG(4) << "Run function kernel: " << op->Type();
VLOG(4) << instr_node.InnerRuntimeContext().get() << " "
<< &instr_node.DeviceContext();
phi::KernelContext phi_kernel_context;
op_with_kernel->BuildPhiKernelContext(
*instr_node.InnerRuntimeContext().get(),
const_cast<platform::DeviceContext*>(&instr_node.DeviceContext()),
&phi_kernel_context);
(*kernel)(&phi_kernel_context);
} else {
VLOG(4) << "Run structure kernel: " << op->Type();
(*kernel)(instr_node.InnerExecutionContext().get());
}
} else { // fluid kernel
instr_node.KernelFunc()(*instr_node.InnerExecutionContext().get());
}
}
}
VLOG(4) << "End run " << place << " " << op->DebugStringEx(local_scope);
if (!instr_node.InplaceBackMap().empty()) {
platform::RecordEvent inplaceback_event(
"InplaceVarsBack", platform::TracerEventType::UserDefined, 10);
auto& m = instr_node.InplaceBackMap();
// NOTE(zhiqiu): same logic as TransferInplaceVarsBack() in operator.cc
for (auto& p : m) {
auto* transformed_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
var_scope_.VarRef(p.first));
auto* original_tensor = GetMutableLoDTensorOrSelectedRowsValueFromVar(
var_scope_.VarRef(p.second));
original_tensor->ShareDataWith(*transformed_tensor);
VLOG(4) << "Transfer inplace variable back form "
<< var_scope_.GetNameById(p.first) << " to "
<< var_scope_.GetNameById(p.second);
}
}
/*For profiling/benchmark only*/
if (FLAGS_benchmark) {
instr_node.DeviceContext().Wait();
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
PADDLE_ENFORCE_GPU_SUCCESS(platform::GpuGetLastError());
VLOG(4) << "Operator(" << op->Type()
<< "): context wait and get last error";
#endif
}
for (auto& hook : hookfuncs_) {
hook(op, local_scope);
}
// for debug nan/inf
if (op_with_kernel != nullptr && FLAGS_check_nan_inf) {
VLOG(4) << "Check nan/inf";
try {
framework::details::CheckOpHasNanOrInf(
*op,
*local_scope,
place); // TODO(xiongkun03) change it to inner scope.
} catch (...) {
const std::vector<std::string>* callstack = nullptr;
auto attrs = op->Attrs();
auto iter =
attrs.find(OpProtoAndCheckerMaker::OpCreationCallstackAttrName());
if (iter != attrs.end()) {
callstack = &PADDLE_GET_CONST(std::vector<std::string>, iter->second);
if (callstack->empty()) callstack = nullptr;
}
std::ostringstream sout;
if (callstack) {
if (FLAGS_call_stack_level > 1) {
sout << "\n\n Compile Traceback (most recent call last):";
} else {
sout << "In user code:\n";
}
for (auto& line : *callstack) {
sout << "\n " << line;
}
}
std::cout << sout.str() << std::endl;
std::rethrow_exception(std::current_exception());
}
}
}
void NewIRInterpreter::RunInstruction(const Instruction& instr_node) {
OperatorBase* op = nullptr;
if (instr_node.OpBaseValid()) {
op = instr_node.OpBase();
platform::RecordEvent instruction_event(
op->Type(), platform::TracerEventType::Operator, 1);
}
SetDeviceId(instr_node.DeviceContext().GetPlace());
try {
instr_node.WaitEvent(place_);
if (instr_node.PreDefineContext()) {
VLOG(5) << "run new ir selected kernel";
auto op_func_node = const_cast<OpFuncNode*>((instr_node.OpFunc()));
VLOG(5) << "begin to run op " << op_func_node->phi_op_name_;
if (op_func_node->infer_meta_interface_) {
op_func_node->infer_meta_interface_->infer_meta_(
&(op_func_node->infer_meta_context_));
}
VLOG(5) << "after run infer meta";
if (op_func_node->fluid_op) {
// run fluid op
ExecutionContext exe_ctx(*(op_func_node->operator_base_.get()),
*scope_,
*(op_func_node->dev_ctx_),
*(op_func_node->runtime_ctx_.get()));
(*(op_func_node->phi_kernel_))(&exe_ctx);
} else {
(*(op_func_node->phi_kernel_))(&(op_func_node->kernel_context_));
}
VLOG(5) << "after run kernel";
} else if (!instr_node.IsArtificial()) {
RunOperator(instr_node);
CheckGC(instr_node);
interpreter::LogDeviceMemoryStats(place_);
}
instr_node.RecordEvent(place_);
} catch (platform::EnforceNotMet& ex) {
LOG(WARNING) << instr_node.OpFunc()->phi_op_name_
<< " raises an EnforceNotMet exception "
<< platform::demangle(typeid(ex).name()) << ", " << ex.what();
exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
} catch (platform::EOFException&) {
exception_holder_.Catch(std::current_exception());
} catch (std::exception& ex) {
LOG(WARNING) << instr_node.OpFunc()->phi_op_name_ << " raises an exception "
<< platform::demangle(typeid(ex).name()) << ", " << ex.what();
exception_holder_.Catch(std::current_exception());
} catch (...) {
LOG(WARNING) << instr_node.OpFunc()->phi_op_name_
<< " raises an unknown exception";
exception_holder_.Catch(std::current_exception());
}
}
std::string NewIRInterpreter::GetDepsString() const {
std::stringstream ss;
auto downstream_map = dependency_builder_.OpDownstreamMap();
ss << "Note: when static_dep is 1, it is ok that the dynamic_dep will not "
"be decreased to 0."
<< std::endl;
ss << "unfinished_op_number_:" << unfinished_op_number_ << std::endl;
for (size_t i = 0; i < deps_.size(); ++i) {
ss << "op:" << i << ", type: " << vec_instruction_[i].OpBase()->Type()
<< ", static_dep:" << deps_[i]->StaticDep()
<< ", dynamic_dep:" << deps_[i]->DynamicDep() << ", downstream op: ";
for (auto id : downstream_map[i]) {
ss << id << ", ";
}
ss << std::endl;
}
return ss.str();
}
void NewIRInterpreter::ExecuteInstructionList(
const std::vector<Instruction>& vec_instr) {
unfinished_op_number_ = vec_instr.size();
if (unfinished_op_number_ == 0) {
VLOG(4) << "No op to run, return";
return;
}
exception_holder_.Clear();
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
// NOTE(zhiqiu): hot fix for jit input var
RecordMemcpyD2H(vec_instr.at(i));
if (FLAGS_new_executor_serial_run) {
RunInstructionAsync(i);
} else {
async_work_queue_->AddTask(vec_instr.at(i).KernelType(),
[this, i] { RunInstructionAsync(i); });
}
}
}
// For debug hang in main_thread_blocker_.WaitEvent(),
// launch async task to log deps every
// FLAGS_executor_log_deps_every_microseconds, then cancel the std::async when
// main_thread_blocker_.WaitEvent() executed. Why not use std::async instead
// of workqueue? To make sure that the logging thread itself will not affect
// the workqueue
// used in interpretercore.
std::future<int> logged_times;
std::atomic_bool cancel_log = ATOMIC_VAR_INIT(false);
if (FLAGS_executor_log_deps_every_microseconds) {
logged_times = std::async(
std::launch::async,
[this](const std::atomic_bool& cancel) {
int times = 0;
while (!cancel) {
std::this_thread::sleep_for(std::chrono::microseconds(
FLAGS_executor_log_deps_every_microseconds));
// check again, since cancel may be changed during sleep
if (cancel) {
break;
}
VLOG(6) << "deps:\n" << GetDepsString();
times++;
}
return times;
},
std::ref(cancel_log));
}
auto event_name = main_thread_blocker_.WaitEvent();
VLOG(1) << "main_thread_blocker_(" << &main_thread_blocker_
<< ") got event_name: " << event_name;
cancel_log = true;
if (logged_times.valid()) {
VLOG(1) << "Logged deps for " << logged_times.get() << " times";
}
if (UNLIKELY(exception_holder_.IsCaught())) {
VLOG(1) << "Exception caught " << exception_holder_.Type();
// Graceful exit when the executor encountered a fatal error.
// EOF is not a fatal error.
if (exception_holder_.Type() != "EOF") {
async_work_queue_->Cancel();
async_work_queue_.reset();
}
VLOG(4) << "Cancel ok";
PADDLE_ENFORCE_EQ(
main_thread_blocker_.Clear(),
0,
platform::errors::PreconditionNotMet(
"main_thread_blocker_.Clear() return -1, clear failed"));
VLOG(4) << "clear ok";
exception_holder_.ReThrow();
}
}
void NewIRInterpreter::RunNextInstructions(const Instruction& instr,
SchedulingQueue* reserved_next_ops) {
platform::RecordEvent record(
"RunNextInstructions", platform::TracerEventType::UserDefined, 10);
auto IsReady = [this](size_t next_id) {
VLOG(4) << "op_id: " << next_id
<< ", remain deps: " << deps_[next_id]->DynamicDep();
return deps_[next_id]->CheckAndDecrease();
};
for (size_t next_instr_id : instr.NextInstrsInDifferenceThread()) {
if (IsReady(next_instr_id)) {
async_work_queue_->AddTask(
vec_instruction_[next_instr_id].KernelType(),
[this, next_instr_id]() { RunInstructionAsync(next_instr_id); });
}
}
for (size_t next_instr_id : instr.NextInstrsInSameThread()) {
if (IsReady(next_instr_id)) {
reserved_next_ops->push(next_instr_id);
}
}
}
void NewIRInterpreter::RunInstructionAsync(size_t instr_id) {
// NOTE(Ruibiao): Due to the uncertain order in multi-threading asynchronous
// scheduling, the priority order involved cross-thread scheduling is not
// guaranteed. Only Ops scheduled by the same AddTask call have the guarantee
// of priority order.
SchedulingQueue ready_ops(instruction_scheduling_priority_less);
ready_ops.push(instr_id);
while (!ready_ops.empty()) {
instr_id = ready_ops.top();
ready_ops.pop();
auto& instr_node = vec_instruction_.at(instr_id);
RunInstruction(instr_node);
if (UNLIKELY(exception_holder_.IsCaught())) {
VLOG(4) << "Exception caught";
if (exception_notifier_ != nullptr) {
exception_notifier_->NotifyEvent();
}
return;
}
VLOG(4) << "unfinished_op_number_: " << unfinished_op_number_;
if (UNLIKELY(unfinished_op_number_.fetch_sub(
1, std::memory_order_relaxed) == 1)) {
if (completion_notifier_ != nullptr) {
completion_notifier_->NotifyEvent();
}
}
RunNextInstructions(instr_node, &ready_ops);
}
}
void NewIRInterpreter::RecordStreamForGC(const Instruction& instr) {
#if !defined(PADDLE_WITH_CUDA) && !defined(PADDLE_WITH_HIP)
PADDLE_THROW(platform::errors::Unimplemented(
"RecordStreamForGC is only implemented when compiled with GPU."));
#else
if (!IsInterpretercoreFastGCEnabled() ||
instr.KernelType() != OpFuncType::kGpuAsync) {
return;
}
if (instr.DeviceContext().GetPlace().GetType() ==
phi::AllocationType::CUSTOM) {
return;
}
platform::RecordEvent record(
"RecordStreamForGC", platform::TracerEventType::UserDefined, 10);
gpuStream_t stream =
reinterpret_cast<const phi::GPUContext&>(instr.DeviceContext()).stream();
auto TensorRecordStream = [&stream](phi::DenseTensor& tensor) {
auto allocation = tensor.Holder();
if (allocation == nullptr) {
return;
}
const platform::Place& place = allocation->place();
if (platform::is_gpu_place(place)) {
memory::RecordStream(allocation, stream);
} else if (platform::is_cuda_pinned_place(place)) {
// TODO(Ruibiao): Here should do something to make sure that the tensor
// is not freed until the H2D copies done. However, simplely launch a
// CUDA runtime callback to the H2D stream may lead a high performance
// overhead. As all the cases we meet in H2D are copies from CPUPlace at
// present, we just log a WARNING here. A better design is required.
LOG(WARNING) << "Copy data from a CUDAPinned tensor in an asynchronous "
"manner may lead a data inconsistent";
} else {
// memory copies involve CPUPlace are always synchronous, so just do
// nothing here
}
};
/* NOTE(Ruibiao):Cross-stream tensor synchronization is required only when
* all the following conditions are satisfied:
* 1. The tensor will be GC after running the instruction, i.e., in
* instr.GCCheckVars.
* 2. The stream which initializes this tensor is different from the stream
* which the instruction run in.
* 3. The tensor is the instruction's input, cause we assume that
* instruction will initialize all output tensors with its running stream.
* 4. In the OP function of this instruction, the tensor is an input of a
* async CUDA kernel.
*
* Here we only process the first condition, because:
* 1. Since the RecordStream function will directly return when the recored
* stream is equal to the owning stream, recording a stream same as which
* initialized this tensor has less time overhead. Conversely, it may take
* more time if we try to extract those cross-stream input vars from
* instr.GCCheckVars.
* 2. Now the instruction has no idea of which vars involving async running
* in OP function, and thus we can not recognize condition 4. It should be
* supported later.
*/
for (int var_id : instr.GCCheckVars()) {
VLOG(4) << "GC sync " << var_scope_.GetNameById(var_id) << " "
<< var_scope_.VarDesc(var_id);
// persistable var will be ignore while GC
if (var_scope_.VarDesc(var_id) &&
var_scope_.VarDesc(var_id)->Persistable()) {
continue;
}
paddle::framework::Variable* var = var_scope_.VarRef(var_id);
if (var == nullptr) {
continue;
}
if (var->IsType<phi::DenseTensor>()) {
TensorRecordStream(*(var->GetMutable<phi::DenseTensor>()));
} else if (var->IsType<
operators::reader::
OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
// do nothing
} else if (var->IsType<phi::SelectedRows>()) {
TensorRecordStream(
*(var->GetMutable<phi::SelectedRows>()->mutable_value()));
} else if (var->IsType<LoDTensorArray>()) {
auto* tensor_arr = var->GetMutable<LoDTensorArray>();
for (auto& tensor : *tensor_arr) {
TensorRecordStream(tensor);
}
} else if (var->IsType<std::vector<Scope*>>()) {
// do nothing
} else {
PADDLE_THROW(platform::errors::Unimplemented(
"The variable(%s) is not supported in eager deletion.",
framework::ToTypeName(var->Type())));
}
}
#endif
}
void NewIRInterpreter::CheckGC(const Instruction& instr) {
platform::RecordEvent record(
"CheckGC", platform::TracerEventType::UserDefined, 10);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
RecordStreamForGC(instr);
#endif
auto& var_scope = var_scope_;
for (auto var_id : instr.GCCheckVars()) {
VLOG(4) << "GC:" << var_scope_.GetNameById(var_id) << ", id:" << var_id
<< ", ref:" << refs_[var_id]->DynamicRef();
bool is_ready = refs_[var_id]->CheckAndDecrease();
// ignore all persistable var while GC
if (var_scope.VarDesc(var_id) && var_scope.VarDesc(var_id)->Persistable()) {
continue;
}
if (is_ready) {
VLOG(6) << "Async delete variable with name : "
<< var_scope.GetNameById(var_id);
gc_->Add(refs_[var_id]->Var(), instr);
}
}
}
void NewIRInterpreter::Prepare(
const std::vector<std::string>& feed_names,
const std::vector<phi::DenseTensor>& feed_tensors,
bool prepare_feed) {
PADDLE_ENFORCE_EQ(feed_names.size(),
feed_tensors.size(),
platform::errors::PreconditionNotMet(
"Required feed_names.size() == feed_tensors.size(), "
"but received %d != %d",
feed_names.size(),
feed_tensors.size()));
auto FeedInput = [&] {
VLOG(4) << "Feed inputs";
for (size_t i = 0; i < feed_names.size(); ++i) {
auto* feed_var = local_scope_->FindVar(feed_names[i]);
PADDLE_ENFORCE_NOT_NULL(
feed_var,
platform::errors::NotFound("Variable %s should not be nullptr.",
feed_names[i]));
auto feed_tensor = feed_var->GetMutable<phi::DenseTensor>();
feed_tensor->ShareDataWith(feed_tensors[i]);
feed_tensor->set_lod(feed_tensors[i].lod());
}
};
// TODO(dev): Support this
// if (!is_build_) {
// paddle::framework::interpreter::BuildVariableScope(
// block_, execution_config_, &var_scope_);
// FeedInput();
// std::vector<paddle::framework::OpFuncNode> op_func_nodes;
// paddle::framework::interpreter::BuildOpFuncList(
// place_,
// block_,
// execution_config_.skip_gc_vars,
// &op_func_nodes,
// &var_scope_,
// execution_config_,
// HasLocalScope(),
// static_build_);
// SetFeedVarsInplaceSkip(feed_names);
// // convert vec func_list to graph
// Convert(&op_func_nodes);
// UpdateSyncOpNum();
// if (static_build_) {
// VLOG(4) << "RUN impl";
// RunImpl();
// }
// BuildSkipShareLoDInfo();
// is_build_ = true;
// }
// NOTE: Because feed_tensor will be GC after
// paddle::framework::BuildOpFuncList, so we should
// call FeedInput again.
if (prepare_feed) {
FeedInput();
}
}
void NewIRInterpreter::SetFeedVarsInplaceSkip(
const std::vector<std::string>& feed_names) {
for (auto& feed_name : feed_names) {
var_scope_.SetVarSikpInplace(feed_name, true);
}
const interpreter::NewIrStreamAnalyzer&
NewIRInterpreter::GetNewIrStreamAnalyzer() const {
return ir_stream_analyzer_;
}
bool NewIRInterpreter::HasLocalScope() const { return local_scope_ != nullptr; }
Scope* NewIRInterpreter::InnerScope() {
return local_scope_ != nullptr ? local_scope_ : scope_;
bool NewIRInterpreter::IsSharedResultsBuild() const {
return is_shared_results_build_;
}
// Note(zhangbo):
// (1) What is "Trace"?
// The OP execute scheduling rule adopted by Interpretercore by default is a
// multi-threaded scheduling mode(see ExecuteInstructionList). By maintaining a
// high-performance thread pool, the OP's execute scheduling is distributed to
// the sub threads maintained by the thread pool, but the main thread does not
// have any tasks. In Trace mode, the executor will execute directly in the main
// thread according to the pre provided OP sequence(trace_execute_order_),
// instead of being distributed to the thread pool.
// (2) When we use "Trace"?
// In dygraph to static, This scheduling causes that the execution of the
// forward and backward OPs and the execution of the dygraph optimizer cannot be
// executed in the same thread. Executing thread switch may cause cpu cache
// miss. When a model is all KQueueAsync type OPs, all OPs will be distributed
// to the DeviceThread for execution, and the multithreading scheduling will not
// have any benefits. Therefore, in the dynamic to static, when the number of
// KQueueAsync Ops is 0, we choose Trace mode.
void NewIRInterpreter::TraceInstructionList(
const std::vector<Instruction>& vec_instr) {
unfinished_op_number_ = vec_instr.size();
if (unfinished_op_number_ == 0) {
VLOG(4) << "No op to run, return";
return;
std::shared_ptr<interpreter::AsyncWorkQueue> NewIRInterpreter::GetWorkQueue() {
if (async_work_queue_ == nullptr) {
async_work_queue_ = std::make_shared<interpreter::AsyncWorkQueue>(
execution_config_.host_num_threads,
execution_config_.device_num_threads,
nullptr);
}
return async_work_queue_;
}
exception_holder_.Clear();
void NewIRInterpreter::PrepareForCUDAGraphCapture() {
if (!FLAGS_new_executor_use_cuda_graph) return;
#ifdef PADDLE_WITH_CUDA
PADDLE_ENFORCE_EQ(
platform::IsCUDAGraphCapturing(),
false,
platform::errors::PermissionDenied("CUDA Graph is not allowed to capture "
"before prepare."));
PADDLE_ENFORCE_EQ(platform::is_gpu_place(place_),
true,
platform::errors::InvalidArgument(
"CUDA Graph is only supported on NVIDIA GPU device."));
// If set true, will call `cudaStreamSynchronize(nccl_stream)`after allreduce.
// which may cause error in cuda graph. This behavior is consistent with PE.
PADDLE_ENFORCE_EQ(FLAGS_sync_nccl_allreduce,
false,
platform::errors::InvalidArgument(
"FLAGS_sync_nccl_allreduce must be False to support "
"CUDA Graph capturing."));
#else
PADDLE_THROW(platform::errors::Unimplemented(
"CUDA Graph is only supported on NVIDIA GPU device."));
#endif
}
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
// NOTE(zhiqiu): hot fix for jit input var
RecordMemcpyD2H(vec_instr.at(i));
}
void NewIRInterpreter::CheckCUDAGraphBeforeRun(
const std::vector<std::string>& feed_names) {
#ifdef PADDLE_WITH_CUDA
if (platform::IsCUDAGraphCapturing()) {
PADDLE_ENFORCE_EQ(
feed_names.empty(),
true,
platform::errors::InvalidArgument(
"Feeding data is not permitted when capturing CUDA Graph."));
PADDLE_ENFORCE_EQ(
FLAGS_new_executor_use_cuda_graph,
true,
platform::errors::InvalidArgument(
"You must turn on FLAGS_new_executor_use_cuda_graph to True "
"to enable CUDA Graph capturing."));
PADDLE_ENFORCE_EQ(
place_,
platform::CUDAGraphCapturingPlace(),
platform::errors::InvalidArgument("The place to capture CUDAGraph is "
"not the same as the place to run."));
}
#endif
}
// TODO(phlrain) use orignal order for now, use better dependecy
for (auto& instr_node : vec_instruction_) {
/// auto instr_id = trace_execute_order_[idx];
RunInstruction(instr_node);
if (UNLIKELY(exception_holder_.IsCaught())) {
VLOG(4) << "Exception caught";
break;
void NewIRInterpreter::ClearLoDTensorArrayInLocalScope() {
auto vars = local_scope_->LocalVars();
for (auto var : vars) {
if (var->IsType<LoDTensorArray>()) {
auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
lod_tensor_arr->clear();
}
}
if (UNLIKELY(exception_holder_.IsCaught())) {
VLOG(1) << "Exception caught " << exception_holder_.Type();
PADDLE_ENFORCE_EQ(
main_thread_blocker_.Clear(),
0,
platform::errors::PreconditionNotMet(
"main_thread_blocker_.Clear() return -1, clear failed"));
VLOG(4) << "clear ok";
exception_holder_.ReThrow();
}
}
void NewIRInterpreter::RecordMemcpyD2H(const Instruction& instr_node) {
// NOTE(zhiqiu): hot fix for jit input var
if (instr_node.OpBaseValid() &&
instr_node.OpBase()->Type() == interpreter::kMemcpyD2H) {
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto* default_dev_ctx = pool.Get(place_);
for (auto& event : instr_node.EventsToWait()) {
platform::RecordEvent record(
"RecordStreamEvent", platform::TracerEventType::UserDefined, 10);
VLOG(3) << "Record event on default stream in jit_input_var at op: "
<< instr_node.OpBase()->Type();
event.event_->Record(default_dev_ctx);
std::string NewIRInterpreter::GetDepsString() const {
std::stringstream ss;
auto downstream_map = ir_dependency_builder_.OpDownstreamMap();
ss << "Note: when static_dep is 1, it is ok that the dynamic_dep will not "
"be decreased to 0."
<< std::endl;
ss << "unfinished_op_number_:" << unfinished_op_number_ << std::endl;
for (size_t i = 0; i < deps_.size(); ++i) {
ss << "op:" << i << ", type: " << vec_instruction_base_[i]->Name()
<< ", static_dep:" << deps_[i]->StaticDep()
<< ", dynamic_dep:" << deps_[i]->DynamicDep() << ", downstream op: ";
for (auto id : downstream_map[i]) {
ss << id << ", ";
}
ss << std::endl;
}
return ss.str();
}
bool NewIRInterpreter::HasLocalScope() const { return local_scope_ != nullptr; }
Scope* NewIRInterpreter::InnerScope() {
return local_scope_ != nullptr ? local_scope_ : scope_;
}
void NewIRInterpreter::UpdateSyncOpNum() {
int64_t sync_op_num = 0;
for (auto& ins : vec_instruction_) {
if (ins.KernelType() == OpFuncType::kCpuSync ||
ins.KernelType() == OpFuncType::kGpuSync) {
for (auto& ins : vec_instruction_base_) {
if (ins->KernelType() == OpFuncType::kCpuSync ||
ins->KernelType() == OpFuncType::kGpuSync) {
sync_op_num = sync_op_num + 1;
}
}
......@@ -1486,8 +372,8 @@ void NewIRInterpreter::AnalyseExecuteOrderForTrace(
std::vector<size_t> trace_order;
SchedulingQueue ready_ops(compare);
for (size_t instr_id = 0; instr_id < dependecy_count_.size(); ++instr_id) {
if (dependecy_count_[instr_id] == 0) {
for (size_t instr_id = 0; instr_id < dependecy_count_->size(); ++instr_id) {
if ((*dependecy_count_)[instr_id] == 0) {
ready_ops.push(instr_id);
}
}
......@@ -1508,7 +394,7 @@ void NewIRInterpreter::AnalyseExecuteOrderForTrace(
PADDLE_ENFORCE_EQ(
trace_order.size(),
dependecy_count_.size(),
dependecy_count_->size(),
platform::errors::PreconditionNotMet(
"trace_order size should be equal to dependecy_count_."));
......@@ -1608,7 +494,10 @@ void NewIRInterpreter::BuildInstructionDependences() {
// analysis the dependences between instructions, add next_instr_list to each
// instr, and set the dependecy_count_
size_t instr_num = vec_instruction_base_.size();
dependecy_count_ = std::vector<size_t>(instr_num, 0);
dependecy_count_ = GetDependencyCount();
if (!is_shared_results_build_) {
dependecy_count_->assign(instr_num, 0);
}
std::vector<paddle::framework::InstructionBase*> instructions_ptr;
for (auto& instr : vec_instruction_base_) {
......@@ -1649,8 +538,10 @@ void NewIRInterpreter::BuildInstructionDependences() {
}
}
for (size_t next_instr_id : next_instr_ids) {
++dependecy_count_[next_instr_id];
if (!is_shared_results_build_) {
for (size_t next_instr_id : next_instr_ids) {
++(*dependecy_count_)[next_instr_id];
}
}
}
}
......@@ -1886,7 +777,7 @@ void NewIRInterpreter::CalculateLastLiveOps() {
var_ref_count_[i] = last_live_ops_[i].size();
}
for (auto& dep : dependecy_count_) {
for (auto& dep : *dependecy_count_) {
deps_.emplace_back(std::make_shared<interpreter::OpDepInfo>(dep));
}
for (size_t i = 0; i < variable_list_.size(); ++i) {
......@@ -1896,8 +787,8 @@ void NewIRInterpreter::CalculateLastLiveOps() {
}
void NewIRInterpreter::ConstructEventForJitInput() {
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
for (size_t i = 0; i < dependecy_count_->size(); ++i) {
if ((*dependecy_count_)[i] == 0) {
InstructionBase* inst = vec_instruction_base_[i].get();
if (inst->Name() == "pd.memcpy_d2h" && platform::is_gpu_place(place_)) {
for (auto& item : inst->Inputs()) {
......@@ -1918,8 +809,15 @@ void NewIRInterpreter::ConstructEventForJitInput() {
}
}
FetchList NewIRInterpreter::BetaRun(const std::vector<std::string>& feed_names,
bool need_fetch) {
paddle::framework::FetchList NewIRInterpreter::Run(
const std::vector<std::string>& feed_names,
const std::vector<phi::DenseTensor>& feed_tensors) {
PADDLE_THROW(platform::errors::Unimplemented(
"Run with feed_tensors is not implemented in NewIRInterpreter."));
}
FetchList NewIRInterpreter::Run(const std::vector<std::string>& feed_names,
bool need_fetch) {
SetDeviceId(place_);
CheckCUDAGraphBeforeRun(feed_names);
......@@ -1956,22 +854,27 @@ FetchList NewIRInterpreter::BetaRun(const std::vector<std::string>& feed_names,
VLOG(4) << "Done PreAnalysis";
// Run
if (FLAGS_enable_new_ir_in_executor_loop_run) {
LOG_FIRST_N(INFO, 1) << "New ir interpreter is running in BetaRun mode "
"with for_loop version.";
LoopRunImpl();
} else {
if (FLAGS_enable_new_ir_in_executor_trace_run ||
((execution_config_.used_for_jit || execution_config_.used_for_cinn) &&
(sync_op_num_ == 0))) {
LOG_FIRST_N(INFO, 1) << "New ir interpreter is running in BetaRun mode "
"with trace version.";
TraceRunImpl();
} else {
LOG_FIRST_N(INFO, 1) << "New ir interpreter is running in BetaRun mode "
"with multi thread version.";
MultiThreadRunImpl();
}
is_build_ = true;
is_shared_results_build_ = true;
} else {
if (FLAGS_enable_new_ir_in_executor_loop_run) {
LoopRunImpl();
} else {
if (FLAGS_enable_new_ir_in_executor_trace_run ||
((execution_config_.used_for_jit || execution_config_.used_for_cinn) &&
(sync_op_num_ == 0))) {
TraceRunImpl();
} else {
MultiThreadRunImpl();
}
}
......@@ -2013,33 +916,34 @@ FetchList NewIRInterpreter::BetaRun(const std::vector<std::string>& feed_names,
}
}
void NewIRInterpreter::LoopRunImpl() {
void NewIRInterpreter::TraceRunImpl() {
// lazy initialization of gc, do not create gc is the program only run once
if (!gc_) {
gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_base_);
}
interpreter::ResetAtomicGuard guard(&deps_, &refs_);
VLOG(4) << "Loop Instruction List";
VLOG(4) << "Tracing Instruction List";
LoopRunInstructionList(vec_instruction_base_);
VLOG(4) << "Done LoopRunImpl";
TraceRunInstructionList(vec_instruction_base_);
VLOG(4) << "Done TraceRunInstructionList";
}
void NewIRInterpreter::TraceRunImpl() {
void NewIRInterpreter::MultiThreadRunImpl() {
// lazy initialization of gc, do not create gc is the program only run once
if (!gc_) {
gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_base_);
}
interpreter::ResetAtomicGuard guard(&deps_, &refs_);
VLOG(4) << "Tracing Instruction List";
VLOG(4) << "Multi Thread Run Instruction List";
TraceRunInstructionList(vec_instruction_base_);
VLOG(4) << "Done TraceRunImpl";
async_work_queue_ = GetWorkQueue();
MultiThreadRunInstructionList(vec_instruction_base_);
VLOG(4) << "Done MultiThreadRunInstructionList";
}
void NewIRInterpreter::LoopRunInstructionList(
void NewIRInterpreter::TraceRunInstructionList(
const std::vector<std::unique_ptr<InstructionBase>>& vec_instr) {
unfinished_op_number_ = vec_instr.size();
if (unfinished_op_number_ == 0) {
......@@ -2049,17 +953,19 @@ void NewIRInterpreter::LoopRunInstructionList(
exception_holder_.Clear();
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
for (size_t i = 0; i < dependecy_count_->size(); ++i) {
if ((*dependecy_count_)[i] == 0) {
// NOTE(zhiqiu): hot fix for jit input var
RecordMemcpyD2H(vec_instr.at(i).get());
}
}
for (size_t idx = 0; idx < vec_instr.size(); idx++) {
InstructionBase* instr_node = vec_instr[idx].get();
for (size_t idx = 0; idx < trace_execute_order_.size(); idx++) {
auto instr_id = trace_execute_order_[idx];
InstructionBase* instr_node = vec_instruction_base_.at(instr_id).get();
VLOG(6) << "Run InstructionBase " << idx;
VLOG(6) << "Run InstructionBase " << instr_node->Name() << "[" << instr_id
<< "]";
RunInstructionBase(instr_node);
if (UNLIKELY(exception_holder_.IsCaught())) {
......@@ -2078,10 +984,10 @@ void NewIRInterpreter::LoopRunInstructionList(
VLOG(4) << "clear ok";
exception_holder_.ReThrow();
}
VLOG(4) << "Done LoopRunInstructionList";
VLOG(4) << "Done TraceRunInstructionList";
}
void NewIRInterpreter::TraceRunInstructionList(
void NewIRInterpreter::MultiThreadRunInstructionList(
const std::vector<std::unique_ptr<InstructionBase>>& vec_instr) {
unfinished_op_number_ = vec_instr.size();
if (unfinished_op_number_ == 0) {
......@@ -2091,29 +997,67 @@ void NewIRInterpreter::TraceRunInstructionList(
exception_holder_.Clear();
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
for (size_t i = 0; i < dependecy_count_->size(); ++i) {
if ((*dependecy_count_)[i] == 0) {
// NOTE(zhiqiu): hot fix for jit input var
RecordMemcpyD2H(vec_instr.at(i).get());
if (FLAGS_new_executor_serial_run) {
RunInstructionBaseAsync(i);
} else {
async_work_queue_->AddTask(vec_instr.at(i)->KernelType(),
[this, i] { RunInstructionBaseAsync(i); });
}
}
}
for (size_t idx = 0; idx < trace_execute_order_.size(); idx++) {
auto instr_id = trace_execute_order_[idx];
InstructionBase* instr_node = vec_instruction_base_.at(instr_id).get();
// For debug hang in main_thread_blocker_.WaitEvent(),
// launch async task to log deps every
// FLAGS_executor_log_deps_every_microseconds, then cancel the std::async when
// main_thread_blocker_.WaitEvent() executed. Why not use std::async instead
// of workqueue? To make sure that the logging thread itself will not affect
// the workqueue
// used in interpretercore.
VLOG(6) << "Run InstructionBase " << instr_node->Name() << "[" << instr_id
<< "]";
RunInstructionBase(instr_node);
std::future<int> logged_times;
std::atomic_bool cancel_log = ATOMIC_VAR_INIT(false);
if (FLAGS_executor_log_deps_every_microseconds) {
logged_times = std::async(
std::launch::async,
[this](const std::atomic_bool& cancel) {
int times = 0;
while (!cancel) {
std::this_thread::sleep_for(std::chrono::microseconds(
FLAGS_executor_log_deps_every_microseconds));
// check again, since cancel may be changed during sleep
if (cancel) {
break;
}
VLOG(0) << "deps:\n" << GetDepsString();
times++;
}
return times;
},
std::ref(cancel_log));
}
if (UNLIKELY(exception_holder_.IsCaught())) {
VLOG(4) << "Exception caught";
break;
}
auto event_name = main_thread_blocker_.WaitEvent();
VLOG(1) << "main_thread_blocker_(" << &main_thread_blocker_
<< ") got event_name: " << event_name;
cancel_log = true;
if (logged_times.valid()) {
VLOG(1) << "Logged deps for " << logged_times.get() << " times";
}
if (UNLIKELY(exception_holder_.IsCaught())) {
VLOG(1) << "Exception caught " << exception_holder_.Type();
// Graceful exit when the executor encountered a fatal error.
// EOF is not a fatal error.
if (exception_holder_.Type() != "EOF") {
async_work_queue_->Cancel();
async_work_queue_.reset();
}
VLOG(4) << "Cancel ok";
PADDLE_ENFORCE_EQ(
main_thread_blocker_.Clear(),
0,
......@@ -2122,7 +1066,66 @@ void NewIRInterpreter::TraceRunInstructionList(
VLOG(4) << "clear ok";
exception_holder_.ReThrow();
}
VLOG(4) << "Done TraceRunInstructionList";
}
void NewIRInterpreter::RunInstructionBaseAsync(size_t instr_id) {
// NOTE(Ruibiao): Due to the uncertain order in multi-threading asynchronous
// scheduling, the priority order involved cross-thread scheduling is not
// guaranteed. Only Ops scheduled by the same AddTask call have the guarantee
// of priority order.
SchedulingQueue ready_ops(ir_instruction_scheduling_priority_less);
ready_ops.push(instr_id);
while (!ready_ops.empty()) {
instr_id = ready_ops.top();
ready_ops.pop();
auto* instr_node = vec_instruction_base_.at(instr_id).get();
RunInstructionBase(instr_node);
if (UNLIKELY(exception_holder_.IsCaught())) {
VLOG(4) << "Exception caught";
if (exception_notifier_ != nullptr) {
exception_notifier_->NotifyEvent();
}
return;
}
VLOG(4) << "unfinished_op_number_: " << unfinished_op_number_;
if (UNLIKELY(unfinished_op_number_.fetch_sub(
1, std::memory_order_relaxed) == 1)) {
if (completion_notifier_ != nullptr) {
completion_notifier_->NotifyEvent();
}
}
RunNextInstructions(instr_node, &ready_ops);
}
}
void NewIRInterpreter::RunNextInstructions(InstructionBase* instr,
SchedulingQueue* reserved_next_ops) {
platform::RecordEvent record(
"RunNextInstructions", platform::TracerEventType::UserDefined, 10);
auto IsReady = [this](size_t next_id) {
VLOG(4) << "op_id: " << next_id
<< ", remain deps: " << deps_[next_id]->DynamicDep();
return deps_[next_id]->CheckAndDecrease();
};
for (size_t next_instr_id : instr->NextInstrsInDifferenceThread()) {
if (IsReady(next_instr_id)) {
async_work_queue_->AddTask(
vec_instruction_base_[next_instr_id]->KernelType(),
[this, next_instr_id]() { RunInstructionBaseAsync(next_instr_id); });
}
}
for (size_t next_instr_id : instr->NextInstrsInSameThread()) {
if (IsReady(next_instr_id)) {
reserved_next_ops->push(next_instr_id);
}
}
}
void NewIRInterpreter::RunInstructionBase(InstructionBase* instr_node) {
......@@ -2178,6 +1181,9 @@ void NewIRInterpreter::PreAnalysis() {
AnalyseExecuteOrderForTrace(ir_dependency_builder_.OpDownstreamMap(),
ir_instruction_scheduling_priority_less);
VLOG(4) << "Done AnalyseExecuteOrderForTrace";
UpdateSyncOpNum();
VLOG(4) << "Done UpdateSyncOpNum";
}
::ir::Value NewIRInterpreter::GetValueByName(const std::string& var_name) {
......
......@@ -49,10 +49,6 @@ class NewIRInterpreter : public InterpreterBaseImpl {
paddle::framework::FetchList Run(const std::vector<std::string>& feed_names,
bool need_fetch = true) override;
paddle::framework::FetchList BetaRun(
const std::vector<std::string>& feed_names,
bool need_fetch = true) override;
void ShareWorkQueueFrom(InterpreterBaseImpl* src) override;
void ShareBuildResultsFrom(const InterpreterBaseImpl& src) override;
......@@ -92,10 +88,6 @@ class NewIRInterpreter : public InterpreterBaseImpl {
private:
// build graph
void Convert(std::vector<paddle::framework::OpFuncNode>* op_func_nodes);
void BuildOperatorDependences();
void BuildAndCacheInstructionCtx(Instruction* instr_node);
void BuildSkipShareLoDInfo();
void UpdateSyncOpNum();
void AnalyseExecuteOrderForTrace(
std::map<size_t, std::set<size_t>> op_downstream_map,
......@@ -103,39 +95,13 @@ class NewIRInterpreter : public InterpreterBaseImpl {
void ConstructEventForJitInput();
void CalculateLastLiveOps();
// inplace
void BuildInplace();
bool BuildInplaceCheckVarIsOnlyInput(
const std::vector<std::vector<size_t>>& input_var2op, size_t var_index);
void SetFeedVarsInplaceSkip(const std::vector<std::string>& feed_names);
// gc
void ClearLoDTensorArrayInLocalScope();
// cuda graph
void CheckCUDAGraphBeforeRun(const std::vector<std::string>& feed_names);
void PrepareForCUDAGraphCapture();
// execution
void RunImpl();
void ExecuteInstructionList(const std::vector<Instruction>& vec_instr);
void RunInstructionAsync(size_t instr_id);
void RunInstruction(const Instruction& instr_node);
void RunNextInstructions(const Instruction& instr_id,
SchedulingQueue* reserved_next_ops);
void RunOperator(const Instruction& instr_node);
// Trace
void TraceInstructionList(const std::vector<Instruction>& vec_instr);
// only used when program contains no feed op
void Prepare(const std::vector<std::string>& feed_names,
const std::vector<phi::DenseTensor>& feed_tensors,
bool prepare_feed);
void RecordMemcpyD2H(const Instruction& instr_node);
// gc
void RecordStreamForGC(const Instruction& instr);
void CheckGC(const Instruction& instr);
void ClearLoDTensorArrayInLocalScope();
// workqueue
std::shared_ptr<interpreter::AsyncWorkQueue> GetWorkQueue();
......@@ -150,23 +116,12 @@ class NewIRInterpreter : public InterpreterBaseImpl {
bool is_build_{false};
bool static_build_{false};
const platform::Place place_;
// Note(sonder): share the op dependency and event analysis procedure.
bool is_shared_results_build_{false};
interpreter::DependencyBuilder dependency_builder_;
interpreter::StreamAnalyzer stream_analyzer_;
// NOTE(zhiqiu): when add fetch ops in GetInterpreterCore, we will
// copy a new program and block, the copy_program_ here is used to
// hold the program, otherwise block_ maybe not valid after the
// new program is deleted.
std::shared_ptr<ProgramDesc> copy_program_{nullptr};
const platform::Place place_;
// from variable scope
std::vector<Variable*> var_list_;
std::map<std::string, int> name2id_;
std::vector<VariableMetaInfo> vec_meta_info_;
std::vector<Instruction> vec_instruction_; // deconstruct before OpFuncNode
std::atomic<size_t> unfinished_op_number_{0};
......@@ -189,9 +144,9 @@ class NewIRInterpreter : public InterpreterBaseImpl {
// var
std::map<size_t, std::set<size_t>> last_live_ops_;
// dependecy_count_[i] contains the number of dependencies that the i-th op
// (*dependecy_count_)[i] contains the number of dependencies that the i-th op
// need to wait
std::vector<size_t> dependecy_count_;
std::shared_ptr<std::vector<size_t>> dependecy_count_;
std::vector<std::shared_ptr<interpreter::OpDepInfo>> deps_;
std::vector<std::shared_ptr<interpreter::VarRefInfo>> refs_;
......@@ -200,8 +155,6 @@ class NewIRInterpreter : public InterpreterBaseImpl {
int64_t sync_op_num_{-1};
std::vector<size_t> trace_execute_order_;
InstructionSchedulingPriorityLess instruction_scheduling_priority_less;
std::vector<HookFunc> hookfuncs_;
/// ======================== ///
......@@ -215,16 +168,21 @@ class NewIRInterpreter : public InterpreterBaseImpl {
void BuildInstructionDependences();
void LoopRunImpl();
void TraceRunImpl();
void TraceRunInstructionList(
const std::vector<std::unique_ptr<InstructionBase>>& vec_instr);
void LoopRunInstructionList(
void MultiThreadRunImpl();
void MultiThreadRunInstructionList(
const std::vector<std::unique_ptr<InstructionBase>>& vec_instr);
void RunInstructionBaseAsync(size_t instr_id);
void RunNextInstructions(InstructionBase* instr,
SchedulingQueue* reserved_next_ops);
void RunInstructionBase(InstructionBase* instr_node);
void RecordMemcpyD2H(InstructionBase* instr_node);
......@@ -237,6 +195,12 @@ class NewIRInterpreter : public InterpreterBaseImpl {
void SolvePersisableVarNames();
const interpreter::NewIrDependencyBuilder& GetNewIrDependencyBuilder()
const override;
const interpreter::NewIrStreamAnalyzer& GetNewIrStreamAnalyzer()
const override;
InstructionSchedulingPriorityLess ir_instruction_scheduling_priority_less;
std::unique_ptr<::ir::Program> ir_program_{nullptr};
......
......@@ -222,11 +222,6 @@ FetchList ProgramInterpreter::Run(const std::vector<std::string>& feed_names,
}
}
FetchList ProgramInterpreter::BetaRun(
const std::vector<std::string>& feed_names, bool need_fetch) {
return {};
}
void ProgramInterpreter::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
copy_program_ = prog;
}
......@@ -348,6 +343,18 @@ const interpreter::StreamAnalyzer& ProgramInterpreter::GetStreamAnalyzer()
return stream_analyzer_;
}
const interpreter::NewIrDependencyBuilder&
ProgramInterpreter::GetNewIrDependencyBuilder() const {
PADDLE_THROW(platform::errors::Unimplemented(
"GetDependencyBuilder is not implemented in ProgramInterpreter."));
}
const interpreter::NewIrStreamAnalyzer&
ProgramInterpreter::GetNewIrStreamAnalyzer() const {
PADDLE_THROW(platform::errors::Unimplemented(
"GetDependencyBuilder is not implemented in ProgramInterpreter."));
}
bool ProgramInterpreter::IsSharedResultsBuild() const {
return is_shared_results_build_;
}
......
......@@ -48,10 +48,6 @@ class ProgramInterpreter : public InterpreterBaseImpl {
paddle::framework::FetchList Run(const std::vector<std::string>& feed_names,
bool need_fetch = true) override;
paddle::framework::FetchList BetaRun(
const std::vector<std::string>& feed_names,
bool need_fetch = true) override;
void ShareWorkQueueFrom(InterpreterBaseImpl* src) override;
void ShareBuildResultsFrom(const InterpreterBaseImpl& src) override;
......@@ -63,6 +59,12 @@ class ProgramInterpreter : public InterpreterBaseImpl {
const interpreter::StreamAnalyzer& GetStreamAnalyzer() const override;
const interpreter::NewIrDependencyBuilder& GetNewIrDependencyBuilder()
const override;
const interpreter::NewIrStreamAnalyzer& GetNewIrStreamAnalyzer()
const override;
bool IsSharedResultsBuild() const override;
void SetCopyProgram(std::shared_ptr<ProgramDesc> prog) override;
......
......@@ -1283,25 +1283,13 @@ PHI_DEFINE_EXPORTED_bool(enable_new_ir_api,
/**
* Using new IR in executor FLAG
* Name: enable_new_ir_in_executor_beta_run
* Since Version: 2.6.0
* Value Range: bool, default=true
* Example:
* Note: If Ture, executor will use new IR and run in beta version.
*/
PHI_DEFINE_EXPORTED_bool(enable_new_ir_in_executor_beta_run,
true,
"Enable new IR in executor");
/**
* Using new IR in executor FLAG
* Name: enable_new_ir_in_executor_loop_run
* Name: enable_new_ir_in_executor_trace_run
* Since Version: 2.6.0
* Value Range: bool, default=false
* Example:
* Note: If Ture, executor will use new IR and run in beta version by for loop
* Note: If Ture, executor will use new IR and run in beta version by for trace
* version.
*/
PHI_DEFINE_EXPORTED_bool(enable_new_ir_in_executor_loop_run,
PHI_DEFINE_EXPORTED_bool(enable_new_ir_in_executor_trace_run,
false,
"Enable new IR in executor");
......@@ -77,7 +77,7 @@ TEST(StandaloneExecutor, run) {
std::string out_name = os.str() + "_inner_var_2";
test_core.SetSkipGcVars({out_name});
test_core.BetaRun({});
test_core.Run({});
auto out_tensor =
test_core.local_scope() == nullptr
......@@ -118,7 +118,7 @@ TEST(StandaloneExecutor, run_inplace_sqrt) {
std::string out_name = os.str() + "_inner_var_0";
test_core.SetSkipGcVars({out_name});
test_core.BetaRun({});
test_core.Run({});
auto out_tensor =
test_core.local_scope() == nullptr
......
......@@ -76,7 +76,7 @@ TEST(VJP, TanhBackwardTest) {
std::string prefix_str = os.str();
test_core.SetSkipGcVars(
{prefix_str + "_inner_var_1", prefix_str + "_inner_var_3"});
test_core.BetaRun({});
test_core.Run({});
auto out_tensor =
test_core.local_scope() == nullptr
? scope.FindVar(prefix_str + "_inner_var_1")->Get<phi::DenseTensor>()
......@@ -130,7 +130,7 @@ TEST(VJP, Tanh_BackwardTest) {
std::string prefix_str = os.str();
test_core.SetSkipGcVars(
{prefix_str + "_inner_var_0", prefix_str + "_inner_var_2"});
test_core.BetaRun({});
test_core.Run({});
auto out_tensor =
test_core.local_scope() == nullptr
? scope.FindVar(prefix_str + "_inner_var_0")->Get<phi::DenseTensor>()
......@@ -184,7 +184,7 @@ TEST(VJP, MeanBackwardTest) {
std::string prefix_str = os.str();
test_core.SetSkipGcVars(
{prefix_str + "_inner_var_1", prefix_str + "_inner_var_3"});
test_core.BetaRun({});
test_core.Run({});
auto out_tensor =
test_core.local_scope() == nullptr
? scope.FindVar(prefix_str + "_inner_var_1")->Get<phi::DenseTensor>()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册