Unverified commit db26de83, authored by Zeng Jinle, committed by GitHub

[Bug fix] Disable memory reuse on feeded variables (#19835)

* fix memory reuse bug on feeding variables, test=develop

* add comments to reference count members, test=develop
Parent: dfdd73cb
@@ -96,7 +96,8 @@ void EagerDeletionOpHandle::RunImpl() {
   std::deque<std::shared_ptr<memory::Allocation>> garbages;
   for (size_t i = 0; i < var_infos_.size(); ++i) {
     auto *var_info = var_infos_[i];
-    if (var_info->IsSkipped() || !var_info->DecreaseRefCnt()) {
+    if (var_info->IsSkippedAllMemoryOptimization() ||
+        !var_info->DecreaseRefCnt()) {
       continue;
     }
@@ -100,7 +100,7 @@ void ShareTensorBufferFunctor::operator()(Scope *exec_scope) {
     auto *out_tensor = GetMutableTensorFromVar(in_out_vars_[i].second);
     auto *in_var_info = in_var_infos_[i];
-    if (UNLIKELY(in_var_info->IsSkipped())) {
+    if (UNLIKELY(in_var_info->IsSkippedMemoryReuse())) {
       // If in_var is inplaced in the previous batch and we want to fetch
       // in_var in the current batch, we have to reset memory of out_var
       // to avoid wrong calculation result.
@@ -35,7 +35,11 @@ class MemOptVarInfo {
     return ref_cnt_ == 1 || (runtime_ref_cnt_.fetch_sub(1) == 1);
   }

-  void ResetRuntimeRefCnt() { runtime_ref_cnt_ = ref_cnt_; }
+  void ResetRuntimeRefCnt() {
+    if (ref_cnt_ != 1) {
+      runtime_ref_cnt_ = ref_cnt_;
+    }
+  }

   void SetRefCnt(size_t ref_cnt) {
     PADDLE_ENFORCE_GE(ref_cnt, 1,
@@ -44,17 +48,44 @@ class MemOptVarInfo {
     runtime_ref_cnt_ = ref_cnt;
   }

-  bool IsSkipped() const { return skipped_; }
-
-  void SetSkip(bool skipped) { skipped_ = skipped; }
+  // Skip all memory optimization, including memory reuse and garbage
+  // collection.
+  void SetSkipAllMemoryOptimization(bool is_skipped) {
+    skip_all_memory_optimization_ = is_skipped;
+  }
+  bool IsSkippedAllMemoryOptimization() const {
+    return skip_all_memory_optimization_;
+  }
+
+  // Skip all memory reuse, including in-place and cross-op memory reuse.
+  void SetSkipMemoryReuse(bool is_skipped) { skip_memory_reuse_ = is_skipped; }
+  bool IsSkippedMemoryReuse() const {
+    return skip_memory_reuse_ || skip_all_memory_optimization_;
+  }

   const std::string &Name() const { return name_; }

  private:
   std::string name_;
+
+  /**
+   * ref_cnt_ is the total number of last-lived ops of the variable. It does
+   * not change across iterations.
+   *
+   * runtime_ref_cnt_ is the runtime reference count of the variable, which
+   * is decreased by 1 each time an EagerDeletionOpHandle runs. As a result,
+   * it has to be reset to ref_cnt_ after each iteration ends. Since operators
+   * are scheduled across many threads inside ParallelExecutor,
+   * runtime_ref_cnt_ must be an atomic integer to guarantee thread safety
+   * and visibility.
+   *
+   * Specially, if ref_cnt_ is 1, we do not need to reset runtime_ref_cnt_
+   * after each iteration ends.
+   */
   size_t ref_cnt_;
   std::atomic<size_t> runtime_ref_cnt_;
-  bool skipped_{false};
+  bool skip_memory_reuse_{false};
+  bool skip_all_memory_optimization_{false};
 };

 using MemOptVarInfoMapList = std::vector<
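The comment above describes a two-counter, per-iteration reference-counting scheme: a fixed count of last-lived ops plus an atomic runtime counter that the eager-deletion ops decrement. The following is a minimal, self-contained sketch of that scheme for illustration only; the class and member names are simplified and it is not the actual Paddle implementation.

// Minimal sketch of the per-iteration reference-counting scheme described
// above. Names are simplified for illustration; not the real Paddle class.
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

class VarRefInfo {
 public:
  explicit VarRefInfo(size_t ref_cnt)
      : ref_cnt_(ref_cnt), runtime_ref_cnt_(ref_cnt) {}

  // Returns true only for the last "last-lived" op that runs in an iteration,
  // i.e. the op that is allowed to free the variable's memory.
  bool DecreaseRefCnt() {
    return ref_cnt_ == 1 || runtime_ref_cnt_.fetch_sub(1) == 1;
  }

  // Called after each iteration. A variable with ref_cnt_ == 1 never touches
  // runtime_ref_cnt_, so no reset is needed for it.
  void ResetRuntimeRefCnt() {
    if (ref_cnt_ != 1) {
      runtime_ref_cnt_ = ref_cnt_;
    }
  }

 private:
  size_t ref_cnt_;                       // fixed: number of last-lived ops
  std::atomic<size_t> runtime_ref_cnt_;  // decremented concurrently at runtime
};

int main() {
  VarRefInfo info(/*ref_cnt=*/4);
  std::atomic<int> frees{0};

  // Simulate 4 last-lived ops running on different threads in one iteration.
  std::vector<std::thread> ops;
  for (int i = 0; i < 4; ++i) {
    ops.emplace_back([&] {
      if (info.DecreaseRefCnt()) {
        ++frees;  // exactly one op "frees" the variable
      }
    });
  }
  for (auto &t : ops) t.join();
  assert(frees == 1);

  info.ResetRuntimeRefCnt();  // prepare for the next iteration
  return 0;
}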
@@ -72,8 +103,9 @@ class SkipMemOptVarsGuard {
     for (auto &var : vars) {
       for (auto &map : *list_) {
         auto iter = map.find(var);
-        if (iter != map.end() && !iter->second->IsSkipped()) {
-          iter->second->SetSkip(true);
+        if (iter != map.end() &&
+            !iter->second->IsSkippedAllMemoryOptimization()) {
+          iter->second->SetSkipAllMemoryOptimization(true);
           skip_vars_.emplace_back(iter->second.get());
         }
       }
@@ -82,7 +114,7 @@ class SkipMemOptVarsGuard {
   ~SkipMemOptVarsGuard() {
     for (auto *var : skip_vars_) {
-      var->SetSkip(false);
+      var->SetSkipAllMemoryOptimization(false);
     }

     if (list_ && need_reset_ref_cnt_) {
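SkipMemOptVarsGuard follows the usual RAII pattern: the skip flags are set when the guard is constructed and unconditionally cleared when it is destroyed, so an early return or exception cannot leave variables permanently excluded from memory optimization. Below is a simplified sketch of that pattern under invented names (VarInfo, SkipGuard); it is illustrative only and not the real Paddle classes.

// Simplified RAII sketch of the guard pattern used by SkipMemOptVarsGuard:
// flags set on construction, restored on destruction.
#include <iostream>
#include <utility>
#include <vector>

struct VarInfo {
  bool skip_all_memory_optimization{false};
};

class SkipGuard {
 public:
  explicit SkipGuard(std::vector<VarInfo *> vars) : vars_(std::move(vars)) {
    for (auto *v : vars_) {
      if (!v->skip_all_memory_optimization) {
        v->skip_all_memory_optimization = true;
        marked_.push_back(v);  // remember only the ones we actually changed
      }
    }
  }

  ~SkipGuard() {
    for (auto *v : marked_) {
      v->skip_all_memory_optimization = false;
    }
  }

 private:
  std::vector<VarInfo *> vars_;
  std::vector<VarInfo *> marked_;
};

int main() {
  VarInfo a, b;
  {
    SkipGuard guard({&a, &b});
    std::cout << a.skip_all_memory_optimization << "\n";  // 1: skipped in scope
  }
  std::cout << a.skip_all_memory_optimization << "\n";    // 0: restored after
  return 0;
}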
@@ -84,6 +84,29 @@ class ParallelExecutorPrivate {
   inline bool HasGarbageCollectors() const { return !gcs_.empty(); }

+  /**
+   * NOTE(zengjinle): variables fed by users should not be reused, because
+   * users may feed them into another network. Changing fed variables that
+   * users can still visit may produce wrong calculation results, which is a
+   * very subtle bug when training networks. However, these variables can
+   * still be garbage collected.
+   *
+   * ParallelExecutor provides 2 methods to feed variables:
+   *
+   * - FeedTensorsIntoLocalScopes: this method shares the memory of the fed
+   *   variables, so we have to skip them.
+   *
+   * - FeedAndSplitTensorIntoLocalScopes: this method copies the data of the
+   *   fed variables, so we do not need to skip them.
+   */
+  inline void SetSkipMemoryReuse(size_t scope_idx, const std::string &name) {
+    auto iter = mem_opt_var_infos_[scope_idx].find(name);
+    if (iter != mem_opt_var_infos_[scope_idx].end()) {
+      iter->second->SetSkipMemoryReuse(true);
+    }
+  }
+
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
   void InitNCCLCtxs(framework::Scope *scope, const BuildStrategy &bst) {
     VLOG(1) << "nccl comm num:" << bst.nccl_comm_num_ << ", nranks:" << nranks_
@@ -724,6 +747,9 @@ void ParallelExecutor::FeedTensorsIntoLocalScopes(
     auto &map = tensors[i];
     for (auto &pair : map) {
       bool is_persistable = member_->IsPersistable(pair.first);
+      if (!is_persistable) {
+        member_->SetSkipMemoryReuse(i, pair.first);
+      }
       auto *feed_scope = is_persistable ? member_->local_scopes_[i]
                                         : member_->local_exec_scopes_[i];
       auto *feed_var = feed_scope->Var(pair.first);
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import numpy as np
import unittest


class TestMemoryReuseExcludeFeedVar(unittest.TestCase):
    def setUp(self):
        self.image_shape = [28, 28]
        self.iteration = 10

    def main_impl(self, place):
        image = fluid.layers.data(
            name='image', shape=self.image_shape, dtype='float32')
        relu_image = fluid.layers.relu(image)
        loss = fluid.layers.reduce_mean(relu_image)

        build_strategy = fluid.BuildStrategy()
        build_strategy.enable_inplace = True
        build_strategy.memory_optimize = True

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())

        compiled_prog = fluid.CompiledProgram(
            fluid.default_main_program()).with_data_parallel(
                loss_name=loss.name, build_strategy=build_strategy)

        image_tensor = fluid.LoDTensor()
        np_image = np.random.uniform(
            low=-10, high=10, size=self.image_shape).astype('float32')
        image_tensor.set(np_image, place)

        feed_dict = [{image.name: image_tensor}]

        for _ in range(self.iteration):
            exe.run(compiled_prog, feed=feed_dict, fetch_list=[loss.name])
            self.assertTrue(np.array_equal(np.array(image_tensor), np_image))

    def test_main(self):
        places = [fluid.CPUPlace()]
        if fluid.is_compiled_with_cuda():
            places.append(fluid.CUDAPlace(0))

        for p in places:
            with fluid.program_guard(fluid.Program(), fluid.Program()):
                with fluid.unique_name.guard():
                    with fluid.scope_guard(fluid.Scope()):
                        self.main_impl(p)


if __name__ == '__main__':
    unittest.main()