Unverified commit d7251a8e, authored by chengduo, committed by GitHub

Delete local execution scopes (#19749)

* Add RecordHistoryLocalExecScopes
test=develop
Parent 4836ee68
@@ -80,7 +80,9 @@ cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_ha
device_context broadcast_op_handle)
cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
device_context gather_op_handle)
-cc_library(scope_buffered_ssa_graph_executor SRCS scope_buffered_ssa_graph_executor.cc DEPS ssa_graph_executor)
+cc_library(scope_buffered_monitor SRCS scope_buffered_monitor.cc DEPS scope profiler selected_rows)
+cc_library(scope_buffered_ssa_graph_executor SRCS scope_buffered_ssa_graph_executor.cc DEPS ssa_graph_executor scope_buffered_monitor)
#cc_test(reduce_op_handle_test SRCS reduce_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
# device_context reduce_op_handle )
cc_library(fast_threaded_ssa_graph_executor SRCS fast_threaded_ssa_graph_executor.cc
......
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/scope_buffered_monitor.h"
#include <memory>
#include <sstream>
#include <string>
#include <unordered_set>
#include <vector>
#include "paddle/fluid/framework/lod_tensor_array.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/platform/profiler.h"
DECLARE_double(local_exe_sub_scope_limit);
namespace paddle {
namespace framework {
namespace details {
static constexpr double kMB = 1024.0 * 1024.0;  // bytes in one megabyte
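// Collect every initialized tensor held by `var`: the LoDTensor itself, the
// value tensor of a SelectedRows, or each initialized element of a
// LoDTensorArray.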
static void GetTensors(Variable *var,
std::unordered_set<Tensor *> *tensor_set) {
if (var->IsType<LoDTensor>() && var->Get<LoDTensor>().IsInitialized()) {
tensor_set->insert(var->GetMutable<LoDTensor>());
} else if (var->IsType<SelectedRows>() &&
var->Get<SelectedRows>().value().IsInitialized()) {
tensor_set->insert(var->GetMutable<SelectedRows>()->mutable_value());
} else if (var->IsType<LoDTensorArray>()) {
auto *tensor_arr = var->GetMutable<LoDTensorArray>();
for (auto &t : *tensor_arr) {
if (t.IsInitialized()) {
tensor_set->insert(&t);
}
}
}
}
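// Recursively collect the tensors of all local variables in `scope` and in
// its kid scopes.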
static void GetTensors(Scope *scope, std::unordered_set<Tensor *> *tensor_set) {
for (auto &var_name : scope->LocalVarNames()) {
GetTensors(scope->FindVar(var_name), tensor_set);
}
for (auto *kid : scope->kids()) {
GetTensors(kid, tensor_set);
}
}
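// Sum the sizes of the unique allocations behind the collected tensors
// (several tensors may share one allocation). If `clear_cpu_tensor` is set,
// CPU-resident tensors are freed instead of being counted.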
static size_t GetTensorMemorySize(Scope *scope, bool clear_cpu_tensor) {
std::unordered_set<Tensor *> tensor_set;
GetTensors(scope, &tensor_set);
size_t memory_size = 0;
std::unordered_set<memory::Allocation *> allocation_set;
for (auto *tensor : tensor_set) {
if (clear_cpu_tensor && platform::is_cpu_place(tensor->place())) {
tensor->clear();
} else {
auto allocation = tensor->Holder().get();
if (!allocation_set.count(allocation)) {
memory_size += allocation->size();
allocation_set.insert(allocation);
}
}
}
return memory_size;
}
size_t GetScopeVarMemorySize(Scope *scope) {
return GetTensorMemorySize(scope, false /*clear_cpu_tensor*/);
}
ScopeBufferedMonitor::ScopeBufferedMonitor(
const std::vector<platform::Place> &places,
const std::vector<Scope *> &local_exec_scopes)
: places_(places), local_exec_scopes_(local_exec_scopes) {
pre_local_exec_scopes_.resize(local_exec_scopes_.size());
post_local_exec_scopes_.resize(local_exec_scopes_.size());
}
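// Run `callback` once and record the sub-scopes it creates: the kid-scope
// set of every local execution scope is snapshotted before and after the
// callback, and the difference is appended to history_local_exec_scopes_.
// After a fetch, older history entries are deleted; when the memory held on
// some place exceeds FLAGS_local_exe_sub_scope_limit, that place's kid
// scopes are dropped entirely.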
void ScopeBufferedMonitor::Apply(const std::function<void()> &callback,
bool has_fetch) {
std::unique_ptr<platform::RecordEvent> pre_local_exec_scopes_event(
new platform::RecordEvent(
"ScopeBufferedMonitor::pre_local_exec_scopes_process"));
for (size_t scope_id = 0; scope_id < local_exec_scopes_.size(); ++scope_id) {
pre_local_exec_scopes_.at(scope_id).clear();
auto scopes = local_exec_scopes_.at(scope_id)->kids();
VLOG(10) << "pre_local_exec_scopes[" << scope_id
<< "] sub-scope: " << scopes.size();
pre_local_exec_scopes_.at(scope_id).insert(scopes.begin(), scopes.end());
}
pre_local_exec_scopes_event.reset();
callback();
std::unique_ptr<platform::RecordEvent> post_local_exec_scopes_event(
new platform::RecordEvent(
"ScopeBufferedMonitor::post_local_exec_scopes_process"));
for (size_t scope_id = 0; scope_id < local_exec_scopes_.size(); ++scope_id) {
post_local_exec_scopes_.at(scope_id).clear();
auto scopes = local_exec_scopes_.at(scope_id)->kids();
VLOG(10) << "post_local_exec_scopes[" << scope_id
<< "] sub-scope: " << scopes.size();
post_local_exec_scopes_.at(scope_id).insert(scopes.begin(), scopes.end());
}
history_local_exec_scopes_.emplace_back();
auto &incr_local_exec_scopes = history_local_exec_scopes_.back();
incr_local_exec_scopes.resize(local_exec_scopes_.size());
for (size_t scope_id = 0; scope_id < local_exec_scopes_.size(); ++scope_id) {
for (auto &scope : post_local_exec_scopes_.at(scope_id)) {
if (!pre_local_exec_scopes_.at(scope_id).count(scope)) {
incr_local_exec_scopes.at(scope_id).insert(scope);
}
}
if (VLOG_IS_ON(10)) {
if (incr_local_exec_scopes.at(scope_id).size() &&
FLAGS_local_exe_sub_scope_limit > 0) {
VLOG(10)
<< "FLAGS_local_exe_sub_scope_limit is "
<< FLAGS_local_exe_sub_scope_limit
<< " MBytes now. If you don't need to limit the memory of the "
"local execution scopes, set "
"FLAGS_local_exe_sub_scope_limit=-1.";
}
std::stringstream out;
out << scope_id << " kids: ";
for (auto &scope : incr_local_exec_scopes.at(scope_id)) {
out << scope << ", ";
}
VLOG(10) << out.str();
}
}
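// Once a fetch has run, the sub-scopes recorded in earlier steps are no
// longer needed; delete every history entry except the most recent one.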
size_t history_step = history_local_exec_scopes_.size();
if (has_fetch && history_step >= 2) {
ClearHistoryLocalExecScopes(history_step - 1);
}
// Free the CPU-side tensors held by the history scopes and accumulate the
// size of the remaining (device) memory per place.
std::vector<size_t> gpu_memory_size_per_gpu(places_.size());
for (auto &scope_vec : history_local_exec_scopes_) {
for (size_t idx = 0; idx < scope_vec.size(); ++idx) {
for (auto &scope : scope_vec.at(idx)) {
gpu_memory_size_per_gpu.at(idx) +=
GetTensorMemorySize(scope, true /*clear_cpu_tensor*/);
}
}
}
if (VLOG_IS_ON(8)) {
for (size_t idx = 0; idx < gpu_memory_size_per_gpu.size(); ++idx) {
VLOG(8) << "history local exec scopes contains "
<< string::HumanReadableSize(gpu_memory_size_per_gpu.at(idx))
<< " in " << places_.at(idx);
}
}
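// If the memory recorded for a place exceeds FLAGS_local_exe_sub_scope_limit,
// wait for its device to finish and drop all of its kid scopes. The recorded
// history is cleared afterwards so that dropped scopes are never deleted a
// second time.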
if (FLAGS_local_exe_sub_scope_limit > 0) {
for (size_t idx = 0; idx < gpu_memory_size_per_gpu.size(); ++idx) {
if (gpu_memory_size_per_gpu.at(idx) / kMB >=
FLAGS_local_exe_sub_scope_limit) {
platform::DeviceContextPool::Instance().Get(places_.at(idx))->Wait();
local_exec_scopes_.at(idx)->DropKids();
}
for (auto &scope_vec : history_local_exec_scopes_) {
scope_vec.at(idx).clear();
}
}
}
}
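// Delete the sub-scopes recorded in the oldest `history_step` history
// entries, then pop those entries from the deque.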
void ScopeBufferedMonitor::ClearHistoryLocalExecScopes(size_t history_step) {
VLOG(10) << "delete pre_incr_local_exec_scopes.";
for (size_t i = 0; i < history_step; ++i) {
auto &pre_incr_local_exec_scopes = history_local_exec_scopes_.front();
for (size_t scope_idx = 0; scope_idx < pre_incr_local_exec_scopes.size();
++scope_idx) {
for (auto scope : pre_incr_local_exec_scopes[scope_idx]) {
local_exec_scopes_.at(scope_idx)->DeleteScope(scope);
}
}
history_local_exec_scopes_.pop_front();
}
}
void ScopeBufferedMonitor::ClearHistoryLocalExecScopes() {
history_local_exec_scopes_.clear();
}
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <deque>
#include <functional>
#include <unordered_set>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/place.h"
namespace paddle {
namespace framework {
namespace details {
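// Monitors the sub-scopes ("kids") created under each local execution scope
// per iteration, so that stale sub-scopes can be deleted once a fetch makes
// them unnecessary, or dropped wholesale when their memory exceeds
// FLAGS_local_exe_sub_scope_limit.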
class ScopeBufferedMonitor {
public:
ScopeBufferedMonitor(const std::vector<platform::Place> &places,
const std::vector<Scope *> &local_exec_scopes);
void Apply(const std::function<void()> &callback, bool has_fetch);
void ClearHistoryLocalExecScopes();
void ClearHistoryLocalExecScopes(size_t history_step);
private:
std::vector<platform::Place> places_;
std::vector<Scope *> local_exec_scopes_;
std::vector<std::unordered_set<Scope *>> pre_local_exec_scopes_;
std::vector<std::unordered_set<Scope *>> post_local_exec_scopes_;
std::deque<std::vector<std::unordered_set<Scope *>>>
history_local_exec_scopes_;
};
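// A minimal usage sketch (names are illustrative; this mirrors how
// ScopeBufferedSSAGraphExecutor::Run drives the monitor below):
//
//   ScopeBufferedMonitor monitor(places, local_exec_scopes);
//   monitor.Apply([&] { fetch_data = executor->Run(fetch_tensors); },
//                 /*has_fetch=*/fetch_tensors.size() > 0);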
size_t GetScopeVarMemorySize(Scope *scope);
} // namespace details
} // namespace framework
} // namespace paddle
@@ -21,49 +21,10 @@
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/platform/profiler.h"
namespace paddle {
namespace framework {
namespace details {
-static void CollectUniqueAllocations(
-const Variable &var,
-std::unordered_set<memory::Allocation *> *allocation_set) {
-if (var.IsType<LoDTensor>()) {
-allocation_set->insert(var.Get<LoDTensor>().Holder().get());
-} else if (var.IsType<SelectedRows>()) {
-allocation_set->insert(var.Get<SelectedRows>().value().Holder().get());
-} else if (var.IsType<LoDTensorArray>()) {
-for (auto &t : var.Get<LoDTensorArray>()) {
-allocation_set->insert(t.Holder().get());
-}
-}
-}
-static void CollectUniqueAllocations(
-const Scope &scope,
-std::unordered_set<memory::Allocation *> *allocation_set) {
-for (auto &var_name : scope.LocalVarNames()) {
-CollectUniqueAllocations(*scope.FindVar(var_name), allocation_set);
-}
-for (auto *kid : scope.kids()) {
-CollectUniqueAllocations(*kid, allocation_set);
-}
-}
-static size_t GetScopeVarMemorySize(const Scope &scope) {
-std::unordered_set<memory::Allocation *> allocation_set;
-CollectUniqueAllocations(scope, &allocation_set);
-size_t memory_size = 0;
-for (auto *allocation : allocation_set) {
-if (allocation) {
-memory_size += allocation->size();
-}
-}
-return memory_size;
-}
ScopeBufferedSSAGraphExecutor::ScopeBufferedSSAGraphExecutor(
ExecutionStrategy strategy, std::vector<Scope *> local_scopes,
std::vector<Scope *> local_exec_scopes, std::vector<VariableInfo> var_infos,
@@ -74,7 +35,8 @@ ScopeBufferedSSAGraphExecutor::ScopeBufferedSSAGraphExecutor(
local_scopes_(std::move(local_scopes)),
local_exec_scopes_(std::move(local_exec_scopes)),
var_infos_(std::move(var_infos)),
-places_(std::move(places)) {
+places_(std::move(places)),
+scope_monitor_(places_, local_exec_scopes_) {
PADDLE_ENFORCE_EQ(local_scopes_.size(), local_exec_scopes_.size());
PrepareLocalExeScopes();
}
@@ -88,16 +50,25 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
std::vector<framework::LoDTensor> fetch_data;
std::exception_ptr eptr = nullptr;
-try {
-fetch_data = underlying_executor_->Run(fetch_tensors);
-} catch (...) {
-eptr = std::current_exception();
+auto exe_run_func = [&]() {
+try {
+fetch_data = underlying_executor_->Run(fetch_tensors);
+} catch (...) {
+eptr = std::current_exception();
+}
+};
+if (strategy_.num_iteration_per_drop_scope_ == 1) {
+exe_run_func();
+} else {
+scope_monitor_.Apply(exe_run_func, fetch_tensors.size() > 0);
+}
if (VLOG_IS_ON(5)) {
for (auto *scope : local_exec_scopes_) {
VLOG(5) << "Left "
-<< string::HumanReadableSize(GetScopeVarMemorySize(*scope))
+<< string::HumanReadableSize(GetScopeVarMemorySize(scope))
<< " on scope " << scope << " before deleting";
}
}
@@ -110,7 +81,7 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
if (VLOG_IS_ON(5)) {
for (auto *scope : local_exec_scopes_) {
VLOG(5) << "Left "
-<< string::HumanReadableSize(GetScopeVarMemorySize(*scope))
+<< string::HumanReadableSize(GetScopeVarMemorySize(scope))
<< " on scope " << scope << " after deleting";
}
}
@@ -159,7 +130,7 @@ void ScopeBufferedSSAGraphExecutor::DropLocalExeScopes() {
for (auto &p : places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait();
}
scope_monitor_.ClearHistoryLocalExecScopes();
for (size_t i = 0; i < local_exec_scopes_.size(); ++i) {
local_exec_scopes_[i]->EraseVarsExcept(preserve_vars_[i]);
local_exec_scopes_[i]->DropKids();
......
@@ -14,17 +14,18 @@
#pragma once
#include <ThreadPool.h>
#include <deque>
#include <list>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/details/var_handle.h"
#include "paddle/fluid/framework/details/execution_strategy.h"
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/details/scope_buffered_monitor.h"
#include "paddle/fluid/framework/details/ssa_graph_executor.h"
#include "paddle/fluid/framework/details/var_handle.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/place.h"
namespace paddle {
@@ -72,6 +73,7 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor {
std::vector<VariableInfo> var_infos_;
std::vector<platform::Place> places_;
ScopeBufferedMonitor scope_monitor_;
};
} // namespace details
} // namespace framework
......
@@ -437,3 +437,17 @@ DEFINE_uint64(reallocate_gpu_memory_in_mb, 0ul,
"FLAGS_fraction_of_gpu_memory_to_use");
#endif
/**
* Scope related FLAG
* Name: local_exe_sub_scope_limit
* Since Version: 1.6.0
* Value Range: double, default=256 (MB)
* Example:
* Note:
*/
DEFINE_double(local_exe_sub_scope_limit, 256.0,  // MBytes
"The upper memory limit, in MBytes, of the sub-scopes of the local "
"execution scope for each CUDAPlace. If you don't need to limit "
"the memory, set FLAGS_local_exe_sub_scope_limit=-1. "
"The default value is 256 MBytes.");
......@@ -205,7 +205,8 @@ def __bootstrap__():
'reallocate_gpu_memory_in_mb', 'cudnn_deterministic',
'enable_cublas_tensor_op_math', 'conv_workspace_size_limit',
'cudnn_exhaustive_search', 'selected_gpus', 'sync_nccl_allreduce',
-'cudnn_batchnorm_spatial_persistent', 'gpu_allocator_retry_time'
+'cudnn_batchnorm_spatial_persistent', 'gpu_allocator_retry_time',
+'local_exe_sub_scope_limit'
]
core.init_gflags([sys.argv[0]] +
["--tryfromenv=" + ",".join(read_env_flags)])
......