未验证 提交 f1472039 编写于 作者: W wanghuancoder 提交者: GitHub

gc for newexecutor (#35085)

* gc for newexecutor, test=develop

* refine, test=develop

* add interpretercore_gc_helper.h,test=develop

* backup

* gc with thread and device_event, test=develop

* refine, test=develop

* refine, test=develop

* refine, test=develop

* refine, test=develop

* fix bug, test=develop

* refine, test=develop

* refine, test=develop

* refine, test=develop

* add CheckGC, test=develop
上级 f1275fb6
......@@ -31,49 +31,6 @@
namespace paddle {
namespace framework {
// Per-operator helper that answers whether the buffer of a given input
// argument is still needed, based on the op's NoNeedBufferVarsInferer.
struct OpInOutInfo {
 public:
  // Marks this info as built and, when the op provides an inferer that
  // reports at least one no-need-buffer input slot, collects the argument
  // names of all remaining input slots and of every output slot.
  void Build(const OperatorBase *op) {
    is_built_ = true;
    auto &inferer = op->Info().NoNeedBufferVarsInferer();
    if (!inferer) {
      return;
    }
    no_need_buffer_ins_ = inferer(op->Inputs(), op->Outputs(), op->Attrs());
    if (no_need_buffer_ins_.empty()) {
      return;
    }
    for (auto &input_slot : op->Inputs()) {
      // Slots reported by the inferer contribute nothing to the set.
      if (no_need_buffer_ins_.count(input_slot.first) == 0) {
        other_args_set_.insert(input_slot.second.begin(),
                               input_slot.second.end());
      }
    }
    for (auto &output_slot : op->Outputs()) {
      other_args_set_.insert(output_slot.second.begin(),
                             output_slot.second.end());
    }
  }

  // True once Build() has been called.
  bool IsBuilt() const { return is_built_; }

  // True when no input slot was reported as no-need-buffer, or when
  // `in_arg_name` was collected by Build() as one of the other arguments.
  bool IsInArgBufferNeeded(const std::string &in_arg_name) const {
    if (no_need_buffer_ins_.empty()) {
      return true;
    }
    return other_args_set_.find(in_arg_name) != other_args_set_.end();
  }

 private:
  // Input slot names returned by the op's no-need-buffer inferer.
  std::unordered_set<std::string> no_need_buffer_ins_;
  // Argument names of the op's other inputs and of all its outputs.
  std::unordered_set<std::string> other_args_set_;
  bool is_built_{false};
};
static bool VarCanBeDeleted(const std::string &name, const BlockDesc &block,
const std::unordered_set<std::string> &skip_vars) {
if (skip_vars.count(name) != 0) {
......
......@@ -31,6 +31,49 @@ class GarbageCollector;
class OperatorBase;
class Scope;
// Caches, for one operator, which of its input arguments still require their
// underlying buffer according to the op's NoNeedBufferVarsInferer.
struct OpInOutInfo {
 public:
  // Records that the info has been built. If the op has an inferer and it
  // returns a non-empty set of no-need-buffer input slots, gathers the
  // argument names of every other input slot plus all output arguments.
  void Build(const OperatorBase *op) {
    is_built_ = true;
    auto &inferer = op->Info().NoNeedBufferVarsInferer();
    if (inferer) {
      no_need_buffer_ins_ = inferer(op->Inputs(), op->Outputs(), op->Attrs());
      if (!no_need_buffer_ins_.empty()) {
        for (auto &in_slot : op->Inputs()) {
          const bool listed_by_inferer =
              no_need_buffer_ins_.count(in_slot.first) != 0;
          if (listed_by_inferer) {
            continue;
          }
          for (auto &arg_name : in_slot.second) {
            other_args_set_.insert(arg_name);
          }
        }
        for (auto &out_slot : op->Outputs()) {
          for (auto &arg_name : out_slot.second) {
            other_args_set_.insert(arg_name);
          }
        }
      }
    }
  }

  // Reports whether Build() has run on this object.
  bool IsBuilt() const { return is_built_; }

  // Returns true if the inferer reported nothing, or if `in_arg_name` is among
  // the arguments gathered by Build().
  bool IsInArgBufferNeeded(const std::string &in_arg_name) const {
    const bool nothing_reported = no_need_buffer_ins_.empty();
    return nothing_reported || other_args_set_.count(in_arg_name) > 0;
  }

 private:
  // Input slot names the inferer reported as not needing their buffer.
  std::unordered_set<std::string> no_need_buffer_ins_;
  // Names of all remaining arguments of the op (inputs and outputs).
  std::unordered_set<std::string> other_args_set_;
  bool is_built_{false};
};
std::unordered_map<const OperatorBase *, std::vector<std::string>>
GetUnusedVars(const BlockDesc &block,
const std::vector<std::unique_ptr<OperatorBase>> &ops,
......
# NOTE(review): this span looks like a rendered diff with old and new lines
# interleaved: `cc_library(workqueue ...)` appears twice and the
# interpretercore dependency list has two alternative closing lines (the
# second one adds workqueue, device_event and device_event_gpu). Confirm which
# lines survive before relying on this text.

# Library built from workqueue.cc.
cc_library(workqueue SRCS workqueue.cc)
# Library built from interpretercore.cc together with its dependency list.
cc_library(interpretercore SRCS interpretercore.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor)
graph_to_program_pass variable_helper timer monitor workqueue device_event device_event_gpu)
# Library built from standalone_executor.cc; depends on interpretercore.
cc_library(standalone_executor SRCS standalone_executor.cc DEPS interpretercore)
cc_library(workqueue SRCS workqueue.cc)
# Test target for the workqueue library.
cc_test(workqueue_test SRCS workqueue_test.cc DEPS workqueue)
# cc_binary(standalone_executor_test SRCS standalone_executor_test.cc DEPS interpretercore standalone_executor operator op_registry executor ${GLOB_OP_LIB} ${GLOB_OPERATOR_DEPS} profiler)
......@@ -12,6 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include "paddle/fluid/framework/executor_gc_helper.h"
#include "paddle/fluid/framework/new_executor/interpretercore_gc_helper.h"
#if defined(PADDLE_WITH_CUDA)
using ::paddle::platform::kCUDA;
USE_EVENT(kCUDA);
#endif
#include <unordered_set>
......@@ -146,6 +153,12 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
h2d_ctx_pool_({place}),
fetch_context_pool_({place}) {
is_build_ = false;
garbages_.reset(new GarbageQueue());
max_memory_size_ = static_cast<size_t>(GetEagerDeletionThreshold());
cur_memory_size_ = 0;
gc_queue_ = CreateSingleThreadedWorkQueue();
feed_names_ = feed_names;
// Step1: add feedop and fetchop to main_program
......@@ -216,11 +229,24 @@ void InterpreterCore::Convert() {
temp_inst.input_index_ = vec_func_list_[i].input_index;
temp_inst.output_index_ = vec_func_list_[i].output_index;
OpInOutInfo info;
std::vector<size_t> gc_check_input_list;
for (auto& item : vec_func_list_[i].input_index) {
for (auto id : item.second) {
input_var2op_info_[id].push_back(i);
gc_check_input_list.push_back(id);
// var can be gc-ed
if (!info.IsBuilt()) {
info.Build(op_list_[i]);
}
if (global_scope_->vec_meta_info_[id].vardesc_) {
if (info.IsInArgBufferNeeded(
global_scope_->vec_meta_info_[id].vardesc_->Name())) {
gc_check_input_list.push_back(id);
}
} else {
gc_check_input_list.push_back(id);
}
}
}
std::sort(gc_check_input_list.begin(), gc_check_input_list.end());
......@@ -237,6 +263,13 @@ void InterpreterCore::Convert() {
}
for (size_t i = 0; i < vec_instruction_.size(); ++i) {
#if defined(PADDLE_WITH_CUDA)
int device_type = static_cast<int>(paddle::platform::DeviceType::CUDA);
paddle::platform::DeviceOption dev_opt(
device_type, BOOST_GET_CONST(platform::CUDAPlace, place_).device);
gc_event_.emplace_back(dev_opt);
#endif
std::vector<size_t> vec_temp;
for (auto& item : vec_instruction_[i].output_index_) {
for (auto id : item.second) {
......@@ -375,11 +408,8 @@ void InterpreterCore::ExecuteInstructionList(
}
// GC information
auto& gc_check_list = instr_node.gc_check_var_list;
for (auto var_id : gc_check_list) {
--working_var_ref[var_id].var_ref_count_;
}
CheckGC(instr_id, instr_node.gc_check_var_list, var_scope, place,
working_var_ref);
}
fetch_context_pool_.Get(place)->Wait();
......@@ -391,6 +421,87 @@ void InterpreterCore::ExecuteInstructionList(
}
}
// Updates reference counts after instruction `instr_id` has been issued and
// releases the memory of variables that are no longer referenced.
//
// For every id in `gc_check_list` the matching entry of `working_var_ref` is
// decremented. When the count reaches zero and the variable has a VarDesc
// that is not persistable, its memory holder(s) are moved into `garbages_`
// and their sizes are added to `cur_memory_size_`.
//
// The pending queue is handed off for destruction when it is non-empty and
// either `max_memory_size_ <= 1` (release after every call that produced
// garbage) or the accumulated size has reached `max_memory_size_`.
//   * With PADDLE_WITH_CUDA: an event is recorded for `place`, and a task on
//     `gc_queue_` polls `event->Query()` until it returns true before deleting
//     the queue (presumably so the device has finished using the memory —
//     confirm against DeviceEvent).
//   * Otherwise: the queue is destroyed immediately on the calling thread.
//
// Raises platform::errors::Unimplemented (via PADDLE_THROW) if a variable to
// be released is not a LoDTensor, SelectedRows or LoDTensorArray.
void InterpreterCore::CheckGC(size_t instr_id,
                              const std::vector<size_t>& gc_check_list,
                              const VariableScope& var_scope,
                              const platform::Place& place,
                              std::vector<VariableMetaInfo>& working_var_ref) {
  // Adds the size of the most recently queued holder to the running total.
  // A holder may be null (nothing was allocated), in which case it is ignored.
  auto account_last_holder = [this]() {
    if (garbages_->back()) {
      cur_memory_size_ += garbages_->back()->size();
    }
  };

  for (auto var_id : gc_check_list) {
    --working_var_ref[var_id].var_ref_count_;

    const auto& meta = var_scope.vec_meta_info_[var_id];
    const bool releasable = meta.vardesc_ && !meta.vardesc_->Persistable() &&
                            working_var_ref[var_id].var_ref_count_ == 0;
    if (!releasable) {
      continue;
    }

    Variable* var = var_scope.var_list[var_id];
    if (var->IsType<LoDTensor>()) {
      garbages_->emplace_back(
          var->GetMutable<LoDTensor>()->MoveMemoryHolder());
      account_last_holder();
    } else if (var->IsType<SelectedRows>()) {
      garbages_->emplace_back(var->GetMutable<SelectedRows>()
                                  ->mutable_value()
                                  ->MoveMemoryHolder());
      account_last_holder();
    } else if (var->IsType<LoDTensorArray>()) {
      auto* tensor_arr = var->GetMutable<LoDTensorArray>();
      for (auto& t : *tensor_arr) {
        garbages_->emplace_back(t.MoveMemoryHolder());
        account_last_holder();
      }
    } else {
      PADDLE_THROW(platform::errors::Unimplemented(
          "The variable(%s) is not supported in eager deletion.",
          framework::ToTypeName(var->Type())));
    }
  }

  if (garbages_->empty()) {
    return;
  }
  const bool flush_now =
      max_memory_size_ <= 1 || cur_memory_size_ >= max_memory_size_;
  if (!flush_now) {
    return;
  }

#if defined(PADDLE_WITH_CUDA)
  auto* dev_ctx = static_cast<platform::CUDADeviceContext*>(
      platform::DeviceContextPool::Instance().Get(place));
  gc_event_[instr_id].Record(place, dev_ctx);
  // Ownership of the queue passes to the task as a raw pointer; the task is
  // the only place that deletes it.
  gc_queue_->AddTask(
      [container = garbages_.release(), event = &gc_event_[instr_id]]() {
        while (!event->Query()) {
          continue;
        }
        delete container;
      });
  garbages_.reset(new GarbageQueue());
#else
  // reset() destroys the previous queue, which drops the queued holders.
  garbages_.reset(new GarbageQueue());
#endif
  // The pending queue is empty again, so the running total starts over in
  // both release modes.
  cur_memory_size_ = 0;
}
std::vector<size_t> InterpreterCore::MergeVector(
const std::vector<size_t>& first, const std::vector<size_t>& second) {
std::vector<size_t> out(first.size() + second.size());
......@@ -419,6 +530,11 @@ void InterpreterCore::BuildVariableScope(const framework::ProgramDesc& pdesc,
auto v = new Variable();
InitializeVariable(v, var->GetType());
var_scope->var_list.push_back(v);
VariableMetaInfo info;
info.var_ref_count_ = 0;
info.vardesc_ = var;
var_scope->vec_meta_info_.push_back(info);
}
}
}
......@@ -431,6 +547,7 @@ void InterpreterCore::BuildOpFuncList(const platform::Place& place,
auto& global_block = pdesc.Block(0);
auto& all_op_kernels = OperatorWithKernel::AllOpKernels();
std::vector<OperatorBase*> ops;
for (auto& op : global_block.AllOps()) {
VLOG(3) << "Build OpFuncNode from : " << op->Type();
......@@ -446,6 +563,20 @@ void InterpreterCore::BuildOpFuncList(const platform::Place& place,
// step 1. Prepare VariableValueMap of input/output
auto op_base =
info.Creator()(op->Type(), inputs_names, outputs_names, op_attr_map);
ops.push_back(op_base);
}
auto unused_var_map = get_unused_vars(global_block, ops);
size_t ops_index = 0;
for (auto& op : global_block.AllOps()) {
VLOG(3) << op->Type();
// << op->Type() << endl;
auto op_base = ops[ops_index++];
auto inputs_names = op->Inputs();
auto outputs_names = op->Outputs();
VariableValueMap ins_map;
std::map<std::string, std::vector<int>> ins_name2id;
......@@ -551,6 +682,11 @@ void InterpreterCore::BuildOpFuncList(const platform::Place& place,
var_scope->name2id[new_var_name] = var_scope->var_list.size();
var_scope->var_list.push_back(v);
VariableMetaInfo info;
info.var_ref_count_ = 0;
info.vardesc_ = nullptr;
var_scope->vec_meta_info_.push_back(info);
VariableNameMap copy_in_map;
auto x_iter = inputs_names.find(var_name_item.first);
copy_in_map["X"] = {x_iter->second[i]};
......@@ -656,6 +792,47 @@ void InterpreterCore::BuildOpFuncList(const platform::Place& place,
op_func_node.kernel_func_ = OpKernelComputeFunc(kernel_iter->second);
op_func_node.kernel_func_(exec_ctx);
vec_func_list->push_back(op_func_node);
// gc---------------------------------------------------------------------------
auto iter = unused_var_map.find(op_base);
if (iter == unused_var_map.end()) {
continue;
}
auto& delete_vars = iter->second;
std::deque<std::shared_ptr<memory::Allocation>>* garbages =
new std::deque<std::shared_ptr<memory::Allocation>>();
for (auto& var_name : delete_vars) {
auto it = var_scope->name2id.find(var_name);
assert(it != var_scope->name2id.end());
auto* var = var_scope->var_list[it->second];
if (var == nullptr) {
continue;
}
VLOG(2) << "Erase variable " << var_name;
if (var->IsType<LoDTensor>()) {
garbages->emplace_back(
var->GetMutable<LoDTensor>()->MoveMemoryHolder());
} else if (var->IsType<SelectedRows>()) {
garbages->emplace_back(var->GetMutable<SelectedRows>()
->mutable_value()
->MoveMemoryHolder());
} else if (var->IsType<LoDTensorArray>()) {
auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
for (auto& t : *lod_tensor_arr) {
garbages->emplace_back(t.MoveMemoryHolder());
}
} else {
PADDLE_THROW(platform::errors::Unimplemented(
"Type %s of variable %s is not supported eager deletion.",
framework::ToTypeName(var->Type()), var_name));
}
}
delete garbages; // free mem
VLOG(3) << "run " << op_base->Type() << " done.";
}
}
......
......@@ -21,9 +21,11 @@
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device_event.h"
#include "paddle/fluid/platform/event.h"
namespace paddle {
......@@ -31,6 +33,7 @@ namespace framework {
class InterpreterCore {
public:
using GarbageQueue = std::deque<std::shared_ptr<memory::Allocation>>;
InterpreterCore(const platform::Place& place, const ProgramDesc& main_prog,
VariableScope* global_scope,
const std::vector<std::string>& feed_names,
......@@ -64,6 +67,10 @@ class InterpreterCore {
void BuildVariableScope(const framework::ProgramDesc& pdesc,
VariableScope* var_scope);
void CheckGC(size_t instr_id, const std::vector<size_t>& gc_check_list,
const VariableScope& var_scope, const platform::Place& place,
std::vector<VariableMetaInfo>& working_var_ref); // NOLINT
platform::DeviceContext* ParseDeviceContextForInstruction(
const OpFuncNode& op_func_node, const OperatorBase& op_base);
......@@ -96,6 +103,12 @@ class InterpreterCore {
std::vector<std::string> feed_names_;
std::map<size_t, std::shared_ptr<platform::CudaEvent>> var_id2event_;
std::vector<paddle::platform::DeviceEvent> gc_event_;
std::unique_ptr<GarbageQueue> garbages_;
size_t max_memory_size_;
size_t cur_memory_size_;
std::unique_ptr<WorkQueue> gc_queue_;
platform::DeviceContextPool fetch_context_pool_;
};
} // namespace framework
......
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/framework/garbage_collector.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/scope.h"
namespace paddle {
namespace framework {
// Returns true when `name` refers to a variable in `block` that is not
// persistable and whose type is LOD_TENSOR, SELECTED_ROWS or
// LOD_TENSOR_ARRAY. Returns false when the variable is not found in `block`.
//
// Declared `inline` because this definition lives in a header; without it,
// including the header from more than one translation unit produces
// duplicate-symbol link errors.
inline bool var_can_be_deleted(const std::string &name,
                               const BlockDesc &block) {
  auto *var_desc = block.FindVar(name);
  if (var_desc == nullptr || var_desc->Persistable()) {
    return false;
  }

  auto type = var_desc->Proto()->type().type();
  return type == proto::VarType::LOD_TENSOR ||
         type == proto::VarType::SELECTED_ROWS ||
         type == proto::VarType::LOD_TENSOR_ARRAY;
}
// Computes, for each operator in `ops`, the names of the deletable variables
// whose last use is that operator.
//
// `ops` is scanned in order. A deletable input (see var_can_be_deleted) counts
// as a use only if OpInOutInfo reports that its buffer is needed; every
// deletable output counts as a use. The operator with the highest index that
// uses a variable ends up owning it in the returned map. Operators that are
// the last user of no variable have no entry.
//
// Declared `inline` because this definition lives in a header; without it,
// including the header from more than one translation unit produces
// duplicate-symbol link errors.
//
// NOTE(review): OpInOutInfo is not declared by the includes visible next to
// this function — confirm the declaring header is reachable from here.
inline std::unordered_map<const paddle::framework::OperatorBase *,
                          std::vector<std::string>>
get_unused_vars(const BlockDesc &block,
                const std::vector<OperatorBase *> &ops) {
  // Variable name -> index in `ops` of the last operator that uses it.
  std::unordered_map<std::string, size_t> var_op_idx_map;

  for (size_t i = 0; i < ops.size(); ++i) {
    auto *op = ops[i];

    // Built lazily: only needed once a deletable input is encountered.
    OpInOutInfo info;
    for (auto &name_pair : op->Inputs()) {
      for (auto &name : name_pair.second) {
        if (!var_can_be_deleted(name, block)) {
          continue;
        }

        // var can be gc-ed
        if (!info.IsBuilt()) {
          info.Build(op);
        }

        if (info.IsInArgBufferNeeded(name)) {
          // Update the last living op of variable to current op
          var_op_idx_map[name] = i;
        } else {
          VLOG(10) << "Skip reference count computing of variable "
                   << name_pair.first << "(" << name << ") in Operator "
                   << op->Type();
        }
      }
    }

    for (auto &name_pair : op->Outputs()) {
      for (auto &name : name_pair.second) {
        if (var_can_be_deleted(name, block)) {
          // Update the last living op of variable to current op
          var_op_idx_map[name] = i;
        }
      }
    }
  }

  // Invert the map: group variable names by their last-using operator.
  std::unordered_map<const OperatorBase *, std::vector<std::string>> result;
  for (auto &name_op_idx_pair : var_op_idx_map) {
    auto &name = name_op_idx_pair.first;
    size_t op_idx = name_op_idx_pair.second;
    result[ops[op_idx]].emplace_back(name);
  }
  return result;
}
} // namespace framework
} // namespace paddle
......@@ -35,11 +35,13 @@ struct OpKernelFunc {
// Per-variable bookkeeping used by the interpreter's garbage collection.
// Both members carry default initializers so a default-constructed object
// never holds indeterminate values.
struct VariableMetaInfo {
  // Reference counter; CheckGC decrements it and releases the variable's
  // memory once it reaches zero.
  int var_ref_count_{0};
  // Descriptor of the variable, or nullptr when the variable was created
  // without one; CheckGC skips variables whose descriptor is null.
  paddle::framework::VarDesc* vardesc_{nullptr};
};
// Index-addressed collection of the variables used by the interpreter.
// A variable's id is its position in `var_list`; `vec_meta_info_` is indexed
// by the same id, so callers append to both vectors together.
struct VariableScope {
// Variables, addressed by id.
std::vector<Variable*> var_list;
// Variable name -> id (index into var_list).
std::map<std::string, int> name2id;
// Reference count and descriptor for each variable, indexed by id.
std::vector<VariableMetaInfo> vec_meta_info_;
};
struct EventRun {
......
......@@ -37,6 +37,11 @@ StandaloneExecutor::StandaloneExecutor(const platform::Place& place,
}
global_scope_.var_list.push_back(v);
VariableMetaInfo info;
info.var_ref_count_ = 0;
info.vardesc_ = nullptr;
global_scope_.vec_meta_info_.push_back(info);
}
}
......@@ -71,6 +76,11 @@ void StandaloneExecutor::BuildVariableOuterScope(
auto v = outer_scope->Var(var->Name());
InitializeVariable(v, var->GetType());
var_scope->var_list.push_back(v);
VariableMetaInfo info;
info.var_ref_count_ = 0;
info.vardesc_ = var;
var_scope->vec_meta_info_.push_back(info);
}
}
}
......
......@@ -21,6 +21,8 @@
#include <unordered_map>
#include <vector>
// #include "gperftools/profiler.h"
#include "paddle/fluid/framework/new_executor/standalone_executor.h"
USE_OP(fill_constant);
......@@ -72,26 +74,46 @@ paddle::framework::ProgramDesc load_from_file(const std::string& file_name) {
return program_desc;
}
int main() {
int main(int argc, char* argv[]) {
std::cout << "main" << std::endl;
int64_t batch_size = std::stoi(argv[1]);
paddle::framework::InitDevices();
auto place = paddle::platform::CUDAPlace(0);
auto test_prog = load_from_file("lm_startup_program");
auto main_prog = load_from_file("lm_main_program");
auto& global_block = main_prog.Block(0);
auto& op1 = global_block.AllOps()[1];
auto shape1 = BOOST_GET_CONST(std::vector<int64_t>, op1->GetAttr("shape"));
shape1[0] = batch_size * 20;
op1->SetAttr("shape", shape1);
auto& op2 = global_block.AllOps()[2];
auto shape2 = BOOST_GET_CONST(std::vector<int64_t>, op2->GetAttr("shape"));
shape2[0] = batch_size;
op2->SetAttr("shape", shape2);
auto& op3 = global_block.AllOps()[3];
auto shape3 = BOOST_GET_CONST(std::vector<int64_t>, op3->GetAttr("shape"));
shape3[0] = batch_size;
op3->SetAttr("shape", shape3);
paddle::framework::Scope scope;
paddle::framework::StandaloneExecutor exec(place, test_prog, main_prog,
&scope);
auto start = std::chrono::steady_clock::now();
// ProfilerStart("new_executor.prof");
for (size_t i = 0; i < 2320; ++i) {
if (i % 200 == 0) {
std::cout << i << std::endl;
}
std::vector<paddle::framework::Tensor> vec_out;
exec.Run({}, {}, {}, &vec_out);
exec.Run({}, {}, {});
}
// ProfilerStop();
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> diff = end - start;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册