diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 555cd91d242f82d58260e0367613a35444452b14..284c438e395d38a0e8f98a564320adf92affe6d6 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -340,7 +340,7 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS graph build_strategy bind_threaded_ssa_graph_executor collective_helper fast_threaded_ssa_graph_executor variable_helper) -cc_library(executor_cache SRCS executor_cache.cc DEPS executor) +cc_library(executor_cache SRCS executor_cache.cc DEPS parallel_executor) if(WITH_PSCORE) get_property(RPC_DEPS GLOBAL PROPERTY RPC_DEPS) cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index 9d2bf5bf3fe27da4096e959ea8068ee38d487c82..219b6581fd7044d626e48ee944321ac717e06f7f 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -88,16 +88,12 @@ cc_library(eager_deletion_op_handle SRCS eager_deletion_op_handle.cc DEPS lod_te set(SSA_GRAPH_EXECUTOR_DEPS graph framework_proto multi_devices_helper - sequential_execution_pass - modify_op_lock_and_record_event_pass - all_reduce_deps_pass reference_count_pass eager_deletion_pass buffer_shared_inplace_op_pass buffer_shared_cross_op_memory_reuse_pass inplace_addto_op_pass - set_reader_device_info_utils - add_reader_dependency_pass) + set_reader_device_info_utils) cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ${SSA_GRAPH_EXECUTOR_DEPS}) cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope @@ -132,6 +128,10 @@ set(IR_PASS_DEPS graph_viz_pass multi_devices_graph_pass multi_batch_merge_pass fuse_relu_depthwise_conv_pass lock_free_optimize_pass + sequential_execution_pass + all_reduce_deps_pass + add_reader_dependency_pass + modify_op_lock_and_record_event_pass coalesce_grad_tensor_pass fuse_all_reduce_op_pass backward_optimizer_op_deps_pass fuse_adam_op_pass fuse_sgd_op_pass fuse_momentum_op_pass sync_batch_norm_pass runtime_context_cache_pass) diff --git a/paddle/fluid/framework/details/eager_deletion_op_handle.cc b/paddle/fluid/framework/details/eager_deletion_op_handle.cc index 2fefbd61776e2838d1401a2c498b01dab14df75d..ba076173b4a21a42bd2c84019e7ed9aa5c6730c9 100644 --- a/paddle/fluid/framework/details/eager_deletion_op_handle.cc +++ b/paddle/fluid/framework/details/eager_deletion_op_handle.cc @@ -109,7 +109,8 @@ void EagerDeletionOpHandle::CallOnce() { std::string EagerDeletionOpHandle::Name() const { return "eager_deletion"; } void EagerDeletionOpHandle::RunImpl() { - if (vars_.size() != var_infos_.size()) { + if (vars_.size() != var_infos_.size() || is_variant_scope_) { + vars_.clear(); CallOnce(); } @@ -119,6 +120,7 @@ void EagerDeletionOpHandle::RunImpl() { auto *var_info = var_infos_[i]; if (var_info->IsSkippedAllMemoryOptimization() || !var_info->DecreaseRefCnt()) { + VLOG(4) << "skip memory optimization with var: " << var_info->Name(); continue; } diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc index e13059e36d32c59bca84dab73ad5bafcc8e2e15d..8b41b99ac7ab86a54575c82c6c58763dd7ffa934 100644 --- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc @@ -76,7 +76,6 @@ FetchResultType FastThreadedSSAGraphExecutor::Run( std::vector fetch_ops; std::vector ready_fetch_ops; exception_.Clear(); - InsertFetchOps(fetch_tensors, &fetches, &fetched_vars, op_deps.get(), &fetch_ops, &ready_fetch_ops, return_merged); event.reset(nullptr); @@ -95,6 +94,8 @@ FetchResultType FastThreadedSSAGraphExecutor::Run( traced_ops_.clear(); remaining_ = 0; auto complete_q = std::make_shared>(); + VLOG(3) << "number of bootstrap_ops_: " << bootstrap_ops_.size(); + VLOG(3) << "number of ready_fetch_ops: " << ready_fetch_ops.size(); for (auto op : bootstrap_ops_) { RunOpAsync(op_deps.get(), op, complete_q); } @@ -247,11 +248,10 @@ void FastThreadedSSAGraphExecutor::RunOpAsync( RunOpAsync(op_deps, post_op, complete_q); } } - + VLOG(3) << "start to run op: " << op_to_run->Name(); if (!RunOp(op_to_run, complete_q, &complete)) { return; } - auto &outputs = op_to_run->Outputs(); op_to_run = nullptr; for (auto &output : outputs) { diff --git a/paddle/fluid/framework/details/op_handle_base.h b/paddle/fluid/framework/details/op_handle_base.h index 93bdf92f197dd66e56c53b4c60687692ce00ba24..cbad39c87161f160164cef3ea2776797676b9269 100644 --- a/paddle/fluid/framework/details/op_handle_base.h +++ b/paddle/fluid/framework/details/op_handle_base.h @@ -136,6 +136,10 @@ class OpHandleBase { void SetLocalExecScopes( const std::unordered_map &scope_map); + void SetIsVariantScope(bool is_variant_scope) { + is_variant_scope_ = is_variant_scope; + } + protected: virtual std::vector GetLocalScopes() = 0; @@ -156,6 +160,12 @@ class OpHandleBase { std::vector local_exec_scopes_; bool skip_running_ = false; + // NOTE(Aurelius84): Indicate whether scope held in OpHandle is chanageable. + // Ophandle's scope noramlly keep same in most cases, except running + // run_program_op from @to_static. + // The scope may be chanaged while each training iteration. + // See https://github.com/PaddlePaddle/Paddle/pull/32283 + bool is_variant_scope_ = false; #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) std::unordered_map events_; diff --git a/paddle/fluid/framework/details/share_tensor_buffer_functor.cc b/paddle/fluid/framework/details/share_tensor_buffer_functor.cc index 315ee59c91eeaf2748c5f76e428cc9aa0e255ee6..0dc83448b17d100de43c3768719b7a9ffee683d5 100644 --- a/paddle/fluid/framework/details/share_tensor_buffer_functor.cc +++ b/paddle/fluid/framework/details/share_tensor_buffer_functor.cc @@ -41,7 +41,8 @@ static inline const Tensor &GetTensorFromVar(const Variable *var) { return var->Get(); } else { PADDLE_THROW(platform::errors::InvalidArgument( - "Variable must be type of LoDTensor.")); + "Variable must be type of LoDTensor, but received %s.", + framework::ToTypeName(var->Type()))); } } @@ -50,19 +51,22 @@ static inline Tensor *GetMutableTensorFromVar(Variable *var) { return var->GetMutable(); } else { PADDLE_THROW(platform::errors::InvalidArgument( - "Variable must be type of LoDTensor.")); + "Variable must be type of LoDTensor, but received %s.", + framework::ToTypeName(var->Type()))); } } ShareTensorBufferFunctor::ShareTensorBufferFunctor( Scope *scope, size_t scope_idx, const std::string &op_type, const std::vector &in_var_infos, - const std::vector &out_var_names, bool share_dims) + const std::vector &out_var_names, const bool &is_variant_scope, + bool share_dims) : scope_(scope), scope_idx_(scope_idx), op_type_(op_type), in_var_infos_(in_var_infos), out_var_names_(out_var_names), + is_variant_scope_(is_variant_scope), share_dims_(share_dims) { PADDLE_ENFORCE_EQ(in_var_infos_.size(), out_var_names_.size(), platform::errors::PreconditionNotMet( @@ -126,12 +130,13 @@ void ShareTensorBufferFunctor::CallOnce() { } void ShareTensorBufferFunctor::operator()(Scope *exec_scope) { - if (!exec_scope_) { + if (!exec_scope_ || is_variant_scope_) { PADDLE_ENFORCE_NOT_NULL(exec_scope, platform::errors::InvalidArgument( "The given execution scope should not be NULL " "if the cached scope is NULL.")); exec_scope_ = exec_scope; + in_out_vars_.clear(); CallOnce(); } else { PADDLE_ENFORCE_EQ(exec_scope_, exec_scope, diff --git a/paddle/fluid/framework/details/share_tensor_buffer_functor.h b/paddle/fluid/framework/details/share_tensor_buffer_functor.h index 79326e4532dc2e7da4b2bfe9739fb587acdf1c6e..d464098202a54f860ee4196c669b3f8161231f0b 100644 --- a/paddle/fluid/framework/details/share_tensor_buffer_functor.h +++ b/paddle/fluid/framework/details/share_tensor_buffer_functor.h @@ -51,7 +51,8 @@ class ShareTensorBufferFunctor { ShareTensorBufferFunctor( Scope *scope, size_t scope_idx, const std::string &op_type, const std::vector &in_var_infos, - const std::vector &out_var_names, bool share_dims = false); + const std::vector &out_var_names, + const bool &is_variant_scope, bool share_dims = false); void AddReuseVarPair(const ir::MemOptVarInfo *in_var_info, const std::string &out_var_name); @@ -80,6 +81,9 @@ class ShareTensorBufferFunctor { std::vector> in_out_vars_; + // NOTE(Aurelius84): Use const reference to always keep consistant with + // share_tensor_buffer_op_handle. + const bool &is_variant_scope_; // NOTE(zhiqiu): In the case of inplace addto, if the operator of // the in_out_vars is skipped during running, we should set the dims of output // as the same as input. diff --git a/paddle/fluid/framework/details/share_tensor_buffer_op_handle.cc b/paddle/fluid/framework/details/share_tensor_buffer_op_handle.cc index f75cd982f7f40bfdad2cc67ce4800f908a2919f1..7e10c669ac478b89db7fac26eb850811fab3aab3 100644 --- a/paddle/fluid/framework/details/share_tensor_buffer_op_handle.cc +++ b/paddle/fluid/framework/details/share_tensor_buffer_op_handle.cc @@ -67,7 +67,7 @@ ShareTensorBufferOpHandle::ShareTensorBufferOpHandle( const std::vector &out_var_names, bool share_dims) : OpHandleBase(node), functor_(scope, scope_idx, op_type, in_var_infos, out_var_names, - share_dims) {} + is_variant_scope_, share_dims) {} std::unordered_map ShareTensorBufferOpHandle::ReusedVars() const { diff --git a/paddle/fluid/framework/executor_cache.cc b/paddle/fluid/framework/executor_cache.cc index 36cee418f9532163f957f3415c20866a8964e301..8ec75e992b3fcd542f0d344eaac542d7fc5fd2ff 100644 --- a/paddle/fluid/framework/executor_cache.cc +++ b/paddle/fluid/framework/executor_cache.cc @@ -16,7 +16,6 @@ namespace paddle { namespace framework { -class BlockDesc; class ProgramDesc; } // namespace framework } // namespace paddle @@ -26,45 +25,89 @@ namespace framework { namespace details { -static void AppendSkipDeletionVars(const std::vector &append_vars, - std::vector *all_vars) { +static ExecutionStrategy GetExecutionStrategy( + const ExecutorInfoCache::CacheKey &cache_key) { + framework::ExecutionStrategy execution_strategy; + + switch (cache_key.device_type_) { + case platform::DeviceType::CPU: { + execution_strategy.num_threads_ = 2; + break; + } + case platform::DeviceType::CUDA: { + // NOTE: According experiments, one thread is faster in + // most model training. + execution_strategy.num_threads_ = 1; + break; + } + case platform::DeviceType::XPU: { + execution_strategy.num_threads_ = 1; + break; + } + default: + PADDLE_THROW(platform::errors::Unavailable("Unsupported Device type %d.", + cache_key.device_type_)); + } + execution_strategy.use_device_ = cache_key.device_type_; + + return execution_strategy; +} + +void AppendSkipDeletionVars(const std::vector &append_vars, + std::vector *all_vars) { for (auto &var : append_vars) { all_vars->emplace_back(var); } } -static void AppendSafeEagerDeletionSkipVars( - const framework::ProgramDesc &program, - std::vector *skip_vars) { - const framework::BlockDesc &block = program.Block(0); - const std::vector &all_ops = block.AllOps(); - - std::unordered_set grad_op_output; - std::unordered_set grad_op_input; - for (const framework::OpDesc *op : all_ops) { - int op_role = BOOST_GET_CONST( - int, op->GetAttr(framework::OpProtoAndCheckerMaker::OpRoleAttrName())); - if ((op_role & static_cast(framework::OpRole::kBackward)) == 0) { - continue; - } - +/* + * NOTE(Aurelius84): In ParallelExecutor, memory optimized pass will be applied. + * To avoid eagerly deleting last alive variables which are necessary in + * backward program, we firstly parse these variable names as + * skip_eager_vars. While executing pe.run skip_eager_vars are used to + * skip memory optimization. + * + * Variables satisfying the following rules are considered as skip_eager_var: + * + * 1. it is an output var in run_program_op + * 2. it is an input var used in backward_op + */ +void ParseSafeEagerDeletionSkipVars( + const ProgramDesc &program, int64_t forward_op_nums, + const std::vector &output_var_names, + std::vector *skip_eager_delete_vars) { + auto all_ops = program.Block(0).AllOps(); + // NOTE: skip `shape` and `fill_constant` op created by + // fluid.backward.gradients, one forward output will generate one `shape` + // and `fill_constant`. + size_t backward_op_start_index = + forward_op_nums + (output_var_names.size() * 2); + + // step 2: parse the necessary variable of backward op + std::unordered_set op_outputs; + std::unordered_set op_inputs; + for (auto i = backward_op_start_index; i < all_ops.size(); ++i) { + framework::OpDesc *op = all_ops[i]; for (const std::string &in_arg_name : op->InputArgumentNames()) { - grad_op_input.emplace(in_arg_name); + op_inputs.emplace(in_arg_name); } for (const std::string &out_arg_name : op->OutputArgumentNames()) { - grad_op_output.emplace(out_arg_name); + op_outputs.emplace(out_arg_name); } } // For the grad op input variables, if it is not output of grad_op, it may // be output of forward op and we should set the variables as skip_var to // prevent it being deleted when grad op is called multiple times. - for (const std::string &var_name : grad_op_input) { - if (grad_op_output.find(var_name) == grad_op_output.end()) { - skip_vars->emplace_back(var_name); + for (const std::string &var_name : op_inputs) { + if (op_outputs.find(var_name) == op_outputs.end()) { + VLOG(2) << "skip eager var: " << var_name; + skip_eager_delete_vars->emplace_back(var_name); } } + VLOG(3) << "Found skip_eager_delete_vars: " << skip_eager_delete_vars->size(); } + } // namespace details // C++11 removes the need for manual locking. Concurrent execution shall wait if @@ -75,38 +118,58 @@ ExecutorInfoCache &ExecutorInfoCache::Instance() { return g_exe_cache_info_map; } -std::shared_ptr GetExecutorInfoFromCache( - const framework::Executor &exe, const framework::ExecutionContext &ctx, - const std::vector> &ctx_output_names, - bool is_grad) { - auto *program = ctx.Attr("global_block")->Program(); +void ExecutorInfoCache::Finalize() { + // NOTE(Aurelius84): DO NOT perform finalize in destructor + // to avoid problems caused by destructor order of static + // object. + info_map_.clear(); +} +CacheInfo GetExecutorInfoFromCache(const ExecutorInfoCache::CacheKey &cache_key, + framework::Scope *scope) { auto &cached_exe_info = framework::ExecutorInfoCache::Instance(); - auto cache_key = framework::ExecutorInfoCache::KeyInfo(program, is_grad); if (!cached_exe_info.Has(cache_key)) { - VLOG(1) << "create exe_info for program: " << program - << " is_grad: " << is_grad; - // skip delete vars - std::vector skip_vars; - for (auto &output_names : ctx_output_names) { - details::AppendSkipDeletionVars(output_names, &skip_vars); - } - if (is_grad) { - details::AppendSafeEagerDeletionSkipVars(*program, &skip_vars); + VLOG(1) << "create exe_info for " << cache_key.DebugString(); + + // TODO(Aurelius84): Consider to use LRU algorithm to replace this. + if (cached_exe_info.Size() > 4u /* max_cached_size*/) { + VLOG(2) << "The cached info size has exceeded max_cached_size: 4, clear " + "all cache!"; + cached_exe_info.Finalize(); } - VLOG(2) << "Prepare to skip " << skip_vars.size() - << " var(s): " << string::join_strings(skip_vars, ' '); - std::shared_ptr exe_ctx = - std::move(exe.Prepare(*program, /*block_id=*/0, skip_vars)); + framework::BuildStrategy build_strategy; + auto execution_strategy = details::GetExecutionStrategy(cache_key); + + auto graph = std::make_shared( + *cache_key.program_desc_, cache_key.start_op_index_, + cache_key.end_op_index_); + auto parallel_executor = std::make_shared( + cache_key.place_, scope, execution_strategy, build_strategy, + graph.get()); + parallel_executor->PrepareVariables(scope); + + framework::ExecutorInfoCache::ValueType cache_val = {parallel_executor, + graph}; + cached_exe_info.Insert(cache_key, cache_val); - cached_exe_info.Insert(cache_key, exe_ctx); - return exe_ctx; + bool is_new_created = true; + return std::make_pair(parallel_executor, is_new_created); } else { - VLOG(1) << "get exe_info from cache by program: " << program - << " is_grad: " << is_grad; - return cached_exe_info.Get(cache_key); + VLOG(1) << "get exe_info from cache by: " << cache_key.DebugString(); + bool is_new_created = false; + auto cache_val = cached_exe_info.GetMutable(cache_key); + auto parallel_executor = cache_val.first; + + // update op_handle scope_map in pe->executor_->Graph + std::unordered_map scope_map = { + {parallel_executor->GetLocalScopes().front(), scope}}; + parallel_executor->ResetOpHandleScopeMapOfGraphs(scope_map); + // need to recreate tmp variables in new scope + parallel_executor->PrepareVariables(scope); + + return std::make_pair(parallel_executor, is_new_created); } } diff --git a/paddle/fluid/framework/executor_cache.h b/paddle/fluid/framework/executor_cache.h index 3beeacb1010d2687ac0dfd58092773f52c4fafdc..3e08579ba42f85069794859910526e6e1b9c8e59 100644 --- a/paddle/fluid/framework/executor_cache.h +++ b/paddle/fluid/framework/executor_cache.h @@ -16,38 +16,78 @@ #include #include +#include #include #include +#include #include #include -#include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/op_proto_maker.h" +#include "paddle/fluid/framework/parallel_executor.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/platform/macros.h" #include "paddle/fluid/string/string_helper.h" namespace paddle { namespace framework { +namespace ir { +class Graph; +} -class ExecutionContext; -class Executor; -class ProgramDesc; -struct ExecutorPrepareContext; +namespace details { +void AppendSkipDeletionVars(const std::vector& append_vars, + std::vector* all_vars); +void ParseSafeEagerDeletionSkipVars( + const ProgramDesc& program, int64_t forward_op_nums, + const std::vector& output_var_names, + std::vector* skip_eager_delete_vars); + +} // namespace details class ExecutorInfoCache { public: - /* - * The ExecutorPrepareContext is different while running forward program and - * backward program. We add bool value into cached key to distinguish this. - */ - using KeyInfo = std::pair; + struct CacheKey { + CacheKey(const ProgramDesc* program_desc, const platform::Place& place, + int64_t start_op_index, int64_t end_op_index, bool is_grad) + : program_desc_(program_desc), + place_(place), + start_op_index_(start_op_index), + end_op_index_(end_op_index), + is_grad_(is_grad) { + device_type_ = platform::Place2DeviceType(place); + PADDLE_ENFORCE_NOT_NULL(program_desc_, + "program_desc should not be null."); + } + + std::string DebugString() const { + std::stringstream ss; + + ss << "\n CacheKey(program_desc: " << program_desc_; + ss << ", start_op_index: " << start_op_index_; + ss << ", end_op_index: " << end_op_index_; + ss << ", is_grad: " << is_grad_; + ss << ", device_type: " << device_type_ << ")"; + + return ss.str(); + } + + const ProgramDesc* program_desc_; + platform::Place place_; + int64_t start_op_index_; + int64_t end_op_index_; + bool is_grad_; + platform::DeviceType device_type_; + }; + using KeyType = size_t; + using ValueType = + std::pair, std::shared_ptr>; - struct HashPair { - size_t operator()(const KeyInfo& key) const noexcept { + struct KeyHasher { + size_t operator()(const CacheKey& key) const noexcept { size_t seed = 10; - auto* prog_desc = key.first; + auto* prog_desc = key.program_desc_; /* * Note(Aurelius84): DO NOT use only ProgramDesc* to calculate hash value * because a new program will hold same pointer address after an older @@ -59,8 +99,12 @@ class ExecutorInfoCache { hash_combine(&seed, &prog_desc->Block(i)); hash_combine(&seed, prog_desc->Block(i).OpSize()); } - hash_combine(&seed, key.second); - VLOG(1) << "hash value is : " << seed << " of pointer " << prog_desc; + hash_combine(&seed, static_cast(key.device_type_)); + hash_combine(&seed, key.start_op_index_); + hash_combine(&seed, key.end_op_index_); + hash_combine(&seed, key.is_grad_); + VLOG(3) << "hash value is : " << seed + << " of key: " << key.DebugString(); return seed; } @@ -73,54 +117,50 @@ class ExecutorInfoCache { static ExecutorInfoCache& Instance(); - std::shared_ptr Get( - const KeyInfo& key) const { - KeyType key_value = key_hash_func_(key); + ValueType GetMutable(const CacheKey& key) { + auto key_val = key_hash_func_(key); PADDLE_ENFORCE_EQ( - Has(key_value), true, - platform::errors::NotFound( - "(programDesc: %s, is_grad: %s) doesn't exist in ExecutorInfoCache", - key.first, key.second)); - return info_map_.at(key_value); + Has(key_val), true, + platform::errors::NotFound("%s doesn't exist in ExecutorInfoCache", + key.DebugString())); + return info_map_[key_val]; } - bool Has(const KeyInfo& key) const { - KeyType key_value = key_hash_func_(key); - return Has(key_value); + bool Has(const CacheKey& key) const { + auto key_val = key_hash_func_(key); + return Has(key_val); } bool Has(const KeyType& key) const { return info_map_.find(key) != info_map_.end(); } - void Insert(const KeyInfo& key, - std::shared_ptr exe_ctx) { - KeyType key_value = key_hash_func_(key); - PADDLE_ENFORCE_NE( - Has(key_value), true, - platform::errors::NotFound( - "(programDesc: %s, is_grad: %s) has existed in ExecutorInfoCache", - key.first, key.second)); - info_map_.insert({key_value, exe_ctx}); + void Insert(const CacheKey& key, ValueType value) { + auto key_val = key_hash_func_(key); + PADDLE_ENFORCE_EQ( + Has(key_val), false, + platform::errors::NotFound("%s has existed in ExecutorInfoCache", + key.DebugString())); + info_map_.insert({key_val, value}); } - private: - ExecutorInfoCache() = default; + size_t Size() const { return info_map_.size(); } - HashPair key_hash_func_; + void Finalize(); - // Note: we shall avoid using raw pointer as key but use hash code, - // beacause pointer doesn't hold resource indeed. - std::unordered_map> - info_map_; + private: + ExecutorInfoCache() = default; DISABLE_COPY_AND_ASSIGN(ExecutorInfoCache); + + KeyHasher key_hash_func_; + std::unordered_map info_map_; }; -std::shared_ptr GetExecutorInfoFromCache( - const framework::Executor& exe, const framework::ExecutionContext& ctx, - const std::vector>& ctx_output_names, - bool is_grad); +using CacheInfo = + std::pair, bool /*is_new_created*/>; + +CacheInfo GetExecutorInfoFromCache(const ExecutorInfoCache::CacheKey& cache_key, + framework::Scope* scope); } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/ir/graph.cc b/paddle/fluid/framework/ir/graph.cc index 706df467d35350621b57ab0dd001c2c3c8894268..e8a3de1a88a9d20b660006ca4a8b74b98640a697 100644 --- a/paddle/fluid/framework/ir/graph.cc +++ b/paddle/fluid/framework/ir/graph.cc @@ -21,14 +21,30 @@ namespace paddle { namespace framework { namespace ir { -Graph::Graph(const ProgramDesc &program) : program_(program) { - auto var_nodes = InitFromProgram(program_); +Graph::Graph(const ProgramDesc &program) + : Graph(program, 0, program.Block(0).AllOps().size()) {} + +Graph::Graph(const ProgramDesc &program, int64_t start_op_index, + int64_t end_op_index) + : program_(program) { + auto var_nodes = InitFromProgram(program_, start_op_index, end_op_index); ResolveHazard(var_nodes); } std::map> Graph::InitFromProgram( - const ProgramDesc &program) { + const ProgramDesc &program, int64_t start_op_index, int64_t end_op_index) { VLOG(3) << "block in program:" << program_.Size(); + PADDLE_ENFORCE_GE(start_op_index, 0, + platform::errors::InvalidArgument( + "Required start_op_index >= 0, but received " + "start_op_index = %d", + start_op_index)); + PADDLE_ENFORCE_GE(end_op_index, start_op_index, + platform::errors::InvalidArgument( + "Required end_op_index >= start_op_index, but received " + "end_op_index: %d < start_op_index: %d", + end_op_index, start_op_index)); + std::unordered_map all_vars; // var nodes for each var name, will have multiple versions in SSA std::map> var_nodes; @@ -37,8 +53,16 @@ std::map> Graph::InitFromProgram( } auto not_visited_vars = all_vars; - - for (auto *op : program.Block(0).AllOps()) { + auto all_ops = program.Block(0).AllOps(); + PADDLE_ENFORCE_LE( + end_op_index, all_ops.size(), + platform::errors::InvalidArgument( + "Required end_op_index <= %d, but received end_op_index = %d", + all_ops.size(), end_op_index)); + + for (auto i = start_op_index; i < end_op_index; ++i) { + auto *op = all_ops[i]; + VLOG(3) << "create OpNode by " << op->Type(); ir::Node *node = CreateOpNode(op); // For input args, reuse the same var name if it was created before. // Otherwise, create a new one. @@ -88,18 +112,28 @@ std::map> Graph::InitFromProgram( } } - for (auto &pair : not_visited_vars) { - const auto &var_name = pair.first; - auto *var_desc = pair.second; - if (var_name != kEmptyVarName) { - VLOG(10) << "Create isolated var node " << var_name; - var_nodes[var_name].push_back(CreateVarNode(var_desc)); + if (end_op_index < static_cast(all_ops.size()) || + start_op_index > 0) { + is_partial_ = true; + } + if (!is_partial_) { + for (auto &pair : not_visited_vars) { + const auto &var_name = pair.first; + auto *var_desc = pair.second; + if (var_name != kEmptyVarName) { + VLOG(10) << "Create isolated var node " << var_name; + var_nodes[var_name].push_back(CreateVarNode(var_desc)); + } } } Set>( details::kStaleProgramOpDescs, - new std::vector(program.Block(0).AllOps())); + new std::vector(all_ops.begin() + start_op_index, + all_ops.begin() + end_op_index)); + VLOG(3) + << "kStaleProgramOpDescs.size: " + << Get>(details::kStaleProgramOpDescs).size(); return var_nodes; } diff --git a/paddle/fluid/framework/ir/graph.h b/paddle/fluid/framework/ir/graph.h index 593ac214e56f9722538c89ff80ec9d6b98fcd8ae..26ca64ba821cb510b3156a95d24e55c9b73c64f5 100644 --- a/paddle/fluid/framework/ir/graph.h +++ b/paddle/fluid/framework/ir/graph.h @@ -79,6 +79,9 @@ namespace ir { class Graph { public: explicit Graph(const ProgramDesc &program); + // Construct a Graph with ops[start_op_index, end_op_index) + explicit Graph(const ProgramDesc &program, int64_t start_op_index, + int64_t end_op_index); virtual ~Graph() { for (auto &attr : attrs_) { @@ -88,6 +91,8 @@ class Graph { attr_dels_.clear(); } + bool IsConstructedByPartialProgram() const { return is_partial_; } + bool Has(const std::string &attr_name) const { return attrs_.count(attr_name) > 0; } @@ -253,7 +258,7 @@ class Graph { private: std::map> InitFromProgram( - const ProgramDesc &program); + const ProgramDesc &program, int64_t start_op_index, int64_t end_op_index); // NOTE: program_ shouldn't be exposed to user. const ProgramDesc program_; @@ -262,6 +267,11 @@ class Graph { std::map> nodes_; std::unordered_set node_set_; size_t num_node_created_{0}; // help to generate a unique node id. + // NOTE(Aurelius84): Whether is constructed with partial ProgramDesc. + // In case of @to_static, whole trainning program is splited into two + // parts: forward graph and backward graph, which can be executed + // independently. + bool is_partial_{false}; }; bool IsControlDepVar(const ir::Node &var); diff --git a/paddle/fluid/framework/ir/memory_optimize_pass/conditional_block_op_eager_deletion_pass.cc b/paddle/fluid/framework/ir/memory_optimize_pass/conditional_block_op_eager_deletion_pass.cc index 69098cb3e6fc4dde6db0b7ce7f6efdd747d1a713..873f9ceaa7abf73bf2d07719162d09108fbc440b 100644 --- a/paddle/fluid/framework/ir/memory_optimize_pass/conditional_block_op_eager_deletion_pass.cc +++ b/paddle/fluid/framework/ir/memory_optimize_pass/conditional_block_op_eager_deletion_pass.cc @@ -20,15 +20,15 @@ namespace paddle { namespace framework { namespace ir { - +using OpVariant = operators::OpVariant; class ConditionalOpEagerDeletionPass : public Pass { protected: void ApplyImpl(Graph *graph) const override { auto all_ops = ir::FilterByNodeWrapper(*graph); // Find all conditional_op and conditional_grad_op - std::unordered_map, - std::vector>> + std::unordered_map< + size_t, std::pair, std::vector>> target_ops; for (auto *op : all_ops) { auto compute_op = dynamic_cast(op); @@ -43,6 +43,30 @@ class ConditionalOpEagerDeletionPass : public Pass { } } + // NOTE(Aurelius84): In case of @to_static, after we finish executing + // forward graph, some necessaray variable in step_scope of controlflow_op + // should be kept for backward graph. + if (graph->IsConstructedByPartialProgram()) { + PADDLE_ENFORCE_LE(target_ops.size(), 1, + platform::errors::InvalidArgument( + "Unsupported multi devices if graph is constructed " + "with partial program.")); + size_t scope_idx = 0; + auto &ifelse_ops = target_ops[scope_idx].first; + auto &ifelse_grad_ops = target_ops[scope_idx].second; + + auto all_ops = graph->OriginProgram().Block(0).AllOps(); + if (ifelse_ops.empty()) { + operators::AppendOpVariantByOpName( + all_ops, std::string("conditional_block"), &ifelse_ops); + } else if (ifelse_grad_ops.empty()) { + operators::AppendOpVariantByOpName( + all_ops, std::string("conditional_block_grad"), &ifelse_grad_ops); + } else { + PADDLE_THROW("One of ifelse_ops or ifelse_grad_ops should be empty."); + } + } + for (auto &ops_pair : target_ops) { auto &ifelse_ops = ops_pair.second.first; auto &ifelse_grad_ops = ops_pair.second.second; diff --git a/paddle/fluid/framework/ir/memory_optimize_pass/while_op_eager_deletion_pass.cc b/paddle/fluid/framework/ir/memory_optimize_pass/while_op_eager_deletion_pass.cc index da0da4c7125953d386fbd4d14bc2607837616cc3..6755cf0b2757eb776d95b12d8567be4b646a7ea6 100644 --- a/paddle/fluid/framework/ir/memory_optimize_pass/while_op_eager_deletion_pass.cc +++ b/paddle/fluid/framework/ir/memory_optimize_pass/while_op_eager_deletion_pass.cc @@ -15,20 +15,24 @@ #include "paddle/fluid/framework/details/computation_op_handle.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/ir/graph_helper.h" +#include "paddle/fluid/operators/controlflow/op_variant.h" #include "paddle/fluid/operators/controlflow/while_op_helper.h" namespace paddle { namespace framework { namespace ir { +using OpVariant = operators::OpVariant; class WhileOpEagerDeletionPass : public ir::Pass { protected: void ApplyImpl(ir::Graph *graph) const override { auto all_ops = ir::FilterByNodeWrapper(*graph); - // Find all while_op and while_grad_op - std::unordered_map, - std::vector>> + // Find all while_op and while_grad_op. In case of @to_static, graph + // may be constructed only by forward or backward program, so we use + // OpVariant here instead of OperatorBase. + std::unordered_map< + size_t, std::pair, std::vector>> target_ops; for (auto *op : all_ops) { auto compute_op = dynamic_cast(op); @@ -42,6 +46,27 @@ class WhileOpEagerDeletionPass : public ir::Pass { compute_op->GetOp()); } } + if (graph->IsConstructedByPartialProgram()) { + PADDLE_ENFORCE_LE( + target_ops.size(), 1, + platform::errors::InvalidArgument( + "Unsupported multi device if graph is constructed by " + "partial program.")); + size_t scope_idx = 0; + auto &while_ops = target_ops[scope_idx].first; + auto &while_grad_ops = target_ops[scope_idx].second; + + auto all_ops = graph->OriginProgram().Block(0).AllOps(); + if (while_ops.empty()) { + operators::AppendOpVariantByOpName(all_ops, std::string("while"), + &while_ops); + } else if (while_grad_ops.empty()) { + operators::AppendOpVariantByOpName(all_ops, std::string("while_grad"), + &while_grad_ops); + } else { + PADDLE_THROW("One of while_ops or while_grad_ops should be empty."); + } + } for (auto &ops_pair : target_ops) { auto &while_ops = ops_pair.second.first; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index eb021609e825839825b657ef516a18c5b4cbcc74..badabce7b34470aefb4b11f3f3bfa5bed84f2dc2 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -33,6 +33,7 @@ limitations under the License. */ #include "paddle/fluid/framework/ir/memory_optimize_pass/memory_optimization_var_info.h" #include "paddle/fluid/framework/ir/memory_optimize_pass/reference_count_pass_helper.h" #include "paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.h" +#include "paddle/fluid/framework/variable_helper.h" #include "paddle/fluid/platform/event.h" #include "paddle/fluid/platform/profiler.h" @@ -684,6 +685,51 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, SetReaderOpDeviceInfoOfGraphs(final_graphs); } +ParallelExecutor::ParallelExecutor(const platform::Place &place, Scope *scope, + const ExecutionStrategy &exec_strategy, + const BuildStrategy &build_strategy, + ir::Graph *graph) + : member_(new ParallelExecutorPrivate({place}, scope)) { + // Initialize necessary info of member_ with strategy. + InitExecutorPrivateMemberInfo(exec_strategy, build_strategy, + /*device_count=*/1, *graph); + + CreateLocalScopes(scope, /*local_scope=*/{scope}, /*create_new=*/false); + + // Apply BuildStrategy to compile graph. + std::vector graphs = {graph}; + std::vector async_graphs = + CompileGraphWithBuildStrategy(graph, &graphs, /*loss_var_name=*/""); + + graph = member_->ApplyMemoryOptimizePass(graph); + + // Create vars in each scope. Passes may also create new vars. + // skip control vars and empty vars + CreateVariableInfos(&var_infos_, graph); + + // Create local execution scopes + std::unordered_map scope_map = + CreateLocalExecScopes(member_->local_scopes_, /*create_new=*/false); + + std::vector final_graphs = + CreateSSAGraphExecutor(exec_strategy, &async_graphs, graph); + + // Set scope_map of op from each graph + ResetOpHandleScopeMapOfGraphs(final_graphs, scope_map); +} + +void ParallelExecutor::PrepareVariables(Scope *scope) { + for (auto &info : var_infos_) { + auto var = scope->FindVar(info.name_); + if (var != nullptr) { + VLOG(2) << info.name_ + << " has been initialized beforehand in global scope, skipped."; + continue; + } + framework::InitializeVariable(scope->Var(info.name_), info.type_); + } +} + void ParallelExecutor::BCastParamsToDevices( const std::vector &vars, int trainer_id) const { VLOG(3) << "BCastParamsToDevices"; @@ -845,6 +891,36 @@ FetchResultType ParallelExecutor::Run( return fetch_data; } +void ParallelExecutor::RunWithoutFetch( + const std::vector &skip_eager_vars) { + VLOG(3) << "enter ParallelExecutor RunWithoutFetch"; +#ifdef WITH_GPERFTOOLS + if (gProfileStarted) { + ProfilerFlush(); + } +#endif + platform::RecordBlock b(0); + + ResetHasFeedGuard reset_has_feed_guard(member_); + + ir::SkipMemOptVarsGuard guard(&(member_->mem_opt_var_infos_), skip_eager_vars, + member_->HasGarbageCollectors()); + + VLOG(3) << "ParallelExecutor begin to run member_->executor_->Run"; + member_->executor_->Run(/*fetch_tensors*/ {}, /*return_merged*/ false); +} + +void ParallelExecutor::SkipMemoryReuse( + size_t scope_idx, const std::vector &skip_vars) { + for (auto &var_name : skip_vars) { + bool is_persistable = member_->IsPersistable(var_name); + if (!is_persistable) { + VLOG(3) << "SkipMemoryReuse for var: " << var_name; + member_->SetSkipMemoryReuse(scope_idx, var_name); + } + } +} + void ParallelExecutor::FeedTensorsIntoLocalScopes( const std::vector> &tensors) { if (!member_->AllowPartialFeed()) { @@ -1449,10 +1525,18 @@ void ParallelExecutor::ResetOpHandleScopeMapOfGraphs( auto ops = ir::FilterByNodeWrapper(*g); for (auto *op : ops) { op->SetLocalExecScopes(scope_map); + op->SetIsVariantScope(true); } } } +void ParallelExecutor::ResetOpHandleScopeMapOfGraphs( + const std::unordered_map &scope_map) { + auto inner_graph = const_cast(&Graph()); + std::vector graphs = {inner_graph}; + ResetOpHandleScopeMapOfGraphs(graphs, scope_map); +} + void ParallelExecutor::SetReaderOpDeviceInfoOfGraphs( const std::vector &final_graphs) { if (final_graphs.size() == 1) { diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index d4d0b534b55f05f1e4145064eb55e7858f25185d..6c871a8d858156ecc72532eb65971852058f4b28 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -60,6 +60,12 @@ class ParallelExecutor { const BuildStrategy &build_strategy, ir::Graph *graph); + // NOTE(Aurelius84): Construct a PE running on single device for @to_static + explicit ParallelExecutor(const platform::Place &place, Scope *scope, + const ExecutionStrategy &exec_strategy, + const BuildStrategy &build_strategy, + ir::Graph *graph); + ~ParallelExecutor(); size_t DeviceCount() const; @@ -84,7 +90,16 @@ class ParallelExecutor { FetchResultType Run(const std::vector &fetch_tensors, bool return_merged = true); + void RunWithoutFetch(const std::vector &skip_eager_vars); + + void ResetOpHandleScopeMapOfGraphs( + const std::unordered_map &scope_map); + const ir::Graph &Graph() const; + void PrepareVariables(Scope *scope); + + void SkipMemoryReuse(size_t scope_idx, + const std::vector &skip_vars); private: // broadcast the parameters from the 0th device. @@ -131,6 +146,7 @@ class ParallelExecutor { ParallelExecutorPrivate *member_; std::vector> async_graphs_; + std::vector var_infos_; }; } // namespace framework } // namespace paddle diff --git a/paddle/fluid/operators/controlflow/conditional_block_op_helper.cc b/paddle/fluid/operators/controlflow/conditional_block_op_helper.cc index 9f29955ea7de453a9934c22920129ce4df7e19d2..8dea0a1d95320f29e5d1d1380c0100698facaba0 100644 --- a/paddle/fluid/operators/controlflow/conditional_block_op_helper.cc +++ b/paddle/fluid/operators/controlflow/conditional_block_op_helper.cc @@ -16,8 +16,6 @@ #include -#include "paddle/fluid/operators/controlflow/op_variant.h" - namespace paddle { namespace framework { class ProgramDesc; @@ -189,18 +187,10 @@ void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( const framework::ProgramDesc &program, - const std::vector &ifelse_ops, - const std::vector &ifelse_grad_ops) { - std::vector fwd_ops, bwd_ops; - fwd_ops.reserve(ifelse_ops.size()); - for (auto *op : ifelse_ops) { - fwd_ops.emplace_back(op); - } - - bwd_ops.reserve(ifelse_grad_ops.size()); - for (auto *op : ifelse_grad_ops) { - bwd_ops.emplace_back(op); - } + const std::vector &ifelse_ops, + const std::vector &ifelse_grad_ops) { + std::vector fwd_ops = ifelse_ops; + std::vector bwd_ops = ifelse_grad_ops; PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOpImpl( program, &fwd_ops, &bwd_ops); diff --git a/paddle/fluid/operators/controlflow/conditional_block_op_helper.h b/paddle/fluid/operators/controlflow/conditional_block_op_helper.h index 7ce63aa9cbbfaaa4adb7834dd33e24cb6491a7a9..afefc96d6749e0c01f7417ead5ba1f7ce41f00ed 100644 --- a/paddle/fluid/operators/controlflow/conditional_block_op_helper.h +++ b/paddle/fluid/operators/controlflow/conditional_block_op_helper.h @@ -19,6 +19,7 @@ #include "paddle/fluid/framework/operator.h" #include "paddle/fluid/operators/controlflow/conditional_block_op.h" +#include "paddle/fluid/operators/controlflow/op_variant.h" #include "paddle/fluid/string/string_helper.h" namespace paddle { @@ -40,8 +41,8 @@ void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( const framework::ProgramDesc &program, - const std::vector &ifelse_ops, - const std::vector &ifelse_grad_ops); + const std::vector &ifelse_ops, + const std::vector &ifelse_grad_ops); } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/controlflow/op_variant.cc b/paddle/fluid/operators/controlflow/op_variant.cc index d6eea8c4c8d4b0d006185f3c0515f21f57c0e80d..895360441219b799437ede504a4934e541d51f55 100644 --- a/paddle/fluid/operators/controlflow/op_variant.cc +++ b/paddle/fluid/operators/controlflow/op_variant.cc @@ -68,5 +68,20 @@ const void *OpVariant::RawPointer() const { return boost::apply_visitor(RawPointerVisitor(), op_); } +void AppendOpVariantByOpName(const std::vector &op_descs, + const std::string &candidate_op_name, + std::vector *result_ops) { + PADDLE_ENFORCE_NOT_NULL( + result_ops, + platform::errors::Unavailable("result_ops should not be a null_ptr.")); + for (auto *op_desc : op_descs) { + PADDLE_ENFORCE_NOT_NULL(op_desc, platform::errors::Unavailable( + "op_desc should not be a null_ptr.")); + if (op_desc->Type() == candidate_op_name) { + result_ops->emplace_back(op_desc); + } + } +} + } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/controlflow/op_variant.h b/paddle/fluid/operators/controlflow/op_variant.h index 28c27437de12e37eaa94cef677fe3c9fc292c7c7..cc1f36a875f774892b38d203e4fd74d8b1626e76 100644 --- a/paddle/fluid/operators/controlflow/op_variant.h +++ b/paddle/fluid/operators/controlflow/op_variant.h @@ -73,5 +73,9 @@ class OpVariant { op_; }; +void AppendOpVariantByOpName(const std::vector &op_descs, + const std::string &candidate_op_name, + std::vector *result_ops); + } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/controlflow/while_op_helper.cc b/paddle/fluid/operators/controlflow/while_op_helper.cc index c9d4e1510985fbb7025006159705842513a4c54e..5c94c0827100b39ee2d275073808fc5b4bbfc717 100644 --- a/paddle/fluid/operators/controlflow/while_op_helper.cc +++ b/paddle/fluid/operators/controlflow/while_op_helper.cc @@ -15,7 +15,6 @@ #include "paddle/fluid/operators/controlflow/while_op_helper.h" #include -#include "paddle/fluid/operators/controlflow/op_variant.h" #include "paddle/fluid/string/string_helper.h" namespace paddle { @@ -199,18 +198,10 @@ void PrepareSafeEagerDeletionOnWhileOpAndWhileGradOp( void PrepareSafeEagerDeletionOnWhileOpAndWhileGradOp( const framework::ProgramDesc &program, - const std::vector &while_ops, - const std::vector &while_grad_ops) { - std::vector fwd_ops, bwd_ops; - fwd_ops.reserve(while_ops.size()); - for (auto *op : while_ops) { - fwd_ops.emplace_back(op); - } - - bwd_ops.reserve(while_grad_ops.size()); - for (auto *op : while_grad_ops) { - bwd_ops.emplace_back(op); - } + const std::vector &while_ops, + const std::vector &while_grad_ops) { + std::vector fwd_ops = while_ops; + std::vector bwd_ops = while_grad_ops; PrepareSafeEagerDeletionOnWhileOpAndWhileGradOpImpl(program, &fwd_ops, &bwd_ops); diff --git a/paddle/fluid/operators/controlflow/while_op_helper.h b/paddle/fluid/operators/controlflow/while_op_helper.h index 8b4a14570b1ef522fd3269c942d1842c603b9553..1685da4e958220fb5cbadcbc8a9796bc42534e21 100644 --- a/paddle/fluid/operators/controlflow/while_op_helper.h +++ b/paddle/fluid/operators/controlflow/while_op_helper.h @@ -19,6 +19,7 @@ #include #include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/operators/controlflow/op_variant.h" #include "paddle/fluid/platform/variant.h" namespace paddle { @@ -46,8 +47,8 @@ void PrepareSafeEagerDeletionOnWhileOpAndWhileGradOp( void PrepareSafeEagerDeletionOnWhileOpAndWhileGradOp( const framework::ProgramDesc &program, - const std::vector &while_ops, - const std::vector &while_grad_ops); + const std::vector &while_ops, + const std::vector &while_grad_ops); bool GetCondData(const framework::LoDTensor &cond); diff --git a/paddle/fluid/operators/run_program_op.h b/paddle/fluid/operators/run_program_op.h index c7aeb0e145e4cb704c56dabb2f090e63ecb280a7..69345dc8a08206b1b402886aafd8aefec261832a 100644 --- a/paddle/fluid/operators/run_program_op.h +++ b/paddle/fluid/operators/run_program_op.h @@ -23,7 +23,6 @@ limitations under the License. */ #include #include -#include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/executor_cache.h" #include "paddle/fluid/framework/op_desc.h" #include "paddle/fluid/framework/op_registry.h" @@ -43,6 +42,7 @@ namespace operators { using StepScopeVar = std::vector; using BlockDesc = framework::BlockDesc; +using ProgramDesc = framework::ProgramDesc; using Variable = framework::Variable; using LoDTensor = framework::LoDTensor; @@ -198,9 +198,6 @@ class RunProgramOpKernel : public framework::OpKernel { "The OutScope of RunProgramGradOp should only hold one scope.")); // Step 2. prepare executor and init persistable variables - framework::Executor exe(ctx.GetPlace()); - auto exe_ctx = framework::GetExecutorInfoFromCache( - exe, ctx, {output_var_names, dout_var_names}, /*is_grad=*/false); // NOTE(Aurelius84): While training some models, forward can be called many // times and then apply backpropagation all at once, such as Reinforcement @@ -216,12 +213,27 @@ class RunProgramOpKernel : public framework::OpKernel { details::ShareVarsIntoScope(input_vars, input_var_names, &scope); details::ShareVarsIntoScope(param_vars, param_names, &scope); - // Step 3. run ops - exe.RunPartialPreparedContext(exe_ctx.get(), &scope, start_op_index, - end_op_index, /*create_local_scope=*/false, - /*create_vars=*/true, - /*keep_kids=*/!is_test); - + if (end_op_index > start_op_index) { + auto *program = ctx.Attr("global_block")->Program(); + auto cache_key = framework::ExecutorInfoCache::CacheKey( + program, ctx.GetPlace(), start_op_index, end_op_index, + /*is_grad=*/false); + auto cache_info = framework::GetExecutorInfoFromCache(cache_key, &scope); + auto ¶llel_executor = cache_info.first; + if (cache_info.second /*is_new_created*/) { + parallel_executor->SkipMemoryReuse(/*scope_idx=*/0, input_var_names); + } + + // Step 3. run ops + // all out_vars are skip_eager_var + std::vector skip_eager_delete_vars(output_var_names); + skip_eager_delete_vars.insert(skip_eager_delete_vars.end(), + dout_var_names.begin(), + dout_var_names.end()); + framework::details::ParseSafeEagerDeletionSkipVars( + *program, end_op_index, output_var_names, &skip_eager_delete_vars); + parallel_executor->RunWithoutFetch(skip_eager_delete_vars); + } // Step 4. Get Output details::ShareVarsFromScope(output_vars, output_var_names, &scope); details::ShareVarsFromScope(dout_vars, dout_var_names, &scope); @@ -290,21 +302,31 @@ class RunProgramGradOpKernel : public framework::OpKernel { auto &scope = *(global_inner_scope->kids().front()); - // Step 2. prepare executor and scope - framework::Executor exe(ctx.GetPlace()); - auto exe_ctx = framework::GetExecutorInfoFromCache( - exe, ctx, {input_grad_var_names, param_grad_names}, - /*is_grad=*/true); - - details::ShareVarsIntoScope(output_grad_vars, output_grad_var_names, - &scope); - // Debug info: scope info when run end - VLOG(3) << framework::GenScopeTreeDebugInfo(out_scope_vec->front()); - - // Step 3. run ops - exe.RunPartialPreparedContext(exe_ctx.get(), &scope, start_op_index, - end_op_index, /*create_local_scope=*/false, - /*create_vars=*/true, /*keep_kids=*/false); + if (end_op_index > start_op_index) { + // Step 2. prepare executor and scope + auto *program = ctx.Attr("global_block")->Program(); + auto cache_key = framework::ExecutorInfoCache::CacheKey( + program, ctx.GetPlace(), start_op_index, end_op_index, + /*is_grad*/ true); + auto cache_info = framework::GetExecutorInfoFromCache(cache_key, &scope); + auto ¶llel_executor = cache_info.first; + + parallel_executor->SkipMemoryReuse(/*scope_idx=*/0, + output_grad_var_names); + + details::ShareVarsIntoScope(output_grad_vars, output_grad_var_names, + &scope); + // Debug info: scope info when run end + VLOG(3) << framework::GenScopeTreeDebugInfo(out_scope_vec->front()); + + std::vector skip_eager_delete_vars(input_grad_var_names); + framework::details::AppendSkipDeletionVars(param_grad_names, + &skip_eager_delete_vars); + + // Step 3. run ops + parallel_executor->RunWithoutFetch( + /*skip_eager_delete_vars=*/skip_eager_delete_vars); + } // Step 4. get outputs details::ShareVarsFromScope(input_grad_vars, input_grad_var_names, &scope); diff --git a/paddle/fluid/platform/device_context.cc b/paddle/fluid/platform/device_context.cc index 1179677fd6b9f57152cf7821f6fd088b8945c129..82f14c612d1fa2bd303822fc1f787ea8863dcc30 100644 --- a/paddle/fluid/platform/device_context.cc +++ b/paddle/fluid/platform/device_context.cc @@ -75,6 +75,19 @@ void SetAllowTF32Cudnn(bool active) { allow_tf32_cudnn = active; } bool AllowTF32Cudnn() { return allow_tf32_cudnn; } #endif // PADDLE_WITH_CUDA +DeviceType Place2DeviceType(const platform::Place& place) { + if (platform::is_cpu_place(place)) { + return platform::DeviceType::CPU; + } else if (platform::is_gpu_place(place)) { + return platform::DeviceType::CUDA; + } else if (platform::is_xpu_place(place)) { + return platform::DeviceType::XPU; + } else { + PADDLE_THROW(platform::errors::Unavailable( + "Unsupported place %s to convert into platform::DeviceType.", place)); + } +} + DeviceContextPool* DeviceContextPool::pool = nullptr; platform::DeviceContext* DeviceContextPool::Get(const platform::Place& place) { diff --git a/paddle/fluid/platform/device_context.h b/paddle/fluid/platform/device_context.h index e2dbc90b5d1444b7f27ac00439a769ee3165a911..68589f546dc78773df715161c488b3eb0d50fc5d 100644 --- a/paddle/fluid/platform/device_context.h +++ b/paddle/fluid/platform/device_context.h @@ -99,6 +99,8 @@ enum DeviceType { NPU = 3, }; +DeviceType Place2DeviceType(const platform::Place& place); + constexpr DeviceType kCPU = DeviceType::CPU; constexpr DeviceType kCUDA = DeviceType::CUDA; constexpr DeviceType kXPU = DeviceType::XPU; diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 4a43e51e7cabcfe76418f7187f755bb0bce5455d..e701d3cc16d7236210a00f7ce72977b7b1f558ad 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -31,6 +31,7 @@ limitations under the License. */ #include "paddle/fluid/framework/custom_operator.h" #include "paddle/fluid/framework/data_layout.h" #include "paddle/fluid/framework/executor.h" +#include "paddle/fluid/framework/executor_cache.h" #include "paddle/fluid/framework/executor_gc_helper.h" #include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/feed_fetch_type.h" @@ -2216,6 +2217,8 @@ All parameter, weight, gradient are variables in Paddle. m.def("set_cudnn_switch", platform::SetAllowTF32Cudnn); m.def("get_cudnn_switch", platform::AllowTF32Cudnn); #endif // PADDLE_WITH_CUDA + m.def("clear_executor_cache", + []() { framework::ExecutorInfoCache::Instance().Finalize(); }); using VarQuantScale = std::unordered_map>; diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index fb1be483083a8c275d5da989ddf5d0271a76dd68..b8f74048320a38d370ba6f828f8c9ca2177f9e55 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -262,3 +262,5 @@ monkey_patch_varbase() # do some clean up manually. if core.is_compiled_with_npu(): atexit.register(core.npu_finalize) +# NOTE(Aurelius84): clean up ExecutorCacheInfo in advance manually. +atexit.register(core.clear_executor_cache) diff --git a/python/paddle/fluid/tests/unittests/dygraph_to_static/CMakeLists.txt b/python/paddle/fluid/tests/unittests/dygraph_to_static/CMakeLists.txt index 1bf762ab1a10b6df077cdbd067e8bc81a8ad2345..18f2ffc2382a933fdc4cc9120e9567a3fa7cf2cd 100644 --- a/python/paddle/fluid/tests/unittests/dygraph_to_static/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/dygraph_to_static/CMakeLists.txt @@ -1,6 +1,14 @@ file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") +list(REMOVE_ITEM TEST_OPS test_lac) +# NOTE(Aurelius84): In case of Windows CI, if open ON_INFER, RWLOCK of Scope will +# be removed and will cause some random failed in multi-thread. +if(NOT ON_INFER) + py_test_modules(test_lac MODULES test_lac) + set_tests_properties(test_lac PROPERTIES TIMEOUT 120) +endif() + foreach(TEST_OP ${TEST_OPS}) py_test_modules(${TEST_OP} MODULES ${TEST_OP}) endforeach(TEST_OP) @@ -10,7 +18,6 @@ set_tests_properties(test_yolov3 PROPERTIES TIMEOUT 900 LABELS "RUN_TYPE=EXCLUSI set_tests_properties(test_mobile_net PROPERTIES TIMEOUT 120) set_tests_properties(test_seq2seq PROPERTIES TIMEOUT 120) set_tests_properties(test_cycle_gan PROPERTIES TIMEOUT 120) -set_tests_properties(test_lac PROPERTIES TIMEOUT 120) set_tests_properties(test_bert PROPERTIES TIMEOUT 120) set_tests_properties(test_basic_api_transformation PROPERTIES TIMEOUT 120) set_tests_properties(test_reinforcement_learning PROPERTIES TIMEOUT 120) diff --git a/python/paddle/fluid/tests/unittests/dygraph_to_static/test_lac.py b/python/paddle/fluid/tests/unittests/dygraph_to_static/test_lac.py index 68f86c4702c0d64bdcc2c04ddee7ec0bdd9770ab..ab6a82808eb6fc34d4dd23acea93be83123dbac0 100644 --- a/python/paddle/fluid/tests/unittests/dygraph_to_static/test_lac.py +++ b/python/paddle/fluid/tests/unittests/dygraph_to_static/test_lac.py @@ -31,6 +31,12 @@ from paddle.fluid.dygraph.io import INFER_MODEL_SUFFIX, INFER_PARAMS_SUFFIX SEED = 2020 program_translator = ProgramTranslator() +# Add InputSpec to make unittest run faster. +input_specs = [ + paddle.static.InputSpec([None, None], 'int64'), + paddle.static.InputSpec([None, None], 'int64'), + paddle.static.InputSpec([None], 'int64') +] class DynamicGRU(fluid.dygraph.Layer): @@ -354,7 +360,7 @@ class LexNet(fluid.dygraph.Layer): # share weight self.crf_decoding.weight = self.linear_chain_crf.weight - @declarative + @declarative(input_spec=input_specs) def forward(self, word, target, length=None): """ Configure the network @@ -494,7 +500,7 @@ def do_train(args, to_static): fluid.dygraph.jit.save( layer=model, path=args.model_save_prefix, - input_spec=[words, length], + input_spec=[input_specs[0], input_specs[-1]], output_spec=[crf_decode]) else: fluid.dygraph.save_dygraph(model.state_dict(), args.dy_param_path) diff --git a/python/paddle/fluid/tests/unittests/dygraph_to_static/test_resnet.py b/python/paddle/fluid/tests/unittests/dygraph_to_static/test_resnet.py index dcc323d0644be2d43711a7c708fa5ac04b914b66..f06441e1fdf74bf12cf88ba7ff64f4bded0dfc40 100644 --- a/python/paddle/fluid/tests/unittests/dygraph_to_static/test_resnet.py +++ b/python/paddle/fluid/tests/unittests/dygraph_to_static/test_resnet.py @@ -358,7 +358,8 @@ class TestResnet(unittest.TestCase): def test_in_static_mode_mkldnn(self): fluid.set_flags({'FLAGS_use_mkldnn': True}) try: - train(to_static=True) + if paddle.fluid.core.is_compiled_with_mkldnn(): + train(to_static=True) finally: fluid.set_flags({'FLAGS_use_mkldnn': False}) diff --git a/python/paddle/fluid/tests/unittests/dygraph_to_static/test_resnet_v2.py b/python/paddle/fluid/tests/unittests/dygraph_to_static/test_resnet_v2.py index 10346ab0cc442eb87184171ccd4f2b403f309f45..ae7a58857905916d07750a10e0012a3870283ba4 100644 --- a/python/paddle/fluid/tests/unittests/dygraph_to_static/test_resnet_v2.py +++ b/python/paddle/fluid/tests/unittests/dygraph_to_static/test_resnet_v2.py @@ -358,7 +358,8 @@ class TestResnet(unittest.TestCase): def test_in_static_mode_mkldnn(self): paddle.fluid.set_flags({'FLAGS_use_mkldnn': True}) try: - train(to_static=True) + if paddle.fluid.core.is_compiled_with_mkldnn(): + train(to_static=True) finally: paddle.fluid.set_flags({'FLAGS_use_mkldnn': False}) diff --git a/tools/windows/run_unittests.sh b/tools/windows/run_unittests.sh index 4dbacbaa59a5da3f8025f7dd8ace1dfd46519c04..f9a4c7fc86f2980446a7ec6a034200b0fc84d5f8 100644 --- a/tools/windows/run_unittests.sh +++ b/tools/windows/run_unittests.sh @@ -79,7 +79,6 @@ disable_wingpu_test="^test_model$|\ ^test_fuse_bn_add_act_pass$|\ ^disable_wingpu_test$" - # /*============================================================================*/ # /*==================Fixed Disabled Windows CPU OPENBLAS unittests==============================*/