diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec
index 0f6337d6f3627fcb4a5e7cfd63dc4d581a3e11dd..fd9567dd6517e756b2c1e83ee502c92bd4a440cf 100644
--- a/paddle/fluid/API.spec
+++ b/paddle/fluid/API.spec
@@ -31,6 +31,7 @@ paddle.fluid.memory_optimize (ArgSpec(args=['input_program', 'skip_opt_set', 'pr
 paddle.fluid.release_memory (ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,)), ('document', 'ac4114d3df16264f1946deb3a8434a6f'))
 paddle.fluid.DistributeTranspilerConfig.__init__
 paddle.fluid.ParallelExecutor.__init__ (ArgSpec(args=['self', 'use_cuda', 'loss_name', 'main_program', 'share_vars_from', 'exec_strategy', 'build_strategy', 'num_trainers', 'trainer_id', 'scope'], varargs=None, keywords=None, defaults=(None, None, None, None, None, 1, 0, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
+paddle.fluid.ParallelExecutor.drop_local_exe_scopes (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '80d857dc626612e2b2460d0154551e95'))
 paddle.fluid.ParallelExecutor.run (ArgSpec(args=['self', 'fetch_list', 'feed', 'feed_dict', 'return_numpy'], varargs=None, keywords=None, defaults=(None, None, True)), ('document', '33ce6ec50f8eeb05d340e6b114b026fd'))
 paddle.fluid.create_lod_tensor (ArgSpec(args=['data', 'recursive_seq_lens', 'place'], varargs=None, keywords=None, defaults=None), ('document', 'b82ea20e2dc5ff2372e0643169ca47ff'))
 paddle.fluid.create_random_int_lodtensor (ArgSpec(args=['recursive_seq_lens', 'base_shape', 'place', 'low', 'high'], varargs=None, keywords=None, defaults=None), ('document', '74dc6d23185d90a7a50fbac19f5b65fb'))
diff --git a/paddle/fluid/framework/details/op_handle_base.h b/paddle/fluid/framework/details/op_handle_base.h
index 647b238634a51aed92f3bcf4171416838c0f3cc6..3412fa0bb76fafbef7d1abbee72bf46c361152f9 100644
--- a/paddle/fluid/framework/details/op_handle_base.h
+++ b/paddle/fluid/framework/details/op_handle_base.h
@@ -27,7 +27,7 @@ namespace paddle {
 namespace framework {
 namespace details {
 
-constexpr char kLocalExecScopeName[] = "@LOCAL_SCOPE@";
+constexpr char kLocalExecScopeName[] = "@LOCAL_EXE_SCOPE@";
 
 // Wraps ir::Node and provide helper utilities.
 // It's responsible for populating necessary fields of ir::Node.
diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc
index 4a7b7d1329a6a6c6da9b581eaa93f54038c9420d..247d78479348da998a46d7838b89c481c9e299e5 100644
--- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc
@@ -68,15 +68,7 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
 
   ++drop_scope_counter_;
   if (drop_scope_counter_ == strategy_.num_iteration_per_drop_scope_) {
-    WaitComputationalStreams();
-
-    for (auto &scope : local_scopes_) {
-      auto &local_scope =
-          *scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
-      scope->DeleteScope(local_scope);
-    }
-
-    drop_scope_counter_ = 0;
+    DropLocalExeScopes();
   }
   if (eptr) {
     std::rethrow_exception(eptr);
@@ -84,6 +76,25 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
     return fetch_data;
   }
 }
+
+void ScopeBufferedSSAGraphExecutor::DropLocalExeScopes() {
+  drop_scope_counter_ = 0;
+  for (auto p : places_) {
+    platform::DeviceContextPool::Instance().Get(p)->Wait();
+  }
+
+  for (auto &scope : local_scopes_) {
+    auto &local_scope =
+        *scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
+    scope->DeleteScope(local_scope);
+    VLOG(3) << "Drop local execution scope: " << local_scope;
+  }
+}
+
+bool ScopeBufferedSSAGraphExecutor::NeedCreateLocalExeScope() {
+  return drop_scope_counter_ == 0;
+}
+
 }  // namespace details
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h
index 0f6340213daee98a75401f9db0e628f7b4fd79fc..030777cad894fa24ccdc0afa1aae8e7e4caa90ee 100644
--- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h
+++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h
@@ -47,17 +47,12 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor {
 
   FeedFetchList Run(const std::vector<std::string>& fetch_tensors) override;
 
- private:
-  inline void WaitComputationalStreams() {
-    // Wait All computational streams
-    for (auto p : places_) {
-      platform::DeviceContextPool::Instance().Get(p)->Wait();
-    }
-  }
+  void DropLocalExeScopes();
+
+  bool NeedCreateLocalExeScope();
 
  private:
   size_t drop_scope_counter_{0};
-
   ExecutionStrategy strategy_;
   std::unique_ptr<SSAGraphExecutor> underlying_executor_;
   std::vector<Scope*> local_scopes_;
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index f474052d528d7835504a6c3bf2a047e4d9d295ce..f400e8a5cc031cb0982860a6c2c1c9aba77f35dc 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -46,6 +46,7 @@ static std::once_flag gProfileOnce;
 #ifdef WITH_GPERFTOOLS
 static bool gProfileStarted = false;
 #endif
+
 class ParallelExecutorPrivate {
  public:
   explicit ParallelExecutorPrivate(const std::vector<platform::Place> &places)
@@ -57,7 +58,7 @@ class ParallelExecutorPrivate {
         gProfileStarted = true;
 #else
         LOG(WARNING) << "Paddle is not compiled with gperftools. "
-                      "FLAGS_pe_profile_fname will be ignored";
+                        "FLAGS_pe_profile_fname will be ignored";
 #endif
       });
     }
@@ -177,6 +178,20 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
   return member_->local_scopes_;
 }
 
+void ParallelExecutor::DropLocalExeScopes() {
+  auto executor = dynamic_cast<details::ScopeBufferedSSAGraphExecutor *>(
+      member_->executor_.get());
+  if (executor) {
+    executor->DropLocalExeScopes();
+  }
+}
+
+bool ParallelExecutor::NeedCreateLocalExeScope() {
+  auto executor = dynamic_cast<details::ScopeBufferedSSAGraphExecutor *>(
+      member_->executor_.get());
+  return executor && executor->NeedCreateLocalExeScope();
+}
+
 ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
                                    const std::vector<std::string> &bcast_vars,
                                    const std::string &loss_var_name,
@@ -342,8 +357,8 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
         member_->local_scopes_, member_->nranks_,
         member_->use_cuda_);
   }
-
 #endif
+
   auto max_memory_size = GetEagerDeletionThreshold();
   VLOG(10) << "Eager Deletion Threshold "
            << static_cast<float>(max_memory_size) / (1 << 30);
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index 5756627fbd8583428014e24e5aa3f626c908ce1c..2de6b7f73d2a03a4b9f23b49142f677df6120806 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -58,6 +58,11 @@ class ParallelExecutor {
 
   std::vector<Scope *> &GetLocalScopes();
 
+  void DropLocalExeScopes();
+
+  // This API is used to check whether DropLocalExeScopes works.
+  bool NeedCreateLocalExeScope();
+
   /**
    * Feed tensors to local scopes. The size of tensors should be equal to the
    * size of local scopes.
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 1b595b010ff82395492a1ddf73856a369efecbf0..63d37223ca7a83ba47081a6b3fc90ec510866cf8 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -1504,6 +1504,9 @@ All parameter, weight, gradient are variables in Paddle.
             return &self.GetLocalScopes();
           },
           py::return_value_policy::reference)
+      .def("drop_local_exe_scopes", &ParallelExecutor::DropLocalExeScopes)
+      .def("_need_create_local_exe_scopes",
+           &ParallelExecutor::NeedCreateLocalExeScope)
       .def("feed_tensors_into_local_scopes",
            &ParallelExecutor::FeedTensorsIntoLocalScopes)
       .def("feed_and_split_tensor_into_local_scopes",
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index ad32f157d0e694477e009b5a9e3650701787d79e..a2c6537effafcc2134d05a3f972f88ea3ec985b5 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -288,3 +288,69 @@ class ParallelExecutor(object):
     @property
     def device_count(self):
         return len(self._places)
+
+    def drop_local_exe_scopes(self):
+        """
+        Drop the local execution scopes immediately.
+
+        During the execution of a Program, the generated intermediate
+        results are placed in the local execution scopes. In some models,
+        creating and deleting those intermediate results is time-consuming.
+        To mitigate this, ParallelExecutor provides an option in
+        ExecutionStrategy, i.e. num_iteration_per_drop_scope, which
+        indicates how many iterations to run before dropping the local
+        execution scopes. But in some situations each iteration produces
+        different intermediate results, so the memory held by the local
+        execution scopes grows over time. If you then want to run another
+        program, there may be insufficient memory; at that point you
+        should drop the local execution scopes of the other programs.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              import numpy
+              import os
+
+              use_cuda = True
+              # NOTE: If you run the program on CPU, you need to
+              # specify CPU_NUM; otherwise, fluid will use the
+              # number of logical cores as CPU_NUM. In that case,
+              # the batch size of the input must be greater than
+              # CPU_NUM; otherwise the process will fail with an
+              # exception.
+              if not use_cuda:
+                  os.environ['CPU_NUM'] = str(2)
+
+              train_program = fluid.Program()
+              startup_program = fluid.Program()
+              with fluid.program_guard(train_program, startup_program):
+                  data = fluid.layers.data(name='X', shape=[1], dtype='float32')
+                  hidden = fluid.layers.fc(input=data, size=10)
+                  loss = fluid.layers.mean(hidden)
+
+              place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+              exe = fluid.Executor(place)
+              exe.run(startup_program)
+
+              parallel_exe = fluid.ParallelExecutor(use_cuda=use_cuda,
+                                                    main_program=train_program,
+                                                    loss_name=loss.name)
+
+              x = numpy.random.random(size=(10, 1)).astype('float32')
+              loss_data, = parallel_exe.run(feed={"X": x},
+                                            fetch_list=[loss.name])
+
+              parallel_exe.drop_local_exe_scopes()
+        """
+        assert isinstance(
+            self._compiled_program._executor,
+            core.ParallelExecutor), "The Executor should be ParallelExecutor."
+        self._compiled_program._executor.drop_local_exe_scopes()
+
+    # This API is used to check whether drop_local_exe_scopes works.
+    def _need_create_local_exe_scopes(self):
+        assert isinstance(
+            self._compiled_program._executor,
+            core.ParallelExecutor), "The Executor should be ParallelExecutor."
+        return self._compiled_program._executor._need_create_local_exe_scopes()
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_drop_scope.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_drop_scope.py
new file mode 100644
index 0000000000000000000000000000000000000000..e0bae089829b330e1a2dba34782f096f24279368
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_drop_scope.py
@@ -0,0 +1,77 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import unittest
+import paddle.fluid as fluid
+import numpy
+import os
+
+
+class TestParallelExecutorDropExeScope(unittest.TestCase):
+    def check_drop_scope(self, use_cuda=True):
+        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+
+        if not use_cuda:
+            os.environ['CPU_NUM'] = str(2)
+
+        train_program = fluid.Program()
+        startup_program = fluid.Program()
+        with fluid.program_guard(train_program, startup_program):
+            data = fluid.layers.data(name='X', shape=[1], dtype='float32')
+            hidden = fluid.layers.fc(input=data, size=10)
+            loss = fluid.layers.mean(hidden)
+            test_program = fluid.default_main_program().clone(for_test=True)
+            fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)
+
+        exe = fluid.Executor(place)
+        exe.run(startup_program)
+
+        exec_strategy = fluid.ExecutionStrategy()
+        exec_strategy.num_iteration_per_drop_scope = 10
+
+        train_exe = fluid.ParallelExecutor(
+            use_cuda=use_cuda,
+            main_program=train_program,
+            loss_name=loss.name,
+            exec_strategy=exec_strategy)
+        test_exe = fluid.ParallelExecutor(
+            use_cuda=use_cuda,
+            main_program=test_program,
+            share_vars_from=train_exe,
+            exec_strategy=exec_strategy)
+
+        x = numpy.random.random(size=(10, 1)).astype('float32')
+        train_exe.run(feed={"X": x}, fetch_list=[loss.name])
+        test_exe.run(feed={"X": x}, fetch_list=[loss.name])
+
+        self.assertFalse(train_exe._need_create_local_exe_scopes())
+        self.assertFalse(test_exe._need_create_local_exe_scopes())
+
+        # drop the local execution scopes immediately
+        train_exe.drop_local_exe_scopes()
+        test_exe.drop_local_exe_scopes()
+
+        self.assertTrue(train_exe._need_create_local_exe_scopes())
+        self.assertTrue(test_exe._need_create_local_exe_scopes())
+
+    def test_drop_scope(self):
+        self.check_drop_scope(use_cuda=False)
+        if fluid.core.is_compiled_with_cuda():
+            self.check_drop_scope(use_cuda=True)
+
+
+if __name__ == '__main__':
+    unittest.main()
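
Usage note (not part of the patch above): a minimal, self-contained sketch of how the new drop_local_exe_scopes() call is expected to complement ExecutionStrategy.num_iteration_per_drop_scope. It reuses the toy network from the docstring example; the names and iteration counts are illustrative only.

    import os
    import numpy
    import paddle.fluid as fluid

    use_cuda = fluid.core.is_compiled_with_cuda()
    if not use_cuda:
        os.environ['CPU_NUM'] = str(2)

    # Build the same toy network as in the docstring example.
    train_program = fluid.Program()
    startup_program = fluid.Program()
    with fluid.program_guard(train_program, startup_program):
        data = fluid.layers.data(name='X', shape=[1], dtype='float32')
        hidden = fluid.layers.fc(input=data, size=10)
        loss = fluid.layers.mean(hidden)

    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    fluid.Executor(place).run(startup_program)

    exec_strategy = fluid.ExecutionStrategy()
    # Local execution scopes are dropped automatically every 100 iterations.
    exec_strategy.num_iteration_per_drop_scope = 100

    train_exe = fluid.ParallelExecutor(use_cuda=use_cuda,
                                       main_program=train_program,
                                       loss_name=loss.name,
                                       exec_strategy=exec_strategy)

    x = numpy.random.random(size=(10, 1)).astype('float32')
    for _ in range(10):
        train_exe.run(feed={"X": x}, fetch_list=[loss.name])

    # Fewer than 100 iterations have run, so the intermediate results are
    # still held by the local execution scopes; release them explicitly
    # before running another program on the same devices.
    train_exe.drop_local_exe_scopes()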