未验证 提交 bc833945 编写于 作者: C chengduo 提交者: GitHub

Add DropLocalExeScopes in ParallelExecutor (#17297)

* reset drop local scope counter
test=develop
上级 d4b67e16
...@@ -31,6 +31,7 @@ paddle.fluid.memory_optimize (ArgSpec(args=['input_program', 'skip_opt_set', 'pr ...@@ -31,6 +31,7 @@ paddle.fluid.memory_optimize (ArgSpec(args=['input_program', 'skip_opt_set', 'pr
paddle.fluid.release_memory (ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,)), ('document', 'ac4114d3df16264f1946deb3a8434a6f')) paddle.fluid.release_memory (ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,)), ('document', 'ac4114d3df16264f1946deb3a8434a6f'))
paddle.fluid.DistributeTranspilerConfig.__init__ paddle.fluid.DistributeTranspilerConfig.__init__
paddle.fluid.ParallelExecutor.__init__ (ArgSpec(args=['self', 'use_cuda', 'loss_name', 'main_program', 'share_vars_from', 'exec_strategy', 'build_strategy', 'num_trainers', 'trainer_id', 'scope'], varargs=None, keywords=None, defaults=(None, None, None, None, None, 1, 0, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.ParallelExecutor.__init__ (ArgSpec(args=['self', 'use_cuda', 'loss_name', 'main_program', 'share_vars_from', 'exec_strategy', 'build_strategy', 'num_trainers', 'trainer_id', 'scope'], varargs=None, keywords=None, defaults=(None, None, None, None, None, 1, 0, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.ParallelExecutor.drop_local_exe_scopes (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '80d857dc626612e2b2460d0154551e95'))
paddle.fluid.ParallelExecutor.run (ArgSpec(args=['self', 'fetch_list', 'feed', 'feed_dict', 'return_numpy'], varargs=None, keywords=None, defaults=(None, None, True)), ('document', '33ce6ec50f8eeb05d340e6b114b026fd')) paddle.fluid.ParallelExecutor.run (ArgSpec(args=['self', 'fetch_list', 'feed', 'feed_dict', 'return_numpy'], varargs=None, keywords=None, defaults=(None, None, True)), ('document', '33ce6ec50f8eeb05d340e6b114b026fd'))
paddle.fluid.create_lod_tensor (ArgSpec(args=['data', 'recursive_seq_lens', 'place'], varargs=None, keywords=None, defaults=None), ('document', 'b82ea20e2dc5ff2372e0643169ca47ff')) paddle.fluid.create_lod_tensor (ArgSpec(args=['data', 'recursive_seq_lens', 'place'], varargs=None, keywords=None, defaults=None), ('document', 'b82ea20e2dc5ff2372e0643169ca47ff'))
paddle.fluid.create_random_int_lodtensor (ArgSpec(args=['recursive_seq_lens', 'base_shape', 'place', 'low', 'high'], varargs=None, keywords=None, defaults=None), ('document', '74dc6d23185d90a7a50fbac19f5b65fb')) paddle.fluid.create_random_int_lodtensor (ArgSpec(args=['recursive_seq_lens', 'base_shape', 'place', 'low', 'high'], varargs=None, keywords=None, defaults=None), ('document', '74dc6d23185d90a7a50fbac19f5b65fb'))
......
...@@ -27,7 +27,7 @@ namespace paddle { ...@@ -27,7 +27,7 @@ namespace paddle {
namespace framework { namespace framework {
namespace details { namespace details {
constexpr char kLocalExecScopeName[] = "@LOCAL_SCOPE@"; constexpr char kLocalExecScopeName[] = "@LOCAL_EXE_SCOPE@";
// Wraps ir::Node and provide helper utilities. // Wraps ir::Node and provide helper utilities.
// It's responsible for populating necessary fields of ir::Node. // It's responsible for populating necessary fields of ir::Node.
......
...@@ -68,15 +68,7 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run( ...@@ -68,15 +68,7 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
++drop_scope_counter_; ++drop_scope_counter_;
if (drop_scope_counter_ == strategy_.num_iteration_per_drop_scope_) { if (drop_scope_counter_ == strategy_.num_iteration_per_drop_scope_) {
WaitComputationalStreams(); DropLocalExeScopes();
for (auto &scope : local_scopes_) {
auto &local_scope =
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
scope->DeleteScope(local_scope);
}
drop_scope_counter_ = 0;
} }
if (eptr) { if (eptr) {
std::rethrow_exception(eptr); std::rethrow_exception(eptr);
...@@ -84,6 +76,25 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run( ...@@ -84,6 +76,25 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
return fetch_data; return fetch_data;
} }
} }
void ScopeBufferedSSAGraphExecutor::DropLocalExeScopes() {
drop_scope_counter_ = 0;
for (auto p : places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait();
}
for (auto &scope : local_scopes_) {
auto &local_scope =
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
scope->DeleteScope(local_scope);
VLOG(3) << "Drop local execution scope: " << local_scope;
}
}
bool ScopeBufferedSSAGraphExecutor::NeedCreateLocalExeScope() {
return drop_scope_counter_ == 0;
}
} // namespace details } // namespace details
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -47,17 +47,12 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -47,17 +47,12 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor {
FeedFetchList Run(const std::vector<std::string>& fetch_tensors) override; FeedFetchList Run(const std::vector<std::string>& fetch_tensors) override;
private: void DropLocalExeScopes();
inline void WaitComputationalStreams() {
// Wait All computational streams bool NeedCreateLocalExeScope();
for (auto p : places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait();
}
}
private: private:
size_t drop_scope_counter_{0}; size_t drop_scope_counter_{0};
ExecutionStrategy strategy_; ExecutionStrategy strategy_;
std::unique_ptr<SSAGraphExecutor> underlying_executor_; std::unique_ptr<SSAGraphExecutor> underlying_executor_;
std::vector<Scope*> local_scopes_; std::vector<Scope*> local_scopes_;
......
...@@ -46,6 +46,7 @@ static std::once_flag gProfileOnce; ...@@ -46,6 +46,7 @@ static std::once_flag gProfileOnce;
#ifdef WITH_GPERFTOOLS #ifdef WITH_GPERFTOOLS
static bool gProfileStarted = false; static bool gProfileStarted = false;
#endif #endif
class ParallelExecutorPrivate { class ParallelExecutorPrivate {
public: public:
explicit ParallelExecutorPrivate(const std::vector<platform::Place> &places) explicit ParallelExecutorPrivate(const std::vector<platform::Place> &places)
...@@ -57,7 +58,7 @@ class ParallelExecutorPrivate { ...@@ -57,7 +58,7 @@ class ParallelExecutorPrivate {
gProfileStarted = true; gProfileStarted = true;
#else #else
LOG(WARNING) << "Paddle is not compiled with gperftools. " LOG(WARNING) << "Paddle is not compiled with gperftools. "
"FLAGS_pe_profile_fname will be ignored"; "FLAGS_pe_profile_fname will be ignored";
#endif #endif
}); });
} }
...@@ -177,6 +178,20 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() { ...@@ -177,6 +178,20 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
return member_->local_scopes_; return member_->local_scopes_;
} }
void ParallelExecutor::DropLocalExeScopes() {
auto executor = dynamic_cast<details::ScopeBufferedSSAGraphExecutor *>(
member_->executor_.get());
if (executor) {
executor->DropLocalExeScopes();
}
}
bool ParallelExecutor::NeedCreateLocalExeScope() {
auto executor = dynamic_cast<details::ScopeBufferedSSAGraphExecutor *>(
member_->executor_.get());
return executor && executor->NeedCreateLocalExeScope();
}
ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places, ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
const std::vector<std::string> &bcast_vars, const std::vector<std::string> &bcast_vars,
const std::string &loss_var_name, const std::string &loss_var_name,
...@@ -342,8 +357,8 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places, ...@@ -342,8 +357,8 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
member_->local_scopes_, member_->nranks_, member_->local_scopes_, member_->nranks_,
member_->use_cuda_); member_->use_cuda_);
} }
#endif #endif
auto max_memory_size = GetEagerDeletionThreshold(); auto max_memory_size = GetEagerDeletionThreshold();
VLOG(10) << "Eager Deletion Threshold " VLOG(10) << "Eager Deletion Threshold "
<< static_cast<float>(max_memory_size) / (1 << 30); << static_cast<float>(max_memory_size) / (1 << 30);
......
...@@ -58,6 +58,11 @@ class ParallelExecutor { ...@@ -58,6 +58,11 @@ class ParallelExecutor {
std::vector<Scope *> &GetLocalScopes(); std::vector<Scope *> &GetLocalScopes();
void DropLocalExeScopes();
// This API is used to check whether DropLocalExeScopes work.
bool NeedCreateLocalExeScope();
/** /**
* Feed tensors to local scopes. The size of tensors should be equal to the * Feed tensors to local scopes. The size of tensors should be equal to the
* size of local scopes. * size of local scopes.
......
...@@ -1504,6 +1504,9 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -1504,6 +1504,9 @@ All parameter, weight, gradient are variables in Paddle.
return &self.GetLocalScopes(); return &self.GetLocalScopes();
}, },
py::return_value_policy::reference) py::return_value_policy::reference)
.def("drop_local_exe_scopes", &ParallelExecutor::DropLocalExeScopes)
.def("_need_create_local_exe_scopes",
&ParallelExecutor::NeedCreateLocalExeScope)
.def("feed_tensors_into_local_scopes", .def("feed_tensors_into_local_scopes",
&ParallelExecutor::FeedTensorsIntoLocalScopes) &ParallelExecutor::FeedTensorsIntoLocalScopes)
.def("feed_and_split_tensor_into_local_scopes", .def("feed_and_split_tensor_into_local_scopes",
......
...@@ -288,3 +288,68 @@ class ParallelExecutor(object): ...@@ -288,3 +288,68 @@ class ParallelExecutor(object):
@property @property
def device_count(self): def device_count(self):
return len(self._places) return len(self._places)
def drop_local_exe_scopes(self):
"""
Drop the local execution scope immediately.
During the execution of the Program, the generate intermediate
results are placed in local execution scope, in some model the
creation and deletion of those intermediate results are time-consuming.
To resolve that problem, ParallelExecutor provides an option in
ExecutionStrategy, i.g. num_iteration_per_drop_scope, this option
indicates how many iterations to run before dropping the local execution
scope. But in some situation, each iteration generates different
intermediate results, it will lead to the result that the memory which
is needed by local execution scope gradually increase. And if you want
to run another program at this time, there may be insufficient storage,
At this point you should drop the local execution scope of other Programs.
Examples:
.. code-block:: python
import paddle.fluid as fluid
import numpy
import os
use_cuda = True
# NOTE: If you use CPU to run the program, you need
# to specify the CPU_NUM, otherwise, fluid will use
# all the number of the logic core as the CPU_NUM,
# in that case, the batch size of the input should be
# greater than CPU_NUM, if not, the process will be
# failed by an exception.
if not use_cuda:
os.environ['CPU_NUM'] = str(2)
train_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(train_program, startup_program):
data = fluid.layers.data(name='X', shape=[1], dtype='float32')
hidden = fluid.layers.fc(input=data, size=10)
loss = fluid.layers.mean(hidden)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe.run(startup_program)
parallel_exe = fluid.ParallelExecutor(use_cuda=use_cuda,
main_program=train_program,
loss_name=loss.name)
x = numpy.random.random(size=(10, 1)).astype('float32')
loss_data, = parallel_exe.run(feed={"X": x},
fetch_list=[loss.name])
parallel_exe.drop_local_exe_scopes()
"""
assert isinstance(
self._compiled_program._executor,
core.ParallelExecutor), "The Executor should be ParallelExecutor."
self._compiled_program._executor.drop_local_exe_scopes()
# This API is used to check whether DropLocalExeScopes can work.
def _need_create_local_exe_scopes(self):
assert isinstance(
self._compiled_program._executor,
core.ParallelExecutor), "The Executor should be ParallelExecutor."
return self._compiled_program._executor._need_create_local_exe_scopes()
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import paddle.fluid as fluid
import numpy
import os
class TestParallelExecutorDropExeScope(unittest.TestCase):
def check_drop_scope(self, use_cuda=True):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
if not use_cuda:
os.environ['CPU_NUM'] = str(2)
train_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(train_program, startup_program):
data = fluid.layers.data(name='X', shape=[1], dtype='float32')
hidden = fluid.layers.fc(input=data, size=10)
loss = fluid.layers.mean(hidden)
test_program = fluid.default_main_program().clone(for_test=True)
fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)
exe = fluid.Executor(place)
exe.run(startup_program)
exec_strateg = fluid.ExecutionStrategy()
exec_strateg.num_iteration_per_drop_scope = 10
train_exe = fluid.ParallelExecutor(
use_cuda=use_cuda,
main_program=train_program,
loss_name=loss.name,
exec_strategy=exec_strateg)
test_exe = fluid.ParallelExecutor(
use_cuda=use_cuda,
main_program=test_program,
share_vars_from=train_exe,
exec_strategy=exec_strateg)
x = numpy.random.random(size=(10, 1)).astype('float32')
train_exe.run(feed={"X": x}, fetch_list=[loss.name])
test_exe.run(feed={"X": x}, fetch_list=[loss.name])
assert train_exe._need_create_local_exe_scopes() == False
assert test_exe._need_create_local_exe_scopes() == False
# drop the local execution scope immediately
train_exe.drop_local_exe_scopes()
test_exe.drop_local_exe_scopes()
assert train_exe._need_create_local_exe_scopes()
assert test_exe._need_create_local_exe_scopes()
def test_drop_scope(self):
self.check_drop_scope(use_cuda=False)
if fluid.core.is_compiled_with_cuda():
self.check_drop_scope(use_cuda=True)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册