diff --git a/paddle/fluid/framework/executor_gc_helper.cc b/paddle/fluid/framework/executor_gc_helper.cc index 1fe6b70d26ec5b5611b977e477ec53ecb7a563e5..43eb1ce8c77f891100d406a0882c2963a297d820 100644 --- a/paddle/fluid/framework/executor_gc_helper.cc +++ b/paddle/fluid/framework/executor_gc_helper.cc @@ -31,49 +31,6 @@ namespace paddle { namespace framework { -struct OpInOutInfo { - public: - void Build(const OperatorBase *op) { - is_built_ = true; - auto &inferer = op->Info().NoNeedBufferVarsInferer(); - if (inferer) { - no_need_buffer_ins_ = inferer(op->Inputs(), op->Outputs(), op->Attrs()); - - if (no_need_buffer_ins_.empty()) return; - - for (auto &in_name_pair : op->Inputs()) { - if (no_need_buffer_ins_.count(in_name_pair.first) != 0) { - continue; - } - - for (auto &in_arg_name : in_name_pair.second) { - other_args_set_.insert(in_arg_name); - } - } - - for (auto &out_name_pair : op->Outputs()) { - for (auto &out_arg_name : out_name_pair.second) { - other_args_set_.insert(out_arg_name); - } - } - } - } - - bool IsBuilt() const { return is_built_; } - - bool IsInArgBufferNeeded(const std::string &in_arg_name) const { - return no_need_buffer_ins_.empty() || - other_args_set_.count(in_arg_name) != 0; - } - - private: - // A set to record unused buffer input vars of op - std::unordered_set no_need_buffer_ins_; - // A set to record other args of op (including in, out) - std::unordered_set other_args_set_; - bool is_built_{false}; -}; - static bool VarCanBeDeleted(const std::string &name, const BlockDesc &block, const std::unordered_set &skip_vars) { if (skip_vars.count(name) != 0) { diff --git a/paddle/fluid/framework/executor_gc_helper.h b/paddle/fluid/framework/executor_gc_helper.h index 184516d4d6160b5ca74f2c9ff7ffa491d2603220..e354a83e5c8caca8c22f6d930fb978485b1f94a4 100644 --- a/paddle/fluid/framework/executor_gc_helper.h +++ b/paddle/fluid/framework/executor_gc_helper.h @@ -31,6 +31,49 @@ class GarbageCollector; class OperatorBase; class Scope; +struct OpInOutInfo { + public: + void Build(const OperatorBase *op) { + is_built_ = true; + auto &inferer = op->Info().NoNeedBufferVarsInferer(); + if (inferer) { + no_need_buffer_ins_ = inferer(op->Inputs(), op->Outputs(), op->Attrs()); + + if (no_need_buffer_ins_.empty()) return; + + for (auto &in_name_pair : op->Inputs()) { + if (no_need_buffer_ins_.count(in_name_pair.first) != 0) { + continue; + } + + for (auto &in_arg_name : in_name_pair.second) { + other_args_set_.insert(in_arg_name); + } + } + + for (auto &out_name_pair : op->Outputs()) { + for (auto &out_arg_name : out_name_pair.second) { + other_args_set_.insert(out_arg_name); + } + } + } + } + + bool IsBuilt() const { return is_built_; } + + bool IsInArgBufferNeeded(const std::string &in_arg_name) const { + return no_need_buffer_ins_.empty() || + other_args_set_.count(in_arg_name) != 0; + } + + private: + // A set to record unused buffer input vars of op + std::unordered_set no_need_buffer_ins_; + // A set to record other args of op (including in, out) + std::unordered_set other_args_set_; + bool is_built_{false}; +}; + std::unordered_map> GetUnusedVars(const BlockDesc &block, const std::vector> &ops, diff --git a/paddle/fluid/framework/new_executor/CMakeLists.txt b/paddle/fluid/framework/new_executor/CMakeLists.txt index 39ab9fe974ae2b9071a0c1c0669a1f246dd37457..13962f985272a65788bfc4703098b6550770af0e 100644 --- a/paddle/fluid/framework/new_executor/CMakeLists.txt +++ b/paddle/fluid/framework/new_executor/CMakeLists.txt @@ -1,9 +1,8 @@ +cc_library(workqueue SRCS workqueue.cc) cc_library(interpretercore SRCS interpretercore.cc DEPS op_registry device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method - graph_to_program_pass variable_helper timer monitor) + graph_to_program_pass variable_helper timer monitor workqueue device_event device_event_gpu) cc_library(standalone_executor SRCS standalone_executor.cc DEPS interpretercore) -cc_library(workqueue SRCS workqueue.cc) - cc_test(workqueue_test SRCS workqueue_test.cc DEPS workqueue) # cc_binary(standalone_executor_test SRCS standalone_executor_test.cc DEPS interpretercore standalone_executor operator op_registry executor ${GLOB_OP_LIB} ${GLOB_OPERATOR_DEPS} profiler) diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc index 8a9b478ca73ab74f2396d277136fb43e951deca6..277560a5ecb841beb4ccd42e23ca1129c686fc77 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.cc +++ b/paddle/fluid/framework/new_executor/interpretercore.cc @@ -12,6 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. #include "paddle/fluid/framework/new_executor/interpretercore.h" +#include "paddle/fluid/framework/executor_gc_helper.h" +#include "paddle/fluid/framework/new_executor/interpretercore_gc_helper.h" + +#if defined(PADDLE_WITH_CUDA) +using ::paddle::platform::kCUDA; +USE_EVENT(kCUDA); +#endif #include @@ -146,6 +153,12 @@ InterpreterCore::InterpreterCore(const platform::Place& place, h2d_ctx_pool_({place}), fetch_context_pool_({place}) { is_build_ = false; + + garbages_.reset(new GarbageQueue()); + max_memory_size_ = static_cast(GetEagerDeletionThreshold()); + cur_memory_size_ = 0; + gc_queue_ = CreateSingleThreadedWorkQueue(); + feed_names_ = feed_names; // Step1: add feedop and fetchop to main_program @@ -216,11 +229,24 @@ void InterpreterCore::Convert() { temp_inst.input_index_ = vec_func_list_[i].input_index; temp_inst.output_index_ = vec_func_list_[i].output_index; + OpInOutInfo info; + std::vector gc_check_input_list; for (auto& item : vec_func_list_[i].input_index) { for (auto id : item.second) { input_var2op_info_[id].push_back(i); - gc_check_input_list.push_back(id); + // var can be gc-ed + if (!info.IsBuilt()) { + info.Build(op_list_[i]); + } + if (global_scope_->vec_meta_info_[id].vardesc_) { + if (info.IsInArgBufferNeeded( + global_scope_->vec_meta_info_[id].vardesc_->Name())) { + gc_check_input_list.push_back(id); + } + } else { + gc_check_input_list.push_back(id); + } } } std::sort(gc_check_input_list.begin(), gc_check_input_list.end()); @@ -237,6 +263,13 @@ void InterpreterCore::Convert() { } for (size_t i = 0; i < vec_instruction_.size(); ++i) { +#if defined(PADDLE_WITH_CUDA) + int device_type = static_cast(paddle::platform::DeviceType::CUDA); + paddle::platform::DeviceOption dev_opt( + device_type, BOOST_GET_CONST(platform::CUDAPlace, place_).device); + gc_event_.emplace_back(dev_opt); +#endif + std::vector vec_temp; for (auto& item : vec_instruction_[i].output_index_) { for (auto id : item.second) { @@ -375,11 +408,8 @@ void InterpreterCore::ExecuteInstructionList( } // GC infomation - - auto& gc_check_list = instr_node.gc_check_var_list; - for (auto var_id : gc_check_list) { - --working_var_ref[var_id].var_ref_count_; - } + CheckGC(instr_id, instr_node.gc_check_var_list, var_scope, place, + working_var_ref); } fetch_context_pool_.Get(place)->Wait(); @@ -391,6 +421,87 @@ void InterpreterCore::ExecuteInstructionList( } } +void InterpreterCore::CheckGC(size_t instr_id, + const std::vector& gc_check_list, + const VariableScope& var_scope, + const platform::Place& place, + std::vector& working_var_ref) { + for (auto var_id : gc_check_list) { + --working_var_ref[var_id].var_ref_count_; + if (var_scope.vec_meta_info_[var_id].vardesc_ && + !var_scope.vec_meta_info_[var_id].vardesc_->Persistable() && + working_var_ref[var_id].var_ref_count_ == 0) { + Variable* var = var_scope.var_list[var_id]; + if (var->IsType()) { + garbages_->emplace_back( + var->GetMutable()->MoveMemoryHolder()); + if (garbages_->back()) { + cur_memory_size_ += garbages_->back()->size(); + } + } else if (var->IsType()) { + garbages_->emplace_back(var->GetMutable() + ->mutable_value() + ->MoveMemoryHolder()); + if (garbages_->back()) { + cur_memory_size_ += garbages_->back()->size(); + } + } else if (var->IsType()) { + auto* tensor_arr = var->GetMutable(); + for (auto& t : *tensor_arr) { + garbages_->emplace_back(t.MoveMemoryHolder()); + if (garbages_->back()) { + cur_memory_size_ += garbages_->back()->size(); + } + } + } else { + PADDLE_THROW(platform::errors::Unimplemented( + "The variable(%s) is not supported in eager deletion.", + framework::ToTypeName(var->Type()))); + } + } + } + + if (!garbages_->empty()) { + if (max_memory_size_ <= 1) { +#if defined(PADDLE_WITH_CUDA) + auto* dev_ctx = reinterpret_cast( + platform::DeviceContextPool::Instance().Get(place)); + gc_event_[instr_id].Record(place, dev_ctx); + gc_queue_->AddTask( + [ container = garbages_.release(), event = &gc_event_[instr_id] ]() { + while (!event->Query()) { + continue; + } + delete container; + }); + garbages_.reset(new GarbageQueue()); +#else + delete garbages_.release(); + garbages_.reset(new GarbageQueue()); +#endif + } else if (cur_memory_size_ >= max_memory_size_) { +#if defined(PADDLE_WITH_CUDA) + auto* dev_ctx = reinterpret_cast( + platform::DeviceContextPool::Instance().Get(place)); + gc_event_[instr_id].Record(place, dev_ctx); + gc_queue_->AddTask( + [ container = garbages_.release(), event = &gc_event_[instr_id] ]() { + while (!event->Query()) { + continue; + } + delete container; + }); + garbages_.reset(new GarbageQueue()); + cur_memory_size_ = 0; +#else + delete garbages_.release(); + garbages_.reset(new GarbageQueue()); + cur_memory_size_ = 0; +#endif + } + } +} + std::vector InterpreterCore::MergeVector( const std::vector& first, const std::vector& second) { std::vector out(first.size() + second.size()); @@ -419,6 +530,11 @@ void InterpreterCore::BuildVariableScope(const framework::ProgramDesc& pdesc, auto v = new Variable(); InitializeVariable(v, var->GetType()); var_scope->var_list.push_back(v); + + VariableMetaInfo info; + info.var_ref_count_ = 0; + info.vardesc_ = var; + var_scope->vec_meta_info_.push_back(info); } } } @@ -431,6 +547,7 @@ void InterpreterCore::BuildOpFuncList(const platform::Place& place, auto& global_block = pdesc.Block(0); auto& all_op_kernels = OperatorWithKernel::AllOpKernels(); + std::vector ops; for (auto& op : global_block.AllOps()) { VLOG(3) << "Build OpFuncNode from : " << op->Type(); @@ -446,6 +563,20 @@ void InterpreterCore::BuildOpFuncList(const platform::Place& place, // step 1. Prepare VariableValueMap of input/output auto op_base = info.Creator()(op->Type(), inputs_names, outputs_names, op_attr_map); + ops.push_back(op_base); + } + + auto unused_var_map = get_unused_vars(global_block, ops); + + size_t ops_index = 0; + for (auto& op : global_block.AllOps()) { + VLOG(3) << op->Type(); + // << op->Type() << endl; + + auto op_base = ops[ops_index++]; + + auto inputs_names = op->Inputs(); + auto outputs_names = op->Outputs(); VariableValueMap ins_map; std::map> ins_name2id; @@ -551,6 +682,11 @@ void InterpreterCore::BuildOpFuncList(const platform::Place& place, var_scope->name2id[new_var_name] = var_scope->var_list.size(); var_scope->var_list.push_back(v); + VariableMetaInfo info; + info.var_ref_count_ = 0; + info.vardesc_ = nullptr; + var_scope->vec_meta_info_.push_back(info); + VariableNameMap copy_in_map; auto x_iter = inputs_names.find(var_name_item.first); copy_in_map["X"] = {x_iter->second[i]}; @@ -656,6 +792,47 @@ void InterpreterCore::BuildOpFuncList(const platform::Place& place, op_func_node.kernel_func_ = OpKernelComputeFunc(kernel_iter->second); op_func_node.kernel_func_(exec_ctx); vec_func_list->push_back(op_func_node); + + // gc--------------------------------------------------------------------------- + auto iter = unused_var_map.find(op_base); + if (iter == unused_var_map.end()) { + continue; + } + + auto& delete_vars = iter->second; + std::deque>* garbages = + new std::deque>(); + + for (auto& var_name : delete_vars) { + auto it = var_scope->name2id.find(var_name); + assert(it != var_scope->name2id.end()); + auto* var = var_scope->var_list[it->second]; + if (var == nullptr) { + continue; + } + + VLOG(2) << "Erase variable " << var_name; + if (var->IsType()) { + garbages->emplace_back( + var->GetMutable()->MoveMemoryHolder()); + } else if (var->IsType()) { + garbages->emplace_back(var->GetMutable() + ->mutable_value() + ->MoveMemoryHolder()); + } else if (var->IsType()) { + auto* lod_tensor_arr = var->GetMutable(); + for (auto& t : *lod_tensor_arr) { + garbages->emplace_back(t.MoveMemoryHolder()); + } + } else { + PADDLE_THROW(platform::errors::Unimplemented( + "Type %s of variable %s is not supported eager deletion.", + framework::ToTypeName(var->Type()), var_name)); + } + } + + delete garbages; // free mem + VLOG(3) << "run " << op_base->Type() << " done."; } } diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h index 63a57e0c036b80bc074ba1e2aeab5e1b7e91a7db..de47f3cba8bbddfc58964da6bfc19622b9397130 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.h +++ b/paddle/fluid/framework/new_executor/interpretercore.h @@ -21,9 +21,11 @@ #include "paddle/fluid/framework/new_executor/interpretercore_util.h" #include "paddle/fluid/framework/new_executor/new_executor_defs.h" +#include "paddle/fluid/framework/new_executor/workqueue.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/framework/variable.h" +#include "paddle/fluid/platform/device_event.h" #include "paddle/fluid/platform/event.h" namespace paddle { @@ -31,6 +33,7 @@ namespace framework { class InterpreterCore { public: + using GarbageQueue = std::deque>; InterpreterCore(const platform::Place& place, const ProgramDesc& main_prog, VariableScope* global_scope, const std::vector& feed_names, @@ -64,6 +67,10 @@ class InterpreterCore { void BuildVariableScope(const framework::ProgramDesc& pdesc, VariableScope* var_scope); + void CheckGC(size_t instr_id, const std::vector& gc_check_list, + const VariableScope& var_scope, const platform::Place& place, + std::vector& working_var_ref); // NOLINT + platform::DeviceContext* ParseDeviceContextForInstruction( const OpFuncNode& op_func_node, const OperatorBase& op_base); @@ -96,6 +103,12 @@ class InterpreterCore { std::vector feed_names_; std::map> var_id2event_; + std::vector gc_event_; + std::unique_ptr garbages_; + size_t max_memory_size_; + size_t cur_memory_size_; + std::unique_ptr gc_queue_; + platform::DeviceContextPool fetch_context_pool_; }; } // namespace framework diff --git a/paddle/fluid/framework/new_executor/interpretercore_gc_helper.h b/paddle/fluid/framework/new_executor/interpretercore_gc_helper.h new file mode 100644 index 0000000000000000000000000000000000000000..5352389e68f6e5513ecbeb16b0322baacbc8bbac --- /dev/null +++ b/paddle/fluid/framework/new_executor/interpretercore_gc_helper.h @@ -0,0 +1,93 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include +#include +#include + +#include "paddle/fluid/framework/garbage_collector.h" +#include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/framework/scope.h" + +namespace paddle { +namespace framework { + +bool var_can_be_deleted(const std::string &name, const BlockDesc &block) { + auto *var_desc = block.FindVar(name); + if (var_desc == nullptr || var_desc->Persistable()) { + return false; + } + + auto type = var_desc->Proto()->type().type(); + + return type == proto::VarType::LOD_TENSOR || + type == proto::VarType::SELECTED_ROWS || + type == proto::VarType::LOD_TENSOR_ARRAY; +} + +std::unordered_map> +get_unused_vars(const BlockDesc &block, + const std::vector &ops) { + std::unordered_map var_op_idx_map; + + for (size_t i = 0; i < ops.size(); ++i) { + auto *op = ops[i]; + + OpInOutInfo info; + for (auto &name_pair : op->Inputs()) { + for (auto &name : name_pair.second) { + if (!var_can_be_deleted(name, block)) { + continue; + } + + // var can be gc-ed + if (!info.IsBuilt()) { + info.Build(op); + } + + if (info.IsInArgBufferNeeded(name)) { + // Update the last living op of variable to current op + var_op_idx_map[name] = i; + } else { + VLOG(10) << "Skip reference count computing of variable " + << name_pair.first << "(" << name << ") in Operator " + << op->Type(); + } + } + } + + for (auto &name_pair : op->Outputs()) { + for (auto &name : name_pair.second) { + if (var_can_be_deleted(name, block)) { + // Update the last living op of variable to current op + var_op_idx_map[name] = i; + } + } + } + } + + std::unordered_map> result; + for (auto &name_op_idx_pair : var_op_idx_map) { + auto &name = name_op_idx_pair.first; + size_t op_idx = name_op_idx_pair.second; + result[ops[op_idx]].emplace_back(name); + } + return result; +} + +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.h b/paddle/fluid/framework/new_executor/new_executor_defs.h index 27526a9455559e384882f069af5d1ff779415bcc..e1fee63b70c6bd3fb21da3cf590bdee5284422b2 100644 --- a/paddle/fluid/framework/new_executor/new_executor_defs.h +++ b/paddle/fluid/framework/new_executor/new_executor_defs.h @@ -35,11 +35,13 @@ struct OpKernelFunc { struct VariableMetaInfo { int var_ref_count_; + paddle::framework::VarDesc* vardesc_; }; struct VariableScope { std::vector var_list; std::map name2id; + std::vector vec_meta_info_; }; struct EventRun { diff --git a/paddle/fluid/framework/new_executor/standalone_executor.cc b/paddle/fluid/framework/new_executor/standalone_executor.cc index f43d1f9f9a982eb45c19f17f34122df40c857abe..665a99d7d761b6044e7d4ef1dd9cc1f5ddbf3aa0 100644 --- a/paddle/fluid/framework/new_executor/standalone_executor.cc +++ b/paddle/fluid/framework/new_executor/standalone_executor.cc @@ -37,6 +37,11 @@ StandaloneExecutor::StandaloneExecutor(const platform::Place& place, } global_scope_.var_list.push_back(v); + + VariableMetaInfo info; + info.var_ref_count_ = 0; + info.vardesc_ = nullptr; + global_scope_.vec_meta_info_.push_back(info); } } @@ -71,6 +76,11 @@ void StandaloneExecutor::BuildVariableOuterScope( auto v = outer_scope->Var(var->Name()); InitializeVariable(v, var->GetType()); var_scope->var_list.push_back(v); + + VariableMetaInfo info; + info.var_ref_count_ = 0; + info.vardesc_ = var; + var_scope->vec_meta_info_.push_back(info); } } } diff --git a/paddle/fluid/framework/new_executor/standalone_executor_test.cc b/paddle/fluid/framework/new_executor/standalone_executor_test.cc index eff505d164ae26b727b7e8c4f108214645d5ab8a..2aad8d245a69aa9a9f5433de48e3706f918303d8 100644 --- a/paddle/fluid/framework/new_executor/standalone_executor_test.cc +++ b/paddle/fluid/framework/new_executor/standalone_executor_test.cc @@ -21,6 +21,8 @@ #include #include +// #include "gperftools/profiler.h" + #include "paddle/fluid/framework/new_executor/standalone_executor.h" USE_OP(fill_constant); @@ -72,26 +74,46 @@ paddle::framework::ProgramDesc load_from_file(const std::string& file_name) { return program_desc; } -int main() { +int main(int argc, char* argv[]) { + std::cout << "main" << std::endl; + int64_t batch_size = std::stoi(argv[1]); paddle::framework::InitDevices(); auto place = paddle::platform::CUDAPlace(0); auto test_prog = load_from_file("lm_startup_program"); auto main_prog = load_from_file("lm_main_program"); + auto& global_block = main_prog.Block(0); + + auto& op1 = global_block.AllOps()[1]; + auto shape1 = BOOST_GET_CONST(std::vector, op1->GetAttr("shape")); + shape1[0] = batch_size * 20; + op1->SetAttr("shape", shape1); + + auto& op2 = global_block.AllOps()[2]; + auto shape2 = BOOST_GET_CONST(std::vector, op2->GetAttr("shape")); + shape2[0] = batch_size; + op2->SetAttr("shape", shape2); + + auto& op3 = global_block.AllOps()[3]; + auto shape3 = BOOST_GET_CONST(std::vector, op3->GetAttr("shape")); + shape3[0] = batch_size; + op3->SetAttr("shape", shape3); + paddle::framework::Scope scope; paddle::framework::StandaloneExecutor exec(place, test_prog, main_prog, &scope); auto start = std::chrono::steady_clock::now(); + // ProfilerStart("new_executor.prof"); for (size_t i = 0; i < 2320; ++i) { if (i % 200 == 0) { std::cout << i << std::endl; } - std::vector vec_out; - exec.Run({}, {}, {}, &vec_out); + exec.Run({}, {}, {}); } + // ProfilerStop(); auto end = std::chrono::steady_clock::now(); std::chrono::duration diff = end - start;