From 7f8bc49d0001ee48d509d2c115c4f1edef05dcc0 Mon Sep 17 00:00:00 2001 From: guru4elephant <35550832+guru4elephant@users.noreply.github.com> Date: Fri, 24 May 2019 09:23:45 +0800 Subject: [PATCH] polish_executor_and_add_ctx_cache (#17536) * polish_executor_and_add_ctx_cache --- paddle/fluid/framework/executor.cc | 21 ++++++++++++++++++- paddle/fluid/framework/executor.h | 24 ++++++++++++++-------- paddle/fluid/framework/hogwild_worker.cc | 2 +- paddle/fluid/pybind/pybind.cc | 20 +++++++++++++++++- python/paddle/fluid/executor.py | 26 +++++++++++++++++++++--- 5 files changed, 78 insertions(+), 15 deletions(-) diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 239a3ce0a..abeca93e5 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -244,6 +244,25 @@ static bool has_fetch_operators( return fetch_count > 0; } +std::unique_ptr Executor::PrepareCtxCache( + const ProgramDesc& program, int block_id, + const std::vector& skip_ref_cnt_vars, bool force_disable_gc) { + std::unique_ptr ctx; + ctx.reset(new ExecutorPrepareContext(program, block_id)); + auto& block = program.Block(block_id); + for (auto& op_desc : block.AllOps()) { + ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc)); + } +#ifdef PADDLE_WITH_NGRAPH + if (FLAGS_use_ngraph) { + paddle::operators::NgraphEngine::FuseNgraphOps( + ctx->prog_.Block(ctx->block_id_), &ctx->ops_); + } +#endif + ctx->PrepareUnusedVars(skip_ref_cnt_vars, force_disable_gc); + return ctx; +} + void Executor::Run(const ProgramDesc& program, Scope* scope, std::map* feed_targets, std::map* fetch_targets, @@ -368,6 +387,7 @@ std::vector> Executor::Prepare( void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, bool create_local_scope, bool create_vars, bool keep_kids) { + platform::RecordBlock b(kProgramId); PADDLE_ENFORCE_NOT_NULL(scope); Scope* local_scope = scope; if (create_vars) { @@ -407,7 +427,6 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, for (auto& op : ctx->ops_) { op->Run(*local_scope, place_); - if (gc) { DeleteUnusedTensors(*local_scope, op.get(), ctx->unused_vars_, gc.get()); } diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index 6eeeb1efc..d0d12b307 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -83,6 +83,21 @@ class Executor { const std::string& feed_holder_name = "feed", const std::string& fetch_holder_name = "fetch"); + // This API is very slow. + void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, + std::map* feed_targets, + std::map* fetch_targets, + bool create_local_scope = true, + bool create_vars = true, + const std::string& feed_holder_name = "feed", + const std::string& fetch_holder_name = "fetch"); + + std::unique_ptr PrepareCtxCache( + const ProgramDesc& program, int block_id, + const std::vector& skip_ref_cnt_vars = + std::vector(), + bool force_disable_gc = false); + static std::unique_ptr Prepare( const ProgramDesc& program, int block_id, const std::vector& skip_ref_cnt_vars = @@ -101,15 +116,6 @@ class Executor { bool create_local_scope = true, bool create_vars = true, bool keep_kids = false); - // This API is very slow. - void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, - std::map* feed_targets, - std::map* fetch_targets, - bool create_local_scope = true, - bool create_vars = true, - const std::string& feed_holder_name = "feed", - const std::string& fetch_holder_name = "fetch"); - void EnableMKLDNN(const ProgramDesc& program); void RunFromDataset(const ProgramDesc& main_program, Scope* scope, diff --git a/paddle/fluid/framework/hogwild_worker.cc b/paddle/fluid/framework/hogwild_worker.cc index f16fefefd..f02828eba 100644 --- a/paddle/fluid/framework/hogwild_worker.cc +++ b/paddle/fluid/framework/hogwild_worker.cc @@ -24,7 +24,7 @@ void HogwildWorker::Initialize(const TrainerDesc& desc) { fetch_config_ = desc.fetch_config(); param_ = desc.hogwild_param(); skip_ops_.resize(param_.skip_ops_size()); - for (size_t i = 0; i < param_.skip_ops_size(); ++i) { + for (int i = 0; i < param_.skip_ops_size(); ++i) { skip_ops_[i] = param_.skip_ops(i); } use_cvm_ = desc.use_cvm(); diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index e0eaefad6..af9b27533 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -1032,10 +1032,28 @@ All parameter, weight, gradient are variables in Paddle. [](const OperatorBase &op) { return op.OutputVars(false); }) .def("support_gpu", &OperatorBase::SupportGPU); + py::class_(m, "ExecutorPrepareContext") + .def(py::init()); + py::class_(m, "Executor") .def(py::init()) .def("close", &Executor::Close) - .def("run_from_dataset", &Executor::RunFromDataset) + .def("run_from_dataset", &Executor::RunFromDataset, + py::call_guard()) + .def("run_prepared_ctx", + [](Executor &self, ExecutorPrepareContext *ctx, Scope *scope, + std::map *feed_targets, + std::map *fetch_targets, + bool create_local_scope = true, bool create_vars = true, + const std::string &feed_holder_name = "feed", + const std::string &fetch_holder_name = "fetch") { + pybind11::gil_scoped_release release; + self.RunPreparedContext(ctx, scope, feed_targets, fetch_targets, + create_local_scope, create_vars, + feed_holder_name, fetch_holder_name); + }) + .def("prepare_ctx_cache", &Executor::PrepareCtxCache, + py::call_guard()) .def("run", [](Executor &self, const ProgramDesc &prog, Scope *scope, int block_id, bool create_local_scope, bool create_vars, const std::vector &fetch_vars) { diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 68abd7e7f..4c47a309e 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -247,6 +247,10 @@ def _to_name_str(var): raise TypeError(str(var) + " should be Variable or str") +def _get_strong_program_cache_key(program, feed, fetch_list): + return str(id(program)) + _get_program_cache_key(feed, fetch_list) + + def _get_program_cache_key(feed, fetch_list): feed_var_names = list(feed.keys()) fetch_var_names = list(map(_to_name_str, fetch_list)) @@ -356,17 +360,24 @@ class Executor(object): def __init__(self, place): self.place = place self.program_caches = dict() + self.ctx_caches = dict() p = core.Place() p.set_place(self.place) self._default_executor = core.Executor(p) self._closed = False + def _get_ctx_cache(self, program_cache_key): + return self.ctx_caches.get(program_cache_key, None) + def _get_program_cache(self, program_cache_key): return self.program_caches.get(program_cache_key, None) def _add_program_cache(self, program_cache_key, program): self.program_caches[program_cache_key] = program + def _add_ctx_cache(self, ctx_cache_key, ctx): + self.ctx_caches[ctx_cache_key] = ctx + def _add_feed_fetch_ops(self, program, feed, fetch_list, feed_var_name, fetch_var_name): tmp_program = program.clone() @@ -645,6 +656,7 @@ class Executor(object): # performance. # TODO(panyx0718): executor should be able to run graph. assert program._program, "CompiledProgram is compiled from graph, can only run with_data_parallel." + # use_program_cache is not valid with CompiledProgram return self._run( program._program, self._default_executor, @@ -654,7 +666,7 @@ class Executor(object): fetch_var_name=fetch_var_name, scope=scope, return_numpy=return_numpy, - use_program_cache=use_program_cache) + use_program_cache=False) def _run(self, program, exe, feed, fetch_list, feed_var_name, fetch_var_name, scope, return_numpy, use_program_cache): @@ -677,9 +689,10 @@ class Executor(object): "Executor requires Program as its Parameter. But you passed in %s" % (type(program))) - cache_key = _get_program_cache_key(feed, fetch_list) + cache_key = _get_strong_program_cache_key(program, feed, fetch_list) if use_program_cache: cached_program = self._get_program_cache(cache_key) + cached_ctx = self._get_ctx_cache(cache_key) if cached_program is None: cached_program = self._add_feed_fetch_ops( program=program, @@ -688,7 +701,11 @@ class Executor(object): feed_var_name=feed_var_name, fetch_var_name=fetch_var_name) self._add_program_cache(cache_key, cached_program) + cached_ctx = self._default_executor.prepare_ctx_cache( + cached_program.desc, 0, fetch_list, False) + self._add_ctx_cache(cache_key, cached_ctx) program = cached_program + ctx = cached_ctx else: self.program_caches.pop(cache_key, None) program = self._add_feed_fetch_ops( @@ -699,7 +716,10 @@ class Executor(object): fetch_var_name=fetch_var_name) self._feed_data(program, feed, feed_var_name, scope) - exe.run(program.desc, scope, 0, True, True, fetch_var_name) + if not use_program_cache: + exe.run(program.desc, scope, 0, True, True, fetch_var_name) + else: + exe.run_prepared_ctx(ctx, scope, True, True, False) outs = self._fetch_data(fetch_list, fetch_var_name, scope) if return_numpy: outs = as_numpy(outs) -- GitLab