diff --git a/paddle/fluid/framework/details/eager_deletion_op_handle.cc b/paddle/fluid/framework/details/eager_deletion_op_handle.cc
index 817fe03cf5ce06f723e06f9b523c2a98017982c6..2e64f9d4fb3f3673efe466e030038afc043046b4 100644
--- a/paddle/fluid/framework/details/eager_deletion_op_handle.cc
+++ b/paddle/fluid/framework/details/eager_deletion_op_handle.cc
@@ -96,7 +96,8 @@ void EagerDeletionOpHandle::RunImpl() {
   std::deque<std::shared_ptr<memory::Allocation>> garbages;
   for (size_t i = 0; i < var_infos_.size(); ++i) {
     auto *var_info = var_infos_[i];
-    if (var_info->IsSkipped() || !var_info->DecreaseRefCnt()) {
+    if (var_info->IsSkippedAllMemoryOptimization() ||
+        !var_info->DecreaseRefCnt()) {
       continue;
     }
 
diff --git a/paddle/fluid/framework/details/share_tensor_buffer_functor.cc b/paddle/fluid/framework/details/share_tensor_buffer_functor.cc
index 5f10f41a599f50ed9b9ed454964d48eb73bb291d..fb43bfbf342ea282b517694305e26990069dbf07 100644
--- a/paddle/fluid/framework/details/share_tensor_buffer_functor.cc
+++ b/paddle/fluid/framework/details/share_tensor_buffer_functor.cc
@@ -100,7 +100,7 @@ void ShareTensorBufferFunctor::operator()(Scope *exec_scope) {
     auto *out_tensor = GetMutableTensorFromVar(in_out_vars_[i].second);
     auto *in_var_info = in_var_infos_[i];
 
-    if (UNLIKELY(in_var_info->IsSkipped())) {
+    if (UNLIKELY(in_var_info->IsSkippedMemoryReuse())) {
       // If in_var is inplaced in the previous batch and we want to fetch
       // in_var in the current batch, we have to reset memory of out_var
       // to avoid wrong calculation result.
diff --git a/paddle/fluid/framework/ir/memory_optimize_pass/memory_optimization_var_info.h b/paddle/fluid/framework/ir/memory_optimize_pass/memory_optimization_var_info.h
index 73b03be7a4b78f8a44458df861eae077ae88b439..4f6bacecab4aac39b6f4cb01138560ca8378c13a 100644
--- a/paddle/fluid/framework/ir/memory_optimize_pass/memory_optimization_var_info.h
+++ b/paddle/fluid/framework/ir/memory_optimize_pass/memory_optimization_var_info.h
@@ -35,7 +35,11 @@ class MemOptVarInfo {
     return ref_cnt_ == 1 || (runtime_ref_cnt_.fetch_sub(1) == 1);
   }
 
-  void ResetRuntimeRefCnt() { runtime_ref_cnt_ = ref_cnt_; }
+  void ResetRuntimeRefCnt() {
+    if (ref_cnt_ != 1) {
+      runtime_ref_cnt_ = ref_cnt_;
+    }
+  }
 
   void SetRefCnt(size_t ref_cnt) {
     PADDLE_ENFORCE_GE(ref_cnt, 1,
@@ -44,17 +48,44 @@ class MemOptVarInfo {
     runtime_ref_cnt_ = ref_cnt;
   }
 
-  bool IsSkipped() const { return skipped_; }
+  // Skip all memory optimization, including memory reuse and garbage
+  // collection
+  void SetSkipAllMemoryOptimization(bool is_skipped) {
+    skip_all_memory_optimization_ = is_skipped;
+  }
+
+  bool IsSkippedAllMemoryOptimization() const {
+    return skip_all_memory_optimization_;
+  }
+
+  // Skip all memory reuse, including inplace and cross-op memory reuse
+  void SetSkipMemoryReuse(bool is_skipped) { skip_memory_reuse_ = is_skipped; }
 
-  void SetSkip(bool skipped) { skipped_ = skipped; }
+  bool IsSkippedMemoryReuse() const {
+    return skip_memory_reuse_ || skip_all_memory_optimization_;
+  }
 
   const std::string &Name() const { return name_; }
 
  private:
   std::string name_;
+
+  /**
+   * ref_cnt_ is the total number of last-lived ops of a variable. It does
+   * not change during iterations.
+   *
+   * runtime_ref_cnt_ is the runtime reference count of a variable, which
+   * decreases by 1 each time an EagerDeletionOpHandle runs. As a result,
+   * it should be reset to ref_cnt_ after each iteration ends. Since ops are
+   * scheduled across many threads inside ParallelExecutor, runtime_ref_cnt_
+   * must be an atomic integer to guarantee thread safety and visibility.
+   *
+   * Specially, if ref_cnt_ is 1, we do not need to reset runtime_ref_cnt_
+   * after each iteration ends.
+   */
   size_t ref_cnt_;
   std::atomic<size_t> runtime_ref_cnt_;
-  bool skipped_{false};
+  bool skip_memory_reuse_{false};
+  bool skip_all_memory_optimization_{false};
 };
 
 using MemOptVarInfoMapList = std::vector<
@@ -72,8 +103,9 @@ class SkipMemOptVarsGuard {
     for (auto &var : vars) {
       for (auto &map : *list_) {
         auto iter = map.find(var);
-        if (iter != map.end() && !iter->second->IsSkipped()) {
-          iter->second->SetSkip(true);
+        if (iter != map.end() &&
+            !iter->second->IsSkippedAllMemoryOptimization()) {
+          iter->second->SetSkipAllMemoryOptimization(true);
           skip_vars_.emplace_back(iter->second.get());
         }
       }
@@ -82,7 +114,7 @@
 
   ~SkipMemOptVarsGuard() {
     for (auto *var : skip_vars_) {
-      var->SetSkip(false);
+      var->SetSkipAllMemoryOptimization(false);
     }
 
     if (list_ && need_reset_ref_cnt_) {
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 7b83f87d5d8e51bbe8a61f6ef08e65369399635b..48c26a216a5b97b94b295f02603790316b8eb68b 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -84,6 +84,29 @@ class ParallelExecutorPrivate {
 
   inline bool HasGarbageCollectors() const { return !gcs_.empty(); }
 
+  /**
+   * NOTE(zengjinle): variables that users feed should not be reused, because
+   * users may feed them into another network. Changing a fed variable that
+   * users can still visit may cause wrong calculation results, which is a
+   * very subtle bug when training networks. However, these variables can
+   * still be garbage collected.
+   *
+   * ParallelExecutor provides 2 methods to feed variables:
+   *
+   * - FeedTensorsIntoLocalScopes: this method shares the memory of the fed
+   *   variables, so we have to skip memory reuse for them.
+   *
+   * - FeedAndSplitTensorIntoLocalScopes: this method copies the data of the
+   *   fed variables, so we do not need to skip them.
+   */
+  inline void SetSkipMemoryReuse(size_t scope_idx, const std::string &name) {
+    auto iter = mem_opt_var_infos_[scope_idx].find(name);
+    if (iter != mem_opt_var_infos_[scope_idx].end()) {
+      iter->second->SetSkipMemoryReuse(true);
+    }
+  }
+
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
   void InitNCCLCtxs(framework::Scope *scope, const BuildStrategy &bst) {
     VLOG(1) << "nccl comm num:" << bst.nccl_comm_num_ << ", nranks:" << nranks_
@@ -724,6 +747,9 @@ void ParallelExecutor::FeedTensorsIntoLocalScopes(
     auto &map = tensors[i];
     for (auto &pair : map) {
       bool is_persistable = member_->IsPersistable(pair.first);
+      if (!is_persistable) {
+        member_->SetSkipMemoryReuse(i, pair.first);
+      }
       auto *feed_scope = is_persistable ? member_->local_scopes_[i]
                                         : member_->local_exec_scopes_[i];
       auto *feed_var = feed_scope->Var(pair.first);
diff --git a/python/paddle/fluid/tests/unittests/test_memory_reuse_exclude_feed_var.py b/python/paddle/fluid/tests/unittests/test_memory_reuse_exclude_feed_var.py
new file mode 100644
index 0000000000000000000000000000000000000000..a1b7380fdd9a201a6cb17aeaf72ca8577506a63a
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_memory_reuse_exclude_feed_var.py
@@ -0,0 +1,66 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import paddle.fluid as fluid
+import numpy as np
+import unittest
+
+
+class TestMemoryReuseExcludeFeedVar(unittest.TestCase):
+    def setUp(self):
+        self.image_shape = [28, 28]
+        self.iteration = 10
+
+    def main_impl(self, place):
+        image = fluid.layers.data(
+            name='image', shape=self.image_shape, dtype='float32')
+        relu_image = fluid.layers.relu(image)
+        loss = fluid.layers.reduce_mean(relu_image)
+
+        build_strategy = fluid.BuildStrategy()
+        build_strategy.enable_inplace = True
+        build_strategy.memory_optimize = True
+
+        exe = fluid.Executor(place)
+        exe.run(fluid.default_startup_program())
+
+        compiled_prog = fluid.CompiledProgram(fluid.default_main_program(
+        )).with_data_parallel(
+            loss_name=loss.name, build_strategy=build_strategy)
+
+        image_tensor = fluid.LoDTensor()
+        np_image = np.random.uniform(
+            low=-10, high=10, size=self.image_shape).astype('float32')
+        image_tensor.set(np_image, place)
+
+        feed_dict = [{image.name: image_tensor}]
+
+        for _ in range(self.iteration):
+            exe.run(compiled_prog, feed=feed_dict, fetch_list=[loss.name])
+            self.assertTrue(np.array_equal(np.array(image_tensor), np_image))
+
+    def test_main(self):
+        places = [fluid.CPUPlace()]
+        if fluid.is_compiled_with_cuda():
+            places.append(fluid.CUDAPlace(0))
+
+        for p in places:
+            with fluid.program_guard(fluid.Program(), fluid.Program()):
+                with fluid.unique_name.guard():
+                    with fluid.scope_guard(fluid.Scope()):
+                        self.main_impl(p)
+
+
+if __name__ == '__main__':
+    unittest.main()
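
Reviewer note, not part of the patch: the NOTE in `parallel_executor.cc` distinguishes two feed paths, and that distinction maps onto the Python-level `feed` argument. Below is a minimal sketch of the mapping, assuming the fluid 1.x API used by the new unit test; the variable names, shapes, and the `CPU_NUM` setting are illustrative assumptions, and the claim that a list of per-device dicts dispatches to `FeedTensorsIntoLocalScopes` while a plain dict dispatches to `FeedAndSplitTensorIntoLocalScopes` reflects the executor dispatch at the time of this PR.

```python
# Minimal, illustrative sketch (not part of the patch); assumes fluid 1.x.
import os
os.environ['CPU_NUM'] = '1'  # assumption: one CPU device for data parallelism

import numpy as np
import paddle.fluid as fluid

image = fluid.layers.data(name='image', shape=[28, 28], dtype='float32')
loss = fluid.layers.reduce_mean(fluid.layers.relu(image))

build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = True  # turn memory reuse on

place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

compiled_prog = fluid.CompiledProgram(
    fluid.default_main_program()).with_data_parallel(
        loss_name=loss.name, build_strategy=build_strategy)

np_image = np.random.uniform(-10, 10, [28, 28]).astype('float32')
tensor = fluid.LoDTensor()
tensor.set(np_image, place)

# A list of per-device feed dicts goes through FeedTensorsIntoLocalScopes,
# which shares the LoDTensor's memory with the scope variable. The patch
# therefore marks 'image' to skip memory reuse, so the fed tensor must
# survive the run unchanged.
exe.run(compiled_prog, feed=[{'image': tensor}], fetch_list=[loss.name])
assert np.array_equal(np.array(tensor), np_image)

# A single feed dict goes through FeedAndSplitTensorIntoLocalScopes, which
# copies (and splits) the data across devices, so no skipping is needed.
exe.run(compiled_prog,
        feed={'image': np_image[np.newaxis, :, :]},
        fetch_list=[loss.name])
```

If the patch works as intended, the assertion after the first `run` holds even with `enable_inplace = True`, because the fed variable is excluded from memory reuse while remaining eligible for garbage collection.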