未验证 提交 75122319 编写于 作者: W WangZhen 提交者: GitHub

[JitLayer]Polish PEFuntion to speed up JitLayer and fix memory leak (#44738)

* Polish PEFuntion to speed up JitLayer

* Polish PEFunction code

* Fix comments
上级 212f015f
......@@ -22,6 +22,7 @@
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/phi/core/enforce.h"
#include "paddle/fluid/jit/base_function.h"
#include "paddle/fluid/jit/function_schema.h"
......@@ -36,9 +37,14 @@ class ExecutorFunction : public BaseFunction {
const Name2VariableMap &params_dict,
const phi::Place &place)
: info_(info), place_(place), inner_exe_(place_) {
info_->RemoveDescFeedFetch();
PADDLE_ENFORCE_GT(
static_cast<int64_t>(info_->ProgramDesc().Block(0).OpSize()),
0,
platform::errors::PreconditionNotMet(
"There is no operator in ProgramDesc."));
utils::ShareParamsIntoScope(info_->ParamNames(), params_dict, &scope_);
VLOG(6) << framework::GenScopeTreeDebugInfo(&scope_);
info_->RemoveDescFeedFetch();
}
~ExecutorFunction() noexcept {}
......@@ -56,9 +62,9 @@ class ExecutorFunction : public BaseFunction {
false,
true,
info_->OutputArgNames());
std::vector<DenseTensor> res;
utils::FetchOuts(info_->OutputArgNames(), scope_, &res);
return res;
std::vector<DenseTensor> outputs;
utils::FetchOuts(info_->OutputArgNames(), scope_, &outputs);
return outputs;
}
const std::shared_ptr<FunctionInfo> &Info() const { return info_; }
......
......@@ -19,10 +19,14 @@
#include <vector>
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/details/build_strategy.h"
#include "paddle/fluid/framework/details/execution_strategy.h"
#include "paddle/fluid/framework/executor_cache.h"
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/phi/core/enforce.h"
#include "paddle/fluid/jit/base_function.h"
#include "paddle/fluid/jit/function_schema.h"
......@@ -31,72 +35,99 @@
namespace paddle {
namespace jit {
using ExecutionStrategy = framework::details::ExecutionStrategy;
using ParallelExecutor = framework::ParallelExecutor;
using Graph = framework::ir::Graph;
class PEFunction : public BaseFunction {
public:
PEFunction(const std::shared_ptr<FunctionInfo> &info,
const Name2VariableMap &params_dict,
const phi::Place &place)
: info_(info), place_(place) {
info_->RemoveDescFeedFetch();
PADDLE_ENFORCE_GT(
static_cast<int64_t>(info_->ProgramDesc().Block(0).OpSize()),
0,
platform::errors::PreconditionNotMet(
"There is no operator in ProgramDesc."));
utils::ShareParamsIntoScope(info_->ParamNames(), params_dict, &scope_);
VLOG(6) << framework::GenScopeTreeDebugInfo(&scope_);
info_->RemoveDescFeedFetch();
CreateGraphAndPE();
}
~PEFunction() noexcept {}
std::vector<Tensor> operator()(const std::vector<Tensor> &inputs) {
auto dense_tensors = utils::ToDenseTensors(inputs);
return utils::ToTensors(this->operator()(dense_tensors));
static ExecutionStrategy GetExecutionStrategy(const platform::Place &place) {
ExecutionStrategy execution_strategy;
auto device_type = platform::Place2DeviceType(place);
switch (device_type) {
case platform::DeviceType::CPU: {
execution_strategy.num_threads_ = 2;
break;
}
case platform::DeviceType::CUDA: {
// NOTE: According experiments, one thread is faster in
// most model training.
execution_strategy.num_threads_ = 1;
break;
}
case platform::DeviceType::XPU: {
execution_strategy.num_threads_ = 1;
break;
}
case platform::DeviceType::IPU: {
execution_strategy.num_threads_ = 1;
break;
}
default:
PADDLE_THROW(platform::errors::Unavailable(
"Unsupported Device type %d.", device_type));
}
execution_strategy.use_device_ = device_type;
std::vector<DenseTensor> operator()(const std::vector<DenseTensor> &inputs) {
std::string prog_string;
std::hash<std::string> string_hash;
return execution_strategy;
}
auto &program_desc = info_->ProgramDesc();
// TODO(dev): Serialize is very slow.
const_cast<framework::ProgramDesc *>(&program_desc)
->Proto()
->SerializePartialToString(&prog_string);
int64_t program_id = static_cast<int64_t>(string_hash(prog_string));
void CreateGraphAndPE() {
framework::details::BuildStrategy build_strategy;
auto execution_strategy = GetExecutionStrategy(place_);
auto &program_desc = info_->ProgramDesc();
const framework::BlockDesc &global_block = program_desc.Block(0);
int64_t start_op_index = 0;
int64_t end_op_index = static_cast<int64_t>(global_block.OpSize());
utils::ShareIntoScope(info_->InputArgNames(), inputs, &scope_);
std::vector<std::string> input_var_names = info_->InputArgNames();
std::vector<std::string> output_var_names = info_->OutputArgNames();
if (end_op_index > start_op_index) {
auto cache_info = framework::GetExecutorInfoFromCache(program_desc,
place_,
start_op_index,
end_op_index,
/*is_grad=*/false,
program_id,
&scope_);
auto &parallel_executor = cache_info.first;
auto &skip_eager_delete_vars =
framework::ExecutorInfoCache::Instance().SkipEagerDeleteVars(
program_id, false);
if (cache_info.second /*is_new_created*/) {
parallel_executor->SkipMemoryReuse(/*scope_idx=*/0, input_var_names);
skip_eager_delete_vars.insert(skip_eager_delete_vars.end(),
output_var_names.begin(),
output_var_names.end());
framework::details::ParseSafeEagerDeletionSkipVars(
program_desc,
end_op_index,
output_var_names,
&skip_eager_delete_vars);
graph_ =
std::make_shared<Graph>(program_desc, start_op_index, end_op_index);
inner_pe_ = std::make_shared<ParallelExecutor>(
place_, &scope_, execution_strategy, build_strategy, graph_.get());
inner_pe_->PrepareVariables(&scope_);
inner_pe_->SkipMemoryReuse(/*scope_idx=*/0, info_->InputArgNames());
}
parallel_executor->RunWithoutFetch(skip_eager_delete_vars);
std::vector<Tensor> operator()(const std::vector<Tensor> &inputs) {
auto dense_tensors = utils::ToDenseTensors(inputs);
return utils::ToTensors(this->operator()(dense_tensors));
}
std::vector<DenseTensor> res;
utils::FetchOuts(info_->OutputArgNames(), scope_, &res);
return res;
std::vector<DenseTensor> operator()(const std::vector<DenseTensor> &inputs) {
utils::ShareIntoScope(info_->InputArgNames(), inputs, &scope_);
// update op_handle scope_map in pe->executor_->Graph
std::unordered_map<framework::Scope *, framework::Scope *> scope_map = {
{inner_pe_->GetLocalScopes().front(), &scope_}};
inner_pe_->ResetOpHandleScopeMapOfGraphs(scope_map);
// need to recreate tmp variables in new scope
inner_pe_->PrepareVariables(&scope_);
inner_pe_->RunWithoutFetch(info_->OutputArgNames());
std::vector<DenseTensor> outputs;
utils::FetchOuts(info_->OutputArgNames(), scope_, &outputs);
scope_.DropKids();
return outputs;
}
const std::shared_ptr<FunctionInfo> &Info() const { return info_; }
......@@ -105,6 +136,8 @@ class PEFunction : public BaseFunction {
std::shared_ptr<FunctionInfo> info_;
framework::Scope scope_;
phi::Place place_;
std::shared_ptr<ParallelExecutor> inner_pe_;
std::shared_ptr<Graph> graph_;
};
} // namespace jit
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册