Unverified · Commit 609f8225 authored by Aurelius84, committed by GitHub

[Dy2Stat] Refactor ExecutorCache logic and pre-support BuildStrategy for pass (#34181)

* modify into program_id

* fix cache_info declare problem

* fix python int to C long problem

* modify point to reference

* add ENVS
Parent 832c3894
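The heart of the change: executor lookup is now keyed by a Python-side program_id (plus an is_grad flag) instead of hashing the ProgramDesc contents on every call. A minimal Python model of the new cache layout (an illustration of the scheme, not Paddle source; all names here are illustrative):

# Minimal model of the new two-slot cache: one ExecutorInfo per program_id,
# holding separate forward/backward entries.
class ExecutorInfo:
    def __init__(self):
        self.forward = None   # would hold (ParallelExecutor, Graph, skip_vars)
        self.backward = None

    def is_available(self, is_grad):
        return (self.backward if is_grad else self.forward) is not None


info_map = {}  # program_id -> ExecutorInfo


def get_executor_info(program_id, is_grad, build_fn):
    """Return (entry, is_new_created), building the executor only once."""
    info = info_map.setdefault(program_id, ExecutorInfo())
    if not info.is_available(is_grad):
        entry = build_fn()  # construct Graph + ParallelExecutor exactly once
        if is_grad:
            info.backward = entry
        else:
            info.forward = entry
        return entry, True
    return (info.backward if is_grad else info.forward), False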
@@ -13,6 +13,7 @@
 // limitations under the License.

 #include "paddle/fluid/framework/executor_cache.h"
+#include "paddle/fluid/framework/op_info.h"
namespace paddle {
namespace framework {
@@ -25,11 +26,11 @@ namespace framework {
 namespace details {

-static ExecutionStrategy GetExecutionStrategy(
-    const ExecutorInfoCache::CacheKey &cache_key) {
+static ExecutionStrategy GetExecutionStrategy(const platform::Place &place) {
   framework::ExecutionStrategy execution_strategy;

-  switch (cache_key.device_type_) {
+  auto device_type = platform::Place2DeviceType(place);
+  switch (device_type) {
case platform::DeviceType::CPU: {
execution_strategy.num_threads_ = 2;
break;
@@ -46,9 +47,9 @@ static ExecutionStrategy GetExecutionStrategy(
     }
     default:
       PADDLE_THROW(platform::errors::Unavailable("Unsupported Device type %d.",
-                                                 cache_key.device_type_));
+                                                 device_type));
   }
-  execution_strategy.use_device_ = cache_key.device_type_;
+  execution_strategy.use_device_ = device_type;
return execution_strategy;
}
@@ -136,50 +137,36 @@ ExecutorInfoCache &ExecutorInfoCache::Instance() {
   return g_exe_cache_info_map;
 }

-void ExecutorInfoCache::Finalize() {
-  // NOTE(Aurelius84): DO NOT perform finalize in destructor
-  // to avoid problems caused by destructor order of static
-  // object.
-  info_map_.clear();
-}
-
-CacheInfo GetExecutorInfoFromCache(const ExecutorInfoCache::CacheKey &cache_key,
+CacheInfo GetExecutorInfoFromCache(const ProgramDesc &program_desc,
+                                   const platform::Place &place,
+                                   int64_t start_op_index, int64_t end_op_index,
+                                   bool is_grad, int64_t program_id,
                                    framework::Scope *scope) {
   auto &cached_exe_info = framework::ExecutorInfoCache::Instance();

-  if (!cached_exe_info.Has(cache_key)) {
-    VLOG(1) << "create exe_info for " << cache_key.DebugString();
-    // TODO(Aurelius84): Consider to use LRU algorithm to replace this.
-    if (cached_exe_info.Size() > 4u /* max_cached_size*/) {
-      VLOG(2) << "The cached info size has exceeded max_cached_size: 4, clear "
-                 "all cache!";
-      cached_exe_info.Finalize();
-    }
-    framework::BuildStrategy build_strategy;
-    auto execution_strategy = details::GetExecutionStrategy(cache_key);
+  if (!cached_exe_info.Has(program_id, is_grad)) {
+    VLOG(1) << "create exe_info for " << program_id << " is_grad: " << is_grad;
+    auto execution_strategy = details::GetExecutionStrategy(place);
+    auto &build_strategy = cached_exe_info.GetBuildStrategy(program_id);

     // 2. Construct Graph and ParallelExecutor.
     auto graph = std::make_shared<framework::ir::Graph>(
-        *cache_key.program_desc_, cache_key.start_op_index_,
-        cache_key.end_op_index_);
+        program_desc, start_op_index, end_op_index);
     auto parallel_executor = std::make_shared<framework::ParallelExecutor>(
-        cache_key.place_, scope, execution_strategy, build_strategy,
-        graph.get());
+        place, scope, execution_strategy, build_strategy, graph.get());
     parallel_executor->PrepareVariables(scope);

-    framework::ExecutorInfoCache::ValueType cache_val = {parallel_executor,
-                                                         graph};
-    cached_exe_info.Insert(cache_key, cache_val);
-    bool is_new_created = true;
-    return std::make_pair(parallel_executor, is_new_created);
+    // 3. Insert value into cached map.
+    auto &cached_value = cached_exe_info.GetMutable(program_id, is_grad);
+    cached_value.executor_ = parallel_executor;
+    cached_value.graph_ = std::move(graph);
+    return std::make_pair(parallel_executor, /*is_new_created=*/true);
   } else {
-    VLOG(1) << "get exe_info from cache by: " << cache_key.DebugString();
-    bool is_new_created = false;
-    auto cache_val = cached_exe_info.GetMutable(cache_key);
-    auto parallel_executor = cache_val.first;
+    VLOG(1) << "get exe_info from cache by: " << program_id
+            << " is_grad: " << is_grad;
+    auto &cached_value = cached_exe_info.GetMutable(program_id, is_grad);
+    auto &parallel_executor = cached_value.executor_;

     // update op_handle scope_map in pe->executor_->Graph
     std::unordered_map<Scope *, Scope *> scope_map = {
         {parallel_executor->GetLocalScopes().front(), scope}};
@@ -187,7 +174,7 @@ CacheInfo GetExecutorInfoFromCache(const ExecutorInfoCache::CacheKey &cache_key,
   // need to recreate tmp variables in new scope
   parallel_executor->PrepareVariables(scope);

-  return std::make_pair(parallel_executor, is_new_created);
+  return std::make_pair(parallel_executor, /*is_new_created=*/false);
}
}
......
@@ -45,121 +45,90 @@ void ParseSafeEagerDeletionSkipVars(
std::vector<std::string>* skip_eager_delete_vars);
} // namespace details
-class ExecutorInfoCache {
- public:
-  struct CacheKey {
-    CacheKey(const ProgramDesc* program_desc, const platform::Place& place,
-             int64_t start_op_index, int64_t end_op_index, bool is_grad)
-        : program_desc_(program_desc),
-          place_(place),
-          start_op_index_(start_op_index),
-          end_op_index_(end_op_index),
-          is_grad_(is_grad) {
-      device_type_ = platform::Place2DeviceType(place);
-      PADDLE_ENFORCE_NOT_NULL(program_desc_,
-                              "program_desc should not be null.");
-    }
-
-    std::string DebugString() const {
-      std::stringstream ss;
-      ss << "\n CacheKey(program_desc: " << program_desc_;
-      ss << ", start_op_index: " << start_op_index_;
-      ss << ", end_op_index: " << end_op_index_;
-      ss << ", is_grad: " << is_grad_;
-      ss << ", device_type: " << device_type_ << ")";
-      return ss.str();
-    }
-
-    const ProgramDesc* program_desc_;
-    platform::Place place_;
-    int64_t start_op_index_;
-    int64_t end_op_index_;
-    bool is_grad_;
-    platform::DeviceType device_type_;
-  };
-
-  using KeyType = size_t;
-  using ValueType =
-      std::pair<std::shared_ptr<ParallelExecutor>, std::shared_ptr<ir::Graph>>;
-
-  struct KeyHasher {
-    size_t operator()(const CacheKey& key) const noexcept {
-      size_t seed = 10;
-      auto* prog_desc = key.program_desc_;
-      /*
-       * Note(Aurelius84): DO NOT use only ProgramDesc* to calculate hash value
-       * because a new program will hold same pointer address after an older
-       * program is destructed with a small probability. Add op size while
-       * hashing because program may contains at least one block.
-       */
-      hash_combine(&seed, prog_desc);
-      for (size_t i = 0; i < prog_desc->Size(); ++i) {
-        hash_combine(&seed, &prog_desc->Block(i));
-        hash_combine(&seed, prog_desc->Block(i).OpSize());
-      }
-      hash_combine(&seed, static_cast<int>(key.device_type_));
-      hash_combine(&seed, key.start_op_index_);
-      hash_combine(&seed, key.end_op_index_);
-      hash_combine(&seed, key.is_grad_);
-      VLOG(3) << "hash value is : " << seed
-              << " of key: " << key.DebugString();
-      return seed;
-    }
-
-    template <typename T>
-    void hash_combine(size_t* seed, const T& val) const {
-      std::hash<T> hasher;
-      (*seed) ^= hasher(val) + 0x9e3779b9 + ((*seed) << 6) + ((*seed >> 2));
-    }
-  };
+class ExecutorInfo {
+ public:
+  struct CacheValue {
+    std::shared_ptr<ParallelExecutor> executor_{nullptr};
+    std::shared_ptr<ir::Graph> graph_{nullptr};
+
+    std::vector<std::string> skip_eager_delete_vars_;
+  };
+
+  bool IsAvailable(bool is_grad) {
+    const auto& executor =
+        is_grad ? backward_info_.executor_ : forward_info_.executor_;
+    return executor != nullptr;
+  }
+
+  CacheValue& GetMutable(bool is_grad) {
+    return is_grad ? backward_info_ : forward_info_;
+  }
+
+ private:
+  CacheValue forward_info_;
+  CacheValue backward_info_;
+};
+
+class ExecutorInfoCache {
+ public:
   static ExecutorInfoCache& Instance();

-  ValueType GetMutable(const CacheKey& key) {
-    auto key_val = key_hash_func_(key);
-    PADDLE_ENFORCE_EQ(
-        Has(key_val), true,
-        platform::errors::NotFound("%s doesn't exist in ExecutorInfoCache",
-                                   key.DebugString()));
-    return info_map_[key_val];
-  }
-
-  bool Has(const CacheKey& key) const {
-    auto key_val = key_hash_func_(key);
-    return Has(key_val);
-  }
-
-  bool Has(const KeyType& key) const {
-    return info_map_.find(key) != info_map_.end();
-  }
-
-  void Insert(const CacheKey& key, ValueType value) {
-    auto key_val = key_hash_func_(key);
-    PADDLE_ENFORCE_EQ(
-        Has(key_val), false,
-        platform::errors::NotFound("%s has existed in ExecutorInfoCache",
-                                   key.DebugString()));
-    info_map_.insert({key_val, value});
-  }
-
-  size_t Size() const { return info_map_.size(); }
-
-  void Finalize();
+  const BuildStrategy& GetBuildStrategy(int64_t program_id) {
+    // If not found, insert build_strategy with default value.
+    return strategy_map_[program_id];
+  }
+
+  void SetBuildStrategy(int64_t program_id,
+                        const BuildStrategy& build_strategy) {
+    PADDLE_ENFORCE_EQ(
+        strategy_map_.count(program_id), 0,
+        platform::errors::PreconditionNotMet(
+            "program_id: %s already exist in ExecutorInfoCache", program_id));
+    strategy_map_[program_id] = build_strategy;
+  }
+
+  bool Has(int64_t program_id, bool is_grad) {
+    return info_map_.find(program_id) != info_map_.end() &&
+           info_map_[program_id].IsAvailable(is_grad);
+  }
+
+  ExecutorInfo::CacheValue& GetMutable(int64_t program_id, bool is_grad) {
+    return info_map_[program_id].GetMutable(is_grad);
+  }
+
+  void UpdateSkipEagerDeleteVars(int64_t program_id, bool is_grad,
+                                 const std::vector<std::string>& skip_vars) {
+    auto& cached_value = GetMutable(program_id, is_grad);
+    cached_value.skip_eager_delete_vars_ = std::move(skip_vars);
+  }
+
+  std::vector<std::string>& SkipEagerDeleteVars(int64_t program_id,
+                                                bool is_grad) {
+    auto& cached_value = GetMutable(program_id, is_grad);
+    return cached_value.skip_eager_delete_vars_;
+  }
+
+  void Finalize() {
+    // NOTE(Aurelius84): DO NOT perform finalize in destructor
+    // to avoid problems caused by destructor order of static
+    // object.
+    info_map_.clear();
+    strategy_map_.clear();
+  }

  private:
   ExecutorInfoCache() = default;
   DISABLE_COPY_AND_ASSIGN(ExecutorInfoCache);

-  KeyHasher key_hash_func_;
-  std::unordered_map<KeyType, ValueType> info_map_;
+  std::unordered_map<int64_t, ExecutorInfo> info_map_;
+  std::unordered_map<int64_t, BuildStrategy> strategy_map_;
 };

 using CacheInfo =
     std::pair<std::shared_ptr<ParallelExecutor>, bool /*is_new_created*/>;

-CacheInfo GetExecutorInfoFromCache(const ExecutorInfoCache::CacheKey& cache_key,
+CacheInfo GetExecutorInfoFromCache(const ProgramDesc& program_desc,
+                                   const platform::Place& place,
+                                   int64_t start_op_index, int64_t end_op_index,
+                                   bool is_grad, int64_t program_id,
                                    framework::Scope* scope);
} // namespace framework
......
@@ -103,6 +103,10 @@ class RunProgramOpMaker : public framework::OpProtoAndCheckerMaker {
              "(bool, default false) Set to true for inference only, false "
              "for training.")
         .SetDefault(false);
+    AddAttr<int64_t>(
+        "program_id",
+        "(int64_t)"
+        "The unique hash id used as cache key for ExecutorInfoCache.");
AddComment(R"DOC(
RunProgram operator.
......
@@ -188,6 +188,7 @@ class RunProgramOpKernel : public framework::OpKernel<T> {
     auto start_op_index = ctx.Attr<int64_t>("start_op_index");
     auto end_op_index = ctx.Attr<int64_t>("end_op_index");
     auto is_test = ctx.Attr<bool>("is_test");
+    auto program_id = ctx.Attr<int64_t>("program_id");
// NOTE(chenweihang): In order not to add new variable type, use vector
// here. Originally, here can use scope directly.
@@ -215,23 +216,27 @@ class RunProgramOpKernel : public framework::OpKernel<T> {
     if (end_op_index > start_op_index) {
       auto *program = ctx.Attr<BlockDesc *>("global_block")->Program();
-      auto cache_key = framework::ExecutorInfoCache::CacheKey(
-          program, ctx.GetPlace(), start_op_index, end_op_index,
-          /*is_grad=*/false);
-      auto cache_info = framework::GetExecutorInfoFromCache(cache_key, &scope);
+      auto cache_info = framework::GetExecutorInfoFromCache(
+          *program, ctx.GetPlace(), start_op_index, end_op_index,
+          /*is_grad=*/false, program_id, &scope);
       auto &parallel_executor = cache_info.first;
+      // all out_vars are skip_eager_var
+      auto &skip_eager_delete_vars =
+          framework::ExecutorInfoCache::Instance().SkipEagerDeleteVars(
+              program_id, false);
       if (cache_info.second /*is_new_created*/) {
         parallel_executor->SkipMemoryReuse(/*scope_idx=*/0, input_var_names);
-      }
-
-      // Step 3. run ops
-      // all out_vars are skip_eager_var
-      std::vector<std::string> skip_eager_delete_vars(output_var_names);
+        skip_eager_delete_vars.insert(skip_eager_delete_vars.end(),
+                                      output_var_names.begin(),
+                                      output_var_names.end());
         skip_eager_delete_vars.insert(skip_eager_delete_vars.end(),
                                       dout_var_names.begin(),
                                       dout_var_names.end());
         framework::details::ParseSafeEagerDeletionSkipVars(
             *program, end_op_index, output_var_names, &skip_eager_delete_vars);
+      }
+      // Step 3. run ops
       parallel_executor->RunWithoutFetch(skip_eager_delete_vars);
     }
// Step 4. Get Output
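The behavioral change in the hunk above: the skip_eager_delete_vars list used to be rebuilt from output_var_names and dout_var_names on every forward run; it is now a mutable reference owned by the cache, populated only on the run where the executor is first created. Roughly, in Python terms (a sketch of the idea, not Paddle source):

skip_vars_cache = {}  # (program_id, is_grad) -> list of variable names

def skip_eager_delete_vars(program_id, is_grad):
    # Returns a cached, mutable list; the kernel appends names to it only
    # when is_new_created is True, then reuses it on every later step.
    return skip_vars_cache.setdefault((program_id, is_grad), [])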
@@ -280,6 +285,7 @@ class RunProgramGradOpKernel : public framework::OpKernel<T> {
     auto *block = ctx.Attr<BlockDesc *>("global_block");
     auto orig_end_op_index = ctx.Attr<int64_t>("end_op_index");
+    auto program_id = ctx.Attr<int64_t>("program_id");
// NOTE: skip `shape` and `fill_constant` op created by
// fluid.backward.gradients, one forward output will generate one `shape`
// and `fill_constant`
@@ -305,24 +311,30 @@ class RunProgramGradOpKernel : public framework::OpKernel<T> {
     if (end_op_index > start_op_index) {
       // Step 2. prepare executor and scope
       auto *program = ctx.Attr<BlockDesc *>("global_block")->Program();
-      auto cache_key = framework::ExecutorInfoCache::CacheKey(
-          program, ctx.GetPlace(), start_op_index, end_op_index,
-          /*is_grad*/ true);
-      auto cache_info = framework::GetExecutorInfoFromCache(cache_key, &scope);
+      auto cache_info = framework::GetExecutorInfoFromCache(
+          *program, ctx.GetPlace(), start_op_index, end_op_index,
+          /*is_grad*/ true, program_id, &scope);
       auto &parallel_executor = cache_info.first;
+      auto &skip_eager_delete_vars =
+          framework::ExecutorInfoCache::Instance().SkipEagerDeleteVars(
+              program_id, true);
       if (cache_info.second /*is_new_created*/) {
         parallel_executor->SkipMemoryReuse(/*scope_idx=*/0,
                                            output_grad_var_names);
+        skip_eager_delete_vars.insert(skip_eager_delete_vars.end(),
+                                      input_grad_var_names.begin(),
+                                      input_grad_var_names.end());
+        framework::details::AppendSkipDeletionVars(param_grad_names,
+                                                   &skip_eager_delete_vars);
       }
       details::ShareVarsIntoScope(output_grad_vars, output_grad_var_names,
                                   &scope);
       // Debug info: scope info when run end
       VLOG(3) << framework::GenScopeTreeDebugInfo(out_scope_vec->front());

-      std::vector<std::string> skip_eager_delete_vars(input_grad_var_names);
-      framework::details::AppendSkipDeletionVars(param_grad_names,
-                                                 &skip_eager_delete_vars);
       // Step 3. run ops
       parallel_executor->RunWithoutFetch(
           /*skip_eager_delete_vars=*/skip_eager_delete_vars);
......
@@ -3011,6 +3011,12 @@ All parameter, weight, gradient are variables in Paddle.
           optimization passes should be defined in this way. BuildStrategy
           cannot be updated after being finalized.)DOC");

+  m.def("_set_cached_executor_build_strategy",
+        [](int64_t program_id, const BuildStrategy &build_strategy) {
+          auto &cached_exe_info = framework::ExecutorInfoCache::Instance();
+          cached_exe_info.SetBuildStrategy(program_id, build_strategy);
+        });
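A hedged usage sketch for the new binding (the strategy field below is just an example knob, and `train_program` is a placeholder for whatever object the caller hashes). Since SetBuildStrategy rejects an already-registered program_id and GetBuildStrategy default-inserts an entry during the first executor build, registration must happen before the first run of that program:

from paddle.fluid import core
from paddle.fluid.compiler import BuildStrategy
from paddle.fluid.layers.utils import _hash_with_id

strategy = BuildStrategy()
strategy.enable_inplace = True  # example pass toggle; any BuildStrategy field

program_id = _hash_with_id(train_program)  # placeholder program object
core._set_cached_executor_build_strategy(program_id, strategy)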
pe.def(py::init<const std::vector<platform::Place> &,
const std::vector<std::string> &, const std::string &,
Scope *, std::vector<Scope *> &, const ExecutionStrategy &,
......
@@ -273,6 +273,7 @@ if avx_supported():
         from .core_avx import _cuda_synchronize
         from .core_avx import _is_compiled_with_heterps
         from .core_avx import _promote_types_if_complex_exists
+        from .core_avx import _set_cached_executor_build_strategy
from .core_avx import _device_synchronize
from .core_avx import _get_current_stream
if sys.platform != 'win32':
@@ -324,6 +325,7 @@ if load_noavx:
         from .core_noavx import _cuda_synchronize
         from .core_noavx import _is_compiled_with_heterps
         from .core_noavx import _promote_types_if_complex_exists
+        from .core_noavx import _set_cached_executor_build_strategy
from .core_noavx import _device_synchronize
from .core_noavx import _get_current_stream
if sys.platform != 'win32':
......
@@ -24,6 +24,8 @@ from paddle.fluid.dygraph.dygraph_to_static import logging_utils
 from paddle.fluid.dygraph.dygraph_to_static.return_transformer import RETURN_NO_VALUE_MAGIC_NUM
 from paddle.fluid.layers.utils import flatten
 from paddle.fluid.layers.utils import pack_sequence_as
+from paddle.fluid.layers.utils import _hash_with_id
+from paddle.fluid.compiler import BuildStrategy
import paddle.compat as cpt
from paddle import _C_ops
@@ -162,6 +164,14 @@ class PartialProgramLayer:
         return train_program

+    @LazyInitialized
+    def _infer_program_id(self):
+        return _hash_with_id(self._infer_program, self)
+
+    @LazyInitialized
+    def _train_program_id(self):
+        return _hash_with_id(self._train_program, self)
+
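Note that `self` is hashed in alongside the program: each PartialProgramLayer instance thereby gets its own cache slot, so two layers that happen to share a program object will not collide in ExecutorInfoCache.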
def _verify_program(self, main_program):
"""
Verify that the program parameter is initialized, prune some unused params,
@@ -228,7 +238,7 @@ class PartialProgramLayer:
         attrs = ('global_block', self.program.desc.block(0), 'start_op_index',
                  0, 'end_op_index', self._infer_program.desc.block(0).op_size(),
-                 'is_test', not self.training)
+                 'is_test', not self.training, 'program_id', self.program_id)
_C_ops.run_program(
self._valid_vars(in_vars),
self._valid_vars(self._params),
@@ -242,6 +252,10 @@ class PartialProgramLayer:
     def program(self):
         return self._train_program if self.training else self._infer_program

+    @property
+    def program_id(self):
+        return self._train_program_id if self.training else self._infer_program_id
+
def _prepare(self, inputs):
"""
Prepare inputs, outputs, attrs.
......
@@ -27,6 +27,7 @@ from paddle.fluid import backward
 from paddle.fluid import unique_name
 from paddle.fluid.dygraph import layers
 from paddle.fluid.layers import nn
+from paddle.fluid.layers.utils import _hash_with_id
from paddle.fluid.dygraph.base import switch_to_static_graph
from paddle.fluid.framework import in_dygraph_mode
@@ -824,7 +825,8 @@ def _run_dygraph(instance, input, program_holder):
             'global_block': trace_program.block(0),
             'start_op_index': 0,
             'end_op_index': end_op_index,
-            'is_test': instance._is_test
+            'is_test': instance._is_test,
+            'program_id': _hash_with_id(trace_program)
})
# NOTE: [ why need set param's gradient type here ]
# if user set sparse gradient mode, the param's gradient
......
@@ -78,6 +78,15 @@ def is_sequence(seq):
             not isinstance(seq, six.string_types))

+def _hash_with_id(*args):
+    """
+    Return int hash value calculated by id(arg) or tuple(id1,id2, ...).
+    """
+    assert len(args) > 0
+    info = tuple([id(v) for v in args])
+    return hash(info) & 0xfffffff
+
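Usage note: the key is derived from object identity rather than content, and the 0xfffffff mask keeps the result a small non-negative int, which is what avoids the "python int to C long" conversion problem named in the commit message when the value travels through the op's int64_t program_id attribute. For example:

a, b = object(), object()
ka, kb = _hash_with_id(a), _hash_with_id(b)
assert 0 <= ka <= 0xfffffff and 0 <= kb <= 0xfffffff
assert ka != kb  # distinct live objects almost always yield distinct keys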
def _sorted(dict_):
"""
Returns a sorted list of the dict keys, with error if keys not sortable.
......
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
+set(GC_ENVS FLAGS_eager_delete_tensor_gb=0.0)
list(REMOVE_ITEM TEST_OPS test_lac)
# NOTE(Aurelius84): In case of Windows CI, if open ON_INFER, RWLOCK of Scope will
@@ -10,7 +11,7 @@ if(NOT ON_INFER)
 endif()

 foreach(TEST_OP ${TEST_OPS})
-  py_test_modules(${TEST_OP} MODULES ${TEST_OP})
+  py_test_modules(${TEST_OP} MODULES ${TEST_OP} ENVS ${GC_ENVS})
endforeach(TEST_OP)
set_tests_properties(test_se_resnet PROPERTIES TIMEOUT 900)
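For context: FLAGS_eager_delete_tensor_gb=0.0 enables eager garbage collection (intermediate variables are freed as soon as they are no longer needed), so running every dy2stat test under GC_ENVS makes the cached skip_eager_delete_vars bookkeeping in this PR actually observable in tests.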
......
@@ -23,6 +23,7 @@ import paddle
 import paddle.fluid as fluid
 from paddle import compat as cpt
 from paddle.fluid import core, framework, executor
+from paddle.fluid.layers.utils import _hash_with_id
paddle.enable_static()
@@ -174,6 +175,7 @@ class RunProgramOpTest(unittest.TestCase):
     def calc_dygraph_output(self, place):
         self.program_desc, self.fwd_op_num = self.get_program_desc()
         self.attrs = self.prepare_attrs()
+        self.attrs['program_id'] = _hash_with_id(self.program_desc)
with fluid.dygraph.guard(place):
inputs = self.prepare_dygraph_input(place)
@@ -189,6 +191,7 @@ class RunProgramOpTest(unittest.TestCase):
     def calc_dygraph_grad(self, place):
         self.program_desc, self.fwd_op_num = self.get_program_desc()
         self.attrs = self.prepare_attrs()
+        self.attrs['program_id'] = _hash_with_id(self.program_desc)
with fluid.dygraph.guard(place):
# Step 1. run forward
......