diff --git a/paddle/fluid/framework/prune.cc b/paddle/fluid/framework/prune.cc index db9d794b3c61dd23b8515a4edd8ae7468ad37cc4..f5e20e041e9e44437907f384c7cadae007a9f510 100644 --- a/paddle/fluid/framework/prune.cc +++ b/paddle/fluid/framework/prune.cc @@ -113,7 +113,6 @@ bool HasSubBlock(const proto::OpDesc& op_desc) { } int GetOpRole(const proto::OpDesc& op_desc) { - // The op role >= 0, so -1 is used to indicate "NotFound". for (auto& attr : op_desc.attrs()) { if (attr.name() == OpProtoAndCheckerMaker::OpRoleAttrName()) { PADDLE_ENFORCE_EQ( @@ -124,7 +123,10 @@ int GetOpRole(const proto::OpDesc& op_desc) { return attr.i(); } } - return -1; + // If attr op_role is not found, it may be operator created in c++ test, like + // prune_test.cc. In that case, the op_role should be defaut value, which is + // kNotSpecified. + return static_cast(OpRole::kNotSpecified); } void AppendOpInputVarNames(const proto::OpDesc& op_desc, @@ -145,6 +147,16 @@ void AppendOpOutputVarNames(const proto::OpDesc& op_desc, } } +int FindMapByValue(const std::map& m, int val) { + // The content in map should be >= 0, so -1 is used to indicate "NotFound". + for (auto& pair : m) { + if (pair.second == val) { + return pair.first; + } + } + return -1; +} + // block_id is the idx of the current block in the input desc // parent_block_id is the idx of the parent of the current block // in the output desc, -1 means the current block is global block @@ -153,30 +165,41 @@ void AppendOpOutputVarNames(const proto::OpDesc& op_desc, void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output, int block_id, int parent_block_id, std::unordered_set* dependent_vars, - const std::set feed_var_names) { + const std::set feed_var_names, + std::map* pruned_origin_block_id_map) { auto& block = input.blocks(block_id); auto& ops = block.ops(); bool expect_feed = true; for (auto& op_desc : ops) { - PADDLE_ENFORCE(op_desc.type() != kFeedOpType || expect_feed, - "All FeedOps are at the beginning of the ProgramDesc"); + PADDLE_ENFORCE_EQ( + op_desc.type() != kFeedOpType || expect_feed, true, + platform::errors::PreconditionNotMet( + "All FeedOps are at the beginning of the ProgramDesc")); expect_feed = (op_desc.type() == kFeedOpType); } bool expect_fetch = true; for (auto op_iter = ops.rbegin(); op_iter != ops.rend(); ++op_iter) { auto& op_desc = *op_iter; - PADDLE_ENFORCE(op_desc.type() != kFetchOpType || expect_fetch, - "All FetchOps must at the end of the ProgramDesc"); + PADDLE_ENFORCE_EQ(op_desc.type() != kFetchOpType || expect_fetch, true, + platform::errors::PreconditionNotMet( + "All FetchOps must at the end of the ProgramDesc")); expect_fetch = (op_desc.type() == kFetchOpType); } std::vector should_run; for (auto op_iter = ops.rbegin(); op_iter != ops.rend(); ++op_iter) { auto& op_desc = *op_iter; - if (IsTarget(op_desc) || HasDependentOutputVar(op_desc, *dependent_vars)) { - // insert its input to the dependency graph + + if (IsTarget(op_desc) || + (HasDependentOutputVar(op_desc, *dependent_vars) && + (GetOpRole(op_desc) & static_cast(OpRole::kOptimize)) == 0)) { + // NOTE(zhiqiu): since optimize op takes the trainable parameters as + // inputs and output, it may introduce wrong dependency graph. + // For train mode, the optimize op should be in targets, so is not need + // and not right to mark optimize op by its outputs. + // For eval / infer mode, there is no optimize op in program. for (auto& var : op_desc.inputs()) { for (auto& argu : var.arguments()) { if (feed_var_names.count(argu) == 0) { @@ -203,6 +226,8 @@ void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output, output_block->set_idx(output_block_id); output_block->set_parent_idx(parent_block_id); + (*pruned_origin_block_id_map)[output_block_id] = block_id; + auto* op_field = output_block->mutable_ops(); op_field->Clear(); for (size_t i = 0; i < should_run.size(); ++i) { @@ -244,7 +269,8 @@ void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output, // GetSubBlockIndex(*op) is the idx of the sub_block in the input desc // output_block_id is the idx of the current block in the output desc prune_impl(input, output, GetSubBlockIndex(*op), output_block_id, - &sub_block_dependent_vars, feed_var_names); + &sub_block_dependent_vars, feed_var_names, + pruned_origin_block_id_map); } } } @@ -284,22 +310,33 @@ void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output, } // TODO(fengjiayi): Prune() could be inplaced to avoid unnecessary copies -void Prune(const proto::ProgramDesc& input, - const std::set& feed_var_names, - proto::ProgramDesc* output) { +std::map Prune(const proto::ProgramDesc& input, + const std::set& feed_var_names, + proto::ProgramDesc* output) { std::unordered_set dependent_vars; output->clear_blocks(); - prune_impl(input, output, 0, -1, &dependent_vars, feed_var_names); -} - -int FindMapByValue(const std::map& m, int val) { - // The content in map should be >= 0, so -1 is used to indicate "NotFound". - for (auto& pair : m) { - if (pair.second == val) { - return pair.first; + std::map pruned_origin_block_id_map; + prune_impl(input, output, 0, -1, &dependent_vars, feed_var_names, + &pruned_origin_block_id_map); + // update subblock idx + for (int i = 0; i < output->blocks_size(); i++) { + auto* pruned = output->mutable_blocks(i); + auto* ops = pruned->mutable_ops(); + for (auto op_iter = ops->rbegin(); op_iter != ops->rend(); ++op_iter) { + auto& op_desc = *op_iter; + if (HasSubBlock(op_desc)) { + int origin_sub_idx = GetSubBlockIndex(op_desc); + auto sub_idx = + FindMapByValue(pruned_origin_block_id_map, origin_sub_idx); + PADDLE_ENFORCE_NE(sub_idx, -1, + platform::errors::NotFound( + "The origin sub block id should be found in " + "pruned_progin_block_id_map")); + SetSubBlockIndex(&op_desc, sub_idx); + } } } - return -1; + return pruned_origin_block_id_map; } void PruneBackwardImpl(proto::BlockDesc* origin, proto::BlockDesc* pruned) { @@ -348,8 +385,8 @@ void PruneBackwardImpl(proto::BlockDesc* origin, proto::BlockDesc* pruned) { var_names.insert(op_output_vars.begin(), op_output_vars.end()); for (const auto& name : var_names) { if (var_map.count(name)) { - // NOTE(zhiqiu): For operator in a conditional block, the related vars may - // not exist in current block, but in its futher block. + // NOTE(zhiqiu): For operator in a conditional block, the related vars + // may not exist in current block, but in its futher block. *pruned_vars->Add() = var_map[name]; } } @@ -389,6 +426,7 @@ std::tuple> PruneBackward( proto::ProgramDesc pruned_desc; pruned_desc.clear_blocks(); + // Step 2. Prune backward for each block. for (size_t i = 0; i < origin_clone.Size(); i++) { auto pruned = proto::BlockDesc(); diff --git a/paddle/fluid/framework/prune.h b/paddle/fluid/framework/prune.h index 857006d69c1b215125f4d2a5e209e0c489829709..63e8aaad73575e7561428a9de43b86f75c22e65e 100644 --- a/paddle/fluid/framework/prune.h +++ b/paddle/fluid/framework/prune.h @@ -26,9 +26,9 @@ limitations under the License. */ namespace paddle { namespace framework { -void Prune(const proto::ProgramDesc& input, - const std::set& feed_var_names, - proto::ProgramDesc* output); +std::map Prune(const proto::ProgramDesc& input, + const std::set& feed_var_names, + proto::ProgramDesc* output); std::tuple> PruneBackward( const framework::ProgramDesc& origin); diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index dc0dfd46da3f643579674e0d249856c5e6bc70f2..4fce0e5eb55ac4c8a8f3c526f3d994e001c2d660 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -1154,8 +1154,10 @@ All parameter, weight, gradient are variables in Paddle. prog_with_targets.MutableBlock(t[0])->Op(t[1])->SetIsTarget(true); } proto::ProgramDesc pruned_desc; - Prune(*prog_with_targets.Proto(), feeded_var_names, &pruned_desc); - return new ProgramDesc(pruned_desc); + auto pruned_origin_block_id_map = + Prune(*prog_with_targets.Proto(), feeded_var_names, &pruned_desc); + return std::make_tuple(ProgramDesc(pruned_desc), + pruned_origin_block_id_map); }); m.def("prune_backward", [](const framework::ProgramDesc &program) { diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 1ada5f74f8d85ef99008f22ab229b49906dc32d8..d8578a56ceaddc27c56426288ebb8dc15df5049f 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -23,12 +23,13 @@ import numpy as np from .wrapped_decorator import signature_safe_contextmanager import six from .data_feeder import convert_dtype -from .framework import Program, default_main_program, Variable, convert_np_dtype_to_dtype_ +from .framework import Program, default_main_program, Variable, Operator, convert_np_dtype_to_dtype_ from . import core from . import compiler from .. import compat as cpt from .trainer_factory import TrainerFactory from .trainer_factory import FetchHandlerMonitor +import copy __all__ = ['Executor', 'global_scope', 'scope_guard'] @@ -345,14 +346,27 @@ def _fetch_var(name, scope=None, return_numpy=True): def _to_name_str(var): - if isinstance(var, Variable): - return var.desc.name() - elif isinstance(var, str): - return var - elif isinstance(var, six.string_types): - return str(var) + def _to_str(var): + if isinstance(var, Variable): + return var.desc.name() + elif isinstance(var, str): + return var + elif isinstance(var, six.string_types): + return str(var) + elif isinstance(var, Operator): + return var.desc.type() + else: + raise TypeError(str(var) + " should be Variable, Operator or str") + + # NOTEz(zhiqiu): The item in fetch_list may be tuple returned by Optimizer.minimize(), + # see comments in _split_optimize_ops_in_fetch_list for more details. + if isinstance(var, tuple): + var = var[0] + if isinstance(var, list): + s = [_to_str(item) for item in var] + return ','.join(s) else: - raise TypeError(str(var) + " should be Variable or str") + return _to_str(var) def _get_strong_program_cache_key(program, feed, fetch_list): @@ -360,9 +374,13 @@ def _get_strong_program_cache_key(program, feed, fetch_list): def _get_program_cache_key(feed, fetch_list): - feed_var_names = list(feed.keys()) + feed_var_names = [] + if isinstance(feed, dict): + feed_var_names = list(feed.keys()) + elif isinstance(feed, list) or isinstance(feed, tuple): + for i, each in enumerate(feed): + feed_var_names += list(each.keys()) fetch_var_names = list(map(_to_name_str, fetch_list)) - return str(feed_var_names + fetch_var_names) @@ -503,10 +521,12 @@ class Executor(object): self.ctx_caches = dict() self.scope_caches = dict() self.var_caches = dict() + self.pruned_program_caches = dict() p = core.Place() p.set_place(self.place) self._default_executor = core.Executor(p) self._closed = False + self.pruned_program_scope_caches = dict() def _get_scope_cache(self, program_cache_key): return self.scope_caches.get(program_cache_key, None) @@ -520,6 +540,18 @@ class Executor(object): def _add_program_cache(self, program_cache_key, program): self.program_caches[program_cache_key] = program + def _get_pruned_program_cache(self, program_cache_key): + return self.pruned_program_caches.get(program_cache_key, None) + + def _add_pruned_program_cache(self, program_cache_key, program): + self.pruned_program_caches[program_cache_key] = program + + def _get_pruned_program_scope_cache(self, program_cache_key): + return self.pruned_program_scope_caches.get(program_cache_key, None) + + def _add_pruned_program_scope_cache(self, program_cache_key, program): + self.pruned_program_scope_caches[program_cache_key] = program + def _add_ctx_cache(self, ctx_cache_key, ctx): self.ctx_caches[ctx_cache_key] = ctx @@ -551,13 +583,17 @@ class Executor(object): # prepend feed operators if not has_feed_operators(global_block, feed, feed_var_name): for i, name in enumerate(feed): - out = global_block.var(name) - global_block._prepend_op( - type='feed', - inputs={'X': [feed_var]}, - outputs={'Out': [out]}, - attrs={'col': i}) - + if global_block.has_var(name): + out = global_block.var(name) + global_block._prepend_op( + type='feed', + inputs={'X': [feed_var]}, + outputs={'Out': [out]}, + attrs={'col': i}) + else: + warnings.warn( + "The variable %s is not found in program. It is not declared or is pruned." + % name) # append fetch_operators if not has_fetch_operators(global_block, fetch_list, fetch_var_name): for i, var in enumerate(fetch_list): @@ -595,6 +631,159 @@ class Executor(object): ] return outs + def _split_optimize_ops_in_fetch_list(self, fetch_list): + """ + Split optimize_ops from fetch_list, which provided to specify program prunning. + Args: + fetch_list(list): The original fetch_list. + Possible types of fetch_list are: + fetch_list = ['loss'] + fetch_list = [[sgd, sgd], 'loss'] + fetch_list = [([sgd, sgd], [(param, grad)]), 'loss'] + + Returns: + optimize_ops(list): The optimize operators splited from fetch_list. + fetch_list(list): The updated fetch_list which does not contain optimize operators. + """ + _optimize_ops = [] + _fetch_list = [] + + def _get_targets(_optimize_ops, _fetch_list, item): + if isinstance(item, Operator): + if item._is_optimize_op(): + _optimize_ops.append(item) + else: + raise TypeError( + "The operator in fetch_list is not an optimize_op") + elif isinstance(item, Variable) or isinstance( + item, str) or isinstance(item, six.string_types): + _fetch_list.append(item) + else: + raise TypeError( + "The item in fetch_list should be str, variable or optimize_op, but recieved %s.", + type(item)) + + for item in fetch_list: + # NOTE(zhiqiu): to support (optimizer_ops, param_and_grads) and optimizer_ops in fetch_list + # we should handle tuple and list in fetch_list. + # TODO(zhiqiu): find a better way to handle that. + if isinstance(item, list): + for i in item: + _get_targets(_optimize_ops, _fetch_list, i) + elif isinstance(item, tuple): + for i in item[0]: + _get_targets(_optimize_ops, _fetch_list, i) + else: + _get_targets(_optimize_ops, _fetch_list, item) + + return _fetch_list, _optimize_ops + + def _prune_program(self, + program, + feed=None, + fetch_list=None, + optimize_ops=None): + """ + Prune operators and variables which are not needed to generate + :code:`fetch_list` and optimize operators. + Prune operators and variables which are needed + to generate variables to be feeded. + + Notes: This is a very low level API. Users should not use this API + directly. + + Args: + program(Program): the origin program + feed(list|dict): feed dict or list. + fetch_list(list|Variable): A list of variables need to be fetched + optimize_ops(list[Operator]): A list of optimizer operators + + Returns: + Program: A new, pruned program. + """ + compiled = isinstance(program, compiler.CompiledProgram) + if compiled: + if program._program: + origin_program = program._program + else: + warnings.warn( + "The program holds no _program, maybe it is constructed by graph, which can't be pruned yet." + ) + return + else: + origin_program = program + + feed_names = [] + if isinstance(feed, dict): + feed_names = list(feed.keys()) + elif isinstance(feed, list) or isinstance(feed, tuple): + for i, each in enumerate(feed): + feed_names += list(each.keys()) + + # if optimize_ops is [], all optimize ops in the program is used. + if not optimize_ops: + for block in origin_program.blocks: + for op in block.ops: + if op._is_optimize_op(): + optimize_ops.append(op) + + targets = fetch_list + optimize_ops + pruned_program = origin_program._prune_with_input(feed_names, targets) + + if compiled: + # for compiled program, update the underlying program, re-generate graph, + # and reset the flag so it can be compiled again. + program._program = pruned_program + program._graph = core.Graph(pruned_program.desc) + program._compiled = False + else: + program = pruned_program + + return program + + def _update_feed(self, program, feed): + """ + Update the feed dict, remove the feed item which is pruned in program. + + Notes: This is a very low level API. Users should not use this API + directly. + + Args: + program(Program): the pruned program. + feed(list|dict): feed dict or list. + + Returns: + feed:(list|dict) updated feed. + """ + compiled = isinstance(program, compiler.CompiledProgram) + if compiled: + if program._program: + global_block = program._program.global_block() + else: + warnings.warn( + "The program holds no _program, maybe it is constructed by graph." + ) + else: + global_block = program.global_block() + + if isinstance(feed, dict): + for feed_name in list(feed.keys()): + if not global_block.has_var(feed_name): + feed.pop(feed_name) + warnings.warn( + "The variable %s is not found in program. It is not declared or is pruned." + % feed_name) + + elif isinstance(feed, list) or isinstance(feed, tuple): + for i, each in enumerate(feed): + for feed_name in list(each.keys()): + if not global_block.has_var(feed_name): + each.pop(feed_name) + warnings.warn( + "The variable %s is not found in program. It is not declared or is pruned." + % feed_name) + return feed + ''' TODO(typhoonzero): Define "no longer use" meaning? Can user create a new Executor for the same program and run? @@ -682,7 +871,8 @@ class Executor(object): scope=None, return_numpy=True, use_program_cache=False, - return_merged=True): + return_merged=True, + use_prune=False): """ Run the specified :code:`Program` or :code:`CompiledProgram`. It should be noted that the executor will execute all the operators in :code:`Program` or :code:`CompiledProgram` without pruning some @@ -706,7 +896,7 @@ class Executor(object): so the length of this list should be equal to the number of places. The default is None. fetch_list(list): This parameter represents the variables that need to be returned - after the model runs. The default is None. + after the model runs. The default is None. feed_var_name(str): This parameter represents the name of the input variable of the feed operator. The default is "feed". fetch_var_name(str): This parameter represents the name of the output variable of @@ -732,6 +922,13 @@ class Executor(object): set :code:`return_merged` as False, which denotes that the fetched results will not be merged. The default is True, but it is just for the compatibility, and may use False as default value in the future version. + use_prune(bool): This parameter indicates whether the input :code:`Program` will be pruned. + If the parameter is True, the program will be pruned accroding to the given feed and fetch_list, + which means the operators and variables in program that generate :code:`feed` and are not + needed to generate :code:`fetch_list` will be pruned. The default is False, which means the + program will not pruned and all the operators and variables will be executed during running. + Note that if the tuple returned from :code:`Optimizer.minimize()` is passed to :code:`fetch_list`, + :code:`use_prune` will be overrided to True, and the program will be pruned. Returns: @@ -844,6 +1041,7 @@ class Executor(object): scope=scope, return_numpy=return_numpy, use_program_cache=use_program_cache, + use_prune=use_prune, return_merged=return_merged) except Exception as e: if not isinstance(e, core.EOFException): @@ -853,7 +1051,7 @@ class Executor(object): def _run_impl(self, program, feed, fetch_list, feed_var_name, fetch_var_name, scope, return_numpy, use_program_cache, - return_merged): + return_merged, use_prune): if self._closed: raise RuntimeError("Attempted to use a closed Executor") @@ -877,7 +1075,9 @@ class Executor(object): scope = global_scope() if fetch_list is not None: - if isinstance(fetch_list, Variable) or isinstance(fetch_list, str): + if isinstance(fetch_list, Variable) or isinstance( + fetch_list, str) or isinstance(fetch_list, + six.string_types): fetch_list = [fetch_list] assert isinstance(fetch_list, tuple) or isinstance(fetch_list, list), \ "Currently , The fetch_list type only should be list or tuple, \n"\ @@ -886,6 +1086,38 @@ class Executor(object): else: fetch_list = [] + # use_prune can be overrided by putting optimize_ops in fetch_list + _origin_fetch_list = fetch_list + _origin_program = program + fetch_list, optimize_ops = self._split_optimize_ops_in_fetch_list( + fetch_list) + if optimize_ops: + use_prune = True + if use_prune: + cache_key = _get_strong_program_cache_key(program, feed, + _origin_fetch_list) + cached_pruned_program = self._get_pruned_program_cache(cache_key) + if cached_pruned_program is None: + if isinstance(program, compiler.CompiledProgram): + program_scope_cache = self._get_pruned_program_scope_cache( + str(id(_origin_program))) + # copy the original program, so it can be cached. + program = copy.copy(program) + # share the local scopes for same original CompiledProgram. + program._share_vars_from = program_scope_cache + if self._get_pruned_program_scope_cache( + str(id(_origin_program))) is None: + self._add_pruned_program_scope_cache( + str(id(_origin_program)), program) + pruned_program = self._prune_program(program, feed, fetch_list, + optimize_ops) + self._add_pruned_program_cache(cache_key, pruned_program) + else: + pruned_program = cached_pruned_program + + feed = self._update_feed(pruned_program, feed) + program = pruned_program + compiled = isinstance(program, compiler.CompiledProgram) # For backward compatibility, run directly. diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 54433e5c0795ee254c757bf4bfbf2ac899f3e547..b23d967bfdb62dd6efd720ec8fac7d026a41939b 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -2172,6 +2172,15 @@ class Operator(object): return attr_map + def _is_optimize_op(self): + op_maker = core.op_proto_and_checker_maker + OPTIMIZE = core.op_proto_and_checker_maker.OpRole.Optimize + op_role = self.desc.attr(op_maker.kOpRoleAttrName()) + if op_role & int(OPTIMIZE): + return True + else: + return False + class Block(object): """ @@ -2706,8 +2715,8 @@ class Block(object): assert isinstance(p, Parameter) v = self.vars.get(p.name, None) if v is None: - raise ValueError("_copy_param_info_from should be invoked with " - "same topology") + # if the Parameter is pruned, v may be None + continue assert isinstance(v, Variable) new_p = None if in_dygraph_mode(): @@ -4056,52 +4065,13 @@ class Program(object): directly. This API is in flux and not stable. Args: - targets(list|Variable|Operator): A list of variables or operators + targets(list|Variable|Operator): A list of variables, operators, or variable names need to be pruned Returns: Program: A new, pruned program. """ - - #NOTE(zhiqiu): we sync the original program first, since its program may diff with - # its desc due to modifying desc in c++ space. E.g. save op will add kLookupTablePath in desc. - self._sync_with_cpp() - - if not isinstance(targets, list): - targets = [targets] - - targets_idx = [] - for t in targets: - if not isinstance(t, Operator): - if isinstance(t, Variable): - # After transpiler processing, the op that output this - # variable maybe has been changed, so t.op is not reliable - # and we need to find the current op that generate this - # variable here. - t.op = None - global_block = self.global_block() - for idx, op in enumerate(global_block.ops): - if t.name in op.output_arg_names: - t.op = op - break - - t = t.op - if t is None: - raise ValueError( - "The target variable must have an " - "associated operator that generates it.") - else: - raise ValueError("All targets of prune() can only be " - "Variable or Operator.") - - targets_idx.append([t.block.idx, t.idx]) - res = Program() - res.desc = core.prune(self.desc, set(), targets_idx) - res.blocks = [ - Block(res, i) for i in six.moves.range(res.desc.num_blocks()) - ] - res._sync_with_cpp() - return res + return self._prune_with_input([], targets) def _prune_with_input(self, feeded_var_names, targets): """ @@ -4115,7 +4085,7 @@ class Program(object): Args: feeded_var_names(list|str): A list of variable names from where pruning start. If it is set as [], this API works just like _prune() - targets(list|Variable|Operator): A list of variables or operators + targets(list|Variable|Operator): A list of variables, operators, or variable names need to be pruned Returns: @@ -4140,33 +4110,47 @@ class Program(object): for t in targets: if not isinstance(t, Operator): if isinstance(t, Variable): - # After transpiler processing, the op that output this - # variable maybe has been changed, so t.op is not reliable - # and we need to find the current op that generate this - # variable here. - t.op = None - global_block = self.global_block() - for idx, op in enumerate(global_block.ops): - if t.name in op.output_arg_names: - t.op = op - break - - t = t.op - if t is None: - raise ValueError( - "The target variable must have an " - "associated operator that generates it.") + name = t.name + elif isinstance(t, six.string_types): + name = str(t) else: raise ValueError("All targets of prune() can only be " "Variable or Operator.") - + # After transpiler processing, the op that output this + # variable maybe has been changed, so t.op is not reliable + # and we need to find the current op that generate this + # variable here. + target_op = None + global_block = self.global_block() + for idx, op in enumerate(global_block.ops): + if name in op.output_arg_names: + # NOTE(zhiqiu): Find op that generate target name. + # Skip optimize op except for optimize op in targets, + # since optimize op generates parameters. + if op._is_optimize_op() and op not in targets: + continue + else: + target_op = op + break + t = target_op + if t is None: + raise ValueError("The target variable must have an " + "associated operator that generates it.") targets_idx.append([t.block.idx, t.idx]) + res = Program() - res.desc = core.prune(self.desc, set(feeded_var_names), targets_idx) + res.desc, pruned_origin_block_id_map = core.prune(self.desc, + set(feeded_var_names), + targets_idx) res.blocks = [ Block(res, i) for i in six.moves.range(res.desc.num_blocks()) ] res._sync_with_cpp() + + res._copy_param_info_from(self) + res._copy_data_info_from(self, pruned_origin_block_id_map) + res._copy_dist_param_info_from(self) + return res def _inference_optimize(self, prune_read_op=True): diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index a1ba7abc6aea1ed20ac2801eae3cfbe85f9a9cab..984a854ebeed2345c2c4f25bbce8431276ebf30f 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -811,6 +811,9 @@ class Optimizer(object): tuple: tuple (optimize_ops, params_grads), A list of operators appended by minimize and a list of (param, grad) variable pairs, param is ``Parameter``, grad is the gradient value corresponding to the parameter. + The returned tuple can be passed to ``fetch_list`` in ``Executor.run()`` to + indicate program pruning. If so, the program will be pruned by ``feed`` and + ``fetch_list`` before run, see details in ``Executor``. Examples: Please refer to the example of current Optimizer. diff --git a/python/paddle/fluid/tests/unittests/test_prune.py b/python/paddle/fluid/tests/unittests/test_prune.py index dd7e1153b2c006a7313ae28bbf96b7c2baa6117e..fe3ea43dde95169b6b3e93471ed9d52ed7240aa4 100644 --- a/python/paddle/fluid/tests/unittests/test_prune.py +++ b/python/paddle/fluid/tests/unittests/test_prune.py @@ -19,6 +19,9 @@ import unittest import paddle.fluid as fluid import paddle.fluid.framework as framework import paddle.compat as cpt +import numpy as np +import os +import contextlib class TestPrune(unittest.TestCase): @@ -96,5 +99,581 @@ class TestPrune(unittest.TestCase): cpt.get_exception_message(e)) +def mock(self, program, feed, fetch, optimize_ops): + self.prune_called_times += 1 + return program + + +@contextlib.contextmanager +def _mock_guard(mock): + original = fluid.Executor._prune_program + fluid.Executor._prune_program = mock + yield + fluid.Executor._prune_program = original + + +class TestExecutorRunAutoPrune(unittest.TestCase): + def net1(self): + x = fluid.layers.data(name='x', shape=[2], dtype='float32') + label = fluid.layers.data(name="label", shape=[1], dtype="int64") + w_param_attrs = fluid.ParamAttr( + name="fc_weight", + learning_rate=0.5, + initializer=fluid.initializer.Constant(1.0), + trainable=True) + y = fluid.layers.fc(input=[x], + size=2, + act="softmax", + param_attr=w_param_attrs) + loss1 = fluid.layers.cross_entropy(input=y, label=label) + loss1 = fluid.layers.mean(x=loss1) + loss2 = fluid.layers.cross_entropy(input=y, label=label) + loss2 = fluid.layers.mean(x=loss2) + loss1.persistable = True + loss2.persistable = True + return x, y, label, loss1, loss2, w_param_attrs + + def net2(self): + x1 = fluid.layers.data(name='x1', shape=[2], dtype='float32') + x2 = fluid.layers.data(name='x2', shape=[2], dtype='float32') + label = fluid.layers.data(name="label", shape=[1], dtype="int64") + w1_param_attrs = fluid.ParamAttr( + name="fc_weight1", + learning_rate=0.5, + initializer=fluid.initializer.Constant(1.0), + trainable=True) + w2_param_attrs = fluid.ParamAttr( + name="fc_weight2", + learning_rate=0.5, + initializer=fluid.initializer.Constant(1.0), + trainable=True) + y1 = fluid.layers.fc(input=[x1], + size=2, + act="softmax", + param_attr=w1_param_attrs) + y2 = fluid.layers.fc(input=[x2], + size=2, + act="softmax", + param_attr=w2_param_attrs) + loss1 = fluid.layers.cross_entropy(input=y1, label=label) + loss1 = fluid.layers.mean(x=loss1) + loss2 = fluid.layers.cross_entropy(input=y2, label=label) + loss2 = fluid.layers.mean(x=loss2) + return x1, x2, y1, y2, label, loss1, loss2, w1_param_attrs, w2_param_attrs + + def test_not_prune(self): + """ + If use_prune = False, the targets which is not fetched will be calculated. + """ + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x, y, label, loss1, loss2, w_param_attrs) = self.net1() + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(startup_program) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint(1, size=(10, 1)).astype('int64') + res = exe.run(program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=False) + self.assertIsNotNone(scope.find_var(loss1.name)) + self.assertIsNotNone(scope.find_var(loss2.name)) + + def test_prune_fetches_without_optimizer(self): + """ + Prune operators and variables which are not needed to generate 'fetches'. + """ + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x, y, label, loss1, loss2, w_param_attrs) = self.net1() + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(startup_program) + weight_init = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint(1, size=(10, 1)).astype('int64') + res = exe.run(program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=True) + self.assertIsNotNone(scope.find_var(loss1.name)) + self.assertIsNone(scope.find_var(loss2.name)) #loss2 is pruned + weight = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + self.assertTrue(np.array_equal(weight_init, + weight)) # weight not changed + + def test_prune_fetches_with_optimizer(self): + """ + Prune operators and operators which are not needed to generate 'fetches'. + In train mode, the operators and operators in backward and optimization should be kept. + """ + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x, y, label, loss1, loss2, w_param_attrs) = self.net1() + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.5) + sgd_optimizer.minimize(loss1) + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(startup_program) + weight_init = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint(1, size=(10, 1)).astype('int64') + res = exe.run(program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=True) + self.assertIsNotNone(scope.find_var(loss1.name)) + self.assertIsNone(scope.find_var(loss2.name)) #loss2 is pruned + weight = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + self.assertFalse(np.array_equal(weight_init, + weight)) # weight changed + + def test_prune_compiled_program(self): + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x, y, label, loss1, loss2, w_param_attrs) = self.net1() + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.5) + sgd_optimizer.minimize(loss1) + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(startup_program) + compiled_prog = fluid.CompiledProgram( + program).with_data_parallel( + loss_name=loss1.name, places=fluid.CPUPlace()) + weight_init = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint(1, size=(10, 1)).astype('int64') + res = exe.run(compiled_prog, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=True) + self.assertIsNotNone(scope.find_var(loss1.name)) + self.assertIsNone(scope.find_var(loss2.name)) + weight = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + self.assertFalse(np.array_equal(weight_init, + weight)) # weight changed + + def test_prune_feed_without_optimizer(self): + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x, y, label, loss1, loss2, w_param_attrs) = self.net1() + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(startup_program) + weight_init = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint(1, size=(10, 1)).astype('int64') + res = exe.run(program, + feed={y.name: x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=True) + self.assertIsNotNone(scope.find_var(loss1.name)) + self.assertIsNone(scope.find_var(loss2.name)) + weight = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + self.assertTrue(np.array_equal(weight_init, + weight)) # weight unchanged + + def test_prune_feed_with_optimizer(self): + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x, y, label, loss1, loss2, w_param_attrs) = self.net1() + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.5) + sgd_optimizer.minimize(loss1) + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(startup_program) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint(1, size=(10, 1)).astype('int64') + self.assertRaises( + Exception, + exe.run, + program, + feed={y.name: x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=True) + self.assertIsNotNone(scope.find_var(loss1.name)) + self.assertIsNone(scope.find_var(loss2.name)) + + def test_prune_with_cache_program(self): + ''' + When use_prune=True and use_program_cache=True, Executor should cache the pruned program. + If in next run, the program, feed, fetch are not changed, Executor use the cached pruned program, + and needn't to call _prune_program() to prune the program. + In this test, we hack the Executor._prune_program with a mock function which do nothing but increase + Executor.prune_called_times, and we check prune_called_times equals 1 even if we called exe.run() + 10 times with the same input arguments. + ''' + with _mock_guard(mock): + exe = fluid.Executor(fluid.CPUPlace()) + exe.prune_called_times = 0 + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x, y, label, loss1, loss2, w_param_attrs) = self.net1() + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.5) + sgd_optimizer.minimize(loss1) + exe.run(startup_program) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint( + 1, size=(10, 1)).astype('int64') + for i in range(10): + res = exe.run(program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=True, + use_program_cache=True) + if i == 0: + self.assertEqual(exe.prune_called_times, 1) + else: + self.assertEqual(exe.prune_called_times, 1) + + def test_prune_with_cache_compiled_program(self): + ''' + When use_prune=True and use_program_cache=True, Executor should cache the pruned program. + If in next run, the program, feed, fetch are not changed, Executor use the cached pruned program, + and needn't to call _prune_program() to prune the program. + In this test, we hack the Executor._prune_program with a mock function which do nothing but increase + Executor.prune_called_times, and we check prune_called_times equals 1 even if we called exe.run() + 10 times with the same input arguments. + ''' + with _mock_guard(mock): + exe = fluid.Executor(fluid.CPUPlace()) + exe.prune_called_times = 0 + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x, y, label, loss1, loss2, w_param_attrs) = self.net1() + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.5) + sgd_optimizer.minimize(loss1) + exe.run(startup_program) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint( + 1, size=(10, 1)).astype('int64') + compiled_prog = fluid.CompiledProgram( + program).with_data_parallel( + loss_name=loss1.name, places=fluid.CPUPlace()) + for i in range(10): + res = exe.run(compiled_prog, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=True, + use_program_cache=True) + if i == 0: + self.assertEqual(exe.prune_called_times, 1) + else: + self.assertEqual(exe.prune_called_times, 1) + + def test_prune_with_multi_optimizers(self): + ''' + If there are multiple optimizers in the program, we can run specific one by + pass the return of optimize.minimize() to fetch_list. + ''' + exe = fluid.Executor(fluid.CPUPlace()) + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + # do not use_prune + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x, y, label, loss1, loss2, w_param_attrs) = self.net1() + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.5) + train1, _ = sgd_optimizer.minimize(loss1) + cloned_program = program.clone() + train2, _ = sgd_optimizer.minimize(loss2) + exe.run(startup_program) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint(1, size=(10, 1)).astype('int64') + res = exe.run(program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=False) + weight_without_prune = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + + scope = fluid.Scope() + # use_prune + with fluid.scope_guard(scope): + exe.run(startup_program) + res = exe.run(program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name, train1], + use_prune=True) + weight_with_prune = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + + # expected + scope = fluid.Scope() + with fluid.scope_guard(scope): + exe.run(startup_program) + exe.run(cloned_program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=False) + weight_expected = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + + self.assertTrue(np.array_equal(weight_with_prune, weight_expected)) + self.assertFalse(np.array_equal(weight_without_prune, weight_expected)) + + def test_prune_with_multi_devices(self): + ''' + When training model with multi_devices, the pruned CompiledProgram should share same local scopes. + This test the correctness. + ''' + exe = fluid.Executor(fluid.CPUPlace()) + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + os.environ['CPU_NUM'] = str(2) + # do not use_prune + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x1, x2, y1, y2, label, loss1, loss2, w1_param_attrs, + w2_param_attrs) = self.net2() + adam_optimizer1 = fluid.optimizer.AdamOptimizer( + learning_rate=0.5) + train1 = adam_optimizer1.minimize(loss1) + cloned_program = program.clone() + adam_optimizer2 = fluid.optimizer.AdamOptimizer( + learning_rate=0.5) + train2 = adam_optimizer2.minimize(loss2) + exe.run(startup_program) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint(1, size=(10, 1)).astype('int64') + compiled_prog1 = fluid.CompiledProgram( + program).with_data_parallel( + loss_name=loss1.name, places=[fluid.CPUPlace()] * 2) + compiled_prog2 = fluid.CompiledProgram( + program).with_data_parallel( + loss_name=loss2.name, places=[fluid.CPUPlace()] * 2) + for i in range(10): + if i % 2 == 1: + res = exe.run(compiled_prog1, + feed=[{ + 'x1': x_np[0:5, :], + 'label': label_np[0:5, :] + }, { + 'x1': x_np[5:, :], + 'label': label_np[5:, :] + }], + fetch_list=[loss1.name, train1], + use_prune=True) + else: + res = exe.run(compiled_prog2, + feed={'x2': x_np, + 'label': label_np}, + fetch_list=[loss2.name, train2], + use_prune=True) + weight1 = np.array( + scope.find_var(w1_param_attrs.name).get_tensor()) + # expected + scope = fluid.Scope() + with fluid.scope_guard(scope): + exe.run(startup_program) + for i in range(10): + if i % 2 == 1: + exe.run(cloned_program, + feed={'x1': x_np, + 'x2': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=False) + weight2 = np.array(scope.find_var(w1_param_attrs.name).get_tensor()) + self.assertTrue(np.allclose(weight1, weight2)) + + def test_prune_program_with_tupe_in_fetch_list(self): + ''' + If there are multiple optimizers in the program, we can run specific one by + pass the return of optimize.minimize() to fetch_list. + ''' + exe = fluid.Executor(fluid.CPUPlace()) + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + # do not use_prune + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x, y, label, loss1, loss2, w_param_attrs) = self.net1() + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.5) + train1 = sgd_optimizer.minimize(loss1) + cloned_program = program.clone() + + train2 = sgd_optimizer.minimize(loss2) + exe.run(startup_program) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint(1, size=(10, 1)).astype('int64') + + res = exe.run(program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=False) + + weight_without_prune = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + + scope = fluid.Scope() + # use_prune + with fluid.scope_guard(scope): + exe.run(startup_program) + res = exe.run(program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name, train1], + use_prune=True) + weight_with_prune = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + + # expected + scope = fluid.Scope() + with fluid.scope_guard(scope): + exe.run(startup_program) + exe.run(cloned_program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=False) + weight_expected = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + + self.assertTrue(np.array_equal(weight_with_prune, weight_expected)) + self.assertFalse(np.array_equal(weight_without_prune, weight_expected)) + + def test_prune_program_partial_parameter_updated(self): + """ + When running startup program, all parameters declared will be initialized. + When running main program with prune=True, the pruned parameters will exist in scope and stay unchanged. + """ + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x1, x2, y1, y2, label, loss1, loss2, w1_param_attrs, + w2_param_attrs) = self.net2() + loss1.persistable = True + loss2.persistable = True + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.5) + train1 = sgd_optimizer.minimize(loss1) + sgd_optimizer1 = fluid.optimizer.SGD(learning_rate=0.5) + train2 = sgd_optimizer1.minimize(loss2) + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(startup_program) + weight1_init = np.array( + scope.find_var(w1_param_attrs.name).get_tensor()) + weight2_init = np.array( + scope.find_var(w2_param_attrs.name).get_tensor()) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint(1, size=(10, 1)).astype('int64') + + res = exe.run(program, + feed={'x1': x_np, + 'label': label_np}, + fetch_list=[loss1.name, train1], + use_prune=True) + self.assertIsNotNone(scope.find_var(w1_param_attrs.name)) + self.assertIsNotNone(scope.find_var(w2_param_attrs.name)) + self.assertIsNotNone(scope.find_var(loss1.name)) + self.assertIsNone(scope.find_var(loss2.name)) + weight1 = np.array( + scope.find_var(w1_param_attrs.name).get_tensor()) + weight2 = np.array( + scope.find_var(w2_param_attrs.name).get_tensor()) + self.assertFalse(np.array_equal(weight1_init, + weight1)) # weight changed + self.assertTrue(np.array_equal(weight2_init, + weight2)) # weight2 unchanged + + def test_prune_override_use_prune(self): + ''' + If optimize_ops in provided in the fetch_list, the argument use_prune is always override to True. + ''' + exe = fluid.Executor(fluid.CPUPlace()) + program = framework.Program() + startup_program = framework.Program() + scope = fluid.Scope() + # do not use_prune + with fluid.scope_guard(scope): + with fluid.program_guard(program, startup_program): + (x, y, label, loss1, loss2, w_param_attrs) = self.net1() + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.5) + train1, _ = sgd_optimizer.minimize(loss1) + cloned_program = program.clone() + train2, _ = sgd_optimizer.minimize(loss2) + exe.run(startup_program) + x_np = np.random.random(size=(10, 2)).astype('float32') + label_np = np.random.randint(1, size=(10, 1)).astype('int64') + res = exe.run(program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=False) + + weight_without_prune = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + + scope = fluid.Scope() + # use_prune + with fluid.scope_guard(scope): + exe.run(startup_program) + res = exe.run(program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name, train1]) + weight_with_prune = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + + # expected + scope = fluid.Scope() + with fluid.scope_guard(scope): + exe.run(startup_program) + exe.run(cloned_program, + feed={'x': x_np, + 'label': label_np}, + fetch_list=[loss1.name], + use_prune=False) + weight_expected = np.array( + scope.find_var(w_param_attrs.name).get_tensor()) + + self.assertTrue(np.array_equal(weight_with_prune, weight_expected)) + self.assertFalse(np.array_equal(weight_without_prune, weight_expected)) + + if __name__ == '__main__': unittest.main()