diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt
index 69978a0b906867a923caf48a73e431fdcbe689f5..2ea89df818c5d80c45d297560aa41e9ebcf6f753 100644
--- a/paddle/fluid/framework/CMakeLists.txt
+++ b/paddle/fluid/framework/CMakeLists.txt
@@ -268,6 +268,7 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS
         graph build_strategy collective_helper
         fast_threaded_ssa_graph_executor variable_helper)
 
+cc_library(executor_cache SRCS executor_cache.cc DEPS executor)
 cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS
         conditional_block_op executor)
 cc_library(prune SRCS prune.cc DEPS framework_proto boost)
diff --git a/paddle/fluid/framework/executor_cache.cc b/paddle/fluid/framework/executor_cache.cc
new file mode 100644
index 0000000000000000000000000000000000000000..4e32520e07b06bdb8da76d449ce77db306cd4094
--- /dev/null
+++ b/paddle/fluid/framework/executor_cache.cc
@@ -0,0 +1,111 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/framework/executor_cache.h"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+namespace paddle {
+namespace framework {
+
+namespace details {
+
+static void AppendSkipDeletionVars(const std::vector<std::string> &append_vars,
+                                   std::vector<std::string> *all_vars) {
+  for (auto &var : append_vars) {
+    all_vars->emplace_back(var);
+  }
+}
+
+static void AppendSafeEagerDeletionSkipVars(
+    const framework::ProgramDesc &program,
+    std::vector<std::string> *skip_vars) {
+  const framework::BlockDesc &block = program.Block(0);
+  const std::vector<framework::OpDesc *> &all_ops = block.AllOps();
+
+  std::unordered_set<std::string> grad_op_output;
+  std::unordered_set<std::string> grad_op_input;
+  for (const framework::OpDesc *op : all_ops) {
+    int op_role = BOOST_GET_CONST(
+        int, op->GetAttr(framework::OpProtoAndCheckerMaker::OpRoleAttrName()));
+    if ((op_role & static_cast<int>(framework::OpRole::kBackward)) == 0) {
+      continue;
+    }
+
+    for (const std::string &in_arg_name : op->InputArgumentNames()) {
+      grad_op_input.emplace(in_arg_name);
+    }
+    for (const std::string &out_arg_name : op->OutputArgumentNames()) {
+      grad_op_output.emplace(out_arg_name);
+    }
+  }
+
+  // For the grad op input variables, if one is not an output of the grad op,
+  // it may be an output of a forward op, so mark it as a skip var to prevent
+  // it from being deleted when the grad op is called multiple times.
+  for (const std::string &var_name : grad_op_input) {
+    if (grad_op_output.find(var_name) == grad_op_output.end()) {
+      skip_vars->emplace_back(var_name);
+    }
+  }
+}
+}  // namespace details
+
+// C++11 removes the need for manual locking: concurrent callers block until
+// the initialization of the static local variable has finished.
+// https://stackoverflow.com/questions/11711920/how-to-implement-multithread-safe-singleton-in-c11-without-using-mutex
+ExecutorInfoCache &ExecutorInfoCache::Instance() {
+  static ExecutorInfoCache g_exe_cache_info_map;
+  return g_exe_cache_info_map;
+}
+
+std::shared_ptr<framework::ExecutorPrepareContext> GetExecutorInfoFromCache(
+    const framework::Executor &exe, const framework::ExecutionContext &ctx,
+    const std::vector<std::vector<std::string>> &ctx_output_names,
+    bool is_grad) {
+  auto *program = ctx.Attr<BlockDesc *>("global_block")->Program();
+
+  auto &cached_exe_info = framework::ExecutorInfoCache::Instance();
+  auto cache_key = framework::ExecutorInfoCache::KeyType(program, is_grad);
+
+  if (!cached_exe_info.Has(cache_key)) {
+    VLOG(1) << "create exe_info for program: " << program
+            << " is_grad: " << is_grad;
+    // skip delete vars
+    std::vector<std::string> skip_vars;
+    for (auto &output_names : ctx_output_names) {
+      details::AppendSkipDeletionVars(output_names, &skip_vars);
+    }
+    if (is_grad) {
+      details::AppendSafeEagerDeletionSkipVars(*program, &skip_vars);
+    }
+
+    VLOG(2) << "Prepare to skip " << skip_vars.size()
+            << " var(s): " << string::join_strings(skip_vars, ' ');
+    std::shared_ptr<framework::ExecutorPrepareContext> exe_ctx =
+        std::move(exe.Prepare(*program, /*block_id=*/0, skip_vars));
+
+    cached_exe_info.Insert(cache_key, exe_ctx);
+    return exe_ctx;
+  } else {
+    VLOG(1) << "get exe_info from cache by program: " << program
+            << " is_grad: " << is_grad;
+    return cached_exe_info.Get(cache_key);
+  }
+}
+
+}  // namespace framework
+}  // namespace paddle
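
For context on the comment above ExecutorInfoCache::Instance(): since C++11 the
initialization of a function-local static is guaranteed to run exactly once, and
concurrent callers block until it completes, so no explicit mutex is needed. A
minimal, self-contained sketch of the same pattern (the Cache class below is
purely illustrative and not part of this patch):

    #include <iostream>
    #include <thread>
    #include <vector>

    class Cache {
     public:
      // Initialized exactly once; since C++11 concurrent callers wait until
      // the initialization of the local static has completed.
      static Cache& Instance() {
        static Cache instance;
        return instance;
      }

     private:
      Cache() { std::cout << "initialized once\n"; }
    };

    int main() {
      std::vector<std::thread> workers;
      for (int i = 0; i < 4; ++i) {
        workers.emplace_back([] { Cache::Instance(); });
      }
      for (auto& t : workers) t.join();  // "initialized once" prints a single time
      return 0;
    }
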
diff --git a/paddle/fluid/framework/executor_cache.h b/paddle/fluid/framework/executor_cache.h
new file mode 100644
index 0000000000000000000000000000000000000000..d83cadc22397a732cbb5f62f3e0fa40b568ce511
--- /dev/null
+++ b/paddle/fluid/framework/executor_cache.h
@@ -0,0 +1,96 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "paddle/fluid/framework/executor.h"
+#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/platform/macros.h"
+
+namespace paddle {
+namespace framework {
+
+class ExecutorInfoCache {
+ public:
+  /*
+   * The ExecutorPrepareContext differs when running the forward and backward
+   * programs, so a bool is added to the cached key to distinguish the two.
+   */
+  using KeyType = std::pair<const framework::ProgramDesc*, bool>;
+
+  struct HashPair {
+    template <typename T1, typename T2>
+    size_t operator()(const std::pair<T1, T2>& p) const noexcept {
+      size_t seed = 10;
+      hash_combine(&seed, p.first);
+      hash_combine(&seed, p.second);
+      return seed;
+    }
+    template <typename T>
+    void hash_combine(size_t* seed, const T& val) const {
+      std::hash<T> hasher;
+      (*seed) ^= hasher(val) + 0x9e3779b9 + ((*seed) << 6) + ((*seed) >> 2);
+    }
+  };
+
+  static ExecutorInfoCache& Instance();
+
+  std::shared_ptr<framework::ExecutorPrepareContext> Get(
+      const KeyType& key) const {
+    PADDLE_ENFORCE_EQ(
+        Has(key), true,
+        platform::errors::NotFound(
+            "(programDesc: %s, is_grad: %s) doesn't exist in ExecutorInfoCache",
+            key.first, key.second));
+    return info_map_.at(key);
+  }
+
+  bool Has(const KeyType& key) const {
+    return info_map_.find(key) != info_map_.end();
+  }
+
+  void Insert(const KeyType& key,
+              std::shared_ptr<framework::ExecutorPrepareContext> exe_ctx) {
+    PADDLE_ENFORCE_NE(
+        Has(key), true,
+        platform::errors::AlreadyExists(
+            "(programDesc: %s, is_grad: %s) already exists in ExecutorInfoCache",
+            key.first, key.second));
+
+    info_map_.insert(std::make_pair(key, exe_ctx));
+  }
+
+ private:
+  ExecutorInfoCache() = default;
+
+  std::unordered_map<
+      KeyType, std::shared_ptr<framework::ExecutorPrepareContext>, HashPair>
+      info_map_;
+  DISABLE_COPY_AND_ASSIGN(ExecutorInfoCache);
+};
+
+std::shared_ptr<framework::ExecutorPrepareContext> GetExecutorInfoFromCache(
+    const framework::Executor& exe, const framework::ExecutionContext& ctx,
+    const std::vector<std::vector<std::string>>& ctx_output_names,
+    bool is_grad);
+
+}  // namespace framework
+}  // namespace paddle
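
std::unordered_map has no default hash for std::pair keys, which is why HashPair
is supplied as the map's third template argument above. The sketch below shows
the same idea in isolation; the Key alias, the placeholder int standing in for a
ProgramDesc, and the stored std::string values are invented for illustration and
are not the types used by this patch:

    #include <cstddef>
    #include <functional>
    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <utility>

    struct PairHash {
      template <typename T1, typename T2>
      std::size_t operator()(const std::pair<T1, T2>& p) const noexcept {
        std::size_t seed = std::hash<T1>{}(p.first);
        // 0x9e3779b9 is the usual golden-ratio constant used to mix the bits
        // of the second hash into the seed.
        seed ^= std::hash<T2>{}(p.second) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
        return seed;
      }
    };

    int main() {
      using Key = std::pair<const void*, bool>;  // (program pointer, is_grad)
      std::unordered_map<Key, std::string, PairHash> cache;

      int program = 0;  // stand-in for a ProgramDesc owned elsewhere
      cache[{&program, false}] = "forward context";
      cache[{&program, true}] = "backward context";

      std::cout << cache.at({&program, true}) << "\n";  // prints "backward context"
      return 0;
    }
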
+ */ + using KeyType = std::pair; + + struct HashPair { + template + size_t operator()(const std::pair& p) const noexcept { + size_t seed = 10; + hash_combine(&seed, p.first); + hash_combine(&seed, p.second); + return seed; + } + template + void hash_combine(size_t* seed, const T& val) const { + std::hash hasher; + (*seed) ^= hasher(val) + 0x9e3779b9 + ((*seed) << 6) + ((*seed >> 2)); + } + }; + + static ExecutorInfoCache& Instance(); + + std::shared_ptr Get( + const KeyType& key) const { + PADDLE_ENFORCE_EQ( + Has(key), true, + platform::errors::NotFound( + "(programDesc: %s, is_grad: %s) doesn't exist in ExecutorInfoCache", + key.first, key.second)); + return info_map_.at(key); + } + + bool Has(const KeyType& key) const { + return info_map_.find(key) != info_map_.end(); + } + + void Insert(const KeyType& key, + std::shared_ptr exe_ctx) { + PADDLE_ENFORCE_NE( + Has(key), true, + platform::errors::NotFound( + "(programDesc: %s, is_grad: %s) has existed in ExecutorInfoCache", + key.first, key.second)); + + info_map_.insert(std::make_pair(key, exe_ctx)); + } + + private: + ExecutorInfoCache() = default; + + std::unordered_map< + KeyType, std::shared_ptr, HashPair> + info_map_; + DISABLE_COPY_AND_ASSIGN(ExecutorInfoCache); +}; + +std::shared_ptr GetExecutorInfoFromCache( + const framework::Executor& exe, const framework::ExecutionContext& ctx, + const std::vector>& ctx_output_names, + bool is_grad); + +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt index 3b9d3e7e9374e62b0e0b3a708207115f8191e00c..f0b9107bee5e2722ca15173a9d0f14e42befd018 100644 --- a/paddle/fluid/operators/CMakeLists.txt +++ b/paddle/fluid/operators/CMakeLists.txt @@ -64,9 +64,11 @@ if(WITH_COVERAGE OR WIN32 OR WITH_NV_JETSON) SET(OP_MKL_DEPS ${OP_MKL_DEPS} pyramid_hash_op) endif() -register_operators(EXCLUDES py_func_op warpctc_op dgc_op lstm_op +register_operators(EXCLUDES py_func_op warpctc_op dgc_op lstm_op run_program_op sync_batch_norm_op ${OP_MKL_DEPS} DEPS ${OP_HEADER_DEPS} ${OP_PREFETCH_DEPS}) +op_library(run_program_op SRCS run_program_op.cc run_program_op.cu.cc DEPS executor_cache ${OP_HEADER_DEPS} ${OP_PREFETCH_DEPS}) + if (WITH_GPU) # warpctc_op needs cudnn 7 above if (${CUDNN_MAJOR_VERSION} VERSION_LESS 7) diff --git a/paddle/fluid/operators/run_program_op.h b/paddle/fluid/operators/run_program_op.h index 5afe25cf687fc96d1eaac33b2d0516c96c394a46..f78f5c5b948c63e02d9121c540b6207c30b2d0f9 100644 --- a/paddle/fluid/operators/run_program_op.h +++ b/paddle/fluid/operators/run_program_op.h @@ -16,12 +16,15 @@ limitations under the License. 
diff --git a/paddle/fluid/operators/run_program_op.h b/paddle/fluid/operators/run_program_op.h
index 5afe25cf687fc96d1eaac33b2d0516c96c394a46..f78f5c5b948c63e02d9121c540b6207c30b2d0f9 100644
--- a/paddle/fluid/operators/run_program_op.h
+++ b/paddle/fluid/operators/run_program_op.h
@@ -16,12 +16,15 @@ limitations under the License. */
 
 #include <algorithm>
 #include <iterator>
+#include <memory>
 #include <string>
+#include <unordered_map>
 #include <unordered_set>
 #include <utility>
 #include <vector>
 
 #include "paddle/fluid/framework/executor.h"
+#include "paddle/fluid/framework/executor_cache.h"
 #include "paddle/fluid/framework/op_desc.h"
 #include "paddle/fluid/framework/op_registry.h"
 #include "paddle/fluid/framework/operator.h"
@@ -156,46 +159,6 @@ static void ShareVarsFromScope(const std::vector<Variable *> &vars,
   }
 }
 
-static void AppendSkipDeletionVars(const std::vector<std::string> &append_vars,
-                                   std::vector<std::string> *all_vars) {
-  for (auto &var : append_vars) {
-    all_vars->emplace_back(var);
-  }
-}
-
-static void AppendSafeEagerDeletionSkipVars(
-    const framework::ProgramDesc &program,
-    std::vector<std::string> *skip_vars) {
-  const framework::BlockDesc &block = program.Block(0);
-  const std::vector<framework::OpDesc *> &all_ops = block.AllOps();
-
-  std::unordered_set<std::string> grad_op_output;
-  std::unordered_set<std::string> grad_op_input;
-  for (const framework::OpDesc *op : all_ops) {
-    int op_role = BOOST_GET_CONST(
-        int, op->GetAttr(framework::OpProtoAndCheckerMaker::OpRoleAttrName()));
-    if ((op_role & static_cast<int>(framework::OpRole::kBackward)) == 0) {
-      continue;
-    }
-
-    for (const std::string &in_arg_name : op->InputArgumentNames()) {
-      grad_op_input.emplace(in_arg_name);
-    }
-    for (const std::string &out_arg_name : op->OutputArgumentNames()) {
-      grad_op_output.emplace(out_arg_name);
-    }
-  }
-
-  // For the grad op input variables, if it is not output of grad_op, it may
-  // be output of forward op and we should set the variables as skip_var to
-  // prevent it being deleted when grad op is called multiple times.
-  for (const std::string &var_name : grad_op_input) {
-    if (grad_op_output.find(var_name) == grad_op_output.end()) {
-      skip_vars->emplace_back(var_name);
-    }
-  }
-}
-
 }  // namespace details
 
 template <typename DeviceContext, typename T>
@@ -217,8 +180,6 @@ class RunProgramOpKernel : public framework::OpKernel<T> {
       param_names = ctx.InputNames("Params");
     }
 
-    auto *block = ctx.Attr<BlockDesc *>("global_block");
-    auto *program = block->Program();
     auto start_op_index = ctx.Attr<int64_t>("start_op_index");
     auto end_op_index = ctx.Attr<int64_t>("end_op_index");
    auto is_test = ctx.Attr<bool>("is_test");
@@ -233,14 +194,8 @@ class RunProgramOpKernel : public framework::OpKernel<T> {
 
     // Step 2. prepare executor and init persistable variables
     framework::Executor exe(ctx.GetPlace());
-
-    // skip delete vars
-    std::vector<std::string> skip_vars;
-    details::AppendSkipDeletionVars(output_var_names, &skip_vars);
-    VLOG(2) << "Prepare to skip " << skip_vars.size()
-            << " var(s): " << string::join_strings(skip_vars, ' ');
-
-    auto exe_ctx = exe.Prepare(*program, 0, skip_vars);
+    auto exe_ctx = framework::GetExecutorInfoFromCache(
+        exe, ctx, {output_var_names}, /*is_grad=*/false);
 
     // NOTE(Aurelius84): While training some models, forward can be called many
     // times and then apply backpropagation all at once, such as Reinforcement
@@ -259,7 +214,8 @@ class RunProgramOpKernel : public framework::OpKernel<T> {
     // Step 3. run ops
     exe.RunPartialPreparedContext(exe_ctx.get(), &scope, start_op_index,
                                   end_op_index, /*create_local_scope=*/false,
-                                  /*create_vars=*/true, /*keep_kids=*/!is_test);
+                                  /*create_vars=*/true,
+                                  /*keep_kids=*/!is_test);
 
     // Step 4. Get Output
     details::ShareVarsFromScope(output_vars, output_var_names, &scope);
@@ -305,8 +261,6 @@ class RunProgramGradOpKernel : public framework::OpKernel<T> {
     }
     auto *block = ctx.Attr<BlockDesc *>("global_block");
-    auto *program = block->Program();
-
     auto orig_end_op_index = ctx.Attr<int64_t>("end_op_index");
 
     // NOTE: skip `shape` and `fill_constant` op created by
     // fluid.backward.gradients, one forward output will generate one `shape`
@@ -332,20 +286,12 @@ class RunProgramGradOpKernel : public framework::OpKernel<T> {
 
     // Step 2. prepare executor and scope
     framework::Executor exe(ctx.GetPlace());
-
-    // skip delete vars
-    std::vector<std::string> skip_vars;
-    details::AppendSkipDeletionVars(input_grad_var_names, &skip_vars);
-    details::AppendSkipDeletionVars(param_grad_names, &skip_vars);
-    details::AppendSafeEagerDeletionSkipVars(*program, &skip_vars);
-    VLOG(2) << "Prepare to skip " << skip_vars.size()
-            << " var(s): " << string::join_strings(skip_vars, ' ');
-
-    auto exe_ctx = exe.Prepare(*program, 0, skip_vars);
+    auto exe_ctx = framework::GetExecutorInfoFromCache(
+        exe, ctx, {input_grad_var_names, param_grad_names},
+        /*is_grad=*/true);
 
     details::ShareVarsIntoScope(output_grad_vars, output_grad_var_names,
                                 &scope);
-
     // Debug info: scope info when run end
     VLOG(3) << framework::GenScopeTreeDebugInfo(out_scope_vec->front());
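
The skip-var rule that moved into executor_cache.cc is easiest to see with toy
data: any variable a backward op reads but no backward op writes must come from
outside the backward section (a forward result or an externally fed gradient),
so it is excluded from eager deletion because the backward ops may run more than
once. The variable names below are invented purely for illustration:

    #include <iostream>
    #include <string>
    #include <unordered_set>
    #include <vector>

    int main() {
      // Inputs and outputs of the backward ops of a small fc-like block.
      std::unordered_set<std::string> grad_op_input = {"x", "fc_out",
                                                       "fc_out@GRAD", "out@GRAD"};
      std::unordered_set<std::string> grad_op_output = {"fc_out@GRAD", "x@GRAD"};

      std::vector<std::string> skip_vars;
      for (const std::string& name : grad_op_input) {
        if (grad_op_output.count(name) == 0) {
          skip_vars.push_back(name);  // keeps "x", "fc_out", "out@GRAD"
        }
      }
      // "fc_out@GRAD" is produced by a backward op itself, so it can still be
      // garbage-collected eagerly and is not listed.
      for (const auto& name : skip_vars) {
        std::cout << name << "\n";
      }
      return 0;
    }
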
diff --git a/python/paddle/fluid/tests/unittests/test_run_program_op.py b/python/paddle/fluid/tests/unittests/test_run_program_op.py
index 55810faff13e272a03df91269de28f9643068316..f6332859f92f7af78bb664c3b12038ce9f767096 100644
--- a/python/paddle/fluid/tests/unittests/test_run_program_op.py
+++ b/python/paddle/fluid/tests/unittests/test_run_program_op.py
@@ -167,6 +167,9 @@ class RunProgramOpTest(unittest.TestCase):
         return outputs
 
     def calc_dygraph_output(self, place):
+        self.program_desc, self.fwd_op_num = self.get_program_desc()
+        self.attrs = self.prepare_attrs()
+
         with fluid.dygraph.guard(place):
             inputs = self.prepare_dygraph_input(place)
             outputs = self.prepare_dygraph_output()
@@ -179,6 +182,9 @@ class RunProgramOpTest(unittest.TestCase):
         return outputs['Out']
 
     def calc_dygraph_grad(self, place):
+        self.program_desc, self.fwd_op_num = self.get_program_desc()
+        self.attrs = self.prepare_attrs()
+
         with fluid.dygraph.guard(place):
             # Step 1. run forward
             inputs, input_param_list = self.prepare_dygraph_input(place, True)
@@ -241,10 +247,6 @@ class TestRunProgramOpWithFC(RunProgramOpTest):
             }
         }
 
-        self.program_desc, self.fwd_op_num = self.get_program_desc()
-
-        self.attrs = self.prepare_attrs()
-
     def test_check_output(self):
         self.check_output()
 
@@ -298,10 +300,6 @@ class TestRunProgramOpWithEmbedding(RunProgramOpTest):
             }
         }
 
-        self.program_desc, self.fwd_op_num = self.get_program_desc()
-
-        self.attrs = self.prepare_attrs()
-
     def test_check_output(self):
         self.check_output()